Skip to main content

aurora_db/
db.rs

1//! # Aurora Core Engine
2//!
3//! This module contains the `Aurora` struct, which is the primary handle for 
4//! interacting with the database. It coordinates the tiered storage system, 
5//! index management, transaction lifecycle, and AQL execution.
6//!
7//! ## Key Responsibilities
8//! - **Collection Management**: Creating and deleting collections with defined schemas.
9//! - **CRUD Operations**: High-performance insertion, retrieval, update, and deletion of documents.
10//! - **Transaction Coordination**: Managing ACID-compliant transaction buffers and commits.
11//! - **Index Coordination**: Maintaining primary and secondary indices (including Roaring Bitmaps).
12//! - **PubSub Integration**: Publishing change events for real-time listeners.
13//!
14//! ## Thread Safety
15//! `Aurora` is designed to be wrapped in an `Arc` and shared across multiple 
16//! threads or async tasks. Internally, it uses highly concurrent data structures 
17//! (like `DashMap`) and fine-grained locking to ensure safe parallel access.
18
19use crate::error::{AqlError, ErrorCode, Result};
20use crate::index::{Index, IndexDefinition, IndexType};
21use crate::network::http_models::{
22    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
23};
24use crate::parser;
25use crate::parser::executor::{ExecutionOptions, ExecutionPlan, ExecutionResult};
26use crate::query::FilterBuilder;
27use crate::query::{Filter, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
28use crate::storage::{ColdStore, HotStore, IndexStorage, WriteBuffer};
29use crate::types::{
30    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
31};
32use crate::wal::{Operation, WriteAheadLog};
33use roaring::RoaringBitmap;
34
35use dashmap::DashMap;
36use moka::sync::Cache as MokaCache;
37use serde_json::Value as JsonValue;
38use serde_json::from_str;
39use std::collections::HashMap;
40use std::fmt;
41use std::fs::File as StdFile;
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
44use std::sync::{Arc, RwLock};
45use std::time::Duration;
46use tokio::fs::File;
47use tokio::fs::read_to_string;
48use tokio::io::AsyncReadExt;
49use tokio::sync::OnceCell;
50use uuid::Uuid;
51
/// Disk location metadata stored in the primary index.
///
/// Instead of keeping the full `Vec<u8>` value in memory, the primary index
/// stores only this minimal per-key metadata.
#[derive(Debug, Clone, Copy)]
pub(crate) struct DiskLocation {
    /// Size in bytes (useful for statistics).
    size: u32,
}

impl DiskLocation {
    /// Builds the metadata for a value of `size` bytes.
    ///
    /// Sizes larger than `u32::MAX` saturate at `u32::MAX` instead of silently
    /// wrapping around (a plain `as u32` cast would report a 4 GiB value as 0).
    fn new(size: usize) -> Self {
        Self {
            // The `min` guarantees the value fits, so the cast cannot truncate.
            size: size.min(u32::MAX as usize) as u32,
        }
    }
}
64
// Index types for faster lookups
/// Primary index: maps a `String` key to its [`DiskLocation`] metadata.
/// NOTE(review): the key is presumably the document's storage key — confirm against callers.
type PrimaryIndex = DashMap<String, DiskLocation>;
/// Managed secondary index using adaptive storage modes (Single -> List -> Bitmap)
///
/// Maps a stringified field value to the shared, lock-protected `IndexStorage`
/// for that value.
type SecondaryIndex = DashMap<String, Arc<RwLock<IndexStorage>>>;
69
/// Summary of a stored value, tagged by how the value is held.
///
/// Every variant carries a `size`; [`DataInfo::size`] returns it regardless of variant.
/// NOTE(review): `size` is presumably a byte count — confirm against the code that
/// constructs these values.
#[derive(Debug)]
pub enum DataInfo {
    /// Regular data, with a `preview` string alongside its size.
    Data { size: usize, preview: String },
    /// Blob value; only the size is recorded.
    Blob { size: usize },
    /// Compressed value; only the size is recorded.
    Compressed { size: usize },
}
77
/// Trait to convert field tuples into FieldDefinition
/// Supports both 3-tuple (defaults nullable to false) and 4-tuple (explicit nullable),
/// as well as a `(name, FieldDefinition)` pair that is passed through unchanged.
pub trait IntoFieldDefinition {
    /// Consumes `self` and returns the field name together with its definition.
    fn into_field_definition(self) -> (String, FieldDefinition);
}
83
84// 3-tuple: (field_name, field_type, unique) - defaults nullable to false
85impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool) {
86    fn into_field_definition(self) -> (String, FieldDefinition) {
87        let (name, field_type, unique) = self;
88        (
89            name.into(),
90            FieldDefinition {
91                field_type,
92                unique,
93                indexed: unique,
94                nullable: false, // Default to false
95                validations: vec![],
96                ..Default::default()
97            },
98        )
99    }
100}
101
102// 4-tuple: (field_name, field_type, unique, nullable) - explicit control
103impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool, bool) {
104    fn into_field_definition(self) -> (String, FieldDefinition) {
105        let (name, field_type, unique, nullable) = self;
106        (
107            name.into(),
108            FieldDefinition {
109                field_type,
110                unique,
111                indexed: unique,
112                nullable,
113                validations: vec![],
114                ..Default::default()
115            },
116        )
117    }
118}
119
120// Direct FieldDefinition: (field_name, field_definition)
121impl<S: Into<String>> IntoFieldDefinition for (S, FieldDefinition) {
122    fn into_field_definition(self) -> (String, FieldDefinition) {
123        (self.0.into(), self.1)
124    }
125}
126
/// Message type carried by the asynchronous WAL writer channel
/// (`Aurora::wal_writer` is an `UnboundedSender<WalOperation>`).
#[derive(Debug)]
// NOTE(review): no construction or match of these variants is visible in this part
// of the file; confirm the `dead_code` allowance is still required.
#[allow(dead_code)]
enum WalOperation {
    /// Record a write of `value` under `key`; both are reference-counted.
    Put {
        key: Arc<String>,
        value: Arc<Vec<u8>>,
    },
    /// Record the removal of `key`.
    Delete {
        key: String,
    },
}
138
139impl DataInfo {
140    pub fn size(&self) -> usize {
141        match self {
142            DataInfo::Data { size, .. } => *size,
143            DataInfo::Blob { size } => *size,
144            DataInfo::Compressed { size } => *size,
145        }
146    }
147}
148
/// The main database engine
///
/// Most fields are `Arc`-wrapped or otherwise cheaply cloneable handles, so a
/// cloned `Aurora` (see the `Clone` impl) shares its indices, caches and ID
/// dictionaries with the original.
pub struct Aurora {
    /// Hot store handle.
    pub(crate) hot: HotStore,
    /// Cold store; scanned by key prefix and used for `_sys:` / `_collection:` entries.
    pub(crate) cold: Arc<ColdStore>,
    /// Primary indices, keyed by `String`.
    /// NOTE(review): the outer key is presumably the collection name — confirm.
    pub(crate) primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    /// Secondary indices keyed by `"collection:field"`; each inner map is keyed
    /// by the stringified field value.
    pub(crate) secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    /// Sled tree persisting external u128 ID -> internal u32 ID (both big-endian).
    pub(crate) sys_id_mapping: sled::Tree,
    /// Memory map of the `aurora_indices.bin` checkpoint, if one has been loaded or saved.
    pub(crate) mmap_index: Arc<RwLock<Option<memmap2::Mmap>>>,
    /// Checkpoint manifest: `"collection:field:value"` -> `(byte_offset, byte_len)`
    /// into the mapped checkpoint file.
    pub(crate) index_manifest: Arc<DashMap<String, (usize, usize)>>,
    /// Mapping from external u128 ID to internal u32 ID (Ticket 1)
    _sid_dictionary: Arc<crossbeam_skiplist::SkipMap<u128, u32>>,
    /// Mapping from internal u32 ID back to external u128 ID (O(1) lookup)
    /// The vector is indexed by internal ID; a stored `0` is treated as "no entry".
    reverse_sid_dictionary: Arc<RwLock<Vec<u128>>>,
    /// Bitset of deleted internal IDs available for recycling (Ticket 3)
    /// These are still pending: they move to `recyclable_ids` after a checkpoint.
    deleted_ids: Arc<RwLock<RoaringBitmap>>,
    /// IDs that have been cleared from all persistent bitmaps and are safe to reuse
    recyclable_ids: Arc<RwLock<RoaringBitmap>>,
    /// Next never-used internal ID; incremented atomically on allocation.
    next_internal_id: Arc<AtomicU32>,
    /// One-time guard ensuring the index checkpoint is loaded only once.
    indices_initialized: Arc<OnceCell<()>>,
    /// Transaction manager (exposes the active transaction count).
    pub(crate) transaction_manager: crate::transaction::TransactionManager,
    /// `Index` objects keyed by `String`.
    /// NOTE(review): key format is not visible in this part of the file.
    indices: Arc<DashMap<String, Index>>,
    /// Cache of collection definitions keyed by `String`.
    /// NOTE(review): presumably keyed by collection name — confirm.
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    /// Engine configuration (includes `db_path`, the directory holding checkpoint files).
    config: AuroraConfig,
    /// Optional write buffer.
    write_buffer: Option<Arc<WriteBuffer>>,
    /// Pub/sub system handle.
    pub pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background task shutdown senders
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    wal_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Worker system for background jobs
    pub workers: Option<Arc<crate::workers::WorkerSystem>>,
    // Asynchronous WAL Writer Channel
    wal_writer: Option<tokio::sync::mpsc::UnboundedSender<WalOperation>>,
    /// Set to true when the WAL writer task hits a terminal error. put() checks
    /// this before sending so it skips the closed channel and degrades gracefully.
    wal_disabled: Arc<AtomicBool>,
    // Computed fields manager
    pub computed: Arc<RwLock<crate::computed::ComputedFields>>,
    /// LRU cache for parsed AQL AST
    ast_cache: Arc<MokaCache<u64, crate::parser::ast::Document>>,
    /// LRU cache for query plans
    pub plan_cache: Arc<MokaCache<u64, Arc<crate::parser::executor::QueryPlan>>>,
    /// Set of "collection:field" index keys currently being rebuilt in the background.
    /// Queries hitting a building index fall back to full scan instead of returning
    /// incorrect empty results from a half-populated index.
    pub(crate) building_indices: Arc<DashMap<String, ()>>,
}
198
199impl fmt::Debug for Aurora {
200    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201        f.debug_struct("Aurora")
202            .field("hot", &"HotStore")
203            .field("cold", &"ColdStore")
204            .field("primary_indices_count", &self.primary_indices.len())
205            .field("secondary_indices_count", &self.secondary_indices.len())
206            .field(
207                "active_transactions",
208                &self.transaction_manager.active_count(),
209            )
210            .field("indices_count", &self.indices.len())
211            .finish()
212    }
213}
214
215impl Clone for Aurora {
216    fn clone(&self) -> Self {
217        Self {
218            hot: self.hot.clone(),
219            cold: Arc::clone(&self.cold),
220            primary_indices: Arc::clone(&self.primary_indices),
221            secondary_indices: Arc::clone(&self.secondary_indices),
222            sys_id_mapping: self.sys_id_mapping.clone(),
223            mmap_index: Arc::clone(&self.mmap_index),
224            index_manifest: Arc::clone(&self.index_manifest),
225            _sid_dictionary: Arc::clone(&self._sid_dictionary),
226            reverse_sid_dictionary: Arc::clone(&self.reverse_sid_dictionary),
227            deleted_ids: Arc::clone(&self.deleted_ids),
228            recyclable_ids: Arc::clone(&self.recyclable_ids),
229            next_internal_id: Arc::clone(&self.next_internal_id),
230            indices_initialized: Arc::clone(&self.indices_initialized),
231            transaction_manager: self.transaction_manager.clone(),
232            indices: Arc::clone(&self.indices),
233            schema_cache: Arc::clone(&self.schema_cache),
234            config: self.config.clone(),
235            write_buffer: self.write_buffer.clone(),
236            pubsub: self.pubsub.clone(),
237            wal: self.wal.clone(),
238            checkpoint_shutdown: self.checkpoint_shutdown.clone(),
239            compaction_shutdown: self.compaction_shutdown.clone(),
240            wal_shutdown: self.wal_shutdown.clone(),
241            workers: self.workers.clone(),
242            wal_writer: self.wal_writer.clone(),
243            wal_disabled: Arc::clone(&self.wal_disabled),
244            computed: Arc::clone(&self.computed),
245            ast_cache: Arc::clone(&self.ast_cache),
246            plan_cache: Arc::clone(&self.plan_cache),
247            building_indices: Arc::clone(&self.building_indices),
248        }
249    }
250}
251
/// Trait for converting inputs into execution parameters
///
/// Implemented in this file for plain query text (`&str`, `String`), for query
/// text paired with explicit `ExecutionOptions`, and for `&str` paired with any
/// value convertible into `serde_json::Value` (used as query variables).
pub trait ToExecParams {
    /// Consumes `self` and returns the query text plus the options to run it with.
    fn into_params(self) -> (String, ExecutionOptions);
}
256
257impl<'a> ToExecParams for &'a str {
258    fn into_params(self) -> (String, ExecutionOptions) {
259        (self.to_string(), ExecutionOptions::default())
260    }
261}
262
263impl ToExecParams for String {
264    fn into_params(self) -> (String, ExecutionOptions) {
265        (self, ExecutionOptions::default())
266    }
267}
268
269impl ToExecParams for (String, ExecutionOptions) {
270    fn into_params(self) -> (String, ExecutionOptions) {
271        self
272    }
273}
274
275impl<'a> ToExecParams for (&'a str, ExecutionOptions) {
276    fn into_params(self) -> (String, ExecutionOptions) {
277        (self.0.to_string(), self.1)
278    }
279}
280
281impl<'a, V> ToExecParams for (&'a str, V)
282where
283    V: Into<serde_json::Value>,
284{
285    fn into_params(self) -> (String, ExecutionOptions) {
286        let (aql, vars) = self;
287        let json_vars = vars.into();
288        let mut map = std::collections::HashMap::new();
289        if let serde_json::Value::Object(obj) = json_vars {
290            for (k, v) in obj {
291                map.insert(k, crate::parser::executor::json_to_aql_value(v));
292            }
293        }
294        (
295            aql.to_string(),
296            ExecutionOptions {
297                variables: map,
298                ..Default::default()
299            },
300        )
301    }
302}
303
304impl Aurora {
305    /// Get reference to AST cache for parsed AQL queries
306    pub fn ast_cache(&self) -> &MokaCache<u64, crate::parser::ast::Document> {
307        &self.ast_cache
308    }
309
310    /// Convert an external String ID back to an internal ID, creating if not found
311    fn get_or_create_internal_id(&self, external_id: &str) -> u32 {
312        let u = self.parse_external_id(external_id);
313
314        // 1. Check dictionary for existing mapping
315        if let Some(entry) = self._sid_dictionary.get(&u) {
316            return *entry.value();
317        }
318
319        // 2. Try to recycle a deleted ID
320        let recycled_id = if let Ok(mut recyclable) = self.recyclable_ids.write() {
321            if !recyclable.is_empty() {
322                let id = recyclable.min().unwrap();
323                recyclable.remove(id);
324                Some(id)
325            } else {
326                None
327            }
328        } else {
329            None
330        };
331
332        let internal_id = if let Some(id) = recycled_id {
333            id
334        } else {
335            // 3. Allocate new ID
336            self.next_internal_id.fetch_add(1, Ordering::SeqCst)
337        };
338
339        // 4. Update dictionaries
340        self._sid_dictionary.insert(u, internal_id);
341
342        // 5. Persist mapping to Sled for cross-session stability
343        let _ = self
344            .sys_id_mapping
345            .insert(u.to_be_bytes(), &internal_id.to_be_bytes());
346
347        if let Ok(mut reverse) = self.reverse_sid_dictionary.write() {
348            if (internal_id as usize) >= reverse.len() {
349                reverse.resize((internal_id as usize) + 1, 0);
350            }
351            reverse[internal_id as usize] = u;
352        }
353
354        internal_id
355    }
356
357    /// Load persisted internal ID mappings from Sled at startup
358    fn load_id_mapping_from_sled(&self) -> Result<()> {
359        let mut max_id = 0u32;
360        let mut reverse_guard = self.reverse_sid_dictionary.write().unwrap();
361
362        for result in self.sys_id_mapping.iter() {
363            let (k, v) = result?;
364            if k.len() != 16 || v.len() != 4 {
365                continue;
366            }
367
368            let u = u128::from_be_bytes(k.as_ref().try_into().map_err(|_| {
369                AqlError::new(
370                    ErrorCode::InternalError,
371                    "ID mapping key corrupt".to_string(),
372                )
373            })?);
374            let id = u32::from_be_bytes(v.as_ref().try_into().map_err(|_| {
375                AqlError::new(
376                    ErrorCode::InternalError,
377                    "ID mapping value corrupt".to_string(),
378                )
379            })?);
380
381            self._sid_dictionary.insert(u, id);
382
383            if (id as usize) >= reverse_guard.len() {
384                reverse_guard.resize((id as usize) + 1, 0);
385            }
386            reverse_guard[id as usize] = u;
387
388            max_id = max_id.max(id);
389        }
390
391        // Ensure the counter starts AFTER the highest loaded ID
392        if max_id > 0 || !self._sid_dictionary.is_empty() {
393            self.next_internal_id.store(max_id + 1, Ordering::SeqCst);
394        }
395
396        Ok(())
397    }
398
399    /// Get a secondary index by name
400    pub fn get_secondary_index(&self, key: &str) -> Option<SecondaryIndex> {
401        self.secondary_indices.get(key).map(|e| e.value().clone())
402    }
403
404    /// Look up a single IndexStorage entry without cloning the whole inner DashMap.
405    /// This is O(1) vs the O(N) clone in `get_secondary_index`.
406    pub fn get_indexed_storage(
407        &self,
408        index_key: &str,
409        val_str: &str,
410    ) -> Option<Arc<RwLock<IndexStorage>>> {
411        self.secondary_indices
412            .get(index_key)?
413            .value()
414            .get(val_str)
415            .map(|e| e.value().clone())
416    }
417
418    /// Check whether a secondary index key exists in hot RAM (i.e. the field
419    /// is tracked by the indexer for this session).
420    pub fn has_index_key(&self, index_key: &str) -> bool {
421        self.secondary_indices.contains_key(index_key)
422    }
423
424    /// Convert an internal u32 ID back to an external String ID
425    pub fn get_external_id(&self, internal_id: u32) -> Option<String> {
426        let reverse_dict = self.reverse_sid_dictionary.read().ok()?;
427        reverse_dict
428            .get(internal_id as usize)
429            .map(|u| {
430                if *u == 0 {
431                    return String::new(); // Or handle as None if 0 is used for empty
432                }
433                self.format_external_id(*u)
434            })
435            .filter(|s| !s.is_empty())
436    }
437
438    /// Convert an external String ID to its binary u128 representation
439    pub fn parse_external_id(&self, id: &str) -> u128 {
440        uuid::Uuid::parse_str(id)
441            .map(|u| u.as_u128())
442            .unwrap_or_else(|_| {
443                // Fallback to hashing if not a UUID
444                use std::collections::hash_map::DefaultHasher;
445                use std::hash::{Hash, Hasher};
446                let mut s = DefaultHasher::new();
447                id.hash(&mut s);
448                s.finish() as u128
449            })
450    }
451
452    /// Convert binary u128 back to UUID string
453    pub fn format_external_id(&self, id: u128) -> String {
454        uuid::Uuid::from_u128(id).to_string()
455    }
456
457    /// Ensure secondary indices are initialized from checkpoint
458    pub async fn ensure_indices_initialized(&self) -> Result<()> {
459        self.indices_initialized
460            .get_or_try_init(|| async { self.load_index_checkpoint() })
461            .await?;
462        // Cross-check schema against checkpoint: spawn rebuilds for any index
463        // that was missing from the checkpoint (e.g. after a crash).
464        self.init_schema_indices().await;
465        Ok(())
466    }
467
468    /// Load index checkpoint from disk
469    fn load_index_checkpoint(&self) -> Result<()> {
470        let path = &self.config.db_path;
471        let index_bin = path.join("aurora_indices.bin");
472        let index_json = path.join("aurora_indices.json");
473
474        if !index_bin.exists() || !index_json.exists() {
475            return Ok(());
476        }
477
478        // 1. Load manifest
479        let manifest_data = std::fs::read_to_string(index_json)?;
480        let manifest: HashMap<String, (usize, usize)> = serde_json::from_str(&manifest_data)?;
481        for (k, v) in manifest {
482            self.index_manifest.insert(k, v);
483        }
484
485        // 2. Memory map indices
486        let file = StdFile::open(index_bin)?;
487        let mmap = unsafe { memmap2::Mmap::map(&file)? };
488        if let Ok(mut guard) = self.mmap_index.write() {
489            *guard = Some(mmap);
490        }
491
492        // 3. Hydrate secondary_indices from checkpoint so the hot map is not
493        //    empty after a crash/restart. Without this every indexed lookup
494        //    falls through to a full scan until the next checkpoint fires.
495        //
496        //    Manifest key format: "collection:field:value"
497        //    index_key           = "collection:field"   (split on 2nd colon)
498        let mmap_guard = self.mmap_index.read().ok();
499        if let Some(ref guard) = mmap_guard {
500            if let Some(ref mmap) = **guard {
501                for entry in self.index_manifest.iter() {
502                    let full_key = entry.key();
503                    let (offset, len) = *entry.value();
504
505                    // Split on the second ':' to recover index_key vs value_str
506                    let colon2 = full_key.match_indices(':').nth(1).map(|(i, _)| i);
507                    let (index_key, val_str) = match colon2 {
508                        Some(pos) => (&full_key[..pos], &full_key[pos + 1..]),
509                        None => continue,
510                    };
511
512                    if let Some(bytes) = mmap.get(offset..offset + len) {
513                        if let Ok(bitmap) = RoaringBitmap::deserialize_from(bytes) {
514                            let index = self
515                                .secondary_indices
516                                .entry(index_key.to_string())
517                                .or_default();
518                            index.insert(
519                                val_str.to_string(),
520                                Arc::new(std::sync::RwLock::new(IndexStorage::Bitmap(bitmap))),
521                            );
522                        }
523                    }
524                }
525            }
526        }
527
528        Ok(())
529    }
530
531    /// Returns true if the secondary index for `collection:field` is currently
532    /// being rebuilt in the background. Callers should fall back to a full scan.
533    pub fn is_index_building(&self, collection: &str, field: &str) -> bool {
534        let key = format!("{}:{}", collection, field);
535        self.building_indices.contains_key(&key)
536    }
537
538    /// Scan every collection defined in the schema. For any indexed/unique field
539    /// whose index is absent from the checkpoint, mark it as building and spawn
540    /// a background task to rebuild it from the cold store.
541    ///
542    /// Called once during `ensure_indices_initialized`, after `load_index_checkpoint`.
543    async fn init_schema_indices(&self) {
544        let collection_names: Vec<String> = self
545            .cold
546            .scan_prefix("_collection:")
547            .filter_map(|r| r.ok())
548            .map(|(key, _)| key.trim_start_matches("_collection:").to_string())
549            .collect();
550
551        // Group missing index keys by collection so we do ONE scan per collection,
552        // not one scan per field (which would re-scan the same docs N times).
553        let mut missing_by_collection: HashMap<String, Vec<String>> = HashMap::new();
554
555        for collection_name in collection_names {
556            let Ok(col_def) = self.get_collection_definition(&collection_name) else {
557                continue;
558            };
559
560            // Always include the auto-indexed "id" field — it's not schema-derived
561            // but index_value() indexes it automatically if present in doc.data.
562            let auto_id_key = format!("{}:id", collection_name);
563            if !self.has_index(&collection_name, "id") {
564                self.building_indices.insert(auto_id_key.clone(), ());
565                missing_by_collection
566                    .entry(collection_name.clone())
567                    .or_default()
568                    .push(auto_id_key);
569            }
570
571            for (field_name, field_def) in &col_def.fields {
572                if !field_def.indexed && !field_def.unique {
573                    continue;
574                }
575                let index_key = format!("{}:{}", collection_name, field_name);
576                if self.has_index(&collection_name, field_name) {
577                    continue;
578                }
579                self.building_indices.insert(index_key.clone(), ());
580                missing_by_collection
581                    .entry(collection_name.clone())
582                    .or_default()
583                    .push(index_key);
584            }
585        }
586
587        // One rebuild task per collection — single scan, indexes all missing fields.
588        for (collection_name, index_keys) in missing_by_collection {
589            let db = self.clone();
590            tokio::spawn(async move {
591                db.rebuild_collection_indices(&collection_name, index_keys)
592                    .await;
593            });
594        }
595    }
596
597    /// Background rebuild: one full collection scan that rebuilds ALL missing index keys.
598    /// Using a single scan avoids redundant I/O when multiple fields are missing.
599    async fn rebuild_collection_indices(&self, collection: &str, index_keys: Vec<String>) {
600        let db = self.clone();
601        let coll = collection.to_string();
602        let keys_clone = index_keys.clone();
603
604        let result = tokio::task::spawn_blocking(move || {
605            let prefix = format!("{}:", coll);
606            for entry in db.cold.scan_prefix(&prefix).flatten() {
607                let (key, value) = entry;
608                if key.starts_with('_') {
609                    continue;
610                }
611                // index_value is idempotent and covers all indexed fields in one pass.
612                let _ = db.index_value(&coll, &key, &value, None);
613            }
614        })
615        .await;
616
617        // Clear all building flags for this collection's keys regardless of outcome.
618        for key in &index_keys {
619            self.building_indices.remove(key);
620        }
621
622        if result.is_ok() {
623            let _ = self.save_index_checkpoint();
624        } else {
625            eprintln!(
626                "Index rebuild panicked for collection '{}' (keys: {:?}); will retry on next query cycle.",
627                collection, keys_clone
628            );
629        }
630    }
631
632    /// Persist dense secondary indices to disk and re-mmap the result.
633    ///
634    /// ## Dense-Bitmap-Only rule
635    /// Only `IndexStorage::Bitmap` entries are written. `Single` and `List` are
636    /// high-cardinality (one entry per unique email/UUID) — checkpointing them would
637    /// produce a manifest with millions of keys (50-100 MB of JSON). Skipping them
638    /// keeps the manifest in the low-kilobyte range regardless of document count.
639    ///
640    /// ## Hot / Cold merge
641    /// `secondary_indices` only accumulates writes since the last boot. The previous
642    /// checkpoint (in `mmap_index`) holds everything before that. Each merged bitmap
643    /// is `cold | hot` so no data is lost across checkpoints. Deleted IDs are masked
644    /// out using the `deleted_ids` bitmap before serializing.
645    ///
646    /// ## Format
647    ///   aurora_indices.bin  — concatenated native-serialized RoaringBitmaps
648    ///   aurora_indices.json — { "col:field:value" -> [byte_offset, byte_len] }
649    fn save_index_checkpoint(&self) -> Result<()> {
650        use std::io::Write as IoWrite;
651
652        if self.secondary_indices.is_empty() {
653            return Ok(());
654        }
655
656        let path = &self.config.db_path;
657        let index_bin = path.join("aurora_indices.bin");
658        let index_json = path.join("aurora_indices.json");
659
660        // Snapshot deleted IDs once; used to mask ghost IDs from every bitmap
661        let deleted_snap = self
662            .deleted_ids
663            .read()
664            .map(|g| g.clone())
665            .unwrap_or_default();
666
667        let mut bin_data: Vec<u8> = Vec::new();
668        let mut manifest: HashMap<String, (usize, usize)> = HashMap::new();
669
670        // Hold the mmap read-lock for the whole loop so cold lookups are stable
671        let mmap_guard = self.mmap_index.read().ok();
672        let mmap_ref = mmap_guard.as_ref().and_then(|g| g.as_ref());
673
674        for col_entry in self.secondary_indices.iter() {
675            let index_key = col_entry.key(); // e.g. "users:status"
676
677            // Unique fields are always Single(u32) — they never accumulate
678            // enough docs per value to promote to Bitmap.  We must checkpoint
679            // them explicitly or the uniqueness guard becomes blind after a
680            // restart (the hot index is empty and the check is silently skipped).
681            let is_unique_field = if let Some((coll, field)) = index_key.split_once(':') {
682                self.get_collection_definition(coll)
683                    .ok()
684                    .and_then(|def| def.fields.get(field).cloned())
685                    .map(|f| f.unique)
686                    .unwrap_or(false)
687            } else {
688                false
689            };
690
691            for val_entry in col_entry.value().iter() {
692                let Ok(storage) = val_entry.value().read() else {
693                    continue;
694                };
695
696                // Dense-Bitmap-Only for non-unique fields: skip Single / List.
697                // Unique fields are always checkpointed regardless of storage type.
698                let hot_bitmap: RoaringBitmap = match &*storage {
699                    IndexStorage::Bitmap(b) => b.clone(),
700                    _ if is_unique_field => storage.to_bitmap(),
701                    _ => continue,
702                };
703
704                let val_str = val_entry.key();
705                let full_key = format!("{}:{}", index_key, val_str);
706
707                // Merge with any previously checkpointed cold bitmap for this key
708                let mut merged = if let Some(loc) = self.index_manifest.get(&full_key) {
709                    let (offset, len) = *loc.value();
710                    mmap_ref
711                        .and_then(|m| m.get(offset..offset + len))
712                        .and_then(|bytes| RoaringBitmap::deserialize_from(bytes).ok())
713                        .unwrap_or_default()
714                } else {
715                    RoaringBitmap::new()
716                };
717
718                merged |= hot_bitmap;
719
720                // Strip IDs that have been deleted since the last checkpoint
721                if !deleted_snap.is_empty() {
722                    merged -= &deleted_snap;
723                }
724
725                if merged.is_empty() {
726                    continue;
727                }
728
729                let offset = bin_data.len();
730                merged.serialize_into(&mut bin_data).map_err(|e| {
731                    AqlError::new(ErrorCode::IoError, format!("bitmap serialize: {}", e))
732                })?;
733                manifest.insert(full_key, (offset, bin_data.len() - offset));
734            }
735        }
736
737        if manifest.is_empty() {
738            return Ok(());
739        }
740
741        // Atomic write: temp → rename so a mid-write crash leaves the old checkpoint intact
742        let tmp_bin = path.join("aurora_indices.bin.tmp");
743        let tmp_json = path.join("aurora_indices.json.tmp");
744
745        {
746            let mut f = StdFile::create(&tmp_bin)?;
747            f.write_all(&bin_data)?;
748            f.flush()?;
749        }
750
751        let manifest_json = serde_json::to_string(&manifest)
752            .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
753        std::fs::write(&tmp_json, &manifest_json)?;
754
755        std::fs::rename(&tmp_bin, &index_bin)?;
756        std::fs::rename(&tmp_json, &index_json)?;
757
758        // Drop mmap guard before acquiring the write lock below
759        drop(mmap_guard);
760
761        // Refresh live manifest so the current process reads from the new checkpoint
762        self.index_manifest.clear();
763        for (k, v) in manifest {
764            self.index_manifest.insert(k, v);
765        }
766
767        // Re-mmap the new binary so queries hit the updated cold index immediately
768        let file = StdFile::open(&index_bin)?;
769        let mmap = unsafe { memmap2::Mmap::map(&file)? };
770        if let Ok(mut guard) = self.mmap_index.write() {
771            *guard = Some(mmap);
772        }
773
774        // 4. Finalize ID recycling: move from pending (deleted_ids) to recyclable
775        // This ensures IDs are only reused AFTER they have been wiped from all cold bitmaps
776        if !deleted_snap.is_empty() {
777            if let Ok(mut pending) = self.deleted_ids.write() {
778                *pending -= &deleted_snap;
779            }
780            if let Ok(mut recyclable) = self.recyclable_ids.write() {
781                *recyclable |= &deleted_snap;
782                // Persist recyclable IDs so they survive restart
783                let _ = self.save_recyclable_ids(&recyclable);
784            }
785        }
786
787        Ok(())
788    }
789
790    fn save_recyclable_ids(&self, bitmap: &RoaringBitmap) -> Result<()> {
791        let mut buf = Vec::new();
792        bitmap
793            .serialize_into(&mut buf)
794            .map_err(|e| AqlError::new(ErrorCode::IoError, e.to_string()))?;
795        self.cold.set("_sys:recyclable_ids".to_string(), buf)?;
796        Ok(())
797    }
798
799    fn load_recyclable_ids(&self) -> Result<()> {
800        if let Some(data) = self.cold.get("_sys:recyclable_ids")? {
801            if let Ok(bitmap) = RoaringBitmap::deserialize_from(&data[..]) {
802                if let Ok(mut recyclable) = self.recyclable_ids.write() {
803                    *recyclable = bitmap;
804                }
805            }
806        }
807        Ok(())
808    }
809
810    /// Internal serialization helper
811    pub fn deserialize_internal<T: serde::de::DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
812        if data.starts_with(b"AUR\x01") {
813            Ok(rmp_serde::from_slice(&data[4..])
814                .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?)
815        } else {
816            Ok(serde_json::from_slice(data)
817                .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?)
818        }
819    }
820
821    /// Optimized value indexing using adaptive storage (Ticket 3)
822    fn index_value(
823        &self,
824        collection: &str,
825        key: &str,
826        value: &[u8],
827        _tx_id: Option<u64>,
828    ) -> Result<()> {
829        // Update primary index with metadata only
830        let location = DiskLocation::new(value.len());
831        self.primary_indices
832            .entry(collection.to_string())
833            .or_default()
834            .insert(key.to_string(), location);
835
836        if let Ok(doc) = self.deserialize_internal::<Document>(value) {
837            let internal_id = self.get_or_create_internal_id(&doc._sid);
838            let collection_def = self.get_collection_definition(collection)?;
839
840            // PERFORMANCE: Automatically index 'id' if present in data
841            if let Some(id_val) = doc.data.get("id") {
842                let index_key = format!("{}:id", collection);
843                let val_str = match id_val {
844                    Value::String(s) => s.clone(),
845                    _ => id_val.to_string(),
846                };
847                let index = self.secondary_indices.entry(index_key).or_default();
848                index
849                    .entry(val_str)
850                    .and_modify(|existing| {
851                        if let Ok(mut storage) = existing.write() {
852                            storage.add(internal_id);
853                        }
854                    })
855                    .or_insert_with(|| Arc::new(RwLock::new(IndexStorage::Single(internal_id))));
856            }
857
858            for (field_name, field_val) in doc.data {
859                if collection_def
860                    .fields
861                    .get(&field_name)
862                    .map_or(false, |f| f.indexed)
863                {
864                    let index_key = format!("{}:{}", collection, field_name);
865                    let val_str = match &field_val {
866                        Value::String(s) => s.clone(),
867                        _ => field_val.to_string(),
868                    };
869
870                    let index = self.secondary_indices.entry(index_key).or_default();
871                    // Use and_modify+or_insert_with so newly-created entries start as
872                    // Single(id) without an extra add() call that would corrupt them
873                    // into List([id, id]).
874                    index
875                        .entry(val_str)
876                        .and_modify(|existing| {
877                            if let Ok(mut storage) = existing.write() {
878                                storage.add(internal_id);
879                            }
880                        })
881                        .or_insert_with(|| {
882                            Arc::new(RwLock::new(IndexStorage::Single(internal_id)))
883                        });
884                }
885            }
886        }
887        Ok(())
888    }
889
890    /// Get an iterator over all documents in a collection
891    pub fn stream_collection(&self, collection: &str) -> Result<impl Iterator<Item = Document>> {
892        let mut docs = Vec::new();
893        if let Some(index) = self.primary_indices.get(collection) {
894            for entry in index.iter() {
895                let id = entry.key();
896                if let Some(doc) = self.get_document(collection, id)? {
897                    docs.push(doc);
898                }
899            }
900        }
901        Ok(docs.into_iter())
902    }
903
904    /// Get first document matching filter
905    pub async fn first_one(
906        &self,
907        collection: &str,
908        filter_fn: impl Fn(&Document) -> bool,
909    ) -> Result<Option<Document>> {
910        let docs = self.scan_and_filter(collection, filter_fn, Some(1))?;
911        Ok(docs.into_iter().next())
912    }
913
914    /// Executes an AQL (Aurora Query Language) operation against the database.
915    ///
916    /// This is the primary entry point for all database operations including queries,
917    /// mutations, schema definitions, and migrations.
918    ///
919    /// # Arguments
920    /// * `input` - Anything that implements `ToExecParams`. This includes:
921    ///   - A simple query string: `"query { users { id } }"`
922    ///   - A query string with variables: `("query($id: String) { users(where: {id: {eq: $id}}) { name } }", json!({"id": "u1"}))`
923    ///   - The `doc!` macro output: `doc!("query...", {"var": 123})`
924    ///
925    /// # Returns
926    /// An `ExecutionResult` which can be bound to Rust types using `.bind()` or `.bind_first()`.
927    ///
928    /// # Examples
929    ///
930    /// ```
931    /// # use aurora_db::{Aurora, doc, Value};
932    /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
933    /// // Simple query
934    /// let res = db.execute("query { users { name } }").await?;
935    ///
936    /// // Query with variables using doc! macro
937    /// let res = db.execute(doc!(
938    ///     "query($minAge: Int) { users(where: { age: { gte: $minAge } }) { name } }",
939    ///     { "minAge": 18 }
940    /// )).await?;
941    /// # Ok(())
942    /// # }
943    /// ```
944    pub async fn execute<I: ToExecParams>(&self, input: I) -> Result<ExecutionResult> {
945        let (aql, options) = input.into_params();
946        parser::executor::execute(self, &aql, options).await
947    }
948
949    /// Subscribes to real-time changes using AQL subscription syntax.
950    ///
951    /// This method returns a `ChangeListener` that yields events whenever documents
952    /// matching the subscription criteria are inserted, updated, or deleted.
953    ///
954    /// # Arguments
955    /// * `aql` - An AQL subscription string.
956    ///
957    /// # Returns
958    /// A `ChangeListener` (stream-like) for receiving real-time `ChangeEvent`s.
959    ///
960    /// # Example
961    ///
962    /// ```ignore
963    /// let mut listener = db.stream("subscription { users { id name } }").await?;
964    /// while let Ok(event) = listener.recv().await {
965    ///     println!("Change detected: {:?}", event);
966    /// }
967    /// ```
968    pub async fn stream(&self, aql: &str) -> Result<crate::pubsub::ChangeListener> {
969        let result = self.execute(aql).await?;
970
971        match result {
972            ExecutionResult::Subscription(sub) => sub.stream.ok_or_else(|| {
973                crate::error::AqlError::new(
974                    crate::error::ErrorCode::QueryError,
975                    "Subscription did not return a stream".to_string(),
976                )
977            }),
978            _ => Err(crate::error::AqlError::new(
979                crate::error::ErrorCode::QueryError,
980                "Expected a subscription query, got a different operation type".to_string(),
981            )),
982        }
983    }
984
985    /// Explains the execution plan for an AQL query without executing it.
986    ///
987    /// This is useful for debugging performance issues and understanding how Aurora
988    /// will process a given query (e.g., whether it will use an index or a full scan).
989    ///
990    /// # Arguments
991    /// * `input` - The query to explain (accepts same formats as `execute`).
992    ///
993    /// # Returns
994    /// An `ExecutionPlan` containing the sequence of operations and estimated cost.
995    pub async fn explain<I: ToExecParams>(&self, input: I) -> Result<ExecutionPlan> {
996        let (aql, options) = input.into_params();
997
998        // Parse and analyze without executing
999        let doc = parser::parse_with_variables(&aql, &options.variables)?;
1000
1001        self.analyze_execution_plan(&doc).await
1002    }
1003
1004    /// Analyzes a parsed AQL document to produce an execution plan.
1005    ///
1006    /// Internal helper used by `explain`.
1007    pub async fn analyze_execution_plan(
1008        &self,
1009        doc: &crate::parser::ast::Document,
1010    ) -> Result<ExecutionPlan> {
1011        let mut operations = Vec::new();
1012        for op in &doc.operations {
1013            operations.push(format!("{:?}", op));
1014        }
1015
1016        Ok(ExecutionPlan {
1017            operations,
1018            estimated_cost: 1.0,
1019        })
1020    }
1021
1022    /// Removes stale lock files from a database directory.
1023    ///
1024    /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
1025    /// that prevent the database from being reopened. This method safely removes
1026    /// those lock files.
1027    ///
1028    /// # Safety
1029    /// Only call this when you're certain no other Aurora instance is using the database.
1030    /// Removing lock files while another process is running could cause data corruption.
1031    pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
1032        let resolved_path = Self::resolve_path(path)?;
1033        crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
1034    }
1035
1036    /// Opens or creates a database at the specified path.
1037    ///
1038    /// If the database doesn't exist, it will be created. If it does exist, it will
1039    /// be opened and any pending WAL (Write-Ahead Log) entries will be replayed
1040    /// to ensure consistency.
1041    ///
1042    /// # Arguments
1043    /// * `path` - The filesystem path to the database.
1044    ///
1045    /// # Examples
1046    ///
1047    /// ```no_run
1048    /// # use aurora_db::Aurora;
1049    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1050    /// let db = Aurora::open("my_database.db").await?;
1051    /// # Ok(())
1052    /// # }
1053    /// ```
1054    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
1055        let config = AuroraConfig {
1056            db_path: Self::resolve_path(path)?,
1057            ..Default::default()
1058        };
1059        Self::with_config(config).await
1060    }
1061
1062    /// Helper method to resolve database path
1063    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
1064        let path = path.as_ref();
1065
1066        // If it's an absolute path, use it directly
1067        if path.is_absolute() {
1068            return Ok(path.to_path_buf());
1069        }
1070
1071        // Otherwise, resolve relative to current directory
1072        match std::env::current_dir() {
1073            Ok(current_dir) => Ok(current_dir.join(path)),
1074            Err(e) => Err(AqlError::new(
1075                ErrorCode::IoError,
1076                format!("Failed to resolve current directory: {}", e),
1077            )),
1078        }
1079    }
1080
1081    /// Opens a database with a custom configuration.
1082    ///
1083    /// This allows fine-tuning performance parameters like cache sizes,
1084    /// durability modes, and worker settings.
1085    ///
1086    /// # Arguments
1087    /// * `config` - The `AuroraConfig` to use.
1088    ///
1089    /// # Examples
1090    ///
1091    /// ```no_run
1092    /// # use aurora_db::{Aurora, AuroraConfig};
1093    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1094    /// let config = AuroraConfig {
1095    ///     db_path: "custom.db".into(),
1096    ///     hot_cache_size_mb: 1024,
1097    ///     enable_wal: true,
1098    ///     ..Default::default()
1099    /// };
1100    /// let db = Aurora::with_config(config).await?;
1101    /// # Ok(())
1102    /// # }
1103    /// ```
1104    pub async fn with_config(config: AuroraConfig) -> Result<Self> {
1105        let path = Self::resolve_path(&config.db_path)?;
1106
1107        if config.create_dirs
1108            && let Some(parent) = path.parent()
1109            && !parent.exists()
1110        {
1111            std::fs::create_dir_all(parent)?;
1112        }
1113
1114        let cold = Arc::new(ColdStore::with_config(
1115            path.to_str().unwrap(),
1116            config.cold_cache_capacity_mb,
1117            config.cold_flush_interval_ms,
1118            config.cold_mode.clone(),
1119        )?);
1120
1121        let hot = HotStore::with_config_and_eviction(
1122            config.hot_cache_size_mb,
1123            config.hot_cache_cleanup_interval_secs,
1124            config.eviction_policy,
1125        );
1126
1127        let write_buffer = if config.enable_write_buffering {
1128            Some(Arc::new(WriteBuffer::new(
1129                Arc::clone(&cold),
1130                config.write_buffer_size,
1131                config.write_buffer_flush_interval_ms,
1132            )))
1133        } else {
1134            None
1135        };
1136
1137        let auto_compact = config.auto_compact;
1138        let enable_wal = config.enable_wal;
1139        let pubsub = crate::pubsub::PubSubSystem::new(10000);
1140
1141        // Initialize WAL
1142        let (wal, wal_entries) = if enable_wal {
1143            let wal_path = path.to_str().unwrap();
1144            match WriteAheadLog::new(wal_path) {
1145                Ok(mut wal_log) => {
1146                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
1147                    (Some(Arc::new(RwLock::new(wal_log))), entries)
1148                }
1149                Err(_) => (None, Vec::new()),
1150            }
1151        } else {
1152            (None, Vec::new())
1153        };
1154
1155        // --- FIX: Initialize Background WAL Writer ---
1156        // Only enable WAL writer if WAL was successfully initialized
1157        let wal_disabled_flag = Arc::new(AtomicBool::new(false));
1158        let wal_writer = if enable_wal && wal.is_some() {
1159            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<WalOperation>();
1160            let wal_clone = wal.clone();
1161            let wal_disabled_task = Arc::clone(&wal_disabled_flag);
1162
1163            // Spawn a single dedicated thread/task for writing WAL sequentially
1164            tokio::spawn(async move {
1165                if let Some(wal) = wal_clone {
1166                    while let Some(op) = rx.recv().await {
1167                        // If WAL has been disabled due to a prior error, drain messages silently.
1168                        if wal_disabled_task.load(Ordering::Relaxed) {
1169                            continue;
1170                        }
1171
1172                        let wal = wal.clone();
1173                        // Move blocking I/O to a blocking thread
1174                        let result = tokio::task::spawn_blocking(
1175                            move || -> std::result::Result<(), String> {
1176                                match wal.write() {
1177                                    Ok(mut guard) => {
1178                                        let wal_result = match op {
1179                                            WalOperation::Put { key, value } => guard.append(
1180                                                Operation::Put,
1181                                                &key,
1182                                                Some(value.as_ref()),
1183                                            ),
1184                                            WalOperation::Delete { key } => {
1185                                                guard.append(Operation::Delete, &key, None)
1186                                            }
1187                                        };
1188                                        wal_result.map_err(|e| format!("WAL append failed: {}", e))
1189                                    }
1190                                    Err(e) => {
1191                                        // Lock is poisoned — degrade gracefully, do not kill the process.
1192                                        Err(format!("WAL lock poisoned: {}", e))
1193                                    }
1194                                }
1195                            },
1196                        )
1197                        .await;
1198
1199                        match result {
1200                            Err(e) if e.is_cancelled() => break,
1201                            Err(e) => {
1202                                eprintln!(
1203                                    "WAL task panicked: {}. Durability disabled for this session.",
1204                                    e
1205                                );
1206                                wal_disabled_task.store(true, Ordering::SeqCst);
1207                                // Keep draining so callers never see a closed-channel error.
1208                            }
1209                            Ok(Err(msg)) => {
1210                                eprintln!(
1211                                    "CRITICAL WAL write error: {}. Durability disabled for this session.",
1212                                    msg
1213                                );
1214                                wal_disabled_task.store(true, Ordering::SeqCst);
1215                            }
1216                            Ok(Ok(())) => {}
1217                        }
1218                    }
1219                }
1220            });
1221            Some(tx)
1222        } else {
1223            None
1224        };
1225
1226        let checkpoint_shutdown = if wal.is_some() {
1227            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1228            let cold_clone = Arc::clone(&cold);
1229            let wal_clone = wal.clone();
1230            let checkpoint_interval = config.checkpoint_interval_ms;
1231
1232            tokio::spawn(async move {
1233                let mut interval =
1234                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
1235                loop {
1236                    tokio::select! {
1237                        _ = interval.tick() => {
1238                            if let Err(e) = cold_clone.flush() {
1239                                eprintln!("Background checkpoint flush error: {}", e);
1240                            }
1241                            if let Some(ref wal) = wal_clone
1242                                && let Ok(mut wal_guard) = wal.write() {
1243                                    let _ = wal_guard.truncate();
1244                                }
1245                        }
1246                        _ = shutdown_rx.recv() => {
1247                            let _ = cold_clone.flush();
1248                            if let Some(ref wal) = wal_clone
1249                                && let Ok(mut wal_guard) = wal.write() {
1250                                    let _ = wal_guard.truncate();
1251                                }
1252                            break;
1253                        }
1254                    }
1255                }
1256            });
1257
1258            Some(shutdown_tx)
1259        } else {
1260            None
1261        };
1262
1263        let compaction_shutdown = if auto_compact {
1264            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1265            let cold_clone = Arc::clone(&cold);
1266            let compact_interval = config.compact_interval_mins;
1267
1268            tokio::spawn(async move {
1269                let mut interval =
1270                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
1271                loop {
1272                    tokio::select! {
1273                        _ = interval.tick() => {
1274                            if let Err(e) = cold_clone.compact() {
1275                                eprintln!("Background compaction error: {}", e);
1276                            }
1277                        }
1278                        _ = shutdown_rx.recv() => {
1279                            let _ = cold_clone.compact();
1280                            break;
1281                        }
1282                    }
1283                }
1284            });
1285
1286            Some(shutdown_tx)
1287        } else {
1288            None
1289        };
1290
1291        let workers = if config.workers_enabled {
1292            let workers_path = path.join("workers.db");
1293            let worker_config = crate::workers::WorkerConfig {
1294                storage_path: workers_path.to_string_lossy().into_owned(),
1295                concurrency: config.worker_threads,
1296                ..Default::default()
1297            };
1298            crate::workers::WorkerSystem::new(worker_config)
1299                .map(Arc::new)
1300                .ok()
1301        } else {
1302            None
1303        };
1304
1305        let sys_id_mapping = cold.open_tree("_sys_id_mapping")?;
1306
1307        let db = Self {
1308            hot,
1309            cold,
1310            primary_indices: Arc::new(DashMap::new()),
1311            secondary_indices: Arc::new(DashMap::new()),
1312            sys_id_mapping,
1313            mmap_index: Arc::new(RwLock::new(None)),
1314            index_manifest: Arc::new(DashMap::new()),
1315            _sid_dictionary: Arc::new(crossbeam_skiplist::SkipMap::new()),
1316            reverse_sid_dictionary: Arc::new(RwLock::new(Vec::new())),
1317            deleted_ids: Arc::new(RwLock::new(RoaringBitmap::new())),
1318            recyclable_ids: Arc::new(RwLock::new(RoaringBitmap::new())),
1319            next_internal_id: Arc::new(AtomicU32::new(0)),
1320            indices_initialized: Arc::new(OnceCell::new()),
1321            transaction_manager: crate::transaction::TransactionManager::new(),
1322            indices: Arc::new(DashMap::new()),
1323            schema_cache: Arc::new(DashMap::new()),
1324            config,
1325            write_buffer,
1326            pubsub,
1327            wal,
1328            checkpoint_shutdown,
1329            compaction_shutdown,
1330            wal_shutdown: None,
1331            workers,
1332            wal_writer,
1333            computed: Arc::new(RwLock::new(crate::computed::ComputedFields::new())),
1334            ast_cache: Arc::new(MokaCache::new(1000)),
1335            plan_cache: Arc::new(MokaCache::new(1000)),
1336            building_indices: Arc::new(DashMap::new()),
1337            wal_disabled: wal_disabled_flag,
1338        };
1339
1340        // 0. Load persisted ID mappings from Sled so that internal IDs are stable
1341        db.load_id_mapping_from_sled()?;
1342
1343        // 0.1 Load recyclable IDs
1344        db.load_recyclable_ids()?;
1345
1346        // 1. Load index checkpoint immediately from disk
1347        db.load_index_checkpoint()?;
1348
1349        // 2. Replay WAL entries on top of the checkpoint state
1350        if !wal_entries.is_empty() {
1351            db.replay_wal(wal_entries).await?;
1352        }
1353
1354        // 3. Mark indices as initialized so lazy calls don't re-run load_index_checkpoint
1355        let _ = db.indices_initialized.set(());
1356
1357        // 4. Trigger background rebuilds for indices missing from checkpoint
1358        //    (Done after WAL replay so Sled is up to date)
1359        db.init_schema_indices().await;
1360
1361        // 5. Always rebuild primary index from cold storage after startup.
1362        // WAL replay only covers entries written since the last flush; documents
1363        // that were already checkpointed to cold storage (WAL truncated) must be
1364        // reloaded here so queries see all persisted data.
1365        db.rebuild_primary_index_from_cold()?;
1366
1367        Ok(db)
1368    }
1369    // Fast key-value operations with index support
1370    /// Get a value by key (low-level key-value access)
1371    ///
1372    /// This is the low-level method. For document access, use `get_document()` instead.
1373    /// Checks hot cache first, then falls back to cold storage for maximum performance.
1374    ///
1375    /// # Performance
1376    /// - Hot cache hit: ~1M reads/sec (instant)
1377    /// - Cold storage: ~500K reads/sec (disk I/O)
1378    /// - Cache hit rate: typically 95%+ at scale
1379    ///
1380    /// # Examples
1381    ///
1382    /// ```
1383    /// // Low-level key-value access
1384    /// let data = db.get("users:12345")?;
1385    /// if let Some(bytes) = data {
1386    ///     let doc: Document = serde_json::from_slice(&bytes)?;
1387    ///     println!("Found: {:?}", doc);
1388    /// }
1389    ///
1390    /// // Better: use get_document() for documents
1391    /// let user = db.get_document("users", "12345")?;
1392    /// ```ignore
1393    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
1394        // If inside a transaction, check the buffer first.
1395        // Provides read-your-own-writes within the transaction while keeping
1396        // changes invisible to all other readers until commit.
1397        if let Ok(tx_id) = crate::transaction::ACTIVE_TRANSACTION_ID.try_with(|id| *id) {
1398            if let Some(buffer) = self.transaction_manager.active_transactions.get(&tx_id) {
1399                if buffer.deletes.contains_key(key) {
1400                    return Ok(None);
1401                }
1402                if let Some(value) = buffer.read(key) {
1403                    return Ok(Some(value));
1404                }
1405                // Not in buffer → fall through to storage (read-committed for
1406                // values not yet touched by this transaction)
1407            }
1408        }
1409
1410        // Check hot cache first
1411        if let Some(value) = self.hot.get(key) {
1412            // Check if this is a blob reference (pointer to cold storage)
1413            if value.starts_with(b"BLOBREF:") {
1414                // It's a blob ref - fetch actual data from cold storage
1415                return self.cold.get(key);
1416            }
1417            return Ok(Some(value));
1418        }
1419
1420        // Fetch from cold storage
1421        let value = self.cold.get(key)?;
1422
1423        if let Some(v) = &value {
1424            if self.should_cache_key(key) {
1425                self.hot
1426                    .set(Arc::new(key.to_string()), Arc::new(v.clone()), None);
1427            }
1428        }
1429
1430        Ok(value)
1431    }
1432
    /// Returns the hot-cache entry for `key` as a shared `Arc<Vec<u8>>`.
    ///
    /// Only the hot cache is consulted (via `self.hot.get_ref`). Unlike `get`, this
    /// never checks the active transaction buffer and never falls back to cold
    /// storage, so `None` means "not currently cached", not "key does not exist".
    ///
    /// NOTE(review): `get` treats hot values starting with `BLOBREF:` as pointers into
    /// cold storage; this method applies no such check, so callers presumably can
    /// receive the raw marker bytes — confirm against `HotStore::get_ref`.
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }
1438
1439    /// Helper to decide if a key should be cached.
1440    fn should_cache_key(&self, key: &str) -> bool {
1441        if let Some(collection_name) = key.split(':').next() {
1442            if !collection_name.starts_with('_') {
1443                if let Ok(collection_def) = self.get_collection_definition(collection_name) {
1444                    if collection_def
1445                        .fields
1446                        .values()
1447                        .any(|def| def.field_type == FieldType::Any)
1448                    {
1449                        return false; // Don't cache if collection has Any field
1450                    }
1451                }
1452            }
1453        }
1454        true // Cache by default
1455    }
1456
    /// Retrieves metrics about hot cache performance.
    ///
    /// Returns the `CacheStats` reported by the hot store (`self.hot.get_stats()`)
    /// without any further processing.
    ///
    /// # Example
    /// ```rust
    /// # use aurora_db::Aurora;
    /// # async fn example(db: Aurora) {
    /// let stats = db.get_cache_stats();
    /// println!("Hit ratio: {:.2}%", stats.hit_rate * 100.0);
    /// # }
    /// ```
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }
1470
1471    pub fn has_index(&self, collection: &str, field: &str) -> bool {
1472        // _sid is the primary key — always an O(1) lookup, no secondary index needed.
1473        if field == "_sid" {
1474            return true;
1475        }
1476        // For all other fields (including auto-indexed "id"), only claim the index
1477        // exists if there is actual data in the hot map or cold manifest.
1478        // This prevents the indexed fast-path from firing on a post-crash empty index
1479        // and returning zero results instead of falling back to a full scan.
1480        let index_key = format!("{}:{}", collection, field);
1481        if self.secondary_indices.contains_key(&index_key) {
1482            return true;
1483        }
1484        let cold_prefix = format!("{}:", index_key);
1485        if self
1486            .index_manifest
1487            .iter()
1488            .any(|e| e.key().starts_with(&cold_prefix))
1489        {
1490            return true;
1491        }
1492        false
1493    }
1494
1495    pub fn get_ids_from_index(&self, collection: &str, field: &str, value: &Value) -> Vec<String> {
1496        let mut bitmap = RoaringBitmap::new();
1497        let ik = format!("{}:{}", collection, field);
1498        let vstr = match value {
1499            Value::String(s) => s.clone(),
1500            _ => value.to_string(),
1501        };
1502
1503        // 1. Check Cold Index (mmap)
1504        if let Some(loc) = self.index_manifest.get(&format!("{}:{}", ik, vstr)) {
1505            if let Ok(g) = self.mmap_index.read()
1506                && let Some(m) = g.as_ref()
1507            {
1508                if let Ok(cb) = RoaringBitmap::deserialize_from(&m[loc.0..(loc.0 + loc.1)]) {
1509                    bitmap |= cb;
1510                }
1511            }
1512        }
1513
1514        // 2. Check Hot Index (RAM)
1515        if let Some(index_map) = self.secondary_indices.get(&ik) {
1516            if let Some(storage_arc) = index_map.get(&vstr) {
1517                if let Ok(storage) = storage_arc.value().read() {
1518                    bitmap |= storage.to_bitmap();
1519                }
1520            }
1521        }
1522
1523        bitmap
1524            .iter()
1525            .filter_map(|id| self.get_external_id(id))
1526            .collect()
1527    }
1528
1529    /// Register a computed field definition
1530    pub async fn register_computed_field(
1531        &self,
1532        collection: &str,
1533        field: &str,
1534        expression: crate::computed::ComputedExpression,
1535    ) -> Result<()> {
1536        let mut computed = self.computed.write().unwrap();
1537        computed.register(collection, field, expression);
1538        Ok(())
1539    }
1540
1541    // ============================================
1542    // PubSub API - Real-time Change Notifications
1543    // ============================================
1544
1545    /// Listen for real-time changes in a collection
1546    ///
1547    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
1548    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
1549    /// data synchronization systems.
1550    ///
1551    /// # Performance
1552    /// - Zero overhead when no listeners are active
1553    /// - Events are broadcast to all listeners asynchronously
1554    /// - Non-blocking - doesn't slow down write operations
1555    /// - Multiple listeners can watch the same collection
1556    ///
1557    /// # Examples
1558    ///
1559
1560    /// use aurora_db::{Aurora, types::Value};
1561    ///
1562    /// let db = Aurora::open("mydb.db")?;
1563    ///
1564    /// // Basic listener
1565    /// let mut listener = db.listen("users");
1566    ///
1567    /// tokio::spawn(async move {
1568    ///     while let Ok(event) = listener.recv().await {
1569    ///         match event.change_type {
1570    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
1571    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
1572    ///             ChangeType::Delete => println!("Deleted user ID: {}", event._sid),
1573    ///         }
1574    ///     }
1575    /// });
1576    ///
1577    /// // Now any insert/update/delete will trigger the listener
1578    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
1579    /// ```ignore
1580    ///
1581    /// # Real-World Use Cases
1582    ///
1583    /// **Cache Invalidation:**
1584
1585    /// use std::sync::Arc;
1586    /// use tokio::sync::RwLock;
1587    /// use std::collections::HashMap;
1588    ///
1589    /// let cache = Arc::new(RwLock::new(HashMap::new()));
1590    /// let cache_clone = Arc::clone(&cache);
1591    ///
1592    /// let mut listener = db.listen("products");
1593    ///
1594    /// tokio::spawn(async move {
1595    ///     while let Ok(event) = listener.recv().await {
1596    ///         // Invalidate cache entry when product changes
1597    ///         cache_clone.write().await.remove(&event._sid);
1598    ///         println!("Cache invalidated for product: {}", event._sid);
1599    ///     }
1600    /// });
1601    /// ```
1602    ///
1603    /// **Webhook Notifications:**
1604    /// ```ignore
1605    /// let mut listener = db.listen("orders");
1606    ///
1607    /// tokio::spawn(async move {
1608    ///     while let Ok(event) = listener.recv().await {
1609    ///         if event.change_type == ChangeType::Insert {
1610    ///             // Send webhook for new orders
1611    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
1612    ///         }
1613    ///     }
1614    /// });
1615    /// ```
1616    ///
1617    /// **Audit Logging:**
1618    /// ```ignore
1619    /// let mut listener = db.listen("sensitive_data");
1620    ///
1621    /// tokio::spawn(async move {
1622    ///     while let Ok(event) = listener.recv().await {
1623    ///         // Log all changes to audit trail
1624    ///         db.insert_into("audit_log", vec![
1625    ///             ("collection", Value::String("sensitive_data".into())),
1626    ///             ("action", Value::String(format!("{:?}", event.change_type))),
1627    ///             ("document_id", Value::String(event._sid.clone())),
1628    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
1629    ///         ]).await?;
1630    ///     }
1631    /// });
1632    /// ```
1633    ///
1634    /// **Data Synchronization:**
1635    /// ```ignore
1636    /// let mut listener = db.listen("users");
1637    ///
1638    /// tokio::spawn(async move {
1639    ///     while let Ok(event) = listener.recv().await {
1640    ///         // Sync changes to external system
1641    ///         match event.change_type {
1642    ///             ChangeType::Insert | ChangeType::Update => {
1643    ///                 if let Some(doc) = event.document {
1644    ///                     external_api.upsert_user(&doc).await?;
1645    ///                 }
1646    ///             },
1647    ///             ChangeType::Delete => {
1648    ///                 external_api.delete_user(&event._sid).await?;
1649    ///             },
1650    ///         }
1651    ///     }
1652    /// });
1653    /// ```
1654    ///
1655    /// **Real-Time Notifications:**
1656    /// ```ignore
1657    /// let mut listener = db.listen("messages");
1658    ///
1659    /// tokio::spawn(async move {
1660    ///     while let Ok(event) = listener.recv().await {
1661    ///         if event.change_type == ChangeType::Insert {
1662    ///             if let Some(msg) = event.document {
1663    ///                 // Push notification to connected websockets
1664    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
1665    ///                     websocket_manager.send_to_user(recipient, &msg).await;
1666    ///                 }
1667    ///             }
1668    ///         }
1669    ///     }
1670    /// });
1671    /// ```
1672    ///
1673    /// **Filtered Listener:**
1674    /// ```ignore
1675    /// use aurora_db::pubsub::EventFilter;
1676    ///
1677    /// // Only listen for inserts
1678    /// let mut listener = db.listen("users")
1679    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
1680    ///
1681    /// // Only listen for documents with specific field value
1682    /// let mut listener = db.listen("users")
1683    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
1684    /// ```
1685    ///
1686    /// Listens for real-time changes in a specific collection.
1687    ///
1688    /// Returns a `ChangeListener` stream that yields events for inserts, updates, and deletes.
1689    ///
1690    /// # Example
1691    /// ```rust
1692    /// # use aurora_db::{Aurora, Value};
1693    /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
1694    /// let mut listener = db.listen("users");
1695    ///
1696    /// tokio::spawn(async move {
1697    ///     while let Ok(event) = listener.recv().await {
1698    ///         println!("Change detected: {:?}", event);
1699    ///     }
1700    /// });
1701    /// # Ok(())
1702    /// # }
1703    /// ```
1704    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
1705        self.pubsub.listen(collection)
1706    }
1707
1708    /// Listens for all changes across all collections in the database.
1709    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
1710        self.pubsub.listen_all()
1711    }
1712
1713    /// Get the number of active listeners for a collection
1714    pub fn listener_count(&self, collection: &str) -> usize {
1715        self.pubsub.listener_count(collection)
1716    }
1717
1718    /// Get total number of active listeners
1719    pub fn total_listeners(&self) -> usize {
1720        self.pubsub.total_listeners()
1721    }
1722
1723    /// Flushes all buffered writes to disk to ensure durability.
1724    ///
1725    /// This method forces all pending writes from:
1726    /// - Write buffer (if enabled)
1727    /// - Cold storage internal buffers
1728    /// - Write-ahead log (if enabled)
1729    ///
1730    /// Call this when you need to ensure data persistence before
1731    /// a critical operation or shutdown. After flush() completes,
1732    /// all data is guaranteed to be on disk even if power fails.
1733    ///
1734    /// # Performance
1735    /// - Flush time: ~10-50ms depending on buffered data
1736    /// - Triggers OS-level fsync() for durability guarantee
1737    /// - Truncates WAL after successful flush
1738    /// - Not needed for every write (WAL provides durability)
1739    ///
1740    /// # Examples
1741    ///
1742    /// ```ignore
1743    /// use aurora_db::Aurora;
1744    ///
1745    /// let db = Aurora::open("mydb.db")?;
1746    ///
1747    /// // Basic flush after critical write
1748    /// db.insert_into("users", data).await?;
1749    /// db.flush()?;  // Ensure data is persisted to disk
1750    ///
1751    /// // Graceful shutdown pattern
1752    /// fn shutdown(db: &Aurora) -> Result<()> {
1753    ///     println!("Flushing pending writes...");
1754    ///     db.flush()?;
1755    ///     println!("Shutdown complete - all data persisted");
1756    ///     Ok(())
1757    /// }
1758    ///
1759    /// // Periodic checkpoint pattern
1760    /// use std::time::Duration;
1761    /// use std::thread;
1762    ///
1763    /// let db = db.clone();
1764    /// thread::spawn(move || {
1765    ///     loop {
1766    ///         thread::sleep(Duration::from_secs(60));
1767    ///         if let Err(e) = db.flush() {
1768    ///             eprintln!("Flush error: {}", e);
1769    ///         } else {
1770    ///             println!("Checkpoint: data flushed to disk");
1771    ///         }
1772    ///     }
1773    /// });
1774    ///
1775    /// // Critical transaction pattern
1776    /// let tx_id = db.begin_transaction();
1777    ///
1778    /// // Multiple operations
1779    /// db.insert_into("orders", order_data).await?;
1780    /// db.update_document("inventory", product_id, updates).await?;
1781    /// db.insert_into("audit_log", audit_data).await?;
1782    ///
1783    /// // Commit and flush immediately
1784    /// db.commit_transaction(tx_id)?;
1785    /// db.flush()?;  // Critical: ensure transaction is on disk
1786    ///
1787    /// // Backup preparation
1788    /// println!("Preparing backup...");
1789    /// db.flush()?;  // Ensure all data is written
1790    /// std::fs::copy("mydb.db", "backup.db")?;
1791    /// println!("Backup complete");
1792    /// ```
1793    ///
1794    /// # When to Use
1795    /// - Before graceful shutdown
1796    /// - After critical transactions
1797    /// - Before creating backups
1798    /// - Periodic checkpoints (every 30-60 seconds)
1799    /// - Before risky operations
1800    ///
1801    /// # When NOT to Use
1802    /// - After every single write (too slow, WAL provides durability)
1803    /// - In high-throughput loops (batch instead)
1804    /// - When durability mode is already Immediate
1805    ///
1806    /// # Important Notes
1807    /// - WAL provides durability even without explicit flush()
1808    /// - flush() adds latency (~10-50ms) so use strategically
1809    /// - Automatic flush happens during graceful shutdown
1810    /// - After flush(), WAL is truncated (data is in main storage)
1811    ///
1812    /// # See Also
1813    /// Flushes all buffered writes to disk to ensure durability.
1814    ///
1815    /// This method forces all pending writes from the write buffer and cold storage
1816    /// internal buffers to the physical disk.
1817    ///
1818    /// # Performance
1819    /// Adding explicit flushes can significantly slow down write performance. 
1820    /// Aurora typically manages flushes automatically via the WAL.
1821    pub fn flush(&self) -> Result<()> {
1822        // Flush write buffer if present
1823        if let Some(ref write_buffer) = self.write_buffer {
1824            write_buffer.flush()?;
1825        }
1826
1827        // Flush cold storage
1828        self.cold.flush()?;
1829
1830        // Truncate WAL after successful flush (data is now in cold storage)
1831        if let Some(ref wal) = self.wal
1832            && let Ok(mut wal_lock) = wal.write()
1833        {
1834            wal_lock.truncate()?;
1835        }
1836
1837        // Persist secondary indices so next boot loads from mmap instead of scanning
1838        self.save_index_checkpoint()?;
1839
1840        Ok(())
1841    }
1842
1843    /// Asynchronously flushes all buffered writes to disk.
1844    ///
1845    /// This is the async-friendly version of `flush()`.
1846    pub async fn sync(&self) -> Result<()> {
1847        self.flush()
1848    }
1849
1850    /// Store a key-value pair (low-level storage)
1851    ///
1852    /// This is the low-level method. For documents, use `insert_into()` instead.
1853    /// Writes are buffered and batched for performance.
1854    ///
1855    /// # Arguments
1856    /// * `key` - Unique key (format: "collection:id" for documents)
1857    /// * `value` - Raw bytes to store
1858    /// * `ttl` - Optional time-to-live (None = permanent)
1859    ///
1860    /// # Performance
1861    /// - Buffered writes: ~15-30K docs/sec
1862    /// - Batching improves throughput significantly
1863    /// - Call `flush()` to ensure data is persisted
1864    ///
1865    /// # Examples
1866    ///
1867    /// ```ignore
1868    /// use std::time::Duration;
1869    ///
1870    /// // Permanent storage
1871    /// let data = serde_json::to_vec(&my_struct)?;
1872    /// db.put("mykey".to_string(), data, None)?;
1873    ///
1874    /// // With TTL (expires after 1 hour)
1875    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600)))?;
1876    ///
1877    /// // Better: use insert_into() for documents
1878    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))])?;
1879    /// ```
1880    pub async fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
1881        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;
1882
1883        if value.len() > MAX_BLOB_SIZE {
1884            return Err(AqlError::invalid_operation(format!(
1885                "Blob size {} exceeds maximum allowed size of {}MB",
1886                value.len() / (1024 * 1024),
1887                MAX_BLOB_SIZE / (1024 * 1024)
1888            )));
1889        }
1890
1891        // If inside a transaction scope, buffer the write instead of touching
1892        // storage.  The buffer is flushed atomically on commit or discarded on
1893        // rollback — nothing lands in WAL/cold/hot until then.
1894        if let Ok(tx_id) = crate::transaction::ACTIVE_TRANSACTION_ID.try_with(|id| *id) {
1895            if let Some(buffer) = self.transaction_manager.active_transactions.get(&tx_id) {
1896                buffer.write(key, value);
1897                return Ok(());
1898            }
1899        }
1900
1901        // --- OPTIMIZATION: Wrap in Arc ONCE ---
1902        let key_arc = Arc::new(key);
1903        let value_arc = Arc::new(value);
1904
1905        // Check if this is a blob (blobs bypass write buffer and hot cache)
1906        let is_blob = value_arc.starts_with(b"BLOB:");
1907
1908        // --- 1. WAL Write (Non-blocking send of Arcs) ---
1909        if let Some(ref sender) = self.wal_writer
1910            && self.config.durability_mode != DurabilityMode::None
1911            && !self.wal_disabled.load(Ordering::Relaxed)
1912        {
1913            sender
1914                .send(WalOperation::Put {
1915                    key: Arc::clone(&key_arc),
1916                    value: Arc::clone(&value_arc),
1917                })
1918                .map_err(|_| {
1919                    AqlError::new(
1920                        ErrorCode::InternalError,
1921                        "WAL writer channel closed".to_string(),
1922                    )
1923                })?;
1924        }
1925
1926        // Check if this key should be cached (false for Any-field collections)
1927        let should_cache = self.should_cache_key(&key_arc);
1928
1929        // --- 2. Cold Store Write ---
1930        if is_blob || !should_cache {
1931            // Blobs and Any-field docs write directly to cold storage
1932            // (blobs: avoid memory pressure, Any-fields: ensure immediate queryability)
1933            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
1934        } else if let Some(ref write_buffer) = self.write_buffer {
1935            write_buffer.write(Arc::clone(&key_arc), Arc::clone(&value_arc))?;
1936        } else {
1937            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
1938        }
1939
1940        // --- 3. Hot Cache Write ---
1941        if should_cache {
1942            if is_blob {
1943                // For blobs: cache only a lightweight reference (not the actual data)
1944                // Format: "BLOBREF:<size>" - just 16-24 bytes instead of potentially MB
1945                let blob_ref = format!("BLOBREF:{}", value_arc.len());
1946                self.hot
1947                    .set(Arc::clone(&key_arc), Arc::new(blob_ref.into_bytes()), ttl);
1948            } else {
1949                self.hot
1950                    .set(Arc::clone(&key_arc), Arc::clone(&value_arc), ttl);
1951            }
1952        }
1953
1954        // --- 4. Indexing (skip for blobs and system keys) ---
1955        if !is_blob {
1956            if let Some(collection_name) = key_arc.split(':').next()
1957                && !collection_name.starts_with('_')
1958            {
1959                self.index_value(collection_name, &key_arc, &value_arc, None)?;
1960            }
1961        }
1962
1963        Ok(())
1964    }
1965    /// Rebuild primary index from cold storage.
1966    ///
1967    /// Scans all `_collection:<name>` keys to discover collections, then for
1968    /// each collection scans `<name>:<id>` keys and inserts them into the
1969    /// in-memory primary index.  This is idempotent — entries already present
1970    /// (e.g. from WAL replay) are left unchanged.
1971    fn rebuild_primary_index_from_cold(&self) -> Result<()> {
1972        let collection_prefix = "_collection:";
1973        let collection_names: Vec<String> = self
1974            .cold
1975            .scan_prefix(collection_prefix)
1976            .filter_map(|r| r.ok())
1977            .map(|(key, _)| key.trim_start_matches(collection_prefix).to_string())
1978            .collect();
1979
1980        for collection_name in collection_names {
1981            let doc_prefix = format!("{}:", collection_name);
1982            for result in self.cold.scan_prefix(&doc_prefix) {
1983                if let Ok((key, value)) = result {
1984                    let primary_index = self
1985                        .primary_indices
1986                        .entry(collection_name.clone())
1987                        .or_default();
1988                    primary_index
1989                        .entry(key)
1990                        .or_insert_with(|| DiskLocation::new(value.len()));
1991
1992                    // Rebuild the _sid ↔ internal-u32 mapping so that bitmaps
1993                    // loaded from the checkpoint can be translated back to real
1994                    // external IDs via get_external_id(). Without this, every
1995                    // u32 in a persisted bitmap maps to None after restart.
1996                    if let Ok(doc) = self.deserialize_internal::<Document>(&value) {
1997                        let _ = self.get_or_create_internal_id(&doc._sid);
1998                    }
1999                }
2000            }
2001        }
2002
2003        Ok(())
2004    }
2005
2006    /// Replay WAL entries to recover from crash
2007    ///
2008    /// Handles transaction boundaries:
2009    /// - Operations within a committed transaction are applied
2010    /// - Operations within a rolled-back transaction are discarded
2011    /// - Operations within an uncommitted transaction (crash during tx) are discarded
2012    async fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
2013        // Buffer for operations within a transaction
2014        let mut tx_buffer: Vec<crate::wal::LogEntry> = Vec::new();
2015        let mut in_transaction = false;
2016
2017        for entry in entries {
2018            match entry.operation {
2019                Operation::BeginTx => {
2020                    // Start buffering operations
2021                    in_transaction = true;
2022                    tx_buffer.clear();
2023                }
2024                Operation::CommitTx => {
2025                    // Apply all buffered operations
2026                    for buffered_entry in tx_buffer.drain(..) {
2027                        self.apply_wal_entry(buffered_entry).await?;
2028                    }
2029                    in_transaction = false;
2030                }
2031                Operation::RollbackTx => {
2032                    // Discard buffered operations
2033                    tx_buffer.clear();
2034                    in_transaction = false;
2035                }
2036                Operation::Put | Operation::Delete => {
2037                    if in_transaction {
2038                        // Buffer the operation for later
2039                        tx_buffer.push(entry);
2040                    } else {
2041                        // Apply immediately (not in transaction)
2042                        self.apply_wal_entry(entry).await?;
2043                    }
2044                }
2045            }
2046        }
2047
2048        // If we end with in_transaction = true, it means we crashed mid-transaction
2049        // Those operations in tx_buffer are discarded (not committed)
2050        if in_transaction {
2051            eprintln!(
2052                "WAL replay: Discarding {} uncommitted transaction operations",
2053                tx_buffer.len()
2054            );
2055        }
2056
2057        // Flush after replay and truncate WAL
2058        self.cold.flush()?;
2059        if let Some(ref wal) = self.wal {
2060            wal.write().unwrap().truncate()?;
2061        }
2062
2063        Ok(())
2064    }
2065
2066    /// Apply a single WAL entry to storage
2067    async fn apply_wal_entry(&self, entry: crate::wal::LogEntry) -> Result<()> {
2068        match entry.operation {
2069            Operation::Put => {
2070                if let Some(value) = entry.value {
2071                    // Write directly to cold storage (skip WAL, already logged)
2072                    self.cold.set(entry.key.clone(), value.clone())?;
2073
2074                    // Update hot cache
2075                    if self.should_cache_key(&entry.key) {
2076                        self.hot
2077                            .set(Arc::new(entry.key.clone()), Arc::new(value.clone()), None);
2078                    }
2079
2080                    // Rebuild indices
2081                    if let Some(collection) = entry.key.split(':').next()
2082                        && !collection.starts_with('_')
2083                    {
2084                        self.index_value(collection, &entry.key, &value, None)?;
2085                    }
2086                }
2087            }
2088            Operation::Delete => {
2089                // Remove from indices before deleting the data
2090                if let Some(collection) = entry.key.split(':').next()
2091                    && !collection.starts_with('_')
2092                {
2093                    let id = entry.key.split(':').nth(1).unwrap_or("");
2094                    if let Ok(Some(doc)) = self.get_document(collection, id) {
2095                        let _ = self.remove_from_indices(collection, &doc);
2096                    } else if let Some(index) = self.primary_indices.get_mut(collection) {
2097                        index.remove(&entry.key);
2098                    }
2099                }
2100                self.cold.delete(&entry.key)?;
2101                self.hot.delete(&entry.key);
2102            }
2103            _ => {} // Transaction markers handled in replay_wal
2104        }
2105        Ok(())
2106    }
2107
2108    #[cfg(test)]
2109    fn ensure_schema_hot(&self, collection: &str) -> Result<Arc<Collection>> {
2110        // 1. Check the high-performance object cache first (O(1))
2111        if let Some(schema) = self.schema_cache.get(collection) {
2112            return Ok(schema.value().clone());
2113        }
2114
2115        let collection_key = format!("_collection:{}", collection);
2116
2117        // 2. Fallback to Hot Byte Cache (parse if found)
2118        if let Some(data) = self.hot.get(&collection_key) {
2119            return serde_json::from_slice::<Collection>(&data)
2120                .map(|s| {
2121                    let arc_s = Arc::new(s);
2122                    // Populate object cache to avoid this parse next time
2123                    self.schema_cache
2124                        .insert(collection.to_string(), arc_s.clone());
2125                    arc_s
2126                })
2127                .map_err(|_| {
2128                    AqlError::new(
2129                        ErrorCode::SchemaError,
2130                        "Failed to parse schema from hot cache",
2131                    )
2132                });
2133        }
2134
2135        // 3. Fallback to Cold Storage
2136        if let Some(data) = self.get(&collection_key)? {
2137            // Update Hot Cache
2138            self.hot.set(
2139                Arc::new(collection_key.clone()),
2140                Arc::new(data.clone()),
2141                None,
2142            );
2143
2144            // Parse and update Schema Cache
2145            let schema = serde_json::from_slice::<Collection>(&data)?;
2146            let arc_schema = Arc::new(schema);
2147            self.schema_cache
2148                .insert(collection.to_string(), arc_schema.clone());
2149
2150            return Ok(arc_schema);
2151        }
2152
2153        Err(AqlError::new(
2154            ErrorCode::SchemaError,
2155            format!("Failed to load schema for collection '{}'", collection),
2156        ))
2157    }
2158
2159    /// Smart collection scan that uses the primary index as a key directory
2160    /// Avoids forced flushes and leverages hot cache for better performance
2161    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
2162        // Use the primary index as a "key directory" - it contains all document keys
2163        if let Some(index) = self.primary_indices.get(collection) {
2164            // Pre-allocate based on index size to avoid reallocations
2165            let mut documents = Vec::with_capacity(index.len());
2166
2167            // Iterate through all keys in the primary index (fast, in-memory)
2168            for entry in index.iter() {
2169                let key = entry.key();
2170
2171                // Fetch document via hot cache -> cold storage fallback
2172                if let Some(data) = self.get(key)? {
2173                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
2174                        // Apply computed fields
2175                        if let Ok(computed) = self.computed.read() {
2176                            let _ = computed.apply(collection, &mut doc);
2177                        }
2178                        documents.push(doc);
2179                    }
2180                }
2181            }
2182            Ok(documents)
2183        } else {
2184            // Fallback: scan from cold storage if primary index not yet initialized
2185            let prefix = format!("{}:", collection);
2186            let mut documents = Vec::new();
2187            for result in self.cold.scan_prefix(&prefix) {
2188                if let Ok((_key, value)) = result {
2189                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
2190                        // Apply computed fields
2191                        if let Ok(computed) = self.computed.read() {
2192                            let _ = computed.apply(collection, &mut doc);
2193                        }
2194                        documents.push(doc);
2195                    }
2196                }
2197            }
2198            Ok(documents)
2199        }
2200    }
2201
2202    /// Scan collection with filter and early termination support
2203    /// Used by QueryBuilder for optimized queries with LIMIT
2204    pub fn scan_and_filter<F>(
2205        &self,
2206        collection: &str,
2207        filter_fn: F,
2208        limit: Option<usize>,
2209    ) -> Result<Vec<Document>>
2210    where
2211        F: Fn(&Document) -> bool,
2212    {
2213        let mut results = Vec::new();
2214        if let Some(index) = self.primary_indices.get(collection) {
2215            for entry in index.iter() {
2216                if let Some(data) = self.get(entry.key())? {
2217                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
2218                        if filter_fn(&doc) {
2219                            results.push(doc);
2220                            if let Some(l) = limit {
2221                                if results.len() >= l {
2222                                    break;
2223                                }
2224                            }
2225                        }
2226                    }
2227                }
2228            }
2229        } else {
2230            // No primary index (e.g. system collections like _migrations): fall back to cold scan.
2231            // Flush the write buffer first so any recently buffered writes land in cold storage
2232            // and are visible to the prefix scan below.
2233            if let Some(ref wb) = self.write_buffer {
2234                let _ = wb.flush();
2235            }
2236            let prefix = format!("{}:", collection);
2237            for result in self.cold.scan_prefix(&prefix) {
2238                if let Ok((_key, value)) = result {
2239                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2240                        if filter_fn(&doc) {
2241                            results.push(doc);
2242                            if let Some(l) = limit {
2243                                if results.len() >= l {
2244                                    break;
2245                                }
2246                            }
2247                        }
2248                    }
2249                }
2250            }
2251        }
2252        Ok(results)
2253    }
2254
2255    // Restore missing methods
2256    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
2257        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
2258
2259        // Get file metadata to check size before reading
2260        let metadata = tokio::fs::metadata(file_path).await?;
2261        let file_size = metadata.len() as usize;
2262
2263        if file_size > MAX_FILE_SIZE {
2264            return Err(AqlError::invalid_operation(format!(
2265                "File size {} MB exceeds maximum allowed size of {} MB",
2266                file_size / (1024 * 1024),
2267                MAX_FILE_SIZE / (1024 * 1024)
2268            )));
2269        }
2270
2271        let mut file = File::open(file_path).await?;
2272        let mut buffer = Vec::new();
2273        file.read_to_end(&mut buffer).await?;
2274
2275        // Add BLOB: prefix to mark this as blob data
2276        let mut blob_data = Vec::with_capacity(5 + buffer.len());
2277        blob_data.extend_from_slice(b"BLOB:");
2278        blob_data.extend_from_slice(&buffer);
2279
2280        self.put(key, blob_data, None).await
2281    }
2282
2283    /// Create a new collection with schema definition
2284    ///
2285    /// Collections are like tables in SQL - they define the structure of your documents.
2286    /// The third boolean parameter indicates if the field should be indexed for fast lookups.
2287    ///
2288    /// # Arguments
2289    /// * `name` - Collection name
2290    /// * `fields` - Vector of (field_name, field_type, indexed) tuples
2291    ///   - Field name (accepts both &str and String)
2292    ///   - Field type (String, Int, Float, Bool, etc.)
2293    ///   - Indexed: true for fast lookups, false for no index
2294    ///
2295    /// # Performance
2296    /// - Indexed fields: Fast equality queries (O(1) lookup)
2297    /// - Non-indexed fields: Full scan required for queries
2298    /// - Unique fields are automatically indexed
2299    ///
2300    /// # Examples
2301    ///
2302    /// ```ignore
2303    /// use aurora_db::{Aurora, types::FieldType};
2304    ///
2305    /// let db = Aurora::open("mydb.db")?;
2306    ///
2307    /// // Create a users collection
2308    /// db.new_collection("users", vec![
2309    ///     ("name", FieldType::Scalar(crate::types::ScalarType::String), false),      // Not indexed
2310    ///     ("email", FieldType::Scalar(crate::types::ScalarType::String), true),      // Indexed - fast lookups
2311    ///     ("age", FieldType::Scalar(crate::types::ScalarType::Int), false),
2312    ///     ("active", FieldType::Scalar(crate::types::ScalarType::Bool), true),       // Indexed
2313    ///     ("score", FieldType::Float, false),
    /// ]).await?;
    ///
    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */]).await?; // OK!
2318    /// ```
2319    pub async fn new_collection<F: IntoFieldDefinition>(
2320        &self,
2321        name: &str,
2322        fields: Vec<F>,
2323    ) -> Result<()> {
2324        println!("DB: Creating collection: {}", name);
2325        let collection_key = format!("_collection:{}", name);
2326
2327        // Check if collection already exists - if so, just return Ok (idempotent)
2328        if self.get(&collection_key)?.is_some() {
2329            return Ok(());
2330        }
2331
2332        // Create field definitions
2333        let mut field_definitions = HashMap::new();
2334        for field in fields {
2335            let (field_name, field_def) = field.into_field_definition();
2336
2337            if field_def.field_type == FieldType::Any && field_def.unique {
2338                return Err(AqlError::new(
2339                    ErrorCode::InvalidDefinition,
2340                    "Fields of type 'Any' cannot be unique or indexed.".to_string(),
2341                ));
2342            }
2343
2344            field_definitions.insert(field_name, field_def);
2345        }
2346
2347        let collection = Collection {
2348            name: name.to_string(),
2349            fields: field_definitions,
2350            // REMOVED: unique_fields is now derived from fields
2351        };
2352
2353        let collection_data = serde_json::to_vec(&collection)?;
2354        self.put(collection_key, collection_data, None).await?;
2355
2356        // Invalidate schema cache since we just created/updated the collection schema
2357        self.schema_cache.remove(name);
2358
2359        Ok(())
2360    }
2361
2362    /// Insert a document into a collection
2363    ///
2364    /// Automatically generates a UUID for the document and validates against
2365    /// collection schema and unique constraints. Returns the generated document ID.
2366    ///
2367    /// # Performance
2368    /// - Single insert: ~15,000 docs/sec
2369    /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
2370    /// - Triggers PubSub events for real-time listeners
2371    ///
2372    /// # Arguments
2373    /// * `collection` - Name of the collection to insert into
2374    /// * `data` - Document fields and values to insert
2375    ///
2376    /// # Returns
2377    /// The auto-generated ID of the inserted document or an error
2378    ///
2379    /// # Errors
2380    /// - `CollectionNotFound`: Collection doesn't exist
2381    /// - `ValidationError`: Data violates schema or unique constraints
2382    /// - `SerializationError`: Invalid data format
2383    ///
2384    /// # Examples
2385    ///
2386
2387    /// use aurora_db::{Aurora, types::Value};
2388    ///
2389    /// let db = Aurora::open("mydb.db")?;
2390    ///
2391    /// // Basic insertion
2392    /// let user_id = db.insert_into("users", vec![
2393    ///     ("name", Value::String("Alice Smith".to_string())),
2394    ///     ("email", Value::String("alice@example.com".to_string())),
2395    ///     ("age", Value::Int(28)),
2396    ///     ("active", Value::Bool(true)),
2397    /// ]).await?;
2398    ///
2399    /// println!("Created user with ID: {}", user_id);
2400    ///
2401    /// // Inserting with nested data
2402    /// let order_id = db.insert_into("orders", vec![
2403    ///     ("user_id", Value::String(user_id.clone())),
2404    ///     ("total", Value::Float(99.99)),
2405    ///     ("status", Value::String("pending".to_string())),
2406    ///     ("items", Value::Array(vec![
2407    ///         Value::String("item-123".to_string()),
2408    ///         Value::String("item-456".to_string()),
2409    ///     ])),
2410    /// ]).await?;
2411    ///
2412    /// // Error handling - unique constraint violation
2413    /// match db.insert_into("users", vec![
2414    ///     ("email", Value::String("alice@example.com".to_string())),  // Duplicate!
2415    ///     ("name", Value::String("Alice Clone".to_string())),
2416    /// ]).await {
2417    ///     Ok(id) => println!("Inserted: {}", id),
2418    ///     Err(e) => println!("Failed: {} (email already exists)", e),
2419    /// }
2420    ///
2421    /// // For bulk inserts (10+ documents), use batch_insert() instead
2422    /// let users = vec![
2423    ///     HashMap::from([
2424    ///         ("name".to_string(), Value::String("Bob".to_string())),
2425    ///         ("email".to_string(), Value::String("bob@example.com".to_string())),
2426    ///     ]),
2427    ///     HashMap::from([
2428    ///         ("name".to_string(), Value::String("Carol".to_string())),
2429    ///         ("email".to_string(), Value::String("carol@example.com".to_string())),
2430    ///     ]),
2431    ///     // ... more documents
2432    /// ];
2433    /// let ids = db.batch_insert("users", users).await?;  // 3x faster!
2434    /// println!("Inserted {} users", ids.len());
2435    /// ```ignore
2436    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
2437        // Convert Vec<(&str, Value)> to HashMap<String, Value>
2438        let data_map: HashMap<String, Value> =
2439            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
2440
2441        // Validate unique constraints before inserting
2442        self.validate_unique_constraints(collection, &data_map)
2443            .await?;
2444
2445        let doc_id = Uuid::now_v7().to_string();
2446        let document = Document {
2447            _sid: doc_id.clone(),
2448            data: data_map,
2449        };
2450
2451        self.put(
2452            format!("{}:{}", collection, doc_id),
2453            serde_json::to_vec(&document)?,
2454            None,
2455        )
2456        .await?;
2457
2458        // Publish insert event
2459        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
2460        let _ = self.pubsub.publish(event);
2461
2462        Ok(doc_id)
2463    }
2464
2465    pub async fn insert_map(
2466        &self,
2467        collection: &str,
2468        data: HashMap<String, Value>,
2469    ) -> Result<String> {
2470        // Validate unique constraints before inserting
2471        self.validate_unique_constraints(collection, &data).await?;
2472
2473        let doc_id = Uuid::now_v7().to_string();
2474        let document = Document {
2475            _sid: doc_id.clone(),
2476            data,
2477        };
2478
2479        self.put(
2480            format!("{}:{}", collection, doc_id),
2481            serde_json::to_vec(&document)?,
2482            None,
2483        )
2484        .await?;
2485
2486        // Publish insert event
2487        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
2488        let _ = self.pubsub.publish(event);
2489
2490        Ok(doc_id)
2491    }
2492
2493    /// Batch insert multiple documents with optimized write path
2494    ///
2495    /// Inserts multiple documents in a single optimized operation, bypassing
2496    /// the write buffer for better performance. Ideal for bulk data loading,
2497    /// migrations, or initial database seeding. 3x faster than individual inserts.
2498    ///
2499    /// # Performance
2500    /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
2501    /// - Batch writes to WAL and storage
2502    /// - Validates all unique constraints
2503    /// - Use for 10+ documents minimum
2504    ///
2505    /// # Arguments
2506    /// * `collection` - Name of the collection to insert into
2507    /// * `documents` - Vector of document data as HashMaps
2508    ///
2509    /// # Returns
2510    /// Vector of auto-generated document IDs or an error
2511    ///
2512    /// # Examples
2513    ///
    /// ```ignore
2515    /// use aurora_db::{Aurora, types::Value};
2516    /// use std::collections::HashMap;
2517    ///
2518    /// let db = Aurora::open("mydb.db")?;
2519    ///
2520    /// // Bulk user import
2521    /// let users = vec![
2522    ///     HashMap::from([
2523    ///         ("name".to_string(), Value::String("Alice".into())),
2524    ///         ("email".to_string(), Value::String("alice@example.com".into())),
2525    ///         ("age".to_string(), Value::Int(28)),
2526    ///     ]),
2527    ///     HashMap::from([
2528    ///         ("name".to_string(), Value::String("Bob".into())),
2529    ///         ("email".to_string(), Value::String("bob@example.com".into())),
2530    ///         ("age".to_string(), Value::Int(32)),
2531    ///     ]),
2532    ///     HashMap::from([
2533    ///         ("name".to_string(), Value::String("Carol".into())),
2534    ///         ("email".to_string(), Value::String("carol@example.com".into())),
2535    ///         ("age".to_string(), Value::Int(25)),
2536    ///     ]),
2537    /// ];
2538    ///
2539    /// let ids = db.batch_insert("users", users).await?;
2540    /// println!("Inserted {} users", ids.len());
2541    ///
2542    /// // Seeding test data
2543    /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
2544    ///     .map(|i| HashMap::from([
2545    ///         ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
2546    ///         ("price".to_string(), Value::Float(9.99 + i as f64)),
2547    ///         ("stock".to_string(), Value::Int(100)),
2548    ///     ]))
2549    ///     .collect();
2550    ///
2551    /// let ids = db.batch_insert("products", test_products).await?;
2552    /// // Much faster than 1000 individual insert_into() calls!
2553    ///
2554    /// // Migration from CSV data
2555    /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
2556    /// let mut batch = Vec::new();
2557    ///
2558    /// for result in csv_reader.records() {
2559    ///     let record = result?;
2560    ///     let doc = HashMap::from([
2561    ///         ("field1".to_string(), Value::String(record[0].to_string())),
2562    ///         ("field2".to_string(), Value::String(record[1].to_string())),
2563    ///     ]);
2564    ///     batch.push(doc);
2565    ///
2566    ///     // Insert in batches of 1000
2567    ///     if batch.len() >= 1000 {
2568    ///         db.batch_insert("imported_data", batch.clone()).await?;
2569    ///         batch.clear();
2570    ///     }
2571    /// }
2572    ///
2573    /// // Insert remaining
2574    /// if !batch.is_empty() {
2575    ///     db.batch_insert("imported_data", batch).await?;
2576    /// }
2577    /// ```
2578    ///
2579    /// # Errors
2580    /// - `ValidationError`: Unique constraint violation on any document
2581    /// - `CollectionNotFound`: Collection doesn't exist
2582    /// - `IoError`: Storage write failure
2583    ///
2584    /// # Important Notes
2585    /// - All inserts are atomic - if one fails, none are inserted
2586    /// - UUIDs are auto-generated for all documents
2587    /// - PubSub events are published for each insert
2588    /// - For 10+ documents, this is 3x faster than individual inserts
2589    /// - For < 10 documents, use `insert_into()` instead
2590    ///
2591    /// # See Also
2592    /// - `insert_into()` for single document inserts
2593    /// - `import_from_json()` for file-based bulk imports
2594    /// - `batch_write()` for low-level batch operations
2595    pub async fn batch_insert(
2596        &self,
2597        collection: &str,
2598        documents: Vec<HashMap<String, Value>>,
2599    ) -> Result<Vec<String>> {
2600        let mut doc_ids = Vec::with_capacity(documents.len());
2601        let mut pairs = Vec::with_capacity(documents.len());
2602        // Keep Document objects alive so pubsub doesn't need to re-read from cold storage
2603        let mut doc_objects = Vec::with_capacity(documents.len());
2604
2605        // Prepare all documents
2606        for data in documents {
2607            // Validate unique constraints
2608            self.validate_unique_constraints(collection, &data).await?;
2609
2610            let doc_id = Uuid::now_v7().to_string();
2611            let document = Document {
2612                _sid: doc_id.clone(),
2613                data,
2614            };
2615
2616            let key = format!("{}:{}", collection, doc_id);
2617            let value = serde_json::to_vec(&document)?;
2618
2619            pairs.push((key, value));
2620            doc_ids.push(doc_id);
2621            doc_objects.push(document);
2622        }
2623
2624        // Write to WAL in batch (if enabled)
2625        if let Some(ref wal) = self.wal
2626            && self.config.durability_mode != DurabilityMode::None
2627        {
2628            let mut wal_lock = wal.write().unwrap();
2629            for (key, value) in &pairs {
2630                wal_lock.append(Operation::Put, key, Some(value))?;
2631            }
2632        }
2633
2634        // Bypass write buffer - go directly to cold storage batch API
2635        self.cold.batch_set(pairs.clone())?;
2636
2637        // Note: Durability is handled by background checkpoint process
2638
2639        // Update hot cache and indices
2640        for (key, value) in pairs {
2641            if self.should_cache_key(&key) {
2642                self.hot
2643                    .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2644            }
2645
2646            if let Some(collection_name) = key.split(':').next()
2647                && !collection_name.starts_with('_')
2648            {
2649                self.index_value(collection_name, &key, &value, None)?;
2650            }
2651        }
2652
2653        // Publish events — use already-in-memory Document objects, no cold storage re-read
2654        for (doc_id, doc) in doc_ids.iter().zip(doc_objects.into_iter()) {
2655            let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
2656            let _ = self.pubsub.publish(event);
2657        }
2658
2659        Ok(doc_ids)
2660    }
2661
2662    /// Update a document by ID
2663    ///
2664    /// # Arguments
2665    /// * `collection` - Collection name
2666    /// * `doc_id` - Document ID to update
2667    /// * `data` - New field values to set
2668    ///
2669    /// # Returns
2670    /// Updates specific fields in an existing document.
2671    ///
2672    /// # Arguments
2673    /// * `collection` - The collection name.
2674    /// * `doc_id` - The ID of the document to update.
2675    /// * `updates` - a vector of field-value pairs to update.
2676    ///
2677    /// # Example
2678    /// ```rust
2679    /// # use aurora_db::{Aurora, Value};
2680    /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
2681    /// db.update_document("users", "u123", vec![
2682    ///     ("active", Value::Bool(false)),
2683    /// ]).await?;
2684    /// # Ok(())
2685    /// # }
2686    /// ```
2687    pub async fn update_document(
2688        &self,
2689        collection: &str,
2690        doc_id: &str,
2691        updates: Vec<(&str, Value)>,
2692    ) -> Result<()> {
2693        // Get existing document
2694        let mut document = self.get_document(collection, doc_id)?.ok_or_else(|| {
2695            AqlError::new(
2696                ErrorCode::NotFound,
2697                format!("Document not found: {}", doc_id),
2698            )
2699        })?;
2700
2701        // Store old document for event
2702        let old_document = document.clone();
2703
2704        // Apply updates
2705        for (field, value) in updates {
2706            document.data.insert(field.to_string(), value);
2707        }
2708
2709        // Validate unique constraints after update (excluding current document)
2710        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
2711            .await?;
2712
2713        // Save updated document
2714        self.put(
2715            format!("{}:{}", collection, doc_id),
2716            serde_json::to_vec(&document)?,
2717            None,
2718        )
2719        .await?;
2720
2721        // Remove stale index entries for modified fields
2722        let u = self.parse_external_id(doc_id);
2723        if let Some(internal_id) = self._sid_dictionary.get(&u).map(|e| *e.value()) {
2724            for (field, old_val) in &old_document.data {
2725                if let Some(new_val) = document.data.get(field) {
2726                    if new_val == old_val {
2727                        continue;
2728                    }
2729                }
2730                
2731                let index_key = format!("{}:{}", collection, field);
2732                if let Some(index_map) = self.secondary_indices.get(&index_key) {
2733                    let val_str = match old_val {
2734                        Value::String(s) => s.clone(),
2735                        _ => old_val.to_string(),
2736                    };
2737                    if let Some(storage_arc) = index_map.get(&val_str) {
2738                        if let Ok(mut storage) = storage_arc.value().write() {
2739                            println!("REMOVING {} FROM INDEX {}", internal_id, index_key);
2740                            storage.remove(internal_id);
2741                        }
2742                    } else {
2743                        println!("STORAGE ARC NOT FOUND FOR {}", val_str);
2744                    }
2745                } else {
2746                    println!("INDEX MAP NOT FOUND FOR {}", index_key);
2747                }
2748            }
2749        } else {
2750            println!("INTERNAL ID NOT FOUND FOR {}", doc_id);
2751        }
2752
2753        // Publish update event
2754        let event =
2755            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
2756        let _ = self.pubsub.publish(event);
2757
2758        Ok(())
2759    }
2760
2761    /// Remove a single field key from a document's data (used by field-rename migrations).
2762    pub async fn drop_field_from_document(
2763        &self,
2764        collection: &str,
2765        doc_id: &str,
2766        field: &str,
2767    ) -> Result<()> {
2768        let key = format!("{}:{}", collection, doc_id);
2769        let existing = self.get(&key)?.ok_or_else(|| {
2770            AqlError::new(
2771                ErrorCode::NotFound,
2772                format!("Document {} not found", doc_id),
2773            )
2774        })?;
2775        let mut document: Document = serde_json::from_slice(&existing)?;
2776        document.data.remove(field);
2777        self.put(key, serde_json::to_vec(&document)?, None).await?;
2778        Ok(())
2779    }
2780
2781    /// Retrieves all documents from a collection.
2782    ///
2783    /// # Arguments
2784    /// * `collection` - The name of the collection.
2785    ///
2786    /// # Returns
2787    /// A vector containing all documents in the collection.
2788    ///
2789    /// # Performance
2790    /// For large collections, consider using `query().limit().collect()` instead to
2791    /// avoid loading too much data into memory at once.
2792    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
2793        self.ensure_indices_initialized().await?;
2794        self.scan_collection(collection)
2795    }
2796
2797    /// Searches for raw storage keys matching a pattern.
2798    ///
2799    /// Returns a list of (key, data_info) pairs. This is a low-level diagnostic tool.
2800    ///
2801    /// # Arguments
2802    /// * `pattern` - The string pattern to search for in keys.
2803    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
2804        let mut data = Vec::new();
2805
2806        // Scan from cold storage instead of primary index
2807        for result in self.cold.scan() {
2808            if let Ok((key, value)) = result {
2809                if key.contains(pattern) {
2810                    let info = if value.starts_with(b"BLOB:") {
2811                        DataInfo::Blob { size: value.len() }
2812                    } else {
2813                        DataInfo::Data {
2814                            size: value.len(),
2815                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
2816                                .into_owned(),
2817                        }
2818                    };
2819
2820                    data.push((key.clone(), info));
2821                }
2822            }
2823        }
2824
2825        Ok(data)
2826    }
2827
2828    /// Begin a transaction
2829    ///
2830    /// All operations after beginning a transaction will be part of the transaction
2831    /// until either commit_transaction() or rollback_transaction() is called.
2832    ///
2833    /// # Returns
2834    /// Success or an error (e.g., if a transaction is already in progress)
2835    ///
2836    /// # Examples
2837    ///
2838
2839    /// // Start a transaction for atomic operations
2840    /// db.begin_transaction()?;
2841    ///
2842    /// // Perform multiple operations
2843    /// db.insert_into("accounts", vec![("user_id", Value::String(user_id)), ("balance", Value::Float(100.0))])?;
2844    /// db.insert_into("audit_log", vec![("action", Value::String("account_created".to_string()))])?;
2845    ///
2846    /// // Commit all changes or roll back if there's an error
2847    /// if all_ok {
2848    ///     db.commit_transaction()?;
2849    /// } else {
2850    ///     db.rollback_transaction()?;
2851    /// }
2852    /// ```ignore
2853    /// Starts a new ACID-compliant transaction.
2854    ///
2855    /// Transactions allow you to group multiple operations together. Either all
2856    /// operations succeed and are committed, or none of them are applied.
2857    ///
2858    /// # Returns
2859    /// A unique `TransactionId` to identify this transaction in subsequent calls.
2860    ///
2861    /// # Example
2862    /// ```rust
2863    /// # use aurora_db::{Aurora, Value};
2864    /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
2865    /// let tx_id = db.begin_transaction().await;
2866    ///
2867    /// // ... perform operations ...
2868    ///
2869    /// db.commit_transaction(tx_id).await?;
2870    /// # Ok(())
2871    /// # }
2872    /// ```
2873    pub async fn begin_transaction(&self) -> crate::transaction::TransactionId {
2874        let buffer = self.transaction_manager.begin();
2875        buffer._sid
2876    }
2877
2878    /// Commits a transaction, making all changes permanent.
2879    ///
2880    /// All operations within the transaction are atomically applied to the database.
2881    ///
2882    /// # Arguments
2883    /// * `tx_id` - The ID of the transaction to commit.
2884    pub async fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2885        let buffer = self
2886            .transaction_manager
2887            .active_transactions
2888            .get(&tx_id)
2889            .ok_or_else(|| {
2890                AqlError::invalid_operation(
2891                    "Transaction not found or already completed".to_string(),
2892                )
2893            })?;
2894
2895        for item in buffer.writes.iter() {
2896            let key = item.key();
2897            let value = item.value();
2898            // WAL write for crash-durability (skipped during transaction to
2899            // avoid writing uncommitted entries that could be replayed)
2900            if let Some(ref sender) = self.wal_writer
2901                && self.config.durability_mode != DurabilityMode::None
2902            {
2903                let _ = sender.send(WalOperation::Put {
2904                    key: Arc::new(key.clone()),
2905                    value: Arc::new(value.clone()),
2906                });
2907            }
2908            self.cold.set(key.clone(), value.clone())?;
2909            if self.should_cache_key(key) {
2910                self.hot
2911                    .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2912            }
2913            if let Some(collection_name) = key.split(':').next()
2914                && !collection_name.starts_with('_')
2915            {
2916                self.index_value(collection_name, key, value, None)?;
2917            }
2918        }
2919
2920        for item in buffer.deletes.iter() {
2921            let key = item.key();
2922            if let Some((collection, id)) = key.split_once(':')
2923                && let Ok(Some(doc)) = self.get_document(collection, id)
2924            {
2925                self.remove_from_indices(collection, &doc)?;
2926            }
2927            self.cold.delete(key)?;
2928            self.hot.delete(key);
2929        }
2930
2931        // Drop the buffer reference to release the DashMap read lock
2932        // before calling commit which needs to remove the entry (write lock)
2933        drop(buffer);
2934
2935        self.transaction_manager.commit(tx_id)?;
2936
2937        self.cold.compact()?;
2938
2939        Ok(())
2940    }
2941
2942    /// Roll back a transaction, discarding all changes
2943    ///
2944    /// All operations within the transaction are discarded. The database state
2945    /// remains unchanged. Use this when an error occurs during transaction processing.
2946    ///
2947    /// # Arguments
2948    /// * `tx_id` - Transaction ID returned from begin_transaction()
2949    ///
2950    /// # Examples
2951    ///
2952
2953    /// use aurora_db::{Aurora, types::Value};
2954    ///
2955    /// let db = Aurora::open("mydb.db")?;
2956    ///
2957    /// // Attempt a transfer with validation
2958    /// let tx_id = db.begin_transaction();
2959    ///
2960    /// let result = async {
2961    ///     // Deduct from Alice
2962    ///     let alice = db.get_document("accounts", "alice").await?;
2963    ///     let balance = alice.and_then(|doc| doc.data.get("balance"));
2964    ///
2965    ///     if let Some(Value::Int(bal)) = balance {
2966    ///         if *bal < 100 {
2967    ///             return Err("Insufficient funds");
2968    ///         }
2969    ///
2970    ///         db.update_document("accounts", "alice", vec![
2971    ///             ("balance", Value::Int(bal - 100)),
2972    ///         ]).await?;
2973    ///
2974    ///         db.update_document("accounts", "bob", vec![
2975    ///             ("balance", Value::Int(600)),
2976    ///         ]).await?;
2977    ///
2978    ///         Ok(())
2979    ///     } else {
2980    ///         Err("Account not found")
2981    ///     }
2982    /// }.await;
2983    ///
2984    /// match result {
2985    ///     Ok(_) => {
2986    ///         db.commit_transaction(tx_id)?;
2987    ///         println!("Transfer completed");
2988    ///     }
2989    ///     Err(e) => {
2990    ///         db.rollback_transaction(tx_id)?;
2991    ///         println!("Transfer failed: {}, changes rolled back", e);
2992    ///     }
2993    /// }
2994    /// ```
2995    /// Aborts a transaction and discards all pending changes.
2996    ///
2997    /// # Arguments
2998    /// * `tx_id` - The ID of the transaction to roll back.
2999    pub async fn rollback_transaction(
3000        &self,
3001        tx_id: crate::transaction::TransactionId,
3002    ) -> Result<()> {
3003        self.transaction_manager.rollback(tx_id)
3004    }
3005
3006    /// Create a secondary index on a field for faster queries
3007    ///
3008    /// Indexes dramatically improve query performance for frequently accessed fields,
3009    /// trading increased memory usage and slower writes for much faster reads.
3010    ///
3011    /// # When to Create Indexes
3012    /// - **Frequent queries**: Fields used in 80%+ of your queries
3013    /// - **High cardinality**: Fields with many unique values (user_id, email)
3014    /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
3015    /// - **Large collections**: Most beneficial with 10,000+ documents
3016    ///
3017    /// # When NOT to Index
3018    /// - Low cardinality fields (e.g., boolean flags, small enums)
3019    /// - Rarely queried fields
3020    /// - Fields that change frequently (write-heavy workloads)
3021    /// - Small collections (<1,000 documents) - full scans are fast enough
3022    ///
3023    /// # Performance Characteristics
3024    /// - **Query speedup**: O(n) → O(1) for equality filters
3025    /// - **Memory cost**: ~100-200 bytes per document per index
3026    /// - **Write slowdown**: ~20-30% longer insert/update times
3027    /// - **Build time**: ~5,000 docs/sec for initial indexing
3028    /// Create a new collection with the given schema
3029    ///
3030    /// # Arguments
3031    /// * `name` - Collection name
3032    /// * `fields` - Field definitions as tuples of (name, type, unique)
3033    ///   - The boolean indicates whether the field has a **unique constraint**
3034    ///   - Unique fields are automatically indexed
3035    ///   - Non-unique fields can be indexed separately using `create_index()`
3036    ///
3037    /// # Examples
3038    /// ```ignore
3039    /// # use aurora_db::{Aurora, types::FieldType};
3040    /// # async fn example(db: &Aurora) {
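    /// db.new_collection("users", vec![
    ///     ("email", FieldType::Scalar(crate::types::ScalarType::String), false),
    ///     ("age", FieldType::Scalar(crate::types::ScalarType::Int), false),
    ///     ("active", FieldType::Scalar(crate::types::ScalarType::Bool), false),
    /// ]).await?;
    ///
    /// // DO index 'email' - high cardinality and frequently used for lookups
    /// db.create_index("users", "email").await?;
    ///
    /// let user = db.query("users")
    ///     .filter(|f| f.eq("email", "alice@example.com"))
    ///     .first_one()
    ///     .await?;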
3044    ///
3045    /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
3046    /// // A full scan is fast enough for boolean fields
3047    ///
3048    /// // DO index 'age' if you frequently query age ranges
3049    /// db.create_index("users", "age").await?;
3050    ///
3051    /// let young_users = db.query("users")
3052    ///     .filter(|f| f.lt("age", 30))
3053    ///     .collect()
3054    ///     .await?;
3055    /// ```
3056    ///
3057    /// # Real-World Example: E-commerce Orders
3058    ///
3059    /// ```ignore
3060    /// // Orders collection: 1 million documents
3061    /// db.new_collection("orders", vec![
3062    ///     ("user_id", FieldType::Scalar(crate::types::ScalarType::String)),    // High cardinality
3063    ///     ("status", FieldType::Scalar(crate::types::ScalarType::String)),      // Low cardinality (pending, shipped, delivered)
3064    ///     ("created_at", FieldType::Scalar(crate::types::ScalarType::String)),
3065    ///     ("total", FieldType::Float),
3066    /// ])?;
3067    ///
3068    /// // Index user_id - queries like "show me my orders" are common
3069    /// db.create_index("orders", "user_id").await?;  // Good choice
3070    ///
3071    /// // Query speedup: 2.5s → 0.001s
3072    /// let my_orders = db.query("orders")
3073    ///     .filter(|f| f.eq("user_id", user_id))
3074    ///     .collect()
3075    ///     .await?;
3076    ///
3077    /// // DON'T index 'status' - only 3 possible values
3078    /// // Scanning 1M docs takes ~100ms, indexing won't help much
3079    ///
3080    /// // Index created_at if you frequently query recent orders
3081    /// db.create_index("orders", "created_at").await?;  // Good for time-based queries
3082    /// ```
3083    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
3084        let collection_def = self.get_collection_definition(collection)?;
3085
3086        if let Some(field_def) = collection_def.fields.get(field) {
3087            if field_def.field_type == FieldType::Any {
3088                return Err(AqlError::new(
3089                    ErrorCode::InvalidDefinition,
3090                    "Cannot create an index on a field of type 'Any'.".to_string(),
3091                ));
3092            }
3093        } else {
3094            return Err(AqlError::new(
3095                ErrorCode::InvalidDefinition,
3096                format!(
3097                    "Field '{}' not found in collection '{}'.",
3098                    field, collection
3099                ),
3100            ));
3101        }
3102
3103        // Generate a default index name
3104        let index_name = format!("idx_{}_{}", collection, field);
3105
3106        // Create index definition
3107        let definition = IndexDefinition {
3108            name: index_name.clone(),
3109            collection: collection.to_string(),
3110            fields: vec![field.to_string()],
3111            index_type: IndexType::BTree,
3112            unique: false,
3113        };
3114
3115        // Create the index
3116        let index = Index::new(definition.clone());
3117
3118        // Index all existing documents in the collection
3119        let prefix = format!("{}:", collection);
3120        for result in self.cold.scan_prefix(&prefix) {
3121            if let Ok((_, data)) = result
3122                && let Ok(doc) = serde_json::from_slice::<Document>(&data)
3123            {
3124                let _ = index.insert(&doc);
3125            }
3126        }
3127
3128        // Store the index
3129        self.indices.insert(index_name, index);
3130
3131        // Store the index definition for persistence
3132        let index_key = format!("_index:{}:{}", collection, field);
3133        self.put(index_key, serde_json::to_vec(&definition)?, None)
3134            .await?;
3135
3136        Ok(())
3137    }
3138
3139    /// Query documents in a collection with filtering, sorting, and pagination
3140    ///
3141    /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
3142    /// Queries use early termination for LIMIT clauses, making them extremely fast
3143    /// even on large collections (6,800x faster than naive implementations).
3144    ///
3145    /// # Performance
3146    /// - With LIMIT: O(k) where k = limit + offset (early termination!)
3147    /// - Without LIMIT: O(n) where n = matching documents
3148    /// - Uses secondary indices when available for equality filters
3149    /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
3150    ///
3151    /// # Examples
3152    ///
3153    /// ```ignore
3154    /// use aurora_db::{Aurora, types::Value};
3155    ///
3156    /// let db = Aurora::open("mydb.db")?;
3157    ///
3158    /// // Simple equality query
3159    /// let active_users = db.query("users")
3160    ///     .filter(|f| f.eq("active", Value::Bool(true)))
3161    ///     .collect()
3162    ///     .await?;
3163    ///
3164    /// // Range query with pagination (FAST - uses early termination!)
3165    /// let top_scorers = db.query("users")
3166    ///     .filter(|f| f.gt("score", Value::Int(1000)))
3167    ///     .order_by("score", false)  // descending
3168    ///     .limit(10)
3169    ///     .offset(20)
3170    ///     .collect()
3171    ///     .await?;
3172    ///
3173    /// // Multiple filters
3174    /// let premium_active = db.query("users")
3175    ///     .filter(|f| f.eq("tier", Value::String("premium".into())))
3176    ///     .filter(|f| f.eq("active", Value::Bool(true)))
3177    ///     .limit(100)  // Only scans ~200 docs, not all million!
3178    ///     .collect()
3179    ///     .await?;
3180    ///
3181    /// // Text search in a field
3182    /// Starts a fluent query builder for the specified collection.
3183    ///
3184    /// # Example
3185    /// ```rust
3186    /// # use aurora_db::{Aurora, Value};
3187    /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
3188    /// let users = db.query("users")
3189    ///     .filter(|f| f.eq("active", true))
3190    ///     .collect()
3191    ///     .await?;
3192    /// # Ok(())
3193    /// # }
3194    /// ```
3195    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3196        QueryBuilder::new(self, collection)
3197    }
3198
3199    /// Starts a full-text search builder for the specified collection.
3200    ///
3201    /// # Example
3202    /// ```rust
3203    /// # use aurora_db::{Aurora, Value};
3204    /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
3205    /// let results = db.search("products")
3206    ///     .query("wireless headphones")
3207    ///     .fuzzy(1)
3208    ///     .collect()
3209    ///     .await?;
3210    /// # Ok(())
3211    /// # }
3212    /// ```
3213    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3214        SearchBuilder::new(self, collection)
3215    }
3216
3217    /// Retrieve a document by ID
3218    ///
3219    /// Fast direct lookup when you know the document ID. Significantly faster
3220    /// than querying with filters when ID is known.
3221    ///
3222    /// # Performance
3223    /// - Hot cache: ~1,000,000 reads/sec (instant)
3224    /// - Cold storage: ~500,000 reads/sec (disk I/O)
3225    /// - Complexity: O(1) - constant time lookup
3226    /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
3227    ///
3228    /// # Arguments
3229    /// * `collection` - Name of the collection to query
3230    /// * `id` - ID of the document to retrieve
3231    ///
3232    /// # Returns
3233    /// The document if found, None if not found, or an error
3234    ///
3235    /// # Examples
3236    ///
3237
3238    /// use aurora_db::{Aurora, types::Value};
3239    ///
3240    /// let db = Aurora::open("mydb.db")?;
3241    ///
3242    /// // Basic retrieval
3243    /// if let Some(user) = db.get_document("users", &user_id)? {
3244    ///     println!("Found user: {}", user._sid);
3245    ///
3246    ///     // Access fields safely
3247    ///     if let Some(Value::String(name)) = user.data.get("name") {
3248    ///         println!("Name: {}", name);
3249    ///     }
3250    ///
3251    ///     if let Some(Value::Int(age)) = user.data.get("age") {
3252    ///         println!("Age: {}", age);
3253    ///     }
3254    /// } else {
3255    ///     println!("User not found");
3256    /// }
3257    ///
3258    /// // Idiomatic error handling
3259    /// let user = db.get_document("users", &user_id)?
3260    ///     .ok_or_else(|| AqlError::new(ErrorCode::NotFound,"User not found".into()))?;
3261    ///
3262    /// // Checking existence before operations
3263    /// if db.get_document("users", &user_id)?.is_some() {
3264    ///     db.update_document("users", &user_id, vec![
3265    ///         ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
3266    ///     ]).await?;
3267    /// }
3268    ///
3269    /// // Batch retrieval (fetch multiple by ID)
3270    /// let user_ids = vec!["user-1", "user-2", "user-3"];
3271    /// let users: Vec<Document> = user_ids.iter()
3272    ///     .filter_map(|id| db.get_document("users", id).ok().flatten())
3273    ///     .collect();
3274    ///
3275    /// println!("Found {} out of {} users", users.len(), user_ids.len());
3276    /// ```ignore
3277    ///
3278    /// # When to Use
3279    /// - You know the document ID (from insert, previous query, or URL param)
3280    /// - Need fastest possible lookup (1M reads/sec)
3281    /// - Fetching a single document
3282    ///
3283    /// Retrieves a single document by its ID.
3284    ///
3285    /// # Arguments
3286    /// * `collection` - The collection name.
3287    /// * `sid` - The document system ID.
3288    ///
3289    /// # Returns
3290    /// `Ok(Some(Document))` if found, `Ok(None)` if not found, or an error.
3291    ///
3292    /// # Example
3293    /// ```rust
3294    /// # use aurora_db::{Aurora, Value};
3295    /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
3296    /// if let Some(user) = db.get_document("users", "u123")? {
3297    ///     println!("Found user: {:?}", user);
3298    /// }
3299    /// # Ok(())
3300    /// # }
3301    /// ```
3302    pub fn get_document(&self, collection: &str, sid: &str) -> Result<Option<Document>> {
3303        let key = format!("{}:{}", collection, sid);
3304        if let Some(data) = self.get(&key)? {
3305            Ok(Some(serde_json::from_slice(&data)?))
3306        } else {
3307            Ok(None)
3308        }
3309    }
3310
3311    /// Deletes a document by its full internal key (format: "collection:id").
3312    ///
3313    /// This is a low-level deletion method. It removes the document from storage,
3314    /// clears it from the hot cache, and updates all associated indices.
3315    ///
3316    /// # Arguments
3317    /// * `key` - The full key of the document (e.g., `"users:u123"`).
3318    pub async fn delete(&self, key: &str) -> Result<()> {
3319        // Extract collection and id from key (format: "collection:id")
3320        let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
3321            (coll, doc_id)
3322        } else {
3323            return Err(AqlError::invalid_operation(
3324                "Invalid key format, expected 'collection:id'".to_string(),
3325            ));
3326        };
3327
3328        // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
3329        let document = self.get_document(collection, id)?;
3330
3331        // Delete from hot cache
3332        if self.hot.get(key).is_some() {
3333            self.hot.delete(key);
3334        }
3335
3336        // Delete from cold storage
3337        self.cold.delete(key)?;
3338
3339        // CRITICAL FIX: Clean up ALL indices (primary + secondary)
3340        if let Some(doc) = document {
3341            self.remove_from_indices(collection, &doc)?;
3342        } else {
3343            // Fallback: at least remove from primary index using the full sled key.
3344            if let Some(index) = self.primary_indices.get_mut(collection) {
3345                index.remove(key);
3346            }
3347        }
3348
3349        // Publish delete event
3350        let event = crate::pubsub::ChangeEvent::delete(collection, id);
3351        let _ = self.pubsub.publish(event);
3352
3353        Ok(())
3354    }
3355
3356    /// Deletes an entire collection and all documents within it.
3357    ///
3358    /// This operation permanently removes all documents in the collection and
3359    /// cleans up all associated primary and secondary indices.
3360    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
3361        let prefix = format!("{}:", collection);
3362
3363        // Get all keys in collection
3364        let keys: Vec<String> = self
3365            .cold
3366            .scan()
3367            .filter_map(|r| r.ok())
3368            .filter(|(k, _)| k.starts_with(&prefix))
3369            .map(|(k, _)| k)
3370            .collect();
3371
3372        // Delete each key
3373        for key in keys {
3374            self.delete(&key).await?;
3375        }
3376
3377        // Remove collection indices
3378        self.primary_indices.remove(collection);
3379        self.secondary_indices
3380            .retain(|k, _| !k.starts_with(&prefix));
3381
3382        // Invalidate schema cache
3383        self.schema_cache.remove(collection);
3384
3385        Ok(())
3386    }
3387
3388    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
3389        // Remove from primary index — key format is "collection:sid" (the full sled key).
3390        if let Some(index) = self.primary_indices.get(collection) {
3391            index.remove(&format!("{}:{}", collection, doc._sid));
3392        }
3393
3394        let u = self.parse_external_id(&doc._sid);
3395        let internal_id = self._sid_dictionary.get(&u).map(|e| *e.value());
3396
3397        // Remove from secondary indices
3398        for (field, value) in &doc.data {
3399            let index_key = format!("{}:{}", collection, field);
3400            if let Some(index_map) = self.secondary_indices.get(&index_key) {
3401                let val_str = match value {
3402                    Value::String(s) => s.clone(),
3403                    _ => value.to_string(),
3404                };
3405                if let Some(storage_arc) = index_map.get(&val_str) {
3406                    if let Ok(mut storage) = storage_arc.value().write() {
3407                        if let Some(id) = internal_id {
3408                            storage.remove(id);
3409                        }
3410                    }
3411                }
3412            }
3413        }
3414
3415        // Return the internal ID to the recycling pool so it can be reused
3416        if let Some(id) = internal_id {
3417            // Remove mapping from dictionaries and Sled
3418            self._sid_dictionary.remove(&u);
3419            if let Ok(mut reverse) = self.reverse_sid_dictionary.write() {
3420                if let Some(r) = reverse.get_mut(id as usize) {
3421                    *r = 0; // Clear the reverse mapping
3422                }
3423            }
3424            let _ = self.sys_id_mapping.remove(u.to_be_bytes());
3425
3426            if let Ok(mut deleted) = self.deleted_ids.write() {
3427                deleted.insert(id);
3428            }
3429        }
3430
3431        Ok(())
3432    }
3433
3434    pub async fn search_text(
3435        &self,
3436        collection: &str,
3437        field: &str,
3438        query: &str,
3439    ) -> Result<Vec<Document>> {
3440        let mut results = Vec::new();
3441        let docs = self.get_all_collection(collection).await?;
3442
3443        for doc in docs {
3444            if let Some(Value::String(text)) = doc.data.get(field)
3445                && text.to_lowercase().contains(&query.to_lowercase())
3446            {
3447                results.push(doc);
3448            }
3449        }
3450
3451        Ok(results)
3452    }
3453
3454    /// Export a collection to a JSON file
3455    ///
3456    /// Creates a JSON file containing all documents in the collection.
3457    /// Useful for backups, data migration, or sharing datasets.
3458    /// Automatically appends `.json` extension if not present.
3459    ///
3460    /// # Performance
3461    /// - Export speed: ~10,000 docs/sec
3462    /// - Scans entire collection from cold storage
3463    /// - Memory efficient: streams documents to file
3464    ///
3465    /// # Arguments
3466    /// * `collection` - Name of the collection to export
3467    /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
3468    ///
3469    /// # Returns
3470    /// Success or an error
3471    ///
3472    /// # Examples
3473    ///
3474    /// ```
3475    /// use aurora_db::Aurora;
3476    ///
3477    /// let db = Aurora::open("mydb.db")?;
3478    ///
3479    /// // Basic export
3480    /// db.export_as_json("users", "./backups/users_2024-01-15")?;
3481    /// // Creates: ./backups/users_2024-01-15.json
3482    ///
3483    /// // Timestamped backup
3484    /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
3485    /// let backup_path = format!("./backups/users_{}", timestamp);
3486    /// db.export_as_json("users", &backup_path)?;
3487    ///
3488    /// // Export multiple collections
3489    /// for collection in &["users", "orders", "products"] {
3490    ///     db.export_as_json(collection, &format!("./export/{}", collection))?;
3491    /// }
3492    /// ```ignore
3493    ///
3494    /// # Output Format
3495    ///
3496    /// The exported JSON has this structure:
3497    /// ```json
3498    /// {
3499    ///   "users": [
3500    ///     { "id": "123", "name": "Alice", "email": "alice@example.com" },
3501    ///     { "id": "456", "name": "Bob", "email": "bob@example.com" }
3502    ///   ]
3503    /// }
3504    /// ```
3505    ///
3506    /// # See Also
3507    /// - `export_as_csv()` for CSV format export
3508    /// - `import_from_json()` to restore exported data
3509    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
3510        use std::io::{BufWriter, Write};
3511
3512        let output_path = if !output_path.ends_with(".json") {
3513            format!("{}.json", output_path)
3514        } else {
3515            output_path.to_string()
3516        };
3517
3518        // Flush write buffer so in-flight docs reach cold storage before we scan.
3519        self.flush()?;
3520
3521        let file = StdFile::create(&output_path)?;
3522        let mut writer = BufWriter::new(file);
3523
3524        // Stream as a JSON object: { "<collection>": [ ... ] }
3525        // Each document is serialised individually — O(1) memory regardless of
3526        // dataset size.
3527        write!(writer, "{{\"{}\": [\n", collection)?;
3528
3529        let prefix = format!("{}:", collection);
3530        let mut first = true;
3531
3532        for result in self.cold.scan_prefix(&prefix) {
3533            let (key, value) = result?;
3534            if key.starts_with("_collection:") {
3535                continue;
3536            }
3537            let Ok(doc) = serde_json::from_slice::<Document>(&value) else {
3538                continue;
3539            };
3540
3541            // Recursively convert every Value to a JsonValue — no silent drops.
3542            let mut obj = serde_json::Map::new();
3543            // Always include the document id field
3544            obj.insert("_id".to_string(), JsonValue::String(doc._sid));
3545            for (k, v) in doc.data {
3546                obj.insert(k, Self::value_to_json(v));
3547            }
3548
3549            if !first {
3550                write!(writer, ",\n")?;
3551            }
3552            serde_json::to_writer(&mut writer, &JsonValue::Object(obj))?;
3553            first = false;
3554        }
3555
3556        write!(writer, "\n]}}")?;
3557        writer.flush()?;
3558        println!("Exported collection '{}' to {}", collection, &output_path);
3559        Ok(())
3560    }
3561
3562    /// Recursively convert an Aurora `Value` to a `serde_json::Value` with no
3563    /// data loss — nested objects and arrays are handled at every depth.
3564    fn value_to_json(v: Value) -> JsonValue {
3565        match v {
3566            Value::String(s) => JsonValue::String(s),
3567            Value::Int(i) => JsonValue::Number(i.into()),
3568            Value::Float(f) => serde_json::Number::from_f64(f)
3569                .map(JsonValue::Number)
3570                .unwrap_or(JsonValue::Null),
3571            Value::Bool(b) => JsonValue::Bool(b),
3572            Value::Null => JsonValue::Null,
3573            Value::Uuid(u) => JsonValue::String(u.to_string()),
3574            Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
3575            Value::Array(arr) => {
3576                JsonValue::Array(arr.into_iter().map(Self::value_to_json).collect())
3577            }
3578            Value::Object(map) => {
3579                let mut obj = serde_json::Map::new();
3580                for (k, v) in map {
3581                    obj.insert(k, Self::value_to_json(v));
3582                }
3583                JsonValue::Object(obj)
3584            }
3585        }
3586    }
3587
3588    /// Export a collection to a CSV file
3589    ///
3590    /// Creates a CSV file with headers from the first document and rows for each document.
3591    /// Useful for spreadsheet analysis, data science workflows, or reporting.
3592    /// Automatically appends `.csv` extension if not present.
3593    ///
3594    /// # Performance
3595    /// - Export speed: ~8,000 docs/sec
3596    /// - Memory efficient: streams rows to file
3597    /// - Headers determined from first document
3598    ///
3599    /// # Arguments
3600    /// * `collection` - Name of the collection to export
3601    /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
3602    ///
3603    /// # Returns
3604    /// Success or an error
3605    ///
3606    /// # Examples
3607    ///
3608    /// ```ignore
3609    /// use aurora_db::Aurora;
3610    ///
3611    /// let db = Aurora::open("mydb.db")?;
3612    ///
3613    /// // Basic CSV export
3614    /// db.export_as_csv("users", "./reports/users")?;
3615    /// // Creates: ./reports/users.csv
3616    ///
3617    /// // Export for analysis in Excel/Google Sheets
3618    /// db.export_as_csv("orders", "./analytics/sales_data")?;
3619    ///
3620    /// // Monthly report generation
3621    /// let month = chrono::Utc::now().format("%Y-%m");
3622    /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
3623    /// ```
3624    ///
3625    /// # Output Format
3626    ///
3627    /// ```csv
3628    /// id,name,email,age
3629    /// 123,Alice,alice@example.com,28
3630    /// 456,Bob,bob@example.com,32
3631    /// ```ignore
3632    ///
3633    /// # Important Notes
3634    /// - Headers are taken from the first document's fields
3635    /// - Documents with different fields will have empty values for missing fields
3636    /// - Nested objects/arrays are converted to strings
3637    /// - Best for flat document structures
3638    ///
3639    /// # See Also
3640    /// - `export_as_json()` for JSON format (better for nested data)
3641    /// - For complex nested structures, use JSON export instead
3642    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
3643        let output_path = if !filename.ends_with(".csv") {
3644            format!("{}.csv", filename)
3645        } else {
3646            filename.to_string()
3647        };
3648
3649        // Flush write buffer so in-flight docs are visible in cold storage.
3650        self.flush()?;
3651
3652        // Derive the canonical column order from the schema definition so that
3653        // every row has the same columns even when some docs are missing optional
3654        // fields.  Fall back to an empty list; first-doc heuristic fills it in
3655        // that case (schema-less collections).
3656        let mut headers: Vec<String> = match self.get_collection_definition(collection) {
3657            Ok(coll) => {
3658                // _id first, then all schema fields in definition order
3659                let mut cols = vec!["_id".to_string()];
3660                cols.extend(coll.fields.into_keys());
3661                cols
3662            }
3663            Err(_) => vec!["_id".to_string()],
3664        };
3665
3666        let mut writer = csv::Writer::from_path(&output_path)?;
3667        let mut headers_written = false;
3668
3669        let prefix = format!("{}:", collection);
3670
3671        for result in self.cold.scan_prefix(&prefix) {
3672            let (key, value) = result?;
3673            if key.starts_with("_collection:") {
3674                continue;
3675            }
3676            let Ok(doc) = serde_json::from_slice::<Document>(&value) else {
3677                continue;
3678            };
3679
3680            // If we only have the _id column (schema-less path), discover
3681            // headers from the first document we encounter.
3682            if !headers_written && headers.len() == 1 {
3683                for k in doc.data.keys() {
3684                    if !headers.contains(k) {
3685                        headers.push(k.clone());
3686                    }
3687                }
3688            }
3689
3690            if !headers_written {
3691                writer.write_record(&headers)?;
3692                headers_written = true;
3693            }
3694
3695            let row: Vec<String> = headers
3696                .iter()
3697                .map(|col| {
3698                    if col == "_id" {
3699                        return doc._sid.clone();
3700                    }
3701                    match doc.data.get(col) {
3702                        None => String::new(),
3703                        Some(Value::String(s)) => s.clone(),
3704                        Some(Value::Int(i)) => i.to_string(),
3705                        Some(Value::Float(f)) => f.to_string(),
3706                        Some(Value::Bool(b)) => b.to_string(),
3707                        Some(Value::Null) => String::new(),
3708                        Some(Value::Uuid(u)) => u.to_string(),
3709                        Some(Value::DateTime(dt)) => dt.to_rfc3339(),
3710                        // Nested types: JSON-encode for lossless round-trip.
3711                        // Consumers (pandas, DuckDB, etc.) can parse these back.
3712                        Some(v) => serde_json::to_string(&Self::value_to_json(v.clone()))
3713                            .unwrap_or_default(),
3714                    }
3715                })
3716                .collect();
3717
3718            writer.write_record(&row)?;
3719        }
3720
3721        writer.flush()?;
3722        println!("Exported collection '{}' to {}", collection, &output_path);
3723        Ok(())
3724    }
3725
3726    /// Helper method to create filter-based queries. Alias for `query()`.
3727    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3728        self.query(collection)
3729    }
3730
3731    // Convenience methods that build on top of the FilterBuilder
3732
3733    /// Finds a single document by its ID.
3734    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
3735        self.query(collection)
3736            .filter(|f| f.eq("id", id))
3737            .first_one()
3738            .await
3739    }
3740
3741    /// Finds a single document matching the given filter.
3742    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
3743    where
3744        F: FnOnce(&FilterBuilder) -> Filter + Send + Sync + 'static,
3745    {
3746        self.query(collection).filter(filter_fn).first_one().await
3747    }
3748
3749    /// Finds all documents where a specific field matches a value.
3750    pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
3751        &self,
3752        collection: &str,
3753        field: &'static str,
3754        value: T,
3755    ) -> Result<Vec<Document>> {
3756        let value_clone = value.clone();
3757        self.query(collection)
3758            .filter(move |f: &FilterBuilder| f.eq(field, value_clone.clone()))
3759            .collect()
3760            .await
3761    }
3762
3763    /// Finds documents matching multiple field-value pairs (equality).
3764    pub async fn find_by_fields(
3765        &self,
3766        collection: &str,
3767        fields: Vec<(&str, Value)>,
3768    ) -> Result<Vec<Document>> {
3769        let mut query = self.query(collection);
3770
3771        for (field, value) in fields {
3772            let field_owned = field.to_owned();
3773            let value_owned = value.clone();
3774            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
3775        }
3776
3777        query.collect().await
3778    }
3779
3780    /// Finds documents where a field value is within a specified range (inclusive).
3781    pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
3782        &self,
3783        collection: &str,
3784        field: &'static str,
3785        min: T,
3786        max: T,
3787    ) -> Result<Vec<Document>> {
3788        self.query(collection)
3789            .filter(move |f| f.between(field, min.clone(), max.clone()))
3790            .collect()
3791            .await
3792    }
3793
3794    /// Complex query example: build with multiple combined filters.
3795    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3796        self.query(collection)
3797    }
3798
3799    /// Create a full-text search query with added filter options.
3800    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3801        self.search(collection)
3802    }
3803
3804    // Utility methods for common operations
3805    pub async fn upsert(
3806        &self,
3807        collection: &str,
3808        id: &str,
3809        data: Vec<(&str, Value)>,
3810    ) -> Result<String> {
3811        // Convert Vec<(&str, Value)> to HashMap<String, Value>
3812        let data_map: HashMap<String, Value> =
3813            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
3814
3815        // Check if document exists
3816        if let Some(mut doc) = self.get_document(collection, id)? {
3817            // Clone for notification
3818            let old_doc = doc.clone();
3819
3820            // Update existing document - merge new data
3821            for (key, value) in data_map {
3822                doc.data.insert(key, value);
3823            }
3824
3825            // Validate unique constraints for the updated document
3826            // We need to exclude the current document from the uniqueness check
3827            self.validate_unique_constraints_excluding(collection, &doc.data, id)
3828                .await?;
3829
3830            self.put(
3831                format!("{}:{}", collection, id),
3832                serde_json::to_vec(&doc)?,
3833                None,
3834            )
3835            .await?;
3836
3837            // Publish update event
3838            let event = crate::pubsub::ChangeEvent::update(collection, id, old_doc, doc);
3839            let _ = self.pubsub.publish(event);
3840
3841            Ok(id.to_string())
3842        } else {
3843            // Insert new document with specified ID - validate unique constraints
3844            self.validate_unique_constraints(collection, &data_map)
3845                .await?;
3846
3847            let document = Document {
3848                _sid: id.to_string(),
3849                data: data_map,
3850            };
3851
3852            self.put(
3853                format!("{}:{}", collection, id),
3854                serde_json::to_vec(&document)?,
3855                None,
3856            )
3857            .await?;
3858
3859            // Publish insert event
3860            let event = crate::pubsub::ChangeEvent::insert(collection, id, document);
3861            let _ = self.pubsub.publish(event);
3862
3863            Ok(id.to_string())
3864        }
3865    }
3866
3867    // Atomic increment/decrement
3868    pub async fn increment(
3869        &self,
3870        collection: &str,
3871        id: &str,
3872        field: &str,
3873        amount: i64,
3874    ) -> Result<i64> {
3875        if let Some(mut doc) = self.get_document(collection, id)? {
3876            // Get current value
3877            let current = match doc.data.get(field) {
3878                Some(Value::Int(i)) => *i,
3879                _ => 0,
3880            };
3881
3882            // Increment
3883            let new_value = current + amount;
3884            doc.data.insert(field.to_string(), Value::Int(new_value));
3885
3886            // Save changes
3887            self.put(
3888                format!("{}:{}", collection, id),
3889                serde_json::to_vec(&doc)?,
3890                None,
3891            )
3892            .await?;
3893
3894            Ok(new_value)
3895        } else {
3896            Err(AqlError::new(
3897                ErrorCode::NotFound,
3898                format!("Document {}:{} not found", collection, id),
3899            ))
3900        }
3901    }
3902
3903    // Delete documents by query
3904    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
3905    where
3906        F: FnOnce(&FilterBuilder) -> Filter + Send + Sync + 'static,
3907    {
3908        let docs = self.query(collection).filter(filter_fn).collect().await?;
3909        let mut deleted_count = 0;
3910
3911        for doc in docs {
3912            let key = format!("{}:{}", collection, doc._sid);
3913            self.delete(&key).await?;
3914            deleted_count += 1;
3915        }
3916
3917        Ok(deleted_count)
3918    }
3919
3920    /// Import documents from a JSON file into a collection
3921    ///
3922    /// Validates each document against the collection schema, skips duplicates (by ID),
3923    /// and provides detailed statistics about the import operation. Useful for restoring
3924    /// backups, migrating data, or seeding development databases.
3925    ///
3926    /// # Performance
3927    /// - Import speed: ~5,000 docs/sec (with validation)
3928    /// - Memory efficient: processes documents one at a time
3929    /// - Validates schema and unique constraints
3930    ///
3931    /// # Arguments
3932    /// * `collection` - Name of the collection to import into
3933    /// * `filename` - Path to the JSON file containing documents (array format)
3934    ///
3935    /// # Returns
3936    /// `ImportStats` containing counts of imported, skipped, and failed documents
3937    ///
3938    /// # Examples
3939    ///
3940    /// ```
3941    /// use aurora_db::Aurora;
3942    ///
3943    /// let db = Aurora::open("mydb.db")?;
3944    ///
3945    /// // Basic import
3946    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
3947    /// println!("Imported: {}, Skipped: {}, Failed: {}",
3948    ///     stats.imported, stats.skipped, stats.failed);
3949    ///
3950    /// // Restore from backup
3951    /// let backup_file = "./backups/users_2024-01-15.json";
3952    /// let stats = db.import_from_json("users", backup_file).await?;
3953    ///
3954    /// if stats.failed > 0 {
3955    ///     eprintln!("Warning: {} documents failed validation", stats.failed);
3956    /// }
3957    ///
3958    /// // Idempotent import - duplicates are skipped
3959    /// let stats = db.import_from_json("users", "./data/users.json").await?;
3960    /// // Running again will skip all existing documents
3961    /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
3962    /// assert_eq!(stats2.skipped, stats.imported);
3963    ///
3964    /// // Migration from another system
3965    /// db.new_collection("products", vec![
3966    ///     ("sku", FieldType::Scalar(crate::types::ScalarType::String)),
3967    ///     ("name", FieldType::Scalar(crate::types::ScalarType::String)),
3968    ///     ("price", FieldType::Float),
3969    /// ])?;
3970    ///
3971    /// let stats = db.import_from_json("products", "./migration/products.json").await?;
3972    /// println!("Migration complete: {} products imported", stats.imported);
3973    /// ```ignore
3974    ///
3975    /// # Expected JSON Format
3976    ///
3977    /// The JSON file should contain an array of document objects:
3978    /// ```json
3979    /// [
3980    ///   { "id": "123", "name": "Alice", "email": "alice@example.com" },
3981    ///   { "id": "456", "name": "Bob", "email": "bob@example.com" },
3982    ///   { "name": "Carol", "email": "carol@example.com" }
3983    /// ]
3984    /// ```
3985    ///
3986    /// # Behavior
3987    /// - Documents with existing IDs are skipped (duplicate detection)
3988    /// - Documents without IDs get auto-generated UUIDs
3989    /// - Schema validation is performed on all fields
3990    /// - Failed documents are counted but don't stop the import
3991    /// - Unique constraints are checked
3992    ///
3993    /// # See Also
3994    /// - `export_as_json()` to create compatible backup files
3995    /// - `batch_insert()` for programmatic bulk inserts
3996    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
3997        // Validate that the collection exists
3998        let collection_def = self.get_collection_definition(collection)?;
3999
4000        // Load JSON file
4001        let json_string = read_to_string(filename).await.map_err(|e| {
4002            AqlError::new(
4003                ErrorCode::IoError,
4004                format!("Failed to read import file: {}", e),
4005            )
4006        })?;
4007
4008        // Parse JSON
4009        let documents: Vec<JsonValue> = from_str(&json_string).map_err(|e| {
4010            AqlError::new(
4011                ErrorCode::SerializationError,
4012                format!("Failed to parse JSON: {}", e),
4013            )
4014        })?;
4015
4016        let mut stats = ImportStats::default();
4017
4018        // Process each document
4019        for doc_json in documents {
4020            match self
4021                .import_document(collection, &collection_def, doc_json)
4022                .await
4023            {
4024                Ok(ImportResult::Imported) => stats.imported += 1,
4025                Ok(ImportResult::Skipped) => stats.skipped += 1,
4026                Err(_) => stats.failed += 1,
4027            }
4028        }
4029
4030        Ok(stats)
4031    }
4032
4033    /// Import a single document, performing schema validation and duplicate checking
4034    async fn import_document(
4035        &self,
4036        collection: &str,
4037        collection_def: &Collection,
4038        doc_json: JsonValue,
4039    ) -> Result<ImportResult> {
4040        if !doc_json.is_object() {
4041            return Err(AqlError::invalid_operation(
4042                "Expected JSON object".to_string(),
4043            ));
4044        }
4045
4046        // Extract document ID if present
4047        let doc_id = doc_json
4048            .get("id")
4049            .and_then(|id| id.as_str())
4050            .map(|s| s.to_string())
4051            .unwrap_or_else(|| Uuid::now_v7().to_string());
4052
4053        // Check if document with this ID already exists
4054        if self.get_document(collection, &doc_id)?.is_some() {
4055            return Ok(ImportResult::Skipped);
4056        }
4057
4058        // Convert JSON to our document format and validate against schema
4059        let mut data_map = HashMap::new();
4060
4061        if let Some(obj) = doc_json.as_object() {
4062            for (field_name, field_def) in &collection_def.fields {
4063                if let Some(json_value) = obj.get(field_name) {
4064                    // Validate value against field type
4065                    if !self.validate_field_value(json_value, &field_def.field_type) {
4066                        return Err(AqlError::invalid_operation(format!(
4067                            "Field '{}' has invalid type",
4068                            field_name
4069                        )));
4070                    }
4071
4072                    // Convert JSON value to our Value type
4073                    let value = self.json_to_value(json_value)?;
4074                    data_map.insert(field_name.clone(), value);
4075                } else if field_def.unique {
4076                    // Missing required unique field
4077                    return Err(AqlError::invalid_operation(format!(
4078                        "Missing required unique field '{}'",
4079                        field_name
4080                    )));
4081                }
4082            }
4083        }
4084
4085        // Check for duplicates by unique fields
4086        let unique_fields = self.get_unique_fields(collection_def);
4087        for unique_field in &unique_fields {
4088            if let Some(value) = data_map.get(unique_field) {
4089                // Query for existing documents with this unique value
4090                let query_results = self
4091                    .query(collection)
4092                    .filter(move |f| f.eq(unique_field, value.clone()))
4093                    .limit(1)
4094                    .collect()
4095                    .await?;
4096
4097                if !query_results.is_empty() {
4098                    // Found duplicate by unique field
4099                    return Ok(ImportResult::Skipped);
4100                }
4101            }
4102        }
4103
4104        // Create and insert document
4105        let document = Document {
4106            _sid: doc_id,
4107            data: data_map,
4108        };
4109
4110        self.put(
4111            format!("{}:{}", collection, document._sid),
4112            serde_json::to_vec(&document)?,
4113            None,
4114        )
4115        .await?;
4116
4117        Ok(ImportResult::Imported)
4118    }
4119
4120    /// Validate that a JSON value matches the expected field type
4121    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
4122        use crate::types::ScalarType;
4123        match field_type {
4124            FieldType::Scalar(ScalarType::String) => value.is_string(),
4125            FieldType::Scalar(ScalarType::Int) => value.is_i64() || value.is_u64(),
4126            FieldType::Scalar(ScalarType::Float) => value.is_number(),
4127            FieldType::Scalar(ScalarType::Bool) => value.is_boolean(),
4128            FieldType::Scalar(ScalarType::Uuid) => {
4129                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
4130            }
4131            FieldType::Scalar(ScalarType::Any) => true,
4132
4133            FieldType::Array(_) => value.is_array(),
4134            FieldType::Object | FieldType::Nested(_) => value.is_object(),
4135            // Legacy/Duplicate catches just in case
4136            _ => true,
4137        }
4138    }
4139
4140    /// Convert a JSON value to our internal Value type
4141    #[allow(clippy::only_used_in_recursion)]
4142    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
4143        match json_value {
4144            JsonValue::Null => Ok(Value::Null),
4145            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
4146            JsonValue::Number(n) => {
4147                if let Some(i) = n.as_i64() {
4148                    Ok(Value::Int(i))
4149                } else if let Some(f) = n.as_f64() {
4150                    Ok(Value::Float(f))
4151                } else {
4152                    Err(AqlError::invalid_operation(
4153                        "Invalid number value".to_string(),
4154                    ))
4155                }
4156            }
4157            JsonValue::String(s) => {
4158                // Try parsing as UUID first
4159                if let Ok(uuid) = Uuid::parse_str(s) {
4160                    Ok(Value::Uuid(uuid))
4161                } else {
4162                    Ok(Value::String(s.clone()))
4163                }
4164            }
4165            JsonValue::Array(arr) => {
4166                let mut values = Vec::new();
4167                for item in arr {
4168                    values.push(self.json_to_value(item)?);
4169                }
4170                Ok(Value::Array(values))
4171            }
4172            JsonValue::Object(obj) => {
4173                let mut map = HashMap::new();
4174                for (k, v) in obj {
4175                    map.insert(k.clone(), self.json_to_value(v)?);
4176                }
4177                Ok(Value::Object(map))
4178            }
4179        }
4180    }
4181
4182    /// Get collection definition
4183    pub fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
4184        println!("DB: Getting definition for: {}", collection);
4185        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
4186            let collection_def: Collection = serde_json::from_slice(&data)?;
4187            Ok(collection_def)
4188        } else {
4189            Err(AqlError::new(
4190                ErrorCode::CollectionNotFound,
4191                collection.to_string(),
4192            ))
4193        }
4194    }
4195
4196    /// Returns the names of all registered collections in the database.
4197    pub fn list_collection_names(&self) -> Vec<String> {
4198        self.schema_cache.iter().map(|r| r.key().clone()).collect()
4199    }
4200
4201    /// Retrieves detailed statistics and information about the database storage and performance.
4202    ///
4203    /// # Returns
4204    /// A `DatabaseStats` struct containing hot cache, cold storage, and per-collection metrics.
4205    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
4206        let hot_stats = self.hot.get_stats();
4207        let cold_stats = self.cold.get_stats()?;
4208
4209        Ok(DatabaseStats {
4210            hot_stats,
4211            cold_stats,
4212            estimated_size: self.cold.estimated_size(),
4213            collections: self.get_collection_stats()?,
4214        })
4215    }
4216
4217    /// Checks if a specific key is currently residing in the hot cache.
4218    ///
4219    /// # Arguments
4220    /// * `key` - The full key to check.
4221    pub fn is_in_hot_cache(&self, key: &str) -> bool {
4222        self.hot.is_hot(key)
4223    }
4224
4225    /// Clears all entries from the hot cache.
4226    ///
4227    /// This is useful for manual memory management or when testing performance
4228    /// from a cold start.
4229    pub fn clear_hot_cache(&self) {
4230        self.hot.clear();
4231        println!(
4232            "Hot cache cleared, current hit ratio: {:.2}%",
4233            self.hot.hit_ratio() * 100.0
4234        );
4235    }
4236
4237    /// Prewarm the cache by loading frequently accessed data from cold storage
4238    ///
4239    /// Loads documents from a collection into memory cache to eliminate cold-start
4240    /// latency. Dramatically improves initial query performance after database startup
4241    /// by preloading the most commonly accessed data.
4242    ///
4243    /// # Performance Impact
4244    /// - Prewarming speed: ~20,000 docs/sec
4245    /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
4246    /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
4247    /// - Memory cost: ~500 bytes per document average
4248    ///
4249    /// # Arguments
4250    /// * `collection` - The collection to prewarm
4251    /// * `limit` - Maximum number of documents to load (default: 1000, None = all)
4252    ///
4253    /// # Returns
4254    /// Number of documents loaded into cache
4255    ///
4256    /// # Examples
4257    ///
4258    /// ```ignore
4259    /// use aurora_db::Aurora;
4260    ///
4261    /// let db = Aurora::open("mydb.db")?;
4262    ///
4263    /// // Prewarm frequently accessed collection
4264    /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
4265    /// println!("Prewarmed {} user documents", loaded);
4266    ///
4267    /// // Now queries are fast from the start
4268    /// let stats_before = db.get_cache_stats();
4269    /// let users = db.query("users").collect().await?;
4270    /// let stats_after = db.get_cache_stats();
4271    ///
4272    /// // High hit rate thanks to prewarming
4273    /// assert!(stats_after.hit_rate > 0.95);
4274    ///
4275    /// // Startup optimization pattern
4276    /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
4277    ///     println!("Prewarming caches...");
4278    ///
4279    ///     // Prewarm most frequently accessed collections
4280    ///     db.prewarm_cache("users", Some(5000)).await?;
4281    ///     db.prewarm_cache("sessions", Some(1000)).await?;
4282    ///     db.prewarm_cache("products", Some(500)).await?;
4283    ///
4284    ///     let stats = db.get_cache_stats();
4285    ///     println!("Cache prewarmed: {} entries loaded", stats.size);
4286    ///
4287    ///     Ok(())
4288    /// }
4289    ///
4290    /// // Web server startup
4291    /// #[tokio::main]
4292    /// async fn main() {
4293    ///     let db = Aurora::open("app.db").unwrap();
4294    ///
4295    ///     // Prewarm before accepting requests
4296    ///     db.prewarm_cache("users", Some(10000)).await.unwrap();
4297    ///
4298    ///     // Server is now ready with hot cache
4299    ///     start_web_server(db).await;
4300    /// }
4301    ///
4302    /// // Prewarm all documents (for small collections)
4303    /// let all_loaded = db.prewarm_cache("config", None).await?;
4304    /// // All config documents now in memory
4305    ///
4306    /// // Selective prewarming based on access patterns
4307    /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
4308    ///     // Load recent users (they're accessed most)
4309    ///     db.prewarm_cache("users", Some(1000)).await?;
4310    ///
4311    ///     // Load active sessions only
4312    ///     let active_sessions = db.query("sessions")
4313    ///         .filter(|f| f.eq("active", Value::Bool(true)))
4314    ///         .limit(500)
4315    ///         .collect()
4316    ///         .await?;
4317    ///
4318    ///     // Manually populate cache with hot data
4319    ///     for session in active_sessions {
4320    ///         // Reading automatically caches
4321    ///         db.get_document("sessions", &session._sid)?;
4322    ///     }
4323    ///
4324    ///     Ok(())
4325    /// }
4326    /// ```
4327    ///
4328    /// # Typical Prewarming Scenarios
4329    ///
4330    /// **Web Application Startup:**
4331    /// ```ignore
4332    /// // Load user data, sessions, and active content
4333    /// db.prewarm_cache("users", Some(5000)).await?;
4334    /// db.prewarm_cache("sessions", Some(2000)).await?;
4335    /// db.prewarm_cache("posts", Some(1000)).await?;
4336    /// ```
4337    ///
4338    /// **E-commerce Site:**
4339    /// ```ignore
4340    /// // Load products, categories, and user carts
4341    /// db.prewarm_cache("products", Some(500)).await?;
4342    /// db.prewarm_cache("categories", None).await?;  // All categories
4343    /// db.prewarm_cache("active_carts", Some(1000)).await?;
4344    /// ```
4345    ///
4346    /// **API Server:**
4347    /// ```ignore
4348    /// // Load authentication data and rate limits
4349    /// db.prewarm_cache("api_keys", None).await?;
4350    /// db.prewarm_cache("rate_limits", Some(10000)).await?;
4351    /// ```
4352    ///
4353    /// # When to Use
4354    /// - At application startup to eliminate cold-start latency
4355    /// - After cache clear operations
4356    /// - Before high-traffic events (product launches, etc.)
4357    /// - When deploying new instances (load balancer warm-up)
4358    ///
4359    /// # Memory Considerations
4360    /// - 1,000 docs ≈ 500 KB memory
4361    /// - 10,000 docs ≈ 5 MB memory
4362    /// - 100,000 docs ≈ 50 MB memory
4363    /// - Stay within configured cache capacity
4364    ///
4365    /// # See Also
4366    /// - `get_cache_stats()` to monitor cache effectiveness
4367    /// - `prewarm_all_collections()` to prewarm all collections
4368    /// - `Aurora::with_config()` to adjust cache capacity
4369    pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
4370        let limit = limit.unwrap_or(1000);
4371        let prefix = format!("{}:", collection);
4372        let mut loaded = 0;
4373
4374        for entry in self.cold.scan_prefix(&prefix) {
4375            if loaded >= limit {
4376                break;
4377            }
4378
4379            if let Ok((key, value)) = entry {
4380                // Load into hot cache
4381                self.hot.set(Arc::new(key.clone()), Arc::new(value), None);
4382                loaded += 1;
4383            }
4384        }
4385
4386        println!("Prewarmed {} with {} documents", collection, loaded);
4387        Ok(loaded)
4388    }
4389
4390    /// Prewarm cache for all collections
4391    pub async fn prewarm_all_collections(
4392        &self,
4393        docs_per_collection: Option<usize>,
4394    ) -> Result<HashMap<String, usize>> {
4395        let mut stats = HashMap::new();
4396
4397        // Get all collections
4398        let collections: Vec<String> = self
4399            .cold
4400            .scan()
4401            .filter_map(|r| r.ok())
4402            .map(|(k, _)| k)
4403            .filter(|k| k.starts_with("_collection:"))
4404            .map(|k| k.trim_start_matches("_collection:").to_string())
4405            .collect();
4406
4407        for collection in collections {
4408            let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
4409            stats.insert(collection, loaded);
4410        }
4411
4412        Ok(stats)
4413    }
4414
4415    /// Store multiple key-value pairs efficiently in a single batch operation
4416    ///
4417    /// Low-level batch write operation that bypasses document validation and
4418    /// writes raw byte data directly to storage. Useful for advanced use cases,
4419    /// custom serialization, or maximum performance scenarios.
4420    ///
4421    /// # Performance
4422    /// - Write speed: ~100,000 writes/sec
4423    /// - Single disk fsync for entire batch
4424    /// - No validation or schema checking
4425    /// - Direct storage access
4426    ///
4427    /// # Arguments
4428    /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
4429    ///
4430    /// # Returns
4431    /// Success or an error
4432    ///
4433    /// # Examples
4434    ///
4435    /// ```ignore
4436    /// use aurora_db::Aurora;
4437    ///
4438    /// let db = Aurora::open("mydb.db")?;
4439    ///
4440    /// // Low-level batch write
4441    /// let pairs = vec![
4442    ///     ("users:123".to_string(), b"raw data 1".to_vec()),
4443    ///     ("users:456".to_string(), b"raw data 2".to_vec()),
4444    ///     ("cache:key1".to_string(), b"cached value".to_vec()),
4445    /// ];
4446    ///
4447    /// db.batch_write(pairs)?;
4448    ///
4449    /// // Custom binary serialization
4450    /// use bincode;
4451    ///
4452    /// #[derive(Serialize, Deserialize)]
4453    /// struct CustomData {
4454    ///     id: u64,
4455    ///     payload: Vec<u8>,
4456    /// }
4457    ///
4458    /// let custom_data = vec![
4459    ///     CustomData { id: 1, payload: vec![1, 2, 3] },
4460    ///     CustomData { id: 2, payload: vec![4, 5, 6] },
4461    /// ];
4462    ///
4463    /// let pairs: Vec<(String, Vec<u8>)> = custom_data
4464    ///     .iter()
4465    ///     .map(|data| {
4466    ///         let key = format!("binary:{}", data._sid);
4467    ///         let value = bincode::serialize(data).unwrap();
4468    ///         (key, value)
4469    ///     })
4470    ///     .collect();
4471    ///
4472    /// db.batch_write(pairs)?;
4473    ///
4474    /// // Bulk cache population
4475    /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
4476    ///     .map(|i| {
4477    ///         let key = format!("cache:item_{}", i);
4478    ///         let value = format!("value_{}", i).into_bytes();
4479    ///         (key, value)
4480    ///     })
4481    ///     .collect();
4482    ///
4483    /// db.batch_write(cache_entries)?;
4484    /// // Writes 10,000 entries in ~100ms
4485    /// ```
4486    ///
4487    /// # Important Notes
4488    /// - No schema validation performed
4489    /// - No unique constraint checking
4490    /// - No automatic indexing
4491    /// - Keys must follow "collection:id" format for proper grouping
4492    /// - Values are raw bytes - you handle serialization
4493    /// - Use `batch_insert()` for validated document inserts
4494    ///
4495    /// # When to Use
4496    /// - Maximum write performance needed
4497    /// - Custom serialization formats (bincode, msgpack, etc.)
4498    /// - Cache population
4499    /// - Low-level database operations
4500    /// - You're bypassing the document model
4501    ///
4502    /// # When NOT to Use
4503    /// - Regular document inserts → Use `batch_insert()` instead
4504    /// - Need validation → Use `batch_insert()` instead
4505    /// - Need indexing → Use `batch_insert()` instead
4506    ///
4507    /// # See Also
4508    /// - `batch_insert()` for validated document batch inserts
4509    /// - `put()` for single key-value writes
4510    pub async fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
4511        // Group pairs by collection name
4512        let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
4513        for (key, value) in &pairs {
4514            if let Some(collection_name) = key.split(':').next() {
4515                collections
4516                    .entry(collection_name.to_string())
4517                    .or_default()
4518                    .push((key.clone(), value.clone()));
4519            }
4520        }
4521
4522        // First, do the batch write to cold storage for all pairs
4523        self.cold.batch_set(pairs)?;
4524
4525        // Then, process each collection for in-memory updates
4526        for (collection_name, batch) in collections {
4527            // --- Optimized Batch Indexing ---
4528
4529            // 1. Get schema once for the entire collection batch
4530            let collection_obj = match self.schema_cache.get(&collection_name) {
4531                Some(cached_schema) => Arc::clone(cached_schema.value()),
4532                None => {
4533                    let collection_key = format!("_collection:{}", collection_name);
4534                    match self.get(&collection_key)? {
4535                        Some(data) => {
4536                            let obj: Collection = serde_json::from_slice(&data)?;
4537                            let arc_obj = Arc::new(obj);
4538                            self.schema_cache
4539                                .insert(collection_name.to_string(), Arc::clone(&arc_obj));
4540                            arc_obj
4541                        }
4542                        None => continue,
4543                    }
4544                }
4545            };
4546
4547            let indexed_fields: Vec<String> = collection_obj
4548                .fields
4549                .iter()
4550                .filter(|(_, def)| def.indexed || def.unique)
4551                .map(|(name, _)| name.clone())
4552                .collect();
4553
4554            let primary_index = self
4555                .primary_indices
4556                .entry(collection_name.to_string())
4557                .or_default();
4558
4559            for (key, value) in batch {
4560                // 2. Update hot cache
4561                if self.should_cache_key(&key) {
4562                    self.hot
4563                        .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
4564                }
4565
4566                // 3. Update primary index with metadata only
4567                let location = DiskLocation::new(value.len());
4568                primary_index.insert(key.clone(), location);
4569
4570                // 4. Update secondary indices
4571                if !indexed_fields.is_empty()
4572                    && let Ok(doc) = serde_json::from_slice::<Document>(&value)
4573                {
4574                    for (field, field_value) in doc.data {
4575                        if indexed_fields.contains(&field) {
4576                            let value_str = match &field_value {
4577                                Value::String(s) => s.clone(),
4578                                _ => field_value.to_string(),
4579                            };
4580                            let index_key = format!("{}:{}", collection_name, field);
4581                            let secondary_index =
4582                                self.secondary_indices.entry(index_key).or_default();
4583
4584                            let id = key.split(':').nth(1).unwrap_or("");
4585                            let internal_id = self.get_or_create_internal_id(id);
4586
4587                            let index_entry =
4588                                secondary_index.entry(value_str).or_insert_with(|| {
4589                                    Arc::new(RwLock::new(IndexStorage::Single(internal_id)))
4590                                });
4591
4592                            if let Ok(mut storage) = index_entry.value().write() {
4593                                storage.add(internal_id);
4594                            }
4595                        }
4596                    }
4597                }
4598            }
4599        }
4600
4601        Ok(())
4602    }
4603
4604    /// Scans the database for keys matching a specific prefix.
4605    ///
4606    /// This is a low-level method that returns an iterator over raw byte values.
4607    ///
4608    /// # Arguments
4609    /// * `prefix` - The key prefix to scan for.
4610    pub fn scan_with_prefix(
4611        &self,
4612        prefix: &str,
4613    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
4614        self.cold.scan_prefix(prefix)
4615    }
4616
4617    /// Returns storage efficiency and document count metrics for each collection.
4618    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
4619        let mut stats = HashMap::new();
4620
4621        // Scan all collections
4622        let collections: Vec<String> = self
4623            .cold
4624            .scan()
4625            .filter_map(|r| r.ok())
4626            .map(|(k, _)| k)
4627            .filter(|k| k.starts_with("_collection:"))
4628            .map(|k| k.trim_start_matches("_collection:").to_string())
4629            .collect();
4630
4631        for collection in collections {
4632            // Use primary index for fast stats (count + size from DiskLocation)
4633            let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
4634                let count = index.len();
4635                // Sum up sizes from DiskLocation metadata (much faster than disk scan)
4636                let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
4637                (count, size)
4638            } else {
4639                // Fallback: scan from cold storage if index not available
4640                let prefix = format!("{}:", collection);
4641                let count = self.cold.scan_prefix(&prefix).count();
4642                let size: usize = self
4643                    .cold
4644                    .scan_prefix(&prefix)
4645                    .filter_map(|r| r.ok())
4646                    .map(|(_, v)| v.len())
4647                    .sum();
4648                (count, size)
4649            };
4650
4651            stats.insert(
4652                collection,
4653                CollectionStats {
4654                    count,
4655                    size_bytes: size,
4656                    avg_doc_size: if count > 0 { size / count } else { 0 },
4657                },
4658            );
4659        }
4660
4661        Ok(stats)
4662    }
4663
4664    /// Searches for documents where a field exactly matches a value using a secondary index.
4665    ///
4666    /// This is an optimized O(1) lookup path.
4667    pub fn search_by_value(
4668        &self,
4669        collection: &str,
4670        field: &str,
4671        value: &Value,
4672    ) -> Result<Vec<Document>> {
4673        let index_key = format!("_index:{}:{}", collection, field);
4674
4675        if let Some(index_data) = self.get(&index_key)? {
4676            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4677            let index = Index::new(index_def);
4678
4679            // Use the previously unused search method
4680            if let Some(doc_ids) = index.search(value) {
4681                // Load the documents by ID
4682                let mut docs = Vec::new();
4683                for id in doc_ids {
4684                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4685                        let doc: Document = serde_json::from_slice(&doc_data)?;
4686                        docs.push(doc);
4687                    }
4688                }
4689                return Ok(docs);
4690            }
4691        }
4692
4693        // Return empty result if no index or no matches
4694        Ok(Vec::new())
4695    }
4696
4697    /// Performs a full-text search on an indexed text field.
4698    ///
4699    /// Returns results ranked by relevance if supported by the underlying index.
4700    pub fn full_text_search(
4701        &self,
4702        collection: &str,
4703        field: &str,
4704        query: &str,
4705    ) -> Result<Vec<Document>> {
4706        let index_key = format!("_index:{}:{}", collection, field);
4707
4708        if let Some(index_data) = self.get(&index_key)? {
4709            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4710
4711            // Ensure this is a full-text index
4712            if !matches!(index_def.index_type, IndexType::FullText) {
4713                return Err(AqlError::invalid_operation(format!(
4714                    "Field '{}' is not indexed as full-text",
4715                    field
4716                )));
4717            }
4718
4719            let index = Index::new(index_def);
4720
4721            // Use the previously unused search_text method
4722            if let Some(doc_id_scores) = index.search_text(query) {
4723                // Load the documents by ID, preserving score order
4724                let mut docs = Vec::new();
4725                for (id, _score) in doc_id_scores {
4726                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4727                        let doc: Document = serde_json::from_slice(&doc_data)?;
4728                        docs.push(doc);
4729                    }
4730                }
4731                return Ok(docs);
4732            }
4733        }
4734
4735        // Return empty result if no index or no matches
4736        Ok(Vec::new())
4737    }
4738
4739    /// Creates a full-text search index on a text field.
4740    ///
4741    /// This enables high-performance keyword searching and relevance ranking.
4742    pub async fn create_text_index(
4743        &self,
4744        collection: &str,
4745        field: &str,
4746        _enable_stop_words: bool,
4747    ) -> Result<()> {
4748        // Check if collection exists
4749        if self.get(&format!("_collection:{}", collection))?.is_none() {
4750            return Err(AqlError::new(
4751                ErrorCode::CollectionNotFound,
4752                collection.to_string(),
4753            ));
4754        }
4755
4756        // Create index definition
4757        let index_def = IndexDefinition {
4758            name: format!("{}_{}_fulltext", collection, field),
4759            collection: collection.to_string(),
4760            fields: vec![field.to_string()],
4761            index_type: IndexType::FullText,
4762            unique: false,
4763        };
4764
4765        // Store index definition
4766        let index_key = format!("_index:{}:{}", collection, field);
4767        self.put(index_key, serde_json::to_vec(&index_def)?, None)
4768            .await?;
4769
4770        // Create the actual index
4771        let index = Index::new(index_def);
4772
4773        // Index all existing documents in the collection
4774        let prefix = format!("{}:", collection);
4775        for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
4776            let doc: Document = serde_json::from_slice(&data)?;
4777            index.insert(&doc)?;
4778        }
4779
4780        Ok(())
4781    }
4782
4783    pub async fn execute_simple_query(
4784        &self,
4785        builder: &SimpleQueryBuilder,
4786    ) -> Result<Vec<Document>> {
4787        // Ensure indices are initialized
4788        self.ensure_indices_initialized().await?;
4789
4790        // A place to store the IDs of the documents we need to fetch
4791        let mut doc_ids_to_load: Option<Vec<String>> = None;
4792
4793        // --- The "Query Planner" ---
4794        // Smart heuristic: For range queries with small LIMITs, full scan can be faster
4795        // than collecting millions of IDs from secondary index
4796        let use_index_for_range = if let Some(limit) = builder.limit {
4797            // If limit is small (< 1000), prefer full scan for range queries
4798            // The secondary index would scan all entries anyway, might as well
4799            // scan documents directly and benefit from early termination
4800            limit >= 1000
4801        } else {
4802            // No limit? Index might still help if result set is small
4803            true
4804        };
4805
4806        // Look for an opportunity to use an index
4807        for (_filter_idx, filter) in builder.filters.iter().enumerate() {
4808            match filter {
4809                Filter::Eq(field, value) => {
4810                    let index_key = format!("{}:{}", &builder.collection, field);
4811                    if let Some(index) = self.secondary_indices.get(&index_key) {
4812                        if let Some(matching_ids_arc) = index.get(&value.to_string()) {
4813                            if let Ok(storage) = matching_ids_arc.value().read() {
4814                                let ids = storage
4815                                    .iter()
4816                                    .filter_map(|id| self.get_external_id(id))
4817                                    .collect();
4818                                doc_ids_to_load = Some(ids);
4819                                break;
4820                            }
4821                        }
4822                    }
4823                }
4824                Filter::Gt(field, value)
4825                | Filter::Gte(field, value)
4826                | Filter::Lt(field, value)
4827                | Filter::Lte(field, value) => {
4828                    // Skip index for range queries with small LIMITs (see query planner heuristic above)
4829                    if !use_index_for_range {
4830                        continue;
4831                    }
4832
4833                    let index_key = format!("{}:{}", &builder.collection, field);
4834
4835                    // Do we have a secondary index for this field?
4836                    if let Some(index) = self.secondary_indices.get(&index_key) {
4837                        // For range queries, we need to scan through the index values
4838                        let mut matching_ids = Vec::new();
4839
4840                        for entry in index.iter() {
4841                            let index_value_str = entry.key();
4842
4843                            // Try to parse the index value to compare with our filter value
4844                            if let Ok(index_value) =
4845                                self.parse_value_from_string(index_value_str, value)
4846                            {
4847                                let matches = match filter {
4848                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
4849                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
4850                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
4851                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
4852                                    _ => false,
4853                                };
4854
4855                                if matches {
4856                                    if let Ok(storage) = entry.value().read() {
4857                                        matching_ids.extend(
4858                                            storage
4859                                                .iter()
4860                                                .filter_map(|id| self.get_external_id(id)),
4861                                        );
4862                                    }
4863                                }
4864                            }
4865                        }
4866
4867                        if !matching_ids.is_empty() {
4868                            doc_ids_to_load = Some(matching_ids);
4869                            break;
4870                        }
4871                    }
4872                }
4873                Filter::Contains(field, search_term) => {
4874                    let index_key = format!("{}:{}", &builder.collection, field);
4875
4876                    // Do we have a secondary index for this field?
4877                    if let Some(index) = self.secondary_indices.get(&index_key) {
4878                        let mut matching_ids = Vec::new();
4879
4880                        for entry in index.iter() {
4881                            let index_value_str = entry.key();
4882
4883                            // Check if this indexed value contains our search term
4884                            if index_value_str
4885                                .to_lowercase()
4886                                .contains(&search_term.to_lowercase())
4887                            {
4888                                if let Ok(storage) = entry.value().read() {
4889                                    matching_ids.extend(
4890                                        storage.iter().filter_map(|id| self.get_external_id(id)),
4891                                    );
4892                                }
4893                            }
4894                        }
4895
4896                        if !matching_ids.is_empty() {
4897                            // Remove duplicates since a document could match multiple indexed values
4898                            matching_ids.sort();
4899                            matching_ids.dedup();
4900
4901                            doc_ids_to_load = Some(matching_ids);
4902                            break;
4903                        }
4904                    }
4905                }
4906                Filter::And(_) => {
4907                    // For compound filters, we can't easily use a single index
4908                    // This would require more complex query planning
4909                    continue;
4910                }
4911                Filter::Or(sub_filters) => {
4912                    // For OR filters, collect union of all matching IDs from each sub-filter
4913                    let mut union_ids: Vec<String> = Vec::new();
4914                    let mut used_index = false;
4915
4916                    for sub_filter in sub_filters {
4917                        match sub_filter {
4918                            Filter::Eq(field, value) => {
4919                                let index_key = format!("{}:{}", &builder.collection, field);
4920                                if let Some(index) = self.secondary_indices.get(&index_key) {
4921                                    if let Some(matching_ids_arc) = index.get(&value.to_string()) {
4922                                        if let Ok(storage) = matching_ids_arc.value().read() {
4923                                            union_ids.extend(
4924                                                storage
4925                                                    .iter()
4926                                                    .filter_map(|id| self.get_external_id(id)),
4927                                            );
4928                                            used_index = true;
4929                                        }
4930                                    }
4931                                }
4932                            }
4933                            // For other filter types in OR, we fall back to full scan
4934                            _ => continue,
4935                        }
4936                    }
4937
4938                    if used_index && !union_ids.is_empty() {
4939                        // Remove duplicates
4940                        union_ids.sort();
4941                        union_ids.dedup();
4942                        doc_ids_to_load = Some(union_ids);
4943                        break;
4944                    }
4945                    // If no index was used, continue to full scan
4946                }
4947                _ => continue,
4948            }
4949        }
4950
4951        let mut final_docs: Vec<Document>;
4952
4953        if let Some(ids) = doc_ids_to_load {
4954            // Index path
4955            use std::io::Write;
4956            if let Ok(mut file) = std::fs::OpenOptions::new()
4957                .create(true)
4958                .append(true)
4959                .open("/tmp/aurora_query_stats.log")
4960            {
4961                let _ = writeln!(
4962                    file,
4963                    "[INDEX PATH] IDs to load: {} | Collection: {}",
4964                    ids.len(),
4965                    builder.collection
4966                );
4967            }
4968
4969            final_docs = Vec::with_capacity(ids.len());
4970
4971            for id in ids {
4972                let doc_key = format!("{}:{}", &builder.collection, id);
4973                if let Some(data) = self.get(&doc_key)?
4974                    && let Ok(doc) = serde_json::from_slice::<Document>(&data)
4975                {
4976                    final_docs.push(doc);
4977                }
4978            }
4979        } else {
4980            // --- Path 2: Full Collection Scan with Early Termination ---
4981
4982            // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
4983            // as soon as we have enough matching documents
4984            let early_termination_target = if builder.order_by.is_none() {
4985                builder.limit.map(|l| l + builder.offset.unwrap_or(0))
4986            } else {
4987                // With ORDER BY, we need all matching docs to sort correctly
4988                None
4989            };
4990
4991            // Smart scan with early termination support
4992            final_docs = Vec::new();
4993            let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
4994
4995            if let Some(index) = self.primary_indices.get(&builder.collection) {
4996                for entry in index.iter() {
4997                    let key = entry.key();
4998                    scan_stats.0 += 1; // keys scanned
4999
5000                    // Early termination check
5001                    if let Some(target) = early_termination_target {
5002                        if final_docs.len() >= target {
5003                            break; // We have enough documents!
5004                        }
5005                    }
5006
5007                    // Fetch and filter document
5008                    if let Some(data) = self.get(key)? {
5009                        scan_stats.1 += 1; // docs fetched
5010
5011                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
5012                            // Apply all filters (Ticket 4 Optimized Path)
5013                            if builder.filters.iter().all(|filter| filter.matches(&doc)) {
5014                                scan_stats.2 += 1; // matches found
5015                                final_docs.push(doc);
5016                            }
5017                        }
5018                    }
5019                }
5020
5021                // Debug logging for query performance analysis
5022                use std::io::Write;
5023                if let Ok(mut file) = std::fs::OpenOptions::new()
5024                    .create(true)
5025                    .append(true)
5026                    .open("/tmp/aurora_query_stats.log")
5027                {
5028                    let _ = writeln!(
5029                        file,
5030                        "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
5031                        scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
5032                    );
5033                }
5034            } else {
5035                // Fallback: scan from cold storage if index not initialized
5036                final_docs = self.get_all_collection(&builder.collection).await?;
5037
5038                // Apply filters
5039                final_docs.retain(|doc| builder.filters.iter().all(|filter| filter.matches(doc)));
5040            }
5041        }
5042
5043        // Apply ordering
5044        if let Some((field, ascending)) = &builder.order_by {
5045            final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
5046                (Some(v1), Some(v2)) => {
5047                    let cmp = v1.cmp(v2);
5048                    if *ascending { cmp } else { cmp.reverse() }
5049                }
5050                (None, Some(_)) => std::cmp::Ordering::Less,
5051                (Some(_), None) => std::cmp::Ordering::Greater,
5052                (None, None) => std::cmp::Ordering::Equal,
5053            });
5054        }
5055
5056        // Apply offset and limit
5057        let start = builder.offset.unwrap_or(0);
5058        let end = builder
5059            .limit
5060            .map(|l| start.saturating_add(l))
5061            .unwrap_or(final_docs.len());
5062
5063        let end = end.min(final_docs.len());
5064        Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
5065    }
5066
5067    /// Helper method to parse a string value back to a Value for comparison
5068    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
5069        match reference_value {
5070            Value::Int(_) => {
5071                if let Ok(i) = value_str.parse::<i64>() {
5072                    Ok(Value::Int(i))
5073                } else {
5074                    Err(AqlError::invalid_operation(
5075                        "Failed to parse int".to_string(),
5076                    ))
5077                }
5078            }
5079            Value::Float(_) => {
5080                if let Ok(f) = value_str.parse::<f64>() {
5081                    Ok(Value::Float(f))
5082                } else {
5083                    Err(AqlError::invalid_operation(
5084                        "Failed to parse float".to_string(),
5085                    ))
5086                }
5087            }
5088            Value::String(_) => Ok(Value::String(value_str.to_string())),
5089            _ => Ok(Value::String(value_str.to_string())),
5090        }
5091    }
5092
5093    pub async fn execute_dynamic_query(
5094        &self,
5095        collection: &str,
5096        payload: &QueryPayload,
5097    ) -> Result<Vec<Document>> {
5098        let mut docs = self.get_all_collection(collection).await?;
5099
5100        // 1. Apply Filters
5101        if let Some(filters) = &payload.filters {
5102            docs.retain(|doc| {
5103                filters.iter().all(|filter| {
5104                    doc.data
5105                        .get(&filter.field)
5106                        .is_some_and(|doc_val| check_filter(doc_val, filter))
5107                })
5108            });
5109        }
5110
5111        // 2. Apply Sorting
5112        if let Some(sort_options) = &payload.sort {
5113            docs.sort_by(|a, b| {
5114                let a_val = a.data.get(&sort_options.field);
5115                let b_val = b.data.get(&sort_options.field);
5116                let ordering = a_val
5117                    .partial_cmp(&b_val)
5118                    .unwrap_or(std::cmp::Ordering::Equal);
5119                if sort_options.ascending {
5120                    ordering
5121                } else {
5122                    ordering.reverse()
5123                }
5124            });
5125        }
5126
5127        // 3. Apply Pagination
5128        if let Some(offset) = payload.offset {
5129            docs = docs.into_iter().skip(offset).collect();
5130        }
5131        if let Some(limit) = payload.limit {
5132            docs = docs.into_iter().take(limit).collect();
5133        }
5134
5135        // 4. Apply Field Selection (Projection)
5136        if let Some(select_fields) = &payload.select
5137            && !select_fields.is_empty()
5138        {
5139            docs = docs
5140                .into_iter()
5141                .map(|mut doc| {
5142                    doc.data.retain(|key, _| select_fields.contains(key));
5143                    doc
5144                })
5145                .collect();
5146        }
5147
5148        Ok(docs)
5149    }
5150
5151    pub async fn process_network_request(
5152        &self,
5153        request: crate::network::protocol::Request,
5154    ) -> crate::network::protocol::Response {
5155        use crate::network::protocol::Response;
5156
5157        match request {
5158            crate::network::protocol::Request::Get(key) => match self.get(&key) {
5159                Ok(value) => Response::Success(value),
5160                Err(e) => Response::Error(e.to_string()),
5161            },
5162            crate::network::protocol::Request::Put(key, value) => {
5163                match self.put(key, value, None).await {
5164                    Ok(_) => Response::Done,
5165                    Err(e) => Response::Error(e.to_string()),
5166                }
5167            }
5168            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
5169                Ok(_) => Response::Done,
5170                Err(e) => Response::Error(e.to_string()),
5171            },
5172            crate::network::protocol::Request::NewCollection { name, fields } => {
5173                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
5174                    .iter()
5175                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
5176                    .collect();
5177
5178                match self.new_collection(&name, fields_for_db).await {
5179                    Ok(_) => Response::Done,
5180                    Err(e) => Response::Error(e.to_string()),
5181                }
5182            }
5183            crate::network::protocol::Request::Insert { collection, data } => {
5184                match self.insert_map(&collection, data).await {
5185                    Ok(id) => Response::Message(id),
5186                    Err(e) => Response::Error(e.to_string()),
5187                }
5188            }
5189            crate::network::protocol::Request::GetDocument { collection, id } => {
5190                match self.get_document(&collection, &id) {
5191                    Ok(doc) => Response::Document(doc),
5192                    Err(e) => Response::Error(e.to_string()),
5193                }
5194            }
5195            crate::network::protocol::Request::Query(builder) => {
5196                match self.execute_simple_query(&builder).await {
5197                    Ok(docs) => Response::Documents(docs),
5198                    Err(e) => Response::Error(e.to_string()),
5199                }
5200            }
5201            crate::network::protocol::Request::BeginTransaction => {
5202                let tx_id = self.begin_transaction().await;
5203                Response::TransactionId(tx_id.as_u64())
5204            }
5205            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
5206                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
5207                match self.commit_transaction(tx_id).await {
5208                    Ok(_) => Response::Done,
5209                    Err(e) => Response::Error(e.to_string()),
5210                }
5211            }
5212            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
5213                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
5214                match self.rollback_transaction(tx_id).await {
5215                    Ok(_) => Response::Done,
5216                    Err(e) => Response::Error(e.to_string()),
5217                }
5218            }
5219        }
5220    }
5221
5222    /// Create indices for commonly queried fields automatically
5223    ///
5224    /// This is a convenience method that creates indices for fields that are
5225    /// likely to be queried frequently, improving performance.
5226    ///
5227    /// # Arguments
5228    /// * `collection` - Name of the collection
5229    /// * `fields` - List of field names to create indices for
5230    ///
5231    /// # Examples
5232    /// ```ignore
5233    /// // Create indices for commonly queried fields
5234    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
5235    /// ```
5236    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
5237        for field in fields {
5238            if let Err(e) = self.create_index(collection, field).await {
5239                eprintln!(
5240                    "Warning: Failed to create index for {}.{}: {}",
5241                    collection, field, e
5242                );
5243            } else {
5244                println!("Created index for {}.{}", collection, field);
5245            }
5246        }
5247        Ok(())
5248    }
5249
5250    /// Get index statistics for a collection
5251    ///
5252    /// This helps understand which indices exist and how effective they are.
5253    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
5254        let mut stats = HashMap::new();
5255
5256        for entry in self.secondary_indices.iter() {
5257            let key = entry.key();
5258            if key.starts_with(&format!("{}:", collection)) {
5259                let field = key.split(':').nth(1).unwrap_or("unknown");
5260                let index = entry.value();
5261
5262                let unique_values = index.len();
5263                let total_documents: usize = index
5264                    .iter()
5265                    .map(|entry| {
5266                        if let Ok(storage) = entry.value().read() {
5267                            storage.to_bitmap().len() as usize
5268                        } else {
5269                            0
5270                        }
5271                    })
5272                    .sum();
5273
5274                stats.insert(
5275                    field.to_string(),
5276                    IndexStats {
5277                        unique_values,
5278                        total_documents,
5279                        avg_docs_per_value: if unique_values > 0 {
5280                            total_documents / unique_values
5281                        } else {
5282                            0
5283                        },
5284                    },
5285                );
5286            }
5287        }
5288
5289        stats
5290    }
5291
5292    /// Optimize a collection by creating indices for frequently filtered fields
5293    ///
5294    /// This analyzes common query patterns and suggests/creates optimal indices.
5295    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
5296        if let Ok(collection_def) = self.get_collection_definition(collection) {
5297            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
5298            self.create_indices(collection, &field_names).await?;
5299        }
5300
5301        Ok(())
5302    }
5303
5304    // Helper method to get unique fields from a collection
5305    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
5306        collection
5307            .fields
5308            .iter()
5309            .filter(|(_, def)| def.unique)
5310            .map(|(name, _)| name.clone())
5311            .collect()
5312    }
5313
5314    // Update the validation method to use the helper
5315    async fn validate_unique_constraints(
5316        &self,
5317        collection: &str,
5318        data: &HashMap<String, Value>,
5319    ) -> Result<()> {
5320        self.ensure_indices_initialized().await?;
5321        let collection_def = match self.get_collection_definition(collection) {
5322            Ok(def) => def,
5323            Err(e) if e.code == crate::error::ErrorCode::CollectionNotFound => return Ok(()),
5324            Err(e) => return Err(e),
5325        };
5326
5327        // 1. Unique Constraints
5328        let unique_fields = self.get_unique_fields(&collection_def);
5329        for unique_field in &unique_fields {
5330            if let Some(value) = data.get(unique_field) {
5331                let index_key = format!("{}:{}", collection, unique_field);
5332                if let Some(_index) = self.secondary_indices.get(&index_key) {
5333                    // Get the raw string value without JSON formatting
5334                    let value_str = match value {
5335                        Value::String(s) => s.clone(),
5336                        _ => value.to_string(),
5337                    };
5338                    if let Some(storage) = self.get_indexed_storage(&index_key, &value_str) {
5339                        if let Ok(s) = storage.read() {
5340                            if !s.is_empty() {
5341                                return Err(AqlError::new(
5342                                    ErrorCode::UniqueConstraintViolation,
5343                                    format!(
5344                                        "Unique constraint violation on field '{}' with value '{}'",
5345                                        unique_field, value_str
5346                                    ),
5347                                ));
5348                            }
5349                        }
5350                    }
5351                }
5352            }
5353        }
5354
5355        // 2. Relation Constraints (Foreign Keys)
5356        for (field_name, field_def) in &collection_def.fields {
5357            if let Some(rel) = &field_def.relation {
5358                if let Some(val) = data.get(field_name) {
5359                    if !matches!(val, Value::Null) {
5360                        let fk_value = match val {
5361                            Value::String(s) => s.clone(),
5362                            Value::Uuid(u) => u.to_string(),
5363                            _ => val.to_string(),
5364                        };
5365
5366                        // Check if the referenced ID exists in the target collection
5367                        // If the key is 'id' or '_sid', we can use get_document directly
5368                        let exists = if rel.key == "id" || rel.key == "_sid" {
5369                            self.get_document(&rel.to, &fk_value)?.is_some()
5370                        } else {
5371                            // Otherwise, we need to check the index of the target collection
5372                            let index_key = format!("{}:{}", rel.to, rel.key);
5373                            if let Some(storage) = self.get_indexed_storage(&index_key, &fk_value) {
5374                                if let Ok(s) = storage.read() {
5375                                    !s.is_empty()
5376                                } else {
5377                                    false
5378                                }
5379                            } else {
5380                                // Target field is not indexed, fallback to scan (slow)
5381                                let docs = self.scan_and_filter(
5382                                    &rel.to,
5383                                    |d| {
5384                                        d.data.get(&rel.key).map(|v| v.to_string())
5385                                            == Some(fk_value.clone())
5386                                    },
5387                                    Some(1),
5388                                )?;
5389                                !docs.is_empty()
5390                            }
5391                        };
5392
5393                        if !exists {
5394                            return Err(AqlError::new(
5395                                ErrorCode::ValidationError,
5396                                format!(
5397                                    "Foreign key violation: value '{}' for field '{}' does not exist in collection '{}' (field '{}')",
5398                                    fk_value, field_name, rel.to, rel.key
5399                                ),
5400                            ));
5401                        }
5402                    }
5403                }
5404            }
5405        }
5406
5407        Ok(())
5408    }
5409
5410    /// Validate unique constraints excluding a specific document ID (for updates)
5411    async fn validate_unique_constraints_excluding(
5412        &self,
5413        collection: &str,
5414        data: &HashMap<String, Value>,
5415        exclude_id: &str,
5416    ) -> Result<()> {
5417        self.ensure_indices_initialized().await?;
5418        let collection_def = self.get_collection_definition(collection)?;
5419        let unique_fields = self.get_unique_fields(&collection_def);
5420
5421        for unique_field in &unique_fields {
5422            if let Some(value) = data.get(unique_field) {
5423                let index_key = format!("{}:{}", collection, unique_field);
5424                if let Some(index) = self.secondary_indices.get(&index_key) {
5425                    // Get the raw string value without JSON formatting
5426                    let value_str = match value {
5427                        Value::String(s) => s.clone(),
5428                        _ => value.to_string(),
5429                    };
5430                    if let Some(doc_ids_arc) = index.get(&value_str) {
5431                        let exclude_internal = self
5432                            ._sid_dictionary
5433                            .get(&self.parse_external_id(exclude_id))
5434                            .map(|e| *e.value());
5435
5436                        if let Ok(storage) = doc_ids_arc.value().read() {
5437                            let has_violation = if let Some(exc) = exclude_internal {
5438                                // Normal path: dictionary populated, use bitmap exclusion.
5439                                storage.iter().any(|id| id != exc)
5440                            } else {
5441                                // _sid_dictionary not yet populated (first update after
5442                                // restart — the dictionary is rebuilt lazily, not from the
5443                                // checkpoint). Fall back to a cold-store ownership check:
5444                                // if the document being updated already holds this exact
5445                                // unique value, it is NOT a violation.
5446                                let cold_key = format!("{}:{}", collection, exclude_id);
5447                                let doc_owns_value = self
5448                                    .get(&cold_key)
5449                                    .ok()
5450                                    .flatten()
5451                                    .and_then(|raw| serde_json::from_slice::<Document>(&raw).ok())
5452                                    .and_then(|doc| doc.data.get(unique_field).cloned())
5453                                    .map(|existing_val| match &existing_val {
5454                                        Value::String(s) => *s == value_str,
5455                                        _ => existing_val.to_string() == value_str,
5456                                    })
5457                                    .unwrap_or(false);
5458
5459                                if doc_owns_value {
5460                                    // Warm up the dictionary so subsequent checks on this
5461                                    // document use the fast path instead of cold storage.
5462                                    self.get_or_create_internal_id(exclude_id);
5463                                    false
5464                                } else {
5465                                    !storage.is_empty()
5466                                }
5467                            };
5468
5469                            if has_violation {
5470                                return Err(AqlError::new(
5471                                    ErrorCode::UniqueConstraintViolation,
5472                                    format!(
5473                                        "Unique constraint violation on field '{}' with value '{}'",
5474                                        unique_field, value_str
5475                                    ),
5476                                ));
5477                            }
5478                        }
5479                    }
5480                }
5481            }
5482        }
5483
5484        // 2. Relation Constraints (Foreign Keys)
5485        for (field_name, field_def) in &collection_def.fields {
5486            if let Some(rel) = &field_def.relation {
5487                if let Some(val) = data.get(field_name) {
5488                    if !matches!(val, Value::Null) {
5489                        let fk_value = match val {
5490                            Value::String(s) => s.clone(),
5491                            Value::Uuid(u) => u.to_string(),
5492                            _ => val.to_string(),
5493                        };
5494
5495                        let exists = if rel.key == "id" || rel.key == "_sid" {
5496                            self.get_document(&rel.to, &fk_value)?.is_some()
5497                        } else {
5498                            let index_key = format!("{}:{}", rel.to, rel.key);
5499                            if let Some(storage) = self.get_indexed_storage(&index_key, &fk_value) {
5500                                if let Ok(s) = storage.read() {
5501                                    !s.is_empty()
5502                                } else {
5503                                    false
5504                                }
5505                            } else {
5506                                let docs = self.scan_and_filter(
5507                                    &rel.to,
5508                                    |d| {
5509                                        d.data.get(&rel.key).map(|v| v.to_string())
5510                                            == Some(fk_value.clone())
5511                                    },
5512                                    Some(1),
5513                                )?;
5514                                !docs.is_empty()
5515                            }
5516                        };
5517
5518                        if !exists {
5519                            return Err(AqlError::new(
5520                                ErrorCode::ValidationError,
5521                                format!(
5522                                    "Foreign key violation: value '{}' for field '{}' does not exist in collection '{}' (field '{}')",
5523                                    fk_value, field_name, rel.to, rel.key
5524                                ),
5525                            ));
5526                        }
5527                    }
5528                }
5529            }
5530        }
5531
5532        Ok(())
5533    }
5534
5535    // =========================================================================
5536    // AQL Executor Helper Methods
5537    // These are wrapper methods for AQL executor integration.
5538    // They provide a simplified API compatible with the AQL query execution layer.
5539    // =========================================================================
5540
5541    /// Get all documents in a collection (AQL helper)
5542    ///
5543    /// This is a wrapper around the internal query system optimized for bulk retrieval.
5544    pub async fn aql_get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
5545        self.ensure_indices_initialized().await?;
5546
5547        let prefix = format!("{}:", collection);
5548        let mut docs = Vec::new();
5549
5550        for result in self.cold.scan_prefix(&prefix) {
5551            let (_, value) = result?;
5552            if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
5553                docs.push(doc);
5554            }
5555        }
5556
5557        Ok(docs)
5558    }
5559
5560    /// Insert a document from a HashMap (AQL helper)
5561    ///
5562    /// Returns the complete document (not just ID) for AQL executor
5563    pub async fn aql_insert(
5564        &self,
5565        collection: &str,
5566        data: HashMap<String, Value>,
5567    ) -> Result<Document> {
5568        // Validate unique constraints before inserting
5569        self.validate_unique_constraints(collection, &data).await?;
5570
5571        // Use 'id' from data as the internal system ID if provided and it's a string/uuid
5572        let sid = if let Some(v) = data.get("id") {
5573            match v {
5574                Value::String(s) => s.clone(),
5575                Value::Uuid(u) => u.to_string(),
5576                _ => Uuid::now_v7().to_string(),
5577            }
5578        } else {
5579            Uuid::now_v7().to_string()
5580        };
5581
5582        let document = Document {
5583            _sid: sid.clone(),
5584            data,
5585        };
5586
5587        self.put(
5588            format!("{}:{}", collection, sid),
5589            serde_json::to_vec(&document)?,
5590            None,
5591        )
5592        .await?;
5593
5594        // Publish insert event
5595        let event = crate::pubsub::ChangeEvent::insert(collection, &sid, document.clone());
5596        let _ = self.pubsub.publish(event);
5597
5598        Ok(document)
5599    }
5600
5601    /// Update a document by ID with new data (AQL helper)
5602    ///
5603    /// Merges new data with existing data and returns updated document
5604    pub async fn aql_update_document(
5605        &self,
5606        collection: &str,
5607        doc_id: &str,
5608        updates: HashMap<String, Value>,
5609    ) -> Result<Document> {
5610        let key = format!("{}:{}", collection, doc_id);
5611
5612        // Get existing document
5613        let existing = self.get(&key)?.ok_or_else(|| {
5614            AqlError::new(
5615                ErrorCode::NotFound,
5616                format!("Document {} not found", doc_id),
5617            )
5618        })?;
5619
5620        let mut doc: Document = serde_json::from_slice(&existing)?;
5621        let old_doc = doc.clone();
5622
5623        // Merge updates into existing data
5624        for (k, v) in updates {
5625            doc.data.insert(k, v);
5626        }
5627
5628        // Validate unique constraints excluding this document
5629        self.validate_unique_constraints_excluding(collection, &doc.data, doc_id)
5630            .await?;
5631
5632        // Save updated document
5633        self.put(key, serde_json::to_vec(&doc)?, None).await?;
5634
5635        // Remove stale secondary-index entries for fields whose values changed.
5636        // `put` adds the new value's bitmap entry but cannot know the previous
5637        // value, so without this the old entry lingers and blocks reuse of that
5638        // value under a unique constraint.
5639        let external_u128 = self.parse_external_id(doc_id);
5640        if let Some(internal_id) = self._sid_dictionary.get(&external_u128).map(|e| *e.value()) {
5641            for (field, old_val) in &old_doc.data {
5642                if let Some(new_val) = doc.data.get(field) {
5643                    if new_val == old_val {
5644                        continue;
5645                    }
5646                }
5647
5648                let index_key = format!("{}:{}", collection, field);
5649                if let Some(index_map) = self.secondary_indices.get(&index_key) {
5650                    let val_str = match old_val {
5651                        Value::String(s) => s.clone(),
5652                        _ => old_val.to_string(),
5653                    };
5654                    if let Some(storage_arc) = index_map.get(&val_str) {
5655                        if let Ok(mut storage) = storage_arc.value().write() {
5656                            storage.remove(internal_id);
5657                        }
5658                    }
5659                }
5660            }
5661        }
5662
5663        // Publish update event
5664        let event = crate::pubsub::ChangeEvent::update(collection, doc_id, old_doc, doc.clone());
5665        let _ = self.pubsub.publish(event);
5666
5667        Ok(doc)
5668    }
5669
5670    /// Delete a document by ID (AQL helper)
5671    ///
5672    /// Returns the deleted document
5673    pub async fn aql_delete_document(&self, collection: &str, doc_id: &str) -> Result<Document> {
5674        let key = format!("{}:{}", collection, doc_id);
5675
5676        // Get existing document first
5677        let existing = self.get(&key)?.ok_or_else(|| {
5678            AqlError::new(
5679                ErrorCode::NotFound,
5680                format!("Document {} not found", doc_id),
5681            )
5682        })?;
5683
5684        let doc: Document = serde_json::from_slice(&existing)?;
5685
5686        // Publish delete event before the transaction check so handlers fire
5687        // consistently regardless of whether this is inside an auto-transaction.
5688        // This mirrors aql_insert / aql_update_document which publish after put()
5689        // even when the underlying write is buffered in a transaction.
5690        let event = crate::pubsub::ChangeEvent::delete(collection, doc_id);
5691        let _ = self.pubsub.publish(event);
5692
5693        // If inside a transaction, buffer the delete instead of writing to storage.
5694        if let Ok(tx_id) = crate::transaction::ACTIVE_TRANSACTION_ID.try_with(|id| *id) {
5695            if let Some(buffer) = self.transaction_manager.active_transactions.get(&tx_id) {
5696                buffer.delete(key);
5697                return Ok(doc);
5698            }
5699        }
5700
5701        // Append to WAL for durability
5702        if self.config.durability_mode != DurabilityMode::None {
5703            if let Some(wal_writer) = &self.wal_writer {
5704                let op = WalOperation::Delete { key: key.clone() };
5705                wal_writer.send(op).map_err(|_| {
5706                    AqlError::new(
5707                        ErrorCode::InternalError,
5708                        "Failed to send to WAL writer".to_string(),
5709                    )
5710                })?;
5711            } else if let Some(wal) = &self.wal {
5712                wal.write()
5713                    .map_err(|e| AqlError::new(ErrorCode::InternalError, e.to_string()))?
5714                    .append(crate::wal::Operation::Delete, &key, None)?;
5715            }
5716        }
5717
5718        // Delete from storage
5719        self.cold.delete(&key)?;
5720        self.hot.delete(&key);
5721
5722        // Remove from primary index
5723        if let Some(index) = self.primary_indices.get_mut(collection) {
5724            index.remove(&key);
5725        }
5726
5727        // Remove from secondary indices
5728        for (field_name, value) in &doc.data {
5729            let index_key = format!("{}:{}", collection, field_name);
5730            if let Some(index) = self.secondary_indices.get(&index_key) {
5731                let value_str = match value {
5732                    Value::String(s) => s.clone(),
5733                    _ => value.to_string(),
5734                };
5735                if let Some(doc_ids_arc) = index.get(&value_str) {
5736                    let external_u128 = self.parse_external_id(doc_id);
5737                    if let Some(internal_id_entry) = self._sid_dictionary.get(&external_u128) {
5738                        if let Ok(mut storage) = doc_ids_arc.value().write() {
5739                            storage.remove(*internal_id_entry.value());
5740                        }
5741                    }
5742                }
5743            }
5744        }
5745
5746        // Return the internal ID to the recycling pool
5747        let external_u128 = self.parse_external_id(doc_id);
5748        if let Some(internal_id_entry) = self._sid_dictionary.get(&external_u128) {
5749            if let Ok(mut deleted) = self.deleted_ids.write() {
5750                deleted.insert(*internal_id_entry.value());
5751            }
5752        }
5753
5754        // Event was already published above (before transaction check).
5755
5756        Ok(doc)
5757    }
5758
5759    /// Get a single document by ID (AQL helper)
5760    pub async fn aql_get_document(
5761        &self,
5762        collection: &str,
5763        doc_id: &str,
5764    ) -> Result<Option<Document>> {
5765        let key = format!("{}:{}", collection, doc_id);
5766
5767        match self.get(&key)? {
5768            Some(data) => {
5769                let doc: Document = serde_json::from_slice(&data)?;
5770                Ok(Some(doc))
5771            }
5772            None => Ok(None),
5773        }
5774    }
5775
5776    /// Begin a transaction (AQL helper) - returns transaction ID
5777    pub async fn aql_begin_transaction(&self) -> Result<crate::transaction::TransactionId> {
5778        Ok(self.begin_transaction().await)
5779    }
5780
5781    /// Commit a transaction (AQL helper)
5782    pub async fn aql_commit_transaction(
5783        &self,
5784        tx_id: crate::transaction::TransactionId,
5785    ) -> Result<()> {
5786        self.commit_transaction(tx_id).await
5787    }
5788
5789    /// Rollback a transaction (AQL helper)
5790    pub async fn aql_rollback_transaction(
5791        &self,
5792        tx_id: crate::transaction::TransactionId,
5793    ) -> Result<()> {
5794        self.transaction_manager.rollback(tx_id)
5795    }
5796
5797    // ============================================
5798    // AQL Schema Management Wrappers
5799    // ============================================
5800
5801    /// Create a collection from AST schema definition
5802    pub async fn create_collection_schema(
5803        &self,
5804        name: &str,
5805        fields: HashMap<String, crate::types::FieldDefinition>,
5806    ) -> Result<()> {
5807        let collection_key = format!("_collection:{}", name);
5808
5809        // Check if collection already exists
5810        if self.get(&collection_key)?.is_some() {
5811            // Already exists
5812            return Ok(());
5813        }
5814
5815        let collection = Collection {
5816            name: name.to_string(),
5817            fields,
5818        };
5819
5820        let collection_data = serde_json::to_vec(&collection)?;
5821        self.put(collection_key, collection_data, None).await?;
5822        self.schema_cache.remove(name);
5823
5824        Ok(())
5825    }
5826
5827    /// Add a field to an existing collection schema
5828    pub async fn add_field_to_schema(
5829        &self,
5830        collection_name: &str,
5831        name: String,
5832        definition: crate::types::FieldDefinition,
5833    ) -> Result<()> {
5834        let mut collection = self
5835            .get_collection_definition(collection_name)
5836            .map_err(|_| {
5837                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5838            })?;
5839
5840        if collection.fields.contains_key(&name) {
5841            return Err(AqlError::new(
5842                ErrorCode::InvalidDefinition,
5843                format!(
5844                    "Field '{}' already exists in collection '{}'",
5845                    name, collection_name
5846                ),
5847            ));
5848        }
5849
5850        collection.fields.insert(name, definition);
5851
5852        let collection_key = format!("_collection:{}", collection_name);
5853        let collection_data = serde_json::to_vec(&collection)?;
5854        self.put(collection_key, collection_data, None).await?;
5855        self.schema_cache.remove(collection_name);
5856
5857        Ok(())
5858    }
5859
5860    /// Drop a field from an existing collection schema
5861    pub async fn drop_field_from_schema(
5862        &self,
5863        collection_name: &str,
5864        field_name: String,
5865    ) -> Result<()> {
5866        let mut collection = self
5867            .get_collection_definition(collection_name)
5868            .map_err(|_| {
5869                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5870            })?;
5871
5872        if !collection.fields.contains_key(&field_name) {
5873            return Err(AqlError::new(
5874                ErrorCode::InvalidDefinition,
5875                format!(
5876                    "Field '{}' does not exist in collection '{}'",
5877                    field_name, collection_name
5878                ),
5879            ));
5880        }
5881
5882        collection.fields.remove(&field_name);
5883
5884        let collection_key = format!("_collection:{}", collection_name);
5885        let collection_data = serde_json::to_vec(&collection)?;
5886        self.put(collection_key, collection_data, None).await?;
5887        self.schema_cache.remove(collection_name);
5888
5889        Ok(())
5890    }
5891
5892    /// Rename a field in an existing collection schema
5893    pub async fn rename_field_in_schema(
5894        &self,
5895        collection_name: &str,
5896        from: String,
5897        to: String,
5898    ) -> Result<()> {
5899        let mut collection = self
5900            .get_collection_definition(collection_name)
5901            .map_err(|_| {
5902                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5903            })?;
5904
5905        if let Some(def) = collection.fields.remove(&from) {
5906            if collection.fields.contains_key(&to) {
5907                return Err(AqlError::new(
5908                    ErrorCode::InvalidDefinition,
5909                    format!(
5910                        "Target field name '{}' already exists in collection '{}'",
5911                        to, collection_name
5912                    ),
5913                ));
5914            }
5915            collection.fields.insert(to, def);
5916        } else {
5917            return Err(AqlError::new(
5918                ErrorCode::InvalidDefinition,
5919                format!("Field '{}' not found", from),
5920            ));
5921        }
5922
5923        let collection_key = format!("_collection:{}", collection_name);
5924        let collection_data = serde_json::to_vec(&collection)?;
5925        self.put(collection_key, collection_data, None).await?;
5926        self.schema_cache.remove(collection_name);
5927
5928        Ok(())
5929    }
5930
5931    /// Modify a field in an existing collection schema
5932    pub async fn modify_field_in_schema(
5933        &self,
5934        collection_name: &str,
5935        name: String,
5936        definition: crate::types::FieldDefinition,
5937    ) -> Result<()> {
5938        let mut collection = self
5939            .get_collection_definition(collection_name)
5940            .map_err(|_| {
5941                AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5942            })?;
5943
5944        if !collection.fields.contains_key(&name) {
5945            return Err(AqlError::new(
5946                ErrorCode::InvalidDefinition,
5947                format!(
5948                    "Field '{}' does not exist in collection '{}'",
5949                    name, collection_name
5950                ),
5951            ));
5952        }
5953
5954        collection.fields.insert(name, definition);
5955
5956        let collection_key = format!("_collection:{}", collection_name);
5957        let collection_data = serde_json::to_vec(&collection)?;
5958        self.put(collection_key, collection_data, None).await?;
5959        self.schema_cache.remove(collection_name);
5960
5961        Ok(())
5962    }
5963
5964    /// Drop an entire collection definition
5965    pub async fn drop_collection_schema(&self, collection_name: &str) -> Result<()> {
5966        let collection_key = format!("_collection:{}", collection_name);
5967        // Write to WAL before cold delete so a crash doesn't leave orphaned docs
5968        // with no schema to describe them.
5969        if self.config.durability_mode != DurabilityMode::None {
5970            if let Some(wal_writer) = &self.wal_writer {
5971                let _ = wal_writer.send(WalOperation::Delete {
5972                    key: collection_key.clone(),
5973                });
5974            }
5975        }
5976        self.cold.delete(&collection_key)?;
5977        self.schema_cache.remove(collection_name);
5978        Ok(())
5979    }
5980
5981    // ============================================
5982    // AQL Migration Wrappers
5983    // ============================================
5984
5985    /// Check if a migration version has been applied
5986    pub async fn is_migration_applied(&self, version: &str) -> Result<bool> {
5987        let migration_key = format!("_sys_migration:{}", version);
5988        Ok(self.get(&migration_key)?.is_some())
5989    }
5990
5991    /// Mark a migration version as applied
5992    pub async fn mark_migration_applied(&self, version: &str) -> Result<()> {
5993        let migration_key = format!("_sys_migration:{}", version);
5994        let timestamp = chrono::Utc::now().to_rfc3339();
5995        self.put(migration_key.clone(), timestamp.as_bytes().to_vec(), None)
5996            .await?;
5997
5998        // Also store as a document in _migrations so it's queryable via AQL
5999        let mut data = HashMap::new();
6000        data.insert("version".to_string(), Value::String(version.to_string()));
6001        data.insert("status".to_string(), Value::String("applied".to_string()));
6002        data.insert("applied_at".to_string(), Value::String(timestamp));
6003        let doc = Document {
6004            _sid: version.to_string(),
6005            data,
6006        };
6007        let doc_key = format!("_migrations:{}", version);
6008        self.put(doc_key, serde_json::to_vec(&doc)?, None).await?;
6009
6010        Ok(())
6011    }
6012}
6013
6014impl Drop for Aurora {
6015    fn drop(&mut self) {
6016        // Signal checkpoint task to shutdown gracefully
6017        if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
6018            let _ = shutdown_tx.send(());
6019        }
6020        // Signal compaction task to shutdown gracefully
6021        if let Some(ref shutdown_tx) = self.compaction_shutdown {
6022            let _ = shutdown_tx.send(());
6023        }
6024    }
6025}
6026
6027fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
6028    let filter_val = match json_to_value(&filter.value) {
6029        Ok(v) => v,
6030        Err(_) => return false,
6031    };
6032
6033    match filter.operator {
6034        FilterOperator::Eq => doc_val == &filter_val,
6035        FilterOperator::Ne => doc_val != &filter_val,
6036        FilterOperator::Gt => doc_val > &filter_val,
6037        FilterOperator::Gte => doc_val >= &filter_val,
6038        FilterOperator::Lt => doc_val < &filter_val,
6039        FilterOperator::Lte => doc_val <= &filter_val,
6040        FilterOperator::Contains => match (doc_val, &filter_val) {
6041            (Value::String(s), Value::String(fv)) => s.contains(fv),
6042            (Value::Array(arr), _) => arr.contains(&filter_val),
6043            _ => false,
6044        },
6045    }
6046}
6047
/// Results of importing a document
///
/// NOTE(review): presumably these outcomes feed the `imported` / `skipped`
/// counters of `ImportStats` — confirm against the import routine.
enum ImportResult {
    /// The document was imported.
    Imported,
    /// The document was skipped.
    Skipped,
}
6053
/// Statistics from an import operation
///
/// All counters start at zero via `Default`. The type is cheap to clone and
/// comparable, so callers and tests can snapshot and assert on results.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}
6064
/// Statistics for a specific collection
///
/// Cloneable and comparable so snapshots can be stored and asserted on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes
    pub avg_doc_size: usize,
}
6075
/// Statistics for an index
///
/// Cloneable and comparable so snapshots can be stored and asserted on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStats {
    /// Number of unique values in the index
    pub unique_values: usize,
    /// Total number of documents covered by the index
    pub total_documents: usize,
    /// Average number of documents per unique value
    pub avg_docs_per_value: usize,
}
6086
/// Combined database statistics
///
/// Bundles the hot-cache and cold-store statistics with a size estimate and
/// per-collection figures.
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection
    /// (presumably keyed by collection name — confirm against the producer)
    pub collections: HashMap<String, CollectionStats>,
}
6099
6100#[cfg(test)]
6101mod tests {
6102    use super::*;
6103    use tempfile::tempdir;
6104
6105    #[tokio::test]
6106    async fn test_async_wal_integration() {
6107        let temp_dir = tempfile::tempdir().unwrap();
6108        let db_path = temp_dir.path().join("test_wal_integration.db");
6109        let db = Aurora::open(db_path.to_str().unwrap()).await.unwrap();
6110
6111        // 1. Enable WAL explicitly (if your config defaults to off for tests)
6112        // Note: Your implementation might default it on based on AuroraConfig.
6113
6114        // Create collection schema first
6115        db.new_collection(
6116            "users",
6117            vec![
6118                (
6119                    "id",
6120                    FieldType::Scalar(crate::types::ScalarType::String),
6121                    false,
6122                ),
6123                (
6124                    "counter",
6125                    FieldType::Scalar(crate::types::ScalarType::Int),
6126                    false,
6127                ),
6128            ],
6129        )
6130        .await
6131        .unwrap();
6132
6133        let db_clone = db.clone();
6134
6135        // 2. Spawn 50 concurrent writes
6136        let handles: Vec<_> = (0..50)
6137            .map(|i| {
6138                let db = db_clone.clone();
6139                tokio::spawn(async move {
6140                    let doc_id = db
6141                        .insert_into(
6142                            "users",
6143                            vec![
6144                                ("id", Value::String(format!("user-{}", i))),
6145                                ("counter", Value::Int(i)),
6146                            ],
6147                        )
6148                        .await
6149                        .unwrap();
6150                    (i, doc_id) // Return both the index and the document ID
6151                })
6152            })
6153            .collect();
6154
6155        // Wait for all to complete and collect the results
6156        let results = futures::future::join_all(handles).await;
6157        assert!(results.iter().all(|r| r.is_ok()), "Some writes failed");
6158
6159        // Extract the document IDs
6160        let doc_ids: Vec<(i64, String)> = results.into_iter().map(|r| r.unwrap()).collect();
6161
6162        // Find the document ID for the one with counter=25
6163        let target_doc_id = doc_ids
6164            .iter()
6165            .find(|(counter, _)| *counter == 25i64)
6166            .map(|(_, doc_id)| doc_id)
6167            .unwrap();
6168
6169        // 4. Verify Data Integrity
6170        let doc = db.get_document("users", target_doc_id).unwrap().unwrap();
6171        assert_eq!(doc.data.get("counter"), Some(&Value::Int(25)));
6172    }
6173
6174    #[tokio::test]
6175    async fn test_basic_operations() -> Result<()> {
6176        let temp_dir = tempdir()?;
6177        let db_path = temp_dir.path().join("test.aurora");
6178        let db = Aurora::open(db_path.to_str().unwrap()).await?;
6179
6180        // Test collection creation
6181        db.new_collection(
6182            "users",
6183            vec![
6184                (
6185                    "name",
6186                    FieldType::Scalar(crate::types::ScalarType::String),
6187                    false,
6188                ),
6189                (
6190                    "age",
6191                    FieldType::Scalar(crate::types::ScalarType::Int),
6192                    false,
6193                ),
6194                (
6195                    "email",
6196                    FieldType::Scalar(crate::types::ScalarType::String),
6197                    true,
6198                ),
6199            ],
6200        )
6201        .await?;
6202
6203        // Test document insertion
6204        let doc_id = db
6205            .insert_into(
6206                "users",
6207                vec![
6208                    ("name", Value::String("John Doe".to_string())),
6209                    ("age", Value::Int(30)),
6210                    ("email", Value::String("john@example.com".to_string())),
6211                ],
6212            )
6213            .await?;
6214
6215        // Test document retrieval
6216        let doc = db.get_document("users", &doc_id)?.unwrap();
6217        assert_eq!(
6218            doc.data.get("name").unwrap(),
6219            &Value::String("John Doe".to_string())
6220        );
6221        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));
6222
6223        Ok(())
6224    }
6225
6226    #[tokio::test]
6227    async fn test_transactions() -> Result<()> {
6228        let temp_dir = tempdir()?;
6229        let db_path = temp_dir.path().join("test.aurora");
6230        let db = Aurora::open(db_path.to_str().unwrap()).await?;
6231
6232        // Create collection
6233        db.new_collection(
6234            "test",
6235            vec![(
6236                "field",
6237                FieldType::Scalar(crate::types::ScalarType::String),
6238                false,
6239            )],
6240        )
6241        .await?;
6242
6243        // Start transaction
6244        let tx_id = db.begin_transaction().await;
6245
6246        // Insert document
6247        let doc_id = db
6248            .insert_into("test", vec![("field", Value::String("value".to_string()))])
6249            .await?;
6250
6251        // Commit transaction
6252        db.commit_transaction(tx_id).await?;
6253
6254        // Verify document exists
6255        let doc = db.get_document("test", &doc_id)?.unwrap();
6256        assert_eq!(
6257            doc.data.get("field").unwrap(),
6258            &Value::String("value".to_string())
6259        );
6260
6261        Ok(())
6262    }
6263
6264    #[tokio::test]
6265    async fn test_query_operations() -> Result<()> {
6266        let temp_dir = tempdir()?;
6267        let db_path = temp_dir.path().join("test.aurora");
6268        let db = Aurora::open(db_path.to_str().unwrap()).await?;
6269
6270        // Test collection creation
6271        db.new_collection(
6272            "books",
6273            vec![
6274                (
6275                    "title",
6276                    FieldType::Scalar(crate::types::ScalarType::String),
6277                    false,
6278                ),
6279                (
6280                    "author",
6281                    FieldType::Scalar(crate::types::ScalarType::String),
6282                    false,
6283                ),
6284                (
6285                    "year",
6286                    FieldType::Scalar(crate::types::ScalarType::Int),
6287                    false,
6288                ),
6289            ],
6290        )
6291        .await?;
6292
6293        // Test document insertion
6294        db.insert_into(
6295            "books",
6296            vec![
6297                ("title", Value::String("Book 1".to_string())),
6298                ("author", Value::String("Author 1".to_string())),
6299                ("year", Value::Int(2020)),
6300            ],
6301        )
6302        .await?;
6303
6304        db.insert_into(
6305            "books",
6306            vec![
6307                ("title", Value::String("Book 2".to_string())),
6308                ("author", Value::String("Author 2".to_string())),
6309                ("year", Value::Int(2021)),
6310            ],
6311        )
6312        .await?;
6313
6314        // Test query
6315        let results = db
6316            .query("books")
6317            .filter(|f| f.gt("year", Value::Int(2019)))
6318            .order_by("year", true)
6319            .collect()
6320            .await?;
6321
6322        assert_eq!(results.len(), 2);
6323        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());
6324
6325        Ok(())
6326    }
6327
6328    #[tokio::test]
6329    async fn test_blob_operations() -> Result<()> {
6330        let temp_dir = tempdir()?;
6331        let db_path = temp_dir.path().join("test.aurora");
6332        let db = Aurora::open(db_path.to_str().unwrap()).await?;
6333
6334        // Create test file
6335        let file_path = temp_dir.path().join("test.txt");
6336        std::fs::write(&file_path, b"Hello, World!")?;
6337
6338        // Test blob storage
6339        db.put_blob("test:blob".to_string(), &file_path).await?;
6340
6341        // Verify blob exists
6342        let data = db.get_data_by_pattern("test:blob")?;
6343        assert_eq!(data.len(), 1);
6344        match &data[0].1 {
6345            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
6346            _ => panic!("Expected Blob type"),
6347        }
6348
6349        Ok(())
6350    }
6351
6352    #[tokio::test]
6353    async fn test_blob_size_limit() -> Result<()> {
6354        let temp_dir = tempdir()?;
6355        let db_path = temp_dir.path().join("test.aurora");
6356        let db = Aurora::open(db_path.to_str().unwrap()).await?;
6357
6358        // Create a test file that's too large (201MB)
6359        let large_file_path = temp_dir.path().join("large_file.bin");
6360        let large_data = vec![0u8; 201 * 1024 * 1024];
6361        std::fs::write(&large_file_path, &large_data)?;
6362
6363        // Attempt to store the large file
6364        let result = db
6365            .put_blob("test:large_blob".to_string(), &large_file_path)
6366            .await;
6367
6368        assert!(result.is_err());
6369        let err = result.unwrap_err();
6370        assert!(err.code == ErrorCode::InvalidOperation); // Expected error
6371
6372        Ok(())
6373    }
6374
6375    #[tokio::test]
6376    async fn test_unique_constraints() -> Result<()> {
6377        let temp_dir = tempdir()?;
6378        let db_path = temp_dir.path().join("test.aurora");
6379        let db = Aurora::open(db_path.to_str().unwrap()).await?;
6380
6381        // Create collection with unique email field
6382        db.new_collection(
6383            "users",
6384            vec![
6385                (
6386                    "name",
6387                    FieldType::Scalar(crate::types::ScalarType::String),
6388                    false,
6389                ),
6390                (
6391                    "email",
6392                    FieldType::Scalar(crate::types::ScalarType::String),
6393                    true,
6394                ), // unique field
6395                (
6396                    "age",
6397                    FieldType::Scalar(crate::types::ScalarType::Int),
6398                    false,
6399                ),
6400            ],
6401        )
6402        .await?;
6403
6404        // Insert first document
6405        let _doc_id1 = db
6406            .insert_into(
6407                "users",
6408                vec![
6409                    ("name", Value::String("John Doe".to_string())),
6410                    ("email", Value::String("john@example.com".to_string())),
6411                    ("age", Value::Int(30)),
6412                ],
6413            )
6414            .await?;
6415
6416        // Try to insert second document with same email - should fail
6417        let result = db
6418            .insert_into(
6419                "users",
6420                vec![
6421                    ("name", Value::String("Jane Doe".to_string())),
6422                    ("email", Value::String("john@example.com".to_string())), // duplicate email
6423                    ("age", Value::Int(25)),
6424                ],
6425            )
6426            .await;
6427
6428        assert!(result.is_err());
6429        if let Err(e) = result {
6430            if e.code == ErrorCode::UniqueConstraintViolation {
6431                let msg = e.message;
6432                assert!(msg.contains("email"));
6433                assert!(msg.contains("john@example.com")); // Changed from test@example.com to match the actual value
6434            } else {
6435                panic!("Expected UniqueConstraintViolation, got {:?}", e);
6436            }
6437        } else {
6438            panic!("Expected UniqueConstraintViolation");
6439        }
6440
6441        // Test upsert with unique constraint
6442        // Should succeed for new document
6443        let _doc_id2 = db
6444            .upsert(
6445                "users",
6446                "user2",
6447                vec![
6448                    ("name", Value::String("Alice Smith".to_string())),
6449                    ("email", Value::String("alice@example.com".to_string())),
6450                    ("age", Value::Int(28)),
6451                ],
6452            )
6453            .await?;
6454
6455        // Should fail when trying to upsert with duplicate email
6456        let result = db
6457            .upsert(
6458                "users",
6459                "user3",
6460                vec![
6461                    ("name", Value::String("Bob Wilson".to_string())),
6462                    ("email", Value::String("alice@example.com".to_string())), // duplicate
6463                    ("age", Value::Int(35)),
6464                ],
6465            )
6466            .await;
6467
6468        assert!(result.is_err());
6469
6470        // Should succeed when updating existing document with same email (no change)
6471        let result = db
6472            .upsert(
6473                "users",
6474                "user2",
6475                vec![
6476                    ("name", Value::String("Alice Updated".to_string())),
6477                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
6478                    ("age", Value::Int(29)),
6479                ],
6480            )
6481            .await;
6482
6483        assert!(result.is_ok());
6484
6485        Ok(())
6486    }
6487
6488    #[tokio::test]
6489    async fn test_nullable_field_definition() -> Result<()> {
6490        use tempfile::TempDir;
6491
6492        let temp_dir = TempDir::new()?;
6493        let db_path = temp_dir.path().join("test.aurora");
6494        let db = Aurora::open(db_path.to_str().unwrap()).await?;
6495
6496        // Test 3-tuple format (defaults nullable to false)
6497        db.new_collection(
6498            "products",
6499            vec![
6500                (
6501                    "name",
6502                    FieldType::Scalar(crate::types::ScalarType::String),
6503                    false,
6504                ),
6505                (
6506                    "sku",
6507                    FieldType::Scalar(crate::types::ScalarType::String),
6508                    true,
6509                ), // unique
6510            ],
6511        )
6512        .await?;
6513
6514        // Test 4-tuple format with explicit nullable control
6515        db.new_collection(
6516            "users",
6517            vec![
6518                (
6519                    "name",
6520                    FieldType::Scalar(crate::types::ScalarType::String),
6521                    false,
6522                    false,
6523                ), // not nullable
6524                (
6525                    "email",
6526                    FieldType::Scalar(crate::types::ScalarType::String),
6527                    true,
6528                    false,
6529                ), // unique, not nullable
6530                (
6531                    "bio",
6532                    FieldType::Scalar(crate::types::ScalarType::String),
6533                    false,
6534                    true,
6535                ), // nullable
6536                (
6537                    "age",
6538                    FieldType::Scalar(crate::types::ScalarType::Int),
6539                    false,
6540                    true,
6541                ), // nullable
6542            ],
6543        )
6544        .await?;
6545
6546        // Verify the schema was created correctly
6547        let schema = db.ensure_schema_hot("users")?;
6548
6549        // Check that name is not nullable
6550        assert_eq!(schema.fields.get("name").unwrap().nullable, false);
6551
6552        // Check that email is not nullable
6553        assert_eq!(schema.fields.get("email").unwrap().nullable, false);
6554
6555        // Check that bio is nullable
6556        assert_eq!(schema.fields.get("bio").unwrap().nullable, true);
6557
6558        // Check that age is nullable
6559        assert_eq!(schema.fields.get("age").unwrap().nullable, true);
6560
6561        // Verify products collection (3-tuple defaults to false)
6562        let products_schema = db.ensure_schema_hot("products")?;
6563        assert_eq!(products_schema.fields.get("name").unwrap().nullable, false);
6564        assert_eq!(products_schema.fields.get("sku").unwrap().nullable, false);
6565
6566        Ok(())
6567    }
6568
6569    #[tokio::test]
6570    async fn test_upsert_unique_field_after_restart() {
6571        // Simulates the post-restart scenario:
6572        // _sid_dictionary is empty but secondary_indices have the bitmap entries.
6573        let dir = tempfile::tempdir().unwrap();
6574        let db_path = dir.path().to_path_buf();
6575
6576        // Session 1: insert a fact
6577        let mut document_id = String::new();
6578        {
6579            let db = Aurora::with_config(AuroraConfig {
6580                db_path: db_path.clone(),
6581                enable_wal: false,
6582                ..Default::default()
6583            })
6584            .await
6585            .unwrap();
6586
6587            db.new_collection(
6588                "facts",
6589                vec![
6590                    (
6591                        "key",
6592                        FieldDefinition {
6593                            field_type: FieldType::SCALAR_STRING,
6594                            unique: true,
6595                            indexed: true,
6596                            nullable: false,
6597                            validations: vec![],
6598                            relation: None,
6599                        },
6600                    ),
6601                    (
6602                        "value_json",
6603                        FieldDefinition {
6604                            field_type: FieldType::SCALAR_STRING,
6605                            unique: false,
6606                            indexed: false,
6607                            nullable: false,
6608                            validations: vec![],
6609                            relation: None,
6610                        },
6611                    ),
6612                ],
6613            )
6614            .await
6615            .unwrap();
6616
6617            let aql = r#"
6618                mutation {
6619                    insertInto(collection: "facts", data: {
6620                        key: "last_session_at",
6621                        value_json: "\"2026-04-05T01:00:00Z\""
6622                    }) { id }
6623                }
6624            "#
6625            .to_string();
6626            let result = db.execute(aql).await.unwrap();
6627
6628            // Extract ID from result (QueryResult format)
6629            if let crate::parser::executor::ExecutionResult::Mutation(mutres) = result {
6630                document_id = mutres.returned_documents[0]._sid.clone();
6631            }
6632
6633            // Explicitly flush to ensure secondary_indices are written to disk
6634            db.flush().unwrap();
6635        }
6636        // db dropped here — simulates process restart
6637        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
6638
6639        // Session 2: new Aurora instance, _sid_dictionary starts empty
6640        {
6641            let db = Aurora::with_config(AuroraConfig {
6642                db_path: db_path.clone(),
6643                enable_wal: false,
6644                ..Default::default()
6645            })
6646            .await
6647            .unwrap();
6648
6649            // Update the same key with a new value directly by ID — must NOT error
6650            let aql = format!(
6651                r#"
6652                mutation {{
6653                    update(
6654                        collection: "facts",
6655                        id: "{}",
6656                        data: {{ key: "last_session_at", value_json: "\"2026-04-05T03:00:00Z\"" }}
6657                    ) {{ id }}
6658                }}
6659            "#,
6660                document_id
6661            );
6662
6663            db.execute(aql)
6664                .await
6665                .expect("update of existing unique key after restart should not fail");
6666        }
6667    }
6668}