aurora_db/db.rs
1//! # Aurora Database
2//!
//! Aurora is an embedded document database with a tiered storage architecture.
4//! It provides document storage, querying, indexing, and search capabilities
5//! while optimizing for both performance and durability.
6//!
7//! ## Key Features
8//!
9//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
10//! * **Document Model**: Schema-flexible JSON-like document storage
11//! * **Querying**: Rich query capabilities with filtering and sorting
12//! * **Full-text Search**: Built-in search engine with relevance ranking
13//! * **Transactions**: ACID-compliant transaction support
14//! * **Blob Storage**: Efficient storage for large binary objects
15//!
16//! ## Usage Example
17//!
//! ```rust,no_run
//! use aurora_db::{Aurora, types::{FieldType, Value}};
//!
//! # async fn example() -> Result<(), aurora_db::error::AqlError> {
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with a schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true), // unique field
//!     ("age", FieldType::Int, false),
//! ]).await?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! # Ok(())
//! # }
//! ```
45
46use crate::error::{AqlError, ErrorCode, Result};
47use crate::index::{Index, IndexDefinition, IndexType};
48use crate::network::http_models::{
49 Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
50};
51use crate::parser;
52use crate::parser::executor::{ExecutionOptions, ExecutionPlan, ExecutionResult};
53use crate::query::FilterBuilder;
54use crate::query::{Filter, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
55use crate::storage::{ColdStore, HotStore, WriteBuffer};
56use crate::types::{
57 AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
58};
59use crate::wal::{Operation, WriteAheadLog};
60use dashmap::DashMap;
61use serde_json::Value as JsonValue;
62use serde_json::from_str;
63use std::collections::HashMap;
64use std::fmt;
65use std::fs::File as StdFile;
66use std::path::{Path, PathBuf};
67use std::sync::{Arc, RwLock};
68use std::time::Duration;
69use tokio::fs::File;
70use tokio::fs::read_to_string;
71use tokio::io::AsyncReadExt;
72use tokio::sync::OnceCell;
73use uuid::Uuid;
74
75// Disk location metadata for primary index
76// Instead of storing full Vec<u8> values, we store minimal metadata
77#[derive(Debug, Clone, Copy)]
78struct DiskLocation {
79 size: u32, // Size in bytes (useful for statistics)
80}
81
82impl DiskLocation {
83 fn new(size: usize) -> Self {
84 Self { size: size as u32 }
85 }
86}
87
88// Index types for faster lookups
89type PrimaryIndex = DashMap<String, DiskLocation>;
90type SecondaryIndex = DashMap<String, Vec<String>>;
91
/// Summary of a stored entry: inline data, a blob, or a compressed payload.
93#[derive(Debug)]
94pub enum DataInfo {
95 Data { size: usize, preview: String },
96 Blob { size: usize },
97 Compressed { size: usize },
98}
99
100#[derive(Debug)]
101#[allow(dead_code)]
102enum WalOperation {
103 Put {
104 key: Arc<String>,
105 value: Arc<Vec<u8>>,
106 },
107 Delete {
108 key: String,
109 },
110}
111
112impl DataInfo {
113 pub fn size(&self) -> usize {
114 match self {
115 DataInfo::Data { size, .. } => *size,
116 DataInfo::Blob { size } => *size,
117 DataInfo::Compressed { size } => *size,
118 }
119 }
120}
121
122/// The main database engine
123///
124/// Aurora combines a tiered storage architecture with document-oriented database features:
125/// - Hot tier: In-memory cache for frequently accessed data
126/// - Cold tier: Persistent disk storage for durability
127/// - Primary indices: Fast key-based access
128/// - Secondary indices: Fast field-based queries
129///
130/// # Examples
131///
/// ```no_run
/// # use aurora_db::{Aurora, types::Value};
/// # async fn example() -> Result<(), aurora_db::error::AqlError> {
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// # Ok(())
/// # }
/// ```
145pub struct Aurora {
146 hot: HotStore,
147 cold: Arc<ColdStore>,
148 primary_indices: Arc<DashMap<String, PrimaryIndex>>,
149 secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
150 indices_initialized: Arc<OnceCell<()>>,
151 transaction_manager: crate::transaction::TransactionManager,
152 indices: Arc<DashMap<String, Index>>,
153 schema_cache: Arc<DashMap<String, Arc<Collection>>>,
154 config: AuroraConfig,
155 write_buffer: Option<WriteBuffer>,
156 pub pubsub: crate::pubsub::PubSubSystem,
157 // Write-ahead log for durability (optional, based on config)
158 wal: Option<Arc<RwLock<WriteAheadLog>>>,
159 // Background checkpoint task
160 checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
161 // Background compaction task
162 compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
163 // Worker system for background jobs
164 pub workers: Option<Arc<crate::workers::WorkerSystem>>,
165 // Asynchronous WAL Writer Channel
166 wal_writer: Option<tokio::sync::mpsc::UnboundedSender<WalOperation>>,
167 // Computed fields manager
168 pub computed: Arc<RwLock<crate::computed::ComputedFields>>,
169}
170
171impl fmt::Debug for Aurora {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 f.debug_struct("Aurora")
174 .field("hot", &"HotStore")
175 .field("cold", &"ColdStore")
176 .field("primary_indices_count", &self.primary_indices.len())
177 .field("secondary_indices_count", &self.secondary_indices.len())
178 .field(
179 "active_transactions",
180 &self.transaction_manager.active_count(),
181 )
182 .field("indices_count", &self.indices.len())
183 .finish()
184 }
185}
186
187impl Clone for Aurora {
188 fn clone(&self) -> Self {
189 Self {
190 hot: self.hot.clone(),
191 cold: Arc::clone(&self.cold),
192 primary_indices: Arc::clone(&self.primary_indices),
193 secondary_indices: Arc::clone(&self.secondary_indices),
194 indices_initialized: Arc::clone(&self.indices_initialized),
195 transaction_manager: self.transaction_manager.clone(),
196 indices: Arc::clone(&self.indices),
197 schema_cache: Arc::clone(&self.schema_cache),
198 config: self.config.clone(),
199 write_buffer: None,
200 pubsub: self.pubsub.clone(),
201 wal: self.wal.clone(),
202 checkpoint_shutdown: self.checkpoint_shutdown.clone(),
203 compaction_shutdown: self.compaction_shutdown.clone(),
204 workers: self.workers.clone(),
206 wal_writer: self.wal_writer.clone(),
207 computed: Arc::clone(&self.computed),
208 }
209 }
210}
211
212/// Trait for converting inputs into execution parameters
213pub trait ToExecParams {
214 fn into_params(self) -> Result<(String, ExecutionOptions)>;
215}
216
217impl<'a> ToExecParams for &'a str {
218 fn into_params(self) -> Result<(String, ExecutionOptions)> {
219 Ok((self.to_string(), ExecutionOptions::default()))
220 }
221}
222
223impl ToExecParams for String {
224 fn into_params(self) -> Result<(String, ExecutionOptions)> {
225 Ok((self, ExecutionOptions::default()))
226 }
227}
228
229impl<'a, V> ToExecParams for (&'a str, V)
230where
231 V: Into<serde_json::Value>,
232{
233 fn into_params(self) -> Result<(String, ExecutionOptions)> {
234 let (aql, vars) = self;
235 let json_vars = vars.into();
236 let map = match json_vars {
237 serde_json::Value::Object(map) => map.into_iter().collect(),
238 _ => {
239 return Err(AqlError::new(
240 ErrorCode::InvalidInput,
241 "Variables must be a JSON object",
242 ));
243 }
244 };
245 Ok((
246 aql.to_string(),
247 ExecutionOptions {
248 variables: map,
249 ..Default::default()
250 },
251 ))
252 }
253}
254
255impl Aurora {
256 /// Execute AQL query (variables are optional)
257 ///
258 /// Supports two forms:
259 /// 1. `db.execute("query").await`
260 /// 2. `db.execute(("query", vars)).await`
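    ///
    /// # Example
    ///
    /// A minimal sketch; the AQL shown assumes the GraphQL-like syntax used in
    /// [`stream`](Self::stream)'s example:
    /// ```no_run
    /// # use aurora_db::Aurora;
    /// # async fn example(db: &Aurora) -> Result<(), aurora_db::error::AqlError> {
    /// // Form 1: plain query string
    /// let result = db.execute("query { users { id name } }").await?;
    ///
    /// // Form 2: the same query plus a JSON object of variables
    /// let result = db.execute((
    ///     "query { users { id name } }",
    ///     serde_json::json!({ "min_age": 18 }),
    /// )).await?;
    /// # Ok(())
    /// # }
    /// ```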
261 pub async fn execute<I: ToExecParams>(&self, input: I) -> Result<ExecutionResult> {
262 let (aql, options) = input.into_params()?;
263 parser::executor::execute(self, &aql, options).await
264 }
265
266 /// Stream real-time changes using AQL subscription syntax
267 ///
268 /// This is a convenience method that extracts the `ChangeListener` from
269 /// an AQL subscription query, providing a cleaner API than using `execute()` directly.
270 ///
271 /// # Example
272 /// ```
273 /// // Stream changes from active products
274 /// let mut listener = db.stream(r#"
275 /// subscription {
276 /// products(where: { active: { eq: true } }) {
277 /// id
278 /// name
279 /// }
280 /// }
281 /// "#).await?;
282 ///
283 /// // Receive real-time events
284 /// while let Ok(event) = listener.recv().await {
285 /// println!("Change: {:?} on {}", event.change_type, event.id);
286 /// }
287 /// ```
288 pub async fn stream(&self, aql: &str) -> Result<crate::pubsub::ChangeListener> {
289 let result = self.execute(aql).await?;
290
291 match result {
292 ExecutionResult::Subscription(sub) => sub.stream.ok_or_else(|| {
293 crate::error::AqlError::new(
294 crate::error::ErrorCode::QueryError,
295 "Subscription did not return a stream".to_string(),
296 )
297 }),
298 _ => Err(crate::error::AqlError::new(
299 crate::error::ErrorCode::QueryError,
300 "Expected a subscription query, got a different operation type".to_string(),
301 )),
302 }
303 }
304
305 /// Explain AQL query execution plan
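    ///
    /// Accepts the same input forms as [`execute`](Self::execute). A minimal sketch:
    /// ```no_run
    /// # use aurora_db::Aurora;
    /// # async fn example(db: &Aurora) -> Result<(), aurora_db::error::AqlError> {
    /// let plan = db.explain("query { users { id } }").await?;
    /// println!("operations: {:?}", plan.operations);
    /// println!("estimated cost: {}", plan.estimated_cost);
    /// # Ok(())
    /// # }
    /// ```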
306 pub async fn explain<I: ToExecParams>(&self, input: I) -> Result<ExecutionPlan> {
307 let (aql, options) = input.into_params()?;
308
309 // Parse and analyze without executing
310 let doc = parser::parse_with_variables(
311 &aql,
312 serde_json::Value::Object(options.variables.clone().into_iter().collect()),
313 )?;
314
315 self.analyze_execution_plan(&doc).await
316 }
317
318 /// Analyze execution plan for a parsed query
319 pub async fn analyze_execution_plan(
320 &self,
321 doc: &crate::parser::ast::Document,
322 ) -> Result<ExecutionPlan> {
323 let mut operations = Vec::new();
324 for op in &doc.operations {
325 operations.push(format!("{:?}", op));
326 }
327
328 Ok(ExecutionPlan {
329 operations,
330 estimated_cost: 1.0,
331 })
332 }

    /// Remove stale lock files from a database directory
334 ///
335 /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
336 /// that prevent the database from being reopened. This method safely removes
337 /// those lock files.
338 ///
339 /// # Safety
340 /// Only call this when you're certain no other Aurora instance is using the database.
341 /// Removing lock files while another process is running could cause data corruption.
342 ///
343 /// # Example
344 /// ```no_run
345 /// use aurora_db::Aurora;
346 ///
347 /// // If you get "Access denied" error when opening:
348 /// if let Err(e) = Aurora::open("my_db") {
349 /// eprintln!("Failed to open: {}", e);
350 /// // Try removing stale lock
351 /// if Aurora::remove_stale_lock("my_db").unwrap_or(false) {
352 /// println!("Removed stale lock, try opening again");
353 /// let db = Aurora::open("my_db")?;
354 /// }
355 /// }
356 /// # Ok::<(), aurora_db::error::AqlError>(())
357 /// ```
358 pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
359 let resolved_path = Self::resolve_path(path)?;
360 crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
361 }
362
363 /// Open or create a database at the specified location
364 ///
365 /// # Arguments
366 /// * `path` - Path to the database file or directory
367 /// - Absolute paths (like `/data/myapp.db`) are used as-is
368 /// - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
369 /// - Simple names (like `myapp.db`) use the current directory
370 ///
371 /// # Returns
372 /// An initialized `Aurora` database instance
373 ///
374 /// # Examples
375 ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Or use a simple name resolved against the current directory
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
384 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
385 let config = AuroraConfig {
386 db_path: Self::resolve_path(path)?,
387 ..Default::default()
388 };
389 Self::with_config(config)
390 }
391
392 /// Helper method to resolve database path
393 fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
394 let path = path.as_ref();
395
396 // If it's an absolute path, use it directly
397 if path.is_absolute() {
398 return Ok(path.to_path_buf());
399 }
400
401 // Otherwise, resolve relative to current directory
402 match std::env::current_dir() {
403 Ok(current_dir) => Ok(current_dir.join(path)),
404 Err(e) => Err(AqlError::new(
405 ErrorCode::IoError,
406 format!("Failed to resolve current directory: {}", e),
407 )),
408 }
409 }
410
411 /// Open a database with custom configuration
412 ///
413 /// # Arguments
414 /// * `config` - Database configuration settings
415 ///
416 /// # Examples
417 ///
418 /// ```
419 /// use aurora_db::{Aurora, types::AuroraConfig};
420 /// use std::time::Duration;
421 ///
422 /// let config = AuroraConfig {
423 /// db_path: "my_data.db".into(),
424 /// hot_cache_size_mb: 512, // 512 MB cache
425 /// enable_write_buffering: true, // Batch writes for speed
426 /// enable_wal: true, // Durability
427 /// auto_compact: true, // Background compaction
428 /// compact_interval_mins: 60, // Compact every hour
429 /// ..Default::default()
430 /// };
431 ///
432 /// let db = Aurora::with_config(config)?;
433 /// ```
434 pub fn with_config(config: AuroraConfig) -> Result<Self> {
435 let path = Self::resolve_path(&config.db_path)?;
436
437 if config.create_dirs
438 && let Some(parent) = path.parent()
439 && !parent.exists()
440 {
441 std::fs::create_dir_all(parent)?;
442 }
443
444 let cold = Arc::new(ColdStore::with_config(
445 path.to_str().unwrap(),
446 config.cold_cache_capacity_mb,
447 config.cold_flush_interval_ms,
448 config.cold_mode,
449 )?);
450
451 let hot = HotStore::with_config_and_eviction(
452 config.hot_cache_size_mb,
453 config.hot_cache_cleanup_interval_secs,
454 config.eviction_policy,
455 );
456
457 let write_buffer = if config.enable_write_buffering {
458 Some(WriteBuffer::new(
459 Arc::clone(&cold),
460 config.write_buffer_size,
461 config.write_buffer_flush_interval_ms,
462 ))
463 } else {
464 None
465 };
466
467 let auto_compact = config.auto_compact;
468 let enable_wal = config.enable_wal;
469 let pubsub = crate::pubsub::PubSubSystem::new(10000);
470
471 // Initialize WAL
472 let (wal, wal_entries) = if enable_wal {
473 let wal_path = path.to_str().unwrap();
474 match WriteAheadLog::new(wal_path) {
475 Ok(mut wal_log) => {
476 let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
477 (Some(Arc::new(RwLock::new(wal_log))), entries)
478 }
479 Err(_) => (None, Vec::new()),
480 }
481 } else {
482 (None, Vec::new())
483 };
484
        // Initialize the background WAL writer
486 let wal_writer = if enable_wal {
487 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
488 let wal_clone = wal.clone();
489
            // Spawn a single dedicated task that appends WAL entries sequentially
491 tokio::spawn(async move {
492 if let Some(wal) = wal_clone {
493 while let Some(op) = rx.recv().await {
494 // Properly handle the RwLock write guard
495 match wal.write() {
496 Ok(mut guard) => {
497 let result = match op {
498 WalOperation::Put { key, value } => {
499 // Deref Arc to pass slices
500 guard.append(Operation::Put, &key, Some(value.as_ref()))
501 }
502 WalOperation::Delete { key } => {
503 guard.append(Operation::Delete, &key, None)
504 }
505 };
506 if let Err(e) = result {
507 eprintln!("CRITICAL: Failed to append to WAL: {}. Shutting down.", e);
508 // A failed WAL write is a catastrophic failure.
509 std::process::exit(1);
510 }
511 }
512 Err(e) => {
513 // This likely means the lock is poisoned, which is a critical state.
514 eprintln!("CRITICAL: Failed to acquire WAL lock: {}. Shutting down.", e);
515 std::process::exit(1);
516 }
517 }
518 }
519 }
520 });
521 Some(tx)
522 } else {
523 None
524 };
525
526 let checkpoint_shutdown = if wal.is_some() {
527 let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
528 let cold_clone = Arc::clone(&cold);
529 let wal_clone = wal.clone();
530 let checkpoint_interval = config.checkpoint_interval_ms;
531
532 tokio::spawn(async move {
533 let mut interval =
534 tokio::time::interval(Duration::from_millis(checkpoint_interval));
535 loop {
536 tokio::select! {
537 _ = interval.tick() => {
538 if let Err(e) = cold_clone.flush() {
539 eprintln!("Background checkpoint flush error: {}", e);
540 }
541 if let Some(ref wal) = wal_clone
542 && let Ok(mut wal_guard) = wal.write() {
543 let _ = wal_guard.truncate();
544 }
545 }
546 _ = shutdown_rx.recv() => {
547 let _ = cold_clone.flush();
548 if let Some(ref wal) = wal_clone
549 && let Ok(mut wal_guard) = wal.write() {
550 let _ = wal_guard.truncate();
551 }
552 break;
553 }
554 }
555 }
556 });
557
558 Some(shutdown_tx)
559 } else {
560 None
561 };
562
563 let compaction_shutdown = if auto_compact {
564 let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
565 let cold_clone = Arc::clone(&cold);
566 let compact_interval = config.compact_interval_mins;
567
568 tokio::spawn(async move {
569 let mut interval =
570 tokio::time::interval(Duration::from_secs(compact_interval * 60));
571 loop {
572 tokio::select! {
573 _ = interval.tick() => {
574 if let Err(e) = cold_clone.compact() {
575 eprintln!("Background compaction error: {}", e);
576 }
577 }
578 _ = shutdown_rx.recv() => {
579 let _ = cold_clone.compact();
580 break;
581 }
582 }
583 }
584 });
585
586 Some(shutdown_tx)
587 } else {
588 None
589 };
590
591 let workers_path = path.join("workers.db");
592 let worker_config = crate::workers::WorkerConfig {
593 storage_path: workers_path.to_string_lossy().into_owned(),
594 ..Default::default()
595 };
596 let workers = crate::workers::WorkerSystem::new(worker_config)
597 .map(Arc::new)
598 .ok();
599
600 let db = Self {
601 hot,
602 cold,
603 primary_indices: Arc::new(DashMap::new()),
604 secondary_indices: Arc::new(DashMap::new()),
605 indices_initialized: Arc::new(OnceCell::new()),
606 transaction_manager: crate::transaction::TransactionManager::new(),
607 indices: Arc::new(DashMap::new()),
608 schema_cache: Arc::new(DashMap::new()),
609 config,
610 write_buffer,
611 pubsub,
612 wal,
613 checkpoint_shutdown,
614 compaction_shutdown,
615 workers,
            wal_writer, // Sender feeding the background WAL writer task
617 computed: Arc::new(RwLock::new(crate::computed::ComputedFields::new())),
618 };
619
        if !wal_entries.is_empty() {
            let handle = tokio::runtime::Handle::try_current()
                .expect("Tokio runtime must be started to open database");
            // `Handle::block_on` panics if called directly on a runtime worker
            // thread, so step out of the async context first (this requires the
            // multi-threaded runtime).
            tokio::task::block_in_place(|| handle.block_on(db.replay_wal(wal_entries)))?;
        }
625
626 Ok(db)
627 }

    /// Lazily build the in-memory indices by scanning cold storage.
    ///
    /// Safe to call repeatedly: the work runs at most once (guarded by a `OnceCell`).
629 pub async fn ensure_indices_initialized(&self) -> Result<()> {
630 self.indices_initialized
631 .get_or_init(|| async {
632 println!("Initializing indices...");
633 if let Err(e) = self.initialize_indices() {
634 eprintln!("Failed to initialize indices: {:?}", e);
635 }
636 println!("Indices initialized");
637 })
638 .await;
639 Ok(())
640 }
641
642 fn initialize_indices(&self) -> Result<()> {
643 for result in self.cold.scan() {
644 let (key, value) = result?;
645 let key_str = std::str::from_utf8(key.as_bytes())
646 .map_err(|_| AqlError::new(ErrorCode::InvalidKey, "Invalid UTF-8".to_string()))?;
647
648 if let Some(collection_name) = key_str.split(':').next() {
649 // Skip system/metadata prefixes (e.g., _collection, _sys_migration, _index)
650 if collection_name.starts_with('_') {
651 continue;
652 }
653 self.index_value(collection_name, key_str, &value)?;
654 }
655 }
656 Ok(())
657 }
658
659 // Fast key-value operations with index support
660 /// Get a value by key (low-level key-value access)
661 ///
662 /// This is the low-level method. For document access, use `get_document()` instead.
663 /// Checks hot cache first, then falls back to cold storage for maximum performance.
664 ///
665 /// # Performance
666 /// - Hot cache hit: ~1M reads/sec (instant)
667 /// - Cold storage: ~500K reads/sec (disk I/O)
668 /// - Cache hit rate: typically 95%+ at scale
669 ///
670 /// # Examples
671 ///
672 /// ```
673 /// // Low-level key-value access
674 /// let data = db.get("users:12345")?;
675 /// if let Some(bytes) = data {
676 /// let doc: Document = serde_json::from_slice(&bytes)?;
677 /// println!("Found: {:?}", doc);
678 /// }
679 ///
680 /// // Better: use get_document() for documents
681 /// let user = db.get_document("users", "12345")?;
682 /// ```
683 pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
684 // Check hot cache first
685 if let Some(value) = self.hot.get(key) {
686 // Check if this is a blob reference (pointer to cold storage)
687 if value.starts_with(b"BLOBREF:") {
688 // It's a blob ref - fetch actual data from cold storage
689 return self.cold.get(key);
690 }
691 return Ok(Some(value));
692 }
693
694 // Fetch from cold storage
695 let value = self.cold.get(key)?;
696
697 if let Some(v) = &value {
698 if self.should_cache_key(key) {
699 self.hot
700 .set(Arc::new(key.to_string()), Arc::new(v.clone()), None);
701 }
702 }
703
704 Ok(value)
705 }
706
    /// Get a value as a zero-copy `Arc` reference (can be 10-100x faster than `get()`,
    /// which copies the bytes). Only checks the hot cache; returns `None` on a miss.
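    ///
    /// # Example (a minimal sketch)
    /// ```no_run
    /// # use aurora_db::Aurora;
    /// # fn example(db: &Aurora) {
    /// if let Some(bytes) = db.get_hot_ref("users:12345") {
    ///     // `bytes` is an `Arc<Vec<u8>>`; no data was copied
    ///     println!("cached value: {} bytes", bytes.len());
    /// }
    /// # }
    /// ```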
709 pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
710 self.hot.get_ref(key)
711 }
712
713 /// Helper to decide if a key should be cached.
714 fn should_cache_key(&self, key: &str) -> bool {
715 if let Some(collection_name) = key.split(':').next() {
716 if !collection_name.starts_with('_') {
717 if let Ok(collection_def) = self.get_collection_definition(collection_name) {
718 if collection_def
719 .fields
720 .values()
721 .any(|def| def.field_type == FieldType::Any)
722 {
723 return false; // Don't cache if collection has Any field
724 }
725 }
726 }
727 }
728 true // Cache by default
729 }
730
731 /// Get cache statistics
732 ///
733 /// Returns detailed metrics about cache performance including hit/miss rates,
734 /// memory usage, and access patterns. Useful for monitoring, optimization,
735 /// and understanding database performance characteristics.
736 ///
737 /// # Returns
738 /// `CacheStats` struct containing:
739 /// - `hits`: Number of cache hits (data found in memory)
740 /// - `misses`: Number of cache misses (had to read from disk)
741 /// - `hit_rate`: Percentage of requests served from cache (0.0-1.0)
742 /// - `size`: Current number of entries in cache
743 /// - `capacity`: Maximum cache capacity
744 /// - `evictions`: Number of entries evicted due to capacity
745 ///
746 /// # Examples
747 ///
    /// ```
749 /// use aurora_db::Aurora;
750 ///
751 /// let db = Aurora::open("mydb.db")?;
    ///
753 /// let stats = db.get_cache_stats();
754 /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
755 /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
756 /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
757 ///
758 /// // Monitor performance during operations
759 /// let before = db.get_cache_stats();
760 ///
761 /// // Perform many reads
762 /// for i in 0..1000 {
763 /// db.get_document("users", &format!("user-{}", i))?;
764 /// }
765 ///
766 /// let after = db.get_cache_stats();
767 /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
768 /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
769 ///
770 /// // Performance tuning
771 /// let stats = db.get_cache_stats();
772 /// if stats.hit_rate < 0.80 {
773 /// println!("Low cache hit rate! Consider:");
774 /// println!("- Increasing cache size in config");
775 /// println!("- Prewarming cache with prewarm_cache()");
776 /// println!("- Reviewing query patterns");
777 /// }
778 ///
779 /// if stats.evictions > stats.size {
780 /// println!("High eviction rate! Cache may be too small.");
781 /// println!("Consider increasing cache capacity.");
782 /// }
783 ///
784 /// // Production monitoring
785 /// use std::time::Duration;
786 /// use std::thread;
787 ///
788 /// loop {
789 /// let stats = db.get_cache_stats();
790 ///
791 /// // Log to monitoring system
792 /// if stats.hit_rate < 0.90 {
793 /// eprintln!("Warning: Cache hit rate dropped to {:.1}%",
794 /// stats.hit_rate * 100.0);
795 /// }
796 ///
797 /// thread::sleep(Duration::from_secs(60));
798 /// }
799 /// ```
800 ///
801 /// # Typical Performance Metrics
802 /// - **Excellent**: 95%+ hit rate (most reads from memory)
803 /// - **Good**: 80-95% hit rate (acceptable performance)
804 /// - **Poor**: <80% hit rate (consider cache tuning)
805 ///
806 /// # See Also
807 /// - `prewarm_cache()` to improve hit rates by preloading data
808 /// - `Aurora::with_config()` to adjust cache capacity
809 pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
810 self.hot.get_stats()
811 }
812
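    /// Check whether queries on `field` in `collection` can use a secondary index
    /// (true when the schema marks the field as `indexed` or `unique`).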
813 pub fn has_index(&self, collection: &str, field: &str) -> bool {
814 if let Ok(collection_def) = self.get_collection_definition(collection) {
815 if let Some(field_def) = collection_def.fields.get(field) {
816 return field_def.indexed || field_def.unique;
817 }
818 }
819 false
820 }
821
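    /// Look up document IDs through the secondary index on `collection.field`.
    ///
    /// Returns bare document IDs (the `"collection:"` prefix is stripped), or an
    /// empty `Vec` when no index entry matches.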
822 pub fn get_ids_from_index(&self, collection: &str, field: &str, value: &Value) -> Vec<String> {
823 let index_key = format!("{}:{}", collection, field);
824 if let Some(index) = self.secondary_indices.get(&index_key) {
825 let value_str = match value {
826 Value::String(s) => s.clone(),
827 _ => value.to_string(),
828 };
829 if let Some(doc_ids) = index.get(&value_str) {
830 // The doc_ids in the secondary index are full keys like "collection:id".
831 // The query logic expects just the "id" part.
832 return doc_ids
833 .value()
834 .iter()
835 .map(|key| key.split(':').nth(1).unwrap_or(key).to_string())
836 .collect();
837 }
838 }
839 Vec::new()
840 }
841
842 /// Register a computed field definition
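    ///
    /// A sketch only - the `ComputedExpression::parse` constructor shown below is
    /// hypothetical; build the expression through whatever API `crate::computed`
    /// actually exposes:
    /// ```ignore
    /// let expr = aurora_db::computed::ComputedExpression::parse("first_name + ' ' + last_name")?;
    /// db.register_computed_field("users", "full_name", expr).await?;
    /// ```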
843 pub async fn register_computed_field(
844 &self,
845 collection: &str,
846 field: &str,
847 expression: crate::computed::ComputedExpression,
848 ) -> Result<()> {
849 let mut computed = self.computed.write().unwrap();
850 computed.register(collection, field, expression);
851 Ok(())
852 }
853
854 // ============================================
855 // PubSub API - Real-time Change Notifications
856 // ============================================
857
858 /// Listen for real-time changes in a collection
859 ///
860 /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
861 /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
862 /// data synchronization systems.
863 ///
864 /// # Performance
865 /// - Zero overhead when no listeners are active
866 /// - Events are broadcast to all listeners asynchronously
867 /// - Non-blocking - doesn't slow down write operations
868 /// - Multiple listeners can watch the same collection
869 ///
870 /// # Examples
871 ///
    /// ```
873 /// use aurora_db::{Aurora, types::Value};
874 ///
875 /// let db = Aurora::open("mydb.db")?;
876 ///
877 /// // Basic listener
878 /// let mut listener = db.listen("users");
879 ///
880 /// tokio::spawn(async move {
881 /// while let Ok(event) = listener.recv().await {
882 /// match event.change_type {
883 /// ChangeType::Insert => println!("New user: {:?}", event.document),
884 /// ChangeType::Update => println!("Updated user: {:?}", event.document),
885 /// ChangeType::Delete => println!("Deleted user ID: {}", event.id),
886 /// }
887 /// }
888 /// });
889 ///
890 /// // Now any insert/update/delete will trigger the listener
891 /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
892 /// ```
893 ///
894 /// # Real-World Use Cases
895 ///
896 /// **Cache Invalidation:**
    /// ```
898 /// use std::sync::Arc;
899 /// use tokio::sync::RwLock;
900 /// use std::collections::HashMap;
901 ///
902 /// let cache = Arc::new(RwLock::new(HashMap::new()));
903 /// let cache_clone = Arc::clone(&cache);
904 ///
905 /// let mut listener = db.listen("products");
906 ///
907 /// tokio::spawn(async move {
908 /// while let Ok(event) = listener.recv().await {
909 /// // Invalidate cache entry when product changes
910 /// cache_clone.write().await.remove(&event.id);
911 /// println!("Cache invalidated for product: {}", event.id);
912 /// }
913 /// });
914 /// ```
915 ///
916 /// **Webhook Notifications:**
917 /// ```
918 /// let mut listener = db.listen("orders");
919 ///
920 /// tokio::spawn(async move {
921 /// while let Ok(event) = listener.recv().await {
922 /// if event.change_type == ChangeType::Insert {
923 /// // Send webhook for new orders
924 /// send_webhook("https://api.example.com/webhooks/order", &event).await;
925 /// }
926 /// }
927 /// });
928 /// ```
929 ///
930 /// **Audit Logging:**
931 /// ```
932 /// let mut listener = db.listen("sensitive_data");
933 ///
934 /// tokio::spawn(async move {
935 /// while let Ok(event) = listener.recv().await {
936 /// // Log all changes to audit trail
937 /// db.insert_into("audit_log", vec![
938 /// ("collection", Value::String("sensitive_data".into())),
939 /// ("action", Value::String(format!("{:?}", event.change_type))),
940 /// ("document_id", Value::String(event.id.clone())),
941 /// ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
942 /// ]).await?;
943 /// }
944 /// });
945 /// ```
946 ///
947 /// **Data Synchronization:**
948 /// ```
949 /// let mut listener = db.listen("users");
950 ///
951 /// tokio::spawn(async move {
952 /// while let Ok(event) = listener.recv().await {
953 /// // Sync changes to external system
954 /// match event.change_type {
955 /// ChangeType::Insert | ChangeType::Update => {
956 /// if let Some(doc) = event.document {
957 /// external_api.upsert_user(&doc).await?;
958 /// }
959 /// },
960 /// ChangeType::Delete => {
961 /// external_api.delete_user(&event.id).await?;
962 /// },
963 /// }
964 /// }
965 /// });
966 /// ```
967 ///
968 /// **Real-Time Notifications:**
969 /// ```
970 /// let mut listener = db.listen("messages");
971 ///
972 /// tokio::spawn(async move {
973 /// while let Ok(event) = listener.recv().await {
974 /// if event.change_type == ChangeType::Insert {
975 /// if let Some(msg) = event.document {
976 /// // Push notification to connected websockets
977 /// if let Some(recipient) = msg.data.get("recipient_id") {
978 /// websocket_manager.send_to_user(recipient, &msg).await;
979 /// }
980 /// }
981 /// }
982 /// }
983 /// });
984 /// ```
985 ///
986 /// **Filtered Listener:**
987 /// ```
988 /// use aurora_db::pubsub::EventFilter;
989 ///
990 /// // Only listen for inserts
991 /// let mut listener = db.listen("users")
992 /// .filter(EventFilter::ChangeType(ChangeType::Insert));
993 ///
994 /// // Only listen for documents with specific field value
995 /// let mut listener = db.listen("users")
996 /// .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
997 /// ```
998 ///
999 /// # Important Notes
1000 /// - Listener stays active until dropped
1001 /// - Events are delivered in order
1002 /// - Each listener has its own event stream
1003 /// - Use filters to reduce unnecessary event processing
1004 /// - Listeners don't affect write performance
1005 ///
1006 /// # See Also
1007 /// - `listen_all()` to listen to all collections
1008 /// - `ChangeListener::filter()` to filter events
1009 /// - `query().watch()` for reactive queries with filtering
1010 pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
1011 self.pubsub.listen(collection)
1012 }
1013
1014 /// Listen for all changes across all collections
1015 ///
1016 /// Returns a stream of change events for every insert, update, and delete
1017 /// operation across the entire database. Useful for global audit logging,
1018 /// replication, and monitoring systems.
1019 ///
1020 /// # Performance
1021 /// - Same performance as single collection listener
1022 /// - Filter events by collection in your handler
1023 /// - Consider using `listen(collection)` if only watching specific collections
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// use aurora_db::Aurora;
1029 ///
1030 /// let db = Aurora::open("mydb.db")?;
1031 ///
1032 /// // Listen to everything
1033 /// let mut listener = db.listen_all();
1034 ///
1035 /// tokio::spawn(async move {
1036 /// while let Ok(event) = listener.recv().await {
1037 /// println!("Change in {}: {:?}", event.collection, event.change_type);
1038 /// }
1039 /// });
1040 /// ```
1041 ///
1042 /// # Real-World Use Cases
1043 ///
1044 /// **Global Audit Trail:**
1045 /// ```
1046 /// let mut listener = db.listen_all();
1047 ///
1048 /// tokio::spawn(async move {
1049 /// while let Ok(event) = listener.recv().await {
1050 /// // Log every database change
1051 /// audit_logger.log(AuditEntry {
1052 /// timestamp: chrono::Utc::now(),
1053 /// collection: event.collection,
1054 /// action: event.change_type,
1055 /// document_id: event.id,
1056 /// user_id: get_current_user_id(),
1057 /// }).await;
1058 /// }
1059 /// });
1060 /// ```
1061 ///
1062 /// **Database Replication:**
1063 /// ```
1064 /// let mut listener = db.listen_all();
1065 ///
1066 /// tokio::spawn(async move {
1067 /// while let Ok(event) = listener.recv().await {
1068 /// // Replicate to secondary database
1069 /// replica_db.apply_change(event).await?;
1070 /// }
1071 /// });
1072 /// ```
1073 ///
1074 /// **Change Data Capture (CDC):**
1075 /// ```
1076 /// let mut listener = db.listen_all();
1077 ///
1078 /// tokio::spawn(async move {
1079 /// while let Ok(event) = listener.recv().await {
1080 /// // Stream changes to Kafka/RabbitMQ
1081 /// kafka_producer.send(
1082 /// &format!("cdc.{}", event.collection),
1083 /// serde_json::to_string(&event)?
1084 /// ).await?;
1085 /// }
1086 /// });
1087 /// ```
1088 ///
1089 /// **Monitoring & Metrics:**
1090 /// ```
1091 /// use std::sync::atomic::{AtomicUsize, Ordering};
1092 ///
1093 /// let write_counter = Arc::new(AtomicUsize::new(0));
1094 /// let counter_clone = Arc::clone(&write_counter);
1095 ///
1096 /// let mut listener = db.listen_all();
1097 ///
1098 /// tokio::spawn(async move {
1099 /// while let Ok(_event) = listener.recv().await {
1100 /// counter_clone.fetch_add(1, Ordering::Relaxed);
1101 /// }
1102 /// });
1103 ///
1104 /// // Report metrics every 60 seconds
1105 /// tokio::spawn(async move {
1106 /// loop {
1107 /// tokio::time::sleep(Duration::from_secs(60)).await;
1108 /// let count = write_counter.swap(0, Ordering::Relaxed);
1109 /// println!("Writes per minute: {}", count);
1110 /// }
1111 /// });
1112 /// ```
1113 ///
1114 /// **Selective Processing:**
1115 /// ```
1116 /// let mut listener = db.listen_all();
1117 ///
1118 /// tokio::spawn(async move {
1119 /// while let Ok(event) = listener.recv().await {
1120 /// // Handle different collections differently
1121 /// match event.collection.as_str() {
1122 /// "users" => handle_user_change(event).await,
1123 /// "orders" => handle_order_change(event).await,
1124 /// "payments" => handle_payment_change(event).await,
1125 /// _ => {} // Ignore others
1126 /// }
1127 /// }
1128 /// });
1129 /// ```
1130 ///
1131 /// # When to Use
1132 /// - Global audit logging
1133 /// - Database replication
1134 /// - Change data capture (CDC)
1135 /// - Monitoring and metrics
1136 /// - Event sourcing systems
1137 ///
1138 /// # When NOT to Use
1139 /// - Only need to watch 1-2 collections → Use `listen(collection)` instead
1140 /// - High write volume with selective interest → Use collection-specific listeners
1141 /// - Need complex filtering → Use `query().watch()` instead
1142 ///
1143 /// # See Also
1144 /// - `listen()` for single collection listening
1145 /// - `listener_count()` to check active listeners
1146 /// - `query().watch()` for filtered reactive queries
1147 pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
1148 self.pubsub.listen_all()
1149 }
1150
1151 /// Get the number of active listeners for a collection
1152 pub fn listener_count(&self, collection: &str) -> usize {
1153 self.pubsub.listener_count(collection)
1154 }
1155
1156 /// Get total number of active listeners
1157 pub fn total_listeners(&self) -> usize {
1158 self.pubsub.total_listeners()
1159 }
1160
1161 /// Flushes all buffered writes to disk to ensure durability.
1162 ///
1163 /// This method forces all pending writes from:
1164 /// - Write buffer (if enabled)
1165 /// - Cold storage internal buffers
1166 /// - Write-ahead log (if enabled)
1167 ///
1168 /// Call this when you need to ensure data persistence before
1169 /// a critical operation or shutdown. After flush() completes,
1170 /// all data is guaranteed to be on disk even if power fails.
1171 ///
1172 /// # Performance
1173 /// - Flush time: ~10-50ms depending on buffered data
1174 /// - Triggers OS-level fsync() for durability guarantee
1175 /// - Truncates WAL after successful flush
1176 /// - Not needed for every write (WAL provides durability)
1177 ///
1178 /// # Examples
1179 ///
1180 /// ```
1181 /// use aurora_db::Aurora;
1182 ///
1183 /// let db = Aurora::open("mydb.db")?;
1184 ///
1185 /// // Basic flush after critical write
1186 /// db.insert_into("users", data).await?;
1187 /// db.flush()?; // Ensure data is persisted to disk
1188 ///
1189 /// // Graceful shutdown pattern
1190 /// fn shutdown(db: &Aurora) -> Result<()> {
1191 /// println!("Flushing pending writes...");
1192 /// db.flush()?;
1193 /// println!("Shutdown complete - all data persisted");
1194 /// Ok(())
1195 /// }
1196 ///
1197 /// // Periodic checkpoint pattern
1198 /// use std::time::Duration;
1199 /// use std::thread;
1200 ///
1201 /// let db = db.clone();
1202 /// thread::spawn(move || {
1203 /// loop {
1204 /// thread::sleep(Duration::from_secs(60));
1205 /// if let Err(e) = db.flush() {
1206 /// eprintln!("Flush error: {}", e);
1207 /// } else {
1208 /// println!("Checkpoint: data flushed to disk");
1209 /// }
1210 /// }
1211 /// });
1212 ///
1213 /// // Critical transaction pattern
1214 /// let tx_id = db.begin_transaction();
1215 ///
1216 /// // Multiple operations
1217 /// db.insert_into("orders", order_data).await?;
1218 /// db.update_document("inventory", product_id, updates).await?;
1219 /// db.insert_into("audit_log", audit_data).await?;
1220 ///
1221 /// // Commit and flush immediately
1222 /// db.commit_transaction(tx_id)?;
1223 /// db.flush()?; // Critical: ensure transaction is on disk
1224 ///
1225 /// // Backup preparation
1226 /// println!("Preparing backup...");
1227 /// db.flush()?; // Ensure all data is written
1228 /// std::fs::copy("mydb.db", "backup.db")?;
1229 /// println!("Backup complete");
1230 /// ```
1231 ///
1232 /// # When to Use
1233 /// - Before graceful shutdown
1234 /// - After critical transactions
1235 /// - Before creating backups
1236 /// - Periodic checkpoints (every 30-60 seconds)
1237 /// - Before risky operations
1238 ///
1239 /// # When NOT to Use
1240 /// - After every single write (too slow, WAL provides durability)
1241 /// - In high-throughput loops (batch instead)
1242 /// - When durability mode is already Immediate
1243 ///
1244 /// # Important Notes
1245 /// - WAL provides durability even without explicit flush()
1246 /// - flush() adds latency (~10-50ms) so use strategically
1247 /// - Automatic flush happens during graceful shutdown
1248 /// - After flush(), WAL is truncated (data is in main storage)
1249 ///
1250 /// # See Also
1251 /// - `Aurora::with_config()` to set durability mode
1252 /// - WAL (Write-Ahead Log) provides durability without explicit flushes
1253 pub fn flush(&self) -> Result<()> {
1254 // Flush write buffer if present
1255 if let Some(ref write_buffer) = self.write_buffer {
1256 write_buffer.flush()?;
1257 }
1258
1259 // Flush cold storage
1260 self.cold.flush()?;
1261
1262 // Truncate WAL after successful flush (data is now in cold storage)
1263 if let Some(ref wal) = self.wal
1264 && let Ok(mut wal_lock) = wal.write()
1265 {
1266 wal_lock.truncate()?;
1267 }
1268
1269 Ok(())
1270 }
1271
1272 /// Store a key-value pair (low-level storage)
1273 ///
1274 /// This is the low-level method. For documents, use `insert_into()` instead.
1275 /// Writes are buffered and batched for performance.
1276 ///
1277 /// # Arguments
1278 /// * `key` - Unique key (format: "collection:id" for documents)
1279 /// * `value` - Raw bytes to store
1280 /// * `ttl` - Optional time-to-live (None = permanent)
1281 ///
1282 /// # Performance
1283 /// - Buffered writes: ~15-30K docs/sec
1284 /// - Batching improves throughput significantly
1285 /// - Call `flush()` to ensure data is persisted
1286 ///
1287 /// # Examples
1288 ///
1289 /// ```
1290 /// use std::time::Duration;
1291 ///
1292 /// // Permanent storage
1293 /// let data = serde_json::to_vec(&my_struct)?;
1294 /// db.put("mykey".to_string(), data, None)?;
1295 ///
1296 /// // With TTL (expires after 1 hour)
1297 /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600)))?;
1298 ///
1299 /// // Better: use insert_into() for documents
1300 /// db.insert_into("users", vec![("name", Value::String("Alice".into()))])?;
1301 /// ```
1302 pub async fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
1303 const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;
1304
1305 if value.len() > MAX_BLOB_SIZE {
1306 return Err(AqlError::invalid_operation(format!(
1307 "Blob size {} exceeds maximum allowed size of {}MB",
1308 value.len() / (1024 * 1024),
1309 MAX_BLOB_SIZE / (1024 * 1024)
1310 )));
1311 }
1312
        // Wrap key and value in `Arc` once so every consumer below shares them
1314 let key_arc = Arc::new(key);
1315 let value_arc = Arc::new(value);
1316
1317 // Check if this is a blob (blobs bypass write buffer and hot cache)
1318 let is_blob = value_arc.starts_with(b"BLOB:");
1319
1320 // --- 1. WAL Write (Non-blocking send of Arcs) ---
1321 if let Some(ref sender) = self.wal_writer
1322 && self.config.durability_mode != DurabilityMode::None
1323 {
1324 let _ = sender.send(WalOperation::Put {
1325 key: Arc::clone(&key_arc),
1326 value: Arc::clone(&value_arc),
1327 });
1328 }
1329
1330 // Check if this key should be cached (false for Any-field collections)
1331 let should_cache = self.should_cache_key(&key_arc);
1332
1333 // --- 2. Cold Store Write ---
1334 if is_blob || !should_cache {
1335 // Blobs and Any-field docs write directly to cold storage
1336 // (blobs: avoid memory pressure, Any-fields: ensure immediate queryability)
1337 self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
1338 } else if let Some(ref write_buffer) = self.write_buffer {
1339 write_buffer.write(Arc::clone(&key_arc), Arc::clone(&value_arc))?;
1340 } else {
1341 self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
1342 }
1343
1344 // --- 3. Hot Cache Write ---
1345 if should_cache {
1346 if is_blob {
1347 // For blobs: cache only a lightweight reference (not the actual data)
1348 // Format: "BLOBREF:<size>" - just 16-24 bytes instead of potentially MB
1349 let blob_ref = format!("BLOBREF:{}", value_arc.len());
1350 self.hot
1351 .set(Arc::clone(&key_arc), Arc::new(blob_ref.into_bytes()), ttl);
1352 } else {
1353 self.hot
1354 .set(Arc::clone(&key_arc), Arc::clone(&value_arc), ttl);
1355 }
1356 }
1357
1358 // --- 4. Indexing (skip for blobs and system keys) ---
1359 if !is_blob {
1360 if let Some(collection_name) = key_arc.split(':').next()
1361 && !collection_name.starts_with('_')
1362 {
1363 self.index_value(collection_name, &key_arc, &value_arc)?;
1364 }
1365 }
1366
1367 Ok(())
1368 }
1369 /// Replay WAL entries to recover from crash
1370 ///
1371 /// Handles transaction boundaries:
1372 /// - Operations within a committed transaction are applied
1373 /// - Operations within a rolled-back transaction are discarded
1374 /// - Operations within an uncommitted transaction (crash during tx) are discarded
1375 async fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
1376 // Buffer for operations within a transaction
1377 let mut tx_buffer: Vec<crate::wal::LogEntry> = Vec::new();
1378 let mut in_transaction = false;
1379
1380 for entry in entries {
1381 match entry.operation {
1382 Operation::BeginTx => {
1383 // Start buffering operations
1384 in_transaction = true;
1385 tx_buffer.clear();
1386 }
1387 Operation::CommitTx => {
1388 // Apply all buffered operations
1389 for buffered_entry in tx_buffer.drain(..) {
1390 self.apply_wal_entry(buffered_entry).await?;
1391 }
1392 in_transaction = false;
1393 }
1394 Operation::RollbackTx => {
1395 // Discard buffered operations
1396 tx_buffer.clear();
1397 in_transaction = false;
1398 }
1399 Operation::Put | Operation::Delete => {
1400 if in_transaction {
1401 // Buffer the operation for later
1402 tx_buffer.push(entry);
1403 } else {
1404 // Apply immediately (not in transaction)
1405 self.apply_wal_entry(entry).await?;
1406 }
1407 }
1408 }
1409 }
1410
1411 // If we end with in_transaction = true, it means we crashed mid-transaction
1412 // Those operations in tx_buffer are discarded (not committed)
1413 if in_transaction {
1414 eprintln!(
1415 "WAL replay: Discarding {} uncommitted transaction operations",
1416 tx_buffer.len()
1417 );
1418 }
1419
1420 // Flush after replay and truncate WAL
1421 self.cold.flush()?;
1422 if let Some(ref wal) = self.wal {
1423 wal.write().unwrap().truncate()?;
1424 }
1425
1426 Ok(())
1427 }
1428
1429 /// Apply a single WAL entry to storage
1430 async fn apply_wal_entry(&self, entry: crate::wal::LogEntry) -> Result<()> {
1431 match entry.operation {
1432 Operation::Put => {
1433 if let Some(value) = entry.value {
1434 // Write directly to cold storage (skip WAL, already logged)
1435 self.cold.set(entry.key.clone(), value.clone())?;
1436
1437 // Update hot cache
1438 if self.should_cache_key(&entry.key) {
1439 self.hot
1440 .set(Arc::new(entry.key.clone()), Arc::new(value.clone()), None);
1441 }
1442
1443 // Rebuild indices
1444 if let Some(collection) = entry.key.split(':').next()
1445 && !collection.starts_with('_')
1446 {
1447 self.index_value(collection, &entry.key, &value)?;
1448 }
1449 }
1450 }
1451 Operation::Delete => {
1452 // Remove from indices before deleting the data
1453 if let Some(collection) = entry.key.split(':').next()
1454 && !collection.starts_with('_')
1455 {
1456 let _ = self.remove_from_index(collection, &entry.key);
1457 }
1458 self.cold.delete(&entry.key)?;
1459 self.hot.delete(&entry.key);
1460 }
1461 _ => {} // Transaction markers handled in replay_wal
1462 }
1463 Ok(())
1464 }
1465
1466 fn ensure_schema_hot(&self, collection: &str) -> Result<Arc<Collection>> {
1467 // 1. Check the high-performance object cache first (O(1))
1468 if let Some(schema) = self.schema_cache.get(collection) {
1469 return Ok(schema.value().clone());
1470 }
1471
1472 let collection_key = format!("_collection:{}", collection);
1473
1474 // 2. Fallback to Hot Byte Cache (parse if found)
1475 if let Some(data) = self.hot.get(&collection_key) {
1476 return serde_json::from_slice::<Collection>(&data)
1477 .map(|s| {
1478 let arc_s = Arc::new(s);
1479 // Populate object cache to avoid this parse next time
1480 self.schema_cache
1481 .insert(collection.to_string(), arc_s.clone());
1482 arc_s
1483 })
1484 .map_err(|_| {
1485 AqlError::new(
1486 ErrorCode::SchemaError,
1487 "Failed to parse schema from hot cache",
1488 )
1489 });
1490 }
1491
1492 // 3. Fallback to Cold Storage
1493 if let Some(data) = self.get(&collection_key)? {
1494 // Update Hot Cache
1495 self.hot.set(
1496 Arc::new(collection_key.clone()),
1497 Arc::new(data.clone()),
1498 None,
1499 );
1500
1501 // Parse and update Schema Cache
1502 let schema = serde_json::from_slice::<Collection>(&data)?;
1503 let arc_schema = Arc::new(schema);
1504 self.schema_cache
1505 .insert(collection.to_string(), arc_schema.clone());
1506
1507 return Ok(arc_schema);
1508 }
1509
1510 Err(AqlError::new(
1511 ErrorCode::SchemaError,
1512 format!("Failed to load schema for collection '{}'", collection),
1513 ))
1514 }
1515
1516 fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
1517 // Update primary index with metadata only
1518 let location = DiskLocation::new(value.len());
1519 self.primary_indices
1520 .entry(collection.to_string())
1521 .or_default()
1522 .insert(key.to_string(), location);
1523
1524 // Use the optimized helper (Fast path via schema_cache)
1525 let collection_obj = self.ensure_schema_hot(collection)?;
1526
1527 // Build list of fields that should be indexed
1528 let indexed_fields: Vec<String> = collection_obj
1529 .fields
1530 .iter()
1531 .filter(|(_, def)| def.indexed || def.unique)
1532 .map(|(name, _)| name.clone())
1533 .collect();
1534
1535 if indexed_fields.is_empty() {
1536 return Ok(());
1537 }
1538
1539 // Update secondary indices
1540 if let Ok(doc) = serde_json::from_slice::<Document>(value) {
1541 for (field, field_value) in doc.data {
1542 if !indexed_fields.contains(&field) {
1543 continue;
1544 }
1545
1546 let value_str = match &field_value {
1547 Value::String(s) => s.clone(),
1548 _ => field_value.to_string(),
1549 };
1550
1551 let index_key = format!("{}:{}", collection, field);
1552 let secondary_index = self.secondary_indices.entry(index_key).or_default();
1553
1554 let max_entries = self.config.max_index_entries_per_field;
1555
1556 secondary_index
1557 .entry(value_str)
1558 .and_modify(|doc_ids| {
1559 if doc_ids.len() < max_entries {
1560 doc_ids.push(key.to_string());
1561 }
1562 })
1563 .or_insert_with(|| vec![key.to_string()]);
1564 }
1565 }
1566 Ok(())
1567 }
1568
1569 /// Remove a document from all indices (called during delete operations)
1570 fn remove_from_index(&self, collection: &str, key: &str) -> Result<()> {
1571 // Remove from primary index
1572 if let Some(primary_index) = self.primary_indices.get_mut(collection) {
1573 primary_index.remove(key);
1574 }
1575
1576 // Get the document data to know which secondary index entries to remove
1577 // Try cold storage first since we may be replaying WAL
1578 let doc_data = match self.cold.get(key) {
1579 Ok(Some(data)) => Some(data),
1580 _ => self.hot.get(key),
1581 };
1582
1583 // If we have the document data, remove from secondary indices
1584 if let Some(data) = doc_data {
1585 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
1586 for (field, field_value) in &doc.data {
1587 let value_str = match field_value {
1588 Value::String(s) => s.clone(),
1589 _ => field_value.to_string(),
1590 };
1591
1592 let index_key = format!("{}:{}", collection, field);
1593 if let Some(secondary_index) = self.secondary_indices.get_mut(&index_key) {
1594 if let Some(mut doc_ids) = secondary_index.get_mut(&value_str) {
1595 doc_ids.retain(|id| id != key);
1596 // If empty, we could remove the entry but it's not strictly necessary
1597 }
1598 }
1599 }
1600 }
1601 }
1602
1603 Ok(())
1604 }
1605
1606 /// Scan collection with filter and early termination support
1607 /// Used by QueryBuilder for optimized queries with LIMIT
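    ///
    /// A minimal sketch; assumes `Document::data` is a map keyed by field name,
    /// as used elsewhere in this file:
    /// ```no_run
    /// # use aurora_db::Aurora;
    /// # fn example(db: &Aurora) -> Result<(), aurora_db::error::AqlError> {
    /// // First 10 documents that carry an `active` field; the scan stops
    /// // early once the limit is reached
    /// let docs = db.scan_and_filter(
    ///     "users",
    ///     |doc| doc.data.get("active").is_some(),
    ///     Some(10),
    /// )?;
    /// println!("{} matched", docs.len());
    /// # Ok(())
    /// # }
    /// ```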
1608 pub fn scan_and_filter<F>(
1609 &self,
1610 collection: &str,
1611 filter: F,
1612 limit: Option<usize>,
1613 ) -> Result<Vec<Document>>
1614 where
1615 F: Fn(&Document) -> bool,
1616 {
1617 let mut documents = Vec::new();
1618
1619 if let Some(index) = self.primary_indices.get(collection) {
1620 for entry in index.iter() {
1621 // Early termination
1622 if let Some(max) = limit {
1623 if documents.len() >= max {
1624 break;
1625 }
1626 }
1627
1628 let key = entry.key();
1629 if let Some(data) = self.get(key)? {
1630 if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
1631 // Apply computed fields
1632 if let Ok(computed) = self.computed.read() {
1633 let _ = computed.apply(collection, &mut doc);
1634 }
1635
1636 if filter(&doc) {
1637 documents.push(doc);
1638 }
1639 }
1640 }
1641 }
1642 } else {
1643 // Fallback
1644 documents = self.scan_collection(collection)?;
1645 documents.retain(|doc| filter(doc));
1646 if let Some(max) = limit {
1647 documents.truncate(max);
1648 }
1649 }
1650
1651 Ok(documents)
1652 }
1653
1654 /// Smart collection scan that uses the primary index as a key directory
1655 /// Avoids forced flushes and leverages hot cache for better performance
1656 fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
1657 let mut documents = Vec::new();
1658
1659 // Use the primary index as a "key directory" - it contains all document keys
1660 if let Some(index) = self.primary_indices.get(collection) {
1661 // Iterate through all keys in the primary index (fast, in-memory)
1662 for entry in index.iter() {
1663 let key = entry.key();
1664
1665 // Fetch document via hot cache -> cold storage fallback
1666 if let Some(data) = self.get(key)? {
1667 if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
1668 // Apply computed fields
1669 if let Ok(computed) = self.computed.read() {
1670 let _ = computed.apply(collection, &mut doc);
1671 }
1672 documents.push(doc);
1673 }
1674 }
1675 }
1676 } else {
1677 // Fallback: scan from cold storage if primary index not yet initialized
1678 let prefix = format!("{}:", collection);
1679 for result in self.cold.scan_prefix(&prefix) {
1680 if let Ok((_key, value)) = result {
1681 if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
1682 // Apply computed fields
1683 if let Ok(computed) = self.computed.read() {
1684 let _ = computed.apply(collection, &mut doc);
1685 }
1686 documents.push(doc);
1687 }
1688 }
1689 }
1690 }
1691
1692 Ok(documents)
1693 }
1694
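    /// Store a file's contents as a blob under `key`.
    ///
    /// The file (up to 50 MB) is read fully into memory, prefixed with `BLOB:`,
    /// and written to cold storage; the hot cache keeps only a lightweight
    /// `BLOBREF:` entry (see [`put`](Self::put)).
    ///
    /// # Example (a minimal sketch)
    /// ```no_run
    /// # use aurora_db::Aurora;
    /// # async fn example(db: &Aurora) -> Result<(), aurora_db::error::AqlError> {
    /// db.put_blob("images:logo".to_string(), std::path::Path::new("logo.png")).await?;
    /// # Ok(())
    /// # }
    /// ```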
1696 pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
1697 const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
1698
1699 // Get file metadata to check size before reading
1700 let metadata = tokio::fs::metadata(file_path).await?;
1701 let file_size = metadata.len() as usize;
1702
1703 if file_size > MAX_FILE_SIZE {
1704 return Err(AqlError::invalid_operation(format!(
1705 "File size {} MB exceeds maximum allowed size of {} MB",
1706 file_size / (1024 * 1024),
1707 MAX_FILE_SIZE / (1024 * 1024)
1708 )));
1709 }
1710
1711 let mut file = File::open(file_path).await?;
1712 let mut buffer = Vec::new();
1713 file.read_to_end(&mut buffer).await?;
1714
1715 // Add BLOB: prefix to mark this as blob data
1716 let mut blob_data = Vec::with_capacity(5 + buffer.len());
1717 blob_data.extend_from_slice(b"BLOB:");
1718 blob_data.extend_from_slice(&buffer);
1719
1720 self.put(key, blob_data, None).await
1721 }
1722
1723 /// Create a new collection with schema definition
1724 ///
    /// Collections are like tables in SQL - they define the structure of your documents.
    /// The third tuple element marks a field as unique; unique fields are
    /// automatically indexed for fast lookups.
    ///
    /// # Arguments
    /// * `name` - Collection name
    /// * `fields` - Vector of (field_name, field_type, unique) tuples
    ///   - Field name (accepts both &str and String)
    ///   - Field type (String, Int, Float, Bool, etc.)
    ///   - Unique: true to enforce uniqueness (and index the field), false otherwise
1734 ///
1735 /// # Performance
1736 /// - Indexed fields: Fast equality queries (O(1) lookup)
1737 /// - Non-indexed fields: Full scan required for queries
1738 /// - Unique fields are automatically indexed
1739 ///
1740 /// # Examples
1741 ///
    /// ```no_run
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// # async fn example() -> Result<(), aurora_db::error::AqlError> {
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Create a users collection
    /// db.new_collection("users", vec![
    ///     ("name", FieldType::String, false),   // Not unique
    ///     ("email", FieldType::String, true),   // Unique - indexed automatically
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    ///     ("score", FieldType::Float, false),
    /// ]).await?;
    ///
    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */]).await?; // OK!
    /// # Ok(())
    /// # }
    /// ```
1759 pub async fn new_collection<S: Into<String>>(
1760 &self,
1761 name: &str,
1762 fields: Vec<(S, FieldType, bool)>,
1763 ) -> Result<()> {
1764 let collection_key = format!("_collection:{}", name);
1765
1766 // Check if collection already exists - if so, just return Ok (idempotent)
1767 if self.get(&collection_key)?.is_some() {
1768 return Ok(());
1769 }
1770
1771 // Create field definitions
1772 let mut field_definitions = HashMap::new();
1773 for (field_name, field_type, unique) in fields {
1774 if field_type == FieldType::Any && unique {
1775 return Err(AqlError::new(
1776 ErrorCode::InvalidDefinition,
1777 "Fields of type 'Any' cannot be unique or indexed.".to_string(),
1778 ));
1779 }
1780 field_definitions.insert(
1781 field_name.into(),
1782 FieldDefinition {
1783 field_type,
1784 unique,
1785 indexed: unique, // Auto-index unique fields
1786 },
1787 );
1788 }
1789
1790 let collection = Collection {
1791 name: name.to_string(),
1792 fields: field_definitions,
1793 // REMOVED: unique_fields is now derived from fields
1794 };
1795
1796 let collection_data = serde_json::to_vec(&collection)?;
1797 self.put(collection_key, collection_data, None).await?;
1798
1799 // Invalidate schema cache since we just created/updated the collection schema
1800 self.schema_cache.remove(name);
1801
1802 Ok(())
1803 }
1804
1805 /// Insert a document into a collection
1806 ///
1807 /// Automatically generates a UUID for the document and validates against
1808 /// collection schema and unique constraints. Returns the generated document ID.
1809 ///
1810 /// # Performance
1811 /// - Single insert: ~15,000 docs/sec
1812 /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
1813 /// - Triggers PubSub events for real-time listeners
1814 ///
1815 /// # Arguments
1816 /// * `collection` - Name of the collection to insert into
1817 /// * `data` - Document fields and values to insert
1818 ///
1819 /// # Returns
1820 /// The auto-generated ID of the inserted document or an error
1821 ///
1822 /// # Errors
1823 /// - `CollectionNotFound`: Collection doesn't exist
1824 /// - `ValidationError`: Data violates schema or unique constraints
1825 /// - `SerializationError`: Invalid data format
1826 ///
1827 /// # Examples
1828 ///
1829    /// ```
1830    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
1831 ///
1832 /// let db = Aurora::open("mydb.db")?;
1833 ///
1834 /// // Basic insertion
1835 /// let user_id = db.insert_into("users", vec![
1836 /// ("name", Value::String("Alice Smith".to_string())),
1837 /// ("email", Value::String("alice@example.com".to_string())),
1838 /// ("age", Value::Int(28)),
1839 /// ("active", Value::Bool(true)),
1840 /// ]).await?;
1841 ///
1842 /// println!("Created user with ID: {}", user_id);
1843 ///
1844 /// // Inserting with nested data
1845 /// let order_id = db.insert_into("orders", vec![
1846 /// ("user_id", Value::String(user_id.clone())),
1847 /// ("total", Value::Float(99.99)),
1848 /// ("status", Value::String("pending".to_string())),
1849 /// ("items", Value::Array(vec![
1850 /// Value::String("item-123".to_string()),
1851 /// Value::String("item-456".to_string()),
1852 /// ])),
1853 /// ]).await?;
1854 ///
1855 /// // Error handling - unique constraint violation
1856 /// match db.insert_into("users", vec![
1857 /// ("email", Value::String("alice@example.com".to_string())), // Duplicate!
1858 /// ("name", Value::String("Alice Clone".to_string())),
1859 /// ]).await {
1860 /// Ok(id) => println!("Inserted: {}", id),
1861 /// Err(e) => println!("Failed: {} (email already exists)", e),
1862 /// }
1863 ///
1864 /// // For bulk inserts (10+ documents), use batch_insert() instead
1865 /// let users = vec![
1866 /// HashMap::from([
1867 /// ("name".to_string(), Value::String("Bob".to_string())),
1868 /// ("email".to_string(), Value::String("bob@example.com".to_string())),
1869 /// ]),
1870 /// HashMap::from([
1871 /// ("name".to_string(), Value::String("Carol".to_string())),
1872 /// ("email".to_string(), Value::String("carol@example.com".to_string())),
1873 /// ]),
1874 /// // ... more documents
1875 /// ];
1876 /// let ids = db.batch_insert("users", users).await?; // 3x faster!
1877 /// println!("Inserted {} users", ids.len());
1878 /// ```
1879 pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
1880 // Convert Vec<(&str, Value)> to HashMap<String, Value>
1881 let data_map: HashMap<String, Value> =
1882 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1883
1884 // Validate unique constraints before inserting
1885 self.validate_unique_constraints(collection, &data_map)
1886 .await?;
1887
1888 let doc_id = Uuid::now_v7().to_string();
1889 let document = Document {
1890 id: doc_id.clone(),
1891 data: data_map,
1892 };
1893
1894 self.put(
1895 format!("{}:{}", collection, doc_id),
1896 serde_json::to_vec(&document)?,
1897 None,
1898 )
1899 .await?;
1900
1901 // Publish insert event
1902 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1903 let _ = self.pubsub.publish(event);
1904
1905 Ok(doc_id)
1906 }
1907
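    /// Insert a document from a pre-built `HashMap` of field values
    ///
    /// Behaves like `insert_into()` (unique-constraint validation, UUID
    /// generation, PubSub insert event) but skips the tuple-to-map conversion.
    /// A minimal sketch; the collection and field names are illustrative:
    ///
    /// ```
    /// use std::collections::HashMap;
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Dana".into()));
    /// data.insert("age".to_string(), Value::Int(41));
    ///
    /// let id = db.insert_map("users", data).await?;
    /// println!("Inserted {}", id);
    /// ```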
1908 pub async fn insert_map(
1909 &self,
1910 collection: &str,
1911 data: HashMap<String, Value>,
1912 ) -> Result<String> {
1913 // Validate unique constraints before inserting
1914 self.validate_unique_constraints(collection, &data).await?;
1915
1916 let doc_id = Uuid::now_v7().to_string();
1917 let document = Document {
1918 id: doc_id.clone(),
1919 data,
1920 };
1921
1922 self.put(
1923 format!("{}:{}", collection, doc_id),
1924 serde_json::to_vec(&document)?,
1925 None,
1926 )
1927 .await?;
1928
1929 // Publish insert event
1930 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1931 let _ = self.pubsub.publish(event);
1932
1933 Ok(doc_id)
1934 }
1935
1936 /// Batch insert multiple documents with optimized write path
1937 ///
1938 /// Inserts multiple documents in a single optimized operation, bypassing
1939 /// the write buffer for better performance. Ideal for bulk data loading,
1940 /// migrations, or initial database seeding. 3x faster than individual inserts.
1941 ///
1942 /// # Performance
1943 /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
1944 /// - Batch writes to WAL and storage
1945 /// - Validates all unique constraints
1946 /// - Use for 10+ documents minimum
1947 ///
1948 /// # Arguments
1949 /// * `collection` - Name of the collection to insert into
1950 /// * `documents` - Vector of document data as HashMaps
1951 ///
1952 /// # Returns
1953 /// Vector of auto-generated document IDs or an error
1954 ///
1955 /// # Examples
1956 ///
1957    /// ```
1958 /// use aurora_db::{Aurora, types::Value};
1959 /// use std::collections::HashMap;
1960 ///
1961 /// let db = Aurora::open("mydb.db")?;
1962 ///
1963 /// // Bulk user import
1964 /// let users = vec![
1965 /// HashMap::from([
1966 /// ("name".to_string(), Value::String("Alice".into())),
1967 /// ("email".to_string(), Value::String("alice@example.com".into())),
1968 /// ("age".to_string(), Value::Int(28)),
1969 /// ]),
1970 /// HashMap::from([
1971 /// ("name".to_string(), Value::String("Bob".into())),
1972 /// ("email".to_string(), Value::String("bob@example.com".into())),
1973 /// ("age".to_string(), Value::Int(32)),
1974 /// ]),
1975 /// HashMap::from([
1976 /// ("name".to_string(), Value::String("Carol".into())),
1977 /// ("email".to_string(), Value::String("carol@example.com".into())),
1978 /// ("age".to_string(), Value::Int(25)),
1979 /// ]),
1980 /// ];
1981 ///
1982 /// let ids = db.batch_insert("users", users).await?;
1983 /// println!("Inserted {} users", ids.len());
1984 ///
1985 /// // Seeding test data
1986 /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
1987 /// .map(|i| HashMap::from([
1988 /// ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
1989 /// ("price".to_string(), Value::Float(9.99 + i as f64)),
1990 /// ("stock".to_string(), Value::Int(100)),
1991 /// ]))
1992 /// .collect();
1993 ///
1994 /// let ids = db.batch_insert("products", test_products).await?;
1995 /// // Much faster than 1000 individual insert_into() calls!
1996 ///
1997 /// // Migration from CSV data
1998 /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
1999 /// let mut batch = Vec::new();
2000 ///
2001 /// for result in csv_reader.records() {
2002 /// let record = result?;
2003 /// let doc = HashMap::from([
2004 /// ("field1".to_string(), Value::String(record[0].to_string())),
2005 /// ("field2".to_string(), Value::String(record[1].to_string())),
2006 /// ]);
2007 /// batch.push(doc);
2008 ///
2009 /// // Insert in batches of 1000
2010 /// if batch.len() >= 1000 {
2011 /// db.batch_insert("imported_data", batch.clone()).await?;
2012 /// batch.clear();
2013 /// }
2014 /// }
2015 ///
2016 /// // Insert remaining
2017 /// if !batch.is_empty() {
2018 /// db.batch_insert("imported_data", batch).await?;
2019 /// }
2020 /// ```
2021 ///
2022 /// # Errors
2023 /// - `ValidationError`: Unique constraint violation on any document
2024 /// - `CollectionNotFound`: Collection doesn't exist
2025 /// - `IoError`: Storage write failure
2026 ///
2027 /// # Important Notes
2028 /// - All inserts are atomic - if one fails, none are inserted
2029 /// - UUIDs are auto-generated for all documents
2030 /// - PubSub events are published for each insert
2031 /// - For 10+ documents, this is 3x faster than individual inserts
2032 /// - For < 10 documents, use `insert_into()` instead
2033 ///
2034 /// # See Also
2035 /// - `insert_into()` for single document inserts
2036 /// - `import_from_json()` for file-based bulk imports
2037 /// - `batch_write()` for low-level batch operations
2038 pub async fn batch_insert(
2039 &self,
2040 collection: &str,
2041 documents: Vec<HashMap<String, Value>>,
2042 ) -> Result<Vec<String>> {
2043 let mut doc_ids = Vec::with_capacity(documents.len());
2044 let mut pairs = Vec::with_capacity(documents.len());
2045
2046 // Prepare all documents
2047 for data in documents {
2048 // Validate unique constraints
2049 self.validate_unique_constraints(collection, &data).await?;
2050
2051 let doc_id = Uuid::now_v7().to_string();
2052 let document = Document {
2053 id: doc_id.clone(),
2054 data,
2055 };
2056
2057 let key = format!("{}:{}", collection, doc_id);
2058 let value = serde_json::to_vec(&document)?;
2059
2060 pairs.push((key, value));
2061 doc_ids.push(doc_id);
2062 }
2063
2064 // Write to WAL in batch (if enabled)
2065 if let Some(ref wal) = self.wal
2066 && self.config.durability_mode != DurabilityMode::None
2067 {
2068 let mut wal_lock = wal.write().unwrap();
2069 for (key, value) in &pairs {
2070 wal_lock.append(Operation::Put, key, Some(value))?;
2071 }
2072 }
2073
2074 // Bypass write buffer - go directly to cold storage batch API
2075 self.cold.batch_set(pairs.clone())?;
2076
2077 // Note: Durability is handled by background checkpoint process
2078
2079 // Update hot cache and indices
2080 for (key, value) in pairs {
2081 if self.should_cache_key(&key) {
2082 self.hot
2083 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2084 }
2085
2086 if let Some(collection_name) = key.split(':').next()
2087 && !collection_name.starts_with('_')
2088 {
2089 self.index_value(collection_name, &key, &value)?;
2090 }
2091 }
2092
2093 // Publish events
2094 for doc_id in &doc_ids {
2095 if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
2096 let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
2097 let _ = self.pubsub.publish(event);
2098 }
2099 }
2100
2101 Ok(doc_ids)
2102 }
2103
2104 /// Update a document by ID
2105 ///
2106 /// # Arguments
2107 /// * `collection` - Collection name
2108 /// * `doc_id` - Document ID to update
2109    /// * `updates` - New field values to set
2110 ///
2111 /// # Returns
2112 /// Ok(()) on success, or an error if the document doesn't exist
2113 ///
2114 /// # Examples
2115 ///
2116 /// ```
2117 /// db.update_document("users", &user_id, vec![
2118 /// ("status", Value::String("active".to_string())),
2119 /// ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2120 /// ]).await?;
2121 /// ```
2122 pub async fn update_document(
2123 &self,
2124 collection: &str,
2125 doc_id: &str,
2126 updates: Vec<(&str, Value)>,
2127 ) -> Result<()> {
2128 // Get existing document
2129 let mut document = self.get_document(collection, doc_id)?.ok_or_else(|| {
2130 AqlError::new(
2131 ErrorCode::NotFound,
2132 format!("Document not found: {}", doc_id),
2133 )
2134 })?;
2135
2136 // Store old document for event
2137 let old_document = document.clone();
2138
2139 // Apply updates
2140 for (field, value) in updates {
2141 document.data.insert(field.to_string(), value);
2142 }
2143
2144 // Validate unique constraints after update (excluding current document)
2145 self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
2146 .await?;
2147
2148 // Save updated document
2149 self.put(
2150 format!("{}:{}", collection, doc_id),
2151 serde_json::to_vec(&document)?,
2152 None,
2153 )
2154 .await?;
2155
2156 // Publish update event
2157 let event =
2158 crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
2159 let _ = self.pubsub.publish(event);
2160
2161 Ok(())
2162 }
2163
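    /// Load every document in a collection
    ///
    /// Ensures the indices are initialized, then scans the full collection,
    /// so cost is O(n) in collection size. A short sketch:
    ///
    /// ```
    /// let everyone = db.get_all_collection("users").await?;
    /// println!("{} documents", everyone.len());
    /// ```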
2164 pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
2165 self.ensure_indices_initialized().await?;
2166 self.scan_collection(collection)
2167 }
2168
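    /// List keys containing `pattern`, with size and preview metadata
    ///
    /// Scans all of cold storage, so treat this as a debugging/administration
    /// helper rather than a query path. A sketch (the import path for
    /// `DataInfo` is assumed; adjust it to wherever the crate re-exports it):
    ///
    /// ```
    /// use aurora_db::DataInfo;
    ///
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     match info {
    ///         DataInfo::Blob { size } => println!("{}: blob ({} bytes)", key, size),
    ///         DataInfo::Data { size, preview } => println!("{}: {} bytes, starts {:?}", key, size, preview),
    ///         DataInfo::Compressed { size } => println!("{}: compressed ({} bytes)", key, size),
    ///     }
    /// }
    /// ```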
2169 pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
2170 let mut data = Vec::new();
2171
2172 // Scan from cold storage instead of primary index
2173 for result in self.cold.scan() {
2174 if let Ok((key, value)) = result {
2175 if key.contains(pattern) {
2176 let info = if value.starts_with(b"BLOB:") {
2177 DataInfo::Blob { size: value.len() }
2178 } else {
2179 DataInfo::Data {
2180 size: value.len(),
2181 preview: String::from_utf8_lossy(&value[..value.len().min(50)])
2182 .into_owned(),
2183 }
2184 };
2185
2186 data.push((key.clone(), info));
2187 }
2188 }
2189 }
2190
2191 Ok(data)
2192 }
2193
2219 /// Begin a new transaction for atomic operations
2220 ///
2221 /// Transactions ensure all-or-nothing execution: either all operations succeed,
2222 /// or none of them are applied. Perfect for maintaining data consistency.
2223    ///
    /// # Returns
    /// The transaction ID to pass to `commit_transaction()` or `rollback_transaction()`.
    ///
2224    /// # Examples
2225    ///
2226    /// ```
2227 /// use aurora_db::{Aurora, types::Value};
2228 ///
2229 /// let db = Aurora::open("mydb.db")?;
2230 ///
2231 /// // Start transaction
2232 /// let tx_id = db.begin_transaction();
2233 ///
2234 /// // Perform multiple operations
2235 /// db.insert_into("accounts", vec![
2236 /// ("user_id", Value::String("alice".into())),
2237 /// ("balance", Value::Int(1000)),
2238 /// ]).await?;
2239 ///
2240 /// db.insert_into("accounts", vec![
2241 /// ("user_id", Value::String("bob".into())),
2242 /// ("balance", Value::Int(500)),
2243    /// ]).await?;
2244 ///
2245 /// // Commit if all succeeded
2246 /// db.commit_transaction(tx_id)?;
2247 ///
2248 /// // Or rollback on error
2249 /// // db.rollback_transaction(tx_id)?;
2250 /// ```
2251 pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
2252 let buffer = self.transaction_manager.begin();
2253 buffer.id
2254 }
2255
2256 /// Commit a transaction, making all changes permanent
2257 ///
2258 /// All operations within the transaction are atomically applied to the database.
2259 /// If any operation fails, none are applied.
2260 ///
2261 /// # Arguments
2262 /// * `tx_id` - Transaction ID returned from begin_transaction()
2263 ///
2264 /// # Examples
2265 ///
2266    /// ```
2267 /// use aurora_db::{Aurora, types::Value};
2268 ///
2269 /// let db = Aurora::open("mydb.db")?;
2270 ///
2271 /// // Transfer money between accounts
2272 /// let tx_id = db.begin_transaction();
2273 ///
2274 /// // Deduct from Alice
2275 /// db.update_document("accounts", "alice", vec![
2276 /// ("balance", Value::Int(900)), // Was 1000
2277 /// ]).await?;
2278 ///
2279 /// // Add to Bob
2280 /// db.update_document("accounts", "bob", vec![
2281 /// ("balance", Value::Int(600)), // Was 500
2282 /// ]).await?;
2283 ///
2284 /// // Both updates succeed - commit them
2285 /// db.commit_transaction(tx_id)?;
2286 ///
2287 /// println!("Transfer completed!");
2288 /// ```
2289 pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2290 let buffer = self
2291 .transaction_manager
2292 .active_transactions
2293 .get(&tx_id)
2294 .ok_or_else(|| {
2295 AqlError::invalid_operation(
2296 "Transaction not found or already completed".to_string(),
2297 )
2298 })?;
2299
2300 for item in buffer.writes.iter() {
2301 let key = item.key();
2302 let value = item.value();
2303 self.cold.set(key.clone(), value.clone())?;
2304 if self.should_cache_key(key) {
2305 self.hot
2306 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2307 }
2308 if let Some(collection_name) = key.split(':').next()
2309 && !collection_name.starts_with('_')
2310 {
2311 self.index_value(collection_name, key, value)?;
2312 }
2313 }
2314
2315 for item in buffer.deletes.iter() {
2316 let key = item.key();
2317 if let Some((collection, id)) = key.split_once(':')
2318 && let Ok(Some(doc)) = self.get_document(collection, id)
2319 {
2320 self.remove_from_indices(collection, &doc)?;
2321 }
2322 self.cold.delete(key)?;
2323 self.hot.delete(key);
2324 }
2325
2326 // Drop the buffer reference to release the DashMap read lock
2327 // before calling commit which needs to remove the entry (write lock)
2328 drop(buffer);
2329
2330 self.transaction_manager.commit(tx_id)?;
2331
2332 self.cold.compact()?;
2333
2334 Ok(())
2335 }
2336
2337 /// Roll back a transaction, discarding all changes
2338 ///
2339 /// All operations within the transaction are discarded. The database state
2340 /// remains unchanged. Use this when an error occurs during transaction processing.
2341 ///
2342 /// # Arguments
2343 /// * `tx_id` - Transaction ID returned from begin_transaction()
2344 ///
2345 /// # Examples
2346 ///
2347    /// ```
2348 /// use aurora_db::{Aurora, types::Value};
2349 ///
2350 /// let db = Aurora::open("mydb.db")?;
2351 ///
2352 /// // Attempt a transfer with validation
2353 /// let tx_id = db.begin_transaction();
2354 ///
2355 /// let result = async {
2356 /// // Deduct from Alice
2357    ///     let alice = db.get_document("accounts", "alice")?;
2358    ///     let balance = alice.as_ref().and_then(|doc| doc.data.get("balance"));
2359 ///
2360 /// if let Some(Value::Int(bal)) = balance {
2361 /// if *bal < 100 {
2362    ///             return Err(AqlError::invalid_operation("Insufficient funds".to_string()));
2363 /// }
2364 ///
2365 /// db.update_document("accounts", "alice", vec![
2366 /// ("balance", Value::Int(bal - 100)),
2367 /// ]).await?;
2368 ///
2369 /// db.update_document("accounts", "bob", vec![
2370 /// ("balance", Value::Int(600)),
2371 /// ]).await?;
2372 ///
2373 /// Ok(())
2374 /// } else {
2375    ///         Err(AqlError::invalid_operation("Account not found".to_string()))
2376 /// }
2377 /// }.await;
2378 ///
2379 /// match result {
2380 /// Ok(_) => {
2381 /// db.commit_transaction(tx_id)?;
2382 /// println!("Transfer completed");
2383 /// }
2384 /// Err(e) => {
2385 /// db.rollback_transaction(tx_id)?;
2386 /// println!("Transfer failed: {}, changes rolled back", e);
2387 /// }
2388 /// }
2389 /// ```
2390 pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2391 self.transaction_manager.rollback(tx_id)
2392 }
2393
2394 /// Create a secondary index on a field for faster queries
2395 ///
2396 /// Indexes dramatically improve query performance for frequently accessed fields,
2397 /// trading increased memory usage and slower writes for much faster reads.
2398 ///
2399 /// # When to Create Indexes
2400 /// - **Frequent queries**: Fields used in 80%+ of your queries
2401 /// - **High cardinality**: Fields with many unique values (user_id, email)
2402 /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
2403 /// - **Large collections**: Most beneficial with 10,000+ documents
2404 ///
2405 /// # When NOT to Index
2406 /// - Low cardinality fields (e.g., boolean flags, small enums)
2407 /// - Rarely queried fields
2408 /// - Fields that change frequently (write-heavy workloads)
2409 /// - Small collections (<1,000 documents) - full scans are fast enough
2410 ///
2411 /// # Performance Characteristics
2412 /// - **Query speedup**: O(n) → O(1) for equality filters
2413 /// - **Memory cost**: ~100-200 bytes per document per index
2414 /// - **Write slowdown**: ~20-30% longer insert/update times
2415 /// - **Build time**: ~5,000 docs/sec for initial indexing
2416 ///
2417 /// # Arguments
2418 /// * `collection` - Name of the collection to index
2419 /// * `field` - Name of the field to index
2420 ///
2421 /// # Examples
2422 ///
2423 /// ```
2424    /// use aurora_db::{Aurora, types::FieldType};
2425 ///
2426 /// let db = Aurora::open("mydb.db")?;
2427 /// db.new_collection("users", vec![
2428    ///     ("email", FieldType::String, false),
2429    ///     ("age", FieldType::Int, false),
2430    ///     ("active", FieldType::Bool, false),
2431    /// ]).await?;
2432 ///
2433 /// // Index email - frequently queried, high cardinality
2434 /// db.create_index("users", "email").await?;
2435 ///
2436 /// // Now this query is FAST (O(1) instead of O(n))
2437 /// let user = db.query("users")
2438 /// .filter(|f| f.eq("email", "alice@example.com"))
2439 /// .first_one()
2440 /// .await?;
2441 ///
2442 /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
2443 /// // A full scan is fast enough for boolean fields
2444 ///
2445 /// // DO index 'age' if you frequently query age ranges
2446 /// db.create_index("users", "age").await?;
2447 ///
2448 /// let young_users = db.query("users")
2449 /// .filter(|f| f.lt("age", 30))
2450 /// .collect()
2451 /// .await?;
2452 /// ```
2453 ///
2454 /// # Real-World Example: E-commerce Orders
2455 ///
2456 /// ```
2457 /// // Orders collection: 1 million documents
2458 /// db.new_collection("orders", vec![
2459    ///     ("user_id", FieldType::String, false),   // High cardinality
2460    ///     ("status", FieldType::String, false),    // Low cardinality (pending, shipped, delivered)
2461    ///     ("created_at", FieldType::String, false),
2462    ///     ("total", FieldType::Float, false),
2463    /// ]).await?;
2464 ///
2465 /// // Index user_id - queries like "show me my orders" are common
2466 /// db.create_index("orders", "user_id").await?; // Good choice
2467 ///
2468 /// // Query speedup: 2.5s → 0.001s
2469 /// let my_orders = db.query("orders")
2470 /// .filter(|f| f.eq("user_id", user_id))
2471 /// .collect()
2472 /// .await?;
2473 ///
2474 /// // DON'T index 'status' - only 3 possible values
2475 /// // Scanning 1M docs takes ~100ms, indexing won't help much
2476 ///
2477 /// // Index created_at if you frequently query recent orders
2478 /// db.create_index("orders", "created_at").await?; // Good for time-based queries
2479 /// ```
2480 pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
2481 let collection_def = self.get_collection_definition(collection)?;
2482
2483 if let Some(field_def) = collection_def.fields.get(field) {
2484 if field_def.field_type == FieldType::Any {
2485 return Err(AqlError::new(
2486 ErrorCode::InvalidDefinition,
2487 "Cannot create an index on a field of type 'Any'.".to_string(),
2488 ));
2489 }
2490 } else {
2491 return Err(AqlError::new(
2492 ErrorCode::InvalidDefinition,
2493 format!(
2494 "Field '{}' not found in collection '{}'.",
2495 field, collection
2496 ),
2497 ));
2498 }
2499
2500 // Generate a default index name
2501 let index_name = format!("idx_{}_{}", collection, field);
2502
2503 // Create index definition
2504 let definition = IndexDefinition {
2505 name: index_name.clone(),
2506 collection: collection.to_string(),
2507 fields: vec![field.to_string()],
2508 index_type: IndexType::BTree,
2509 unique: false,
2510 };
2511
2512 // Create the index
2513 let index = Index::new(definition.clone());
2514
2515 // Index all existing documents in the collection
2516 let prefix = format!("{}:", collection);
2517 for result in self.cold.scan_prefix(&prefix) {
2518 if let Ok((_, data)) = result
2519 && let Ok(doc) = serde_json::from_slice::<Document>(&data)
2520 {
2521 let _ = index.insert(&doc);
2522 }
2523 }
2524
2525 // Store the index
2526 self.indices.insert(index_name, index);
2527
2528 // Store the index definition for persistence
2529 let index_key = format!("_index:{}:{}", collection, field);
2530 self.put(index_key, serde_json::to_vec(&definition)?, None)
2531 .await?;
2532
2533 Ok(())
2534 }
2535
2536 /// Query documents in a collection with filtering, sorting, and pagination
2537 ///
2538 /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
2539 /// Queries use early termination for LIMIT clauses, making them extremely fast
2540 /// even on large collections (6,800x faster than naive implementations).
2541 ///
2542 /// # Performance
2543 /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2544 /// - Without LIMIT: O(n) where n = matching documents
2545 /// - Uses secondary indices when available for equality filters
2546 /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2547 ///
2548 /// # Examples
2549 ///
2550 /// ```
2551 /// use aurora_db::{Aurora, types::Value};
2552 ///
2553 /// let db = Aurora::open("mydb.db")?;
2554 ///
2555 /// // Simple equality query
2556 /// let active_users = db.query("users")
2557 /// .filter(|f| f.eq("active", Value::Bool(true)))
2558 /// .collect()
2559 /// .await?;
2560 ///
2561 /// // Range query with pagination (FAST - uses early termination!)
2562 /// let top_scorers = db.query("users")
2563 /// .filter(|f| f.gt("score", Value::Int(1000)))
2564 /// .order_by("score", false) // descending
2565 /// .limit(10)
2566 /// .offset(20)
2567 /// .collect()
2568 /// .await?;
2569 ///
2570 /// // Multiple filters
2571 /// let premium_active = db.query("users")
2572 /// .filter(|f| f.eq("tier", Value::String("premium".into())))
2573 /// .filter(|f| f.eq("active", Value::Bool(true)))
2574 /// .limit(100) // Only scans ~200 docs, not all million!
2575 /// .collect()
2576 /// .await?;
2577 ///
2578 /// // Text search in a field
2579 /// let matching = db.query("articles")
2580 /// .filter(|f| f.contains("title", "rust"))
2581 /// .collect()
2582 /// .await?;
2583 /// ```
2584 pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2585 QueryBuilder::new(self, collection)
2586 }
2587
2588 /// Create a search builder for full-text search
2589 ///
2590 /// # Arguments
2591 /// * `collection` - Name of the collection to search
2592 ///
2593 /// # Returns
2594 /// A `SearchBuilder` for configuring and executing searches
2595 ///
2596 /// # Examples
2597 ///
2598 /// ```
2599 /// // Search for documents containing text
2600 /// let search_results = db.search("articles")
2601 /// .field("content")
2602 /// .matching("quantum computing")
2603 /// .fuzzy(true) // Enable fuzzy matching for typo tolerance
2604 /// .collect()
2605 /// .await?;
2606 /// ```
2607 pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2608 SearchBuilder::new(self, collection)
2609 }
2610
2611 /// Retrieve a document by ID
2612 ///
2613 /// Fast direct lookup when you know the document ID. Significantly faster
2614 /// than querying with filters when ID is known.
2615 ///
2616 /// # Performance
2617 /// - Hot cache: ~1,000,000 reads/sec (instant)
2618 /// - Cold storage: ~500,000 reads/sec (disk I/O)
2619 /// - Complexity: O(1) - constant time lookup
2620 /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2621 ///
2622 /// # Arguments
2623 /// * `collection` - Name of the collection to query
2624 /// * `id` - ID of the document to retrieve
2625 ///
2626 /// # Returns
2627 /// The document if found, None if not found, or an error
2628 ///
2629 /// # Examples
2630 ///
2631    /// ```
2632 /// use aurora_db::{Aurora, types::Value};
2633 ///
2634 /// let db = Aurora::open("mydb.db")?;
2635 ///
2636 /// // Basic retrieval
2637 /// if let Some(user) = db.get_document("users", &user_id)? {
2638 /// println!("Found user: {}", user.id);
2639 ///
2640 /// // Access fields safely
2641 /// if let Some(Value::String(name)) = user.data.get("name") {
2642 /// println!("Name: {}", name);
2643 /// }
2644 ///
2645 /// if let Some(Value::Int(age)) = user.data.get("age") {
2646 /// println!("Age: {}", age);
2647 /// }
2648 /// } else {
2649 /// println!("User not found");
2650 /// }
2651 ///
2652 /// // Idiomatic error handling
2653 /// let user = db.get_document("users", &user_id)?
2654    ///     .ok_or_else(|| AqlError::new(ErrorCode::NotFound, "User not found".into()))?;
2655 ///
2656 /// // Checking existence before operations
2657 /// if db.get_document("users", &user_id)?.is_some() {
2658 /// db.update_document("users", &user_id, vec![
2659 /// ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2660 /// ]).await?;
2661 /// }
2662 ///
2663 /// // Batch retrieval (fetch multiple by ID)
2664 /// let user_ids = vec!["user-1", "user-2", "user-3"];
2665 /// let users: Vec<Document> = user_ids.iter()
2666 /// .filter_map(|id| db.get_document("users", id).ok().flatten())
2667 /// .collect();
2668 ///
2669 /// println!("Found {} out of {} users", users.len(), user_ids.len());
2670 /// ```
2671 ///
2672 /// # When to Use
2673 /// - You know the document ID (from insert, previous query, or URL param)
2674 /// - Need fastest possible lookup (1M reads/sec)
2675 /// - Fetching a single document
2676 ///
2677 /// # When NOT to Use
2678 /// - Searching by other fields → Use `query().filter()` instead
2679 /// - Need multiple documents by criteria → Use `query().collect()` instead
2680 /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2681 pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2682 let key = format!("{}:{}", collection, id);
2683 if let Some(data) = self.get(&key)? {
2684 Ok(Some(serde_json::from_slice(&data)?))
2685 } else {
2686 Ok(None)
2687 }
2688 }
2689
2690 /// Delete a document by ID
2691 ///
2692 /// Permanently removes a document from storage, cache, and all indices.
2693 /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2694 ///
2695 /// # Performance
2696 /// - Delete speed: ~50,000 deletes/sec
2697 /// - Cleans up hot cache, cold storage, primary + secondary indices
2698 /// - Triggers PubSub events for listeners
2699 ///
2700 /// # Arguments
2701 /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2702 ///
2703 /// # Returns
2704 /// Success or an error
2705 ///
2706 /// # Errors
2707 /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2708 /// - `IoError`: Storage deletion failed
2709 ///
2710 /// # Examples
2711 ///
2712 /// ```
2713    /// use aurora_db::{Aurora, types::Value};
2714 ///
2715 /// let db = Aurora::open("mydb.db")?;
2716 ///
2717 /// // Basic deletion (note: requires "collection:id" format)
2718 /// db.delete("users:abc123").await?;
2719 ///
2720 /// // Delete with existence check
2721 /// let user_id = "user-456";
2722 /// if db.get_document("users", user_id)?.is_some() {
2723 /// db.delete(&format!("users:{}", user_id)).await?;
2724 /// println!("User deleted");
2725 /// } else {
2726 /// println!("User not found");
2727 /// }
2728 ///
2729 /// // Error handling
2730 /// match db.delete("users:nonexistent").await {
2731 /// Ok(_) => println!("Deleted successfully"),
2732 /// Err(e) => println!("Delete failed: {}", e),
2733 /// }
2734 ///
2735 /// // Batch deletion using query
2736    /// let inactive_count = db.delete_by_query("users", |f| {
2737 /// f.eq("active", Value::Bool(false))
2738 /// }).await?;
2739 /// println!("Deleted {} inactive users", inactive_count);
2740 ///
2741 /// // Delete with cascading (manual cascade pattern)
2742 /// let user_id = "user-123";
2743 ///
2744 /// // Delete user's orders first
2745 /// let orders = db.query("orders")
2746 /// .filter(|f| f.eq("user_id", user_id))
2747 /// .collect()
2748 /// .await?;
2749 ///
2750 /// for order in orders {
2751 /// db.delete(&format!("orders:{}", order.id)).await?;
2752 /// }
2753 ///
2754 /// // Then delete the user
2755 /// db.delete(&format!("users:{}", user_id)).await?;
2756 /// println!("User and all orders deleted");
2757 /// ```
2758 ///
2759 /// # Alternative: Soft Delete Pattern
2760 ///
2761 /// For recoverable deletions, use soft deletes instead:
2762 ///
2763 /// ```
2764 /// // Soft delete - mark as deleted instead of removing
2765 /// db.update_document("users", &user_id, vec![
2766 /// ("deleted", Value::Bool(true)),
2767 /// ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2768 /// ]).await?;
2769 ///
2770 /// // Query excludes soft-deleted items
2771 /// let active_users = db.query("users")
2772 /// .filter(|f| f.eq("deleted", Value::Bool(false)))
2773 /// .collect()
2774 /// .await?;
2775 ///
2776 /// // Later: hard delete after retention period
2777 /// let old_deletions = db.query("users")
2778 /// .filter(|f| f.eq("deleted", Value::Bool(true)))
2779 /// .filter(|f| f.lt("deleted_at", thirty_days_ago))
2780 /// .collect()
2781 /// .await?;
2782 ///
2783 /// for user in old_deletions {
2784 /// db.delete(&format!("users:{}", user.id)).await?;
2785 /// }
2786 /// ```
2787 ///
2788 /// # Important Notes
2789 /// - Deletion is permanent - no undo/recovery
2790 /// - Consider soft deletes for recoverable operations
2791 /// - Use transactions for multi-document deletions
2792 /// - PubSub subscribers will receive delete events
2793 /// - All indices are automatically cleaned up
2794 pub async fn delete(&self, key: &str) -> Result<()> {
2795 // Extract collection and id from key (format: "collection:id")
2796 let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2797 (coll, doc_id)
2798 } else {
2799 return Err(AqlError::invalid_operation(
2800 "Invalid key format, expected 'collection:id'".to_string(),
2801 ));
2802 };
2803
2804 // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
2805 let document = self.get_document(collection, id)?;
2806
2807 // Delete from hot cache
2808 if self.hot.get(key).is_some() {
2809 self.hot.delete(key);
2810 }
2811
2812 // Delete from cold storage
2813 self.cold.delete(key)?;
2814
2815 // CRITICAL FIX: Clean up ALL indices (primary + secondary)
2816 if let Some(doc) = document {
2817 self.remove_from_indices(collection, &doc)?;
2818 } else {
2819 // Fallback: at least remove from primary index
2820 if let Some(index) = self.primary_indices.get_mut(collection) {
2821 index.remove(id);
2822 }
2823 }
2824
2825 // Publish delete event
2826 let event = crate::pubsub::ChangeEvent::delete(collection, id);
2827 let _ = self.pubsub.publish(event);
2828
2829 Ok(())
2830 }
2831
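    /// Delete every document in a collection, along with its in-memory
    /// indices and cached schema
    ///
    /// Each key is deleted through `delete()`, so a delete event is published
    /// per document. A short sketch (collection name is illustrative):
    ///
    /// ```
    /// db.delete_collection("temp_imports").await?;
    /// ```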
2832 pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2833 let prefix = format!("{}:", collection);
2834
2835 // Get all keys in collection
2836 let keys: Vec<String> = self
2837 .cold
2838 .scan()
2839 .filter_map(|r| r.ok())
2840 .filter(|(k, _)| k.starts_with(&prefix))
2841 .map(|(k, _)| k)
2842 .collect();
2843
2844 // Delete each key
2845 for key in keys {
2846 self.delete(&key).await?;
2847 }
2848
2849 // Remove collection indices
2850 self.primary_indices.remove(collection);
2851 self.secondary_indices
2852 .retain(|k, _| !k.starts_with(&prefix));
2853
2854 // Invalidate schema cache
2855 self.schema_cache.remove(collection);
2856
2857 Ok(())
2858 }
2859
2860 fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
2861 // Remove from primary index
2862 if let Some(index) = self.primary_indices.get(collection) {
2863 index.remove(&doc.id);
2864 }
2865
2866 // Remove from secondary indices
2867 for (field, value) in &doc.data {
2868 let index_key = format!("{}:{}", collection, field);
2869 if let Some(index) = self.secondary_indices.get(&index_key)
2870 && let Some(mut doc_ids) = index.get_mut(&value.to_string())
2871 {
2872 doc_ids.retain(|id| id != &doc.id);
2873 }
2874 }
2875
2876 Ok(())
2877 }
2878
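    /// Case-insensitive substring match over a single string field
    ///
    /// Loads the whole collection and filters in memory (O(n)); prefer the
    /// `search()` builder for ranked full-text search. Sketch:
    ///
    /// ```
    /// let hits = db.search_text("articles", "title", "rust").await?;
    /// println!("{} matching articles", hits.len());
    /// ```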
2879 pub async fn search_text(
2880 &self,
2881 collection: &str,
2882 field: &str,
2883 query: &str,
2884 ) -> Result<Vec<Document>> {
2885 let mut results = Vec::new();
2886 let docs = self.get_all_collection(collection).await?;
2887
2888 for doc in docs {
2889 if let Some(Value::String(text)) = doc.data.get(field)
2890 && text.to_lowercase().contains(&query.to_lowercase())
2891 {
2892 results.push(doc);
2893 }
2894 }
2895
2896 Ok(results)
2897 }
2898
2899 /// Export a collection to a JSON file
2900 ///
2901 /// Creates a JSON file containing all documents in the collection.
2902 /// Useful for backups, data migration, or sharing datasets.
2903 /// Automatically appends `.json` extension if not present.
2904 ///
2905 /// # Performance
2906 /// - Export speed: ~10,000 docs/sec
2907 /// - Scans entire collection from cold storage
2908 /// - Memory efficient: streams documents to file
2909 ///
2910 /// # Arguments
2911 /// * `collection` - Name of the collection to export
2912 /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2913 ///
2914 /// # Returns
2915 /// Success or an error
2916 ///
2917 /// # Examples
2918 ///
2919 /// ```
2920 /// use aurora_db::Aurora;
2921 ///
2922 /// let db = Aurora::open("mydb.db")?;
2923 ///
2924 /// // Basic export
2925 /// db.export_as_json("users", "./backups/users_2024-01-15")?;
2926 /// // Creates: ./backups/users_2024-01-15.json
2927 ///
2928 /// // Timestamped backup
2929 /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
2930 /// let backup_path = format!("./backups/users_{}", timestamp);
2931 /// db.export_as_json("users", &backup_path)?;
2932 ///
2933 /// // Export multiple collections
2934 /// for collection in &["users", "orders", "products"] {
2935 /// db.export_as_json(collection, &format!("./export/{}", collection))?;
2936 /// }
2937 /// ```
2938 ///
2939 /// # Output Format
2940 ///
2941 /// The exported JSON has this structure:
2942 /// ```json
2943 /// {
2944 /// "users": [
2945 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
2946 /// { "id": "456", "name": "Bob", "email": "bob@example.com" }
2947 /// ]
2948 /// }
2949 /// ```
2950 ///
2951 /// # See Also
2952 /// - `export_as_csv()` for CSV format export
2953 /// - `import_from_json()` to restore exported data
2954 pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
2955 let output_path = if !output_path.ends_with(".json") {
2956 format!("{}.json", output_path)
2957 } else {
2958 output_path.to_string()
2959 };
2960
2961 let mut docs = Vec::new();
2962
2963 // Get all documents from the specified collection
2964 for result in self.cold.scan() {
2965 let (key, value) = result?;
2966
2967 // Only process documents from the specified collection
2968 if let Some(key_collection) = key.split(':').next()
2969 && key_collection == collection
2970 && !key.starts_with("_collection:")
2971 && let Ok(doc) = serde_json::from_slice::<Document>(&value)
2972 {
2973 // Convert Value enum to raw JSON values
2974 let mut clean_doc = serde_json::Map::new();
2975 for (k, v) in doc.data {
2976 match v {
2977 Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
2978 Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
2979 Value::Float(f) => {
2980 if let Some(n) = serde_json::Number::from_f64(f) {
2981 clean_doc.insert(k, JsonValue::Number(n))
2982 } else {
2983 clean_doc.insert(k, JsonValue::Null)
2984 }
2985 }
2986 Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
2987 Value::Array(arr) => {
2988 let clean_arr: Vec<JsonValue> = arr
2989 .into_iter()
2990 .map(|v| match v {
2991 Value::String(s) => JsonValue::String(s),
2992 Value::Int(i) => JsonValue::Number(i.into()),
2993 Value::Float(f) => serde_json::Number::from_f64(f)
2994 .map(JsonValue::Number)
2995 .unwrap_or(JsonValue::Null),
2996 Value::Bool(b) => JsonValue::Bool(b),
2997 Value::Null => JsonValue::Null,
2998 _ => JsonValue::Null,
2999 })
3000 .collect();
3001 clean_doc.insert(k, JsonValue::Array(clean_arr))
3002 }
3003 Value::Uuid(u) => clean_doc.insert(k, JsonValue::String(u.to_string())),
3004 Value::Null => clean_doc.insert(k, JsonValue::Null),
3005                        Value::Object(_) => None, // Nested objects are skipped (not exported)
3006 };
3007 }
3008 docs.push(JsonValue::Object(clean_doc));
3009 }
3010 }
3011
3012 let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
3013 collection.to_string(),
3014 JsonValue::Array(docs),
3015 )]));
3016
3017 let mut file = StdFile::create(&output_path)?;
3018 serde_json::to_writer_pretty(&mut file, &output)?;
3019 println!("Exported collection '{}' to {}", collection, &output_path);
3020 Ok(())
3021 }
3022
3023 /// Export a collection to a CSV file
3024 ///
3025 /// Creates a CSV file with headers from the first document and rows for each document.
3026 /// Useful for spreadsheet analysis, data science workflows, or reporting.
3027 /// Automatically appends `.csv` extension if not present.
3028 ///
3029 /// # Performance
3030 /// - Export speed: ~8,000 docs/sec
3031 /// - Memory efficient: streams rows to file
3032 /// - Headers determined from first document
3033 ///
3034 /// # Arguments
3035 /// * `collection` - Name of the collection to export
3036 /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
3037 ///
3038 /// # Returns
3039 /// Success or an error
3040 ///
3041 /// # Examples
3042 ///
3043 /// ```
3044 /// use aurora_db::Aurora;
3045 ///
3046 /// let db = Aurora::open("mydb.db")?;
3047 ///
3048 /// // Basic CSV export
3049 /// db.export_as_csv("users", "./reports/users")?;
3050 /// // Creates: ./reports/users.csv
3051 ///
3052 /// // Export for analysis in Excel/Google Sheets
3053 /// db.export_as_csv("orders", "./analytics/sales_data")?;
3054 ///
3055 /// // Monthly report generation
3056 /// let month = chrono::Utc::now().format("%Y-%m");
3057 /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
3058 /// ```
3059 ///
3060 /// # Output Format
3061 ///
3062 /// ```csv
3063 /// id,name,email,age
3064 /// 123,Alice,alice@example.com,28
3065 /// 456,Bob,bob@example.com,32
3066 /// ```
3067 ///
3068 /// # Important Notes
3069 /// - Headers are taken from the first document's fields
3070 /// - Documents with different fields will have empty values for missing fields
3071 /// - Nested objects/arrays are converted to strings
3072 /// - Best for flat document structures
3073 ///
3074 /// # See Also
3075 /// - `export_as_json()` for JSON format (better for nested data)
3076 /// - For complex nested structures, use JSON export instead
3077 pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
3078 let output_path = if !filename.ends_with(".csv") {
3079 format!("{}.csv", filename)
3080 } else {
3081 filename.to_string()
3082 };
3083
3084 let mut writer = csv::Writer::from_path(&output_path)?;
3085 let mut headers = Vec::new();
3086 let mut first_doc = true;
3087
3088 // Get all documents from the specified collection
3089 for result in self.cold.scan() {
3090 let (key, value) = result?;
3091
3092 // Only process documents from the specified collection
3093 if let Some(key_collection) = key.split(':').next()
3094 && key_collection == collection
3095 && !key.starts_with("_collection:")
3096 && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3097 {
3098 // Write headers from first document
3099 if first_doc && !doc.data.is_empty() {
3100 headers = doc.data.keys().cloned().collect();
3101 writer.write_record(&headers)?;
3102 first_doc = false;
3103 }
3104
3105 // Write the document values
3106 let values: Vec<String> = headers
3107 .iter()
3108 .map(|header| {
3109 doc.data
3110 .get(header)
3111 .map(|v| v.to_string())
3112 .unwrap_or_default()
3113 })
3114 .collect();
3115 writer.write_record(&values)?;
3116 }
3117 }
3118
3119 writer.flush()?;
3120 println!("Exported collection '{}' to {}", collection, &output_path);
3121 Ok(())
3122 }
3123
3124 // Helper method to create filter-based queries
3125 pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3126 self.query(collection)
3127 }
3128
3129 // Convenience methods that build on top of the FilterBuilder
3130
3131 pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
3132 self.query(collection)
3133 .filter(|f| f.eq("id", id))
3134 .first_one()
3135 .await
3136 }
3137
3138 pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
3139 where
3140 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3141 {
3142 self.query(collection).filter(filter_fn).first_one().await
3143 }
3144
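    /// Convenience wrapper: fetch all documents where `field == value`.
    /// A sketch (collection and field names are illustrative):
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// let admins = db.find_by_field("users", "role", Value::String("admin".into())).await?;
    /// println!("{} admins", admins.len());
    /// ```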
3145 pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
3146 &self,
3147 collection: &str,
3148 field: &'static str,
3149 value: T,
3150 ) -> Result<Vec<Document>> {
3151 let value_clone = value.clone();
3152 self.query(collection)
3153 .filter(move |f: &FilterBuilder| f.eq(field, value_clone.clone()))
3154 .collect()
3155 .await
3156 }
3157
3158 pub async fn find_by_fields(
3159 &self,
3160 collection: &str,
3161 fields: Vec<(&str, Value)>,
3162 ) -> Result<Vec<Document>> {
3163 let mut query = self.query(collection);
3164
3165 for (field, value) in fields {
3166 let field_owned = field.to_owned();
3167 let value_owned = value.clone();
3168 query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
3169 }
3170
3171 query.collect().await
3172 }
3173
3174 // Advanced example: find documents with a field value in a specific range
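    /// A sketch: products priced between 10.0 and 50.0 (bound semantics are
    /// whatever `FilterBuilder::between` implements):
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// let mid_range = db
    ///     .find_in_range("products", "price", Value::Float(10.0), Value::Float(50.0))
    ///     .await?;
    /// ```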
3175 pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
3176 &self,
3177 collection: &str,
3178 field: &'static str,
3179 min: T,
3180 max: T,
3181 ) -> Result<Vec<Document>> {
3182 self.query(collection)
3183 .filter(move |f| f.between(field, min.clone(), max.clone()))
3184 .collect()
3185 .await
3186 }
3187
3188 // Complex query example: build with multiple combined filters
3189 pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3190 self.query(collection)
3191 }
3192
3193 // Create a full-text search query with added filter options
3194 pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3195 self.search(collection)
3196 }
3197
3198 // Utility methods for common operations
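    /// Insert or update a document with a caller-chosen ID
    ///
    /// If a document with `id` exists, the given fields are merged into it;
    /// otherwise a new document is created under that ID. Unique constraints
    /// are validated in both paths. A sketch (collection, ID, and fields are
    /// illustrative):
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// // First call creates settings:ui, second call merges a field into it
    /// db.upsert("settings", "ui", vec![("theme", Value::String("dark".into()))]).await?;
    /// db.upsert("settings", "ui", vec![("font_size", Value::Int(14))]).await?;
    /// ```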
3199 pub async fn upsert(
3200 &self,
3201 collection: &str,
3202 id: &str,
3203 data: Vec<(&str, Value)>,
3204 ) -> Result<String> {
3205 // Convert Vec<(&str, Value)> to HashMap<String, Value>
3206 let data_map: HashMap<String, Value> =
3207 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
3208
3209 // Check if document exists
3210 if let Some(mut doc) = self.get_document(collection, id)? {
3211 // Update existing document - merge new data
3212 for (key, value) in data_map {
3213 doc.data.insert(key, value);
3214 }
3215
3216 // Validate unique constraints for the updated document
3217 // We need to exclude the current document from the uniqueness check
3218 self.validate_unique_constraints_excluding(collection, &doc.data, id)
3219 .await?;
3220
3221 self.put(
3222 format!("{}:{}", collection, id),
3223 serde_json::to_vec(&doc)?,
3224 None,
3225 )
3226 .await?;
3227 Ok(id.to_string())
3228 } else {
3229 // Insert new document with specified ID - validate unique constraints
3230 self.validate_unique_constraints(collection, &data_map)
3231 .await?;
3232
3233 let document = Document {
3234 id: id.to_string(),
3235 data: data_map,
3236 };
3237
3238 self.put(
3239 format!("{}:{}", collection, id),
3240 serde_json::to_vec(&document)?,
3241 None,
3242 )
3243 .await?;
3244 Ok(id.to_string())
3245 }
3246 }
3247
3248    /// Increment (or decrement, with a negative `amount`) an integer field
    /// and return the new value; a missing or non-integer field is treated as 0.
    /// Note: this is a read-modify-write sequence rather than an atomic
    /// compare-and-swap, so concurrent increments of the same field can race.
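    ///
    /// A sketch (assumes `post_id` refers to an existing document):
    ///
    /// ```
    /// let views = db.increment("posts", &post_id, "views", 1).await?;
    /// println!("now at {} views", views);
    /// ```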
3249 pub async fn increment(
3250 &self,
3251 collection: &str,
3252 id: &str,
3253 field: &str,
3254 amount: i64,
3255 ) -> Result<i64> {
3256 if let Some(mut doc) = self.get_document(collection, id)? {
3257 // Get current value
3258 let current = match doc.data.get(field) {
3259 Some(Value::Int(i)) => *i,
3260 _ => 0,
3261 };
3262
3263 // Increment
3264 let new_value = current + amount;
3265 doc.data.insert(field.to_string(), Value::Int(new_value));
3266
3267 // Save changes
3268 self.put(
3269 format!("{}:{}", collection, id),
3270 serde_json::to_vec(&doc)?,
3271 None,
3272 )
3273 .await?;
3274
3275 Ok(new_value)
3276 } else {
3277 Err(AqlError::new(
3278 ErrorCode::NotFound,
3279 format!("Document {}:{} not found", collection, id),
3280 ))
3281 }
3282 }
3283
3284 // Delete documents by query
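    /// Delete every document matching the filter and return how many were
    /// removed. A sketch (`sessions` and `expires_at` are illustrative names):
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// let cutoff = 1_700_000_000_i64; // example epoch-seconds cutoff
    /// let removed = db.delete_by_query("sessions", move |f| {
    ///     f.lt("expires_at", Value::Int(cutoff))
    /// }).await?;
    /// println!("purged {} stale sessions", removed);
    /// ```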
3285 pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
3286 where
3287 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3288 {
3289 let docs = self.query(collection).filter(filter_fn).collect().await?;
3290 let mut deleted_count = 0;
3291
3292 for doc in docs {
3293 let key = format!("{}:{}", collection, doc.id);
3294 self.delete(&key).await?;
3295 deleted_count += 1;
3296 }
3297
3298 Ok(deleted_count)
3299 }
3300
3301 /// Import documents from a JSON file into a collection
3302 ///
3303 /// Validates each document against the collection schema, skips duplicates (by ID),
3304 /// and provides detailed statistics about the import operation. Useful for restoring
3305 /// backups, migrating data, or seeding development databases.
3306 ///
3307 /// # Performance
3308 /// - Import speed: ~5,000 docs/sec (with validation)
3309 /// - Memory efficient: processes documents one at a time
3310 /// - Validates schema and unique constraints
3311 ///
3312 /// # Arguments
3313 /// * `collection` - Name of the collection to import into
3314 /// * `filename` - Path to the JSON file containing documents (array format)
3315 ///
3316 /// # Returns
3317 /// `ImportStats` containing counts of imported, skipped, and failed documents
3318 ///
3319 /// # Examples
3320 ///
3321 /// ```
3322    /// use aurora_db::{Aurora, types::FieldType};
3323 ///
3324 /// let db = Aurora::open("mydb.db")?;
3325 ///
3326 /// // Basic import
3327 /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
3328 /// println!("Imported: {}, Skipped: {}, Failed: {}",
3329 /// stats.imported, stats.skipped, stats.failed);
3330 ///
3331 /// // Restore from backup
3332 /// let backup_file = "./backups/users_2024-01-15.json";
3333 /// let stats = db.import_from_json("users", backup_file).await?;
3334 ///
3335 /// if stats.failed > 0 {
3336 /// eprintln!("Warning: {} documents failed validation", stats.failed);
3337 /// }
3338 ///
3339 /// // Idempotent import - duplicates are skipped
3340 /// let stats = db.import_from_json("users", "./data/users.json").await?;
3341 /// // Running again will skip all existing documents
3342 /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
3343 /// assert_eq!(stats2.skipped, stats.imported);
3344 ///
3345 /// // Migration from another system
3346 /// db.new_collection("products", vec![
3347    ///     ("sku", FieldType::String, true),     // unique
3348    ///     ("name", FieldType::String, false),
3349    ///     ("price", FieldType::Float, false),
3350    /// ]).await?;
3351 ///
3352 /// let stats = db.import_from_json("products", "./migration/products.json").await?;
3353 /// println!("Migration complete: {} products imported", stats.imported);
3354 /// ```
3355 ///
3356 /// # Expected JSON Format
3357 ///
3358 /// The JSON file should contain an array of document objects:
3359 /// ```json
3360 /// [
3361 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
3362 /// { "id": "456", "name": "Bob", "email": "bob@example.com" },
3363 /// { "name": "Carol", "email": "carol@example.com" }
3364 /// ]
3365 /// ```
3366 ///
3367 /// # Behavior
3368 /// - Documents with existing IDs are skipped (duplicate detection)
3369 /// - Documents without IDs get auto-generated UUIDs
3370 /// - Schema validation is performed on all fields
3371 /// - Failed documents are counted but don't stop the import
3372 /// - Unique constraints are checked
3373 ///
3374 /// # See Also
3375 /// - `export_as_json()` to create compatible backup files
3376 /// - `batch_insert()` for programmatic bulk inserts
3377 pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
3378 // Validate that the collection exists
3379 let collection_def = self.get_collection_definition(collection)?;
3380
3381 // Load JSON file
3382 let json_string = read_to_string(filename).await.map_err(|e| {
3383 AqlError::new(
3384 ErrorCode::IoError,
3385 format!("Failed to read import file: {}", e),
3386 )
3387 })?;
3388
3389 // Parse JSON
3390 let documents: Vec<JsonValue> = from_str(&json_string).map_err(|e| {
3391 AqlError::new(
3392 ErrorCode::SerializationError,
3393 format!("Failed to parse JSON: {}", e),
3394 )
3395 })?;
3396
3397 let mut stats = ImportStats::default();
3398
3399 // Process each document
3400 for doc_json in documents {
3401 match self
3402 .import_document(collection, &collection_def, doc_json)
3403 .await
3404 {
3405 Ok(ImportResult::Imported) => stats.imported += 1,
3406 Ok(ImportResult::Skipped) => stats.skipped += 1,
3407 Err(_) => stats.failed += 1,
3408 }
3409 }
3410
3411 Ok(stats)
3412 }
3413
3414 /// Import a single document, performing schema validation and duplicate checking
3415 async fn import_document(
3416 &self,
3417 collection: &str,
3418 collection_def: &Collection,
3419 doc_json: JsonValue,
3420 ) -> Result<ImportResult> {
3421 if !doc_json.is_object() {
3422 return Err(AqlError::invalid_operation(
3423 "Expected JSON object".to_string(),
3424 ));
3425 }
3426
3427 // Extract document ID if present
3428 let doc_id = doc_json
3429 .get("id")
3430 .and_then(|id| id.as_str())
3431 .map(|s| s.to_string())
3432 .unwrap_or_else(|| Uuid::now_v7().to_string());
3433
3434 // Check if document with this ID already exists
3435 if self.get_document(collection, &doc_id)?.is_some() {
3436 return Ok(ImportResult::Skipped);
3437 }
3438
3439 // Convert JSON to our document format and validate against schema
3440 let mut data_map = HashMap::new();
3441
3442 if let Some(obj) = doc_json.as_object() {
3443 for (field_name, field_def) in &collection_def.fields {
3444 if let Some(json_value) = obj.get(field_name) {
3445 // Validate value against field type
3446 if !self.validate_field_value(json_value, &field_def.field_type) {
3447 return Err(AqlError::invalid_operation(format!(
3448 "Field '{}' has invalid type",
3449 field_name
3450 )));
3451 }
3452
3453 // Convert JSON value to our Value type
3454 let value = self.json_to_value(json_value)?;
3455 data_map.insert(field_name.clone(), value);
3456 } else if field_def.unique {
3457 // Missing required unique field
3458 return Err(AqlError::invalid_operation(format!(
3459 "Missing required unique field '{}'",
3460 field_name
3461 )));
3462 }
3463 }
3464 }
3465
3466 // Check for duplicates by unique fields
3467 let unique_fields = self.get_unique_fields(collection_def);
3468 for unique_field in &unique_fields {
3469 if let Some(value) = data_map.get(unique_field) {
3470 // Query for existing documents with this unique value
3471 let query_results = self
3472 .query(collection)
3473 .filter(move |f| f.eq(unique_field, value.clone()))
3474 .limit(1)
3475 .collect()
3476 .await?;
3477
3478 if !query_results.is_empty() {
3479 // Found duplicate by unique field
3480 return Ok(ImportResult::Skipped);
3481 }
3482 }
3483 }
3484
3485 // Create and insert document
3486 let document = Document {
3487 id: doc_id,
3488 data: data_map,
3489 };
3490
3491 self.put(
3492 format!("{}:{}", collection, document.id),
3493 serde_json::to_vec(&document)?,
3494 None,
3495 )
3496 .await?;
3497
3498 Ok(ImportResult::Imported)
3499 }
3500
3501 /// Validate that a JSON value matches the expected field type
3502 fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
3503 match field_type {
3504 FieldType::String => value.is_string(),
3505 FieldType::Int => value.is_i64() || value.is_u64(),
3506 FieldType::Float => value.is_number(),
3507 FieldType::Bool => value.is_boolean(),
3508 FieldType::Array => value.is_array(),
3509 FieldType::Object => value.is_object(),
3510 FieldType::Uuid => {
3511 value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
3512 }
3513 FieldType::Any => true,
3514 }
3515 }
3516
3517 /// Convert a JSON value to our internal Value type
3518 #[allow(clippy::only_used_in_recursion)]
3519 fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3520 match json_value {
3521 JsonValue::Null => Ok(Value::Null),
3522 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3523 JsonValue::Number(n) => {
3524 if let Some(i) = n.as_i64() {
3525 Ok(Value::Int(i))
3526 } else if let Some(f) = n.as_f64() {
3527 Ok(Value::Float(f))
3528 } else {
3529 Err(AqlError::invalid_operation(
3530 "Invalid number value".to_string(),
3531 ))
3532 }
3533 }
3534 JsonValue::String(s) => {
3535 // Try parsing as UUID first
3536 if let Ok(uuid) = Uuid::parse_str(s) {
3537 Ok(Value::Uuid(uuid))
3538 } else {
3539 Ok(Value::String(s.clone()))
3540 }
3541 }
3542 JsonValue::Array(arr) => {
3543 let mut values = Vec::new();
3544 for item in arr {
3545 values.push(self.json_to_value(item)?);
3546 }
3547 Ok(Value::Array(values))
3548 }
3549 JsonValue::Object(obj) => {
3550 let mut map = HashMap::new();
3551 for (k, v) in obj {
3552 map.insert(k.clone(), self.json_to_value(v)?);
3553 }
3554 Ok(Value::Object(map))
3555 }
3556 }
3557 }
3558
3559 /// Get collection definition
3560 pub fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3561 if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3562 let collection_def: Collection = serde_json::from_slice(&data)?;
3563 Ok(collection_def)
3564 } else {
3565 Err(AqlError::new(
3566 ErrorCode::CollectionNotFound,
3567 collection.to_string(),
3568 ))
3569 }
3570 }
3571
3572 /// Get storage statistics and information about the database
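    ///
    /// A minimal sketch of inspecting the result (field names are those of
    /// `DatabaseStats` defined at the bottom of this file):
    ///
    /// ```
    /// let stats = db.get_database_stats()?;
    /// println!("Estimated size: {} bytes", stats.estimated_size);
    /// for (name, coll) in &stats.collections {
    ///     println!("{}: {} docs, avg {} bytes", name, coll.count, coll.avg_doc_size);
    /// }
    /// ```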
3573 pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3574 let hot_stats = self.hot.get_stats();
3575 let cold_stats = self.cold.get_stats()?;
3576
3577 Ok(DatabaseStats {
3578 hot_stats,
3579 cold_stats,
3580 estimated_size: self.cold.estimated_size(),
3581 collections: self.get_collection_stats()?,
3582 })
3583 }
3584
3585 /// Check if a key is currently stored in the hot cache
3586 pub fn is_in_hot_cache(&self, key: &str) -> bool {
3587 self.hot.is_hot(key)
3588 }
3589
3590 /// Clear the hot cache (useful when memory needs to be freed)
3591 pub fn clear_hot_cache(&self) {
3592 self.hot.clear();
3593 println!(
3594 "Hot cache cleared, current hit ratio: {:.2}%",
3595 self.hot.hit_ratio() * 100.0
3596 );
3597 }
3598
3599 /// Prewarm the cache by loading frequently accessed data from cold storage
3600 ///
3601 /// Loads documents from a collection into memory cache to eliminate cold-start
3602 /// latency. Dramatically improves initial query performance after database startup
3603 /// by preloading the most commonly accessed data.
3604 ///
3605 /// # Performance Impact
3606 /// - Prewarming speed: ~20,000 docs/sec
3607 /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3608 /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3609 /// - Memory cost: ~500 bytes per document average
3610 ///
3611 /// # Arguments
3612 /// * `collection` - The collection to prewarm
    /// * `limit` - Maximum number of documents to load (`None` = load all)
3614 ///
3615 /// # Returns
3616 /// Number of documents loaded into cache
3617 ///
3618 /// # Examples
3619 ///
3620 /// ```
3621 /// use aurora_db::Aurora;
3622 ///
3623 /// let db = Aurora::open("mydb.db")?;
3624 ///
3625 /// // Prewarm frequently accessed collection
3626 /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3627 /// println!("Prewarmed {} user documents", loaded);
3628 ///
3629 /// // Now queries are fast from the start
3630 /// let stats_before = db.get_cache_stats();
3631 /// let users = db.query("users").collect().await?;
3632 /// let stats_after = db.get_cache_stats();
3633 ///
3634 /// // High hit rate thanks to prewarming
3635 /// assert!(stats_after.hit_rate > 0.95);
3636 ///
3637 /// // Startup optimization pattern
3638 /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3639 /// println!("Prewarming caches...");
3640 ///
3641 /// // Prewarm most frequently accessed collections
3642 /// db.prewarm_cache("users", Some(5000)).await?;
3643 /// db.prewarm_cache("sessions", Some(1000)).await?;
3644 /// db.prewarm_cache("products", Some(500)).await?;
3645 ///
3646 /// let stats = db.get_cache_stats();
3647 /// println!("Cache prewarmed: {} entries loaded", stats.size);
3648 ///
3649 /// Ok(())
3650 /// }
3651 ///
3652 /// // Web server startup
3653 /// #[tokio::main]
3654 /// async fn main() {
3655 /// let db = Aurora::open("app.db").unwrap();
3656 ///
3657 /// // Prewarm before accepting requests
3658 /// db.prewarm_cache("users", Some(10000)).await.unwrap();
3659 ///
3660 /// // Server is now ready with hot cache
3661 /// start_web_server(db).await;
3662 /// }
3663 ///
3664 /// // Prewarm all documents (for small collections)
3665 /// let all_loaded = db.prewarm_cache("config", None).await?;
3666 /// // All config documents now in memory
3667 ///
3668 /// // Selective prewarming based on access patterns
3669 /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3670 /// // Load recent users (they're accessed most)
3671 /// db.prewarm_cache("users", Some(1000)).await?;
3672 ///
3673 /// // Load active sessions only
3674 /// let active_sessions = db.query("sessions")
3675 /// .filter(|f| f.eq("active", Value::Bool(true)))
3676 /// .limit(500)
3677 /// .collect()
3678 /// .await?;
3679 ///
3680 /// // Manually populate cache with hot data
3681 /// for session in active_sessions {
3682 /// // Reading automatically caches
3683 /// db.get_document("sessions", &session.id)?;
3684 /// }
3685 ///
3686 /// Ok(())
3687 /// }
3688 /// ```
3689 ///
3690 /// # Typical Prewarming Scenarios
3691 ///
3692 /// **Web Application Startup:**
3693 /// ```
3694 /// // Load user data, sessions, and active content
3695 /// db.prewarm_cache("users", Some(5000)).await?;
3696 /// db.prewarm_cache("sessions", Some(2000)).await?;
3697 /// db.prewarm_cache("posts", Some(1000)).await?;
3698 /// ```
3699 ///
3700 /// **E-commerce Site:**
3701 /// ```
3702 /// // Load products, categories, and user carts
3703 /// db.prewarm_cache("products", Some(500)).await?;
3704 /// db.prewarm_cache("categories", None).await?; // All categories
3705 /// db.prewarm_cache("active_carts", Some(1000)).await?;
3706 /// ```
3707 ///
3708 /// **API Server:**
3709 /// ```
3710 /// // Load authentication data and rate limits
3711 /// db.prewarm_cache("api_keys", None).await?;
3712 /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3713 /// ```
3714 ///
3715 /// # When to Use
3716 /// - At application startup to eliminate cold-start latency
3717 /// - After cache clear operations
3718 /// - Before high-traffic events (product launches, etc.)
3719 /// - When deploying new instances (load balancer warm-up)
3720 ///
3721 /// # Memory Considerations
3722 /// - 1,000 docs ≈ 500 KB memory
3723 /// - 10,000 docs ≈ 5 MB memory
3724 /// - 100,000 docs ≈ 50 MB memory
3725 /// - Stay within configured cache capacity
3726 ///
3727 /// # See Also
3728 /// - `get_cache_stats()` to monitor cache effectiveness
3729 /// - `prewarm_all_collections()` to prewarm all collections
3730 /// - `Aurora::with_config()` to adjust cache capacity
3731 pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
        // `None` means "load everything", matching the documented behavior above
        let limit = limit.unwrap_or(usize::MAX);
3733 let prefix = format!("{}:", collection);
3734 let mut loaded = 0;
3735
3736 for entry in self.cold.scan_prefix(&prefix) {
3737 if loaded >= limit {
3738 break;
3739 }
3740
            if let Ok((key, value)) = entry {
                // Load into hot cache; key and value are moved, so no clone is needed
                self.hot.set(Arc::new(key), Arc::new(value), None);
                loaded += 1;
            }
3746 }
3747
3748 println!("Prewarmed {} with {} documents", collection, loaded);
3749 Ok(loaded)
3750 }
3751
3752 /// Prewarm cache for all collections
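    ///
    /// Returns a map from collection name to the number of documents loaded.
    /// A minimal sketch, assuming an open `db` handle:
    ///
    /// ```
    /// // Load up to 1,000 documents from every collection
    /// let stats = db.prewarm_all_collections(Some(1000)).await?;
    /// for (collection, loaded) in &stats {
    ///     println!("{}: {} documents prewarmed", collection, loaded);
    /// }
    /// ```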
3753 pub async fn prewarm_all_collections(
3754 &self,
3755 docs_per_collection: Option<usize>,
3756 ) -> Result<HashMap<String, usize>> {
3757 let mut stats = HashMap::new();
3758
3759 // Get all collections
3760 let collections: Vec<String> = self
3761 .cold
3762 .scan()
3763 .filter_map(|r| r.ok())
3764 .map(|(k, _)| k)
3765 .filter(|k| k.starts_with("_collection:"))
3766 .map(|k| k.trim_start_matches("_collection:").to_string())
3767 .collect();
3768
3769 for collection in collections {
3770 let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3771 stats.insert(collection, loaded);
3772 }
3773
3774 Ok(stats)
3775 }
3776
3777 /// Store multiple key-value pairs efficiently in a single batch operation
3778 ///
3779 /// Low-level batch write operation that bypasses document validation and
3780 /// writes raw byte data directly to storage. Useful for advanced use cases,
3781 /// custom serialization, or maximum performance scenarios.
3782 ///
3783 /// # Performance
3784 /// - Write speed: ~100,000 writes/sec
3785 /// - Single disk fsync for entire batch
3786 /// - No validation or schema checking
3787 /// - Direct storage access
3788 ///
3789 /// # Arguments
3790 /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3791 ///
3792 /// # Returns
3793 /// Success or an error
3794 ///
3795 /// # Examples
3796 ///
3797 /// ```
3798 /// use aurora_db::Aurora;
3799 ///
3800 /// let db = Aurora::open("mydb.db")?;
3801 ///
3802 /// // Low-level batch write
3803 /// let pairs = vec![
3804 /// ("users:123".to_string(), b"raw data 1".to_vec()),
3805 /// ("users:456".to_string(), b"raw data 2".to_vec()),
3806 /// ("cache:key1".to_string(), b"cached value".to_vec()),
3807 /// ];
3808 ///
    /// db.batch_write(pairs).await?;
3810 ///
    /// // Custom binary serialization (assumes the `serde` and `bincode` crates)
    /// use serde::{Serialize, Deserialize};
3813 ///
3814 /// #[derive(Serialize, Deserialize)]
3815 /// struct CustomData {
3816 /// id: u64,
3817 /// payload: Vec<u8>,
3818 /// }
3819 ///
3820 /// let custom_data = vec![
3821 /// CustomData { id: 1, payload: vec![1, 2, 3] },
3822 /// CustomData { id: 2, payload: vec![4, 5, 6] },
3823 /// ];
3824 ///
3825 /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3826 /// .iter()
3827 /// .map(|data| {
3828 /// let key = format!("binary:{}", data.id);
3829 /// let value = bincode::serialize(data).unwrap();
3830 /// (key, value)
3831 /// })
3832 /// .collect();
3833 ///
    /// db.batch_write(pairs).await?;
3835 ///
3836 /// // Bulk cache population
3837 /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3838 /// .map(|i| {
3839 /// let key = format!("cache:item_{}", i);
3840 /// let value = format!("value_{}", i).into_bytes();
3841 /// (key, value)
3842 /// })
3843 /// .collect();
3844 ///
    /// db.batch_write(cache_entries).await?;
3846 /// // Writes 10,000 entries in ~100ms
3847 /// ```
3848 ///
3849 /// # Important Notes
3850 /// - No schema validation performed
3851 /// - No unique constraint checking
3852 /// - No automatic indexing
3853 /// - Keys must follow "collection:id" format for proper grouping
3854 /// - Values are raw bytes - you handle serialization
3855 /// - Use `batch_insert()` for validated document inserts
3856 ///
3857 /// # When to Use
3858 /// - Maximum write performance needed
3859 /// - Custom serialization formats (bincode, msgpack, etc.)
3860 /// - Cache population
3861 /// - Low-level database operations
3862 /// - You're bypassing the document model
3863 ///
3864 /// # When NOT to Use
3865 /// - Regular document inserts → Use `batch_insert()` instead
3866 /// - Need validation → Use `batch_insert()` instead
3867 /// - Need indexing → Use `batch_insert()` instead
3868 ///
3869 /// # See Also
3870 /// - `batch_insert()` for validated document batch inserts
3871 /// - `put()` for single key-value writes
3872 pub async fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3873 // Group pairs by collection name
3874 let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3875 for (key, value) in &pairs {
3876 if let Some(collection_name) = key.split(':').next() {
3877 collections
3878 .entry(collection_name.to_string())
3879 .or_default()
3880 .push((key.clone(), value.clone()));
3881 }
3882 }
3883
3884 // First, do the batch write to cold storage for all pairs
3885 self.cold.batch_set(pairs)?;
3886
3887 // Then, process each collection for in-memory updates
3888 for (collection_name, batch) in collections {
3889 // --- Optimized Batch Indexing ---
3890
3891 // 1. Get schema once for the entire collection batch
3892 let collection_obj = match self.schema_cache.get(&collection_name) {
3893 Some(cached_schema) => Arc::clone(cached_schema.value()),
3894 None => {
3895 let collection_key = format!("_collection:{}", collection_name);
3896 match self.get(&collection_key)? {
3897 Some(data) => {
3898 let obj: Collection = serde_json::from_slice(&data)?;
3899 let arc_obj = Arc::new(obj);
3900 self.schema_cache
3901 .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3902 arc_obj
3903 }
3904 None => continue,
3905 }
3906 }
3907 };
3908
3909 let indexed_fields: Vec<String> = collection_obj
3910 .fields
3911 .iter()
3912 .filter(|(_, def)| def.indexed || def.unique)
3913 .map(|(name, _)| name.clone())
3914 .collect();
3915
3916 let primary_index = self
3917 .primary_indices
3918 .entry(collection_name.to_string())
3919 .or_default();
3920
3921 for (key, value) in batch {
3922 // 2. Update hot cache
3923 if self.should_cache_key(&key) {
3924 self.hot
3925 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
3926 }
3927
3928 // 3. Update primary index with metadata only
3929 let location = DiskLocation::new(value.len());
3930 primary_index.insert(key.clone(), location);
3931
3932 // 4. Update secondary indices
3933 if !indexed_fields.is_empty()
3934 && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3935 {
3936 for (field, field_value) in doc.data {
3937 if indexed_fields.contains(&field) {
3938 let value_str = match &field_value {
3939 Value::String(s) => s.clone(),
3940 _ => field_value.to_string(),
3941 };
3942 let index_key = format!("{}:{}", collection_name, field);
3943 let secondary_index =
3944 self.secondary_indices.entry(index_key).or_default();
3945
3946 let max_entries = self.config.max_index_entries_per_field;
3947 secondary_index
3948 .entry(value_str)
3949 .and_modify(|doc_ids| {
3950 if doc_ids.len() < max_entries {
3951 doc_ids.push(key.to_string());
3952 }
3953 })
3954 .or_insert_with(|| vec![key.to_string()]);
3955 }
3956 }
3957 }
3958 }
3959 }
3960
3961 Ok(())
3962 }
3963
3964 /// Scan for keys with a specific prefix
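    ///
    /// Keys follow the `"collection:id"` convention, so passing `"users:"`
    /// iterates over a single collection. A minimal sketch:
    ///
    /// ```
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (key, bytes) = entry?;
    ///     println!("{} ({} bytes)", key, bytes.len());
    /// }
    /// ```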
3965 pub fn scan_with_prefix(
3966 &self,
3967 prefix: &str,
3968 ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
3969 self.cold.scan_prefix(prefix)
3970 }
3971
    /// Get per-collection storage statistics (document count, total size, and average document size)
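    ///
    /// Counts and sizes come from the in-memory primary index when it is
    /// available, with a cold-storage scan as the fallback. A minimal sketch:
    ///
    /// ```
    /// let stats = db.get_collection_stats()?;
    /// if let Some(users) = stats.get("users") {
    ///     println!("{} docs, {} bytes total", users.count, users.size_bytes);
    /// }
    /// ```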
3973 pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
3974 let mut stats = HashMap::new();
3975
3976 // Scan all collections
3977 let collections: Vec<String> = self
3978 .cold
3979 .scan()
3980 .filter_map(|r| r.ok())
3981 .map(|(k, _)| k)
3982 .filter(|k| k.starts_with("_collection:"))
3983 .map(|k| k.trim_start_matches("_collection:").to_string())
3984 .collect();
3985
3986 for collection in collections {
3987 // Use primary index for fast stats (count + size from DiskLocation)
3988 let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
3989 let count = index.len();
3990 // Sum up sizes from DiskLocation metadata (much faster than disk scan)
3991 let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
3992 (count, size)
3993 } else {
3994 // Fallback: scan from cold storage if index not available
3995 let prefix = format!("{}:", collection);
3996 let count = self.cold.scan_prefix(&prefix).count();
3997 let size: usize = self
3998 .cold
3999 .scan_prefix(&prefix)
4000 .filter_map(|r| r.ok())
4001 .map(|(_, v)| v.len())
4002 .sum();
4003 (count, size)
4004 };
4005
4006 stats.insert(
4007 collection,
4008 CollectionStats {
4009 count,
4010 size_bytes: size,
4011 avg_doc_size: if count > 0 { size / count } else { 0 },
4012 },
4013 );
4014 }
4015
4016 Ok(stats)
4017 }
4018
4019 /// Search for documents by exact value using an index
4020 ///
4021 /// This method performs a fast lookup using a pre-created index
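    ///
    /// A minimal sketch, assuming an index already exists on a hypothetical
    /// `email` field of a `users` collection:
    ///
    /// ```
    /// let matches = db.search_by_value(
    ///     "users",
    ///     "email",
    ///     &Value::String("jane@example.com".to_string()),
    /// )?;
    /// // An empty Vec is returned when there is no index or no match
    /// println!("Found {} documents", matches.len());
    /// ```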
4022 pub fn search_by_value(
4023 &self,
4024 collection: &str,
4025 field: &str,
4026 value: &Value,
4027 ) -> Result<Vec<Document>> {
4028 let index_key = format!("_index:{}:{}", collection, field);
4029
4030 if let Some(index_data) = self.get(&index_key)? {
4031 let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4032 let index = Index::new(index_def);
4033
4034 // Use the previously unused search method
4035 if let Some(doc_ids) = index.search(value) {
4036 // Load the documents by ID
4037 let mut docs = Vec::new();
4038 for id in doc_ids {
4039 if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4040 let doc: Document = serde_json::from_slice(&doc_data)?;
4041 docs.push(doc);
4042 }
4043 }
4044 return Ok(docs);
4045 }
4046 }
4047
4048 // Return empty result if no index or no matches
4049 Ok(Vec::new())
4050 }
4051
4052 /// Perform a full-text search on an indexed text field
4053 ///
4054 /// This provides more advanced text search capabilities including
4055 /// relevance ranking of results
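    ///
    /// A minimal sketch, assuming a full-text index was created beforehand on
    /// a hypothetical `articles.body` field:
    ///
    /// ```
    /// db.create_text_index("articles", "body", true).await?;
    ///
    /// // Results come back in relevance-score order
    /// let hits = db.full_text_search("articles", "body", "tiered storage")?;
    /// for doc in &hits {
    ///     println!("matched document {}", doc.id);
    /// }
    /// ```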
4056 pub fn full_text_search(
4057 &self,
4058 collection: &str,
4059 field: &str,
4060 query: &str,
4061 ) -> Result<Vec<Document>> {
4062 let index_key = format!("_index:{}:{}", collection, field);
4063
4064 if let Some(index_data) = self.get(&index_key)? {
4065 let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4066
4067 // Ensure this is a full-text index
4068 if !matches!(index_def.index_type, IndexType::FullText) {
4069 return Err(AqlError::invalid_operation(format!(
4070 "Field '{}' is not indexed as full-text",
4071 field
4072 )));
4073 }
4074
4075 let index = Index::new(index_def);
4076
            // Perform the relevance-ranked text search against the index
4078 if let Some(doc_id_scores) = index.search_text(query) {
4079 // Load the documents by ID, preserving score order
4080 let mut docs = Vec::new();
4081 for (id, _score) in doc_id_scores {
4082 if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4083 let doc: Document = serde_json::from_slice(&doc_data)?;
4084 docs.push(doc);
4085 }
4086 }
4087 return Ok(docs);
4088 }
4089 }
4090
4091 // Return empty result if no index or no matches
4092 Ok(Vec::new())
4093 }
4094
4095 /// Create a full-text search index on a text field
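    ///
    /// Existing documents in the collection are indexed immediately. A minimal
    /// sketch, assuming a `posts` collection with a `content` field:
    ///
    /// ```
    /// // The third argument reserves stop-word handling (currently unused)
    /// db.create_text_index("posts", "content", true).await?;
    /// let results = db.full_text_search("posts", "content", "database")?;
    /// ```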
4096 pub async fn create_text_index(
4097 &self,
4098 collection: &str,
4099 field: &str,
4100 _enable_stop_words: bool,
4101 ) -> Result<()> {
4102 // Check if collection exists
4103 if self.get(&format!("_collection:{}", collection))?.is_none() {
4104 return Err(AqlError::new(
4105 ErrorCode::CollectionNotFound,
4106 collection.to_string(),
4107 ));
4108 }
4109
4110 // Create index definition
4111 let index_def = IndexDefinition {
4112 name: format!("{}_{}_fulltext", collection, field),
4113 collection: collection.to_string(),
4114 fields: vec![field.to_string()],
4115 index_type: IndexType::FullText,
4116 unique: false,
4117 };
4118
4119 // Store index definition
4120 let index_key = format!("_index:{}:{}", collection, field);
4121 self.put(index_key, serde_json::to_vec(&index_def)?, None)
4122 .await?;
4123
4124 // Create the actual index
4125 let index = Index::new(index_def);
4126
4127 // Index all existing documents in the collection
4128 let prefix = format!("{}:", collection);
4129 for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
4130 let doc: Document = serde_json::from_slice(&data)?;
4131 index.insert(&doc)?;
4132 }
4133
4134 Ok(())
4135 }
4136
4137 pub async fn execute_simple_query(
4138 &self,
4139 builder: &SimpleQueryBuilder,
4140 ) -> Result<Vec<Document>> {
4141 // Ensure indices are initialized
4142 self.ensure_indices_initialized().await?;
4143
4144 // A place to store the IDs of the documents we need to fetch
4145 let mut doc_ids_to_load: Option<Vec<String>> = None;
4146
4147 // --- The "Query Planner" ---
4148 // Smart heuristic: For range queries with small LIMITs, full scan can be faster
4149 // than collecting millions of IDs from secondary index
4150 let use_index_for_range = if let Some(limit) = builder.limit {
4151 // If limit is small (< 1000), prefer full scan for range queries
4152 // The secondary index would scan all entries anyway, might as well
4153 // scan documents directly and benefit from early termination
4154 limit >= 1000
4155 } else {
4156 // No limit? Index might still help if result set is small
4157 true
4158 };
4159
4160 // Look for an opportunity to use an index
        for filter in builder.filters.iter() {
4162 match filter {
4163 Filter::Eq(field, value) => {
4164 let index_key = format!("{}:{}", &builder.collection, field);
4165
4166 // Do we have a secondary index for this field?
4167 if let Some(index) = self.secondary_indices.get(&index_key) {
4168 // Yes! Let's use it.
4169 if let Some(matching_ids) = index.get(&value.to_string()) {
4170 doc_ids_to_load = Some(matching_ids.clone());
4171 break; // Stop searching for other indexes for now
4172 }
4173 }
4174 }
4175 Filter::Gt(field, value)
4176 | Filter::Gte(field, value)
4177 | Filter::Lt(field, value)
4178 | Filter::Lte(field, value) => {
4179 // Skip index for range queries with small LIMITs (see query planner heuristic above)
4180 if !use_index_for_range {
4181 continue;
4182 }
4183
4184 let index_key = format!("{}:{}", &builder.collection, field);
4185
4186 // Do we have a secondary index for this field?
4187 if let Some(index) = self.secondary_indices.get(&index_key) {
4188 // For range queries, we need to scan through the index values
4189 let mut matching_ids = Vec::new();
4190
4191 for entry in index.iter() {
4192 let index_value_str = entry.key();
4193
4194 // Try to parse the index value to compare with our filter value
4195 if let Ok(index_value) =
4196 self.parse_value_from_string(index_value_str, value)
4197 {
4198 let matches = match filter {
4199 Filter::Gt(_, filter_val) => index_value > *filter_val,
4200 Filter::Gte(_, filter_val) => index_value >= *filter_val,
4201 Filter::Lt(_, filter_val) => index_value < *filter_val,
4202 Filter::Lte(_, filter_val) => index_value <= *filter_val,
4203 _ => false,
4204 };
4205
4206 if matches {
4207 matching_ids.extend(entry.value().clone());
4208 }
4209 }
4210 }
4211
4212 if !matching_ids.is_empty() {
4213 doc_ids_to_load = Some(matching_ids);
4214 break;
4215 }
4216 }
4217 }
4218 Filter::Contains(field, search_term) => {
4219 let index_key = format!("{}:{}", &builder.collection, field);
4220
4221 // Do we have a secondary index for this field?
4222 if let Some(index) = self.secondary_indices.get(&index_key) {
4223 let mut matching_ids = Vec::new();
4224
4225 for entry in index.iter() {
4226 let index_value_str = entry.key();
4227
4228 // Check if this indexed value contains our search term
4229 if index_value_str
4230 .to_lowercase()
4231 .contains(&search_term.to_lowercase())
4232 {
4233 matching_ids.extend(entry.value().clone());
4234 }
4235 }
4236
4237 if !matching_ids.is_empty() {
4238 // Remove duplicates since a document could match multiple indexed values
4239 matching_ids.sort();
4240 matching_ids.dedup();
4241
4242 doc_ids_to_load = Some(matching_ids);
4243 break;
4244 }
4245 }
4246 }
4247 Filter::And(_) => {
4248 // For compound filters, we can't easily use a single index
4249 // This would require more complex query planning
4250 continue;
4251 }
4252 Filter::Or(sub_filters) => {
4253 // For OR filters, collect union of all matching IDs from each sub-filter
4254 let mut union_ids: Vec<String> = Vec::new();
4255 let mut used_index = false;
4256
4257 for sub_filter in sub_filters {
4258 match sub_filter {
4259 Filter::Eq(field, value) => {
4260 let index_key = format!("{}:{}", &builder.collection, field);
                            if let Some(index) = self.secondary_indices.get(&index_key)
                                && let Some(matching_ids) = index.get(&value.to_string())
                            {
                                union_ids.extend(matching_ids.clone());
                                used_index = true;
                            }
4267 }
4268 // For other filter types in OR, we fall back to full scan
4269 _ => continue,
4270 }
4271 }
4272
4273 if used_index && !union_ids.is_empty() {
4274 // Remove duplicates
4275 union_ids.sort();
4276 union_ids.dedup();
4277 doc_ids_to_load = Some(union_ids);
4278 break;
4279 }
4280 // If no index was used, continue to full scan
4281 }
4282 }
4283 }
4284
4285 let mut final_docs: Vec<Document>;
4286
4287 if let Some(ids) = doc_ids_to_load {
            // --- Path 1: Index Lookup ---
4289 use std::io::Write;
4290 if let Ok(mut file) = std::fs::OpenOptions::new()
4291 .create(true)
4292 .append(true)
4293 .open("/tmp/aurora_query_stats.log")
4294 {
4295 let _ = writeln!(
4296 file,
4297 "[INDEX PATH] IDs to load: {} | Collection: {}",
4298 ids.len(),
4299 builder.collection
4300 );
4301 }
4302
4303 final_docs = Vec::with_capacity(ids.len());
4304
4305 for id in ids {
4306 let doc_key = format!("{}:{}", &builder.collection, id);
4307 if let Some(data) = self.get(&doc_key)?
4308 && let Ok(doc) = serde_json::from_slice::<Document>(&data)
4309 {
4310 final_docs.push(doc);
4311 }
4312 }
4313 } else {
4314 // --- Path 2: Full Collection Scan with Early Termination ---
4315
4316 // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
4317 // as soon as we have enough matching documents
4318 let early_termination_target = if builder.order_by.is_none() {
4319 builder.limit.map(|l| l + builder.offset.unwrap_or(0))
4320 } else {
4321 // With ORDER BY, we need all matching docs to sort correctly
4322 None
4323 };
4324
4325 // Smart scan with early termination support
4326 final_docs = Vec::new();
4327 let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
4328
4329 if let Some(index) = self.primary_indices.get(&builder.collection) {
4330 for entry in index.iter() {
4331 let key = entry.key();
4332 scan_stats.0 += 1; // keys scanned
4333
4334 // Early termination check
                        if let Some(target) = early_termination_target
                            && final_docs.len() >= target
                        {
                            break; // We have enough documents!
                        }
4340
4341 // Fetch and filter document
4342 if let Some(data) = self.get(key)? {
4343 scan_stats.1 += 1; // docs fetched
4344
4345 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
4346 // Apply all filters
4347 let matches_all_filters =
4348 builder.filters.iter().all(|filter| match filter {
4349 Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4350 Filter::Gt(field, value) => {
4351 doc.data.get(field).is_some_and(|v| v > value)
4352 }
4353 Filter::Gte(field, value) => {
4354 doc.data.get(field).is_some_and(|v| v >= value)
4355 }
4356 Filter::Lt(field, value) => {
4357 doc.data.get(field).is_some_and(|v| v < value)
4358 }
4359 Filter::Lte(field, value) => {
4360 doc.data.get(field).is_some_and(|v| v <= value)
4361 }
4362 Filter::Contains(field, value_str) => {
4363 doc.data.get(field).is_some_and(|v| match v {
4364 Value::String(s) => s.contains(value_str),
4365 Value::Array(arr) => {
4366 arr.contains(&Value::String(value_str.clone()))
4367 }
4368 _ => false,
4369 })
4370 }
4371 Filter::And(filters) => filters.iter().all(|f| f.matches(&doc)),
4372 Filter::Or(filters) => filters.iter().any(|f| f.matches(&doc)),
4373 });
4374
4375 if matches_all_filters {
4376 scan_stats.2 += 1; // matches found
4377 final_docs.push(doc);
4378 }
4379 }
4380 }
4381 }
4382
4383 // Debug logging for query performance analysis
4384 use std::io::Write;
4385 if let Ok(mut file) = std::fs::OpenOptions::new()
4386 .create(true)
4387 .append(true)
4388 .open("/tmp/aurora_query_stats.log")
4389 {
4390 let _ = writeln!(
4391 file,
4392 "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
4393 scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
4394 );
4395 }
4396 } else {
4397 // Fallback: scan from cold storage if index not initialized
4398 final_docs = self.get_all_collection(&builder.collection).await?;
4399
4400 // Apply filters
4401 final_docs.retain(|doc| {
4402 builder.filters.iter().all(|filter| match filter {
4403 Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4404 Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
4405 Filter::Gte(field, value) => {
4406 doc.data.get(field).is_some_and(|v| v >= value)
4407 }
4408 Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
4409 Filter::Lte(field, value) => {
4410 doc.data.get(field).is_some_and(|v| v <= value)
4411 }
4412 Filter::Contains(field, value_str) => {
4413 doc.data.get(field).is_some_and(|v| match v {
4414 Value::String(s) => s.contains(value_str),
4415 Value::Array(arr) => {
4416 arr.contains(&Value::String(value_str.clone()))
4417 }
4418 _ => false,
4419 })
4420 }
4421 Filter::And(filters) => filters.iter().all(|f| f.matches(doc)),
4422 Filter::Or(filters) => filters.iter().any(|f| f.matches(doc)),
4423 })
4424 });
4425 }
4426 }
4427
4428 // Apply ordering
4429 if let Some((field, ascending)) = &builder.order_by {
4430 final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
4431 (Some(v1), Some(v2)) => {
4432 let cmp = v1.cmp(v2);
4433 if *ascending { cmp } else { cmp.reverse() }
4434 }
4435 (None, Some(_)) => std::cmp::Ordering::Less,
4436 (Some(_), None) => std::cmp::Ordering::Greater,
4437 (None, None) => std::cmp::Ordering::Equal,
4438 });
4439 }
4440
4441 // Apply offset and limit
4442 let start = builder.offset.unwrap_or(0);
4443 let end = builder
4444 .limit
4445 .map(|l| start.saturating_add(l))
4446 .unwrap_or(final_docs.len());
4447
4448 let end = end.min(final_docs.len());
4449 Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
4450 }
4451
4452 /// Helper method to parse a string value back to a Value for comparison
4453 fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
4454 match reference_value {
4455 Value::Int(_) => {
4456 if let Ok(i) = value_str.parse::<i64>() {
4457 Ok(Value::Int(i))
4458 } else {
4459 Err(AqlError::invalid_operation(
4460 "Failed to parse int".to_string(),
4461 ))
4462 }
4463 }
4464 Value::Float(_) => {
4465 if let Ok(f) = value_str.parse::<f64>() {
4466 Ok(Value::Float(f))
4467 } else {
4468 Err(AqlError::invalid_operation(
4469 "Failed to parse float".to_string(),
4470 ))
4471 }
4472 }
4473 Value::String(_) => Ok(Value::String(value_str.to_string())),
4474 _ => Ok(Value::String(value_str.to_string())),
4475 }
4476 }
4477
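    /// Execute a query described by an HTTP `QueryPayload`: filters, sorting,
    /// pagination, and field projection are applied in that order over a full
    /// collection scan.
    ///
    /// A minimal sketch, assuming `QueryPayload` derives `Deserialize` and
    /// `request_body` is a hypothetical JSON request string:
    ///
    /// ```
    /// let payload: QueryPayload = serde_json::from_str(request_body)?;
    /// let docs = db.execute_dynamic_query("users", &payload).await?;
    /// ```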
4478 pub async fn execute_dynamic_query(
4479 &self,
4480 collection: &str,
4481 payload: &QueryPayload,
4482 ) -> Result<Vec<Document>> {
4483 let mut docs = self.get_all_collection(collection).await?;
4484
4485 // 1. Apply Filters
4486 if let Some(filters) = &payload.filters {
4487 docs.retain(|doc| {
4488 filters.iter().all(|filter| {
4489 doc.data
4490 .get(&filter.field)
4491 .is_some_and(|doc_val| check_filter(doc_val, filter))
4492 })
4493 });
4494 }
4495
4496 // 2. Apply Sorting
4497 if let Some(sort_options) = &payload.sort {
4498 docs.sort_by(|a, b| {
4499 let a_val = a.data.get(&sort_options.field);
4500 let b_val = b.data.get(&sort_options.field);
4501 let ordering = a_val
4502 .partial_cmp(&b_val)
4503 .unwrap_or(std::cmp::Ordering::Equal);
4504 if sort_options.ascending {
4505 ordering
4506 } else {
4507 ordering.reverse()
4508 }
4509 });
4510 }
4511
4512 // 3. Apply Pagination
4513 if let Some(offset) = payload.offset {
4514 docs = docs.into_iter().skip(offset).collect();
4515 }
4516 if let Some(limit) = payload.limit {
4517 docs = docs.into_iter().take(limit).collect();
4518 }
4519
4520 // 4. Apply Field Selection (Projection)
4521 if let Some(select_fields) = &payload.select
4522 && !select_fields.is_empty()
4523 {
4524 docs = docs
4525 .into_iter()
4526 .map(|mut doc| {
4527 doc.data.retain(|key, _| select_fields.contains(key));
4528 doc
4529 })
4530 .collect();
4531 }
4532
4533 Ok(docs)
4534 }
4535
4536 pub async fn process_network_request(
4537 &self,
4538 request: crate::network::protocol::Request,
4539 ) -> crate::network::protocol::Response {
4540 use crate::network::protocol::Response;
4541
4542 match request {
4543 crate::network::protocol::Request::Get(key) => match self.get(&key) {
4544 Ok(value) => Response::Success(value),
4545 Err(e) => Response::Error(e.to_string()),
4546 },
4547 crate::network::protocol::Request::Put(key, value) => {
4548 match self.put(key, value, None).await {
4549 Ok(_) => Response::Done,
4550 Err(e) => Response::Error(e.to_string()),
4551 }
4552 }
4553 crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
4554 Ok(_) => Response::Done,
4555 Err(e) => Response::Error(e.to_string()),
4556 },
4557 crate::network::protocol::Request::NewCollection { name, fields } => {
4558 let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
4559 .iter()
4560 .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
4561 .collect();
4562
4563 match self.new_collection(&name, fields_for_db).await {
4564 Ok(_) => Response::Done,
4565 Err(e) => Response::Error(e.to_string()),
4566 }
4567 }
4568 crate::network::protocol::Request::Insert { collection, data } => {
4569 match self.insert_map(&collection, data).await {
4570 Ok(id) => Response::Message(id),
4571 Err(e) => Response::Error(e.to_string()),
4572 }
4573 }
4574 crate::network::protocol::Request::GetDocument { collection, id } => {
4575 match self.get_document(&collection, &id) {
4576 Ok(doc) => Response::Document(doc),
4577 Err(e) => Response::Error(e.to_string()),
4578 }
4579 }
4580 crate::network::protocol::Request::Query(builder) => {
4581 match self.execute_simple_query(&builder).await {
4582 Ok(docs) => Response::Documents(docs),
4583 Err(e) => Response::Error(e.to_string()),
4584 }
4585 }
4586 crate::network::protocol::Request::BeginTransaction => {
4587 let tx_id = self.begin_transaction();
4588 Response::TransactionId(tx_id.as_u64())
4589 }
4590 crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
4591 let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4592 match self.commit_transaction(tx_id) {
4593 Ok(_) => Response::Done,
4594 Err(e) => Response::Error(e.to_string()),
4595 }
4596 }
4597 crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
4598 let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4599 match self.rollback_transaction(tx_id) {
4600 Ok(_) => Response::Done,
4601 Err(e) => Response::Error(e.to_string()),
4602 }
4603 }
4604 }
4605 }
4606
4607 /// Create indices for commonly queried fields automatically
4608 ///
4609 /// This is a convenience method that creates indices for fields that are
4610 /// likely to be queried frequently, improving performance.
4611 ///
4612 /// # Arguments
4613 /// * `collection` - Name of the collection
4614 /// * `fields` - List of field names to create indices for
4615 ///
4616 /// # Examples
4617 /// ```
4618 /// // Create indices for commonly queried fields
4619 /// db.create_indices("users", &["email", "status", "created_at"]).await?;
4620 /// ```
4621 pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
4622 for field in fields {
4623 if let Err(e) = self.create_index(collection, field).await {
4624 eprintln!(
4625 "Warning: Failed to create index for {}.{}: {}",
4626 collection, field, e
4627 );
4628 } else {
4629 println!("Created index for {}.{}", collection, field);
4630 }
4631 }
4632 Ok(())
4633 }
4634
4635 /// Get index statistics for a collection
4636 ///
4637 /// This helps understand which indices exist and how effective they are.
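    ///
    /// A minimal sketch of checking index selectivity (field names are those
    /// of `IndexStats` defined at the bottom of this file):
    ///
    /// ```
    /// for (field, stats) in db.get_index_stats("users") {
    ///     println!(
    ///         "{}: {} unique values, avg {} docs per value",
    ///         field, stats.unique_values, stats.avg_docs_per_value
    ///     );
    /// }
    /// ```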
4638 pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
4639 let mut stats = HashMap::new();
4640
4641 for entry in self.secondary_indices.iter() {
4642 let key = entry.key();
4643 if key.starts_with(&format!("{}:", collection)) {
4644 let field = key.split(':').nth(1).unwrap_or("unknown");
4645 let index = entry.value();
4646
4647 let unique_values = index.len();
4648 let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
4649
4650 stats.insert(
4651 field.to_string(),
4652 IndexStats {
4653 unique_values,
4654 total_documents,
4655 avg_docs_per_value: if unique_values > 0 {
4656 total_documents / unique_values
4657 } else {
4658 0
4659 },
4660 },
4661 );
4662 }
4663 }
4664
4665 stats
4666 }
4667
    /// Optimize a collection by creating indices for its schema fields
    ///
    /// Currently this creates an index for every field defined in the
    /// collection's schema, trading memory and write overhead for faster
    /// filtered queries.
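    ///
    /// A minimal sketch:
    ///
    /// ```
    /// db.optimize_collection("users").await?;
    /// let stats = db.get_index_stats("users");
    /// println!("{} fields indexed", stats.len());
    /// ```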
4671 pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
4672 if let Ok(collection_def) = self.get_collection_definition(collection) {
4673 let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
4674 self.create_indices(collection, &field_names).await?;
4675 }
4676
4677 Ok(())
4678 }
4679
4680 // Helper method to get unique fields from a collection
4681 fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
4682 collection
4683 .fields
4684 .iter()
4685 .filter(|(_, def)| def.unique)
4686 .map(|(name, _)| name.clone())
4687 .collect()
4688 }
4689
4690 // Update the validation method to use the helper
4691 async fn validate_unique_constraints(
4692 &self,
4693 collection: &str,
4694 data: &HashMap<String, Value>,
4695 ) -> Result<()> {
4696 self.ensure_indices_initialized().await?;
4697 let collection_def = match self.get_collection_definition(collection) {
4698 Ok(def) => def,
4699 Err(e) if e.code == crate::error::ErrorCode::CollectionNotFound => return Ok(()),
4700 Err(e) => return Err(e),
4701 };
4702 let unique_fields = self.get_unique_fields(&collection_def);
4703
4704 for unique_field in &unique_fields {
4705 if let Some(value) = data.get(unique_field) {
4706 let index_key = format!("{}:{}", collection, unique_field);
4707 if let Some(index) = self.secondary_indices.get(&index_key) {
4708 // Get the raw string value without JSON formatting
4709 let value_str = match value {
4710 Value::String(s) => s.clone(),
4711 _ => value.to_string(),
4712 };
4713 if index.contains_key(&value_str) {
4714 return Err(AqlError::new(
4715 ErrorCode::UniqueConstraintViolation,
4716 format!(
4717 "Unique constraint violation on field '{}' with value '{}'",
4718 unique_field, value_str
4719 ),
4720 ));
4721 }
4722 }
4723 }
4724 }
4725 Ok(())
4726 }
4727
4728 /// Validate unique constraints excluding a specific document ID (for updates)
4729 async fn validate_unique_constraints_excluding(
4730 &self,
4731 collection: &str,
4732 data: &HashMap<String, Value>,
4733 exclude_id: &str,
4734 ) -> Result<()> {
4735 self.ensure_indices_initialized().await?;
4736 let collection_def = self.get_collection_definition(collection)?;
4737 let unique_fields = self.get_unique_fields(&collection_def);
4738
4739 for unique_field in &unique_fields {
4740 if let Some(value) = data.get(unique_field) {
4741 let index_key = format!("{}:{}", collection, unique_field);
4742 if let Some(index) = self.secondary_indices.get(&index_key) {
4743 // Get the raw string value without JSON formatting
4744 let value_str = match value {
4745 Value::String(s) => s.clone(),
4746 _ => value.to_string(),
4747 };
4748 if let Some(doc_ids) = index.get(&value_str) {
4749 // Check if any document other than the excluded one has this value
4750 let exclude_key = format!("{}:{}", collection, exclude_id);
4751 for doc_key in doc_ids.value() {
4752 if doc_key != &exclude_key {
4753 return Err(AqlError::new(
4754 ErrorCode::UniqueConstraintViolation,
4755 format!(
4756 "Unique constraint violation on field '{}' with value '{}'",
4757 unique_field, value_str
4758 ),
4759 ));
4760 }
4761 }
4762 }
4763 }
4764 }
4765 }
4766 Ok(())
4767 }
4768
4769 // =========================================================================
4770 // AQL Executor Helper Methods
4771 // These are wrapper methods for AQL executor integration.
4772 // They provide a simplified API compatible with the AQL query execution layer.
4773 // =========================================================================
4774
4775 /// Get all documents in a collection (AQL helper)
4776 ///
4777 /// This is a wrapper around the internal query system optimized for bulk retrieval.
4778 pub async fn aql_get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
4779 self.ensure_indices_initialized().await?;
4780
4781 let prefix = format!("{}:", collection);
4782 let mut docs = Vec::new();
4783
4784 for result in self.cold.scan_prefix(&prefix) {
4785 let (_, value) = result?;
4786 if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
4787 docs.push(doc);
4788 }
4789 }
4790
4791 Ok(docs)
4792 }
4793
4794 /// Insert a document from a HashMap (AQL helper)
4795 ///
4796 /// Returns the complete document (not just ID) for AQL executor
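    ///
    /// A minimal sketch, assuming a `users` collection exists:
    ///
    /// ```
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Jane".to_string()));
    /// let doc = db.aql_insert("users", data).await?;
    /// println!("inserted {}", doc.id);
    /// ```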
4797 pub async fn aql_insert(
4798 &self,
4799 collection: &str,
4800 data: HashMap<String, Value>,
4801 ) -> Result<Document> {
4802 // Validate unique constraints before inserting
4803 self.validate_unique_constraints(collection, &data).await?;
4804
4805 let doc_id = Uuid::now_v7().to_string();
4806 let document = Document {
4807 id: doc_id.clone(),
4808 data,
4809 };
4810
4811 self.put(
4812 format!("{}:{}", collection, doc_id),
4813 serde_json::to_vec(&document)?,
4814 None,
4815 )
4816 .await?;
4817
4818 // Publish insert event
4819 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
4820 let _ = self.pubsub.publish(event);
4821
4822 Ok(document)
4823 }
4824
4825 /// Update a document by ID with new data (AQL helper)
4826 ///
4827 /// Merges new data with existing data and returns updated document
4828 pub async fn aql_update_document(
4829 &self,
4830 collection: &str,
4831 doc_id: &str,
4832 updates: HashMap<String, Value>,
4833 ) -> Result<Document> {
4834 let key = format!("{}:{}", collection, doc_id);
4835
4836 // Get existing document
4837 let existing = self.get(&key)?.ok_or_else(|| {
4838 AqlError::new(
4839 ErrorCode::NotFound,
4840 format!("Document {} not found", doc_id),
4841 )
4842 })?;
4843
4844 let mut doc: Document = serde_json::from_slice(&existing)?;
4845 let old_doc = doc.clone();
4846
4847 // Merge updates into existing data
4848 for (k, v) in updates {
4849 doc.data.insert(k, v);
4850 }
4851
4852 // Validate unique constraints excluding this document
4853 self.validate_unique_constraints_excluding(collection, &doc.data, doc_id)
4854 .await?;
4855
4856 // Save updated document
4857 self.put(key, serde_json::to_vec(&doc)?, None).await?;
4858
4859 // Publish update event
4860 let event = crate::pubsub::ChangeEvent::update(collection, doc_id, old_doc, doc.clone());
4861 let _ = self.pubsub.publish(event);
4862
4863 Ok(doc)
4864 }
4865
4866 /// Delete a document by ID (AQL helper)
4867 ///
4868 /// Returns the deleted document
4869 pub async fn aql_delete_document(&self, collection: &str, doc_id: &str) -> Result<Document> {
4870 let key = format!("{}:{}", collection, doc_id);
4871
4872 // Get existing document first
4873 let existing = self.get(&key)?.ok_or_else(|| {
4874 AqlError::new(
4875 ErrorCode::NotFound,
4876 format!("Document {} not found", doc_id),
4877 )
4878 })?;
4879
4880 let doc: Document = serde_json::from_slice(&existing)?;
4881
4882 // Append to WAL for durability
4883 if let Some(wal) = &self.wal {
4884 wal.write()
4885 .map_err(|e| AqlError::new(ErrorCode::InternalError, e.to_string()))?
4886 .append(crate::wal::Operation::Delete, &key, None)?;
4887 }
4888
4889 // Delete from storage
4890 self.cold.delete(&key)?;
4891 self.hot.delete(&key);
4892
4893 // Remove from primary index
4894 if let Some(index) = self.primary_indices.get_mut(collection) {
4895 index.remove(&key);
4896 }
4897
4898 // Remove from secondary indices
4899 for (field_name, value) in &doc.data {
4900 let index_key = format!("{}:{}", collection, field_name);
4901 if let Some(index) = self.secondary_indices.get_mut(&index_key) {
4902 let value_str = match value {
4903 Value::String(s) => s.clone(),
4904 _ => value.to_string(),
4905 };
4906 if let Some(mut doc_ids) = index.get_mut(&value_str) {
4907 doc_ids.retain(|id| id != &key);
4908 }
4909 }
4910 }
4911
4912 // Publish delete event
4913 let event = crate::pubsub::ChangeEvent::delete(collection, doc_id);
4914 let _ = self.pubsub.publish(event);
4915
4916 Ok(doc)
4917 }
4918
4919 /// Get a single document by ID (AQL helper)
4920 pub async fn aql_get_document(
4921 &self,
4922 collection: &str,
4923 doc_id: &str,
4924 ) -> Result<Option<Document>> {
4925 let key = format!("{}:{}", collection, doc_id);
4926
4927 match self.get(&key)? {
4928 Some(data) => {
4929 let doc: Document = serde_json::from_slice(&data)?;
4930 Ok(Some(doc))
4931 }
4932 None => Ok(None),
4933 }
4934 }
4935
4936 /// Begin a transaction (AQL helper) - returns transaction ID
4937 pub fn aql_begin_transaction(&self) -> Result<crate::transaction::TransactionId> {
4938 Ok(self.begin_transaction())
4939 }
4940
4941 /// Commit a transaction (AQL helper)
4942 pub async fn aql_commit_transaction(
4943 &self,
4944 tx_id: crate::transaction::TransactionId,
4945 ) -> Result<()> {
4946 self.commit_transaction(tx_id)
4947 }
4948
4949 /// Rollback a transaction (AQL helper)
4950 pub async fn aql_rollback_transaction(
4951 &self,
4952 tx_id: crate::transaction::TransactionId,
4953 ) -> Result<()> {
4954 self.transaction_manager.rollback(tx_id)
4955 }
4956
4957 // ============================================
4958 // AQL Schema Management Wrappers
4959 // ============================================
4960
4961 /// Create a collection from AST schema definition
4962 pub async fn create_collection_schema(
4963 &self,
4964 name: &str,
4965 fields: HashMap<String, crate::types::FieldDefinition>,
4966 ) -> Result<()> {
4967 let collection_key = format!("_collection:{}", name);
4968
4969 // Check if collection already exists
4970 if self.get(&collection_key)?.is_some() {
4971 // Already exists
4972 return Ok(());
4973 }
4974
4975 let collection = Collection {
4976 name: name.to_string(),
4977 fields,
4978 };
4979
4980 let collection_data = serde_json::to_vec(&collection)?;
4981 self.put(collection_key, collection_data, None).await?;
4982 self.schema_cache.remove(name);
4983
4984 Ok(())
4985 }
4986
4987 /// Add a field to an existing collection schema
4988 pub async fn add_field_to_schema(
4989 &self,
4990 collection_name: &str,
4991 name: String,
4992 definition: crate::types::FieldDefinition,
4993 ) -> Result<()> {
4994 let mut collection = self
4995 .get_collection_definition(collection_name)
4996 .map_err(|_| {
4997 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
4998 })?;
4999
5000 if collection.fields.contains_key(&name) {
5001 return Err(AqlError::new(
5002 ErrorCode::InvalidDefinition,
5003 format!(
5004 "Field '{}' already exists in collection '{}'",
5005 name, collection_name
5006 ),
5007 ));
5008 }
5009
5010 collection.fields.insert(name, definition);
5011
5012 let collection_key = format!("_collection:{}", collection_name);
5013 let collection_data = serde_json::to_vec(&collection)?;
5014 self.put(collection_key, collection_data, None).await?;
5015 self.schema_cache.remove(collection_name);
5016
5017 Ok(())
5018 }
5019
5020 /// Drop a field from an existing collection schema
5021 pub async fn drop_field_from_schema(
5022 &self,
5023 collection_name: &str,
5024 field_name: String,
5025 ) -> Result<()> {
5026 let mut collection = self
5027 .get_collection_definition(collection_name)
5028 .map_err(|_| {
5029 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5030 })?;
5031
5032 if !collection.fields.contains_key(&field_name) {
5033 return Err(AqlError::new(
5034 ErrorCode::InvalidDefinition,
5035 format!(
5036 "Field '{}' does not exist in collection '{}'",
5037 field_name, collection_name
5038 ),
5039 ));
5040 }
5041
5042 collection.fields.remove(&field_name);
5043
5044 let collection_key = format!("_collection:{}", collection_name);
5045 let collection_data = serde_json::to_vec(&collection)?;
5046 self.put(collection_key, collection_data, None).await?;
5047 self.schema_cache.remove(collection_name);
5048
5049 Ok(())
5050 }
5051
5052 /// Rename a field in an existing collection schema
5053 pub async fn rename_field_in_schema(
5054 &self,
5055 collection_name: &str,
5056 from: String,
5057 to: String,
5058 ) -> Result<()> {
5059 let mut collection = self
5060 .get_collection_definition(collection_name)
5061 .map_err(|_| {
5062 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5063 })?;
5064
5065 if let Some(def) = collection.fields.remove(&from) {
5066 if collection.fields.contains_key(&to) {
5067 return Err(AqlError::new(
5068 ErrorCode::InvalidDefinition,
5069 format!(
5070 "Target field name '{}' already exists in collection '{}'",
5071 to, collection_name
5072 ),
5073 ));
5074 }
5075 collection.fields.insert(to, def);
5076 } else {
5077 return Err(AqlError::new(
5078 ErrorCode::InvalidDefinition,
5079 format!("Field '{}' not found", from),
5080 ));
5081 }
5082
5083 let collection_key = format!("_collection:{}", collection_name);
5084 let collection_data = serde_json::to_vec(&collection)?;
5085 self.put(collection_key, collection_data, None).await?;
5086 self.schema_cache.remove(collection_name);
5087
5088 Ok(())
5089 }
5090
5091 /// Modify a field in an existing collection schema
5092 pub async fn modify_field_in_schema(
5093 &self,
5094 collection_name: &str,
5095 name: String,
5096 definition: crate::types::FieldDefinition,
5097 ) -> Result<()> {
5098 let mut collection = self
5099 .get_collection_definition(collection_name)
5100 .map_err(|_| {
5101 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5102 })?;
5103
5104 if !collection.fields.contains_key(&name) {
5105 return Err(AqlError::new(
5106 ErrorCode::InvalidDefinition,
5107 format!(
5108 "Field '{}' does not exist in collection '{}'",
5109 name, collection_name
5110 ),
5111 ));
5112 }
5113
5114 collection.fields.insert(name, definition);
5115
5116 let collection_key = format!("_collection:{}", collection_name);
5117 let collection_data = serde_json::to_vec(&collection)?;
5118 self.put(collection_key, collection_data, None).await?;
5119 self.schema_cache.remove(collection_name);
5120
5121 Ok(())
5122 }
5123
5124 /// Drop an entire collection definition
5125 pub async fn drop_collection_schema(&self, collection_name: &str) -> Result<()> {
5126 let collection_key = format!("_collection:{}", collection_name);
5127 self.cold.delete(&collection_key)?;
5128 self.schema_cache.remove(collection_name);
5129 Ok(())
5130 }
5131
5132 // ============================================
5133 // AQL Migration Wrappers
5134 // ============================================
5135
5136 /// Check if a migration version has been applied
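    ///
    /// Typically paired with `mark_migration_applied()`. A minimal sketch of
    /// the check-run-mark pattern, where `run_migration_v2` is a hypothetical
    /// helper:
    ///
    /// ```
    /// if !db.is_migration_applied("v2").await? {
    ///     run_migration_v2(&db).await?;
    ///     db.mark_migration_applied("v2").await?;
    /// }
    /// ```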
5137 pub async fn is_migration_applied(&self, version: &str) -> Result<bool> {
5138 let migration_key = format!("_sys_migration:{}", version);
5139 Ok(self.get(&migration_key)?.is_some())
5140 }
5141
5142 /// Mark a migration version as applied
5143 pub async fn mark_migration_applied(&self, version: &str) -> Result<()> {
5144 let migration_key = format!("_sys_migration:{}", version);
5145 let timestamp = chrono::Utc::now().to_rfc3339();
5146 self.put(migration_key, timestamp.as_bytes().to_vec(), None)
5147 .await?;
5148 Ok(())
5149 }
5150}
5151
5152impl Drop for Aurora {
5153 fn drop(&mut self) {
5154 // Signal checkpoint task to shutdown gracefully
5155 if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
5156 let _ = shutdown_tx.send(());
5157 }
5158 // Signal compaction task to shutdown gracefully
5159 if let Some(ref shutdown_tx) = self.compaction_shutdown {
5160 let _ = shutdown_tx.send(());
5161 }
5162 }
5163}
5164
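/// Evaluate a single HTTP filter against one document value.
///
/// `Contains` means substring match for strings and element membership for
/// arrays; the ordering operators use `Value`'s comparison rules. A minimal
/// sketch, assuming `HttpFilter` consists of the `field`, `operator`, and
/// `value` members used below:
///
/// ```
/// let filter = HttpFilter {
///     field: "age".to_string(),
///     operator: FilterOperator::Gte,
///     value: serde_json::json!(18),
/// };
/// assert!(check_filter(&Value::Int(21), &filter));
/// ```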
5165fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
5166 let filter_val = match json_to_value(&filter.value) {
5167 Ok(v) => v,
5168 Err(_) => return false,
5169 };
5170
5171 match filter.operator {
5172 FilterOperator::Eq => doc_val == &filter_val,
5173 FilterOperator::Ne => doc_val != &filter_val,
5174 FilterOperator::Gt => doc_val > &filter_val,
5175 FilterOperator::Gte => doc_val >= &filter_val,
5176 FilterOperator::Lt => doc_val < &filter_val,
5177 FilterOperator::Lte => doc_val <= &filter_val,
5178 FilterOperator::Contains => match (doc_val, &filter_val) {
5179 (Value::String(s), Value::String(fv)) => s.contains(fv),
5180 (Value::Array(arr), _) => arr.contains(&filter_val),
5181 _ => false,
5182 },
5183 }
5184}
5185
/// Result of importing a single document
5187enum ImportResult {
5188 Imported,
5189 Skipped,
5190}
5191
5192/// Statistics from an import operation
5193#[derive(Debug, Default)]
5194pub struct ImportStats {
5195 /// Number of documents successfully imported
5196 pub imported: usize,
5197 /// Number of documents skipped (usually because they already exist)
5198 pub skipped: usize,
5199 /// Number of documents that failed to import
5200 pub failed: usize,
5201}
5202
5203/// Statistics for a specific collection
5204#[derive(Debug)]
5205pub struct CollectionStats {
5206 /// Number of documents in the collection
5207 pub count: usize,
5208 /// Total size of the collection in bytes
5209 pub size_bytes: usize,
5210 /// Average document size in bytes
5211 pub avg_doc_size: usize,
5212}
5213
5214/// Statistics for an index
5215#[derive(Debug)]
5216pub struct IndexStats {
5217 /// Number of unique values in the index
5218 pub unique_values: usize,
5219 /// Total number of documents covered by the index
5220 pub total_documents: usize,
5221 /// Average number of documents per unique value
5222 pub avg_docs_per_value: usize,
5223}
5224
5225/// Combined database statistics
5226#[derive(Debug)]
5227pub struct DatabaseStats {
5228 /// Hot cache statistics
5229 pub hot_stats: crate::storage::hot::CacheStats,
5230 /// Cold storage statistics
5231 pub cold_stats: crate::storage::cold::ColdStoreStats,
5232 /// Estimated total database size in bytes
5233 pub estimated_size: u64,
5234 /// Statistics for each collection
5235 pub collections: HashMap<String, CollectionStats>,
5236}
5237
5238#[cfg(test)]
5239mod tests {
5240 use super::*;
5241 use tempfile::tempdir;
5242
5243 #[tokio::test]
5244 async fn test_async_wal_integration() {
5245 let temp_dir = tempfile::tempdir().unwrap();
5246 let db_path = temp_dir.path().join("test_wal_integration.db");
5247 let db = Aurora::open(db_path.to_str().unwrap()).unwrap();
5248
        // 1. WAL behavior is controlled by AuroraConfig; this test relies on
        //    the default configuration.
5251
5252 // Create collection schema first
5253 db.new_collection(
5254 "users",
5255 vec![
5256 ("id", FieldType::String, false),
5257 ("counter", FieldType::Int, false),
5258 ],
5259 )
5260 .await
5261 .unwrap();
5262
5263 let db_clone = db.clone();
5264
5265 // 2. Spawn 50 concurrent writes
5266 let handles: Vec<_> = (0..50)
5267 .map(|i| {
5268 let db = db_clone.clone();
5269 tokio::spawn(async move {
5270 let doc_id = db
5271 .insert_into(
5272 "users",
5273 vec![
5274 ("id", Value::String(format!("user-{}", i))),
5275 ("counter", Value::Int(i)),
5276 ],
5277 )
5278 .await
5279 .unwrap();
5280 (i, doc_id) // Return both the index and the document ID
5281 })
5282 })
5283 .collect();
5284
5285 // Wait for all to complete and collect the results
5286 let results = futures::future::join_all(handles).await;
5287 assert!(results.iter().all(|r| r.is_ok()), "Some writes failed");
5288
        // 3. Extract the document IDs
5290 let doc_ids: Vec<(i64, String)> = results.into_iter().map(|r| r.unwrap()).collect();
5291
5292 // Find the document ID for the one with counter=25
5293 let target_doc_id = doc_ids
5294 .iter()
5295 .find(|(counter, _)| *counter == 25i64)
5296 .map(|(_, doc_id)| doc_id)
5297 .unwrap();
5298
5299 // 4. Verify Data Integrity
5300 let doc = db.get_document("users", target_doc_id).unwrap().unwrap();
5301 assert_eq!(doc.data.get("counter"), Some(&Value::Int(25)));
5302 }
5303
5304 #[tokio::test]
5305 async fn test_basic_operations() -> Result<()> {
5306 let temp_dir = tempdir()?;
5307 let db_path = temp_dir.path().join("test.aurora");
5308 let db = Aurora::open(db_path.to_str().unwrap())?;
5309
5310 // Test collection creation
5311 db.new_collection(
5312 "users",
5313 vec![
5314 ("name", FieldType::String, false),
5315 ("age", FieldType::Int, false),
5316 ("email", FieldType::String, true),
5317 ],
5318 )
5319 .await?;
5320
5321 // Test document insertion
5322 let doc_id = db
5323 .insert_into(
5324 "users",
5325 vec![
5326 ("name", Value::String("John Doe".to_string())),
5327 ("age", Value::Int(30)),
5328 ("email", Value::String("john@example.com".to_string())),
5329 ],
5330 )
5331 .await?;
5332
5333 // Test document retrieval
5334 let doc = db.get_document("users", &doc_id)?.unwrap();
5335 assert_eq!(
5336 doc.data.get("name").unwrap(),
5337 &Value::String("John Doe".to_string())
5338 );
5339 assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));
5340
5341 Ok(())
5342 }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection
        db.new_collection("test", vec![("field", FieldType::String, false)])
            .await?;

        // Start transaction
        let tx_id = db.begin_transaction();

        // Insert document
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id)?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }
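
    // Sketch (hypothetical API): if the engine exposes a rollback counterpart
    // to `commit_transaction`, a rollback test would mirror the one above:
    //
    //     let tx_id = db.begin_transaction();
    //     // ...insert within the transaction...
    //     db.rollback_transaction(tx_id)?;
    //     // ...assert the document is absent...
    //
    // `rollback_transaction` is not exercised elsewhere in this module, so it
    // is left as a commented sketch rather than a compiled test.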

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Set up a collection to query against
        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )
        .await?;

        // Insert sample documents
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Query with a filter and an ascending sort on `year`
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }
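
    // Illustrative sketch built on an assumption: the boolean passed to
    // `order_by` acts as an "ascending" flag above (`true` sorts years in
    // increasing order), so `false` is assumed here to request a descending
    // sort. Adjust if the flag has different semantics.
    #[tokio::test]
    async fn test_query_descending_order() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Older".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Newer".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // With the assumed descending flag, the newest book comes first.
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", false)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() > results[1].data.get("year").unwrap());

        Ok(())
    }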

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // 13-byte content + 5-byte "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a test file that's too large (201MB)
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file; this should be rejected
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidOperation);

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection with unique email field
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique field
                ("age", FieldType::Int, false),
            ],
        )
        .await?;

        // Insert first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Inserting a second document with the same email must fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        match result {
            Err(e) if e.code == ErrorCode::UniqueConstraintViolation => {
                // The error message should name the offending field and value
                assert!(e.message.contains("email"));
                assert!(e.message.contains("john@example.com"));
            }
            other => panic!("Expected UniqueConstraintViolation, got {:?}", other),
        }

        // Test upsert with unique constraint
        // Should succeed for new document
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Should fail when upserting a different document with a duplicate email
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Should succeed when updating an existing document with its own email (no change)
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
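
    // Illustrative follow-up sketch resting on an assumption: `upsert` with an
    // id that already exists overwrites the stored fields, so a subsequent
    // `get_document` observes the new values. Only APIs already exercised in
    // this module are used.
    #[tokio::test]
    async fn test_upsert_updates_existing_document() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection("notes", vec![("body", FieldType::String, false)])
            .await?;

        // The first upsert creates the document; the second should update it in place.
        db.upsert(
            "notes",
            "note1",
            vec![("body", Value::String("draft".to_string()))],
        )
        .await?;
        db.upsert(
            "notes",
            "note1",
            vec![("body", Value::String("final".to_string()))],
        )
        .await?;

        let doc = db.get_document("notes", "note1")?.unwrap();
        assert_eq!(
            doc.data.get("body"),
            Some(&Value::String("final".to_string()))
        );

        Ok(())
    }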
}