// aurora_db/db.rs

//! # Aurora Database
//!
//! Aurora is an embedded document database with a tiered storage architecture.
//! It provides document storage, querying, indexing, and full-text search
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible, JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust,ignore
//! use aurora::Aurora;
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with a schema
//! db.new_collection("users", vec![
//!     ("name".to_string(), FieldType::String, false),
//!     ("email".to_string(), FieldType::String, true),  // unique field
//!     ("age".to_string(), FieldType::Int, false),
//! ])?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ])?;
//!
//! // Query for documents (inside an async context)
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AuroraError, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    document_to_json, json_to_value, Filter as HttpFilter, FilterOperator, QueryPayload,
};
use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore};
use crate::types::{
    AuroraConfig, Collection, Document, FieldDefinition, FieldType, InsertData, Value,
};
use dashmap::DashMap;
use serde_json::from_str;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::read_to_string;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Index types for faster lookups
type PrimaryIndex = DashMap<String, Vec<u8>>;
type SecondaryIndex = DashMap<String, Vec<String>>;

/// Summary information about a stored entry (regular data, blob, or compressed value)
#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```ignore
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ])?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: ColdStore,
    // Indexing
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    in_transaction: std::sync::atomic::AtomicBool,
    transaction_ops: DashMap<String, Vec<u8>>,
    indices: Arc<DashMap<String, Index>>,
}

impl Aurora {
    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Use a specific location
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Just use a name (creates in the current directory)
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = Self::resolve_path(path)?;

        // Create the parent directory if needed
        if let Some(parent) = path.parent() {
            if !parent.exists() {
                std::fs::create_dir_all(parent)?;
            }
        }

        // Initialize the hot and cold stores with the resolved path
        let cold = ColdStore::new(path.to_str().unwrap())?;
        let hot = HotStore::new();

        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            in_transaction: std::sync::atomic::AtomicBool::new(false),
            transaction_ops: DashMap::new(),
            indices: Arc::new(DashMap::new()),
        };

        // Indices are built lazily on first access via `ensure_indices_initialized`.

        Ok(db)
    }

    /// Helper method to resolve a database path
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to the current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AuroraError::IoError(format!(
                "Failed to resolve current directory: {}",
                e
            ))),
        }
    }

    /// Open a database with custom configuration
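    ///
    /// # Examples
    ///
    /// A sketch of typical usage; the remaining `AuroraConfig` fields are assumed to come
    /// from a `Default` implementation:
    ///
    /// ```ignore
    /// let config = AuroraConfig {
    ///     db_path: "./data/my_app.db".into(),
    ///     create_dirs: true,
    ///     hot_cache_size_mb: 256,
    ///     auto_compact: true,
    ///     ..Default::default()
    /// };
    /// let db = Aurora::with_config(config)?;
    /// ```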
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs {
            if let Some(parent) = path.parent() {
                if !parent.exists() {
                    std::fs::create_dir_all(parent)?;
                }
            }
        }

        let cold = ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?;

        let hot = HotStore::with_config(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
        );

        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            in_transaction: std::sync::atomic::AtomicBool::new(false),
            transaction_ops: DashMap::new(),
            indices: Arc::new(DashMap::new()),
        };

        // Set up auto-compaction if enabled
        if config.auto_compact {
            // TODO: auto-compaction scheduling is not implemented yet
        }

        Ok(db)
    }
243
244    // Lazy index initialization
245    async fn ensure_indices_initialized(&self) -> Result<()> {
246        self.indices_initialized
247            .get_or_init(|| async {
248                println!("Initializing indices...");
249                if let Err(e) = self.initialize_indices() {
250                    eprintln!("Failed to initialize indices: {:?}", e);
251                }
252                println!("Indices initialized");
253                ()
254            })
255            .await;
256        Ok(())
257    }
258
259    fn initialize_indices(&self) -> Result<()> {
260        // Scan existing data and build indices
261        for result in self.cold.scan() {
262            let (key, value) = result?;
263            let key_str = std::str::from_utf8(&key.as_bytes())
264                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;
265
266            if let Some(collection_name) = key_str.split(':').next() {
267                self.index_value(collection_name, key_str, &value)?;
268            }
269        }
270        Ok(())
271    }
272
273    // Fast key-value operations with index support
274    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
275        // Check hot cache first
276        if let Some(value) = self.hot.get(key) {
277            return Ok(Some(value));
278        }
279
280        // Check primary index
281        if let Some(collection) = key.split(':').next() {
282            if let Some(index) = self.primary_indices.get(collection) {
283                if let Some(value) = index.get(key) {
284                    // Promote to hot cache
285                    self.hot.set(key.to_string(), value.clone(), None);
286                    return Ok(Some(value.clone()));
287                }
288            }
289        }
290
291        // Fallback to cold storage
292        let value = self.cold.get(key)?;
293        if let Some(v) = &value {
294            self.hot.set(key.to_string(), v.clone(), None);
295        }
296        Ok(value)
297    }
298
299    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
300        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
301
302        if value.len() > MAX_BLOB_SIZE {
303            return Err(AuroraError::InvalidOperation(format!(
304                "Blob size {} exceeds maximum allowed size of {}MB",
305                value.len() / (1024 * 1024),
306                MAX_BLOB_SIZE / (1024 * 1024)
307            )));
308        }
309
310        // Track transaction if needed
311        if self
312            .in_transaction
313            .load(std::sync::atomic::Ordering::SeqCst)
314        {
315            self.transaction_ops.insert(key.clone(), value.clone());
316            return Ok(());
317        }
318
319        // Write directly to storage (sled handles WAL internally)
320        self.cold.set(key.clone(), value.clone())?;
321        self.hot.set(key.clone(), value.clone(), ttl);
322
        // Keep the in-memory indices in sync immediately after a successful write.
324        if let Some(collection_name) = key.split(':').next() {
325            // Don't index internal data like _collection or _index definitions
326            if !collection_name.starts_with('_') {
327                self.index_value(collection_name, &key, &value)?;
328            }
329        }
330
331        Ok(())
332    }
333
334    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
335        // Update primary index
336        self.primary_indices
337            .entry(collection.to_string())
338            .or_insert_with(DashMap::new)
339            .insert(key.to_string(), value.to_vec());
340
341        // Update secondary indices if it's a JSON document
342        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
343            for (field, value) in doc.data {
344                self.secondary_indices
345                    .entry(format!("{}:{}", collection, field))
346                    .or_insert_with(DashMap::new)
347                    .entry(value.to_string())
348                    .or_insert_with(Vec::new)
349                    .push(key.to_string());
350            }
351        }
352        Ok(())
353    }
354
355    // Simplified collection scan (fallback)
356    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
357        let _prefix = format!("{}:", collection);
358        let mut documents = Vec::new();
359
360        if let Some(index) = self.primary_indices.get(collection) {
361            for entry in index.iter() {
362                if let Ok(doc) = serde_json::from_slice(entry.value()) {
363                    documents.push(doc);
364                }
365            }
366        }
367
368        Ok(documents)
369    }
370
    /// Store the contents of a file as a blob under the given key
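    ///
    /// A usage sketch (the key and path are illustrative):
    ///
    /// ```ignore
    /// db.put_blob("images:logo".to_string(), Path::new("./assets/logo.png")).await?;
    /// ```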
372    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
373        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
374
375        // Get file metadata to check size before reading
376        let metadata = tokio::fs::metadata(file_path).await?;
377        let file_size = metadata.len() as usize;
378
379        if file_size > MAX_FILE_SIZE {
380            return Err(AuroraError::InvalidOperation(format!(
381                "File size {} MB exceeds maximum allowed size of {} MB",
382                file_size / (1024 * 1024),
383                MAX_FILE_SIZE / (1024 * 1024)
384            )));
385        }
386
387        let mut file = File::open(file_path).await?;
388        let mut buffer = Vec::new();
389        file.read_to_end(&mut buffer).await?;
390
391        self.put(key, buffer, None)
392    }
393
    /// Create a new collection with the given schema
    ///
    /// # Arguments
    /// * `name` - Name of the collection to create
    /// * `fields` - Schema definition as a list of field definitions:
    ///   * Field name
    ///   * Field type (String, Int, Float, Boolean, etc.)
    ///   * Whether the field requires a unique value
    ///
    /// # Returns
    /// Success or an error (e.g., collection already exists)
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Define a collection with a schema
    /// db.new_collection("products", vec![
    ///     ("name".to_string(), FieldType::String, false),
    ///     ("price".to_string(), FieldType::Float, false),
    ///     ("sku".to_string(), FieldType::String, true),  // unique field
    ///     ("description".to_string(), FieldType::String, false),
    ///     ("in_stock".to_string(), FieldType::Boolean, false),
    /// ])?;
    /// ```
    pub fn new_collection(&self, name: &str, fields: Vec<(String, FieldType, bool)>) -> Result<()> {
        // Record which fields are unique so duplicate checks (e.g. during import) can use them
        let unique_fields: Vec<String> = fields
            .iter()
            .filter(|(_, _, unique)| *unique)
            .map(|(field_name, _, _)| field_name.clone())
            .collect();

        let collection = Collection {
            name: name.to_string(),
            fields: fields
                .into_iter()
                .map(|(name, field_type, unique)| {
                    (
                        name,
                        FieldDefinition {
                            field_type,
                            unique,
                            indexed: unique,
                        },
                    )
                })
                .collect(),
            unique_fields,
        };

        self.put(
            format!("_collection:{}", name),
            serde_json::to_vec(&collection)?,
            None,
        )
    }
443
444    /// Insert a document into a collection
445    ///
446    /// # Arguments
447    /// * `collection` - Name of the collection to insert into
448    /// * `data` - Document fields and values to insert
449    ///
450    /// # Returns
451    /// The ID of the inserted document or an error
452    ///
453    /// # Examples
454    ///
455    /// ```
456    /// // Insert a document
457    /// let doc_id = db.insert_into("users", vec![
458    ///     ("name", Value::String("John Doe".to_string())),
459    ///     ("email", Value::String("john@example.com".to_string())),
460    ///     ("active", Value::Bool(true)),
461    /// ])?;
462    /// ```
463    pub fn insert_into(&self, collection: &str, data: InsertData) -> Result<String> {
464        let data_map: HashMap<String, Value> =
465            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
466
467        let doc_id = Uuid::new_v4().to_string();
468        let document = Document {
469            id: doc_id.clone(),
470            data: data_map,
471        };
472
473        self.put(
474            format!("{}:{}", collection, doc_id),
475            serde_json::to_vec(&document)?,
476            None,
477        )?;
478
479        Ok(doc_id)
480    }
481
482    pub fn insert_map(&self, collection: &str, data: HashMap<String, Value>) -> Result<String> {
483        let doc_id = Uuid::new_v4().to_string();
484        let document = Document {
485            id: doc_id.clone(),
486            data,
487        };
488
489        self.put(
490            format!("{}:{}", collection, doc_id),
491            serde_json::to_vec(&document)?,
492            None,
493        )?;
494
495        Ok(doc_id)
496    }
497
498    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
499        self.ensure_indices_initialized().await?;
500        self.scan_collection(collection)
501    }
502
503    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
504        let mut data = Vec::new();
505
506        if let Some(index) = self
507            .primary_indices
508            .get(pattern.split(':').next().unwrap_or(""))
509        {
510            for entry in index.iter() {
511                if entry.key().contains(pattern) {
512                    let value = entry.value();
513                    let info = if value.starts_with(b"BLOB:") {
514                        DataInfo::Blob { size: value.len() }
515                    } else {
516                        DataInfo::Data {
517                            size: value.len(),
518                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
519                                .into_owned(),
520                        }
521                    };
522
523                    data.push((entry.key().clone(), info));
524                }
525            }
526        }
527
528        Ok(data)
529    }
530
531    /// Begin a transaction
532    ///
533    /// All operations after beginning a transaction will be part of the transaction
534    /// until either commit_transaction() or rollback_transaction() is called.
535    ///
536    /// # Returns
537    /// Success or an error (e.g., if a transaction is already in progress)
538    ///
539    /// # Examples
540    ///
541    /// ```
542    /// // Start a transaction for atomic operations
543    /// db.begin_transaction()?;
544    ///
545    /// // Perform multiple operations
546    /// db.insert_into("accounts", vec![("user_id", Value::String(user_id)), ("balance", Value::Float(100.0))])?;
547    /// db.insert_into("audit_log", vec![("action", Value::String("account_created".to_string()))])?;
548    ///
549    /// // Commit all changes or roll back if there's an error
550    /// if all_ok {
551    ///     db.commit_transaction()?;
552    /// } else {
553    ///     db.rollback_transaction()?;
554    /// }
555    /// ```
556    pub fn begin_transaction(&self) -> Result<()> {
557        if self
558            .in_transaction
559            .load(std::sync::atomic::Ordering::SeqCst)
560        {
561            return Err(AuroraError::InvalidOperation(
562                "Transaction already in progress".into(),
563            ));
564        }
565
566        // Mark as in transaction
567        self.in_transaction
568            .store(true, std::sync::atomic::Ordering::SeqCst);
569        Ok(())
570    }
571
572    /// Commit the current transaction
573    ///
574    /// Makes all changes in the current transaction permanent.
575    ///
576    /// # Returns
577    /// Success or an error (e.g., if no transaction is active)
578    pub fn commit_transaction(&self) -> Result<()> {
579        if !self
580            .in_transaction
581            .load(std::sync::atomic::Ordering::SeqCst)
582        {
583            return Err(AuroraError::InvalidOperation(
584                "No transaction in progress".into(),
585            ));
586        }
587
588        // Apply all pending transaction operations
589        for item in self.transaction_ops.iter() {
590            self.cold.set(item.key().clone(), item.value().clone())?;
591            self.hot.set(item.key().clone(), item.value().clone(), None);
592        }
593
594        // Clear transaction data
595        self.transaction_ops.clear();
596        self.in_transaction
597            .store(false, std::sync::atomic::Ordering::SeqCst);
598
599        // Ensure durability by forcing a sync
600        self.cold.compact()?;
601
602        Ok(())
603    }
604
605    /// Roll back the current transaction
606    ///
607    /// Discards all changes made in the current transaction.
608    ///
609    /// # Returns
610    /// Success or an error (e.g., if no transaction is active)
611    pub fn rollback_transaction(&self) -> Result<()> {
612        if !self
613            .in_transaction
614            .load(std::sync::atomic::Ordering::SeqCst)
615        {
616            return Err(AuroraError::InvalidOperation(
617                "No transaction in progress".into(),
618            ));
619        }
620
621        // Simply discard the transaction operations
622        self.transaction_ops.clear();
623        self.in_transaction
624            .store(false, std::sync::atomic::Ordering::SeqCst);
625
626        Ok(())
627    }
628
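    /// Create a secondary index on a single field of a collection
    ///
    /// A usage sketch (collection and field names are illustrative):
    ///
    /// ```ignore
    /// db.create_index("users", "email").await?;
    /// let matches = db.search_by_value("users", "email", &Value::String("jane@example.com".into()))?;
    /// ```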
629    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
630        // Check if collection exists
631        if self.get(&format!("_collection:{}", collection))?.is_none() {
632            return Err(AuroraError::CollectionNotFound(collection.to_string()));
633        }
634
635        // Generate a default index name
636        let index_name = format!("idx_{}_{}", collection, field);
637
638        // Create index definition
639        let definition = IndexDefinition {
640            name: index_name.clone(),
641            collection: collection.to_string(),
642            fields: vec![field.to_string()],
643            index_type: IndexType::BTree,
644            unique: false,
645        };
646
647        // Create the index
648        let index = Index::new(definition.clone());
649
650        // Index all existing documents in the collection
651        let prefix = format!("{}:", collection);
652        for result in self.cold.scan_prefix(&prefix) {
653            if let Ok((_, data)) = result {
654                if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
655                    let _ = index.insert(&doc);
656                }
657            }
658        }
659
660        // Store the index
661        self.indices.insert(index_name, index);
662
663        // Store the index definition for persistence
664        let index_key = format!("_index:{}:{}", collection, field);
665        self.put(index_key, serde_json::to_vec(&definition)?, None)?;
666
667        Ok(())
668    }
669
670    /// Create a query builder for advanced document queries
671    ///
672    /// # Arguments
673    /// * `collection` - Name of the collection to query
674    ///
675    /// # Returns
676    /// A `QueryBuilder` for constructing and executing queries
677    ///
678    /// # Examples
679    ///
680    /// ```
681    /// // Query for documents matching criteria
682    /// let active_premium_users = db.query("users")
683    ///     .filter(|f| f.eq("status", "active") && f.eq("plan", "premium"))
684    ///     .order_by("joined_date", false)  // newest first
685    ///     .limit(10)
686    ///     .collect()
687    ///     .await?;
688    /// ```
689    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
690        QueryBuilder::new(self, collection)
691    }
692
693    /// Create a search builder for full-text search
694    ///
695    /// # Arguments
696    /// * `collection` - Name of the collection to search
697    ///
698    /// # Returns
699    /// A `SearchBuilder` for configuring and executing searches
700    ///
701    /// # Examples
702    ///
703    /// ```
704    /// // Search for documents containing text
705    /// let search_results = db.search("articles")
706    ///     .field("content")
707    ///     .matching("quantum computing")
708    ///     .fuzzy(true)  // Enable fuzzy matching for typo tolerance
709    ///     .collect()
710    ///     .await?;
711    /// ```
712    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
713        SearchBuilder::new(self, collection)
714    }
715
716    /// Retrieve a document by ID
717    ///
718    /// # Arguments
719    /// * `collection` - Name of the collection to query
720    /// * `id` - ID of the document to retrieve
721    ///
722    /// # Returns
723    /// The document if found, None if not found, or an error
724    ///
725    /// # Examples
726    ///
727    /// ```
728    /// // Get a document by ID
729    /// if let Some(user) = db.get_document("users", &user_id)? {
730    ///     println!("Found user: {}", user.data.get("name").unwrap());
731    /// } else {
732    ///     println!("User not found");
733    /// }
734    /// ```
735    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
736        let key = format!("{}:{}", collection, id);
737        if let Some(data) = self.get(&key)? {
738            Ok(Some(serde_json::from_slice(&data)?))
739        } else {
740            Ok(None)
741        }
742    }
743
    /// Delete an entry by key
    ///
    /// # Arguments
    /// * `key` - Full key of the entry to delete, in the form `collection:id`
    ///
    /// # Returns
    /// Success or an error
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Delete a specific document
    /// db.delete(&format!("users:{}", user_id)).await?;
    /// ```
759    pub async fn delete(&self, key: &str) -> Result<()> {
760        // Delete in all levels
761        if self.hot.get(key).is_some() {
762            self.hot.delete(key);
763        }
764
765        self.cold.delete(key)?;
766
767        // Update indices
768        if let Some(collection) = key.split(':').next() {
769            if let Some(index) = self.primary_indices.get_mut(collection) {
770                index.remove(key);
771            }
772        }
773
774        // If in transaction, record the operation (with null value to indicate deletion)
775        if self
776            .in_transaction
777            .load(std::sync::atomic::Ordering::SeqCst)
778        {
779            self.transaction_ops.insert(key.to_string(), Vec::new());
780        }
781
782        Ok(())
783    }
784
785    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
786        let prefix = format!("{}:", collection);
787
788        // Get all keys in collection
789        let keys: Vec<String> = self
790            .cold
791            .scan()
792            .filter_map(|r| r.ok())
793            .filter(|(k, _)| k.starts_with(&prefix))
794            .map(|(k, _)| k)
795            .collect();
796
797        // Delete each key
798        for key in keys {
799            self.delete(&key).await?;
800        }
801
802        // Remove collection indices
803        self.primary_indices.remove(collection);
804        self.secondary_indices
805            .retain(|k, _| !k.starts_with(&prefix));
806
807        Ok(())
808    }
809
810    #[allow(dead_code)]
811    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
812        // Remove from primary index
813        if let Some(index) = self.primary_indices.get(collection) {
814            index.remove(&doc.id);
815        }
816
817        // Remove from secondary indices
818        for (field, value) in &doc.data {
819            let index_key = format!("{}:{}", collection, field);
820            if let Some(index) = self.secondary_indices.get(&index_key) {
821                if let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
822                    doc_ids.retain(|id| id != &doc.id);
823                }
824            }
825        }
826
827        Ok(())
828    }
829
830    pub async fn search_text(
831        &self,
832        collection: &str,
833        field: &str,
834        query: &str,
835    ) -> Result<Vec<Document>> {
836        let mut results = Vec::new();
837        let docs = self.get_all_collection(collection).await?;
838
839        for doc in docs {
840            if let Some(Value::String(text)) = doc.data.get(field) {
841                if text.to_lowercase().contains(&query.to_lowercase()) {
842                    results.push(doc);
843                }
844            }
845        }
846
847        Ok(results)
848    }
849
850    /// Export a collection to a JSON file
851    ///
852    /// # Arguments
853    /// * `collection` - Name of the collection to export
854    /// * `output_path` - Path to the output JSON file
855    ///
856    /// # Returns
857    /// Success or an error
858    ///
859    /// # Examples
860    ///
861    /// ```
862    /// // Backup a collection to JSON
863    /// db.export_as_json("users", "./backups/users_2023-10-15.json")?;
864    /// ```
865    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
866        let output_path = if !output_path.ends_with(".json") {
867            format!("{}.json", output_path)
868        } else {
869            output_path.to_string()
870        };
871
872        let mut docs = Vec::new();
873
874        // Get all documents from the specified collection
875        for result in self.cold.scan() {
876            let (key, value) = result?;
877
878            // Only process documents from the specified collection
879            if let Some(key_collection) = key.split(':').next() {
880                if key_collection == collection && !key.starts_with("_collection:") {
881                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
882                        // Convert Value enum to raw JSON values
883                        let mut clean_doc = serde_json::Map::new();
884                        for (k, v) in doc.data {
885                            match v {
886                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
887                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
888                                Value::Float(f) => {
889                                    if let Some(n) = serde_json::Number::from_f64(f) {
890                                        clean_doc.insert(k, JsonValue::Number(n))
891                                    } else {
892                                        clean_doc.insert(k, JsonValue::Null)
893                                    }
894                                }
895                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
896                                Value::Array(arr) => {
897                                    let clean_arr: Vec<JsonValue> = arr
898                                        .into_iter()
899                                        .map(|v| match v {
900                                            Value::String(s) => JsonValue::String(s),
901                                            Value::Int(i) => JsonValue::Number(i.into()),
902                                            Value::Float(f) => serde_json::Number::from_f64(f)
903                                                .map(JsonValue::Number)
904                                                .unwrap_or(JsonValue::Null),
905                                            Value::Bool(b) => JsonValue::Bool(b),
906                                            Value::Null => JsonValue::Null,
907                                            _ => JsonValue::Null,
908                                        })
909                                        .collect();
910                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
911                                }
912                                Value::Uuid(u) => {
913                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
914                                }
915                                Value::Null => clean_doc.insert(k, JsonValue::Null),
                                Value::Object(_) => None, // nested objects are not exported
917                            };
918                        }
919                        docs.push(JsonValue::Object(clean_doc));
920                    }
921                }
922            }
923        }
924
925        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
926            collection.to_string(),
927            JsonValue::Array(docs),
928        )]));
929
930        let mut file = StdFile::create(&output_path)?;
931        serde_json::to_writer_pretty(&mut file, &output)?;
932        println!("Exported collection '{}' to {}", collection, &output_path);
933        Ok(())
934    }
935
936    /// Export specific collection to CSV file
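    ///
    /// A usage sketch (the `.csv` extension is appended if missing):
    ///
    /// ```ignore
    /// db.export_as_csv("users", "./backups/users_2023-10-15")?;
    /// ```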
937    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
938        let output_path = if !filename.ends_with(".csv") {
939            format!("{}.csv", filename)
940        } else {
941            filename.to_string()
942        };
943
944        let mut writer = csv::Writer::from_path(&output_path)?;
945        let mut headers = Vec::new();
946        let mut first_doc = true;
947
948        // Get all documents from the specified collection
949        for result in self.cold.scan() {
950            let (key, value) = result?;
951
952            // Only process documents from the specified collection
953            if let Some(key_collection) = key.split(':').next() {
954                if key_collection == collection && !key.starts_with("_collection:") {
955                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
956                        // Write headers from first document
957                        if first_doc && !doc.data.is_empty() {
958                            headers = doc.data.keys().cloned().collect();
959                            writer.write_record(&headers)?;
960                            first_doc = false;
961                        }
962
963                        // Write the document values
964                        let values: Vec<String> = headers
965                            .iter()
966                            .map(|header| {
967                                doc.data
968                                    .get(header)
969                                    .map(|v| v.to_string())
970                                    .unwrap_or_default()
971                            })
972                            .collect();
973                        writer.write_record(&values)?;
974                    }
975                }
976            }
977        }
978
979        writer.flush()?;
980        println!("Exported collection '{}' to {}", collection, &output_path);
981        Ok(())
982    }
983
984    // Helper method to create filter-based queries
985    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
986        self.query(collection)
987    }
988
989    // Convenience methods that build on top of the FilterBuilder
990
991    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
992        self.query(collection)
993            .filter(|f| f.eq("id", id))
994            .first_one()
995            .await
996    }
997
998    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
999    where
1000        F: Fn(&FilterBuilder) -> bool + 'static,
1001    {
1002        self.query(collection).filter(filter_fn).first_one().await
1003    }
1004
1005    pub async fn find_by_field<T: Into<Value> + Clone + 'static>(
1006        &self,
1007        collection: &str,
1008        field: &'static str,
1009        value: T,
1010    ) -> Result<Vec<Document>> {
1011        let value_clone = value.clone();
1012        self.query(collection)
1013            .filter(move |f| f.eq(field, value_clone.clone()))
1014            .collect()
1015            .await
1016    }
1017
1018    pub async fn find_by_fields(
1019        &self,
1020        collection: &str,
1021        fields: Vec<(&str, Value)>,
1022    ) -> Result<Vec<Document>> {
1023        let mut query = self.query(collection);
1024
1025        for (field, value) in fields {
1026            let field_owned = field.to_owned();
1027            let value_owned = value.clone();
1028            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
1029        }
1030
1031        query.collect().await
1032    }
1033
1034    // Advanced example: find documents with a field value in a specific range
1035    pub async fn find_in_range<T: Into<Value> + Clone + 'static>(
1036        &self,
1037        collection: &str,
1038        field: &'static str,
1039        min: T,
1040        max: T,
1041    ) -> Result<Vec<Document>> {
1042        self.query(collection)
1043            .filter(move |f| f.between(field, min.clone(), max.clone()))
1044            .collect()
1045            .await
1046    }
1047
1048    // Complex query example: build with multiple combined filters
1049    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
1050        self.query(collection)
1051    }
1052
1053    // Create a full-text search query with added filter options
1054    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
1055        self.search(collection)
1056    }
1057
1058    // Utility methods for common operations
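    /// Insert a document under a specific id, or merge the given fields into it if it
    /// already exists
    ///
    /// A usage sketch (collection, id, and fields are illustrative):
    ///
    /// ```ignore
    /// let id = db.upsert("users", &user_id, vec![
    ///     ("name", Value::String("Jane Doe".to_string())),
    ///     ("active", Value::Bool(true)),
    /// ]).await?;
    /// ```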
1059    pub async fn upsert(&self, collection: &str, id: &str, data: InsertData) -> Result<String> {
1060        // Check if document exists
1061        if let Some(mut doc) = self.get_document(collection, id)? {
1062            // Update existing document
1063            for (key, value) in data {
1064                doc.data.insert(key.to_string(), value);
1065            }
1066
1067            // Save changes
1068            self.put(
1069                format!("{}:{}", collection, id),
1070                serde_json::to_vec(&doc)?,
1071                None,
1072            )?;
1073
1074            Ok(id.to_string())
        } else {
            // Create the document under the requested id so later lookups by that id
            // succeed (insert_into would generate a fresh UUID instead)
            let mut data_map: HashMap<String, Value> =
                data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
            data_map.insert("id".to_string(), Value::String(id.to_string()));
            let document = Document { id: id.to_string(), data: data_map };
            self.put(format!("{}:{}", collection, id), serde_json::to_vec(&document)?, None)?;
            Ok(id.to_string())
        }
    }
1082
1083    // Atomic increment/decrement
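    /// Add `amount` (which may be negative) to an integer field and return the new value
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let views = db.increment("posts", &post_id, "views", 1).await?;
    /// ```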
1084    pub async fn increment(
1085        &self,
1086        collection: &str,
1087        id: &str,
1088        field: &str,
1089        amount: i64,
1090    ) -> Result<i64> {
1091        if let Some(mut doc) = self.get_document(collection, id)? {
1092            // Get current value
1093            let current = match doc.data.get(field) {
1094                Some(Value::Int(i)) => *i,
1095                _ => 0,
1096            };
1097
1098            // Increment
1099            let new_value = current + amount;
1100            doc.data.insert(field.to_string(), Value::Int(new_value));
1101
1102            // Save changes
1103            self.put(
1104                format!("{}:{}", collection, id),
1105                serde_json::to_vec(&doc)?,
1106                None,
1107            )?;
1108
1109            Ok(new_value)
1110        } else {
1111            Err(AuroraError::NotFound(format!(
1112                "Document {}:{} not found",
1113                collection, id
1114            )))
1115        }
1116    }
1117
1118    // Batch operations
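    /// Insert several documents in a single transaction; rolls back if any insert fails
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let ids = db.batch_insert("users", vec![
    ///     vec![("name", Value::String("Ada".to_string()))],
    ///     vec![("name", Value::String("Grace".to_string()))],
    /// ]).await?;
    /// ```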
1119    pub async fn batch_insert(
1120        &self,
1121        collection: &str,
1122        docs: Vec<InsertData>,
1123    ) -> Result<Vec<String>> {
1124        // Begin transaction
1125        self.begin_transaction()?;
1126
1127        let mut ids = Vec::with_capacity(docs.len());
1128
1129        // Insert all documents
1130        for data in docs {
1131            match self.insert_into(collection, data) {
1132                Ok(id) => ids.push(id),
1133                Err(e) => {
1134                    // Rollback on error
1135                    self.rollback_transaction()?;
1136                    return Err(e);
1137                }
1138            }
1139        }
1140
1141        // Commit transaction
1142        self.commit_transaction()?;
1143
1144        Ok(ids)
1145    }
1146
1147    // Delete documents by query
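    /// Delete every document in a collection that matches the given filter and return
    /// how many were removed
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let removed = db.delete_by_query("sessions", |f| f.eq("status", "expired")).await?;
    /// ```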
1148    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
1149    where
1150        F: Fn(&FilterBuilder) -> bool + 'static,
1151    {
1152        let docs = self.query(collection).filter(filter_fn).collect().await?;
1153
1154        let mut deleted_count = 0;
1155
1156        for doc in docs {
1157            let key = format!("{}:{}", collection, doc.id);
1158            self.delete(&key).await?;
1159            deleted_count += 1;
1160        }
1161
1162        Ok(deleted_count)
1163    }
1164
1165    /// Import documents from a JSON file into a collection
1166    ///
1167    /// This method validates documents against the collection schema
1168    /// and skips documents that already exist in the database.
1169    ///
1170    /// # Arguments
1171    /// * `collection` - Name of the collection to import into
1172    /// * `filename` - Path to the JSON file containing documents
1173    ///
1174    /// # Returns
1175    /// Statistics about the import operation or an error
1176    ///
1177    /// # Examples
1178    ///
1179    /// ```
1180    /// // Import documents from JSON
1181    /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
1182    /// println!("Imported: {}, Skipped: {}, Failed: {}",
1183    ///     stats.imported, stats.skipped, stats.failed);
1184    /// ```
1185    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
1186        // Validate that the collection exists
1187        let collection_def = self.get_collection_definition(collection)?;
1188
1189        // Load JSON file
1190        let json_string = read_to_string(filename)
1191            .await
1192            .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
1193
1194        // Parse JSON
1195        let documents: Vec<JsonValue> = from_str(&json_string)
1196            .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
1197
1198        let mut stats = ImportStats::default();
1199
1200        // Process each document
1201        for doc_json in documents {
1202            match self
1203                .import_document(collection, &collection_def, doc_json)
1204                .await
1205            {
1206                Ok(ImportResult::Imported) => stats.imported += 1,
1207                Ok(ImportResult::Skipped) => stats.skipped += 1,
1208                Err(_) => stats.failed += 1,
1209            }
1210        }
1211
1212        Ok(stats)
1213    }
1214
1215    /// Import a single document, performing schema validation and duplicate checking
1216    async fn import_document(
1217        &self,
1218        collection: &str,
1219        collection_def: &Collection,
1220        doc_json: JsonValue,
1221    ) -> Result<ImportResult> {
1222        if !doc_json.is_object() {
1223            return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
1224        }
1225
1226        // Extract document ID if present
1227        let doc_id = doc_json
1228            .get("id")
1229            .and_then(|id| id.as_str())
1230            .map(|s| s.to_string())
1231            .unwrap_or_else(|| Uuid::new_v4().to_string());
1232
1233        // Check if document with this ID already exists
1234        if let Some(_) = self.get_document(collection, &doc_id)? {
1235            return Ok(ImportResult::Skipped);
1236        }
1237
1238        // Convert JSON to our document format and validate against schema
1239        let mut data_map = HashMap::new();
1240
1241        if let Some(obj) = doc_json.as_object() {
1242            for (field_name, field_def) in &collection_def.fields {
1243                if let Some(json_value) = obj.get(field_name) {
1244                    // Validate value against field type
1245                    if !self.validate_field_value(json_value, &field_def.field_type) {
1246                        return Err(AuroraError::InvalidOperation(format!(
1247                            "Field '{}' has invalid type",
1248                            field_name
1249                        )));
1250                    }
1251
1252                    // Convert JSON value to our Value type
1253                    let value = self.json_to_value(json_value)?;
1254                    data_map.insert(field_name.clone(), value);
1255                } else if field_def.unique {
1256                    // Missing required unique field
1257                    return Err(AuroraError::InvalidOperation(format!(
1258                        "Missing required unique field '{}'",
1259                        field_name
1260                    )));
1261                }
1262            }
1263        }
1264
1265        // Check for duplicates by unique fields
1266        for unique_field in &collection_def.unique_fields {
1267            if let Some(value) = data_map.get(unique_field) {
1268                // Query for existing documents with this unique value
1269                let query_results = self
1270                    .query(collection)
1271                    .filter(move |f| f.eq(unique_field, value.clone()))
1272                    .limit(1)
1273                    .collect()
1274                    .await?;
1275
1276                if !query_results.is_empty() {
1277                    // Found duplicate by unique field
1278                    return Ok(ImportResult::Skipped);
1279                }
1280            }
1281        }
1282
1283        // Create and insert document
1284        let document = Document {
1285            id: doc_id,
1286            data: data_map,
1287        };
1288
1289        self.put(
1290            format!("{}:{}", collection, document.id),
1291            serde_json::to_vec(&document)?,
1292            None,
1293        )?;
1294
1295        Ok(ImportResult::Imported)
1296    }
1297
1298    /// Validate that a JSON value matches the expected field type
1299    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
1300        match field_type {
1301            FieldType::String => value.is_string(),
1302            FieldType::Int => value.is_i64() || value.is_u64(),
1303            FieldType::Float => value.is_number(),
1304            FieldType::Boolean => value.is_boolean(),
1305            FieldType::Array => value.is_array(),
1306            FieldType::Uuid => {
1307                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
1308            }
1309        }
1310    }
1311
1312    /// Convert a JSON value to our internal Value type
1313    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
1314        match json_value {
1315            JsonValue::Null => Ok(Value::Null),
1316            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
1317            JsonValue::Number(n) => {
1318                if let Some(i) = n.as_i64() {
1319                    Ok(Value::Int(i))
1320                } else if let Some(f) = n.as_f64() {
1321                    Ok(Value::Float(f))
1322                } else {
1323                    Err(AuroraError::InvalidOperation("Invalid number value".into()))
1324                }
1325            }
1326            JsonValue::String(s) => {
1327                // Try parsing as UUID first
1328                if let Ok(uuid) = Uuid::parse_str(s) {
1329                    Ok(Value::Uuid(uuid))
1330                } else {
1331                    Ok(Value::String(s.clone()))
1332                }
1333            }
1334            JsonValue::Array(arr) => {
1335                let mut values = Vec::new();
1336                for item in arr {
1337                    values.push(self.json_to_value(item)?);
1338                }
1339                Ok(Value::Array(values))
1340            }
1341            JsonValue::Object(obj) => {
1342                let mut map = HashMap::new();
1343                for (k, v) in obj {
1344                    map.insert(k.clone(), self.json_to_value(v)?);
1345                }
1346                Ok(Value::Object(map))
1347            }
1348        }
1349    }
1350
1351    /// Get collection definition
1352    fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
1353        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
1354            let collection_def: Collection = serde_json::from_slice(&data)?;
1355            Ok(collection_def)
1356        } else {
1357            Err(AuroraError::CollectionNotFound(collection.to_string()))
1358        }
1359    }
1360
1361    /// Get storage statistics and information about the database
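    ///
    /// A usage sketch (prints per-collection document counts and sizes):
    ///
    /// ```ignore
    /// let stats = db.get_database_stats()?;
    /// for (name, c) in &stats.collections {
    ///     println!("{}: {} docs, {} bytes", name, c.count, c.size_bytes);
    /// }
    /// ```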
1362    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
1363        let hot_stats = self.hot.get_stats();
1364        let cold_stats = self.cold.get_stats()?;
1365
1366        Ok(DatabaseStats {
1367            hot_stats,
1368            cold_stats,
1369            estimated_size: self.cold.estimated_size(),
1370            collections: self.get_collection_stats()?,
1371        })
1372    }
1373
1374    /// Get a direct reference to a value in the hot cache
1375    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
1376        self.hot.get_ref(key)
1377    }
1378
1379    /// Check if a key is currently stored in the hot cache
1380    pub fn is_in_hot_cache(&self, key: &str) -> bool {
1381        self.hot.is_hot(key)
1382    }
1383
1384    /// Start background cleanup of hot cache with specified interval
1385    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
1386        let hot_store = Arc::new(self.hot.clone());
1387        hot_store.start_cleanup_with_interval(interval_secs).await;
1388    }
1389
1390    /// Clear the hot cache (useful when memory needs to be freed)
1391    pub fn clear_hot_cache(&self) {
1392        self.hot.clear();
1393        println!(
1394            "Hot cache cleared, current hit ratio: {:.2}%",
1395            self.hot.hit_ratio() * 100.0
1396        );
1397    }
1398
1399    /// Store multiple key-value pairs efficiently in a single batch operation
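    ///
    /// A usage sketch (keys follow the `collection:id` convention used elsewhere):
    ///
    /// ```ignore
    /// db.batch_write(vec![
    ///     ("settings:theme".to_string(), b"dark".to_vec()),
    ///     ("settings:lang".to_string(), b"en".to_vec()),
    /// ])?;
    /// ```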
1400    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
1401        self.cold.batch_set(pairs)
1402    }
1403
1404    /// Scan for keys with a specific prefix
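    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (key, bytes) = entry?;
    ///     println!("{} ({} bytes)", key, bytes.len());
    /// }
    /// ```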
1405    pub fn scan_with_prefix(
1406        &self,
1407        prefix: &str,
1408    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
1409        self.cold.scan_prefix(prefix)
1410    }
1411
1412    /// Get storage efficiency metrics for the database
1413    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
1414        let mut stats = HashMap::new();
1415
1416        // Scan all collections
1417        let collections: Vec<String> = self
1418            .cold
1419            .scan()
1420            .filter_map(|r| r.ok())
1421            .map(|(k, _)| k)
1422            .filter(|k| k.starts_with("_collection:"))
1423            .map(|k| k.trim_start_matches("_collection:").to_string())
1424            .collect();
1425
1426        for collection in collections {
1427            let prefix = format!("{}:", collection);
1428
1429            // Count documents
1430            let count = self.cold.scan_prefix(&prefix).count();
1431
1432            // Estimate size
1433            let size: usize = self
1434                .cold
1435                .scan_prefix(&prefix)
1436                .filter_map(|r| r.ok())
1437                .map(|(_, v)| v.len())
1438                .sum();
1439
1440            stats.insert(
1441                collection,
1442                CollectionStats {
1443                    count,
1444                    size_bytes: size,
1445                    avg_doc_size: if count > 0 { size / count } else { 0 },
1446                },
1447            );
1448        }
1449
1450        Ok(stats)
1451    }
1452
1453    /// Search for documents by exact value using an index
1454    ///
1455    /// This method performs a fast lookup using a pre-created index
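    ///
    /// A usage sketch (assumes an index was created with `create_index`):
    ///
    /// ```ignore
    /// db.create_index("users", "email").await?;
    /// let docs = db.search_by_value("users", "email", &Value::String("jane@example.com".into()))?;
    /// ```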
1456    pub fn search_by_value(
1457        &self,
1458        collection: &str,
1459        field: &str,
1460        value: &Value,
1461    ) -> Result<Vec<Document>> {
1462        let index_key = format!("_index:{}:{}", collection, field);
1463
1464        if let Some(index_data) = self.get(&index_key)? {
1465            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
1466            let index = Index::new(index_def);
1467
            // Look up matching document ids through the index
1469            if let Some(doc_ids) = index.search(value) {
1470                // Load the documents by ID
1471                let mut docs = Vec::new();
1472                for id in doc_ids {
1473                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
1474                        let doc: Document = serde_json::from_slice(&doc_data)?;
1475                        docs.push(doc);
1476                    }
1477                }
1478                return Ok(docs);
1479            }
1480        }
1481
1482        // Return empty result if no index or no matches
1483        Ok(Vec::new())
1484    }
1485
1486    /// Perform a full-text search on an indexed text field
1487    ///
1488    /// This provides more advanced text search capabilities including
1489    /// relevance ranking of results
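    ///
    /// A usage sketch (assumes a full-text index was created with `create_text_index`):
    ///
    /// ```ignore
    /// db.create_text_index("articles", "content", true)?;
    /// let hits = db.full_text_search("articles", "content", "quantum computing")?;
    /// ```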
1490    pub fn full_text_search(
1491        &self,
1492        collection: &str,
1493        field: &str,
1494        query: &str,
1495    ) -> Result<Vec<Document>> {
1496        let index_key = format!("_index:{}:{}", collection, field);
1497
1498        if let Some(index_data) = self.get(&index_key)? {
1499            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
1500
1501            // Ensure this is a full-text index
1502            if !matches!(index_def.index_type, IndexType::FullText) {
1503                return Err(AuroraError::InvalidOperation(format!(
1504                    "Field '{}' is not indexed as full-text",
1505                    field
1506                )));
1507            }
1508
1509            let index = Index::new(index_def);
1510
            // Run the ranked text search through the index
1512            if let Some(doc_id_scores) = index.search_text(query) {
1513                // Load the documents by ID, preserving score order
1514                let mut docs = Vec::new();
1515                for (id, _score) in doc_id_scores {
1516                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
1517                        let doc: Document = serde_json::from_slice(&doc_data)?;
1518                        docs.push(doc);
1519                    }
1520                }
1521                return Ok(docs);
1522            }
1523        }
1524
1525        // Return empty result if no index or no matches
1526        Ok(Vec::new())
1527    }
1528
1529    /// Create a full-text search index on a text field
    pub fn create_text_index(
        &self,
        collection: &str,
        field: &str,
        _enable_stop_words: bool,
    ) -> Result<()> {
        // Check if collection exists
        if self.get(&format!("_collection:{}", collection))?.is_none() {
            return Err(AuroraError::CollectionNotFound(collection.to_string()));
        }

        // Create index definition
        let index_def = IndexDefinition {
            name: format!("{}_{}_fulltext", collection, field),
            collection: collection.to_string(),
            fields: vec![field.to_string()],
            index_type: IndexType::FullText,
            unique: false,
        };

        // Store index definition
        let index_key = format!("_index:{}:{}", collection, field);
        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;

        // Create the actual index
        let index = Index::new(index_def);

        // Index all existing documents in the collection
        let prefix = format!("{}:", collection);
        for result in self.cold.scan_prefix(&prefix) {
            if let Ok((_, data)) = result {
                let doc: Document = serde_json::from_slice(&data)?;
                index.insert(&doc)?;
            }
        }

        Ok(())
    }

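    /// Execute a [`SimpleQueryBuilder`] against its target collection
    ///
    /// Loads the full collection and retains only documents that satisfy every
    /// filter (`Eq`, `Gt`, `Lt`, `Contains`). Ordering, limits, and offsets are
    /// not applied here yet (see the note at the end of the function).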
    pub async fn execute_simple_query(
        &self,
        builder: &SimpleQueryBuilder,
    ) -> Result<Vec<Document>> {
        let mut docs = self.get_all_collection(&builder.collection).await?;

        docs.retain(|doc| {
            builder.filters.iter().all(|f| match f {
                Filter::Eq(field, value) => doc.data.get(field).map_or(false, |v| v == value),
                Filter::Gt(field, value) => doc.data.get(field).map_or(false, |v| v > value),
                Filter::Lt(field, value) => doc.data.get(field).map_or(false, |v| v < value),
                Filter::Contains(field, value_str) => {
                    doc.data.get(field).map_or(false, |v| match v {
                        Value::String(s) => s.contains(value_str),
                        Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
                        _ => false,
                    })
                }
            })
        });

        // NOTE: This implementation does not yet support ordering, limits, or offsets.
        Ok(docs)
    }

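    /// Execute a dynamic, HTTP-style query described by a [`QueryPayload`]
    ///
    /// The stages are applied in order:
    /// 1. Filtering: every filter in `payload.filters` must match (see `check_filter`)
    /// 2. Sorting on a single field, ascending or descending
    /// 3. Pagination via `offset` and `limit`
    /// 4. Projection: retain only the fields listed in `select`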
    pub async fn execute_dynamic_query(
        &self,
        collection: &str,
        payload: &QueryPayload,
    ) -> Result<Vec<Document>> {
        let mut docs = self.get_all_collection(collection).await?;

        // 1. Apply Filters
        if let Some(filters) = &payload.filters {
            docs.retain(|doc| {
                filters.iter().all(|filter| {
                    doc.data
                        .get(&filter.field)
                        .map_or(false, |doc_val| check_filter(doc_val, filter))
                })
            });
        }

        // 2. Apply Sorting
        if let Some(sort_options) = &payload.sort {
            docs.sort_by(|a, b| {
                let a_val = a.data.get(&sort_options.field);
                let b_val = b.data.get(&sort_options.field);
                let ordering = a_val
                    .partial_cmp(&b_val)
                    .unwrap_or(std::cmp::Ordering::Equal);
                if sort_options.ascending {
                    ordering
                } else {
                    ordering.reverse()
                }
            });
        }

        // 3. Apply Pagination
        if let Some(offset) = payload.offset {
            docs = docs.into_iter().skip(offset).collect();
        }
        if let Some(limit) = payload.limit {
            docs = docs.into_iter().take(limit).collect();
        }

        // 4. Apply Field Selection (Projection)
        if let Some(select_fields) = &payload.select {
            if !select_fields.is_empty() {
                docs = docs
                    .into_iter()
                    .map(|mut doc| {
                        doc.data.retain(|key, _| select_fields.contains(key));
                        doc
                    })
                    .collect();
            }
        }

        Ok(docs)
    }

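    /// Dispatch a network protocol request to the corresponding database operation
    ///
    /// Every `Request` variant is mapped onto the matching `Aurora` method; errors
    /// are converted into `Response::Error(..)` rather than propagated, so the
    /// handler itself never fails.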
    pub async fn process_network_request(
        &self,
        request: crate::network::protocol::Request,
    ) -> crate::network::protocol::Response {
        use crate::network::protocol::Response;

        match request {
            crate::network::protocol::Request::Get(key) => match self.get(&key) {
                Ok(value) => Response::Success(value),
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::Put(key, value) => {
                match self.put(key, value, None) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
                Ok(_) => Response::Done,
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::NewCollection { name, fields } => {
                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
                    .iter()
                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
                    .collect();

                match self.new_collection(&name, fields_for_db) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Insert { collection, data } => {
                match self.insert_map(&collection, data) {
                    Ok(id) => Response::Message(id),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::GetDocument { collection, id } => {
                match self.get_document(&collection, &id) {
                    Ok(doc) => Response::Document(doc),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Query(builder) => {
                match self.execute_simple_query(&builder).await {
                    Ok(docs) => Response::Documents(docs),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::BeginTransaction => match self.begin_transaction() {
                Ok(_) => Response::Done,
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::CommitTransaction => match self.commit_transaction()
            {
                Ok(_) => Response::Done,
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::RollbackTransaction => {
                match self.rollback_transaction() {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
        }
    }
}

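/// Evaluate a single HTTP-style filter against a document value
///
/// The JSON filter value is converted into a native `Value` first; if the
/// conversion fails, the filter simply evaluates to `false`. `Contains` means
/// substring match for strings and element membership for arrays.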
fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
    let filter_val = match json_to_value(&filter.value) {
        Ok(v) => v,
        Err(_) => return false,
    };

    match filter.operator {
        FilterOperator::Eq => doc_val == &filter_val,
        FilterOperator::Ne => doc_val != &filter_val,
        FilterOperator::Gt => doc_val > &filter_val,
        FilterOperator::Gte => doc_val >= &filter_val,
        FilterOperator::Lt => doc_val < &filter_val,
        FilterOperator::Lte => doc_val <= &filter_val,
        FilterOperator::Contains => match (doc_val, &filter_val) {
            (Value::String(s), Value::String(fv)) => s.contains(fv),
            (Value::Array(arr), _) => arr.contains(&filter_val),
            _ => false,
        },
    }
}

/// Results of importing a document
enum ImportResult {
    Imported,
    Skipped,
}

/// Statistics from an import operation
#[derive(Debug, Default)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}

/// Statistics for a specific collection
#[derive(Debug)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes
    pub avg_doc_size: usize,
}

/// Combined database statistics
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection
    pub collections: HashMap<String, CollectionStats>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                ("name".to_string(), FieldType::String, false),
                ("age".to_string(), FieldType::Int, false),
                ("email".to_string(), FieldType::String, true),
            ],
        )?;

        // Test document insertion
        let doc_id = db.insert_into(
            "users",
            vec![
                ("name", Value::String("John Doe".to_string())),
                ("age", Value::Int(30)),
                ("email", Value::String("john@example.com".to_string())),
            ],
        )?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Start transaction
        db.begin_transaction()?;

        // Insert document
        let doc_id = db.insert_into("test", vec![("field", Value::String("value".to_string()))])?;

        // Commit transaction
        db.commit_transaction()?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "books",
            vec![
                ("title".to_string(), FieldType::String, false),
                ("author".to_string(), FieldType::String, false),
                ("year".to_string(), FieldType::Int, false),
            ],
        )?;

        // Test document insertion
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )?;

        // Test query
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a test file that's too large (201MB)
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }
}