aurora_db/db.rs
//! # Aurora Database
//!
//! Aurora is an embedded document database with tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust
//! use aurora_db::Aurora;
//! use aurora_db::types::{FieldType, Value};
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true), // unique field
//!     ("age", FieldType::Int, false),
//! ]).await?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AqlError, ErrorCode, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::parser;
use crate::parser::executor::{ExecutionOptions, ExecutionPlan, ExecutionResult};
use crate::query::FilterBuilder;
use crate::query::{Filter, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{
    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
};
use crate::wal::{Operation, WriteAheadLog};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Disk location metadata for the primary index.
// Instead of storing full Vec<u8> values, we store minimal metadata.
#[derive(Debug, Clone, Copy)]
struct DiskLocation {
    size: u32, // Size in bytes (useful for statistics)
}

impl DiskLocation {
    fn new(size: usize) -> Self {
        Self { size: size as u32 }
    }
}

// Index types for faster lookups
type PrimaryIndex = DashMap<String, DiskLocation>;
type SecondaryIndex = DashMap<String, Vec<String>>;

#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

/// Trait to convert field tuples into a `FieldDefinition`.
/// Supports both the 3-tuple form (defaults nullable to false) and the
/// 4-tuple form (explicit nullable).
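///
/// A minimal illustrative sketch of both tuple shapes (assumes `FieldType`
/// and this trait are in scope):
///
/// ```
/// let (name, def) = ("email", FieldType::String, true).into_field_definition();
/// assert_eq!(name, "email");
/// assert!(def.unique && def.indexed && !def.nullable); // unique implies indexed
///
/// let (_, def) = ("bio", FieldType::String, false, true).into_field_definition();
/// assert!(def.nullable); // the 4-tuple sets nullable explicitly
/// ```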
pub trait IntoFieldDefinition {
    fn into_field_definition(self) -> (String, FieldDefinition);
}

// 3-tuple: (field_name, field_type, unique) - defaults nullable to false
impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool) {
    fn into_field_definition(self) -> (String, FieldDefinition) {
        let (name, field_type, unique) = self;
        (
            name.into(),
            FieldDefinition {
                field_type,
                unique,
                indexed: unique,
                nullable: false, // Default to false
            },
        )
    }
}

// 4-tuple: (field_name, field_type, unique, nullable) - explicit control
impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool, bool) {
    fn into_field_definition(self) -> (String, FieldDefinition) {
        let (name, field_type, unique, nullable) = self;
        (
            name.into(),
            FieldDefinition {
                field_type,
                unique,
                indexed: unique,
                nullable,
            },
        )
    }
}

#[derive(Debug)]
#[allow(dead_code)]
enum WalOperation {
    Put {
        key: Arc<String>,
        value: Arc<Vec<u8>>,
    },
    Delete {
        key: String,
    },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pub pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability (optional, based on config)
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background checkpoint task
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Background compaction task
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Worker system for background jobs
    pub workers: Option<Arc<crate::workers::WorkerSystem>>,
    // Asynchronous WAL writer channel
    wal_writer: Option<tokio::sync::mpsc::UnboundedSender<WalOperation>>,
    // Computed fields manager
    pub computed: Arc<RwLock<crate::computed::ComputedFields>>,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

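// Cloning an `Aurora` handle shares all storage and index state via `Arc`s.
// Note that clones get `write_buffer: None`, so their writes go straight to
// cold storage instead of through the original handle's write buffer.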
impl Clone for Aurora {
    fn clone(&self) -> Self {
        Self {
            hot: self.hot.clone(),
            cold: Arc::clone(&self.cold),
            primary_indices: Arc::clone(&self.primary_indices),
            secondary_indices: Arc::clone(&self.secondary_indices),
            indices_initialized: Arc::clone(&self.indices_initialized),
            transaction_manager: self.transaction_manager.clone(),
            indices: Arc::clone(&self.indices),
            schema_cache: Arc::clone(&self.schema_cache),
            config: self.config.clone(),
            write_buffer: None,
            pubsub: self.pubsub.clone(),
            wal: self.wal.clone(),
            checkpoint_shutdown: self.checkpoint_shutdown.clone(),
            compaction_shutdown: self.compaction_shutdown.clone(),
            workers: self.workers.clone(),
            wal_writer: self.wal_writer.clone(),
            computed: Arc::clone(&self.computed),
        }
    }
}

/// Trait for converting inputs into execution parameters
pub trait ToExecParams {
    fn into_params(self) -> Result<(String, ExecutionOptions)>;
}

impl<'a> ToExecParams for &'a str {
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        Ok((self.to_string(), ExecutionOptions::default()))
    }
}

impl ToExecParams for String {
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        Ok((self, ExecutionOptions::default()))
    }
}

impl<'a, V> ToExecParams for (&'a str, V)
where
    V: Into<serde_json::Value>,
{
    fn into_params(self) -> Result<(String, ExecutionOptions)> {
        let (aql, vars) = self;
        let json_vars = vars.into();
        let map = match json_vars {
            serde_json::Value::Object(map) => map.into_iter().collect(),
            _ => {
                return Err(AqlError::new(
                    ErrorCode::InvalidInput,
                    "Variables must be a JSON object",
                ));
            }
        };
        Ok((
            aql.to_string(),
            ExecutionOptions {
                variables: map,
                ..Default::default()
            },
        ))
    }
}

impl Aurora {
    /// Execute AQL query (variables are optional)
    ///
    /// Supports two forms:
    /// 1. `db.execute("query").await`
    /// 2. `db.execute(("query", vars)).await`
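    ///
    /// A minimal sketch of both call forms; the AQL text and the collection and
    /// field names here are illustrative, not a fixed schema:
    ///
    /// ```
    /// // Without variables
    /// let result = db.execute("query { users { id name } }").await?;
    ///
    /// // With variables (any `Into<serde_json::Value>` that yields a JSON object)
    /// let result = db.execute((
    ///     "query { users(where: { age: { gte: $min_age } }) { id } }",
    ///     serde_json::json!({ "min_age": 18 }),
    /// )).await?;
    /// ```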
    pub async fn execute<I: ToExecParams>(&self, input: I) -> Result<ExecutionResult> {
        let (aql, options) = input.into_params()?;
        parser::executor::execute(self, &aql, options).await
    }

    /// Stream real-time changes using AQL subscription syntax
    ///
    /// This is a convenience method that extracts the `ChangeListener` from
    /// an AQL subscription query, providing a cleaner API than using `execute()` directly.
    ///
    /// # Example
    /// ```
    /// // Stream changes from active products
    /// let mut listener = db.stream(r#"
    ///     subscription {
    ///         products(where: { active: { eq: true } }) {
    ///             id
    ///             name
    ///         }
    ///     }
    /// "#).await?;
    ///
    /// // Receive real-time events
    /// while let Ok(event) = listener.recv().await {
    ///     println!("Change: {:?} on {}", event.change_type, event.id);
    /// }
    /// ```
    pub async fn stream(&self, aql: &str) -> Result<crate::pubsub::ChangeListener> {
        let result = self.execute(aql).await?;

        match result {
            ExecutionResult::Subscription(sub) => sub.stream.ok_or_else(|| {
                crate::error::AqlError::new(
                    crate::error::ErrorCode::QueryError,
                    "Subscription did not return a stream".to_string(),
                )
            }),
            _ => Err(crate::error::AqlError::new(
                crate::error::ErrorCode::QueryError,
                "Expected a subscription query, got a different operation type".to_string(),
            )),
        }
    }

    /// Explain AQL query execution plan
    pub async fn explain<I: ToExecParams>(&self, input: I) -> Result<ExecutionPlan> {
        let (aql, options) = input.into_params()?;

        // Parse and analyze without executing
        let doc = parser::parse_with_variables(
            &aql,
            serde_json::Value::Object(options.variables.clone().into_iter().collect()),
        )?;

        self.analyze_execution_plan(&doc).await
    }

    /// Analyze execution plan for a parsed query
    pub async fn analyze_execution_plan(
        &self,
        doc: &crate::parser::ast::Document,
    ) -> Result<ExecutionPlan> {
        let mut operations = Vec::new();
        for op in &doc.operations {
            operations.push(format!("{:?}", op));
        }

        Ok(ExecutionPlan {
            operations,
            estimated_cost: 1.0,
        })
    }

    /// Remove stale lock files from a database directory
    ///
    /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
    /// that prevent the database from being reopened. This method safely removes
    /// those lock files.
    ///
    /// # Safety
    /// Only call this when you're certain no other Aurora instance is using the database.
    /// Removing lock files while another process is running could cause data corruption.
    ///
    /// # Example
    /// ```no_run
    /// use aurora_db::Aurora;
    ///
    /// // If you get an "Access denied" error when opening:
    /// if let Err(e) = Aurora::open("my_db") {
    ///     eprintln!("Failed to open: {}", e);
    ///     // Try removing the stale lock
    ///     if Aurora::remove_stale_lock("my_db").unwrap_or(false) {
    ///         println!("Removed stale lock, try opening again");
    ///         let db = Aurora::open("my_db")?;
    ///     }
    /// }
    /// # Ok::<(), aurora_db::error::AqlError>(())
    /// ```
    pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
        let resolved_path = Self::resolve_path(path)?;
        crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
    }

    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Or use a simple name in the current directory
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = AuroraConfig {
            db_path: Self::resolve_path(path)?,
            ..Default::default()
        };
        Self::with_config(config)
    }

    /// Helper method to resolve database path
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to the current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AqlError::new(
                ErrorCode::IoError,
                format!("Failed to resolve current directory: {}", e),
            )),
        }
    }

    /// Open a database with custom configuration
    ///
    /// # Arguments
    /// * `config` - Database configuration settings
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::AuroraConfig};
    /// use std::time::Duration;
    ///
    /// let config = AuroraConfig {
    ///     db_path: "my_data.db".into(),
    ///     hot_cache_size_mb: 512,       // 512 MB cache
    ///     enable_write_buffering: true, // Batch writes for speed
    ///     enable_wal: true,             // Durability
    ///     auto_compact: true,           // Background compaction
    ///     compact_interval_mins: 60,    // Compact every hour
    ///     ..Default::default()
    /// };
    ///
    /// let db = Aurora::with_config(config)?;
    /// ```
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs
            && let Some(parent) = path.parent()
            && !parent.exists()
        {
            std::fs::create_dir_all(parent)?;
        }

        let cold = Arc::new(ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        let auto_compact = config.auto_compact;
        let enable_wal = config.enable_wal;
        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        // Initialize WAL
        let (wal, wal_entries) = if enable_wal {
            let wal_path = path.to_str().unwrap();
            match WriteAheadLog::new(wal_path) {
                Ok(mut wal_log) => {
                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
                    (Some(Arc::new(RwLock::new(wal_log))), entries)
                }
                Err(_) => (None, Vec::new()),
            }
        } else {
            (None, Vec::new())
        };

        // Initialize the background WAL writer.
        // Only enabled if the WAL itself was successfully initialized.
        let wal_writer = if enable_wal && wal.is_some() {
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
            let wal_clone = wal.clone();

            // Spawn a single dedicated task for writing WAL entries sequentially
            tokio::spawn(async move {
                if let Some(wal) = wal_clone {
                    while let Some(op) = rx.recv().await {
                        let wal = wal.clone();
                        // Move blocking I/O to a blocking thread
                        let result = tokio::task::spawn_blocking(move || {
                            // Properly handle the RwLock write guard
                            match wal.write() {
                                Ok(mut guard) => {
                                    let result = match op {
                                        WalOperation::Put { key, value } => {
                                            // Deref the Arcs to pass slices
                                            guard.append(Operation::Put, &key, Some(value.as_ref()))
                                        }
                                        WalOperation::Delete { key } => {
                                            guard.append(Operation::Delete, &key, None)
                                        }
                                    };
                                    if let Err(e) = result {
                                        eprintln!(
                                            "CRITICAL: Failed to append to WAL: {}. Shutting down.",
                                            e
                                        );
                                        // A failed WAL write is a catastrophic failure.
                                        std::process::exit(1);
                                    }
                                }
                                Err(e) => {
                                    // This likely means the lock is poisoned, which is a critical state.
                                    eprintln!(
                                        "CRITICAL: Failed to acquire WAL lock: {}. Shutting down.",
                                        e
                                    );
                                    std::process::exit(1);
                                }
                            }
                        })
                        .await;

                        if let Err(e) = result {
                            eprintln!("WAL blocking task failed: {}", e);
                            // If the blocking task panics or is cancelled, that's critical
                            std::process::exit(1);
                        }
                    }
                }
            });
            Some(tx)
        } else {
            None
        };

        let checkpoint_shutdown = if wal.is_some() {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let wal_clone = wal.clone();
            let checkpoint_interval = config.checkpoint_interval_ms;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.flush() {
                                eprintln!("Background checkpoint flush error: {}", e);
                            }
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                let _ = wal_guard.truncate();
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.flush();
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                let _ = wal_guard.truncate();
                            }
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        let compaction_shutdown = if auto_compact {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let compact_interval = config.compact_interval_mins;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.compact() {
                                eprintln!("Background compaction error: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.compact();
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        let workers_path = path.join("workers.db");
        let worker_config = crate::workers::WorkerConfig {
            storage_path: workers_path.to_string_lossy().into_owned(),
            ..Default::default()
        };
        let workers = crate::workers::WorkerSystem::new(worker_config)
            .map(Arc::new)
            .ok();

        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
            wal,
            checkpoint_shutdown,
            compaction_shutdown,
            workers,
            wal_writer,
            computed: Arc::new(RwLock::new(crate::computed::ComputedFields::new())),
        };

        if !wal_entries.is_empty() {
            match tokio::runtime::Handle::try_current() {
                Ok(handle) => handle.block_on(db.replay_wal(wal_entries))?,
                Err(_) => {
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .map_err(|e| {
                            AqlError::new(
                                ErrorCode::InternalError,
                                format!("Failed to create temp runtime: {}", e),
                            )
                        })?;
                    rt.block_on(db.replay_wal(wal_entries))?;
                }
            }
        }

        Ok(db)
    }

    // Lazy index initialization
    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AqlError::new(ErrorCode::InvalidKey, "Invalid UTF-8".to_string()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                // Skip system/metadata prefixes (e.g., _collection, _sys_migration, _index)
                if collection_name.starts_with('_') {
                    continue;
                }
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    // Fast key-value operations with index support

    /// Get a value by key (low-level key-value access)
    ///
    /// This is the low-level method. For document access, use `get_document()` instead.
    /// Checks the hot cache first, then falls back to cold storage for maximum performance.
    ///
    /// # Performance
    /// - Hot cache hit: ~1M reads/sec (instant)
    /// - Cold storage: ~500K reads/sec (disk I/O)
    /// - Cache hit rate: typically 95%+ at scale
    ///
    /// # Examples
    ///
    /// ```
    /// // Low-level key-value access
    /// let data = db.get("users:12345")?;
    /// if let Some(bytes) = data {
    ///     let doc: Document = serde_json::from_slice(&bytes)?;
    ///     println!("Found: {:?}", doc);
    /// }
    ///
    /// // Better: use get_document() for documents
    /// let user = db.get_document("users", "12345")?;
    /// ```
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Check hot cache first
        if let Some(value) = self.hot.get(key) {
            // Check if this is a blob reference (pointer to cold storage)
            if value.starts_with(b"BLOBREF:") {
                // It's a blob ref - fetch the actual data from cold storage
                return self.cold.get(key);
            }
            return Ok(Some(value));
        }

        // Fetch from cold storage
        let value = self.cold.get(key)?;

        if let Some(v) = &value {
            if self.should_cache_key(key) {
                self.hot
                    .set(Arc::new(key.to_string()), Arc::new(v.clone()), None);
            }
        }

        Ok(value)
    }

    /// Get a value as a zero-copy `Arc` reference (much faster than `get`, which clones the bytes).
    /// Only checks the hot cache - returns `None` if the key is not cached.
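    ///
    /// A minimal sketch (the key is illustrative):
    ///
    /// ```
    /// // Cloning the returned Arc only bumps a refcount; the bytes are never copied
    /// if let Some(bytes) = db.get_hot_ref("users:12345") {
    ///     println!("cached value is {} bytes", bytes.len());
    /// }
    /// ```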
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Helper to decide if a key should be cached.
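    ///
    /// Documents in collections that declare an `Any`-typed field are not cached,
    /// so reads always see the authoritative cold-storage copy; everything else
    /// is cached by default.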
    fn should_cache_key(&self, key: &str) -> bool {
        if let Some(collection_name) = key.split(':').next() {
            if !collection_name.starts_with('_') {
                if let Ok(collection_def) = self.get_collection_definition(collection_name) {
                    if collection_def
                        .fields
                        .values()
                        .any(|def| def.field_type == FieldType::Any)
                    {
                        return false; // Don't cache if the collection has an Any field
                    }
                }
            }
        }
        true // Cache by default
    }

    /// Get cache statistics
    ///
    /// Returns detailed metrics about cache performance including hit/miss rates,
    /// memory usage, and access patterns. Useful for monitoring, optimization,
    /// and understanding database performance characteristics.
    ///
    /// # Returns
    /// `CacheStats` struct containing:
    /// - `hits`: Number of cache hits (data found in memory)
    /// - `misses`: Number of cache misses (had to read from disk)
    /// - `hit_rate`: Fraction of requests served from cache (0.0-1.0)
    /// - `size`: Current number of entries in cache
    /// - `capacity`: Maximum cache capacity
    /// - `evictions`: Number of entries evicted due to capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// let stats = db.get_cache_stats();
    /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
    /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
    ///
    /// // Monitor performance during operations
    /// let before = db.get_cache_stats();
    ///
    /// // Perform many reads
    /// for i in 0..1000 {
    ///     db.get_document("users", &format!("user-{}", i))?;
    /// }
    ///
    /// let after = db.get_cache_stats();
    /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
    /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
    ///
    /// // Performance tuning
    /// let stats = db.get_cache_stats();
    /// if stats.hit_rate < 0.80 {
    ///     println!("Low cache hit rate! Consider:");
    ///     println!("- Increasing cache size in config");
    ///     println!("- Prewarming cache with prewarm_cache()");
    ///     println!("- Reviewing query patterns");
    /// }
    ///
    /// if stats.evictions > stats.size {
    ///     println!("High eviction rate! Cache may be too small.");
    ///     println!("Consider increasing cache capacity.");
    /// }
    ///
    /// // Production monitoring
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// loop {
    ///     let stats = db.get_cache_stats();
    ///
    ///     // Log to monitoring system
    ///     if stats.hit_rate < 0.90 {
    ///         eprintln!("Warning: Cache hit rate dropped to {:.1}%",
    ///             stats.hit_rate * 100.0);
    ///     }
    ///
    ///     thread::sleep(Duration::from_secs(60));
    /// }
    /// ```
    ///
    /// # Typical Performance Metrics
    /// - **Excellent**: 95%+ hit rate (most reads from memory)
    /// - **Good**: 80-95% hit rate (acceptable performance)
    /// - **Poor**: <80% hit rate (consider cache tuning)
    ///
    /// # See Also
    /// - `prewarm_cache()` to improve hit rates by preloading data
    /// - `Aurora::with_config()` to adjust cache capacity
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

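    /// Returns true if `field` is indexed in `collection`'s schema
    /// (unique fields are implicitly indexed).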
    pub fn has_index(&self, collection: &str, field: &str) -> bool {
        if let Ok(collection_def) = self.get_collection_definition(collection) {
            if let Some(field_def) = collection_def.fields.get(field) {
                return field_def.indexed || field_def.unique;
            }
        }
        false
    }

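    /// Look up document IDs for an exact field value via the secondary index.
    ///
    /// The index stores full keys like `"collection:id"`; this returns just the
    /// `id` part. Returns an empty `Vec` if the field is not indexed or nothing matches.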
    pub fn get_ids_from_index(&self, collection: &str, field: &str, value: &Value) -> Vec<String> {
        let index_key = format!("{}:{}", collection, field);
        if let Some(index) = self.secondary_indices.get(&index_key) {
            let value_str = match value {
                Value::String(s) => s.clone(),
                _ => value.to_string(),
            };
            if let Some(doc_ids) = index.get(&value_str) {
                // The doc_ids in the secondary index are full keys like "collection:id".
                // The query logic expects just the "id" part.
                return doc_ids
                    .value()
                    .iter()
                    .map(|key| key.split(':').nth(1).unwrap_or(key).to_string())
                    .collect();
            }
        }
        Vec::new()
    }

    /// Register a computed field definition
    pub async fn register_computed_field(
        &self,
        collection: &str,
        field: &str,
        expression: crate::computed::ComputedExpression,
    ) -> Result<()> {
        let mut computed = self.computed.write().unwrap();
        computed.register(collection, field, expression);
        Ok(())
    }

    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for real-time changes in a collection
    ///
    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
    /// data synchronization systems.
    ///
    /// # Performance
    /// - Zero overhead when no listeners are active
    /// - Events are broadcast to all listeners asynchronously
    /// - Non-blocking - doesn't slow down write operations
    /// - Multiple listeners can watch the same collection
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic listener
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         match event.change_type {
    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
    ///             ChangeType::Delete => println!("Deleted user ID: {}", event.id),
    ///         }
    ///     }
    /// });
    ///
    /// // Now any insert/update/delete will trigger the listener
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Cache Invalidation:**
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    /// use std::collections::HashMap;
    ///
    /// let cache = Arc::new(RwLock::new(HashMap::new()));
    /// let cache_clone = Arc::clone(&cache);
    ///
    /// let mut listener = db.listen("products");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Invalidate cache entry when product changes
    ///         cache_clone.write().await.remove(&event.id);
    ///         println!("Cache invalidated for product: {}", event.id);
    ///     }
    /// });
    /// ```
    ///
    /// **Webhook Notifications:**
    /// ```
    /// let mut listener = db.listen("orders");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             // Send webhook for new orders
    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Audit Logging:**
    /// ```
    /// let mut listener = db.listen("sensitive_data");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log all changes to audit trail
    ///         db.insert_into("audit_log", vec![
    ///             ("collection", Value::String("sensitive_data".into())),
    ///             ("action", Value::String(format!("{:?}", event.change_type))),
    ///             ("document_id", Value::String(event.id.clone())),
    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
    ///         ]).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Data Synchronization:**
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Sync changes to external system
    ///         match event.change_type {
    ///             ChangeType::Insert | ChangeType::Update => {
    ///                 if let Some(doc) = event.document {
    ///                     external_api.upsert_user(&doc).await?;
    ///                 }
    ///             },
    ///             ChangeType::Delete => {
    ///                 external_api.delete_user(&event.id).await?;
    ///             },
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Real-Time Notifications:**
    /// ```
    /// let mut listener = db.listen("messages");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             if let Some(msg) = event.document {
    ///                 // Push notification to connected websockets
    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
    ///                     websocket_manager.send_to_user(recipient, &msg).await;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Filtered Listener:**
    /// ```
    /// use aurora_db::pubsub::EventFilter;
    ///
    /// // Only listen for inserts
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
    ///
    /// // Only listen for documents with a specific field value
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
    /// ```
    ///
    /// # Important Notes
    /// - Listener stays active until dropped
    /// - Events are delivered in order
    /// - Each listener has its own event stream
    /// - Use filters to reduce unnecessary event processing
    /// - Listeners don't affect write performance
    ///
    /// # See Also
    /// - `listen_all()` to listen to all collections
    /// - `ChangeListener::filter()` to filter events
    /// - `query().watch()` for reactive queries with filtering
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    ///
    /// Returns a stream of change events for every insert, update, and delete
    /// operation across the entire database. Useful for global audit logging,
    /// replication, and monitoring systems.
    ///
    /// # Performance
    /// - Same performance as a single-collection listener
    /// - Filter events by collection in your handler
    /// - Consider using `listen(collection)` if only watching specific collections
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Listen to everything
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         println!("Change in {}: {:?}", event.collection, event.change_type);
    ///     }
    /// });
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Global Audit Trail:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log every database change
    ///         audit_logger.log(AuditEntry {
    ///             timestamp: chrono::Utc::now(),
    ///             collection: event.collection,
    ///             action: event.change_type,
    ///             document_id: event.id,
    ///             user_id: get_current_user_id(),
    ///         }).await;
    ///     }
    /// });
    /// ```
    ///
    /// **Database Replication:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Replicate to secondary database
    ///         replica_db.apply_change(event).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Change Data Capture (CDC):**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Stream changes to Kafka/RabbitMQ
    ///         kafka_producer.send(
    ///             &format!("cdc.{}", event.collection),
    ///             serde_json::to_string(&event)?
    ///         ).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Monitoring & Metrics:**
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let write_counter = Arc::new(AtomicUsize::new(0));
    /// let counter_clone = Arc::clone(&write_counter);
    ///
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = listener.recv().await {
    ///         counter_clone.fetch_add(1, Ordering::Relaxed);
    ///     }
    /// });
    ///
    /// // Report metrics every 60 seconds
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(Duration::from_secs(60)).await;
    ///         let count = write_counter.swap(0, Ordering::Relaxed);
    ///         println!("Writes per minute: {}", count);
    ///     }
    /// });
    /// ```
    ///
    /// **Selective Processing:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Handle different collections differently
    ///         match event.collection.as_str() {
    ///             "users" => handle_user_change(event).await,
    ///             "orders" => handle_order_change(event).await,
    ///             "payments" => handle_payment_change(event).await,
    ///             _ => {} // Ignore others
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # When to Use
    /// - Global audit logging
    /// - Database replication
    /// - Change data capture (CDC)
    /// - Monitoring and metrics
    /// - Event sourcing systems
    ///
    /// # When NOT to Use
    /// - Only need to watch 1-2 collections → use `listen(collection)` instead
    /// - High write volume with selective interest → use collection-specific listeners
    /// - Need complex filtering → use `query().watch()` instead
    ///
    /// # See Also
    /// - `listen()` for single-collection listening
    /// - `listener_count()` to check active listeners
    /// - `query().watch()` for filtered reactive queries
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get the total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

    /// Flushes all buffered writes to disk to ensure durability.
    ///
    /// This method forces all pending writes from:
    /// - Write buffer (if enabled)
    /// - Cold storage internal buffers
    /// - Write-ahead log (if enabled)
    ///
    /// Call this when you need to ensure data persistence before
    /// a critical operation or shutdown. After flush() completes,
    /// all data is guaranteed to be on disk even if power fails.
    ///
    /// # Performance
    /// - Flush time: ~10-50ms depending on buffered data
    /// - Triggers OS-level fsync() for the durability guarantee
    /// - Truncates the WAL after a successful flush
    /// - Not needed for every write (WAL provides durability)
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic flush after a critical write
    /// db.insert_into("users", data).await?;
    /// db.flush()?; // Ensure data is persisted to disk
    ///
    /// // Graceful shutdown pattern
    /// fn shutdown(db: &Aurora) -> Result<()> {
    ///     println!("Flushing pending writes...");
    ///     db.flush()?;
    ///     println!("Shutdown complete - all data persisted");
    ///     Ok(())
    /// }
    ///
    /// // Periodic checkpoint pattern
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// let db = db.clone();
    /// thread::spawn(move || {
    ///     loop {
    ///         thread::sleep(Duration::from_secs(60));
    ///         if let Err(e) = db.flush() {
    ///             eprintln!("Flush error: {}", e);
    ///         } else {
    ///             println!("Checkpoint: data flushed to disk");
    ///         }
    ///     }
    /// });
    ///
    /// // Critical transaction pattern
    /// let tx_id = db.begin_transaction();
    ///
    /// // Multiple operations
    /// db.insert_into("orders", order_data).await?;
    /// db.update_document("inventory", product_id, updates).await?;
    /// db.insert_into("audit_log", audit_data).await?;
    ///
    /// // Commit and flush immediately
    /// db.commit_transaction(tx_id)?;
    /// db.flush()?; // Critical: ensure the transaction is on disk
    ///
    /// // Backup preparation
    /// println!("Preparing backup...");
    /// db.flush()?; // Ensure all data is written
    /// std::fs::copy("mydb.db", "backup.db")?;
    /// println!("Backup complete");
    /// ```
    ///
    /// # When to Use
    /// - Before graceful shutdown
    /// - After critical transactions
    /// - Before creating backups
    /// - Periodic checkpoints (every 30-60 seconds)
    /// - Before risky operations
    ///
    /// # When NOT to Use
    /// - After every single write (too slow; the WAL provides durability)
    /// - In high-throughput loops (batch instead)
    /// - When the durability mode is already Immediate
    ///
    /// # Important Notes
    /// - The WAL provides durability even without an explicit flush()
    /// - flush() adds latency (~10-50ms), so use it strategically
    /// - An automatic flush happens during graceful shutdown
    /// - After flush(), the WAL is truncated (data is in main storage)
    ///
    /// # See Also
    /// - `Aurora::with_config()` to set the durability mode
    /// - The WAL (Write-Ahead Log) provides durability without explicit flushes
    pub fn flush(&self) -> Result<()> {
        // Flush the write buffer if present
        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.flush()?;
        }

        // Flush cold storage
        self.cold.flush()?;

        // Truncate the WAL after a successful flush (data is now in cold storage)
        if let Some(ref wal) = self.wal
            && let Ok(mut wal_lock) = wal.write()
        {
            wal_lock.truncate()?;
        }

        Ok(())
    }

    /// Store a key-value pair (low-level storage)
    ///
    /// This is the low-level method. For documents, use `insert_into()` instead.
    /// Writes are buffered and batched for performance.
    ///
    /// # Arguments
    /// * `key` - Unique key (format: "collection:id" for documents)
    /// * `value` - Raw bytes to store
    /// * `ttl` - Optional time-to-live (None = permanent)
    ///
    /// # Performance
    /// - Buffered writes: ~15-30K docs/sec
    /// - Batching improves throughput significantly
    /// - Call `flush()` to ensure data is persisted
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Permanent storage
    /// let data = serde_json::to_vec(&my_struct)?;
    /// db.put("mykey".to_string(), data, None).await?;
    ///
    /// // With TTL (expires after 1 hour)
    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600))).await?;
    ///
    /// // Better: use insert_into() for documents
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    pub async fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;

        if value.len() > MAX_BLOB_SIZE {
            return Err(AqlError::invalid_operation(format!(
                "Blob size {} MB exceeds maximum allowed size of {} MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        // Wrap the key and value in Arc once so every consumer shares them
        let key_arc = Arc::new(key);
        let value_arc = Arc::new(value);

        // Check if this is a blob (blobs bypass the write buffer and hot cache)
        let is_blob = value_arc.starts_with(b"BLOB:");

        // --- 1. WAL write (non-blocking send of Arcs) ---
        if let Some(ref sender) = self.wal_writer
            && self.config.durability_mode != DurabilityMode::None
        {
            sender
                .send(WalOperation::Put {
                    key: Arc::clone(&key_arc),
                    value: Arc::clone(&value_arc),
                })
                .map_err(|_| {
                    AqlError::new(
                        ErrorCode::InternalError,
                        "WAL writer channel closed".to_string(),
                    )
                })?;
        }

        // Check if this key should be cached (false for Any-field collections)
        let should_cache = self.should_cache_key(&key_arc);

        // --- 2. Cold store write ---
        if is_blob || !should_cache {
            // Blobs and Any-field docs write directly to cold storage
            // (blobs: avoid memory pressure; Any-fields: ensure immediate queryability)
            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
        } else if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(Arc::clone(&key_arc), Arc::clone(&value_arc))?;
        } else {
            self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
        }

        // --- 3. Hot cache write ---
        if should_cache {
            if is_blob {
                // For blobs: cache only a lightweight reference (not the actual data).
                // Format: "BLOBREF:<size>" - just 16-24 bytes instead of potentially MB.
                let blob_ref = format!("BLOBREF:{}", value_arc.len());
                self.hot
                    .set(Arc::clone(&key_arc), Arc::new(blob_ref.into_bytes()), ttl);
            } else {
                self.hot
                    .set(Arc::clone(&key_arc), Arc::clone(&value_arc), ttl);
            }
        }

        // --- 4. Indexing (skip for blobs and system keys) ---
        if !is_blob {
            if let Some(collection_name) = key_arc.split(':').next()
                && !collection_name.starts_with('_')
            {
                self.index_value(collection_name, &key_arc, &value_arc)?;
            }
        }

        Ok(())
    }

    /// Replay WAL entries to recover from crash
    ///
    /// Handles transaction boundaries:
    /// - Operations within a committed transaction are applied
    /// - Operations within a rolled-back transaction are discarded
    /// - Operations within an uncommitted transaction (crash during tx) are discarded
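    ///
    /// For example, replaying `BeginTx, Put(a), Put(b), CommitTx` applies both
    /// puts, while `BeginTx, Put(a)` followed by a crash applies neither.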
    async fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
        // Buffer for operations within a transaction
        let mut tx_buffer: Vec<crate::wal::LogEntry> = Vec::new();
        let mut in_transaction = false;

        for entry in entries {
            match entry.operation {
                Operation::BeginTx => {
                    // Start buffering operations
                    in_transaction = true;
                    tx_buffer.clear();
                }
                Operation::CommitTx => {
                    // Apply all buffered operations
                    for buffered_entry in tx_buffer.drain(..) {
                        self.apply_wal_entry(buffered_entry).await?;
                    }
                    in_transaction = false;
                }
                Operation::RollbackTx => {
                    // Discard buffered operations
                    tx_buffer.clear();
                    in_transaction = false;
                }
                Operation::Put | Operation::Delete => {
                    if in_transaction {
                        // Buffer the operation for later
                        tx_buffer.push(entry);
                    } else {
                        // Apply immediately (not in a transaction)
                        self.apply_wal_entry(entry).await?;
                    }
                }
            }
        }

        // If we end with in_transaction = true, it means we crashed mid-transaction.
        // The operations left in tx_buffer are discarded (not committed).
        if in_transaction {
            eprintln!(
                "WAL replay: Discarding {} uncommitted transaction operations",
                tx_buffer.len()
            );
        }

        // Flush after replay and truncate the WAL
        self.cold.flush()?;
        if let Some(ref wal) = self.wal {
            wal.write().unwrap().truncate()?;
        }

        Ok(())
    }

    /// Apply a single WAL entry to storage
    async fn apply_wal_entry(&self, entry: crate::wal::LogEntry) -> Result<()> {
        match entry.operation {
            Operation::Put => {
                if let Some(value) = entry.value {
                    // Write directly to cold storage (skip WAL, already logged)
                    self.cold.set(entry.key.clone(), value.clone())?;

                    // Update hot cache
                    if self.should_cache_key(&entry.key) {
                        self.hot
                            .set(Arc::new(entry.key.clone()), Arc::new(value.clone()), None);
                    }

                    // Rebuild indices
                    if let Some(collection) = entry.key.split(':').next()
                        && !collection.starts_with('_')
                    {
                        self.index_value(collection, &entry.key, &value)?;
                    }
                }
            }
            Operation::Delete => {
                // Remove from indices before deleting the data
                if let Some(collection) = entry.key.split(':').next()
                    && !collection.starts_with('_')
                {
                    let _ = self.remove_from_index(collection, &entry.key);
                }
                self.cold.delete(&entry.key)?;
                self.hot.delete(&entry.key);
            }
            _ => {} // Transaction markers are handled in replay_wal
        }
        Ok(())
    }

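    /// Load a collection's schema, checking caches in order of cost:
    /// the parsed schema cache (O(1)), then the hot byte cache (parse on hit),
    /// then cold storage (repopulating both caches on the way back).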
    fn ensure_schema_hot(&self, collection: &str) -> Result<Arc<Collection>> {
        // 1. Check the high-performance object cache first (O(1))
        if let Some(schema) = self.schema_cache.get(collection) {
            return Ok(schema.value().clone());
        }

        let collection_key = format!("_collection:{}", collection);

        // 2. Fall back to the hot byte cache (parse if found)
        if let Some(data) = self.hot.get(&collection_key) {
            return serde_json::from_slice::<Collection>(&data)
                .map(|s| {
                    let arc_s = Arc::new(s);
                    // Populate the object cache to avoid this parse next time
                    self.schema_cache
                        .insert(collection.to_string(), arc_s.clone());
                    arc_s
                })
                .map_err(|_| {
                    AqlError::new(
                        ErrorCode::SchemaError,
                        "Failed to parse schema from hot cache",
                    )
                });
        }

        // 3. Fall back to cold storage
        if let Some(data) = self.get(&collection_key)? {
            // Update the hot cache
            self.hot.set(
                Arc::new(collection_key.clone()),
                Arc::new(data.clone()),
                None,
            );

            // Parse and update the schema cache
            let schema = serde_json::from_slice::<Collection>(&data)?;
            let arc_schema = Arc::new(schema);
            self.schema_cache
                .insert(collection.to_string(), arc_schema.clone());

            return Ok(arc_schema);
        }

        Err(AqlError::new(
            ErrorCode::SchemaError,
            format!("Failed to load schema for collection '{}'", collection),
        ))
    }

    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        // Update the primary index with metadata only
        let location = DiskLocation::new(value.len());
        self.primary_indices
            .entry(collection.to_string())
            .or_default()
            .insert(key.to_string(), location);

        // Use the optimized helper (fast path via schema_cache)
        let collection_obj = self.ensure_schema_hot(collection)?;

        // Build the list of fields that should be indexed
        let indexed_fields: Vec<String> = collection_obj
            .fields
            .iter()
            .filter(|(_, def)| def.indexed || def.unique)
            .map(|(name, _)| name.clone())
            .collect();

        if indexed_fields.is_empty() {
            return Ok(());
        }

        // Update secondary indices
        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, field_value) in doc.data {
                if !indexed_fields.contains(&field) {
                    continue;
                }

                let value_str = match &field_value {
                    Value::String(s) => s.clone(),
                    _ => field_value.to_string(),
                };

                let index_key = format!("{}:{}", collection, field);
                let secondary_index = self.secondary_indices.entry(index_key).or_default();

                let max_entries = self.config.max_index_entries_per_field;

                secondary_index
                    .entry(value_str)
                    .and_modify(|doc_ids| {
                        if doc_ids.len() < max_entries {
                            doc_ids.push(key.to_string());
                        }
                    })
                    .or_insert_with(|| vec![key.to_string()]);
            }
        }
        Ok(())
    }

    /// Remove a document from all indices (called during delete operations)
    fn remove_from_index(&self, collection: &str, key: &str) -> Result<()> {
        // Remove from the primary index
        if let Some(primary_index) = self.primary_indices.get_mut(collection) {
            primary_index.remove(key);
        }

        // Get the document data to know which secondary index entries to remove.
        // Try cold storage first since we may be replaying the WAL.
        let doc_data = match self.cold.get(key) {
            Ok(Some(data)) => Some(data),
            _ => self.hot.get(key),
        };

        // If we have the document data, remove it from the secondary indices
        if let Some(data) = doc_data {
            if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                for (field, field_value) in &doc.data {
                    let value_str = match field_value {
                        Value::String(s) => s.clone(),
                        _ => field_value.to_string(),
                    };

                    let index_key = format!("{}:{}", collection, field);
                    if let Some(secondary_index) = self.secondary_indices.get_mut(&index_key) {
                        if let Some(mut doc_ids) = secondary_index.get_mut(&value_str) {
                            doc_ids.retain(|id| id != key);
                            // If empty, we could remove the entry, but it's not strictly necessary
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Scan a collection with a filter and early-termination support.
    /// Used by QueryBuilder for optimized queries with LIMIT.
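    ///
    /// A minimal sketch (the collection, field, and `Value` variant are illustrative):
    ///
    /// ```
    /// // First 10 active users; the scan stops as soon as 10 match
    /// let docs = db.scan_and_filter(
    ///     "users",
    ///     |doc| doc.data.get("active") == Some(&Value::Bool(true)),
    ///     Some(10),
    /// )?;
    /// ```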
    pub fn scan_and_filter<F>(
        &self,
        collection: &str,
        filter: F,
        limit: Option<usize>,
    ) -> Result<Vec<Document>>
    where
        F: Fn(&Document) -> bool,
    {
        let mut documents = Vec::new();

        if let Some(index) = self.primary_indices.get(collection) {
            for entry in index.iter() {
                // Early termination
                if let Some(max) = limit {
                    if documents.len() >= max {
                        break;
                    }
                }

                let key = entry.key();
                if let Some(data) = self.get(key)? {
                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
                        // Apply computed fields
                        if let Ok(computed) = self.computed.read() {
                            let _ = computed.apply(collection, &mut doc);
                        }

                        if filter(&doc) {
                            documents.push(doc);
                        }
                    }
                }
            }
        } else {
            // Fallback: scan from cold storage if the primary index is not yet initialized.
            // Optimization: apply the filter and limit during the scan to avoid a full load.
            let prefix = format!("{}:", collection);
            for result in self.cold.scan_prefix(&prefix) {
                // Early termination
                if let Some(max) = limit {
                    if documents.len() >= max {
                        break;
                    }
                }

                if let Ok((_key, value)) = result {
                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
                        // Apply computed fields
                        if let Ok(computed) = self.computed.read() {
                            let _ = computed.apply(collection, &mut doc);
                        }

                        if filter(&doc) {
                            documents.push(doc);
                        }
                    }
                }
            }
        }

        Ok(documents)
    }

    /// Smart collection scan that uses the primary index as a key directory.
    /// Avoids forced flushes and leverages the hot cache for better performance.
    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
        let mut documents = Vec::new();

        // Use the primary index as a "key directory" - it contains all document keys
        if let Some(index) = self.primary_indices.get(collection) {
            // Iterate through all keys in the primary index (fast, in-memory)
            for entry in index.iter() {
                let key = entry.key();

                // Fetch the document via hot cache -> cold storage fallback
                if let Some(data) = self.get(key)? {
                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
                        // Apply computed fields
                        if let Ok(computed) = self.computed.read() {
                            let _ = computed.apply(collection, &mut doc);
                        }
                        documents.push(doc);
                    }
                }
            }
        } else {
            // Fallback: scan from cold storage if the primary index is not yet initialized
            let prefix = format!("{}:", collection);
            for result in self.cold.scan_prefix(&prefix) {
                if let Ok((_key, value)) = result {
                    if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
                        // Apply computed fields
                        if let Ok(computed) = self.computed.read() {
                            let _ = computed.apply(collection, &mut doc);
                        }
                        documents.push(doc);
                    }
                }
            }
        }

        Ok(documents)
    }

    /// Store a file's contents as a blob under `key` (internally tagged with a `BLOB:` prefix).
    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit

        // Get file metadata to check size before reading
        let metadata = tokio::fs::metadata(file_path).await?;
        let file_size = metadata.len() as usize;

        if file_size > MAX_FILE_SIZE {
            return Err(AqlError::invalid_operation(format!(
                "File size {} MB exceeds maximum allowed size of {} MB",
                file_size / (1024 * 1024),
                MAX_FILE_SIZE / (1024 * 1024)
            )));
        }

        let mut file = File::open(file_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Add the BLOB: prefix to mark this as blob data
        let mut blob_data = Vec::with_capacity(5 + buffer.len());
        blob_data.extend_from_slice(b"BLOB:");
        blob_data.extend_from_slice(&buffer);

        self.put(key, blob_data, None).await
    }

    /// Create a new collection with schema definition
    ///
    /// Collections are like tables in SQL - they define the structure of your documents.
    /// The third boolean parameter marks the field as **unique**; unique fields are
    /// automatically indexed for fast lookups.
    ///
    /// # Arguments
    /// * `name` - Collection name
    /// * `fields` - Vector of (field_name, field_type, unique) tuples
    ///   - Field name (accepts both &str and String)
    ///   - Field type (String, Int, Float, Bool, etc.)
    ///   - Unique: true to enforce uniqueness (and index the field), false otherwise
    ///
    /// # Performance
    /// - Indexed fields: Fast equality queries (O(1) lookup)
    /// - Non-indexed fields: Full scan required for queries
    /// - Unique fields are automatically indexed
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Create a users collection
    /// db.new_collection("users", vec![
    ///     ("name", FieldType::String, false),    // Not unique
    ///     ("email", FieldType::String, true),    // Unique - automatically indexed
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    ///     ("score", FieldType::Float, false),
    /// ]).await?;
    ///
    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */]).await?; // OK!
    /// ```
1852 pub async fn new_collection<F: IntoFieldDefinition>(
1853 &self,
1854 name: &str,
1855 fields: Vec<F>,
1856 ) -> Result<()> {
1857 let collection_key = format!("_collection:{}", name);
1858
1859 // Check if collection already exists - if so, just return Ok (idempotent)
1860 if self.get(&collection_key)?.is_some() {
1861 return Ok(());
1862 }
1863
1864 // Create field definitions
1865 let mut field_definitions = HashMap::new();
1866 for field in fields {
1867 let (field_name, field_def) = field.into_field_definition();
1868
1869 if field_def.field_type == FieldType::Any && field_def.unique {
1870 return Err(AqlError::new(
1871 ErrorCode::InvalidDefinition,
1872 "Fields of type 'Any' cannot be unique or indexed.".to_string(),
1873 ));
1874 }
1875
1876 field_definitions.insert(field_name, field_def);
1877 }
1878
1879 let collection = Collection {
1880 name: name.to_string(),
1881 fields: field_definitions,
1882 // REMOVED: unique_fields is now derived from fields
1883 };
1884
1885 let collection_data = serde_json::to_vec(&collection)?;
1886 self.put(collection_key, collection_data, None).await?;
1887
1888 // Invalidate schema cache since we just created/updated the collection schema
1889 self.schema_cache.remove(name);
1890
1891 Ok(())
1892 }
1893
1894 /// Insert a document into a collection
1895 ///
1896 /// Automatically generates a UUID for the document and validates against
1897 /// collection schema and unique constraints. Returns the generated document ID.
1898 ///
1899 /// # Performance
1900 /// - Single insert: ~15,000 docs/sec
1901 /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
1902 /// - Triggers PubSub events for real-time listeners
1903 ///
1904 /// # Arguments
1905 /// * `collection` - Name of the collection to insert into
1906 /// * `data` - Document fields and values to insert
1907 ///
1908 /// # Returns
1909 /// The auto-generated ID of the inserted document or an error
1910 ///
1911 /// # Errors
1912 /// - `CollectionNotFound`: Collection doesn't exist
1913 /// - `ValidationError`: Data violates schema or unique constraints
1914 /// - `SerializationError`: Invalid data format
1915 ///
1916 /// # Examples
1917 ///
    /// ```
1919 /// use aurora_db::{Aurora, types::Value};
1920 ///
1921 /// let db = Aurora::open("mydb.db")?;
1922 ///
1923 /// // Basic insertion
1924 /// let user_id = db.insert_into("users", vec![
1925 /// ("name", Value::String("Alice Smith".to_string())),
1926 /// ("email", Value::String("alice@example.com".to_string())),
1927 /// ("age", Value::Int(28)),
1928 /// ("active", Value::Bool(true)),
1929 /// ]).await?;
1930 ///
1931 /// println!("Created user with ID: {}", user_id);
1932 ///
1933 /// // Inserting with nested data
1934 /// let order_id = db.insert_into("orders", vec![
1935 /// ("user_id", Value::String(user_id.clone())),
1936 /// ("total", Value::Float(99.99)),
1937 /// ("status", Value::String("pending".to_string())),
1938 /// ("items", Value::Array(vec![
1939 /// Value::String("item-123".to_string()),
1940 /// Value::String("item-456".to_string()),
1941 /// ])),
1942 /// ]).await?;
1943 ///
1944 /// // Error handling - unique constraint violation
1945 /// match db.insert_into("users", vec![
1946 /// ("email", Value::String("alice@example.com".to_string())), // Duplicate!
1947 /// ("name", Value::String("Alice Clone".to_string())),
1948 /// ]).await {
1949 /// Ok(id) => println!("Inserted: {}", id),
1950 /// Err(e) => println!("Failed: {} (email already exists)", e),
1951 /// }
1952 ///
1953 /// // For bulk inserts (10+ documents), use batch_insert() instead
1954 /// let users = vec![
1955 /// HashMap::from([
1956 /// ("name".to_string(), Value::String("Bob".to_string())),
1957 /// ("email".to_string(), Value::String("bob@example.com".to_string())),
1958 /// ]),
1959 /// HashMap::from([
1960 /// ("name".to_string(), Value::String("Carol".to_string())),
1961 /// ("email".to_string(), Value::String("carol@example.com".to_string())),
1962 /// ]),
1963 /// // ... more documents
1964 /// ];
1965 /// let ids = db.batch_insert("users", users).await?; // 3x faster!
1966 /// println!("Inserted {} users", ids.len());
1967 /// ```
1968 pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
1969 // Convert Vec<(&str, Value)> to HashMap<String, Value>
1970 let data_map: HashMap<String, Value> =
1971 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1972
1973 // Validate unique constraints before inserting
1974 self.validate_unique_constraints(collection, &data_map)
1975 .await?;
1976
1977 let doc_id = Uuid::now_v7().to_string();
1978 let document = Document {
1979 id: doc_id.clone(),
1980 data: data_map,
1981 };
1982
1983 self.put(
1984 format!("{}:{}", collection, doc_id),
1985 serde_json::to_vec(&document)?,
1986 None,
1987 )
1988 .await?;
1989
1990 // Publish insert event
1991 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
1992 let _ = self.pubsub.publish(event);
1993
1994 Ok(doc_id)
1995 }
1996
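    /// Insert a document from a pre-built `HashMap` of field values.
    ///
    /// Behaves like `insert_into()` (validation, ID generation, PubSub event)
    /// but skips the tuple conversion when you already hold a map.
    ///
    /// A minimal sketch (field names are illustrative):
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// let id = db.insert_map("users", HashMap::from([
    ///     ("name".to_string(), Value::String("Dana".into())),
    /// ])).await?;
    /// ```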
1997 pub async fn insert_map(
1998 &self,
1999 collection: &str,
2000 data: HashMap<String, Value>,
2001 ) -> Result<String> {
2002 // Validate unique constraints before inserting
2003 self.validate_unique_constraints(collection, &data).await?;
2004
2005 let doc_id = Uuid::now_v7().to_string();
2006 let document = Document {
2007 id: doc_id.clone(),
2008 data,
2009 };
2010
2011 self.put(
2012 format!("{}:{}", collection, doc_id),
2013 serde_json::to_vec(&document)?,
2014 None,
2015 )
2016 .await?;
2017
2018 // Publish insert event
2019 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
2020 let _ = self.pubsub.publish(event);
2021
2022 Ok(doc_id)
2023 }
2024
2025 /// Batch insert multiple documents with optimized write path
2026 ///
2027 /// Inserts multiple documents in a single optimized operation, bypassing
2028 /// the write buffer for better performance. Ideal for bulk data loading,
2029 /// migrations, or initial database seeding. 3x faster than individual inserts.
2030 ///
2031 /// # Performance
2032 /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
2033 /// - Batch writes to WAL and storage
2034 /// - Validates all unique constraints
2035 /// - Use for 10+ documents minimum
2036 ///
2037 /// # Arguments
2038 /// * `collection` - Name of the collection to insert into
2039 /// * `documents` - Vector of document data as HashMaps
2040 ///
2041 /// # Returns
2042 /// Vector of auto-generated document IDs or an error
2043 ///
2044 /// # Examples
2045 ///
    /// ```
2047 /// use aurora_db::{Aurora, types::Value};
2048 /// use std::collections::HashMap;
2049 ///
2050 /// let db = Aurora::open("mydb.db")?;
2051 ///
2052 /// // Bulk user import
2053 /// let users = vec![
2054 /// HashMap::from([
2055 /// ("name".to_string(), Value::String("Alice".into())),
2056 /// ("email".to_string(), Value::String("alice@example.com".into())),
2057 /// ("age".to_string(), Value::Int(28)),
2058 /// ]),
2059 /// HashMap::from([
2060 /// ("name".to_string(), Value::String("Bob".into())),
2061 /// ("email".to_string(), Value::String("bob@example.com".into())),
2062 /// ("age".to_string(), Value::Int(32)),
2063 /// ]),
2064 /// HashMap::from([
2065 /// ("name".to_string(), Value::String("Carol".into())),
2066 /// ("email".to_string(), Value::String("carol@example.com".into())),
2067 /// ("age".to_string(), Value::Int(25)),
2068 /// ]),
2069 /// ];
2070 ///
2071 /// let ids = db.batch_insert("users", users).await?;
2072 /// println!("Inserted {} users", ids.len());
2073 ///
2074 /// // Seeding test data
2075 /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
2076 /// .map(|i| HashMap::from([
2077 /// ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
2078 /// ("price".to_string(), Value::Float(9.99 + i as f64)),
2079 /// ("stock".to_string(), Value::Int(100)),
2080 /// ]))
2081 /// .collect();
2082 ///
2083 /// let ids = db.batch_insert("products", test_products).await?;
2084 /// // Much faster than 1000 individual insert_into() calls!
2085 ///
2086 /// // Migration from CSV data
2087 /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
2088 /// let mut batch = Vec::new();
2089 ///
2090 /// for result in csv_reader.records() {
2091 /// let record = result?;
2092 /// let doc = HashMap::from([
2093 /// ("field1".to_string(), Value::String(record[0].to_string())),
2094 /// ("field2".to_string(), Value::String(record[1].to_string())),
2095 /// ]);
2096 /// batch.push(doc);
2097 ///
2098 /// // Insert in batches of 1000
2099 /// if batch.len() >= 1000 {
2100 /// db.batch_insert("imported_data", batch.clone()).await?;
2101 /// batch.clear();
2102 /// }
2103 /// }
2104 ///
2105 /// // Insert remaining
2106 /// if !batch.is_empty() {
2107 /// db.batch_insert("imported_data", batch).await?;
2108 /// }
2109 /// ```
2110 ///
2111 /// # Errors
2112 /// - `ValidationError`: Unique constraint violation on any document
2113 /// - `CollectionNotFound`: Collection doesn't exist
2114 /// - `IoError`: Storage write failure
2115 ///
2116 /// # Important Notes
2117 /// - All inserts are atomic - if one fails, none are inserted
2118 /// - UUIDs are auto-generated for all documents
2119 /// - PubSub events are published for each insert
2120 /// - For 10+ documents, this is 3x faster than individual inserts
2121 /// - For < 10 documents, use `insert_into()` instead
2122 ///
2123 /// # See Also
2124 /// - `insert_into()` for single document inserts
2125 /// - `import_from_json()` for file-based bulk imports
2126 /// - `batch_write()` for low-level batch operations
2127 pub async fn batch_insert(
2128 &self,
2129 collection: &str,
2130 documents: Vec<HashMap<String, Value>>,
2131 ) -> Result<Vec<String>> {
2132 let mut doc_ids = Vec::with_capacity(documents.len());
2133 let mut pairs = Vec::with_capacity(documents.len());
2134
2135 // Prepare all documents
2136 for data in documents {
2137 // Validate unique constraints
2138 self.validate_unique_constraints(collection, &data).await?;
2139
2140 let doc_id = Uuid::now_v7().to_string();
2141 let document = Document {
2142 id: doc_id.clone(),
2143 data,
2144 };
2145
2146 let key = format!("{}:{}", collection, doc_id);
2147 let value = serde_json::to_vec(&document)?;
2148
2149 pairs.push((key, value));
2150 doc_ids.push(doc_id);
2151 }
2152
2153 // Write to WAL in batch (if enabled)
2154 if let Some(ref wal) = self.wal
2155 && self.config.durability_mode != DurabilityMode::None
2156 {
2157 let mut wal_lock = wal.write().unwrap();
2158 for (key, value) in &pairs {
2159 wal_lock.append(Operation::Put, key, Some(value))?;
2160 }
2161 }
2162
2163 // Bypass write buffer - go directly to cold storage batch API
2164 self.cold.batch_set(pairs.clone())?;
2165
2166 // Note: Durability is handled by background checkpoint process
2167
2168 // Update hot cache and indices
2169 for (key, value) in pairs {
2170 if self.should_cache_key(&key) {
2171 self.hot
2172 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2173 }
2174
2175 if let Some(collection_name) = key.split(':').next()
2176 && !collection_name.starts_with('_')
2177 {
2178 self.index_value(collection_name, &key, &value)?;
2179 }
2180 }
2181
2182 // Publish events
2183 for doc_id in &doc_ids {
2184 if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
2185 let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
2186 let _ = self.pubsub.publish(event);
2187 }
2188 }
2189
2190 Ok(doc_ids)
2191 }
2192
2193 /// Update a document by ID
2194 ///
2195 /// # Arguments
2196 /// * `collection` - Collection name
2197 /// * `doc_id` - Document ID to update
2198 /// * `data` - New field values to set
2199 ///
2200 /// # Returns
2201 /// Ok(()) on success, or an error if the document doesn't exist
2202 ///
2203 /// # Examples
2204 ///
2205 /// ```
2206 /// db.update_document("users", &user_id, vec![
2207 /// ("status", Value::String("active".to_string())),
2208 /// ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2209 /// ]).await?;
2210 /// ```
2211 pub async fn update_document(
2212 &self,
2213 collection: &str,
2214 doc_id: &str,
2215 updates: Vec<(&str, Value)>,
2216 ) -> Result<()> {
2217 // Get existing document
2218 let mut document = self.get_document(collection, doc_id)?.ok_or_else(|| {
2219 AqlError::new(
2220 ErrorCode::NotFound,
2221 format!("Document not found: {}", doc_id),
2222 )
2223 })?;
2224
2225 // Store old document for event
2226 let old_document = document.clone();
2227
2228 // Apply updates
2229 for (field, value) in updates {
2230 document.data.insert(field.to_string(), value);
2231 }
2232
2233 // Validate unique constraints after update (excluding current document)
2234 self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
2235 .await?;
2236
2237 // Save updated document
2238 self.put(
2239 format!("{}:{}", collection, doc_id),
2240 serde_json::to_vec(&document)?,
2241 None,
2242 )
2243 .await?;
2244
2245 // Publish update event
2246 let event =
2247 crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
2248 let _ = self.pubsub.publish(event);
2249
2250 Ok(())
2251 }
2252
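    /// Fetch every document in a collection.
    ///
    /// Ensures indices are initialized, then scans via the primary index with
    /// a cold-storage fallback. For large collections prefer `query()` with
    /// filters and a limit.
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// let all_users = db.get_all_collection("users").await?;
    /// println!("Loaded {} users", all_users.len());
    /// ```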
2253 pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
2254 self.ensure_indices_initialized().await?;
2255 self.scan_collection(collection)
2256 }
2257
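    /// List keys containing the given pattern, with lightweight metadata
    /// (size, plus a short preview for non-blob values).
    ///
    /// A minimal sketch (the pattern is illustrative):
    ///
    /// ```
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     println!("{}: {:?}", key, info);
    /// }
    /// ```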
2258 pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
2259 let mut data = Vec::new();
2260
2261 // Scan from cold storage instead of primary index
2262 for result in self.cold.scan() {
2263 if let Ok((key, value)) = result {
2264 if key.contains(pattern) {
2265 let info = if value.starts_with(b"BLOB:") {
2266 DataInfo::Blob { size: value.len() }
2267 } else {
2268 DataInfo::Data {
2269 size: value.len(),
2270 preview: String::from_utf8_lossy(&value[..value.len().min(50)])
2271 .into_owned(),
2272 }
2273 };
2274
2275 data.push((key.clone(), info));
2276 }
2277 }
2278 }
2279
2280 Ok(data)
2281 }
2282
2308 /// Begin a new transaction for atomic operations
2309 ///
2310 /// Transactions ensure all-or-nothing execution: either all operations succeed,
2311 /// or none of them are applied. Perfect for maintaining data consistency.
    ///
    /// # Returns
    /// A `TransactionId` to pass to `commit_transaction()` or `rollback_transaction()`.
    ///
2313 /// # Examples
2314 ///
    /// ```
2316 /// use aurora_db::{Aurora, types::Value};
2317 ///
2318 /// let db = Aurora::open("mydb.db")?;
2319 ///
2320 /// // Start transaction
2321 /// let tx_id = db.begin_transaction();
2322 ///
2323 /// // Perform multiple operations
2324 /// db.insert_into("accounts", vec![
2325 /// ("user_id", Value::String("alice".into())),
2326 /// ("balance", Value::Int(1000)),
2327 /// ]).await?;
2328 ///
2329 /// db.insert_into("accounts", vec![
2330 /// ("user_id", Value::String("bob".into())),
2331 /// ("balance", Value::Int(500)),
    /// ]).await?;
2333 ///
2334 /// // Commit if all succeeded
2335 /// db.commit_transaction(tx_id)?;
2336 ///
2337 /// // Or rollback on error
2338 /// // db.rollback_transaction(tx_id)?;
2339 /// ```
2340 pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
2341 let buffer = self.transaction_manager.begin();
2342 buffer.id
2343 }
2344
2345 /// Commit a transaction, making all changes permanent
2346 ///
2347 /// All operations within the transaction are atomically applied to the database.
2348 /// If any operation fails, none are applied.
2349 ///
2350 /// # Arguments
2351 /// * `tx_id` - Transaction ID returned from begin_transaction()
2352 ///
2353 /// # Examples
2354 ///
    /// ```
2356 /// use aurora_db::{Aurora, types::Value};
2357 ///
2358 /// let db = Aurora::open("mydb.db")?;
2359 ///
2360 /// // Transfer money between accounts
2361 /// let tx_id = db.begin_transaction();
2362 ///
2363 /// // Deduct from Alice
2364 /// db.update_document("accounts", "alice", vec![
2365 /// ("balance", Value::Int(900)), // Was 1000
2366 /// ]).await?;
2367 ///
2368 /// // Add to Bob
2369 /// db.update_document("accounts", "bob", vec![
2370 /// ("balance", Value::Int(600)), // Was 500
2371 /// ]).await?;
2372 ///
2373 /// // Both updates succeed - commit them
2374 /// db.commit_transaction(tx_id)?;
2375 ///
2376 /// println!("Transfer completed!");
2377 /// ```
2378 pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2379 let buffer = self
2380 .transaction_manager
2381 .active_transactions
2382 .get(&tx_id)
2383 .ok_or_else(|| {
2384 AqlError::invalid_operation(
2385 "Transaction not found or already completed".to_string(),
2386 )
2387 })?;
2388
2389 for item in buffer.writes.iter() {
2390 let key = item.key();
2391 let value = item.value();
2392 self.cold.set(key.clone(), value.clone())?;
2393 if self.should_cache_key(key) {
2394 self.hot
2395 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2396 }
2397 if let Some(collection_name) = key.split(':').next()
2398 && !collection_name.starts_with('_')
2399 {
2400 self.index_value(collection_name, key, value)?;
2401 }
2402 }
2403
2404 for item in buffer.deletes.iter() {
2405 let key = item.key();
2406 if let Some((collection, id)) = key.split_once(':')
2407 && let Ok(Some(doc)) = self.get_document(collection, id)
2408 {
2409 self.remove_from_indices(collection, &doc)?;
2410 }
2411 self.cold.delete(key)?;
2412 self.hot.delete(key);
2413 }
2414
2415 // Drop the buffer reference to release the DashMap read lock
2416 // before calling commit which needs to remove the entry (write lock)
2417 drop(buffer);
2418
2419 self.transaction_manager.commit(tx_id)?;
2420
2421 self.cold.compact()?;
2422
2423 Ok(())
2424 }
2425
2426 /// Roll back a transaction, discarding all changes
2427 ///
2428 /// All operations within the transaction are discarded. The database state
2429 /// remains unchanged. Use this when an error occurs during transaction processing.
2430 ///
2431 /// # Arguments
2432 /// * `tx_id` - Transaction ID returned from begin_transaction()
2433 ///
2434 /// # Examples
2435 ///
    /// ```
2437 /// use aurora_db::{Aurora, types::Value};
2438 ///
2439 /// let db = Aurora::open("mydb.db")?;
2440 ///
2441 /// // Attempt a transfer with validation
2442 /// let tx_id = db.begin_transaction();
2443 ///
2444 /// let result = async {
2445 /// // Deduct from Alice
    ///     let alice = db.get_document("accounts", "alice")?;
2447 /// let balance = alice.and_then(|doc| doc.data.get("balance"));
2448 ///
2449 /// if let Some(Value::Int(bal)) = balance {
2450 /// if *bal < 100 {
2451 /// return Err("Insufficient funds");
2452 /// }
2453 ///
2454 /// db.update_document("accounts", "alice", vec![
2455 /// ("balance", Value::Int(bal - 100)),
2456 /// ]).await?;
2457 ///
2458 /// db.update_document("accounts", "bob", vec![
2459 /// ("balance", Value::Int(600)),
2460 /// ]).await?;
2461 ///
2462 /// Ok(())
2463 /// } else {
2464 /// Err("Account not found")
2465 /// }
2466 /// }.await;
2467 ///
2468 /// match result {
2469 /// Ok(_) => {
2470 /// db.commit_transaction(tx_id)?;
2471 /// println!("Transfer completed");
2472 /// }
2473 /// Err(e) => {
2474 /// db.rollback_transaction(tx_id)?;
2475 /// println!("Transfer failed: {}, changes rolled back", e);
2476 /// }
2477 /// }
2478 /// ```
2479 pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2480 self.transaction_manager.rollback(tx_id)
2481 }
2482
2483 /// Create a secondary index on a field for faster queries
2484 ///
2485 /// Indexes dramatically improve query performance for frequently accessed fields,
2486 /// trading increased memory usage and slower writes for much faster reads.
2487 ///
2488 /// # When to Create Indexes
2489 /// - **Frequent queries**: Fields used in 80%+ of your queries
2490 /// - **High cardinality**: Fields with many unique values (user_id, email)
2491 /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
2492 /// - **Large collections**: Most beneficial with 10,000+ documents
2493 ///
2494 /// # When NOT to Index
2495 /// - Low cardinality fields (e.g., boolean flags, small enums)
2496 /// - Rarely queried fields
2497 /// - Fields that change frequently (write-heavy workloads)
2498 /// - Small collections (<1,000 documents) - full scans are fast enough
2499 ///
2500 /// # Performance Characteristics
2501 /// - **Query speedup**: O(n) → O(1) for equality filters
2502 /// - **Memory cost**: ~100-200 bytes per document per index
2503 /// - **Write slowdown**: ~20-30% longer insert/update times
2504 /// - **Build time**: ~5,000 docs/sec for initial indexing
    ///
    /// # Examples
    ///
    /// ```no_run
    /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
    /// // A full scan is fast enough for boolean fields
    ///
    /// // DO index 'age' if you frequently query age ranges
    /// db.create_index("users", "age").await?;
    ///
    /// let young_users = db.query("users")
    ///     .filter(|f| f.lt("age", 30))
    ///     .collect()
    ///     .await?;
    /// ```
2533 ///
2534 /// # Real-World Example: E-commerce Orders
2535 ///
2536 /// ```
2537 /// // Orders collection: 1 million documents
2538 /// db.new_collection("orders", vec![
2539 /// ("user_id", FieldType::String), // High cardinality
2540 /// ("status", FieldType::String), // Low cardinality (pending, shipped, delivered)
2541 /// ("created_at", FieldType::String),
2542 /// ("total", FieldType::Float),
2543 /// ])?;
2544 ///
2545 /// // Index user_id - queries like "show me my orders" are common
2546 /// db.create_index("orders", "user_id").await?; // Good choice
2547 ///
2548 /// // Query speedup: 2.5s → 0.001s
2549 /// let my_orders = db.query("orders")
2550 /// .filter(|f| f.eq("user_id", user_id))
2551 /// .collect()
2552 /// .await?;
2553 ///
2554 /// // DON'T index 'status' - only 3 possible values
2555 /// // Scanning 1M docs takes ~100ms, indexing won't help much
2556 ///
2557 /// // Index created_at if you frequently query recent orders
2558 /// db.create_index("orders", "created_at").await?; // Good for time-based queries
2559 /// ```
2560 pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
2561 let collection_def = self.get_collection_definition(collection)?;
2562
2563 if let Some(field_def) = collection_def.fields.get(field) {
2564 if field_def.field_type == FieldType::Any {
2565 return Err(AqlError::new(
2566 ErrorCode::InvalidDefinition,
2567 "Cannot create an index on a field of type 'Any'.".to_string(),
2568 ));
2569 }
2570 } else {
2571 return Err(AqlError::new(
2572 ErrorCode::InvalidDefinition,
2573 format!(
2574 "Field '{}' not found in collection '{}'.",
2575 field, collection
2576 ),
2577 ));
2578 }
2579
2580 // Generate a default index name
2581 let index_name = format!("idx_{}_{}", collection, field);
2582
2583 // Create index definition
2584 let definition = IndexDefinition {
2585 name: index_name.clone(),
2586 collection: collection.to_string(),
2587 fields: vec![field.to_string()],
2588 index_type: IndexType::BTree,
2589 unique: false,
2590 };
2591
2592 // Create the index
2593 let index = Index::new(definition.clone());
2594
2595 // Index all existing documents in the collection
2596 let prefix = format!("{}:", collection);
2597 for result in self.cold.scan_prefix(&prefix) {
2598 if let Ok((_, data)) = result
2599 && let Ok(doc) = serde_json::from_slice::<Document>(&data)
2600 {
2601 let _ = index.insert(&doc);
2602 }
2603 }
2604
2605 // Store the index
2606 self.indices.insert(index_name, index);
2607
2608 // Store the index definition for persistence
2609 let index_key = format!("_index:{}:{}", collection, field);
2610 self.put(index_key, serde_json::to_vec(&definition)?, None)
2611 .await?;
2612
2613 Ok(())
2614 }
2615
2616 /// Query documents in a collection with filtering, sorting, and pagination
2617 ///
2618 /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
2619 /// Queries use early termination for LIMIT clauses, making them extremely fast
2620 /// even on large collections (6,800x faster than naive implementations).
2621 ///
2622 /// # Performance
2623 /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2624 /// - Without LIMIT: O(n) where n = matching documents
2625 /// - Uses secondary indices when available for equality filters
2626 /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2627 ///
2628 /// # Examples
2629 ///
2630 /// ```
2631 /// use aurora_db::{Aurora, types::Value};
2632 ///
2633 /// let db = Aurora::open("mydb.db")?;
2634 ///
2635 /// // Simple equality query
2636 /// let active_users = db.query("users")
2637 /// .filter(|f| f.eq("active", Value::Bool(true)))
2638 /// .collect()
2639 /// .await?;
2640 ///
2641 /// // Range query with pagination (FAST - uses early termination!)
2642 /// let top_scorers = db.query("users")
2643 /// .filter(|f| f.gt("score", Value::Int(1000)))
2644 /// .order_by("score", false) // descending
2645 /// .limit(10)
2646 /// .offset(20)
2647 /// .collect()
2648 /// .await?;
2649 ///
2650 /// // Multiple filters
2651 /// let premium_active = db.query("users")
2652 /// .filter(|f| f.eq("tier", Value::String("premium".into())))
2653 /// .filter(|f| f.eq("active", Value::Bool(true)))
2654 /// .limit(100) // Only scans ~200 docs, not all million!
2655 /// .collect()
2656 /// .await?;
2657 ///
2658 /// // Text search in a field
2659 /// let matching = db.query("articles")
2660 /// .filter(|f| f.contains("title", "rust"))
2661 /// .collect()
2662 /// .await?;
2663 /// ```
2664 pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2665 QueryBuilder::new(self, collection)
2666 }
2667
2668 /// Create a search builder for full-text search
2669 ///
2670 /// # Arguments
2671 /// * `collection` - Name of the collection to search
2672 ///
2673 /// # Returns
2674 /// A `SearchBuilder` for configuring and executing searches
2675 ///
2676 /// # Examples
2677 ///
2678 /// ```
2679 /// // Search for documents containing text
2680 /// let search_results = db.search("articles")
2681 /// .field("content")
2682 /// .matching("quantum computing")
2683 /// .fuzzy(true) // Enable fuzzy matching for typo tolerance
2684 /// .collect()
2685 /// .await?;
2686 /// ```
2687 pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2688 SearchBuilder::new(self, collection)
2689 }
2690
2691 /// Retrieve a document by ID
2692 ///
2693 /// Fast direct lookup when you know the document ID. Significantly faster
2694 /// than querying with filters when ID is known.
2695 ///
2696 /// # Performance
2697 /// - Hot cache: ~1,000,000 reads/sec (instant)
2698 /// - Cold storage: ~500,000 reads/sec (disk I/O)
2699 /// - Complexity: O(1) - constant time lookup
2700 /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2701 ///
2702 /// # Arguments
2703 /// * `collection` - Name of the collection to query
2704 /// * `id` - ID of the document to retrieve
2705 ///
2706 /// # Returns
2707 /// The document if found, None if not found, or an error
2708 ///
2709 /// # Examples
2710 ///
    /// ```
2712 /// use aurora_db::{Aurora, types::Value};
2713 ///
2714 /// let db = Aurora::open("mydb.db")?;
2715 ///
2716 /// // Basic retrieval
2717 /// if let Some(user) = db.get_document("users", &user_id)? {
2718 /// println!("Found user: {}", user.id);
2719 ///
2720 /// // Access fields safely
2721 /// if let Some(Value::String(name)) = user.data.get("name") {
2722 /// println!("Name: {}", name);
2723 /// }
2724 ///
2725 /// if let Some(Value::Int(age)) = user.data.get("age") {
2726 /// println!("Age: {}", age);
2727 /// }
2728 /// } else {
2729 /// println!("User not found");
2730 /// }
2731 ///
2732 /// // Idiomatic error handling
2733 /// let user = db.get_document("users", &user_id)?
2734 /// .ok_or_else(|| AqlError::new(ErrorCode::NotFound,"User not found".into()))?;
2735 ///
2736 /// // Checking existence before operations
2737 /// if db.get_document("users", &user_id)?.is_some() {
2738 /// db.update_document("users", &user_id, vec![
2739 /// ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2740 /// ]).await?;
2741 /// }
2742 ///
2743 /// // Batch retrieval (fetch multiple by ID)
2744 /// let user_ids = vec!["user-1", "user-2", "user-3"];
2745 /// let users: Vec<Document> = user_ids.iter()
2746 /// .filter_map(|id| db.get_document("users", id).ok().flatten())
2747 /// .collect();
2748 ///
2749 /// println!("Found {} out of {} users", users.len(), user_ids.len());
2750 /// ```
2751 ///
2752 /// # When to Use
2753 /// - You know the document ID (from insert, previous query, or URL param)
2754 /// - Need fastest possible lookup (1M reads/sec)
2755 /// - Fetching a single document
2756 ///
2757 /// # When NOT to Use
2758 /// - Searching by other fields → Use `query().filter()` instead
2759 /// - Need multiple documents by criteria → Use `query().collect()` instead
2760 /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2761 pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2762 let key = format!("{}:{}", collection, id);
2763 if let Some(data) = self.get(&key)? {
2764 Ok(Some(serde_json::from_slice(&data)?))
2765 } else {
2766 Ok(None)
2767 }
2768 }
2769
2770 /// Delete a document by ID
2771 ///
2772 /// Permanently removes a document from storage, cache, and all indices.
2773 /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2774 ///
2775 /// # Performance
2776 /// - Delete speed: ~50,000 deletes/sec
2777 /// - Cleans up hot cache, cold storage, primary + secondary indices
2778 /// - Triggers PubSub events for listeners
2779 ///
2780 /// # Arguments
2781 /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2782 ///
2783 /// # Returns
2784 /// Success or an error
2785 ///
2786 /// # Errors
2787 /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2788 /// - `IoError`: Storage deletion failed
2789 ///
2790 /// # Examples
2791 ///
2792 /// ```
2793 /// use aurora_db::Aurora;
2794 ///
2795 /// let db = Aurora::open("mydb.db")?;
2796 ///
2797 /// // Basic deletion (note: requires "collection:id" format)
2798 /// db.delete("users:abc123").await?;
2799 ///
2800 /// // Delete with existence check
2801 /// let user_id = "user-456";
2802 /// if db.get_document("users", user_id)?.is_some() {
2803 /// db.delete(&format!("users:{}", user_id)).await?;
2804 /// println!("User deleted");
2805 /// } else {
2806 /// println!("User not found");
2807 /// }
2808 ///
2809 /// // Error handling
2810 /// match db.delete("users:nonexistent").await {
2811 /// Ok(_) => println!("Deleted successfully"),
2812 /// Err(e) => println!("Delete failed: {}", e),
2813 /// }
2814 ///
2815 /// // Batch deletion using query
    /// let inactive_count = db.delete_by_query("users", |f| {
2817 /// f.eq("active", Value::Bool(false))
2818 /// }).await?;
2819 /// println!("Deleted {} inactive users", inactive_count);
2820 ///
2821 /// // Delete with cascading (manual cascade pattern)
2822 /// let user_id = "user-123";
2823 ///
2824 /// // Delete user's orders first
2825 /// let orders = db.query("orders")
2826 /// .filter(|f| f.eq("user_id", user_id))
2827 /// .collect()
2828 /// .await?;
2829 ///
2830 /// for order in orders {
2831 /// db.delete(&format!("orders:{}", order.id)).await?;
2832 /// }
2833 ///
2834 /// // Then delete the user
2835 /// db.delete(&format!("users:{}", user_id)).await?;
2836 /// println!("User and all orders deleted");
2837 /// ```
2838 ///
2839 /// # Alternative: Soft Delete Pattern
2840 ///
2841 /// For recoverable deletions, use soft deletes instead:
2842 ///
2843 /// ```
2844 /// // Soft delete - mark as deleted instead of removing
2845 /// db.update_document("users", &user_id, vec![
2846 /// ("deleted", Value::Bool(true)),
2847 /// ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2848 /// ]).await?;
2849 ///
2850 /// // Query excludes soft-deleted items
2851 /// let active_users = db.query("users")
2852 /// .filter(|f| f.eq("deleted", Value::Bool(false)))
2853 /// .collect()
2854 /// .await?;
2855 ///
2856 /// // Later: hard delete after retention period
2857 /// let old_deletions = db.query("users")
2858 /// .filter(|f| f.eq("deleted", Value::Bool(true)))
2859 /// .filter(|f| f.lt("deleted_at", thirty_days_ago))
2860 /// .collect()
2861 /// .await?;
2862 ///
2863 /// for user in old_deletions {
2864 /// db.delete(&format!("users:{}", user.id)).await?;
2865 /// }
2866 /// ```
2867 ///
2868 /// # Important Notes
2869 /// - Deletion is permanent - no undo/recovery
2870 /// - Consider soft deletes for recoverable operations
2871 /// - Use transactions for multi-document deletions
2872 /// - PubSub subscribers will receive delete events
2873 /// - All indices are automatically cleaned up
2874 pub async fn delete(&self, key: &str) -> Result<()> {
2875 // Extract collection and id from key (format: "collection:id")
2876 let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2877 (coll, doc_id)
2878 } else {
2879 return Err(AqlError::invalid_operation(
2880 "Invalid key format, expected 'collection:id'".to_string(),
2881 ));
2882 };
2883
2884 // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
2885 let document = self.get_document(collection, id)?;
2886
2887 // Delete from hot cache
2888 if self.hot.get(key).is_some() {
2889 self.hot.delete(key);
2890 }
2891
2892 // Delete from cold storage
2893 self.cold.delete(key)?;
2894
2895 // CRITICAL FIX: Clean up ALL indices (primary + secondary)
2896 if let Some(doc) = document {
2897 self.remove_from_indices(collection, &doc)?;
2898 } else {
2899 // Fallback: at least remove from primary index
2900 if let Some(index) = self.primary_indices.get_mut(collection) {
2901 index.remove(id);
2902 }
2903 }
2904
2905 // Publish delete event
2906 let event = crate::pubsub::ChangeEvent::delete(collection, id);
2907 let _ = self.pubsub.publish(event);
2908
2909 Ok(())
2910 }
2911
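    /// Delete an entire collection: every document, its indices, and the
    /// cached schema entry.
    ///
    /// A minimal sketch; this is irreversible, so consider exporting first:
    ///
    /// ```
    /// db.export_as_json("temp_data", "./backups/temp_data")?;
    /// db.delete_collection("temp_data").await?;
    /// ```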
2912 pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2913 let prefix = format!("{}:", collection);
2914
2915 // Get all keys in collection
2916 let keys: Vec<String> = self
2917 .cold
2918 .scan()
2919 .filter_map(|r| r.ok())
2920 .filter(|(k, _)| k.starts_with(&prefix))
2921 .map(|(k, _)| k)
2922 .collect();
2923
2924 // Delete each key
2925 for key in keys {
2926 self.delete(&key).await?;
2927 }
2928
2929 // Remove collection indices
2930 self.primary_indices.remove(collection);
2931 self.secondary_indices
2932 .retain(|k, _| !k.starts_with(&prefix));
2933
2934 // Invalidate schema cache
2935 self.schema_cache.remove(collection);
2936
2937 Ok(())
2938 }
2939
2940 fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
2941 // Remove from primary index
2942 if let Some(index) = self.primary_indices.get(collection) {
2943 index.remove(&doc.id);
2944 }
2945
2946 // Remove from secondary indices
2947 for (field, value) in &doc.data {
2948 let index_key = format!("{}:{}", collection, field);
2949 if let Some(index) = self.secondary_indices.get(&index_key)
2950 && let Some(mut doc_ids) = index.get_mut(&value.to_string())
2951 {
2952 doc_ids.retain(|id| id != &doc.id);
2953 }
2954 }
2955
2956 Ok(())
2957 }
2958
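    /// Case-insensitive substring search over a single string field.
    ///
    /// Scans the whole collection; for ranked, typo-tolerant matching use the
    /// `search()` builder instead.
    ///
    /// A minimal sketch (collection and field names are illustrative):
    ///
    /// ```
    /// let hits = db.search_text("articles", "title", "rust").await?;
    /// ```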
2959 pub async fn search_text(
2960 &self,
2961 collection: &str,
2962 field: &str,
2963 query: &str,
2964 ) -> Result<Vec<Document>> {
2965 let mut results = Vec::new();
2966 let docs = self.get_all_collection(collection).await?;
2967
2968 for doc in docs {
2969 if let Some(Value::String(text)) = doc.data.get(field)
2970 && text.to_lowercase().contains(&query.to_lowercase())
2971 {
2972 results.push(doc);
2973 }
2974 }
2975
2976 Ok(results)
2977 }
2978
2979 /// Export a collection to a JSON file
2980 ///
2981 /// Creates a JSON file containing all documents in the collection.
2982 /// Useful for backups, data migration, or sharing datasets.
2983 /// Automatically appends `.json` extension if not present.
2984 ///
2985 /// # Performance
2986 /// - Export speed: ~10,000 docs/sec
2987 /// - Scans entire collection from cold storage
    /// - Builds the full JSON document in memory before writing to disk
2989 ///
2990 /// # Arguments
2991 /// * `collection` - Name of the collection to export
2992 /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2993 ///
2994 /// # Returns
2995 /// Success or an error
2996 ///
2997 /// # Examples
2998 ///
2999 /// ```
3000 /// use aurora_db::Aurora;
3001 ///
3002 /// let db = Aurora::open("mydb.db")?;
3003 ///
3004 /// // Basic export
3005 /// db.export_as_json("users", "./backups/users_2024-01-15")?;
3006 /// // Creates: ./backups/users_2024-01-15.json
3007 ///
3008 /// // Timestamped backup
3009 /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
3010 /// let backup_path = format!("./backups/users_{}", timestamp);
3011 /// db.export_as_json("users", &backup_path)?;
3012 ///
3013 /// // Export multiple collections
3014 /// for collection in &["users", "orders", "products"] {
3015 /// db.export_as_json(collection, &format!("./export/{}", collection))?;
3016 /// }
3017 /// ```
3018 ///
3019 /// # Output Format
3020 ///
3021 /// The exported JSON has this structure:
3022 /// ```json
3023 /// {
3024 /// "users": [
3025 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
3026 /// { "id": "456", "name": "Bob", "email": "bob@example.com" }
3027 /// ]
3028 /// }
3029 /// ```
3030 ///
3031 /// # See Also
3032 /// - `export_as_csv()` for CSV format export
3033 /// - `import_from_json()` to restore exported data
3034 pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
3035 let output_path = if !output_path.ends_with(".json") {
3036 format!("{}.json", output_path)
3037 } else {
3038 output_path.to_string()
3039 };
3040
3041 let mut docs = Vec::new();
3042
3043 // Get all documents from the specified collection
3044 for result in self.cold.scan() {
3045 let (key, value) = result?;
3046
3047 // Only process documents from the specified collection
3048 if let Some(key_collection) = key.split(':').next()
3049 && key_collection == collection
3050 && !key.starts_with("_collection:")
3051 && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3052 {
3053 // Convert Value enum to raw JSON values
3054 let mut clean_doc = serde_json::Map::new();
3055 for (k, v) in doc.data {
3056 match v {
3057 Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
3058 Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
3059 Value::Float(f) => {
3060 if let Some(n) = serde_json::Number::from_f64(f) {
3061 clean_doc.insert(k, JsonValue::Number(n))
3062 } else {
3063 clean_doc.insert(k, JsonValue::Null)
3064 }
3065 }
3066 Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
3067 Value::Array(arr) => {
3068 let clean_arr: Vec<JsonValue> = arr
3069 .into_iter()
3070 .map(|v| match v {
3071 Value::String(s) => JsonValue::String(s),
3072 Value::Int(i) => JsonValue::Number(i.into()),
3073 Value::Float(f) => serde_json::Number::from_f64(f)
3074 .map(JsonValue::Number)
3075 .unwrap_or(JsonValue::Null),
3076 Value::Bool(b) => JsonValue::Bool(b),
3077 Value::Null => JsonValue::Null,
3078 _ => JsonValue::Null,
3079 })
3080 .collect();
3081 clean_doc.insert(k, JsonValue::Array(clean_arr))
3082 }
3083 Value::Uuid(u) => clean_doc.insert(k, JsonValue::String(u.to_string())),
3084 Value::Null => clean_doc.insert(k, JsonValue::Null),
3085 Value::Object(_) => None, // Handle nested objects if needed
3086 };
3087 }
3088 docs.push(JsonValue::Object(clean_doc));
3089 }
3090 }
3091
3092 let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
3093 collection.to_string(),
3094 JsonValue::Array(docs),
3095 )]));
3096
3097 let mut file = StdFile::create(&output_path)?;
3098 serde_json::to_writer_pretty(&mut file, &output)?;
3099 println!("Exported collection '{}' to {}", collection, &output_path);
3100 Ok(())
3101 }
3102
3103 /// Export a collection to a CSV file
3104 ///
3105 /// Creates a CSV file with headers from the first document and rows for each document.
3106 /// Useful for spreadsheet analysis, data science workflows, or reporting.
3107 /// Automatically appends `.csv` extension if not present.
3108 ///
3109 /// # Performance
3110 /// - Export speed: ~8,000 docs/sec
3111 /// - Memory efficient: streams rows to file
3112 /// - Headers determined from first document
3113 ///
3114 /// # Arguments
3115 /// * `collection` - Name of the collection to export
3116 /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
3117 ///
3118 /// # Returns
3119 /// Success or an error
3120 ///
3121 /// # Examples
3122 ///
3123 /// ```
3124 /// use aurora_db::Aurora;
3125 ///
3126 /// let db = Aurora::open("mydb.db")?;
3127 ///
3128 /// // Basic CSV export
3129 /// db.export_as_csv("users", "./reports/users")?;
3130 /// // Creates: ./reports/users.csv
3131 ///
3132 /// // Export for analysis in Excel/Google Sheets
3133 /// db.export_as_csv("orders", "./analytics/sales_data")?;
3134 ///
3135 /// // Monthly report generation
3136 /// let month = chrono::Utc::now().format("%Y-%m");
3137 /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
3138 /// ```
3139 ///
3140 /// # Output Format
3141 ///
3142 /// ```csv
3143 /// id,name,email,age
3144 /// 123,Alice,alice@example.com,28
3145 /// 456,Bob,bob@example.com,32
3146 /// ```
3147 ///
3148 /// # Important Notes
3149 /// - Headers are taken from the first document's fields
    /// - Missing fields are written as empty values; fields not present in the first document are omitted from the CSV
3151 /// - Nested objects/arrays are converted to strings
3152 /// - Best for flat document structures
3153 ///
3154 /// # See Also
3155 /// - `export_as_json()` for JSON format (better for nested data)
3156 /// - For complex nested structures, use JSON export instead
3157 pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
3158 let output_path = if !filename.ends_with(".csv") {
3159 format!("{}.csv", filename)
3160 } else {
3161 filename.to_string()
3162 };
3163
3164 let mut writer = csv::Writer::from_path(&output_path)?;
3165 let mut headers = Vec::new();
3166 let mut first_doc = true;
3167
3168 // Get all documents from the specified collection
3169 for result in self.cold.scan() {
3170 let (key, value) = result?;
3171
3172 // Only process documents from the specified collection
3173 if let Some(key_collection) = key.split(':').next()
3174 && key_collection == collection
3175 && !key.starts_with("_collection:")
3176 && let Ok(doc) = serde_json::from_slice::<Document>(&value)
3177 {
3178 // Write headers from first document
3179 if first_doc && !doc.data.is_empty() {
3180 headers = doc.data.keys().cloned().collect();
3181 writer.write_record(&headers)?;
3182 first_doc = false;
3183 }
3184
3185 // Write the document values
3186 let values: Vec<String> = headers
3187 .iter()
3188 .map(|header| {
3189 doc.data
3190 .get(header)
3191 .map(|v| v.to_string())
3192 .unwrap_or_default()
3193 })
3194 .collect();
3195 writer.write_record(&values)?;
3196 }
3197 }
3198
3199 writer.flush()?;
3200 println!("Exported collection '{}' to {}", collection, &output_path);
3201 Ok(())
3202 }
3203
3204 // Helper method to create filter-based queries
3205 pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3206 self.query(collection)
3207 }
3208
3209 // Convenience methods that build on top of the FilterBuilder
3210
3211 pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
3212 self.query(collection)
3213 .filter(|f| f.eq("id", id))
3214 .first_one()
3215 .await
3216 }
3217
3218 pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
3219 where
3220 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3221 {
3222 self.query(collection).filter(filter_fn).first_one().await
3223 }
3224
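    /// Find all documents where `field` equals `value`.
    ///
    /// A minimal sketch (assumes an `email` field on `users`):
    ///
    /// ```
    /// let matches = db
    ///     .find_by_field("users", "email", Value::String("jane@example.com".into()))
    ///     .await?;
    /// ```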
3225 pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
3226 &self,
3227 collection: &str,
3228 field: &'static str,
3229 value: T,
3230 ) -> Result<Vec<Document>> {
3231 let value_clone = value.clone();
3232 self.query(collection)
3233 .filter(move |f: &FilterBuilder| f.eq(field, value_clone.clone()))
3234 .collect()
3235 .await
3236 }
3237
3238 pub async fn find_by_fields(
3239 &self,
3240 collection: &str,
3241 fields: Vec<(&str, Value)>,
3242 ) -> Result<Vec<Document>> {
3243 let mut query = self.query(collection);
3244
3245 for (field, value) in fields {
3246 let field_owned = field.to_owned();
3247 let value_owned = value.clone();
3248 query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
3249 }
3250
3251 query.collect().await
3252 }
3253
3254 // Advanced example: find documents with a field value in a specific range
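    ///
    /// A minimal sketch (assumes a numeric `age` field on `users`):
    ///
    /// ```
    /// let twenties = db
    ///     .find_in_range("users", "age", Value::Int(20), Value::Int(29))
    ///     .await?;
    /// ```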
3255 pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
3256 &self,
3257 collection: &str,
3258 field: &'static str,
3259 min: T,
3260 max: T,
3261 ) -> Result<Vec<Document>> {
3262 self.query(collection)
3263 .filter(move |f| f.between(field, min.clone(), max.clone()))
3264 .collect()
3265 .await
3266 }
3267
3268 // Complex query example: build with multiple combined filters
3269 pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3270 self.query(collection)
3271 }
3272
3273 // Create a full-text search query with added filter options
3274 pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3275 self.search(collection)
3276 }
3277
3278 // Utility methods for common operations
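    /// Insert-or-update a document under a caller-chosen ID.
    ///
    /// If the document exists, the given fields are merged into it; otherwise
    /// a new document is created with that ID. Unique constraints are
    /// validated either way.
    ///
    /// A minimal sketch (the ID and fields are illustrative):
    ///
    /// ```
    /// db.upsert("settings", "theme", vec![
    ///     ("value", Value::String("dark".into())),
    /// ]).await?;
    /// ```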
3279 pub async fn upsert(
3280 &self,
3281 collection: &str,
3282 id: &str,
3283 data: Vec<(&str, Value)>,
3284 ) -> Result<String> {
3285 // Convert Vec<(&str, Value)> to HashMap<String, Value>
3286 let data_map: HashMap<String, Value> =
3287 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
3288
3289 // Check if document exists
3290 if let Some(mut doc) = self.get_document(collection, id)? {
3291 // Clone for notification
3292 let old_doc = doc.clone();
3293
3294 // Update existing document - merge new data
3295 for (key, value) in data_map {
3296 doc.data.insert(key, value);
3297 }
3298
3299 // Validate unique constraints for the updated document
3300 // We need to exclude the current document from the uniqueness check
3301 self.validate_unique_constraints_excluding(collection, &doc.data, id)
3302 .await?;
3303
3304 self.put(
3305 format!("{}:{}", collection, id),
3306 serde_json::to_vec(&doc)?,
3307 None,
3308 )
3309 .await?;
3310
3311 // Publish update event
3312 let event = crate::pubsub::ChangeEvent::update(collection, id, old_doc, doc);
3313 let _ = self.pubsub.publish(event);
3314
3315 Ok(id.to_string())
3316 } else {
3317 // Insert new document with specified ID - validate unique constraints
3318 self.validate_unique_constraints(collection, &data_map)
3319 .await?;
3320
3321 let document = Document {
3322 id: id.to_string(),
3323 data: data_map,
3324 };
3325
3326 self.put(
3327 format!("{}:{}", collection, id),
3328 serde_json::to_vec(&document)?,
3329 None,
3330 )
3331 .await?;
3332
3333 // Publish insert event
3334 let event = crate::pubsub::ChangeEvent::insert(collection, id, document);
3335 let _ = self.pubsub.publish(event);
3336
3337 Ok(id.to_string())
3338 }
3339 }
3340
3341 // Atomic increment/decrement
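    /// Add `amount` (which may be negative) to an integer field and return
    /// the new value. A missing or non-integer field is treated as 0.
    ///
    /// A minimal sketch (assumes `post_id` holds an existing document's ID):
    ///
    /// ```
    /// let views = db.increment("posts", &post_id, "view_count", 1).await?;
    /// ```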
3342 pub async fn increment(
3343 &self,
3344 collection: &str,
3345 id: &str,
3346 field: &str,
3347 amount: i64,
3348 ) -> Result<i64> {
3349 if let Some(mut doc) = self.get_document(collection, id)? {
3350 // Get current value
3351 let current = match doc.data.get(field) {
3352 Some(Value::Int(i)) => *i,
3353 _ => 0,
3354 };
3355
3356 // Increment
3357 let new_value = current + amount;
3358 doc.data.insert(field.to_string(), Value::Int(new_value));
3359
3360 // Save changes
3361 self.put(
3362 format!("{}:{}", collection, id),
3363 serde_json::to_vec(&doc)?,
3364 None,
3365 )
3366 .await?;
3367
3368 Ok(new_value)
3369 } else {
3370 Err(AqlError::new(
3371 ErrorCode::NotFound,
3372 format!("Document {}:{} not found", collection, id),
3373 ))
3374 }
3375 }
3376
3377 // Delete documents by query
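    /// Delete every document matching the filter and return how many were
    /// removed.
    ///
    /// A minimal sketch (the field name is illustrative):
    ///
    /// ```
    /// let removed = db
    ///     .delete_by_query("sessions", |f| f.eq("expired", Value::Bool(true)))
    ///     .await?;
    /// ```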
3378 pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
3379 where
3380 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
3381 {
3382 let docs = self.query(collection).filter(filter_fn).collect().await?;
3383 let mut deleted_count = 0;
3384
3385 for doc in docs {
3386 let key = format!("{}:{}", collection, doc.id);
3387 self.delete(&key).await?;
3388 deleted_count += 1;
3389 }
3390
3391 Ok(deleted_count)
3392 }
3393
3394 /// Import documents from a JSON file into a collection
3395 ///
3396 /// Validates each document against the collection schema, skips duplicates (by ID),
3397 /// and provides detailed statistics about the import operation. Useful for restoring
3398 /// backups, migrating data, or seeding development databases.
3399 ///
3400 /// # Performance
3401 /// - Import speed: ~5,000 docs/sec (with validation)
3402 /// - Memory efficient: processes documents one at a time
3403 /// - Validates schema and unique constraints
3404 ///
3405 /// # Arguments
3406 /// * `collection` - Name of the collection to import into
3407 /// * `filename` - Path to the JSON file containing documents (array format)
3408 ///
3409 /// # Returns
3410 /// `ImportStats` containing counts of imported, skipped, and failed documents
3411 ///
3412 /// # Examples
3413 ///
3414 /// ```
3415 /// use aurora_db::Aurora;
3416 ///
3417 /// let db = Aurora::open("mydb.db")?;
3418 ///
3419 /// // Basic import
3420 /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
3421 /// println!("Imported: {}, Skipped: {}, Failed: {}",
3422 /// stats.imported, stats.skipped, stats.failed);
3423 ///
3424 /// // Restore from backup
3425 /// let backup_file = "./backups/users_2024-01-15.json";
3426 /// let stats = db.import_from_json("users", backup_file).await?;
3427 ///
3428 /// if stats.failed > 0 {
3429 /// eprintln!("Warning: {} documents failed validation", stats.failed);
3430 /// }
3431 ///
3432 /// // Idempotent import - duplicates are skipped
3433 /// let stats = db.import_from_json("users", "./data/users.json").await?;
3434 /// // Running again will skip all existing documents
3435 /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
3436 /// assert_eq!(stats2.skipped, stats.imported);
3437 ///
3438 /// // Migration from another system
3439 /// db.new_collection("products", vec![
3440 /// ("sku", FieldType::String),
3441 /// ("name", FieldType::String),
3442 /// ("price", FieldType::Float),
3443 /// ])?;
3444 ///
3445 /// let stats = db.import_from_json("products", "./migration/products.json").await?;
3446 /// println!("Migration complete: {} products imported", stats.imported);
3447 /// ```
3448 ///
3449 /// # Expected JSON Format
3450 ///
3451 /// The JSON file should contain an array of document objects:
3452 /// ```json
3453 /// [
3454 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
3455 /// { "id": "456", "name": "Bob", "email": "bob@example.com" },
3456 /// { "name": "Carol", "email": "carol@example.com" }
3457 /// ]
3458 /// ```
3459 ///
3460 /// # Behavior
3461 /// - Documents with existing IDs are skipped (duplicate detection)
3462 /// - Documents without IDs get auto-generated UUIDs
3463 /// - Schema validation is performed on all fields
3464 /// - Failed documents are counted but don't stop the import
3465 /// - Unique constraints are checked
3466 ///
3467 /// # See Also
3468 /// - `export_as_json()` to create compatible backup files
3469 /// - `batch_insert()` for programmatic bulk inserts
3470 pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
3471 // Validate that the collection exists
3472 let collection_def = self.get_collection_definition(collection)?;
3473
3474 // Load JSON file
3475 let json_string = read_to_string(filename).await.map_err(|e| {
3476 AqlError::new(
3477 ErrorCode::IoError,
3478 format!("Failed to read import file: {}", e),
3479 )
3480 })?;
3481
3482 // Parse JSON
3483 let documents: Vec<JsonValue> = from_str(&json_string).map_err(|e| {
3484 AqlError::new(
3485 ErrorCode::SerializationError,
3486 format!("Failed to parse JSON: {}", e),
3487 )
3488 })?;
3489
3490 let mut stats = ImportStats::default();
3491
3492 // Process each document
3493 for doc_json in documents {
3494 match self
3495 .import_document(collection, &collection_def, doc_json)
3496 .await
3497 {
3498 Ok(ImportResult::Imported) => stats.imported += 1,
3499 Ok(ImportResult::Skipped) => stats.skipped += 1,
3500 Err(_) => stats.failed += 1,
3501 }
3502 }
3503
3504 Ok(stats)
3505 }
3506
3507 /// Import a single document, performing schema validation and duplicate checking
3508 async fn import_document(
3509 &self,
3510 collection: &str,
3511 collection_def: &Collection,
3512 doc_json: JsonValue,
3513 ) -> Result<ImportResult> {
3514 if !doc_json.is_object() {
3515 return Err(AqlError::invalid_operation(
3516 "Expected JSON object".to_string(),
3517 ));
3518 }
3519
3520 // Extract document ID if present
3521 let doc_id = doc_json
3522 .get("id")
3523 .and_then(|id| id.as_str())
3524 .map(|s| s.to_string())
3525 .unwrap_or_else(|| Uuid::now_v7().to_string());
3526
3527 // Check if document with this ID already exists
3528 if self.get_document(collection, &doc_id)?.is_some() {
3529 return Ok(ImportResult::Skipped);
3530 }
3531
3532 // Convert JSON to our document format and validate against schema
3533 let mut data_map = HashMap::new();
3534
3535 if let Some(obj) = doc_json.as_object() {
3536 for (field_name, field_def) in &collection_def.fields {
3537 if let Some(json_value) = obj.get(field_name) {
3538 // Validate value against field type
3539 if !self.validate_field_value(json_value, &field_def.field_type) {
3540 return Err(AqlError::invalid_operation(format!(
3541 "Field '{}' has invalid type",
3542 field_name
3543 )));
3544 }
3545
3546 // Convert JSON value to our Value type
3547 let value = self.json_to_value(json_value)?;
3548 data_map.insert(field_name.clone(), value);
3549 } else if field_def.unique {
3550 // Missing required unique field
3551 return Err(AqlError::invalid_operation(format!(
3552 "Missing required unique field '{}'",
3553 field_name
3554 )));
3555 }
3556 }
3557 }
3558
3559 // Check for duplicates by unique fields
3560 let unique_fields = self.get_unique_fields(collection_def);
3561 for unique_field in &unique_fields {
3562 if let Some(value) = data_map.get(unique_field) {
3563 // Query for existing documents with this unique value
3564 let query_results = self
3565 .query(collection)
3566 .filter(move |f| f.eq(unique_field, value.clone()))
3567 .limit(1)
3568 .collect()
3569 .await?;
3570
3571 if !query_results.is_empty() {
3572 // Found duplicate by unique field
3573 return Ok(ImportResult::Skipped);
3574 }
3575 }
3576 }
3577
3578 // Create and insert document
3579 let document = Document {
3580 id: doc_id,
3581 data: data_map,
3582 };
3583
3584 self.put(
3585 format!("{}:{}", collection, document.id),
3586 serde_json::to_vec(&document)?,
3587 None,
3588 )
3589 .await?;
3590
3591 Ok(ImportResult::Imported)
3592 }
3593
3594 /// Validate that a JSON value matches the expected field type
3595 fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
3596 use crate::types::ScalarType;
3597 match field_type {
3598 FieldType::Scalar(ScalarType::String) => value.is_string(),
3599 FieldType::Scalar(ScalarType::Int) => value.is_i64() || value.is_u64(),
3600 FieldType::Scalar(ScalarType::Float) => value.is_number(),
3601 FieldType::Scalar(ScalarType::Bool) => value.is_boolean(),
3602 FieldType::Scalar(ScalarType::Uuid) => {
3603 value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
3604 }
3605 FieldType::Scalar(ScalarType::Any) => true,
3606
3607 FieldType::Array(_) => value.is_array(),
3608 FieldType::Object | FieldType::Nested(_) => value.is_object(),
            // Catch-all for any legacy or duplicate variants
            _ => true,
3611 }
3612 }
3613
3614 /// Convert a JSON value to our internal Value type
3615 #[allow(clippy::only_used_in_recursion)]
3616 fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3617 match json_value {
3618 JsonValue::Null => Ok(Value::Null),
3619 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3620 JsonValue::Number(n) => {
3621 if let Some(i) = n.as_i64() {
3622 Ok(Value::Int(i))
3623 } else if let Some(f) = n.as_f64() {
3624 Ok(Value::Float(f))
3625 } else {
3626 Err(AqlError::invalid_operation(
3627 "Invalid number value".to_string(),
3628 ))
3629 }
3630 }
3631 JsonValue::String(s) => {
3632 // Try parsing as UUID first
3633 if let Ok(uuid) = Uuid::parse_str(s) {
3634 Ok(Value::Uuid(uuid))
3635 } else {
3636 Ok(Value::String(s.clone()))
3637 }
3638 }
3639 JsonValue::Array(arr) => {
3640 let mut values = Vec::new();
3641 for item in arr {
3642 values.push(self.json_to_value(item)?);
3643 }
3644 Ok(Value::Array(values))
3645 }
3646 JsonValue::Object(obj) => {
3647 let mut map = HashMap::new();
3648 for (k, v) in obj {
3649 map.insert(k.clone(), self.json_to_value(v)?);
3650 }
3651 Ok(Value::Object(map))
3652 }
3653 }
3654 }
3655
3656 /// Get collection definition
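    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming an open `db` handle and an existing
    /// "users" collection:
    /// ```
    /// let users_def = db.get_collection_definition("users")?;
    /// for (name, field) in &users_def.fields {
    ///     println!("{}: unique = {}", name, field.unique);
    /// }
    /// ```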
3657 pub fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3658 if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3659 let collection_def: Collection = serde_json::from_slice(&data)?;
3660 Ok(collection_def)
3661 } else {
3662 Err(AqlError::new(
3663 ErrorCode::CollectionNotFound,
3664 collection.to_string(),
3665 ))
3666 }
3667 }
3668
3669 /// Get storage statistics and information about the database
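    ///
    /// # Examples
    ///
    /// Illustrative only; the fields follow the `DatabaseStats` struct
    /// defined at the bottom of this module:
    /// ```
    /// let stats = db.get_database_stats()?;
    /// println!("Estimated size: {} bytes", stats.estimated_size);
    /// for (name, coll) in &stats.collections {
    ///     println!("{}: {} docs, avg {} bytes", name, coll.count, coll.avg_doc_size);
    /// }
    /// ```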
3670 pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3671 let hot_stats = self.hot.get_stats();
3672 let cold_stats = self.cold.get_stats()?;
3673
3674 Ok(DatabaseStats {
3675 hot_stats,
3676 cold_stats,
3677 estimated_size: self.cold.estimated_size(),
3678 collections: self.get_collection_stats()?,
3679 })
3680 }
3681
3682 /// Check if a key is currently stored in the hot cache
3683 pub fn is_in_hot_cache(&self, key: &str) -> bool {
3684 self.hot.is_hot(key)
3685 }
3686
3687 /// Clear the hot cache (useful when memory needs to be freed)
3688 pub fn clear_hot_cache(&self) {
3689 self.hot.clear();
3690 println!(
3691 "Hot cache cleared, current hit ratio: {:.2}%",
3692 self.hot.hit_ratio() * 100.0
3693 );
3694 }
3695
3696 /// Prewarm the cache by loading frequently accessed data from cold storage
3697 ///
3698 /// Loads documents from a collection into memory cache to eliminate cold-start
3699 /// latency. Dramatically improves initial query performance after database startup
3700 /// by preloading the most commonly accessed data.
3701 ///
3702 /// # Performance Impact
3703 /// - Prewarming speed: ~20,000 docs/sec
3704 /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3705 /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3706 /// - Memory cost: ~500 bytes per document average
3707 ///
3708 /// # Arguments
3709 /// * `collection` - The collection to prewarm
    /// * `limit` - Maximum number of documents to load; `None` loads the entire collection
3711 ///
3712 /// # Returns
3713 /// Number of documents loaded into cache
3714 ///
3715 /// # Examples
3716 ///
3717 /// ```
3718 /// use aurora_db::Aurora;
3719 ///
3720 /// let db = Aurora::open("mydb.db")?;
3721 ///
3722 /// // Prewarm frequently accessed collection
3723 /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3724 /// println!("Prewarmed {} user documents", loaded);
3725 ///
3726 /// // Now queries are fast from the start
3727 /// let stats_before = db.get_cache_stats();
3728 /// let users = db.query("users").collect().await?;
3729 /// let stats_after = db.get_cache_stats();
3730 ///
3731 /// // High hit rate thanks to prewarming
3732 /// assert!(stats_after.hit_rate > 0.95);
3733 ///
3734 /// // Startup optimization pattern
3735 /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3736 /// println!("Prewarming caches...");
3737 ///
3738 /// // Prewarm most frequently accessed collections
3739 /// db.prewarm_cache("users", Some(5000)).await?;
3740 /// db.prewarm_cache("sessions", Some(1000)).await?;
3741 /// db.prewarm_cache("products", Some(500)).await?;
3742 ///
3743 /// let stats = db.get_cache_stats();
3744 /// println!("Cache prewarmed: {} entries loaded", stats.size);
3745 ///
3746 /// Ok(())
3747 /// }
3748 ///
3749 /// // Web server startup
3750 /// #[tokio::main]
3751 /// async fn main() {
3752 /// let db = Aurora::open("app.db").unwrap();
3753 ///
3754 /// // Prewarm before accepting requests
3755 /// db.prewarm_cache("users", Some(10000)).await.unwrap();
3756 ///
3757 /// // Server is now ready with hot cache
3758 /// start_web_server(db).await;
3759 /// }
3760 ///
3761 /// // Prewarm all documents (for small collections)
3762 /// let all_loaded = db.prewarm_cache("config", None).await?;
3763 /// // All config documents now in memory
3764 ///
3765 /// // Selective prewarming based on access patterns
3766 /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3767 /// // Load recent users (they're accessed most)
3768 /// db.prewarm_cache("users", Some(1000)).await?;
3769 ///
3770 /// // Load active sessions only
3771 /// let active_sessions = db.query("sessions")
3772 /// .filter(|f| f.eq("active", Value::Bool(true)))
3773 /// .limit(500)
3774 /// .collect()
3775 /// .await?;
3776 ///
3777 /// // Manually populate cache with hot data
3778 /// for session in active_sessions {
3779 /// // Reading automatically caches
3780 /// db.get_document("sessions", &session.id)?;
3781 /// }
3782 ///
3783 /// Ok(())
3784 /// }
3785 /// ```
3786 ///
3787 /// # Typical Prewarming Scenarios
3788 ///
3789 /// **Web Application Startup:**
3790 /// ```
3791 /// // Load user data, sessions, and active content
3792 /// db.prewarm_cache("users", Some(5000)).await?;
3793 /// db.prewarm_cache("sessions", Some(2000)).await?;
3794 /// db.prewarm_cache("posts", Some(1000)).await?;
3795 /// ```
3796 ///
3797 /// **E-commerce Site:**
3798 /// ```
3799 /// // Load products, categories, and user carts
3800 /// db.prewarm_cache("products", Some(500)).await?;
3801 /// db.prewarm_cache("categories", None).await?; // All categories
3802 /// db.prewarm_cache("active_carts", Some(1000)).await?;
3803 /// ```
3804 ///
3805 /// **API Server:**
3806 /// ```
3807 /// // Load authentication data and rate limits
3808 /// db.prewarm_cache("api_keys", None).await?;
3809 /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3810 /// ```
3811 ///
3812 /// # When to Use
3813 /// - At application startup to eliminate cold-start latency
3814 /// - After cache clear operations
3815 /// - Before high-traffic events (product launches, etc.)
3816 /// - When deploying new instances (load balancer warm-up)
3817 ///
3818 /// # Memory Considerations
3819 /// - 1,000 docs ≈ 500 KB memory
3820 /// - 10,000 docs ≈ 5 MB memory
3821 /// - 100,000 docs ≈ 50 MB memory
3822 /// - Stay within configured cache capacity
3823 ///
3824 /// # See Also
3825 /// - `get_cache_stats()` to monitor cache effectiveness
3826 /// - `prewarm_all_collections()` to prewarm all collections
3827 /// - `Aurora::with_config()` to adjust cache capacity
3828 pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
        // `None` means no cap: load the entire collection, as documented above
        let limit = limit.unwrap_or(usize::MAX);
3830 let prefix = format!("{}:", collection);
3831 let mut loaded = 0;
3832
3833 for entry in self.cold.scan_prefix(&prefix) {
3834 if loaded >= limit {
3835 break;
3836 }
3837
3838 if let Ok((key, value)) = entry {
3839 // Load into hot cache
3840 self.hot.set(Arc::new(key.clone()), Arc::new(value), None);
3841 loaded += 1;
3842 }
3843 }
3844
3845 println!("Prewarmed {} with {} documents", collection, loaded);
3846 Ok(loaded)
3847 }
3848
3849 /// Prewarm cache for all collections
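    ///
    /// Returns a map from collection name to the number of documents loaded.
    ///
    /// # Examples
    ///
    /// A sketch of warming every collection at startup:
    /// ```
    /// let stats = db.prewarm_all_collections(Some(1000)).await?;
    /// for (collection, loaded) in &stats {
    ///     println!("{}: {} documents prewarmed", collection, loaded);
    /// }
    /// ```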
3850 pub async fn prewarm_all_collections(
3851 &self,
3852 docs_per_collection: Option<usize>,
3853 ) -> Result<HashMap<String, usize>> {
3854 let mut stats = HashMap::new();
3855
3856 // Get all collections
3857 let collections: Vec<String> = self
3858 .cold
3859 .scan()
3860 .filter_map(|r| r.ok())
3861 .map(|(k, _)| k)
3862 .filter(|k| k.starts_with("_collection:"))
3863 .map(|k| k.trim_start_matches("_collection:").to_string())
3864 .collect();
3865
3866 for collection in collections {
3867 let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3868 stats.insert(collection, loaded);
3869 }
3870
3871 Ok(stats)
3872 }
3873
3874 /// Store multiple key-value pairs efficiently in a single batch operation
3875 ///
3876 /// Low-level batch write operation that bypasses document validation and
3877 /// writes raw byte data directly to storage. Useful for advanced use cases,
3878 /// custom serialization, or maximum performance scenarios.
3879 ///
3880 /// # Performance
3881 /// - Write speed: ~100,000 writes/sec
3882 /// - Single disk fsync for entire batch
3883 /// - No validation or schema checking
3884 /// - Direct storage access
3885 ///
3886 /// # Arguments
3887 /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3888 ///
3889 /// # Returns
3890 /// Success or an error
3891 ///
3892 /// # Examples
3893 ///
3894 /// ```
3895 /// use aurora_db::Aurora;
3896 ///
3897 /// let db = Aurora::open("mydb.db")?;
3898 ///
3899 /// // Low-level batch write
3900 /// let pairs = vec![
3901 /// ("users:123".to_string(), b"raw data 1".to_vec()),
3902 /// ("users:456".to_string(), b"raw data 2".to_vec()),
3903 /// ("cache:key1".to_string(), b"cached value".to_vec()),
3904 /// ];
3905 ///
    /// db.batch_write(pairs).await?;
3907 ///
    /// // Custom binary serialization with the `bincode` and `serde` crates
    /// use serde::{Serialize, Deserialize};
3910 ///
3911 /// #[derive(Serialize, Deserialize)]
3912 /// struct CustomData {
3913 /// id: u64,
3914 /// payload: Vec<u8>,
3915 /// }
3916 ///
3917 /// let custom_data = vec![
3918 /// CustomData { id: 1, payload: vec![1, 2, 3] },
3919 /// CustomData { id: 2, payload: vec![4, 5, 6] },
3920 /// ];
3921 ///
3922 /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3923 /// .iter()
3924 /// .map(|data| {
3925 /// let key = format!("binary:{}", data.id);
3926 /// let value = bincode::serialize(data).unwrap();
3927 /// (key, value)
3928 /// })
3929 /// .collect();
3930 ///
    /// db.batch_write(pairs).await?;
3932 ///
3933 /// // Bulk cache population
3934 /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3935 /// .map(|i| {
3936 /// let key = format!("cache:item_{}", i);
3937 /// let value = format!("value_{}", i).into_bytes();
3938 /// (key, value)
3939 /// })
3940 /// .collect();
3941 ///
    /// db.batch_write(cache_entries).await?;
3943 /// // Writes 10,000 entries in ~100ms
3944 /// ```
3945 ///
3946 /// # Important Notes
    /// - No schema validation performed
    /// - No unique constraint checking
    /// - In-memory indices are updated only when a value deserializes as a `Document`
3950 /// - Keys must follow "collection:id" format for proper grouping
3951 /// - Values are raw bytes - you handle serialization
3952 /// - Use `batch_insert()` for validated document inserts
3953 ///
3954 /// # When to Use
3955 /// - Maximum write performance needed
3956 /// - Custom serialization formats (bincode, msgpack, etc.)
3957 /// - Cache population
3958 /// - Low-level database operations
3959 /// - You're bypassing the document model
3960 ///
3961 /// # When NOT to Use
3962 /// - Regular document inserts → Use `batch_insert()` instead
3963 /// - Need validation → Use `batch_insert()` instead
3964 /// - Need indexing → Use `batch_insert()` instead
3965 ///
3966 /// # See Also
3967 /// - `batch_insert()` for validated document batch inserts
3968 /// - `put()` for single key-value writes
3969 pub async fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3970 // Group pairs by collection name
3971 let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3972 for (key, value) in &pairs {
3973 if let Some(collection_name) = key.split(':').next() {
3974 collections
3975 .entry(collection_name.to_string())
3976 .or_default()
3977 .push((key.clone(), value.clone()));
3978 }
3979 }
3980
3981 // First, do the batch write to cold storage for all pairs
3982 self.cold.batch_set(pairs)?;
3983
3984 // Then, process each collection for in-memory updates
3985 for (collection_name, batch) in collections {
3986 // --- Optimized Batch Indexing ---
3987
3988 // 1. Get schema once for the entire collection batch
3989 let collection_obj = match self.schema_cache.get(&collection_name) {
3990 Some(cached_schema) => Arc::clone(cached_schema.value()),
3991 None => {
3992 let collection_key = format!("_collection:{}", collection_name);
3993 match self.get(&collection_key)? {
3994 Some(data) => {
3995 let obj: Collection = serde_json::from_slice(&data)?;
3996 let arc_obj = Arc::new(obj);
3997 self.schema_cache
3998 .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3999 arc_obj
4000 }
4001 None => continue,
4002 }
4003 }
4004 };
4005
4006 let indexed_fields: Vec<String> = collection_obj
4007 .fields
4008 .iter()
4009 .filter(|(_, def)| def.indexed || def.unique)
4010 .map(|(name, _)| name.clone())
4011 .collect();
4012
4013 let primary_index = self
4014 .primary_indices
4015 .entry(collection_name.to_string())
4016 .or_default();
4017
4018 for (key, value) in batch {
4019 // 2. Update hot cache
4020 if self.should_cache_key(&key) {
4021 self.hot
4022 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
4023 }
4024
4025 // 3. Update primary index with metadata only
4026 let location = DiskLocation::new(value.len());
4027 primary_index.insert(key.clone(), location);
4028
4029 // 4. Update secondary indices
4030 if !indexed_fields.is_empty()
4031 && let Ok(doc) = serde_json::from_slice::<Document>(&value)
4032 {
4033 for (field, field_value) in doc.data {
4034 if indexed_fields.contains(&field) {
4035 let value_str = match &field_value {
4036 Value::String(s) => s.clone(),
4037 _ => field_value.to_string(),
4038 };
4039 let index_key = format!("{}:{}", collection_name, field);
4040 let secondary_index =
4041 self.secondary_indices.entry(index_key).or_default();
4042
4043 let max_entries = self.config.max_index_entries_per_field;
4044 secondary_index
4045 .entry(value_str)
4046 .and_modify(|doc_ids| {
4047 if doc_ids.len() < max_entries {
4048 doc_ids.push(key.to_string());
4049 }
4050 })
4051 .or_insert_with(|| vec![key.to_string()]);
4052 }
4053 }
4054 }
4055 }
4056 }
4057
4058 Ok(())
4059 }
4060
4061 /// Scan for keys with a specific prefix
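    ///
    /// # Examples
    ///
    /// A sketch that totals the raw bytes stored under one collection,
    /// assuming an open `db` handle:
    /// ```
    /// let mut total_bytes = 0usize;
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (_key, value) = entry?;
    ///     total_bytes += value.len();
    /// }
    /// println!("users occupy {} bytes on disk", total_bytes);
    /// ```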
4062 pub fn scan_with_prefix(
4063 &self,
4064 prefix: &str,
4065 ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
4066 self.cold.scan_prefix(prefix)
4067 }
4068
4069 /// Get storage efficiency metrics for the database
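    ///
    /// # Examples
    ///
    /// Illustrative usage:
    /// ```
    /// let stats = db.get_collection_stats()?;
    /// if let Some(users) = stats.get("users") {
    ///     println!("{} docs, {} bytes total", users.count, users.size_bytes);
    /// }
    /// ```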
4070 pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
4071 let mut stats = HashMap::new();
4072
4073 // Scan all collections
4074 let collections: Vec<String> = self
4075 .cold
4076 .scan()
4077 .filter_map(|r| r.ok())
4078 .map(|(k, _)| k)
4079 .filter(|k| k.starts_with("_collection:"))
4080 .map(|k| k.trim_start_matches("_collection:").to_string())
4081 .collect();
4082
4083 for collection in collections {
4084 // Use primary index for fast stats (count + size from DiskLocation)
4085 let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
4086 let count = index.len();
4087 // Sum up sizes from DiskLocation metadata (much faster than disk scan)
4088 let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
4089 (count, size)
4090 } else {
4091 // Fallback: scan from cold storage if index not available
4092 let prefix = format!("{}:", collection);
4093 let count = self.cold.scan_prefix(&prefix).count();
4094 let size: usize = self
4095 .cold
4096 .scan_prefix(&prefix)
4097 .filter_map(|r| r.ok())
4098 .map(|(_, v)| v.len())
4099 .sum();
4100 (count, size)
4101 };
4102
4103 stats.insert(
4104 collection,
4105 CollectionStats {
4106 count,
4107 size_bytes: size,
4108 avg_doc_size: if count > 0 { size / count } else { 0 },
4109 },
4110 );
4111 }
4112
4113 Ok(stats)
4114 }
4115
4116 /// Search for documents by exact value using an index
4117 ///
4118 /// This method performs a fast lookup using a pre-created index
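    ///
    /// # Examples
    ///
    /// A sketch, assuming an index on `users.email` was created beforehand:
    /// ```
    /// let matches = db.search_by_value(
    ///     "users",
    ///     "email",
    ///     &Value::String("jane@example.com".to_string()),
    /// )?;
    /// println!("found {} matching documents", matches.len());
    /// ```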
4119 pub fn search_by_value(
4120 &self,
4121 collection: &str,
4122 field: &str,
4123 value: &Value,
4124 ) -> Result<Vec<Document>> {
4125 let index_key = format!("_index:{}:{}", collection, field);
4126
4127 if let Some(index_data) = self.get(&index_key)? {
4128 let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4129 let index = Index::new(index_def);
4130
4131 // Use the previously unused search method
4132 if let Some(doc_ids) = index.search(value) {
4133 // Load the documents by ID
4134 let mut docs = Vec::new();
4135 for id in doc_ids {
4136 if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4137 let doc: Document = serde_json::from_slice(&doc_data)?;
4138 docs.push(doc);
4139 }
4140 }
4141 return Ok(docs);
4142 }
4143 }
4144
4145 // Return empty result if no index or no matches
4146 Ok(Vec::new())
4147 }
4148
4149 /// Perform a full-text search on an indexed text field
4150 ///
4151 /// This provides more advanced text search capabilities including
4152 /// relevance ranking of results
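    ///
    /// # Examples
    ///
    /// A sketch, assuming a full-text index on `articles.body` exists:
    /// ```
    /// let hits = db.full_text_search("articles", "body", "tiered storage")?;
    /// for doc in &hits {
    ///     println!("matched document {}", doc.id);
    /// }
    /// ```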
4153 pub fn full_text_search(
4154 &self,
4155 collection: &str,
4156 field: &str,
4157 query: &str,
4158 ) -> Result<Vec<Document>> {
4159 let index_key = format!("_index:{}:{}", collection, field);
4160
4161 if let Some(index_data) = self.get(&index_key)? {
4162 let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4163
4164 // Ensure this is a full-text index
4165 if !matches!(index_def.index_type, IndexType::FullText) {
4166 return Err(AqlError::invalid_operation(format!(
4167 "Field '{}' is not indexed as full-text",
4168 field
4169 )));
4170 }
4171
4172 let index = Index::new(index_def);
4173
4174 // Use the previously unused search_text method
4175 if let Some(doc_id_scores) = index.search_text(query) {
4176 // Load the documents by ID, preserving score order
4177 let mut docs = Vec::new();
4178 for (id, _score) in doc_id_scores {
4179 if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4180 let doc: Document = serde_json::from_slice(&doc_data)?;
4181 docs.push(doc);
4182 }
4183 }
4184 return Ok(docs);
4185 }
4186 }
4187
4188 // Return empty result if no index or no matches
4189 Ok(Vec::new())
4190 }
4191
4192 /// Create a full-text search index on a text field
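    ///
    /// # Examples
    ///
    /// Illustrative only; note the stop-word flag is currently unused:
    /// ```
    /// db.create_text_index("articles", "body", true).await?;
    /// let hits = db.full_text_search("articles", "body", "aurora")?;
    /// ```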
4193 pub async fn create_text_index(
4194 &self,
4195 collection: &str,
4196 field: &str,
4197 _enable_stop_words: bool,
4198 ) -> Result<()> {
4199 // Check if collection exists
4200 if self.get(&format!("_collection:{}", collection))?.is_none() {
4201 return Err(AqlError::new(
4202 ErrorCode::CollectionNotFound,
4203 collection.to_string(),
4204 ));
4205 }
4206
4207 // Create index definition
4208 let index_def = IndexDefinition {
4209 name: format!("{}_{}_fulltext", collection, field),
4210 collection: collection.to_string(),
4211 fields: vec![field.to_string()],
4212 index_type: IndexType::FullText,
4213 unique: false,
4214 };
4215
4216 // Store index definition
4217 let index_key = format!("_index:{}:{}", collection, field);
4218 self.put(index_key, serde_json::to_vec(&index_def)?, None)
4219 .await?;
4220
4221 // Create the actual index
4222 let index = Index::new(index_def);
4223
4224 // Index all existing documents in the collection
4225 let prefix = format!("{}:", collection);
4226 for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
4227 let doc: Document = serde_json::from_slice(&data)?;
4228 index.insert(&doc)?;
4229 }
4230
4231 Ok(())
4232 }
4233
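    /// Execute a `SimpleQueryBuilder` against the database.
    ///
    /// Chooses between an index lookup and a full collection scan using the
    /// query-planner heuristic described in the body: equality and
    /// `Contains` filters prefer a secondary index, while range filters with
    /// small `LIMIT`s fall back to a scan with early termination.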
4234 pub async fn execute_simple_query(
4235 &self,
4236 builder: &SimpleQueryBuilder,
4237 ) -> Result<Vec<Document>> {
4238 // Ensure indices are initialized
4239 self.ensure_indices_initialized().await?;
4240
4241 // A place to store the IDs of the documents we need to fetch
4242 let mut doc_ids_to_load: Option<Vec<String>> = None;
4243
4244 // --- The "Query Planner" ---
4245 // Smart heuristic: For range queries with small LIMITs, full scan can be faster
4246 // than collecting millions of IDs from secondary index
4247 let use_index_for_range = if let Some(limit) = builder.limit {
4248 // If limit is small (< 1000), prefer full scan for range queries
4249 // The secondary index would scan all entries anyway, might as well
4250 // scan documents directly and benefit from early termination
4251 limit >= 1000
4252 } else {
4253 // No limit? Index might still help if result set is small
4254 true
4255 };
4256
4257 // Look for an opportunity to use an index
4258 for (_filter_idx, filter) in builder.filters.iter().enumerate() {
4259 match filter {
4260 Filter::Eq(field, value) => {
4261 let index_key = format!("{}:{}", &builder.collection, field);
4262
4263 // Do we have a secondary index for this field?
4264 if let Some(index) = self.secondary_indices.get(&index_key) {
4265 // Yes! Let's use it.
4266 if let Some(matching_ids) = index.get(&value.to_string()) {
4267 doc_ids_to_load = Some(matching_ids.clone());
4268 break; // Stop searching for other indexes for now
4269 }
4270 }
4271 }
4272 Filter::Gt(field, value)
4273 | Filter::Gte(field, value)
4274 | Filter::Lt(field, value)
4275 | Filter::Lte(field, value) => {
4276 // Skip index for range queries with small LIMITs (see query planner heuristic above)
4277 if !use_index_for_range {
4278 continue;
4279 }
4280
4281 let index_key = format!("{}:{}", &builder.collection, field);
4282
4283 // Do we have a secondary index for this field?
4284 if let Some(index) = self.secondary_indices.get(&index_key) {
4285 // For range queries, we need to scan through the index values
4286 let mut matching_ids = Vec::new();
4287
4288 for entry in index.iter() {
4289 let index_value_str = entry.key();
4290
4291 // Try to parse the index value to compare with our filter value
4292 if let Ok(index_value) =
4293 self.parse_value_from_string(index_value_str, value)
4294 {
4295 let matches = match filter {
4296 Filter::Gt(_, filter_val) => index_value > *filter_val,
4297 Filter::Gte(_, filter_val) => index_value >= *filter_val,
4298 Filter::Lt(_, filter_val) => index_value < *filter_val,
4299 Filter::Lte(_, filter_val) => index_value <= *filter_val,
4300 _ => false,
4301 };
4302
4303 if matches {
4304 matching_ids.extend(entry.value().clone());
4305 }
4306 }
4307 }
4308
4309 if !matching_ids.is_empty() {
4310 doc_ids_to_load = Some(matching_ids);
4311 break;
4312 }
4313 }
4314 }
4315 Filter::Contains(field, search_term) => {
4316 let index_key = format!("{}:{}", &builder.collection, field);
4317
4318 // Do we have a secondary index for this field?
4319 if let Some(index) = self.secondary_indices.get(&index_key) {
4320 let mut matching_ids = Vec::new();
4321
4322 for entry in index.iter() {
4323 let index_value_str = entry.key();
4324
4325 // Check if this indexed value contains our search term
4326 if index_value_str
4327 .to_lowercase()
4328 .contains(&search_term.to_lowercase())
4329 {
4330 matching_ids.extend(entry.value().clone());
4331 }
4332 }
4333
4334 if !matching_ids.is_empty() {
4335 // Remove duplicates since a document could match multiple indexed values
4336 matching_ids.sort();
4337 matching_ids.dedup();
4338
4339 doc_ids_to_load = Some(matching_ids);
4340 break;
4341 }
4342 }
4343 }
4344 Filter::And(_) => {
4345 // For compound filters, we can't easily use a single index
4346 // This would require more complex query planning
4347 continue;
4348 }
4349 Filter::Or(sub_filters) => {
4350 // For OR filters, collect union of all matching IDs from each sub-filter
4351 let mut union_ids: Vec<String> = Vec::new();
4352 let mut used_index = false;
4353
4354 for sub_filter in sub_filters {
4355 match sub_filter {
4356 Filter::Eq(field, value) => {
4357 let index_key = format!("{}:{}", &builder.collection, field);
4358 if let Some(index) = self.secondary_indices.get(&index_key) {
4359 if let Some(matching_ids) = index.get(&value.to_string()) {
4360 union_ids.extend(matching_ids.clone());
4361 used_index = true;
4362 }
4363 }
4364 }
4365 // For other filter types in OR, we fall back to full scan
4366 _ => continue,
4367 }
4368 }
4369
4370 if used_index && !union_ids.is_empty() {
4371 // Remove duplicates
4372 union_ids.sort();
4373 union_ids.dedup();
4374 doc_ids_to_load = Some(union_ids);
4375 break;
4376 }
4377 // If no index was used, continue to full scan
4378 }
4379 }
4380 }
4381
4382 let mut final_docs: Vec<Document>;
4383
        if let Some(ids) = doc_ids_to_load {
            // --- Path 1: Index Lookup ---
            // Debug logging for query performance analysis
            use std::io::Write;
4387 if let Ok(mut file) = std::fs::OpenOptions::new()
4388 .create(true)
4389 .append(true)
4390 .open("/tmp/aurora_query_stats.log")
4391 {
4392 let _ = writeln!(
4393 file,
4394 "[INDEX PATH] IDs to load: {} | Collection: {}",
4395 ids.len(),
4396 builder.collection
4397 );
4398 }
4399
4400 final_docs = Vec::with_capacity(ids.len());
4401
4402 for id in ids {
4403 let doc_key = format!("{}:{}", &builder.collection, id);
4404 if let Some(data) = self.get(&doc_key)?
4405 && let Ok(doc) = serde_json::from_slice::<Document>(&data)
4406 {
4407 final_docs.push(doc);
4408 }
4409 }
4410 } else {
4411 // --- Path 2: Full Collection Scan with Early Termination ---
4412
4413 // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
4414 // as soon as we have enough matching documents
4415 let early_termination_target = if builder.order_by.is_none() {
4416 builder.limit.map(|l| l + builder.offset.unwrap_or(0))
4417 } else {
4418 // With ORDER BY, we need all matching docs to sort correctly
4419 None
4420 };
4421
4422 // Smart scan with early termination support
4423 final_docs = Vec::new();
4424 let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
4425
4426 if let Some(index) = self.primary_indices.get(&builder.collection) {
4427 for entry in index.iter() {
4428 let key = entry.key();
4429 scan_stats.0 += 1; // keys scanned
4430
4431 // Early termination check
4432 if let Some(target) = early_termination_target {
4433 if final_docs.len() >= target {
4434 break; // We have enough documents!
4435 }
4436 }
4437
4438 // Fetch and filter document
4439 if let Some(data) = self.get(key)? {
4440 scan_stats.1 += 1; // docs fetched
4441
4442 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
4443 // Apply all filters
4444 let matches_all_filters =
4445 builder.filters.iter().all(|filter| match filter {
4446 Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4447 Filter::Gt(field, value) => {
4448 doc.data.get(field).is_some_and(|v| v > value)
4449 }
4450 Filter::Gte(field, value) => {
4451 doc.data.get(field).is_some_and(|v| v >= value)
4452 }
4453 Filter::Lt(field, value) => {
4454 doc.data.get(field).is_some_and(|v| v < value)
4455 }
4456 Filter::Lte(field, value) => {
4457 doc.data.get(field).is_some_and(|v| v <= value)
4458 }
4459 Filter::Contains(field, value_str) => {
4460 doc.data.get(field).is_some_and(|v| match v {
4461 Value::String(s) => s.contains(value_str),
4462 Value::Array(arr) => {
4463 arr.contains(&Value::String(value_str.clone()))
4464 }
4465 _ => false,
4466 })
4467 }
4468 Filter::And(filters) => filters.iter().all(|f| f.matches(&doc)),
4469 Filter::Or(filters) => filters.iter().any(|f| f.matches(&doc)),
4470 });
4471
4472 if matches_all_filters {
4473 scan_stats.2 += 1; // matches found
4474 final_docs.push(doc);
4475 }
4476 }
4477 }
4478 }
4479
4480 // Debug logging for query performance analysis
4481 use std::io::Write;
4482 if let Ok(mut file) = std::fs::OpenOptions::new()
4483 .create(true)
4484 .append(true)
4485 .open("/tmp/aurora_query_stats.log")
4486 {
4487 let _ = writeln!(
4488 file,
4489 "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
4490 scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
4491 );
4492 }
4493 } else {
4494 // Fallback: scan from cold storage if index not initialized
4495 final_docs = self.get_all_collection(&builder.collection).await?;
4496
4497 // Apply filters
4498 final_docs.retain(|doc| {
4499 builder.filters.iter().all(|filter| match filter {
4500 Filter::Eq(field, value) => doc.data.get(field) == Some(value),
4501 Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
4502 Filter::Gte(field, value) => {
4503 doc.data.get(field).is_some_and(|v| v >= value)
4504 }
4505 Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
4506 Filter::Lte(field, value) => {
4507 doc.data.get(field).is_some_and(|v| v <= value)
4508 }
4509 Filter::Contains(field, value_str) => {
4510 doc.data.get(field).is_some_and(|v| match v {
4511 Value::String(s) => s.contains(value_str),
4512 Value::Array(arr) => {
4513 arr.contains(&Value::String(value_str.clone()))
4514 }
4515 _ => false,
4516 })
4517 }
4518 Filter::And(filters) => filters.iter().all(|f| f.matches(doc)),
4519 Filter::Or(filters) => filters.iter().any(|f| f.matches(doc)),
4520 })
4521 });
4522 }
4523 }
4524
4525 // Apply ordering
4526 if let Some((field, ascending)) = &builder.order_by {
4527 final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
4528 (Some(v1), Some(v2)) => {
4529 let cmp = v1.cmp(v2);
4530 if *ascending { cmp } else { cmp.reverse() }
4531 }
4532 (None, Some(_)) => std::cmp::Ordering::Less,
4533 (Some(_), None) => std::cmp::Ordering::Greater,
4534 (None, None) => std::cmp::Ordering::Equal,
4535 });
4536 }
4537
4538 // Apply offset and limit
4539 let start = builder.offset.unwrap_or(0);
4540 let end = builder
4541 .limit
4542 .map(|l| start.saturating_add(l))
4543 .unwrap_or(final_docs.len());
4544
4545 let end = end.min(final_docs.len());
4546 Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
4547 }
4548
4549 /// Helper method to parse a string value back to a Value for comparison
4550 fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
4551 match reference_value {
4552 Value::Int(_) => {
4553 if let Ok(i) = value_str.parse::<i64>() {
4554 Ok(Value::Int(i))
4555 } else {
4556 Err(AqlError::invalid_operation(
4557 "Failed to parse int".to_string(),
4558 ))
4559 }
4560 }
4561 Value::Float(_) => {
4562 if let Ok(f) = value_str.parse::<f64>() {
4563 Ok(Value::Float(f))
4564 } else {
4565 Err(AqlError::invalid_operation(
4566 "Failed to parse float".to_string(),
4567 ))
4568 }
4569 }
            // Strings (and any other variant) fall back to string comparison
            _ => Ok(Value::String(value_str.to_string())),
4572 }
4573 }
4574
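    /// Execute a dynamic HTTP query payload against a collection.
    ///
    /// Applies, in order: filters, sorting, pagination (offset then limit),
    /// and field projection. This path always scans the full collection, so
    /// it trades performance for flexibility.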
4575 pub async fn execute_dynamic_query(
4576 &self,
4577 collection: &str,
4578 payload: &QueryPayload,
4579 ) -> Result<Vec<Document>> {
4580 let mut docs = self.get_all_collection(collection).await?;
4581
4582 // 1. Apply Filters
4583 if let Some(filters) = &payload.filters {
4584 docs.retain(|doc| {
4585 filters.iter().all(|filter| {
4586 doc.data
4587 .get(&filter.field)
4588 .is_some_and(|doc_val| check_filter(doc_val, filter))
4589 })
4590 });
4591 }
4592
4593 // 2. Apply Sorting
4594 if let Some(sort_options) = &payload.sort {
4595 docs.sort_by(|a, b| {
4596 let a_val = a.data.get(&sort_options.field);
4597 let b_val = b.data.get(&sort_options.field);
4598 let ordering = a_val
4599 .partial_cmp(&b_val)
4600 .unwrap_or(std::cmp::Ordering::Equal);
4601 if sort_options.ascending {
4602 ordering
4603 } else {
4604 ordering.reverse()
4605 }
4606 });
4607 }
4608
4609 // 3. Apply Pagination
4610 if let Some(offset) = payload.offset {
4611 docs = docs.into_iter().skip(offset).collect();
4612 }
4613 if let Some(limit) = payload.limit {
4614 docs = docs.into_iter().take(limit).collect();
4615 }
4616
4617 // 4. Apply Field Selection (Projection)
4618 if let Some(select_fields) = &payload.select
4619 && !select_fields.is_empty()
4620 {
4621 docs = docs
4622 .into_iter()
4623 .map(|mut doc| {
4624 doc.data.retain(|key, _| select_fields.contains(key));
4625 doc
4626 })
4627 .collect();
4628 }
4629
4630 Ok(docs)
4631 }
4632
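    /// Dispatch a wire-protocol `Request` to the matching database operation
    /// and wrap the outcome in a `Response`.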
4633 pub async fn process_network_request(
4634 &self,
4635 request: crate::network::protocol::Request,
4636 ) -> crate::network::protocol::Response {
4637 use crate::network::protocol::Response;
4638
4639 match request {
4640 crate::network::protocol::Request::Get(key) => match self.get(&key) {
4641 Ok(value) => Response::Success(value),
4642 Err(e) => Response::Error(e.to_string()),
4643 },
4644 crate::network::protocol::Request::Put(key, value) => {
4645 match self.put(key, value, None).await {
4646 Ok(_) => Response::Done,
4647 Err(e) => Response::Error(e.to_string()),
4648 }
4649 }
4650 crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
4651 Ok(_) => Response::Done,
4652 Err(e) => Response::Error(e.to_string()),
4653 },
4654 crate::network::protocol::Request::NewCollection { name, fields } => {
4655 let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
4656 .iter()
4657 .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
4658 .collect();
4659
4660 match self.new_collection(&name, fields_for_db).await {
4661 Ok(_) => Response::Done,
4662 Err(e) => Response::Error(e.to_string()),
4663 }
4664 }
4665 crate::network::protocol::Request::Insert { collection, data } => {
4666 match self.insert_map(&collection, data).await {
4667 Ok(id) => Response::Message(id),
4668 Err(e) => Response::Error(e.to_string()),
4669 }
4670 }
4671 crate::network::protocol::Request::GetDocument { collection, id } => {
4672 match self.get_document(&collection, &id) {
4673 Ok(doc) => Response::Document(doc),
4674 Err(e) => Response::Error(e.to_string()),
4675 }
4676 }
4677 crate::network::protocol::Request::Query(builder) => {
4678 match self.execute_simple_query(&builder).await {
4679 Ok(docs) => Response::Documents(docs),
4680 Err(e) => Response::Error(e.to_string()),
4681 }
4682 }
4683 crate::network::protocol::Request::BeginTransaction => {
4684 let tx_id = self.begin_transaction();
4685 Response::TransactionId(tx_id.as_u64())
4686 }
4687 crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
4688 let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4689 match self.commit_transaction(tx_id) {
4690 Ok(_) => Response::Done,
4691 Err(e) => Response::Error(e.to_string()),
4692 }
4693 }
4694 crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
4695 let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
4696 match self.rollback_transaction(tx_id) {
4697 Ok(_) => Response::Done,
4698 Err(e) => Response::Error(e.to_string()),
4699 }
4700 }
4701 }
4702 }
4703
4704 /// Create indices for commonly queried fields automatically
4705 ///
4706 /// This is a convenience method that creates indices for fields that are
4707 /// likely to be queried frequently, improving performance.
4708 ///
4709 /// # Arguments
4710 /// * `collection` - Name of the collection
4711 /// * `fields` - List of field names to create indices for
4712 ///
4713 /// # Examples
4714 /// ```
4715 /// // Create indices for commonly queried fields
4716 /// db.create_indices("users", &["email", "status", "created_at"]).await?;
4717 /// ```
4718 pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
4719 for field in fields {
4720 if let Err(e) = self.create_index(collection, field).await {
4721 eprintln!(
4722 "Warning: Failed to create index for {}.{}: {}",
4723 collection, field, e
4724 );
4725 } else {
4726 println!("Created index for {}.{}", collection, field);
4727 }
4728 }
4729 Ok(())
4730 }
4731
4732 /// Get index statistics for a collection
4733 ///
4734 /// This helps understand which indices exist and how effective they are.
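    ///
    /// # Examples
    ///
    /// A sketch for spotting low-selectivity indices:
    /// ```
    /// for (field, stats) in db.get_index_stats("users") {
    ///     if stats.avg_docs_per_value > 100 {
    ///         println!("index on '{}' has low selectivity", field);
    ///     }
    /// }
    /// ```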
4735 pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
4736 let mut stats = HashMap::new();
4737
4738 for entry in self.secondary_indices.iter() {
4739 let key = entry.key();
4740 if key.starts_with(&format!("{}:", collection)) {
4741 let field = key.split(':').nth(1).unwrap_or("unknown");
4742 let index = entry.value();
4743
4744 let unique_values = index.len();
4745 let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
4746
4747 stats.insert(
4748 field.to_string(),
4749 IndexStats {
4750 unique_values,
4751 total_documents,
4752 avg_docs_per_value: if unique_values > 0 {
4753 total_documents / unique_values
4754 } else {
4755 0
4756 },
4757 },
4758 );
4759 }
4760 }
4761
4762 stats
4763 }
4764
    /// Optimize a collection by creating indices for its fields
    ///
    /// Currently this creates an index for every field defined in the
    /// collection's schema; query-pattern analysis to narrow the set is a
    /// possible future refinement.
4768 pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
4769 if let Ok(collection_def) = self.get_collection_definition(collection) {
4770 let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
4771 self.create_indices(collection, &field_names).await?;
4772 }
4773
4774 Ok(())
4775 }
4776
4777 // Helper method to get unique fields from a collection
4778 fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
4779 collection
4780 .fields
4781 .iter()
4782 .filter(|(_, def)| def.unique)
4783 .map(|(name, _)| name.clone())
4784 .collect()
4785 }
4786
4787 // Update the validation method to use the helper
4788 async fn validate_unique_constraints(
4789 &self,
4790 collection: &str,
4791 data: &HashMap<String, Value>,
4792 ) -> Result<()> {
4793 self.ensure_indices_initialized().await?;
4794 let collection_def = match self.get_collection_definition(collection) {
4795 Ok(def) => def,
4796 Err(e) if e.code == crate::error::ErrorCode::CollectionNotFound => return Ok(()),
4797 Err(e) => return Err(e),
4798 };
4799 let unique_fields = self.get_unique_fields(&collection_def);
4800
4801 for unique_field in &unique_fields {
4802 if let Some(value) = data.get(unique_field) {
4803 let index_key = format!("{}:{}", collection, unique_field);
4804 if let Some(index) = self.secondary_indices.get(&index_key) {
4805 // Get the raw string value without JSON formatting
4806 let value_str = match value {
4807 Value::String(s) => s.clone(),
4808 _ => value.to_string(),
4809 };
4810 if index.contains_key(&value_str) {
4811 return Err(AqlError::new(
4812 ErrorCode::UniqueConstraintViolation,
4813 format!(
4814 "Unique constraint violation on field '{}' with value '{}'",
4815 unique_field, value_str
4816 ),
4817 ));
4818 }
4819 }
4820 }
4821 }
4822 Ok(())
4823 }
4824
4825 /// Validate unique constraints excluding a specific document ID (for updates)
4826 async fn validate_unique_constraints_excluding(
4827 &self,
4828 collection: &str,
4829 data: &HashMap<String, Value>,
4830 exclude_id: &str,
4831 ) -> Result<()> {
4832 self.ensure_indices_initialized().await?;
4833 let collection_def = self.get_collection_definition(collection)?;
4834 let unique_fields = self.get_unique_fields(&collection_def);
4835
4836 for unique_field in &unique_fields {
4837 if let Some(value) = data.get(unique_field) {
4838 let index_key = format!("{}:{}", collection, unique_field);
4839 if let Some(index) = self.secondary_indices.get(&index_key) {
4840 // Get the raw string value without JSON formatting
4841 let value_str = match value {
4842 Value::String(s) => s.clone(),
4843 _ => value.to_string(),
4844 };
4845 if let Some(doc_ids) = index.get(&value_str) {
4846 // Check if any document other than the excluded one has this value
4847 let exclude_key = format!("{}:{}", collection, exclude_id);
4848 for doc_key in doc_ids.value() {
4849 if doc_key != &exclude_key {
4850 return Err(AqlError::new(
4851 ErrorCode::UniqueConstraintViolation,
4852 format!(
4853 "Unique constraint violation on field '{}' with value '{}'",
4854 unique_field, value_str
4855 ),
4856 ));
4857 }
4858 }
4859 }
4860 }
4861 }
4862 }
4863 Ok(())
4864 }
4865
4866 // =========================================================================
4867 // AQL Executor Helper Methods
4868 // These are wrapper methods for AQL executor integration.
4869 // They provide a simplified API compatible with the AQL query execution layer.
4870 // =========================================================================
4871
4872 /// Get all documents in a collection (AQL helper)
4873 ///
4874 /// This is a wrapper around the internal query system optimized for bulk retrieval.
4875 pub async fn aql_get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
4876 self.ensure_indices_initialized().await?;
4877
4878 let prefix = format!("{}:", collection);
4879 let mut docs = Vec::new();
4880
4881 for result in self.cold.scan_prefix(&prefix) {
4882 let (_, value) = result?;
4883 if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
4884 docs.push(doc);
4885 }
4886 }
4887
4888 Ok(docs)
4889 }
4890
4891 /// Insert a document from a HashMap (AQL helper)
4892 ///
4893 /// Returns the complete document (not just ID) for AQL executor
4894 pub async fn aql_insert(
4895 &self,
4896 collection: &str,
4897 data: HashMap<String, Value>,
4898 ) -> Result<Document> {
4899 // Validate unique constraints before inserting
4900 self.validate_unique_constraints(collection, &data).await?;
4901
4902 let doc_id = Uuid::now_v7().to_string();
4903 let document = Document {
4904 id: doc_id.clone(),
4905 data,
4906 };
4907
4908 self.put(
4909 format!("{}:{}", collection, doc_id),
4910 serde_json::to_vec(&document)?,
4911 None,
4912 )
4913 .await?;
4914
4915 // Publish insert event
4916 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
4917 let _ = self.pubsub.publish(event);
4918
4919 Ok(document)
4920 }
4921
4922 /// Update a document by ID with new data (AQL helper)
4923 ///
4924 /// Merges new data with existing data and returns updated document
4925 pub async fn aql_update_document(
4926 &self,
4927 collection: &str,
4928 doc_id: &str,
4929 updates: HashMap<String, Value>,
4930 ) -> Result<Document> {
4931 let key = format!("{}:{}", collection, doc_id);
4932
4933 // Get existing document
4934 let existing = self.get(&key)?.ok_or_else(|| {
4935 AqlError::new(
4936 ErrorCode::NotFound,
4937 format!("Document {} not found", doc_id),
4938 )
4939 })?;
4940
4941 let mut doc: Document = serde_json::from_slice(&existing)?;
4942 let old_doc = doc.clone();
4943
4944 // Merge updates into existing data
4945 for (k, v) in updates {
4946 doc.data.insert(k, v);
4947 }
4948
4949 // Validate unique constraints excluding this document
4950 self.validate_unique_constraints_excluding(collection, &doc.data, doc_id)
4951 .await?;
4952
4953 // Save updated document
4954 self.put(key, serde_json::to_vec(&doc)?, None).await?;
4955
4956 // Publish update event
4957 let event = crate::pubsub::ChangeEvent::update(collection, doc_id, old_doc, doc.clone());
4958 let _ = self.pubsub.publish(event);
4959
4960 Ok(doc)
4961 }
4962
4963 /// Delete a document by ID (AQL helper)
4964 ///
4965 /// Returns the deleted document
4966 pub async fn aql_delete_document(&self, collection: &str, doc_id: &str) -> Result<Document> {
4967 let key = format!("{}:{}", collection, doc_id);
4968
4969 // Get existing document first
4970 let existing = self.get(&key)?.ok_or_else(|| {
4971 AqlError::new(
4972 ErrorCode::NotFound,
4973 format!("Document {} not found", doc_id),
4974 )
4975 })?;
4976
4977 let doc: Document = serde_json::from_slice(&existing)?;
4978
        // Append to the WAL for durability, unless durability is disabled
        if self.config.durability_mode != DurabilityMode::None {
4982 if let Some(wal_writer) = &self.wal_writer {
4983 // Use async writer if available
4984 let op = WalOperation::Delete { key: key.clone() };
4985 wal_writer.send(op).map_err(|_| {
4986 AqlError::new(
4987 ErrorCode::InternalError,
4988 "Failed to send to WAL writer".to_string(),
4989 )
4990 })?;
4991 } else if let Some(wal) = &self.wal {
4992 // Fallback to blocking write if async writer not set (should be rare/legacy)
4993 wal.write()
4994 .map_err(|e| AqlError::new(ErrorCode::InternalError, e.to_string()))?
4995 .append(crate::wal::Operation::Delete, &key, None)?;
4996 }
4997 }
4998
4999 // Delete from storage
5000 self.cold.delete(&key)?;
5001 self.hot.delete(&key);
5002
5003 // Remove from primary index
5004 if let Some(index) = self.primary_indices.get_mut(collection) {
5005 index.remove(&key);
5006 }
5007
5008 // Remove from secondary indices
5009 for (field_name, value) in &doc.data {
5010 let index_key = format!("{}:{}", collection, field_name);
5011 if let Some(index) = self.secondary_indices.get_mut(&index_key) {
5012 let value_str = match value {
5013 Value::String(s) => s.clone(),
5014 _ => value.to_string(),
5015 };
5016 if let Some(mut doc_ids) = index.get_mut(&value_str) {
5017 doc_ids.retain(|id| id != &key);
5018 }
5019 }
5020 }
5021
5022 // Publish delete event
5023 let event = crate::pubsub::ChangeEvent::delete(collection, doc_id);
5024 let _ = self.pubsub.publish(event);
5025
5026 Ok(doc)
5027 }
5028
5029 /// Get a single document by ID (AQL helper)
5030 pub async fn aql_get_document(
5031 &self,
5032 collection: &str,
5033 doc_id: &str,
5034 ) -> Result<Option<Document>> {
5035 let key = format!("{}:{}", collection, doc_id);
5036
5037 match self.get(&key)? {
5038 Some(data) => {
5039 let doc: Document = serde_json::from_slice(&data)?;
5040 Ok(Some(doc))
5041 }
5042 None => Ok(None),
5043 }
5044 }
5045
5046 /// Begin a transaction (AQL helper) - returns transaction ID
5047 pub fn aql_begin_transaction(&self) -> Result<crate::transaction::TransactionId> {
5048 Ok(self.begin_transaction())
5049 }
5050
5051 /// Commit a transaction (AQL helper)
5052 pub async fn aql_commit_transaction(
5053 &self,
5054 tx_id: crate::transaction::TransactionId,
5055 ) -> Result<()> {
5056 self.commit_transaction(tx_id)
5057 }
5058
5059 /// Rollback a transaction (AQL helper)
5060 pub async fn aql_rollback_transaction(
5061 &self,
5062 tx_id: crate::transaction::TransactionId,
5063 ) -> Result<()> {
5064 self.transaction_manager.rollback(tx_id)
5065 }
5066
5067 // ============================================
5068 // AQL Schema Management Wrappers
5069 // ============================================
5070
5071 /// Create a collection from AST schema definition
5072 pub async fn create_collection_schema(
5073 &self,
5074 name: &str,
5075 fields: HashMap<String, crate::types::FieldDefinition>,
5076 ) -> Result<()> {
5077 let collection_key = format!("_collection:{}", name);
5078
5079 // Check if collection already exists
5080 if self.get(&collection_key)?.is_some() {
5081 // Already exists
5082 return Ok(());
5083 }
5084
5085 let collection = Collection {
5086 name: name.to_string(),
5087 fields,
5088 };
5089
5090 let collection_data = serde_json::to_vec(&collection)?;
5091 self.put(collection_key, collection_data, None).await?;
5092 self.schema_cache.remove(name);
5093
5094 Ok(())
5095 }
5096
5097 /// Add a field to an existing collection schema
5098 pub async fn add_field_to_schema(
5099 &self,
5100 collection_name: &str,
5101 name: String,
5102 definition: crate::types::FieldDefinition,
5103 ) -> Result<()> {
5104 let mut collection = self
5105 .get_collection_definition(collection_name)
5106 .map_err(|_| {
5107 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5108 })?;
5109
5110 if collection.fields.contains_key(&name) {
5111 return Err(AqlError::new(
5112 ErrorCode::InvalidDefinition,
5113 format!(
5114 "Field '{}' already exists in collection '{}'",
5115 name, collection_name
5116 ),
5117 ));
5118 }
5119
5120 collection.fields.insert(name, definition);
5121
5122 let collection_key = format!("_collection:{}", collection_name);
5123 let collection_data = serde_json::to_vec(&collection)?;
5124 self.put(collection_key, collection_data, None).await?;
5125 self.schema_cache.remove(collection_name);
5126
5127 Ok(())
5128 }
5129
5130 /// Drop a field from an existing collection schema
5131 pub async fn drop_field_from_schema(
5132 &self,
5133 collection_name: &str,
5134 field_name: String,
5135 ) -> Result<()> {
5136 let mut collection = self
5137 .get_collection_definition(collection_name)
5138 .map_err(|_| {
5139 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5140 })?;
5141
5142 if !collection.fields.contains_key(&field_name) {
5143 return Err(AqlError::new(
5144 ErrorCode::InvalidDefinition,
5145 format!(
5146 "Field '{}' does not exist in collection '{}'",
5147 field_name, collection_name
5148 ),
5149 ));
5150 }
5151
5152 collection.fields.remove(&field_name);
5153
5154 let collection_key = format!("_collection:{}", collection_name);
5155 let collection_data = serde_json::to_vec(&collection)?;
5156 self.put(collection_key, collection_data, None).await?;
5157 self.schema_cache.remove(collection_name);
5158
5159 Ok(())
5160 }
5161
5162 /// Rename a field in an existing collection schema
5163 pub async fn rename_field_in_schema(
5164 &self,
5165 collection_name: &str,
5166 from: String,
5167 to: String,
5168 ) -> Result<()> {
5169 let mut collection = self
5170 .get_collection_definition(collection_name)
5171 .map_err(|_| {
5172 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5173 })?;
5174
5175 if let Some(def) = collection.fields.remove(&from) {
5176 if collection.fields.contains_key(&to) {
5177 return Err(AqlError::new(
5178 ErrorCode::InvalidDefinition,
5179 format!(
5180 "Target field name '{}' already exists in collection '{}'",
5181 to, collection_name
5182 ),
5183 ));
5184 }
5185 collection.fields.insert(to, def);
5186 } else {
5187 return Err(AqlError::new(
5188 ErrorCode::InvalidDefinition,
5189 format!("Field '{}' not found", from),
5190 ));
5191 }
5192
5193 let collection_key = format!("_collection:{}", collection_name);
5194 let collection_data = serde_json::to_vec(&collection)?;
5195 self.put(collection_key, collection_data, None).await?;
5196 self.schema_cache.remove(collection_name);
5197
5198 Ok(())
5199 }
5200
5201 /// Modify a field in an existing collection schema
5202 pub async fn modify_field_in_schema(
5203 &self,
5204 collection_name: &str,
5205 name: String,
5206 definition: crate::types::FieldDefinition,
5207 ) -> Result<()> {
5208 let mut collection = self
5209 .get_collection_definition(collection_name)
5210 .map_err(|_| {
5211 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5212 })?;
5213
5214 if !collection.fields.contains_key(&name) {
5215 return Err(AqlError::new(
5216 ErrorCode::InvalidDefinition,
5217 format!(
5218 "Field '{}' does not exist in collection '{}'",
5219 name, collection_name
5220 ),
5221 ));
5222 }
5223
5224 collection.fields.insert(name, definition);
5225
5226 let collection_key = format!("_collection:{}", collection_name);
5227 let collection_data = serde_json::to_vec(&collection)?;
5228 self.put(collection_key, collection_data, None).await?;
5229 self.schema_cache.remove(collection_name);
5230
5231 Ok(())
5232 }
5233
5234 /// Drop an entire collection definition
5235 pub async fn drop_collection_schema(&self, collection_name: &str) -> Result<()> {
5236 let collection_key = format!("_collection:{}", collection_name);
5237 self.cold.delete(&collection_key)?;
5238 self.schema_cache.remove(collection_name);
5239 Ok(())
5240 }
5241
5242 // ============================================
5243 // AQL Migration Wrappers
5244 // ============================================
5245
5246 /// Check if a migration version has been applied
5247 pub async fn is_migration_applied(&self, version: &str) -> Result<bool> {
5248 let migration_key = format!("_sys_migration:{}", version);
5249 Ok(self.get(&migration_key)?.is_some())
5250 }
5251
5252 /// Mark a migration version as applied
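    ///
    /// # Examples
    ///
    /// A sketch of the run-once migration pattern these two calls enable;
    /// the version string is arbitrary:
    /// ```
    /// if !db.is_migration_applied("v2_add_users").await? {
    ///     // ... apply schema changes here ...
    ///     db.mark_migration_applied("v2_add_users").await?;
    /// }
    /// ```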
5253 pub async fn mark_migration_applied(&self, version: &str) -> Result<()> {
5254 let migration_key = format!("_sys_migration:{}", version);
5255 let timestamp = chrono::Utc::now().to_rfc3339();
5256 self.put(migration_key, timestamp.as_bytes().to_vec(), None)
5257 .await?;
5258 Ok(())
5259 }
5260}
5261
5262impl Drop for Aurora {
5263 fn drop(&mut self) {
5264 // Signal checkpoint task to shutdown gracefully
5265 if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
5266 let _ = shutdown_tx.send(());
5267 }
5268 // Signal compaction task to shutdown gracefully
5269 if let Some(ref shutdown_tx) = self.compaction_shutdown {
5270 let _ = shutdown_tx.send(());
5271 }
5272 }
5273}
5274
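/// Evaluate a single HTTP filter against a document value.
///
/// Ordering operators (`Gt`, `Gte`, `Lt`, `Lte`) rely on the ordering
/// implemented for `Value`; `Contains` matches substrings for strings and
/// element membership for arrays. Filters whose JSON value cannot be
/// converted evaluate to `false`.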
5275fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
5276 let filter_val = match json_to_value(&filter.value) {
5277 Ok(v) => v,
5278 Err(_) => return false,
5279 };
5280
5281 match filter.operator {
5282 FilterOperator::Eq => doc_val == &filter_val,
5283 FilterOperator::Ne => doc_val != &filter_val,
5284 FilterOperator::Gt => doc_val > &filter_val,
5285 FilterOperator::Gte => doc_val >= &filter_val,
5286 FilterOperator::Lt => doc_val < &filter_val,
5287 FilterOperator::Lte => doc_val <= &filter_val,
5288 FilterOperator::Contains => match (doc_val, &filter_val) {
5289 (Value::String(s), Value::String(fv)) => s.contains(fv),
5290 (Value::Array(arr), _) => arr.contains(&filter_val),
5291 _ => false,
5292 },
5293 }
5294}
5295
5296/// Results of importing a document
5297enum ImportResult {
5298 Imported,
5299 Skipped,
5300}
5301
5302/// Statistics from an import operation
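///
/// For a single run, `imported + skipped + failed` equals the number of
/// entries in the imported JSON array.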
5303#[derive(Debug, Default)]
5304pub struct ImportStats {
5305 /// Number of documents successfully imported
5306 pub imported: usize,
5307 /// Number of documents skipped (usually because they already exist)
5308 pub skipped: usize,
5309 /// Number of documents that failed to import
5310 pub failed: usize,
5311}
5312
5313/// Statistics for a specific collection
5314#[derive(Debug)]
5315pub struct CollectionStats {
5316 /// Number of documents in the collection
5317 pub count: usize,
5318 /// Total size of the collection in bytes
5319 pub size_bytes: usize,
5320 /// Average document size in bytes
5321 pub avg_doc_size: usize,
5322}
5323
5324/// Statistics for an index
5325#[derive(Debug)]
5326pub struct IndexStats {
5327 /// Number of unique values in the index
5328 pub unique_values: usize,
5329 /// Total number of documents covered by the index
5330 pub total_documents: usize,
5331 /// Average number of documents per unique value
5332 pub avg_docs_per_value: usize,
5333}
5334
5335/// Combined database statistics
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection
    pub collections: HashMap<String, CollectionStats>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_async_wal_integration() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_wal_integration.db");
        let db = Aurora::open(db_path.to_str().unwrap()).unwrap();

        // 1. WAL is assumed to be enabled by default via AuroraConfig; enable
        //    it explicitly here if the test configuration defaults it off.

        // Create the collection schema first
        db.new_collection(
            "users",
            vec![
                ("id", FieldType::String, false),
                ("counter", FieldType::Int, false),
            ],
        )
        .await
        .unwrap();

        let db_clone = db.clone();

        // 2. Spawn 50 concurrent writes
        let handles: Vec<_> = (0..50)
            .map(|i| {
                let db = db_clone.clone();
                tokio::spawn(async move {
                    let doc_id = db
                        .insert_into(
                            "users",
                            vec![
                                ("id", Value::String(format!("user-{}", i))),
                                ("counter", Value::Int(i)),
                            ],
                        )
                        .await
                        .unwrap();
                    (i, doc_id) // return both the index and the document ID
                })
            })
            .collect();

        // 3. Wait for all writes to complete and collect the results
        let results = futures::future::join_all(handles).await;
        assert!(results.iter().all(|r| r.is_ok()), "Some writes failed");

        // Extract the (counter, document ID) pairs
        let doc_ids: Vec<(i64, String)> = results.into_iter().map(|r| r.unwrap()).collect();

        // Find the document ID for the write with counter == 25
        let target_doc_id = doc_ids
            .iter()
            .find(|(counter, _)| *counter == 25i64)
            .map(|(_, doc_id)| doc_id)
            .unwrap();

        // 4. Verify data integrity
        let doc = db.get_document("users", target_doc_id).unwrap().unwrap();
        assert_eq!(doc.data.get("counter"), Some(&Value::Int(25)));
    }

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("age", FieldType::Int, false),
                ("email", FieldType::String, true),
            ],
        )
        .await?;

        // Test document insertion
        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection
        db.new_collection("test", vec![("field", FieldType::String, false)])
            .await?;

        // Start transaction
        let tx_id = db.begin_transaction();

        // Insert document
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id)?;

        // Verify the document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection
        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )
        .await?;

        // Insert two documents
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Filter and sort: both books are newer than 2019, ascending by year
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Store the file as a blob
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify the blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // 13 content bytes + 5-byte "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a 201 MB test file that exceeds the blob size limit
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempting to store the oversized file should fail
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code, ErrorCode::InvalidOperation);

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection with a unique email field
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique field
                ("age", FieldType::Int, false),
            ],
        )
        .await?;

        // Insert the first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Inserting a second document with the same email should fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        if let Err(e) = result {
            if e.code == ErrorCode::UniqueConstraintViolation {
                let msg = e.message;
                assert!(msg.contains("email"));
                assert!(msg.contains("john@example.com")); // the error should report the duplicate value
            } else {
                panic!("Expected UniqueConstraintViolation, got {:?}", e);
            }
        } else {
            panic!("Expected UniqueConstraintViolation");
        }

        // Test upsert with unique constraint:
        // should succeed for a new document
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Should fail when upserting a different document with a duplicate email
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Should succeed when updating the existing document with its own email
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_nullable_field_definition() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // 3-tuple format defaults nullable to false
        db.new_collection(
            "products",
            vec![
                ("name", FieldType::String, false),
                ("sku", FieldType::String, true), // unique
            ],
        )
        .await?;

        // 4-tuple format gives explicit nullable control
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false, false), // not nullable
                ("email", FieldType::String, true, false), // unique, not nullable
                ("bio", FieldType::String, false, true),   // nullable
                ("age", FieldType::Int, false, true),      // nullable
            ],
        )
        .await?;

        // Verify the 4-tuple schema was created correctly
        let schema = db.ensure_schema_hot("users")?;
        assert!(!schema.fields.get("name").unwrap().nullable);
        assert!(!schema.fields.get("email").unwrap().nullable);
        assert!(schema.fields.get("bio").unwrap().nullable);
        assert!(schema.fields.get("age").unwrap().nullable);

        // Verify the products collection (3-tuple defaults to false)
        let products_schema = db.ensure_schema_hot("products")?;
        assert!(!products_schema.fields.get("name").unwrap().nullable);
        assert!(!products_schema.fields.get("sku").unwrap().nullable);

        Ok(())
    }
}