aurora_db/db.rs
//! # Aurora Database
//!
//! Aurora is an embedded document database with a tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust,no_run
//! use aurora_db::{Aurora, types::{FieldType, Value}};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true), // unique field
//!     ("age", FieldType::Int, false),
//! ])?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! # Ok(())
//! # }
//! ```

use crate::error::{AuroraError, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{
    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
};
use crate::wal::{Operation, WriteAheadLog};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Disk location metadata for the primary index.
// Instead of storing full Vec<u8> values, we store minimal metadata.
#[derive(Debug, Clone, Copy)]
struct DiskLocation {
    size: u32, // Size in bytes (useful for statistics)
}

impl DiskLocation {
    fn new(size: usize) -> Self {
        Self { size: size as u32 }
    }
}

// Index types for faster lookups
type PrimaryIndex = DashMap<String, DiskLocation>;
type SecondaryIndex = DashMap<String, Vec<String>>;

/// Summary information about a stored value, as returned by `get_data_by_pattern()`.
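///
/// # Example
///
/// A minimal sketch (not from the original docs) of inspecting entries by
/// pattern; the `DataInfo` import path is an assumption, adjust it to the
/// actual re-export:
///
/// ```no_run
/// use aurora_db::DataInfo; // assumed re-export
///
/// # fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
/// for (key, info) in db.get_data_by_pattern("users:")? {
///     match info {
///         DataInfo::Blob { size } => println!("{}: blob, {} bytes", key, size),
///         other => println!("{}: {} bytes", key, other.size()),
///     }
/// }
/// # Ok(())
/// # }
/// ```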
#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: in-memory cache for frequently accessed data
/// - Cold tier: persistent disk storage for durability
/// - Primary indices: fast key-based access
/// - Secondary indices: fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates it if it doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability (optional, based on config)
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background checkpoint task
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Background compaction task
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

impl Aurora {
    /// Remove stale lock files from a database directory
    ///
    /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
    /// that prevent the database from being reopened. This method safely removes
    /// those lock files.
    ///
    /// # Safety
    /// Only call this when you're certain no other Aurora instance is using the database.
    /// Removing lock files while another process is running could cause data corruption.
    ///
    /// # Example
    /// ```no_run
    /// use aurora_db::Aurora;
    ///
    /// // If you get an "Access denied" error when opening:
    /// if let Err(e) = Aurora::open("my_db") {
    ///     eprintln!("Failed to open: {}", e);
    ///     // Try removing a stale lock
    ///     if Aurora::remove_stale_lock("my_db").unwrap_or(false) {
    ///         println!("Removed stale lock, try opening again");
    ///         let db = Aurora::open("my_db")?;
    ///     }
    /// }
    /// # Ok::<(), aurora_db::error::AuroraError>(())
    /// ```
    pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
        let resolved_path = Self::resolve_path(path)?;
        crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
    }

    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```
    /// // Use a specific location
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Just use a name (creates in the current directory)
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = AuroraConfig {
            db_path: Self::resolve_path(path)?,
            ..Default::default()
        };
        Self::with_config(config)
    }

    /// Helper method to resolve a database path
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to the current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AuroraError::IoError(format!(
                "Failed to resolve current directory: {}",
                e
            ))),
        }
    }

    /// Open a database with custom configuration
    ///
    /// # Arguments
    /// * `config` - Database configuration settings
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::AuroraConfig};
    /// use std::time::Duration;
    ///
    /// let config = AuroraConfig {
    ///     db_path: "my_data.db".into(),
    ///     hot_cache_size_mb: 512,       // 512 MB cache
    ///     enable_write_buffering: true, // Batch writes for speed
    ///     enable_wal: true,             // Durability
    ///     auto_compact: true,           // Background compaction
    ///     compact_interval_mins: 60,    // Compact every hour
    ///     ..Default::default()
    /// };
    ///
    /// let db = Aurora::with_config(config)?;
    /// ```
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs
            && let Some(parent) = path.parent()
            && !parent.exists()
        {
            std::fs::create_dir_all(parent)?;
        }

        let cold = Arc::new(ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        // Initialize the write buffer if enabled
        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        // Copy these settings before `config` is moved into the struct
        let auto_compact = config.auto_compact;
        let enable_wal = config.enable_wal;

        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        // Initialize the WAL if enabled, and check for entries to recover
        let (wal, wal_entries) = if enable_wal {
            let wal_path = path.to_str().unwrap();
            match WriteAheadLog::new(wal_path) {
                Ok(mut wal_log) => {
                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
                    (Some(Arc::new(RwLock::new(wal_log))), entries)
                }
                Err(_) => (None, Vec::new()),
            }
        } else {
            (None, Vec::new())
        };

        // Spawn the background checkpoint task if the WAL is enabled
        let checkpoint_shutdown = if wal.is_some() {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let wal_clone = wal.clone();
            let checkpoint_interval = config.checkpoint_interval_ms;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            // Checkpoint: flush cold storage
                            if let Err(e) = cold_clone.flush() {
                                eprintln!("Background checkpoint flush error: {}", e);
                            }
                            // Truncate the WAL after a successful flush
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                let _ = wal_guard.truncate();
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            // Final checkpoint before shutdown
                            let _ = cold_clone.flush();
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                let _ = wal_guard.truncate();
                            }
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Spawn the background compaction task if auto_compact is enabled
        let compaction_shutdown = if auto_compact {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let compact_interval = config.compact_interval_mins;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.compact() {
                                eprintln!("Background compaction error: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.compact();
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Initialize the database
        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
            wal,
            checkpoint_shutdown,
            compaction_shutdown,
        };

        // Replay any recovered WAL entries
        if !wal_entries.is_empty() {
            db.replay_wal(wal_entries)?;
        }

        Ok(db)
    }

    /// Lazily build the in-memory indices by scanning cold storage.
    ///
    /// Runs at most once per database instance; concurrent callers wait for the
    /// first initialization to finish. Called automatically by collection-wide
    /// reads such as `get_all_collection()`.
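    ///
    /// # Example
    ///
    /// A minimal sketch (not from the original docs), assuming a Tokio runtime:
    ///
    /// ```no_run
    /// # async fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
    /// // Warm the indices up front so the first query doesn't pay the cost
    /// db.ensure_indices_initialized().await?;
    /// # Ok(())
    /// # }
    /// ```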
    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    // Fast key-value operations with index support
    /// Get a value by key (low-level key-value access)
    ///
    /// This is the low-level method. For document access, use `get_document()` instead.
    /// Checks the hot cache first, then falls back to cold storage for maximum performance.
    ///
    /// # Performance
    /// - Hot cache hit: ~1M reads/sec (instant)
    /// - Cold storage: ~500K reads/sec (disk I/O)
    /// - Cache hit rate: typically 95%+ at scale
    ///
    /// # Examples
    ///
    /// ```
    /// // Low-level key-value access
    /// let data = db.get("users:12345")?;
    /// if let Some(bytes) = data {
    ///     let doc: Document = serde_json::from_slice(&bytes)?;
    ///     println!("Found: {:?}", doc);
    /// }
    ///
    /// // Better: use get_document() for documents
    /// let user = db.get_document("users", "12345")?;
    /// ```
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Check the hot cache first
        if let Some(value) = self.hot.get(key) {
            return Ok(Some(value));
        }

        // Check whether the key exists in the primary index
        if let Some(collection) = key.split(':').next()
            && let Some(index) = self.primary_indices.get(collection)
            && index.contains_key(key)
        {
            // Key exists in the index; fetch from cold storage
            if let Some(value) = self.cold.get(key)? {
                // Promote to the hot cache for future fast access
                self.hot.set(key.to_string(), value.clone(), None);
                return Ok(Some(value));
            }
        }

        // Fall back to cold storage
        let value = self.cold.get(key)?;
        if let Some(v) = &value {
            self.hot.set(key.to_string(), v.clone(), None);
        }
        Ok(value)
    }

    /// Get a value as a zero-copy `Arc` reference (10-100x faster than `get()` for cached data).
    /// Only checks the hot cache; returns `None` if the key is not cached.
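    ///
    /// # Example
    ///
    /// A minimal sketch (not from the original docs) of the cache-only fast path
    /// with a fallback to the regular read:
    ///
    /// ```no_run
    /// # fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
    /// let bytes = match db.get_hot_ref("users:12345") {
    ///     Some(arc_bytes) => arc_bytes.to_vec(), // already in the hot cache
    ///     None => db.get("users:12345")?.unwrap_or_default(), // cold-storage fallback
    /// };
    /// # Ok(())
    /// # }
    /// ```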
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Get cache statistics
    ///
    /// Returns detailed metrics about cache performance, including hit/miss rates,
    /// memory usage, and access patterns. Useful for monitoring, optimization,
    /// and understanding database performance characteristics.
    ///
    /// # Returns
    /// `CacheStats` struct containing:
    /// - `hits`: Number of cache hits (data found in memory)
    /// - `misses`: Number of cache misses (had to read from disk)
    /// - `hit_rate`: Fraction of requests served from cache (0.0-1.0)
    /// - `size`: Current number of entries in the cache
    /// - `capacity`: Maximum cache capacity
    /// - `evictions`: Number of entries evicted due to capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Check cache performance
    /// let stats = db.get_cache_stats();
    /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
    /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
    ///
    /// // Monitor performance during operations
    /// let before = db.get_cache_stats();
    ///
    /// // Perform many reads
    /// for i in 0..1000 {
    ///     db.get_document("users", &format!("user-{}", i))?;
    /// }
    ///
    /// let after = db.get_cache_stats();
    /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
    /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
    ///
    /// // Performance tuning
    /// let stats = db.get_cache_stats();
    /// if stats.hit_rate < 0.80 {
    ///     println!("Low cache hit rate! Consider:");
    ///     println!("- Increasing the cache size in config");
    ///     println!("- Prewarming the cache with prewarm_cache()");
    ///     println!("- Reviewing query patterns");
    /// }
    ///
    /// if stats.evictions > stats.size {
    ///     println!("High eviction rate! The cache may be too small.");
    ///     println!("Consider increasing cache capacity.");
    /// }
    ///
    /// // Production monitoring
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// loop {
    ///     let stats = db.get_cache_stats();
    ///
    ///     // Log to a monitoring system
    ///     if stats.hit_rate < 0.90 {
    ///         eprintln!("Warning: cache hit rate dropped to {:.1}%",
    ///                   stats.hit_rate * 100.0);
    ///     }
    ///
    ///     thread::sleep(Duration::from_secs(60));
    /// }
    /// ```
    ///
    /// # Typical Performance Metrics
    /// - **Excellent**: 95%+ hit rate (most reads from memory)
    /// - **Good**: 80-95% hit rate (acceptable performance)
    /// - **Poor**: <80% hit rate (consider cache tuning)
    ///
    /// # See Also
    /// - `prewarm_cache()` to improve hit rates by preloading data
    /// - `Aurora::with_config()` to adjust cache capacity
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for real-time changes in a collection
    ///
    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
    /// data synchronization systems.
    ///
    /// # Performance
    /// - Zero overhead when no listeners are active
    /// - Events are broadcast to all listeners asynchronously
    /// - Non-blocking - doesn't slow down write operations
    /// - Multiple listeners can watch the same collection
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic listener
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         match event.change_type {
    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
    ///             ChangeType::Delete => println!("Deleted user ID: {}", event.id),
    ///         }
    ///     }
    /// });
    ///
    /// // Now any insert/update/delete will trigger the listener
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Cache Invalidation:**
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    /// use std::collections::HashMap;
    ///
    /// let cache = Arc::new(RwLock::new(HashMap::new()));
    /// let cache_clone = Arc::clone(&cache);
    ///
    /// let mut listener = db.listen("products");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Invalidate the cache entry when a product changes
    ///         cache_clone.write().await.remove(&event.id);
    ///         println!("Cache invalidated for product: {}", event.id);
    ///     }
    /// });
    /// ```
    ///
    /// **Webhook Notifications:**
    /// ```
    /// let mut listener = db.listen("orders");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             // Send a webhook for new orders
    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Audit Logging:**
    /// ```
    /// let mut listener = db.listen("sensitive_data");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log all changes to the audit trail
    ///         db.insert_into("audit_log", vec![
    ///             ("collection", Value::String("sensitive_data".into())),
    ///             ("action", Value::String(format!("{:?}", event.change_type))),
    ///             ("document_id", Value::String(event.id.clone())),
    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
    ///         ]).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Data Synchronization:**
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Sync changes to an external system
    ///         match event.change_type {
    ///             ChangeType::Insert | ChangeType::Update => {
    ///                 if let Some(doc) = event.document {
    ///                     external_api.upsert_user(&doc).await?;
    ///                 }
    ///             },
    ///             ChangeType::Delete => {
    ///                 external_api.delete_user(&event.id).await?;
    ///             },
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Real-Time Notifications:**
    /// ```
    /// let mut listener = db.listen("messages");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             if let Some(msg) = event.document {
    ///                 // Push a notification to connected websockets
    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
    ///                     websocket_manager.send_to_user(recipient, &msg).await;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Filtered Listener:**
    /// ```
    /// use aurora_db::pubsub::EventFilter;
    ///
    /// // Only listen for inserts
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
    ///
    /// // Only listen for documents with a specific field value
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
    /// ```
    ///
    /// # Important Notes
    /// - A listener stays active until dropped
    /// - Events are delivered in order
    /// - Each listener has its own event stream
    /// - Use filters to reduce unnecessary event processing
    /// - Listeners don't affect write performance
    ///
    /// # See Also
    /// - `listen_all()` to listen to all collections
    /// - `ChangeListener::filter()` to filter events
    /// - `query().watch()` for reactive queries with filtering
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    ///
    /// Returns a stream of change events for every insert, update, and delete
    /// operation across the entire database. Useful for global audit logging,
    /// replication, and monitoring systems.
    ///
    /// # Performance
    /// - Same performance as a single-collection listener
    /// - Filter events by collection in your handler
    /// - Consider using `listen(collection)` if only watching specific collections
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Listen to everything
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         println!("Change in {}: {:?}", event.collection, event.change_type);
    ///     }
    /// });
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Global Audit Trail:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log every database change
    ///         audit_logger.log(AuditEntry {
    ///             timestamp: chrono::Utc::now(),
    ///             collection: event.collection,
    ///             action: event.change_type,
    ///             document_id: event.id,
    ///             user_id: get_current_user_id(),
    ///         }).await;
    ///     }
    /// });
    /// ```
    ///
    /// **Database Replication:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Replicate to a secondary database
    ///         replica_db.apply_change(event).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Change Data Capture (CDC):**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Stream changes to Kafka/RabbitMQ
    ///         kafka_producer.send(
    ///             &format!("cdc.{}", event.collection),
    ///             serde_json::to_string(&event)?
    ///         ).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Monitoring & Metrics:**
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let write_counter = Arc::new(AtomicUsize::new(0));
    /// let counter_clone = Arc::clone(&write_counter);
    ///
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = listener.recv().await {
    ///         counter_clone.fetch_add(1, Ordering::Relaxed);
    ///     }
    /// });
    ///
    /// // Report metrics every 60 seconds
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(Duration::from_secs(60)).await;
    ///         let count = write_counter.swap(0, Ordering::Relaxed);
    ///         println!("Writes per minute: {}", count);
    ///     }
    /// });
    /// ```
    ///
    /// **Selective Processing:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Handle different collections differently
    ///         match event.collection.as_str() {
    ///             "users" => handle_user_change(event).await,
    ///             "orders" => handle_order_change(event).await,
    ///             "payments" => handle_payment_change(event).await,
    ///             _ => {} // Ignore others
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # When to Use
    /// - Global audit logging
    /// - Database replication
    /// - Change data capture (CDC)
    /// - Monitoring and metrics
    /// - Event sourcing systems
    ///
    /// # When NOT to Use
    /// - Only need to watch 1-2 collections → Use `listen(collection)` instead
    /// - High write volume with selective interest → Use collection-specific listeners
    /// - Need complex filtering → Use `query().watch()` instead
    ///
    /// # See Also
    /// - `listen()` for single-collection listening
    /// - `listener_count()` to check active listeners
    /// - `query().watch()` for filtered reactive queries
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get the total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

    /// Flushes all buffered writes to disk to ensure durability.
    ///
    /// This method forces all pending writes from:
    /// - Write buffer (if enabled)
    /// - Cold storage internal buffers
    /// - Write-ahead log (if enabled)
    ///
    /// Call this when you need to ensure data persistence before
    /// a critical operation or shutdown. After flush() completes,
    /// all data is guaranteed to be on disk even if power fails.
    ///
    /// # Performance
    /// - Flush time: ~10-50ms depending on buffered data
    /// - Triggers an OS-level fsync() for the durability guarantee
    /// - Truncates the WAL after a successful flush
    /// - Not needed for every write (the WAL provides durability)
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic flush after a critical write
    /// db.insert_into("users", data).await?;
    /// db.flush()?; // Ensure data is persisted to disk
    ///
    /// // Graceful shutdown pattern
    /// fn shutdown(db: &Aurora) -> Result<()> {
    ///     println!("Flushing pending writes...");
    ///     db.flush()?;
    ///     println!("Shutdown complete - all data persisted");
    ///     Ok(())
    /// }
    ///
    /// // Periodic checkpoint pattern (share the database behind an Arc)
    /// use std::sync::Arc;
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// let db = Arc::new(db);
    /// let db_checkpoint = Arc::clone(&db);
    /// thread::spawn(move || {
    ///     loop {
    ///         thread::sleep(Duration::from_secs(60));
    ///         if let Err(e) = db_checkpoint.flush() {
    ///             eprintln!("Flush error: {}", e);
    ///         } else {
    ///             println!("Checkpoint: data flushed to disk");
    ///         }
    ///     }
    /// });
    ///
    /// // Critical transaction pattern
    /// let tx_id = db.begin_transaction();
    ///
    /// // Multiple operations
    /// db.insert_into("orders", order_data).await?;
    /// db.update_document("inventory", product_id, updates).await?;
    /// db.insert_into("audit_log", audit_data).await?;
    ///
    /// // Commit and flush immediately
    /// db.commit_transaction(tx_id)?;
    /// db.flush()?; // Critical: ensure the transaction is on disk
    ///
    /// // Backup preparation
    /// println!("Preparing backup...");
    /// db.flush()?; // Ensure all data is written
    /// std::fs::copy("mydb.db", "backup.db")?;
    /// println!("Backup complete");
    /// ```
    ///
    /// # When to Use
    /// - Before graceful shutdown
    /// - After critical transactions
    /// - Before creating backups
    /// - Periodic checkpoints (every 30-60 seconds)
    /// - Before risky operations
    ///
    /// # When NOT to Use
    /// - After every single write (too slow; the WAL provides durability)
    /// - In high-throughput loops (batch instead)
    /// - When the durability mode is already Immediate
    ///
    /// # Important Notes
    /// - The WAL provides durability even without an explicit flush()
    /// - flush() adds latency (~10-50ms), so use it strategically
    /// - An automatic flush happens during graceful shutdown
    /// - After flush(), the WAL is truncated (data is in main storage)
    ///
    /// # See Also
    /// - `Aurora::with_config()` to set the durability mode
    /// - The WAL (Write-Ahead Log) provides durability without explicit flushes
    pub fn flush(&self) -> Result<()> {
        // Flush the write buffer if present
        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.flush()?;
        }

        // Flush cold storage
        self.cold.flush()?;

        // Truncate the WAL after a successful flush (data is now in cold storage)
        if let Some(ref wal) = self.wal
            && let Ok(mut wal_lock) = wal.write()
        {
            wal_lock.truncate()?;
        }

        Ok(())
    }

    /// Store a key-value pair (low-level storage)
    ///
    /// This is the low-level method. For documents, use `insert_into()` instead.
    /// Writes are buffered and batched for performance.
    ///
    /// # Arguments
    /// * `key` - Unique key (format: "collection:id" for documents)
    /// * `value` - Raw bytes to store
    /// * `ttl` - Optional time-to-live (None = permanent)
    ///
    /// # Performance
    /// - Buffered writes: ~15-30K docs/sec
    /// - Batching improves throughput significantly
    /// - Call `flush()` to ensure data is persisted
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Permanent storage
    /// let data = serde_json::to_vec(&my_struct)?;
    /// db.put("mykey".to_string(), data, None)?;
    ///
    /// // With a TTL (expires after 1 hour)
    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600)))?;
    ///
    /// // Better: use insert_into() for documents
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;

        if value.len() > MAX_BLOB_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "Blob size {}MB exceeds maximum allowed size of {}MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        if let Some(ref wal) = self.wal
            && self.config.durability_mode != DurabilityMode::None
        {
            wal.write()
                .unwrap()
                .append(Operation::Put, &key, Some(&value))?;
        }

        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(key.clone(), value.clone())?;
        } else {
            self.cold.set(key.clone(), value.clone())?;
        }

        self.hot.set(key.clone(), value.clone(), ttl);

        if let Some(collection_name) = key.split(':').next()
            && !collection_name.starts_with('_')
        {
            self.index_value(collection_name, &key, &value)?;
        }

        Ok(())
    }

    /// Replay WAL entries to recover from a crash
    fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
        for entry in entries {
            match entry.operation {
                Operation::Put => {
                    if let Some(value) = entry.value {
                        // Write directly to cold storage (skip the WAL; already logged)
                        self.cold.set(entry.key.clone(), value.clone())?;

                        // Update the hot cache
                        self.hot.set(entry.key.clone(), value.clone(), None);

                        // Rebuild indices
                        if let Some(collection) = entry.key.split(':').next()
                            && !collection.starts_with('_')
                        {
                            self.index_value(collection, &entry.key, &value)?;
                        }
                    }
                }
                Operation::Delete => {
                    self.cold.delete(&entry.key)?;
                    self.hot.delete(&entry.key);
                    // TODO: Remove from indices
                }
                Operation::BeginTx | Operation::CommitTx | Operation::RollbackTx => {
                    // Transaction boundaries - future implementation
                }
            }
        }

        // Flush after replay and truncate the WAL
        self.cold.flush()?;
        if let Some(ref wal) = self.wal {
            wal.write().unwrap().truncate()?;
        }

        Ok(())
    }

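    // Illustration of the index shapes maintained below (hypothetical data):
    //
    //   primary_indices["users"]:          "users:42" -> DiskLocation { size: 128 }
    //   secondary_indices["users:email"]:  "alice@example.com" -> ["users:42", ...]
    //
    // Only fields marked `indexed` or `unique` in the collection schema get
    // secondary index entries.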
    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        // Update the primary index with metadata only (not the full value)
        let location = DiskLocation::new(value.len());
        self.primary_indices
            .entry(collection.to_string())
            .or_default()
            .insert(key.to_string(), location);

        // Try to get the schema from the cache first; otherwise load and cache it
        let collection_obj = match self.schema_cache.get(collection) {
            Some(cached_schema) => Arc::clone(cached_schema.value()),
            None => {
                // Schema not in cache - load it
                let collection_key = format!("_collection:{}", collection);
                let schema_data = match self.get(&collection_key)? {
                    Some(data) => data,
                    None => return Ok(()), // No schema = no secondary indices
                };

                let obj: Collection = match serde_json::from_slice(&schema_data) {
                    Ok(obj) => obj,
                    Err(_) => return Ok(()), // Invalid schema = skip indexing
                };

                // Cache the schema for future use
                let arc_obj = Arc::new(obj);
                self.schema_cache
                    .insert(collection.to_string(), Arc::clone(&arc_obj));
                arc_obj
            }
        };

        // Build the list of fields that should be indexed (unique or explicitly indexed)
        let indexed_fields: Vec<String> = collection_obj
            .fields
            .iter()
            .filter(|(_, def)| def.indexed || def.unique)
            .map(|(name, _)| name.clone())
            .collect();

        // If no fields need indexing, we're done
        if indexed_fields.is_empty() {
            return Ok(());
        }

        // Update secondary indices - ONLY for indexed/unique fields
        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, field_value) in doc.data {
                // Skip fields that aren't indexed
                if !indexed_fields.contains(&field) {
                    continue;
                }

                // Use consistent string formatting for indexing
                let value_str = match &field_value {
                    Value::String(s) => s.clone(),
                    _ => field_value.to_string(),
                };

                let index_key = format!("{}:{}", collection, field);
                let secondary_index = self.secondary_indices.entry(index_key).or_default();

                // Respect the configured per-value index limit
                let max_entries = self.config.max_index_entries_per_field;

                secondary_index
                    .entry(value_str)
                    .and_modify(|doc_ids| {
                        // Only add if we haven't exceeded the per-value limit
                        if doc_ids.len() < max_entries {
                            doc_ids.push(key.to_string());
                        }
                    })
                    .or_insert_with(|| vec![key.to_string()]);
            }
        }
        Ok(())
    }

    /// Scan a collection with a filter and early-termination support.
    /// Used by QueryBuilder for optimized queries with LIMIT.
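    ///
    /// # Example
    ///
    /// A minimal sketch (not from the original docs); the `users` collection
    /// and `age` field are assumptions:
    ///
    /// ```no_run
    /// # use aurora_db::types::Value;
    /// # fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
    /// // First 10 users with age > 18; the scan stops as soon as 10 match
    /// let adults = db.scan_and_filter(
    ///     "users",
    ///     |doc| matches!(doc.data.get("age"), Some(Value::Int(age)) if *age > 18),
    ///     Some(10),
    /// )?;
    /// # Ok(())
    /// # }
    /// ```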
    pub fn scan_and_filter<F>(
        &self,
        collection: &str,
        filter: F,
        limit: Option<usize>,
    ) -> Result<Vec<Document>>
    where
        F: Fn(&Document) -> bool,
    {
        let mut documents = Vec::new();

        if let Some(index) = self.primary_indices.get(collection) {
            for entry in index.iter() {
                // Early termination once the limit is reached
                if let Some(max) = limit {
                    if documents.len() >= max {
                        break;
                    }
                }

                let key = entry.key();
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        if filter(&doc) {
                            documents.push(doc);
                        }
                    }
                }
            }
        } else {
            // Fallback: full scan, then filter and truncate
            documents = self.scan_collection(collection)?;
            documents.retain(|doc| filter(doc));
            if let Some(max) = limit {
                documents.truncate(max);
            }
        }

        Ok(documents)
    }

    /// Smart collection scan that uses the primary index as a key directory.
    /// Avoids forced flushes and leverages the hot cache for better performance.
    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
        let mut documents = Vec::new();

        // Use the primary index as a "key directory" - it contains all document keys
        if let Some(index) = self.primary_indices.get(collection) {
            // Iterate through all keys in the primary index (fast, in-memory)
            for entry in index.iter() {
                let key = entry.key();

                // Fetch each document using get(), which checks:
                // 1. The hot cache first (instant)
                // 2. Cold storage if not cached (disk I/O only for uncached items)
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        documents.push(doc);
                    }
                }
            }
        } else {
            // Fallback: scan cold storage if the primary index is not yet initialized
            let prefix = format!("{}:", collection);
            for result in self.cold.scan_prefix(&prefix) {
                if let Ok((_key, value)) = result {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        documents.push(doc);
                    }
                }
            }
        }

        Ok(documents)
    }

    /// Store a file's contents as a blob under the given key.
    ///
    /// The data is prefixed with a `BLOB:` marker so it can be recognized later.
    /// Files larger than 50MB are rejected.
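    ///
    /// # Example
    ///
    /// A minimal sketch (not from the original docs); the key and file path are
    /// illustrative:
    ///
    /// ```no_run
    /// # use std::path::Path;
    /// # async fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
    /// db.put_blob("images:avatar-42".to_string(), Path::new("avatar.png")).await?;
    /// # Ok(())
    /// # }
    /// ```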
    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit

        // Check the size from file metadata before reading
        let metadata = tokio::fs::metadata(file_path).await?;
        let file_size = metadata.len() as usize;

        if file_size > MAX_FILE_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "File size {} MB exceeds maximum allowed size of {} MB",
                file_size / (1024 * 1024),
                MAX_FILE_SIZE / (1024 * 1024)
            )));
        }

        let mut file = File::open(file_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Add the BLOB: prefix to mark this as blob data
        let mut blob_data = Vec::with_capacity(5 + buffer.len());
        blob_data.extend_from_slice(b"BLOB:");
        blob_data.extend_from_slice(&buffer);

        self.put(key, blob_data, None)
    }

    /// Create a new collection with a schema definition
    ///
    /// Collections are like tables in SQL - they define the structure of your documents.
    /// The third tuple element marks the field as **unique**; unique fields are
    /// automatically indexed for fast lookups.
    ///
    /// # Arguments
    /// * `name` - Collection name
    /// * `fields` - Vector of (field_name, field_type, unique) tuples
    ///   - Field name (accepts both &str and String)
    ///   - Field type (String, Int, Float, Bool, etc.)
    ///   - Unique: true enforces uniqueness (and indexes the field), false for no constraint
    ///
    /// # Performance
    /// - Indexed fields: fast equality queries (O(1) lookup)
    /// - Non-indexed fields: full scan required for queries
    /// - Unique fields are automatically indexed
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Create a users collection
    /// db.new_collection("users", vec![
    ///     ("name", FieldType::String, false),  // Not unique
    ///     ("email", FieldType::String, true),  // Unique - auto-indexed for fast lookups
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    ///     ("score", FieldType::Float, false),
    /// ])?;
    ///
    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */])?; // OK!
    /// ```
    pub fn new_collection<S: Into<String>>(
        &self,
        name: &str,
        fields: Vec<(S, FieldType, bool)>,
    ) -> Result<()> {
        let collection_key = format!("_collection:{}", name);

        // If the collection already exists, just return Ok (idempotent)
        if self.get(&collection_key)?.is_some() {
            return Ok(());
        }

        // Create the field definitions
        let mut field_definitions = HashMap::new();
        for (field_name, field_type, unique) in fields {
            field_definitions.insert(
                field_name.into(),
                FieldDefinition {
                    field_type,
                    unique,
                    indexed: unique, // Auto-index unique fields
                },
            );
        }

        let collection = Collection {
            name: name.to_string(),
            fields: field_definitions,
        };

        let collection_data = serde_json::to_vec(&collection)?;
        self.put(collection_key, collection_data, None)?;

        // Invalidate the schema cache since we just created the collection schema
        self.schema_cache.remove(name);

        Ok(())
    }

    /// Insert a document into a collection
    ///
    /// Automatically generates a UUID for the document and validates against the
    /// collection schema and unique constraints. Returns the generated document ID.
    ///
    /// # Performance
    /// - Single insert: ~15,000 docs/sec
    /// - Bulk insert: use `batch_insert()` for 10+ documents (~50,000 docs/sec)
    /// - Triggers PubSub events for real-time listeners
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to insert into
    /// * `data` - Document fields and values to insert
    ///
    /// # Returns
    /// The auto-generated ID of the inserted document or an error
    ///
    /// # Errors
    /// - `CollectionNotFound`: Collection doesn't exist
    /// - `ValidationError`: Data violates schema or unique constraints
    /// - `SerializationError`: Invalid data format
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic insertion
    /// let user_id = db.insert_into("users", vec![
    ///     ("name", Value::String("Alice Smith".to_string())),
    ///     ("email", Value::String("alice@example.com".to_string())),
    ///     ("age", Value::Int(28)),
    ///     ("active", Value::Bool(true)),
    /// ]).await?;
    ///
    /// println!("Created user with ID: {}", user_id);
    ///
    /// // Inserting with nested data
    /// let order_id = db.insert_into("orders", vec![
    ///     ("user_id", Value::String(user_id.clone())),
    ///     ("total", Value::Float(99.99)),
    ///     ("status", Value::String("pending".to_string())),
    ///     ("items", Value::Array(vec![
    ///         Value::String("item-123".to_string()),
    ///         Value::String("item-456".to_string()),
    ///     ])),
    /// ]).await?;
    ///
    /// // Error handling - unique constraint violation
    /// match db.insert_into("users", vec![
    ///     ("email", Value::String("alice@example.com".to_string())), // Duplicate!
    ///     ("name", Value::String("Alice Clone".to_string())),
    /// ]).await {
    ///     Ok(id) => println!("Inserted: {}", id),
    ///     Err(e) => println!("Failed: {} (email already exists)", e),
    /// }
    ///
    /// // For bulk inserts (10+ documents), use batch_insert() instead
    /// let users = vec![
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Bob".to_string())),
    ///         ("email".to_string(), Value::String("bob@example.com".to_string())),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Carol".to_string())),
    ///         ("email".to_string(), Value::String("carol@example.com".to_string())),
    ///     ]),
    ///     // ... more documents
    /// ];
    /// let ids = db.batch_insert("users", users).await?; // 3x faster!
    /// println!("Inserted {} users", ids.len());
    /// ```
    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
        // Convert Vec<(&str, Value)> to HashMap<String, Value>
        let data_map: HashMap<String, Value> =
            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();

        // Validate unique constraints before inserting
        self.validate_unique_constraints(collection, &data_map)
            .await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data: data_map,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish the insert event
        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

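    /// Insert a document from a pre-built `HashMap` of fields.
    ///
    /// Behaves like `insert_into()` but takes an owned map, which is convenient
    /// when the fields are assembled dynamically.
    ///
    /// # Example
    ///
    /// A minimal sketch (not from the original docs):
    ///
    /// ```no_run
    /// # use std::collections::HashMap;
    /// # use aurora_db::types::Value;
    /// # async fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
    /// let mut fields = HashMap::new();
    /// fields.insert("name".to_string(), Value::String("Alice".into()));
    /// fields.insert("age".to_string(), Value::Int(28));
    /// let id = db.insert_map("users", fields).await?;
    /// # Ok(())
    /// # }
    /// ```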
    pub async fn insert_map(
        &self,
        collection: &str,
        data: HashMap<String, Value>,
    ) -> Result<String> {
        // Validate unique constraints before inserting
        self.validate_unique_constraints(collection, &data).await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish the insert event
        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

    /// Batch insert multiple documents with an optimized write path
    ///
    /// Inserts multiple documents in a single optimized operation, bypassing
    /// the write buffer for better performance. Ideal for bulk data loading,
    /// migrations, or initial database seeding. 3x faster than individual inserts.
    ///
    /// # Performance
    /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
    /// - Batch writes to the WAL and storage
    /// - Validates all unique constraints
    /// - Use for 10+ documents minimum
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to insert into
    /// * `documents` - Vector of document data as HashMaps
    ///
    /// # Returns
    /// Vector of auto-generated document IDs or an error
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Bulk user import
    /// let users = vec![
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Alice".into())),
    ///         ("email".to_string(), Value::String("alice@example.com".into())),
    ///         ("age".to_string(), Value::Int(28)),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Bob".into())),
    ///         ("email".to_string(), Value::String("bob@example.com".into())),
    ///         ("age".to_string(), Value::Int(32)),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Carol".into())),
    ///         ("email".to_string(), Value::String("carol@example.com".into())),
    ///         ("age".to_string(), Value::Int(25)),
    ///     ]),
    /// ];
    ///
    /// let ids = db.batch_insert("users", users).await?;
    /// println!("Inserted {} users", ids.len());
    ///
    /// // Seeding test data
    /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
    ///     .map(|i| HashMap::from([
    ///         ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
    ///         ("price".to_string(), Value::Float(9.99 + i as f64)),
    ///         ("stock".to_string(), Value::Int(100)),
    ///     ]))
    ///     .collect();
    ///
    /// let ids = db.batch_insert("products", test_products).await?;
    /// // Much faster than 1000 individual insert_into() calls!
    ///
    /// // Migration from CSV data
    /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
    /// let mut batch = Vec::new();
    ///
    /// for result in csv_reader.records() {
    ///     let record = result?;
    ///     let doc = HashMap::from([
    ///         ("field1".to_string(), Value::String(record[0].to_string())),
    ///         ("field2".to_string(), Value::String(record[1].to_string())),
    ///     ]);
    ///     batch.push(doc);
    ///
    ///     // Insert in batches of 1000
    ///     if batch.len() >= 1000 {
    ///         db.batch_insert("imported_data", batch.clone()).await?;
    ///         batch.clear();
    ///     }
    /// }
    ///
    /// // Insert the remainder
    /// if !batch.is_empty() {
    ///     db.batch_insert("imported_data", batch).await?;
    /// }
    /// ```
    ///
    /// # Errors
    /// - `ValidationError`: Unique constraint violation on any document
    /// - `CollectionNotFound`: Collection doesn't exist
    /// - `IoError`: Storage write failure
    ///
    /// # Important Notes
    /// - Unique constraints are validated for every document before anything is
    ///   written, so a validation failure means nothing is inserted
    /// - UUIDs are auto-generated for all documents
    /// - PubSub events are published for each insert
    /// - For 10+ documents, this is 3x faster than individual inserts
    /// - For < 10 documents, use `insert_into()` instead
    ///
    /// # See Also
    /// - `insert_into()` for single document inserts
    /// - `import_from_json()` for file-based bulk imports
    /// - `batch_write()` for low-level batch operations
    pub async fn batch_insert(
        &self,
        collection: &str,
        documents: Vec<HashMap<String, Value>>,
    ) -> Result<Vec<String>> {
        let mut doc_ids = Vec::with_capacity(documents.len());
        let mut pairs = Vec::with_capacity(documents.len());

        // Prepare all documents
        for data in documents {
            // Validate unique constraints
            self.validate_unique_constraints(collection, &data).await?;

            let doc_id = Uuid::new_v4().to_string();
            let document = Document {
                id: doc_id.clone(),
                data,
            };

            let key = format!("{}:{}", collection, doc_id);
            let value = serde_json::to_vec(&document)?;

            pairs.push((key, value));
            doc_ids.push(doc_id);
        }

        // Write to the WAL in batch (if enabled)
        if let Some(ref wal) = self.wal
            && self.config.durability_mode != DurabilityMode::None
        {
            let mut wal_lock = wal.write().unwrap();
            for (key, value) in &pairs {
                wal_lock.append(Operation::Put, key, Some(value))?;
            }
        }

        // Bypass the write buffer - go directly to the cold storage batch API
        self.cold.batch_set(pairs.clone())?;

        // Note: durability is handled by the background checkpoint process

        // Update the hot cache and indices
        for (key, value) in pairs {
            self.hot.set(key.clone(), value.clone(), None);

            if let Some(collection_name) = key.split(':').next()
                && !collection_name.starts_with('_')
            {
                self.index_value(collection_name, &key, &value)?;
            }
        }

        // Publish events
        for doc_id in &doc_ids {
            if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
                let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
                let _ = self.pubsub.publish(event);
            }
        }

        Ok(doc_ids)
    }

    /// Update a document by ID
    ///
    /// # Arguments
    /// * `collection` - Collection name
    /// * `doc_id` - Document ID to update
    /// * `updates` - New field values to set
    ///
    /// # Returns
    /// Ok(()) on success, or an error if the document doesn't exist
    ///
    /// # Examples
    ///
    /// ```
    /// db.update_document("users", &user_id, vec![
    ///     ("status", Value::String("active".to_string())),
    ///     ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
    /// ]).await?;
    /// ```
    pub async fn update_document(
        &self,
        collection: &str,
        doc_id: &str,
        updates: Vec<(&str, Value)>,
    ) -> Result<()> {
        // Get the existing document
        let mut document = self
            .get_document(collection, doc_id)?
            .ok_or_else(|| AuroraError::NotFound(format!("Document not found: {}", doc_id)))?;

        // Keep the old document for the change event
        let old_document = document.clone();

        // Apply the updates
        for (field, value) in updates {
            document.data.insert(field.to_string(), value);
        }

        // Validate unique constraints after the update (excluding the current document)
        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
            .await?;

        // Save the updated document
        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish the update event
        let event =
            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(())
    }

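    /// Return every document in a collection.
    ///
    /// Ensures the indices are initialized, then scans the collection via the
    /// primary index (falling back to a cold-storage scan).
    ///
    /// # Example
    ///
    /// A minimal sketch (not from the original docs):
    ///
    /// ```no_run
    /// # async fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
    /// let all_users = db.get_all_collection("users").await?;
    /// println!("{} users total", all_users.len());
    /// # Ok(())
    /// # }
    /// ```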
    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
        self.ensure_indices_initialized().await?;
        self.scan_collection(collection)
    }

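    /// List keys whose name contains `pattern`, with summary info for each value.
    ///
    /// Scans cold storage and reports whether each matching entry is blob data
    /// or regular data (with a short preview).
    ///
    /// # Example
    ///
    /// A minimal sketch (not from the original docs):
    ///
    /// ```no_run
    /// # fn example(db: &aurora_db::Aurora) -> aurora_db::error::Result<()> {
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     println!("{}: {} bytes", key, info.size());
    /// }
    /// # Ok(())
    /// # }
    /// ```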
    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
        let mut data = Vec::new();

        // Scan from cold storage instead of the primary index
        for result in self.cold.scan() {
            if let Ok((key, value)) = result {
                if key.contains(pattern) {
                    let info = if value.starts_with(b"BLOB:") {
                        DataInfo::Blob { size: value.len() }
                    } else {
                        DataInfo::Data {
                            size: value.len(),
                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
                                .into_owned(),
                        }
                    };

                    data.push((key.clone(), info));
                }
            }
        }

        Ok(data)
    }

    /// Begin a new transaction for atomic operations
    ///
    /// Transactions ensure all-or-nothing execution: either all operations succeed,
    /// or none of them are applied. Operations performed within the transaction are
    /// buffered until either `commit_transaction()` or `rollback_transaction()` is
    /// called with the returned transaction ID.
    ///
    /// # Returns
    /// The ID of the new transaction
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Start a transaction
    /// let tx_id = db.begin_transaction();
    ///
    /// // Perform multiple operations
    /// db.insert_into("accounts", vec![
    ///     ("user_id", Value::String("alice".into())),
    ///     ("balance", Value::Int(1000)),
    /// ]).await?;
    ///
    /// db.insert_into("accounts", vec![
    ///     ("user_id", Value::String("bob".into())),
    ///     ("balance", Value::Int(500)),
    /// ]).await?;
    ///
    /// // Commit if everything succeeded
    /// db.commit_transaction(tx_id)?;
    ///
    /// // Or roll back on error
    /// // db.rollback_transaction(tx_id)?;
    /// ```
    pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
        let buffer = self.transaction_manager.begin();
        buffer.id
    }

1808 /// Commit a transaction, making all changes permanent
1809 ///
1810 /// All operations within the transaction are atomically applied to the database.
1811 /// If any operation fails, none are applied.
1812 ///
1813 /// # Arguments
1814 /// * `tx_id` - Transaction ID returned from begin_transaction()
1815 ///
1816 /// # Examples
1817 ///
1818 /// ```
1819 /// use aurora_db::{Aurora, types::Value};
1820 ///
1821 /// let db = Aurora::open("mydb.db")?;
1822 ///
1823 /// // Transfer money between accounts
1824 /// let tx_id = db.begin_transaction();
1825 ///
1826 /// // Deduct from Alice
1827 /// db.update_document("accounts", "alice", vec![
1828 /// ("balance", Value::Int(900)), // Was 1000
1829 /// ]).await?;
1830 ///
1831 /// // Add to Bob
1832 /// db.update_document("accounts", "bob", vec![
1833 /// ("balance", Value::Int(600)), // Was 500
1834 /// ]).await?;
1835 ///
1836 /// // Both updates succeed - commit them
1837 /// db.commit_transaction(tx_id)?;
1838 ///
1839 /// println!("Transfer completed!");
1840 /// ```
1841 pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1842 let buffer = self
1843 .transaction_manager
1844 .active_transactions
1845 .get(&tx_id)
1846 .ok_or_else(|| {
1847 AuroraError::InvalidOperation("Transaction not found or already completed".into())
1848 })?;
1849
1850 for item in buffer.writes.iter() {
1851 let key = item.key();
1852 let value = item.value();
1853 self.cold.set(key.clone(), value.clone())?;
1854 self.hot.set(key.clone(), value.clone(), None);
1855 if let Some(collection_name) = key.split(':').next()
1856 && !collection_name.starts_with('_') {
1857 self.index_value(collection_name, key, value)?;
1858 }
1859 }
1860
1861 for item in buffer.deletes.iter() {
1862 let key = item.key();
1863 if let Some((collection, id)) = key.split_once(':')
1864 && let Ok(Some(doc)) = self.get_document(collection, id) {
1865 self.remove_from_indices(collection, &doc)?;
1866 }
1867 self.cold.delete(key)?;
1868 self.hot.delete(key);
1869 }
1870
1871 // Drop the buffer reference to release the DashMap read lock
1872 // before calling commit which needs to remove the entry (write lock)
1873 drop(buffer);
1874
1875 self.transaction_manager.commit(tx_id)?;
1876
1877 self.cold.compact()?;
1878
1879 Ok(())
1880 }
1881
1882 /// Roll back a transaction, discarding all changes
1883 ///
1884 /// All operations within the transaction are discarded. The database state
1885 /// remains unchanged. Use this when an error occurs during transaction processing.
1886 ///
1887 /// # Arguments
1888 /// * `tx_id` - Transaction ID returned from begin_transaction()
1889 ///
1890 /// # Examples
1891 ///
1892 /// ```
1893 /// use aurora_db::{Aurora, types::Value};
1894 ///
1895 /// let db = Aurora::open("mydb.db")?;
1896 ///
1897 /// // Attempt a transfer with validation
1898 /// let tx_id = db.begin_transaction();
1899 ///
1900 /// let result = async {
    ///     // Check Alice's balance first (get_document is synchronous)
    ///     let alice = db.get_document("accounts", "alice")?;
    ///     let balance = alice.as_ref().and_then(|doc| doc.data.get("balance"));
1904 ///
1905 /// if let Some(Value::Int(bal)) = balance {
1906 /// if *bal < 100 {
1907 /// return Err("Insufficient funds");
1908 /// }
1909 ///
1910 /// db.update_document("accounts", "alice", vec![
1911 /// ("balance", Value::Int(bal - 100)),
1912 /// ]).await?;
1913 ///
1914 /// db.update_document("accounts", "bob", vec![
1915 /// ("balance", Value::Int(600)),
1916 /// ]).await?;
1917 ///
1918 /// Ok(())
1919 /// } else {
1920 /// Err("Account not found")
1921 /// }
1922 /// }.await;
1923 ///
1924 /// match result {
1925 /// Ok(_) => {
1926 /// db.commit_transaction(tx_id)?;
1927 /// println!("Transfer completed");
1928 /// }
1929 /// Err(e) => {
1930 /// db.rollback_transaction(tx_id)?;
1931 /// println!("Transfer failed: {}, changes rolled back", e);
1932 /// }
1933 /// }
1934 /// ```
1935 pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1936 self.transaction_manager.rollback(tx_id)
1937 }
1938
1939 /// Create a secondary index on a field for faster queries
1940 ///
1941 /// Indexes dramatically improve query performance for frequently accessed fields,
1942 /// trading increased memory usage and slower writes for much faster reads.
1943 ///
1944 /// # When to Create Indexes
1945 /// - **Frequent queries**: Fields used in 80%+ of your queries
1946 /// - **High cardinality**: Fields with many unique values (user_id, email)
1947 /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
1948 /// - **Large collections**: Most beneficial with 10,000+ documents
1949 ///
1950 /// # When NOT to Index
1951 /// - Low cardinality fields (e.g., boolean flags, small enums)
1952 /// - Rarely queried fields
1953 /// - Fields that change frequently (write-heavy workloads)
1954 /// - Small collections (<1,000 documents) - full scans are fast enough
1955 ///
1956 /// # Performance Characteristics
1957 /// - **Query speedup**: O(n) → O(1) for equality filters
1958 /// - **Memory cost**: ~100-200 bytes per document per index
1959 /// - **Write slowdown**: ~20-30% longer insert/update times
1960 /// - **Build time**: ~5,000 docs/sec for initial indexing
1961 ///
1962 /// # Arguments
1963 /// * `collection` - Name of the collection to index
1964 /// * `field` - Name of the field to index
1965 ///
1966 /// # Examples
1967 ///
1968 /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// db.new_collection("users", vec![
    ///     ("email", FieldType::String, true),   // unique field
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    /// ])?;
1977 ///
1978 /// // Index email - frequently queried, high cardinality
1979 /// db.create_index("users", "email").await?;
1980 ///
1981 /// // Now this query is FAST (O(1) instead of O(n))
1982 /// let user = db.query("users")
1983 /// .filter(|f| f.eq("email", "alice@example.com"))
1984 /// .first_one()
1985 /// .await?;
1986 ///
1987 /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
1988 /// // A full scan is fast enough for boolean fields
1989 ///
1990 /// // DO index 'age' if you frequently query age ranges
1991 /// db.create_index("users", "age").await?;
1992 ///
1993 /// let young_users = db.query("users")
1994 /// .filter(|f| f.lt("age", 30))
1995 /// .collect()
1996 /// .await?;
1997 /// ```
1998 ///
1999 /// # Real-World Example: E-commerce Orders
2000 ///
2001 /// ```
2002 /// // Orders collection: 1 million documents
    /// db.new_collection("orders", vec![
    ///     ("user_id", FieldType::String, false),    // High cardinality
    ///     ("status", FieldType::String, false),     // Low cardinality (pending, shipped, delivered)
    ///     ("created_at", FieldType::String, false),
    ///     ("total", FieldType::Float, false),
    /// ])?;
2009 ///
2010 /// // Index user_id - queries like "show me my orders" are common
2011 /// db.create_index("orders", "user_id").await?; // Good choice
2012 ///
2013 /// // Query speedup: 2.5s → 0.001s
2014 /// let my_orders = db.query("orders")
2015 /// .filter(|f| f.eq("user_id", user_id))
2016 /// .collect()
2017 /// .await?;
2018 ///
2019 /// // DON'T index 'status' - only 3 possible values
2020 /// // Scanning 1M docs takes ~100ms, indexing won't help much
2021 ///
2022 /// // Index created_at if you frequently query recent orders
2023 /// db.create_index("orders", "created_at").await?; // Good for time-based queries
2024 /// ```
2025 pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
2026 // Check if collection exists
2027 if self.get(&format!("_collection:{}", collection))?.is_none() {
2028 return Err(AuroraError::CollectionNotFound(collection.to_string()));
2029 }
2030
2031 // Generate a default index name
2032 let index_name = format!("idx_{}_{}", collection, field);
2033
2034 // Create index definition
2035 let definition = IndexDefinition {
2036 name: index_name.clone(),
2037 collection: collection.to_string(),
2038 fields: vec![field.to_string()],
2039 index_type: IndexType::BTree,
2040 unique: false,
2041 };
2042
2043 // Create the index
2044 let index = Index::new(definition.clone());
2045
2046 // Index all existing documents in the collection
2047 let prefix = format!("{}:", collection);
2048 for result in self.cold.scan_prefix(&prefix) {
2049 if let Ok((_, data)) = result
2050 && let Ok(doc) = serde_json::from_slice::<Document>(&data) {
2051 let _ = index.insert(&doc);
2052 }
2053 }
2054
2055 // Store the index
2056 self.indices.insert(index_name, index);
2057
2058 // Store the index definition for persistence
2059 let index_key = format!("_index:{}:{}", collection, field);
2060 self.put(index_key, serde_json::to_vec(&definition)?, None)?;
2061
2062 Ok(())
2063 }
2064
2065 /// Query documents in a collection with filtering, sorting, and pagination
2066 ///
2067 /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
2068 /// Queries use early termination for LIMIT clauses, making them extremely fast
2069 /// even on large collections (6,800x faster than naive implementations).
2070 ///
2071 /// # Performance
2072 /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2073 /// - Without LIMIT: O(n) where n = matching documents
2074 /// - Uses secondary indices when available for equality filters
2075 /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2076 ///
2077 /// # Examples
2078 ///
2079 /// ```
2080 /// use aurora_db::{Aurora, types::Value};
2081 ///
2082 /// let db = Aurora::open("mydb.db")?;
2083 ///
2084 /// // Simple equality query
2085 /// let active_users = db.query("users")
2086 /// .filter(|f| f.eq("active", Value::Bool(true)))
2087 /// .collect()
2088 /// .await?;
2089 ///
2090 /// // Range query with pagination (FAST - uses early termination!)
2091 /// let top_scorers = db.query("users")
2092 /// .filter(|f| f.gt("score", Value::Int(1000)))
2093 /// .order_by("score", false) // descending
2094 /// .limit(10)
2095 /// .offset(20)
2096 /// .collect()
2097 /// .await?;
2098 ///
2099 /// // Multiple filters
2100 /// let premium_active = db.query("users")
2101 /// .filter(|f| f.eq("tier", Value::String("premium".into())))
2102 /// .filter(|f| f.eq("active", Value::Bool(true)))
2103 /// .limit(100) // Only scans ~200 docs, not all million!
2104 /// .collect()
2105 /// .await?;
2106 ///
2107 /// // Text search in a field
2108 /// let matching = db.query("articles")
2109 /// .filter(|f| f.contains("title", "rust"))
2110 /// .collect()
2111 /// .await?;
2112 /// ```
2113 pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2114 QueryBuilder::new(self, collection)
2115 }
2116
2117 /// Create a search builder for full-text search
2118 ///
2119 /// # Arguments
2120 /// * `collection` - Name of the collection to search
2121 ///
2122 /// # Returns
2123 /// A `SearchBuilder` for configuring and executing searches
2124 ///
2125 /// # Examples
2126 ///
2127 /// ```
2128 /// // Search for documents containing text
2129 /// let search_results = db.search("articles")
2130 /// .field("content")
2131 /// .matching("quantum computing")
2132 /// .fuzzy(true) // Enable fuzzy matching for typo tolerance
2133 /// .collect()
2134 /// .await?;
2135 /// ```
2136 pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2137 SearchBuilder::new(self, collection)
2138 }
2139
2140 /// Retrieve a document by ID
2141 ///
2142 /// Fast direct lookup when you know the document ID. Significantly faster
2143 /// than querying with filters when ID is known.
2144 ///
2145 /// # Performance
2146 /// - Hot cache: ~1,000,000 reads/sec (instant)
2147 /// - Cold storage: ~500,000 reads/sec (disk I/O)
2148 /// - Complexity: O(1) - constant time lookup
2149 /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2150 ///
2151 /// # Arguments
2152 /// * `collection` - Name of the collection to query
2153 /// * `id` - ID of the document to retrieve
2154 ///
2155 /// # Returns
2156 /// The document if found, None if not found, or an error
2157 ///
2158 /// # Examples
2159 ///
2160 /// ```
2161 /// use aurora_db::{Aurora, types::Value};
2162 ///
2163 /// let db = Aurora::open("mydb.db")?;
2164 ///
2165 /// // Basic retrieval
2166 /// if let Some(user) = db.get_document("users", &user_id)? {
2167 /// println!("Found user: {}", user.id);
2168 ///
2169 /// // Access fields safely
2170 /// if let Some(Value::String(name)) = user.data.get("name") {
2171 /// println!("Name: {}", name);
2172 /// }
2173 ///
2174 /// if let Some(Value::Int(age)) = user.data.get("age") {
2175 /// println!("Age: {}", age);
2176 /// }
2177 /// } else {
2178 /// println!("User not found");
2179 /// }
2180 ///
2181 /// // Idiomatic error handling
2182 /// let user = db.get_document("users", &user_id)?
2183 /// .ok_or_else(|| AuroraError::NotFound("User not found".into()))?;
2184 ///
2185 /// // Checking existence before operations
2186 /// if db.get_document("users", &user_id)?.is_some() {
2187 /// db.update_document("users", &user_id, vec![
2188 /// ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2189 /// ]).await?;
2190 /// }
2191 ///
2192 /// // Batch retrieval (fetch multiple by ID)
2193 /// let user_ids = vec!["user-1", "user-2", "user-3"];
2194 /// let users: Vec<Document> = user_ids.iter()
2195 /// .filter_map(|id| db.get_document("users", id).ok().flatten())
2196 /// .collect();
2197 ///
2198 /// println!("Found {} out of {} users", users.len(), user_ids.len());
2199 /// ```
2200 ///
2201 /// # When to Use
2202 /// - You know the document ID (from insert, previous query, or URL param)
2203 /// - Need fastest possible lookup (1M reads/sec)
2204 /// - Fetching a single document
2205 ///
2206 /// # When NOT to Use
2207 /// - Searching by other fields → Use `query().filter()` instead
2208 /// - Need multiple documents by criteria → Use `query().collect()` instead
2209 /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2210 pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2211 let key = format!("{}:{}", collection, id);
2212 if let Some(data) = self.get(&key)? {
2213 Ok(Some(serde_json::from_slice(&data)?))
2214 } else {
2215 Ok(None)
2216 }
2217 }
2218
2219 /// Delete a document by ID
2220 ///
2221 /// Permanently removes a document from storage, cache, and all indices.
2222 /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2223 ///
2224 /// # Performance
2225 /// - Delete speed: ~50,000 deletes/sec
2226 /// - Cleans up hot cache, cold storage, primary + secondary indices
2227 /// - Triggers PubSub events for listeners
2228 ///
2229 /// # Arguments
2230 /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2231 ///
2232 /// # Returns
2233 /// Success or an error
2234 ///
2235 /// # Errors
2236 /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2237 /// - `IoError`: Storage deletion failed
2238 ///
2239 /// # Examples
2240 ///
2241 /// ```
2242 /// use aurora_db::Aurora;
2243 ///
2244 /// let db = Aurora::open("mydb.db")?;
2245 ///
2246 /// // Basic deletion (note: requires "collection:id" format)
2247 /// db.delete("users:abc123").await?;
2248 ///
2249 /// // Delete with existence check
2250 /// let user_id = "user-456";
2251 /// if db.get_document("users", user_id)?.is_some() {
2252 /// db.delete(&format!("users:{}", user_id)).await?;
2253 /// println!("User deleted");
2254 /// } else {
2255 /// println!("User not found");
2256 /// }
2257 ///
2258 /// // Error handling
2259 /// match db.delete("users:nonexistent").await {
2260 /// Ok(_) => println!("Deleted successfully"),
2261 /// Err(e) => println!("Delete failed: {}", e),
2262 /// }
2263 ///
2264 /// // Batch deletion using query
    /// let inactive_count = db.delete_by_query("users", |f| {
2266 /// f.eq("active", Value::Bool(false))
2267 /// }).await?;
2268 /// println!("Deleted {} inactive users", inactive_count);
2269 ///
2270 /// // Delete with cascading (manual cascade pattern)
2271 /// let user_id = "user-123";
2272 ///
2273 /// // Delete user's orders first
2274 /// let orders = db.query("orders")
2275 /// .filter(|f| f.eq("user_id", user_id))
2276 /// .collect()
2277 /// .await?;
2278 ///
2279 /// for order in orders {
2280 /// db.delete(&format!("orders:{}", order.id)).await?;
2281 /// }
2282 ///
2283 /// // Then delete the user
2284 /// db.delete(&format!("users:{}", user_id)).await?;
2285 /// println!("User and all orders deleted");
2286 /// ```
2287 ///
2288 /// # Alternative: Soft Delete Pattern
2289 ///
2290 /// For recoverable deletions, use soft deletes instead:
2291 ///
2292 /// ```
2293 /// // Soft delete - mark as deleted instead of removing
2294 /// db.update_document("users", &user_id, vec![
2295 /// ("deleted", Value::Bool(true)),
2296 /// ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2297 /// ]).await?;
2298 ///
2299 /// // Query excludes soft-deleted items
2300 /// let active_users = db.query("users")
2301 /// .filter(|f| f.eq("deleted", Value::Bool(false)))
2302 /// .collect()
2303 /// .await?;
2304 ///
2305 /// // Later: hard delete after retention period
2306 /// let old_deletions = db.query("users")
2307 /// .filter(|f| f.eq("deleted", Value::Bool(true)))
2308 /// .filter(|f| f.lt("deleted_at", thirty_days_ago))
2309 /// .collect()
2310 /// .await?;
2311 ///
2312 /// for user in old_deletions {
2313 /// db.delete(&format!("users:{}", user.id)).await?;
2314 /// }
2315 /// ```
2316 ///
2317 /// # Important Notes
2318 /// - Deletion is permanent - no undo/recovery
2319 /// - Consider soft deletes for recoverable operations
2320 /// - Use transactions for multi-document deletions
2321 /// - PubSub subscribers will receive delete events
2322 /// - All indices are automatically cleaned up
2323 pub async fn delete(&self, key: &str) -> Result<()> {
2324 // Extract collection and id from key (format: "collection:id")
2325 let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2326 (coll, doc_id)
2327 } else {
2328 return Err(AuroraError::InvalidOperation(
2329 "Invalid key format, expected 'collection:id'".into(),
2330 ));
2331 };
2332
2333 // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
2334 let document = self.get_document(collection, id)?;
2335
2336 // Delete from hot cache
2337 if self.hot.get(key).is_some() {
2338 self.hot.delete(key);
2339 }
2340
2341 // Delete from cold storage
2342 self.cold.delete(key)?;
2343
2344 // CRITICAL FIX: Clean up ALL indices (primary + secondary)
2345 if let Some(doc) = document {
2346 self.remove_from_indices(collection, &doc)?;
        } else {
            // Fallback: at least remove from the primary index. batch_write
            // populates it with full "collection:id" keys, so remove that
            // form as well as the bare id, defensively
            if let Some(index) = self.primary_indices.get_mut(collection) {
                index.remove(key);
                index.remove(id);
            }
        }
2353
2354 // Publish delete event
2355 let event = crate::pubsub::ChangeEvent::delete(collection, id);
2356 let _ = self.pubsub.publish(event);
2357
2358 Ok(())
2359 }
2360
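    /// Delete every document in a collection, along with its in-memory
    /// indices and cached schema
    ///
    /// A sketch of typical use (assuming a throwaway `temp_data` collection):
    ///
    /// ```
    /// db.delete_collection("temp_data").await?;
    /// assert!(db.get_all_collection("temp_data").await?.is_empty());
    /// ```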
2361 pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2362 let prefix = format!("{}:", collection);
2363
2364 // Get all keys in collection
2365 let keys: Vec<String> = self
2366 .cold
2367 .scan()
2368 .filter_map(|r| r.ok())
2369 .filter(|(k, _)| k.starts_with(&prefix))
2370 .map(|(k, _)| k)
2371 .collect();
2372
2373 // Delete each key
2374 for key in keys {
2375 self.delete(&key).await?;
2376 }
2377
2378 // Remove collection indices
2379 self.primary_indices.remove(collection);
2380 self.secondary_indices
2381 .retain(|k, _| !k.starts_with(&prefix));
2382
2383 // Invalidate schema cache
2384 self.schema_cache.remove(collection);
2385
2386 Ok(())
2387 }
2388
    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
        // Remove from primary index. batch_write keys it by the full
        // "collection:id" key, so remove that form (and the bare id, defensively)
        if let Some(index) = self.primary_indices.get(collection) {
            index.remove(&format!("{}:{}", collection, doc.id));
            index.remove(&doc.id);
        }

        // Remove from secondary indices, using the same key form that
        // indexing uses (raw string for Value::String, Display otherwise)
        for (field, value) in &doc.data {
            let index_key = format!("{}:{}", collection, field);
            let value_str = match value {
                Value::String(s) => s.clone(),
                _ => value.to_string(),
            };
            if let Some(index) = self.secondary_indices.get(&index_key)
                && let Some(mut doc_ids) = index.get_mut(&value_str)
            {
                doc_ids.retain(|id| id != &doc.id);
            }
        }

        Ok(())
    }
2406
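    /// Case-insensitive substring search over a single string field
    ///
    /// Loads the whole collection and keeps documents whose `field` contains
    /// `query`, ignoring case. For indexed, relevance-ranked search use
    /// `full_text_search()` instead. Sketch:
    ///
    /// ```
    /// let hits = db.search_text("articles", "title", "rust").await?;
    /// println!("{} articles mention rust in the title", hits.len());
    /// ```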
2407 pub async fn search_text(
2408 &self,
2409 collection: &str,
2410 field: &str,
2411 query: &str,
2412 ) -> Result<Vec<Document>> {
        // Lowercase the query once instead of per document
        let query_lower = query.to_lowercase();
        let docs = self.get_all_collection(collection).await?;
        let mut results = Vec::new();

        for doc in docs {
            if let Some(Value::String(text)) = doc.data.get(field)
                && text.to_lowercase().contains(&query_lower)
            {
                results.push(doc);
            }
        }

        Ok(results)
2424 }
2425
2426 /// Export a collection to a JSON file
2427 ///
2428 /// Creates a JSON file containing all documents in the collection.
2429 /// Useful for backups, data migration, or sharing datasets.
2430 /// Automatically appends `.json` extension if not present.
2431 ///
2432 /// # Performance
2433 /// - Export speed: ~10,000 docs/sec
2434 /// - Scans entire collection from cold storage
    /// - Buffers the whole collection in memory, then writes the file in one pass
2436 ///
2437 /// # Arguments
2438 /// * `collection` - Name of the collection to export
2439 /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2440 ///
2441 /// # Returns
2442 /// Success or an error
2443 ///
2444 /// # Examples
2445 ///
2446 /// ```
2447 /// use aurora_db::Aurora;
2448 ///
2449 /// let db = Aurora::open("mydb.db")?;
2450 ///
2451 /// // Basic export
2452 /// db.export_as_json("users", "./backups/users_2024-01-15")?;
2453 /// // Creates: ./backups/users_2024-01-15.json
2454 ///
2455 /// // Timestamped backup
2456 /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
2457 /// let backup_path = format!("./backups/users_{}", timestamp);
2458 /// db.export_as_json("users", &backup_path)?;
2459 ///
2460 /// // Export multiple collections
2461 /// for collection in &["users", "orders", "products"] {
2462 /// db.export_as_json(collection, &format!("./export/{}", collection))?;
2463 /// }
2464 /// ```
2465 ///
2466 /// # Output Format
2467 ///
2468 /// The exported JSON has this structure:
2469 /// ```json
2470 /// {
2471 /// "users": [
2472 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
2473 /// { "id": "456", "name": "Bob", "email": "bob@example.com" }
2474 /// ]
2475 /// }
2476 /// ```
2477 ///
2478 /// # See Also
2479 /// - `export_as_csv()` for CSV format export
2480 /// - `import_from_json()` to restore exported data
2481 pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
2482 let output_path = if !output_path.ends_with(".json") {
2483 format!("{}.json", output_path)
2484 } else {
2485 output_path.to_string()
2486 };
2487
2488 let mut docs = Vec::new();
2489
2490 // Get all documents from the specified collection
2491 for result in self.cold.scan() {
2492 let (key, value) = result?;
2493
2494 // Only process documents from the specified collection
2495 if let Some(key_collection) = key.split(':').next()
2496 && key_collection == collection && !key.starts_with("_collection:")
2497 && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2498 // Convert Value enum to raw JSON values
2499 let mut clean_doc = serde_json::Map::new();
2500 for (k, v) in doc.data {
2501 match v {
2502 Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
2503 Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
2504 Value::Float(f) => {
2505 if let Some(n) = serde_json::Number::from_f64(f) {
2506 clean_doc.insert(k, JsonValue::Number(n))
2507 } else {
2508 clean_doc.insert(k, JsonValue::Null)
2509 }
2510 }
2511 Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
2512 Value::Array(arr) => {
2513 let clean_arr: Vec<JsonValue> = arr
2514 .into_iter()
2515 .map(|v| match v {
2516 Value::String(s) => JsonValue::String(s),
2517 Value::Int(i) => JsonValue::Number(i.into()),
2518 Value::Float(f) => serde_json::Number::from_f64(f)
2519 .map(JsonValue::Number)
2520 .unwrap_or(JsonValue::Null),
2521 Value::Bool(b) => JsonValue::Bool(b),
2522 Value::Null => JsonValue::Null,
2523 _ => JsonValue::Null,
2524 })
2525 .collect();
2526 clean_doc.insert(k, JsonValue::Array(clean_arr))
2527 }
2528 Value::Uuid(u) => {
2529 clean_doc.insert(k, JsonValue::String(u.to_string()))
2530 }
2531 Value::Null => clean_doc.insert(k, JsonValue::Null),
                        Value::Object(_) => None, // Nested objects are currently skipped on export
2533 };
2534 }
2535 docs.push(JsonValue::Object(clean_doc));
2536 }
2537 }
2538
2539 let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
2540 collection.to_string(),
2541 JsonValue::Array(docs),
2542 )]));
2543
2544 let mut file = StdFile::create(&output_path)?;
2545 serde_json::to_writer_pretty(&mut file, &output)?;
2546 println!("Exported collection '{}' to {}", collection, &output_path);
2547 Ok(())
2548 }
2549
2550 /// Export a collection to a CSV file
2551 ///
2552 /// Creates a CSV file with headers from the first document and rows for each document.
2553 /// Useful for spreadsheet analysis, data science workflows, or reporting.
2554 /// Automatically appends `.csv` extension if not present.
2555 ///
2556 /// # Performance
2557 /// - Export speed: ~8,000 docs/sec
2558 /// - Memory efficient: streams rows to file
2559 /// - Headers determined from first document
2560 ///
2561 /// # Arguments
2562 /// * `collection` - Name of the collection to export
2563 /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
2564 ///
2565 /// # Returns
2566 /// Success or an error
2567 ///
2568 /// # Examples
2569 ///
2570 /// ```
2571 /// use aurora_db::Aurora;
2572 ///
2573 /// let db = Aurora::open("mydb.db")?;
2574 ///
2575 /// // Basic CSV export
2576 /// db.export_as_csv("users", "./reports/users")?;
2577 /// // Creates: ./reports/users.csv
2578 ///
2579 /// // Export for analysis in Excel/Google Sheets
2580 /// db.export_as_csv("orders", "./analytics/sales_data")?;
2581 ///
2582 /// // Monthly report generation
2583 /// let month = chrono::Utc::now().format("%Y-%m");
2584 /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
2585 /// ```
2586 ///
2587 /// # Output Format
2588 ///
2589 /// ```csv
2590 /// id,name,email,age
2591 /// 123,Alice,alice@example.com,28
2592 /// 456,Bob,bob@example.com,32
2593 /// ```
2594 ///
2595 /// # Important Notes
    /// - Headers are taken from the first document's fields
    /// - Documents missing one of those fields get empty values
    /// - Fields not present in the first document are omitted from the CSV
    /// - Nested objects/arrays are converted to strings
    /// - Best for flat document structures
2600 ///
2601 /// # See Also
2602 /// - `export_as_json()` for JSON format (better for nested data)
2603 /// - For complex nested structures, use JSON export instead
2604 pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
2605 let output_path = if !filename.ends_with(".csv") {
2606 format!("{}.csv", filename)
2607 } else {
2608 filename.to_string()
2609 };
2610
2611 let mut writer = csv::Writer::from_path(&output_path)?;
2612 let mut headers = Vec::new();
2613 let mut first_doc = true;
2614
2615 // Get all documents from the specified collection
2616 for result in self.cold.scan() {
2617 let (key, value) = result?;
2618
2619 // Only process documents from the specified collection
2620 if let Some(key_collection) = key.split(':').next()
2621 && key_collection == collection && !key.starts_with("_collection:")
2622 && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2623 // Write headers from first document
2624 if first_doc && !doc.data.is_empty() {
2625 headers = doc.data.keys().cloned().collect();
2626 writer.write_record(&headers)?;
2627 first_doc = false;
2628 }
2629
2630 // Write the document values
2631 let values: Vec<String> = headers
2632 .iter()
2633 .map(|header| {
2634 doc.data
2635 .get(header)
2636 .map(|v| v.to_string())
2637 .unwrap_or_default()
2638 })
2639 .collect();
2640 writer.write_record(&values)?;
2641 }
2642 }
2643
2644 writer.flush()?;
2645 println!("Exported collection '{}' to {}", collection, &output_path);
2646 Ok(())
2647 }
2648
2649 // Helper method to create filter-based queries
2650 pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2651 self.query(collection)
2652 }
2653
2654 // Convenience methods that build on top of the FilterBuilder
2655
    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
        // Clone into an owned String so the filter closure can be 'static
        let id = id.to_string();
        self.query(collection)
            .filter(move |f| f.eq("id", id.clone()))
            .first_one()
            .await
    }
2662
2663 pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
2664 where
2665 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2666 {
2667 self.query(collection).filter(filter_fn).first_one().await
2668 }
2669
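    /// Find all documents where `field` equals `value`
    ///
    /// Convenience wrapper around `query().filter(...).collect()` that takes
    /// care of cloning the value into the filter closure. Sketch:
    ///
    /// ```
    /// let admins = db
    ///     .find_by_field("users", "role", Value::String("admin".into()))
    ///     .await?;
    /// ```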
2670 pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
2671 &self,
2672 collection: &str,
2673 field: &'static str,
2674 value: T,
2675 ) -> Result<Vec<Document>> {
2676 let value_clone = value.clone();
2677 self.query(collection)
2678 .filter(move |f| f.eq(field, value_clone.clone()))
2679 .collect()
2680 .await
2681 }
2682
2683 pub async fn find_by_fields(
2684 &self,
2685 collection: &str,
2686 fields: Vec<(&str, Value)>,
2687 ) -> Result<Vec<Document>> {
2688 let mut query = self.query(collection);
2689
2690 for (field, value) in fields {
2691 let field_owned = field.to_owned();
2692 let value_owned = value.clone();
2693 query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
2694 }
2695
2696 query.collect().await
2697 }
2698
2699 // Advanced example: find documents with a field value in a specific range
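    /// Sketch (assuming numeric `age` values and an inclusive `between`):
    ///
    /// ```
    /// let twenties = db
    ///     .find_in_range("users", "age", Value::Int(20), Value::Int(29))
    ///     .await?;
    /// ```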
2700 pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
2701 &self,
2702 collection: &str,
2703 field: &'static str,
2704 min: T,
2705 max: T,
2706 ) -> Result<Vec<Document>> {
2707 self.query(collection)
2708 .filter(move |f| f.between(field, min.clone(), max.clone()))
2709 .collect()
2710 .await
2711 }
2712
    // Complex query example: build with multiple combined filters
    // (plain fn: nothing here needs to be awaited)
    pub fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }
2717
2718 // Create a full-text search query with added filter options
2719 pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2720 self.search(collection)
2721 }
2722
2723 // Utility methods for common operations
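    /// Insert a document under a caller-chosen ID, or merge the given fields
    /// into it if it already exists
    ///
    /// Unique constraints are validated in both paths. Sketch (assuming a
    /// `settings` collection exists):
    ///
    /// ```
    /// // First call inserts, second call merges the extra field in
    /// db.upsert("settings", "site", vec![
    ///     ("theme", Value::String("dark".into())),
    /// ]).await?;
    /// db.upsert("settings", "site", vec![
    ///     ("lang", Value::String("en".into())),
    /// ]).await?;
    /// ```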
2724 pub async fn upsert(
2725 &self,
2726 collection: &str,
2727 id: &str,
2728 data: Vec<(&str, Value)>,
2729 ) -> Result<String> {
2730 // Convert Vec<(&str, Value)> to HashMap<String, Value>
2731 let data_map: HashMap<String, Value> =
2732 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
2733
2734 // Check if document exists
2735 if let Some(mut doc) = self.get_document(collection, id)? {
2736 // Update existing document - merge new data
2737 for (key, value) in data_map {
2738 doc.data.insert(key, value);
2739 }
2740
2741 // Validate unique constraints for the updated document
2742 // We need to exclude the current document from the uniqueness check
2743 self.validate_unique_constraints_excluding(collection, &doc.data, id)
2744 .await?;
2745
2746 self.put(
2747 format!("{}:{}", collection, id),
2748 serde_json::to_vec(&doc)?,
2749 None,
2750 )?;
2751 Ok(id.to_string())
2752 } else {
2753 // Insert new document with specified ID - validate unique constraints
2754 self.validate_unique_constraints(collection, &data_map)
2755 .await?;
2756
2757 let document = Document {
2758 id: id.to_string(),
2759 data: data_map,
2760 };
2761
2762 self.put(
2763 format!("{}:{}", collection, id),
2764 serde_json::to_vec(&document)?,
2765 None,
2766 )?;
2767 Ok(id.to_string())
2768 }
2769 }
2770
2771 // Atomic increment/decrement
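    /// Add `amount` (which may be negative) to an integer field and return
    /// the new value; a missing or non-integer field starts from 0
    ///
    /// Sketch (assuming `post_id` and `sku` hold existing document IDs):
    ///
    /// ```
    /// let views = db.increment("posts", &post_id, "views", 1).await?;
    /// let stock = db.increment("products", &sku, "stock", -3).await?;
    /// ```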
2772 pub async fn increment(
2773 &self,
2774 collection: &str,
2775 id: &str,
2776 field: &str,
2777 amount: i64,
2778 ) -> Result<i64> {
2779 if let Some(mut doc) = self.get_document(collection, id)? {
2780 // Get current value
2781 let current = match doc.data.get(field) {
2782 Some(Value::Int(i)) => *i,
2783 _ => 0,
2784 };
2785
2786 // Increment
2787 let new_value = current + amount;
2788 doc.data.insert(field.to_string(), Value::Int(new_value));
2789
2790 // Save changes
2791 self.put(
2792 format!("{}:{}", collection, id),
2793 serde_json::to_vec(&doc)?,
2794 None,
2795 )?;
2796
2797 Ok(new_value)
2798 } else {
2799 Err(AuroraError::NotFound(format!(
2800 "Document {}:{} not found",
2801 collection, id
2802 )))
2803 }
2804 }
2805
2806 // Delete documents by query
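    /// Delete every document matching a filter and return how many were
    /// removed
    ///
    /// Runs the query first, then deletes each hit through `delete()`, so
    /// indices are cleaned up and PubSub delete events are published. Sketch:
    ///
    /// ```
    /// let purged = db.delete_by_query("sessions", |f| {
    ///     f.eq("expired", Value::Bool(true))
    /// }).await?;
    /// println!("Removed {} stale sessions", purged);
    /// ```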
2807 pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
2808 where
2809 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2810 {
2811 let docs = self.query(collection).filter(filter_fn).collect().await?;
2812
2813 let mut deleted_count = 0;
2814
2815 for doc in docs {
2816 let key = format!("{}:{}", collection, doc.id);
2817 self.delete(&key).await?;
2818 deleted_count += 1;
2819 }
2820
2821 Ok(deleted_count)
2822 }
2823
2824 /// Import documents from a JSON file into a collection
2825 ///
2826 /// Validates each document against the collection schema, skips duplicates (by ID),
2827 /// and provides detailed statistics about the import operation. Useful for restoring
2828 /// backups, migrating data, or seeding development databases.
2829 ///
2830 /// # Performance
2831 /// - Import speed: ~5,000 docs/sec (with validation)
2832 /// - Memory efficient: processes documents one at a time
2833 /// - Validates schema and unique constraints
2834 ///
2835 /// # Arguments
2836 /// * `collection` - Name of the collection to import into
2837 /// * `filename` - Path to the JSON file containing documents (array format)
2838 ///
2839 /// # Returns
2840 /// `ImportStats` containing counts of imported, skipped, and failed documents
2841 ///
2842 /// # Examples
2843 ///
2844 /// ```
2845 /// use aurora_db::Aurora;
2846 ///
2847 /// let db = Aurora::open("mydb.db")?;
2848 ///
2849 /// // Basic import
2850 /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
2851 /// println!("Imported: {}, Skipped: {}, Failed: {}",
2852 /// stats.imported, stats.skipped, stats.failed);
2853 ///
2854 /// // Restore from backup
2855 /// let backup_file = "./backups/users_2024-01-15.json";
2856 /// let stats = db.import_from_json("users", backup_file).await?;
2857 ///
2858 /// if stats.failed > 0 {
2859 /// eprintln!("Warning: {} documents failed validation", stats.failed);
2860 /// }
2861 ///
2862 /// // Idempotent import - duplicates are skipped
2863 /// let stats = db.import_from_json("users", "./data/users.json").await?;
2864 /// // Running again will skip all existing documents
2865 /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
2866 /// assert_eq!(stats2.skipped, stats.imported);
2867 ///
2868 /// // Migration from another system
    /// db.new_collection("products", vec![
    ///     ("sku", FieldType::String, true),    // unique
    ///     ("name", FieldType::String, false),
    ///     ("price", FieldType::Float, false),
    /// ])?;
2874 ///
2875 /// let stats = db.import_from_json("products", "./migration/products.json").await?;
2876 /// println!("Migration complete: {} products imported", stats.imported);
2877 /// ```
2878 ///
2879 /// # Expected JSON Format
2880 ///
2881 /// The JSON file should contain an array of document objects:
2882 /// ```json
2883 /// [
2884 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
2885 /// { "id": "456", "name": "Bob", "email": "bob@example.com" },
2886 /// { "name": "Carol", "email": "carol@example.com" }
2887 /// ]
2888 /// ```
2889 ///
2890 /// # Behavior
2891 /// - Documents with existing IDs are skipped (duplicate detection)
2892 /// - Documents without IDs get auto-generated UUIDs
2893 /// - Schema validation is performed on all fields
2894 /// - Failed documents are counted but don't stop the import
2895 /// - Unique constraints are checked
2896 ///
2897 /// # See Also
2898 /// - `export_as_json()` to create compatible backup files
2899 /// - `batch_insert()` for programmatic bulk inserts
2900 pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
2901 // Validate that the collection exists
2902 let collection_def = self.get_collection_definition(collection)?;
2903
2904 // Load JSON file
2905 let json_string = read_to_string(filename)
2906 .await
2907 .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
2908
2909 // Parse JSON
2910 let documents: Vec<JsonValue> = from_str(&json_string)
2911 .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
2912
2913 let mut stats = ImportStats::default();
2914
2915 // Process each document
2916 for doc_json in documents {
2917 match self
2918 .import_document(collection, &collection_def, doc_json)
2919 .await
2920 {
2921 Ok(ImportResult::Imported) => stats.imported += 1,
2922 Ok(ImportResult::Skipped) => stats.skipped += 1,
2923 Err(_) => stats.failed += 1,
2924 }
2925 }
2926
2927 Ok(stats)
2928 }
2929
2930 /// Import a single document, performing schema validation and duplicate checking
2931 async fn import_document(
2932 &self,
2933 collection: &str,
2934 collection_def: &Collection,
2935 doc_json: JsonValue,
2936 ) -> Result<ImportResult> {
2937 if !doc_json.is_object() {
2938 return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
2939 }
2940
2941 // Extract document ID if present
2942 let doc_id = doc_json
2943 .get("id")
2944 .and_then(|id| id.as_str())
2945 .map(|s| s.to_string())
2946 .unwrap_or_else(|| Uuid::new_v4().to_string());
2947
2948 // Check if document with this ID already exists
2949 if self.get_document(collection, &doc_id)?.is_some() {
2950 return Ok(ImportResult::Skipped);
2951 }
2952
2953 // Convert JSON to our document format and validate against schema
2954 let mut data_map = HashMap::new();
2955
2956 if let Some(obj) = doc_json.as_object() {
2957 for (field_name, field_def) in &collection_def.fields {
2958 if let Some(json_value) = obj.get(field_name) {
2959 // Validate value against field type
2960 if !self.validate_field_value(json_value, &field_def.field_type) {
2961 return Err(AuroraError::InvalidOperation(format!(
2962 "Field '{}' has invalid type",
2963 field_name
2964 )));
2965 }
2966
2967 // Convert JSON value to our Value type
2968 let value = self.json_to_value(json_value)?;
2969 data_map.insert(field_name.clone(), value);
2970 } else if field_def.unique {
2971 // Missing required unique field
2972 return Err(AuroraError::InvalidOperation(format!(
2973 "Missing required unique field '{}'",
2974 field_name
2975 )));
2976 }
2977 }
2978 }
2979
2980 // Check for duplicates by unique fields
2981 let unique_fields = self.get_unique_fields(collection_def);
2982 for unique_field in &unique_fields {
            if let Some(value) = data_map.get(unique_field) {
                // Query for existing documents with this unique value;
                // clone into owned values so the move closure is 'static
                let field = unique_field.clone();
                let value = value.clone();
                let query_results = self
                    .query(collection)
                    .filter(move |f| f.eq(&field, value.clone()))
                    .limit(1)
                    .collect()
                    .await?;
2991
2992 if !query_results.is_empty() {
2993 // Found duplicate by unique field
2994 return Ok(ImportResult::Skipped);
2995 }
2996 }
2997 }
2998
2999 // Create and insert document
3000 let document = Document {
3001 id: doc_id,
3002 data: data_map,
3003 };
3004
3005 self.put(
3006 format!("{}:{}", collection, document.id),
3007 serde_json::to_vec(&document)?,
3008 None,
3009 )?;
3010
3011 Ok(ImportResult::Imported)
3012 }
3013
3014 /// Validate that a JSON value matches the expected field type
3015 fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
3016 match field_type {
3017 FieldType::String => value.is_string(),
3018 FieldType::Int => value.is_i64() || value.is_u64(),
3019 FieldType::Float => value.is_number(),
3020 FieldType::Bool => value.is_boolean(),
3021 FieldType::Array => value.is_array(),
3022 FieldType::Object => value.is_object(),
3023 FieldType::Uuid => {
3024 value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
3025 }
3026 }
3027 }
3028
3029 /// Convert a JSON value to our internal Value type
3030 #[allow(clippy::only_used_in_recursion)]
3031 fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3032 match json_value {
3033 JsonValue::Null => Ok(Value::Null),
3034 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3035 JsonValue::Number(n) => {
3036 if let Some(i) = n.as_i64() {
3037 Ok(Value::Int(i))
3038 } else if let Some(f) = n.as_f64() {
3039 Ok(Value::Float(f))
3040 } else {
3041 Err(AuroraError::InvalidOperation("Invalid number value".into()))
3042 }
3043 }
3044 JsonValue::String(s) => {
3045 // Try parsing as UUID first
3046 if let Ok(uuid) = Uuid::parse_str(s) {
3047 Ok(Value::Uuid(uuid))
3048 } else {
3049 Ok(Value::String(s.clone()))
3050 }
3051 }
3052 JsonValue::Array(arr) => {
3053 let mut values = Vec::new();
3054 for item in arr {
3055 values.push(self.json_to_value(item)?);
3056 }
3057 Ok(Value::Array(values))
3058 }
3059 JsonValue::Object(obj) => {
3060 let mut map = HashMap::new();
3061 for (k, v) in obj {
3062 map.insert(k.clone(), self.json_to_value(v)?);
3063 }
3064 Ok(Value::Object(map))
3065 }
3066 }
3067 }
3068
3069 /// Get collection definition
3070 fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3071 if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3072 let collection_def: Collection = serde_json::from_slice(&data)?;
3073 Ok(collection_def)
3074 } else {
3075 Err(AuroraError::CollectionNotFound(collection.to_string()))
3076 }
3077 }
3078
3079 /// Get storage statistics and information about the database
3080 pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3081 let hot_stats = self.hot.get_stats();
3082 let cold_stats = self.cold.get_stats()?;
3083
3084 Ok(DatabaseStats {
3085 hot_stats,
3086 cold_stats,
3087 estimated_size: self.cold.estimated_size(),
3088 collections: self.get_collection_stats()?,
3089 })
3090 }
3091
3092 /// Check if a key is currently stored in the hot cache
3093 pub fn is_in_hot_cache(&self, key: &str) -> bool {
3094 self.hot.is_hot(key)
3095 }
3096
3097 /// Start background cleanup of hot cache with specified interval
3098 pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
3099 let hot_store = Arc::new(self.hot.clone());
3100 hot_store.start_cleanup_with_interval(interval_secs).await;
3101 }
3102
3103 /// Clear the hot cache (useful when memory needs to be freed)
3104 pub fn clear_hot_cache(&self) {
3105 self.hot.clear();
3106 println!(
3107 "Hot cache cleared, current hit ratio: {:.2}%",
3108 self.hot.hit_ratio() * 100.0
3109 );
3110 }
3111
3112 /// Prewarm the cache by loading frequently accessed data from cold storage
3113 ///
3114 /// Loads documents from a collection into memory cache to eliminate cold-start
3115 /// latency. Dramatically improves initial query performance after database startup
3116 /// by preloading the most commonly accessed data.
3117 ///
3118 /// # Performance Impact
3119 /// - Prewarming speed: ~20,000 docs/sec
3120 /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3121 /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3122 /// - Memory cost: ~500 bytes per document average
3123 ///
3124 /// # Arguments
3125 /// * `collection` - The collection to prewarm
    /// * `limit` - Maximum number of documents to load (`None` = load all)
3127 ///
3128 /// # Returns
3129 /// Number of documents loaded into cache
3130 ///
3131 /// # Examples
3132 ///
3133 /// ```
3134 /// use aurora_db::Aurora;
3135 ///
3136 /// let db = Aurora::open("mydb.db")?;
3137 ///
3138 /// // Prewarm frequently accessed collection
3139 /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3140 /// println!("Prewarmed {} user documents", loaded);
3141 ///
3142 /// // Now queries are fast from the start
3143 /// let stats_before = db.get_cache_stats();
3144 /// let users = db.query("users").collect().await?;
3145 /// let stats_after = db.get_cache_stats();
3146 ///
3147 /// // High hit rate thanks to prewarming
3148 /// assert!(stats_after.hit_rate > 0.95);
3149 ///
3150 /// // Startup optimization pattern
3151 /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3152 /// println!("Prewarming caches...");
3153 ///
3154 /// // Prewarm most frequently accessed collections
3155 /// db.prewarm_cache("users", Some(5000)).await?;
3156 /// db.prewarm_cache("sessions", Some(1000)).await?;
3157 /// db.prewarm_cache("products", Some(500)).await?;
3158 ///
3159 /// let stats = db.get_cache_stats();
3160 /// println!("Cache prewarmed: {} entries loaded", stats.size);
3161 ///
3162 /// Ok(())
3163 /// }
3164 ///
3165 /// // Web server startup
3166 /// #[tokio::main]
3167 /// async fn main() {
3168 /// let db = Aurora::open("app.db").unwrap();
3169 ///
3170 /// // Prewarm before accepting requests
3171 /// db.prewarm_cache("users", Some(10000)).await.unwrap();
3172 ///
3173 /// // Server is now ready with hot cache
3174 /// start_web_server(db).await;
3175 /// }
3176 ///
3177 /// // Prewarm all documents (for small collections)
3178 /// let all_loaded = db.prewarm_cache("config", None).await?;
3179 /// // All config documents now in memory
3180 ///
3181 /// // Selective prewarming based on access patterns
3182 /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3183 /// // Load recent users (they're accessed most)
3184 /// db.prewarm_cache("users", Some(1000)).await?;
3185 ///
3186 /// // Load active sessions only
3187 /// let active_sessions = db.query("sessions")
3188 /// .filter(|f| f.eq("active", Value::Bool(true)))
3189 /// .limit(500)
3190 /// .collect()
3191 /// .await?;
3192 ///
3193 /// // Manually populate cache with hot data
3194 /// for session in active_sessions {
3195 /// // Reading automatically caches
3196 /// db.get_document("sessions", &session.id)?;
3197 /// }
3198 ///
3199 /// Ok(())
3200 /// }
3201 /// ```
3202 ///
3203 /// # Typical Prewarming Scenarios
3204 ///
3205 /// **Web Application Startup:**
3206 /// ```
3207 /// // Load user data, sessions, and active content
3208 /// db.prewarm_cache("users", Some(5000)).await?;
3209 /// db.prewarm_cache("sessions", Some(2000)).await?;
3210 /// db.prewarm_cache("posts", Some(1000)).await?;
3211 /// ```
3212 ///
3213 /// **E-commerce Site:**
3214 /// ```
3215 /// // Load products, categories, and user carts
3216 /// db.prewarm_cache("products", Some(500)).await?;
3217 /// db.prewarm_cache("categories", None).await?; // All categories
3218 /// db.prewarm_cache("active_carts", Some(1000)).await?;
3219 /// ```
3220 ///
3221 /// **API Server:**
3222 /// ```
3223 /// // Load authentication data and rate limits
3224 /// db.prewarm_cache("api_keys", None).await?;
3225 /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3226 /// ```
3227 ///
3228 /// # When to Use
3229 /// - At application startup to eliminate cold-start latency
3230 /// - After cache clear operations
3231 /// - Before high-traffic events (product launches, etc.)
3232 /// - When deploying new instances (load balancer warm-up)
3233 ///
3234 /// # Memory Considerations
3235 /// - 1,000 docs ≈ 500 KB memory
3236 /// - 10,000 docs ≈ 5 MB memory
3237 /// - 100,000 docs ≈ 50 MB memory
3238 /// - Stay within configured cache capacity
3239 ///
3240 /// # See Also
3241 /// - `get_cache_stats()` to monitor cache effectiveness
3242 /// - `prewarm_all_collections()` to prewarm all collections
3243 /// - `Aurora::with_config()` to adjust cache capacity
3244 pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
        // `None` means "load everything", matching the documented behavior
        let limit = limit.unwrap_or(usize::MAX);
3246 let prefix = format!("{}:", collection);
3247 let mut loaded = 0;
3248
3249 for entry in self.cold.scan_prefix(&prefix) {
3250 if loaded >= limit {
3251 break;
3252 }
3253
3254 if let Ok((key, value)) = entry {
3255 // Load into hot cache
3256 self.hot.set(key.clone(), value, None);
3257 loaded += 1;
3258 }
3259 }
3260
3261 println!("Prewarmed {} with {} documents", collection, loaded);
3262 Ok(loaded)
3263 }
3264
3265 /// Prewarm cache for all collections
3266 pub async fn prewarm_all_collections(
3267 &self,
3268 docs_per_collection: Option<usize>,
3269 ) -> Result<HashMap<String, usize>> {
3270 let mut stats = HashMap::new();
3271
3272 // Get all collections
3273 let collections: Vec<String> = self
3274 .cold
3275 .scan()
3276 .filter_map(|r| r.ok())
3277 .map(|(k, _)| k)
3278 .filter(|k| k.starts_with("_collection:"))
3279 .map(|k| k.trim_start_matches("_collection:").to_string())
3280 .collect();
3281
3282 for collection in collections {
3283 let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3284 stats.insert(collection, loaded);
3285 }
3286
3287 Ok(stats)
3288 }
3289
3290 /// Store multiple key-value pairs efficiently in a single batch operation
3291 ///
3292 /// Low-level batch write operation that bypasses document validation and
3293 /// writes raw byte data directly to storage. Useful for advanced use cases,
3294 /// custom serialization, or maximum performance scenarios.
3295 ///
3296 /// # Performance
3297 /// - Write speed: ~100,000 writes/sec
3298 /// - Single disk fsync for entire batch
3299 /// - No validation or schema checking
3300 /// - Direct storage access
3301 ///
3302 /// # Arguments
3303 /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3304 ///
3305 /// # Returns
3306 /// Success or an error
3307 ///
3308 /// # Examples
3309 ///
3310 /// ```
3311 /// use aurora_db::Aurora;
3312 ///
3313 /// let db = Aurora::open("mydb.db")?;
3314 ///
3315 /// // Low-level batch write
3316 /// let pairs = vec![
3317 /// ("users:123".to_string(), b"raw data 1".to_vec()),
3318 /// ("users:456".to_string(), b"raw data 2".to_vec()),
3319 /// ("cache:key1".to_string(), b"cached value".to_vec()),
3320 /// ];
3321 ///
3322 /// db.batch_write(pairs)?;
3323 ///
3324 /// // Custom binary serialization
3325 /// use bincode;
3326 ///
3327 /// #[derive(Serialize, Deserialize)]
3328 /// struct CustomData {
3329 /// id: u64,
3330 /// payload: Vec<u8>,
3331 /// }
3332 ///
3333 /// let custom_data = vec![
3334 /// CustomData { id: 1, payload: vec![1, 2, 3] },
3335 /// CustomData { id: 2, payload: vec![4, 5, 6] },
3336 /// ];
3337 ///
3338 /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3339 /// .iter()
3340 /// .map(|data| {
3341 /// let key = format!("binary:{}", data.id);
3342 /// let value = bincode::serialize(data).unwrap();
3343 /// (key, value)
3344 /// })
3345 /// .collect();
3346 ///
3347 /// db.batch_write(pairs)?;
3348 ///
3349 /// // Bulk cache population
3350 /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3351 /// .map(|i| {
3352 /// let key = format!("cache:item_{}", i);
3353 /// let value = format!("value_{}", i).into_bytes();
3354 /// (key, value)
3355 /// })
3356 /// .collect();
3357 ///
3358 /// db.batch_write(cache_entries)?;
3359 /// // Writes 10,000 entries in ~100ms
3360 /// ```
3361 ///
3362 /// # Important Notes
3363 /// - No schema validation performed
3364 /// - No unique constraint checking
3365 /// - No automatic indexing
3366 /// - Keys must follow "collection:id" format for proper grouping
3367 /// - Values are raw bytes - you handle serialization
3368 /// - Use `batch_insert()` for validated document inserts
3369 ///
3370 /// # When to Use
3371 /// - Maximum write performance needed
3372 /// - Custom serialization formats (bincode, msgpack, etc.)
3373 /// - Cache population
3374 /// - Low-level database operations
3375 /// - You're bypassing the document model
3376 ///
3377 /// # When NOT to Use
3378 /// - Regular document inserts → Use `batch_insert()` instead
3379 /// - Need validation → Use `batch_insert()` instead
3380 /// - Need indexing → Use `batch_insert()` instead
3381 ///
3382 /// # See Also
3383 /// - `batch_insert()` for validated document batch inserts
3384 /// - `put()` for single key-value writes
3385 pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3386 // Group pairs by collection name
3387 let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3388 for (key, value) in &pairs {
3389 if let Some(collection_name) = key.split(':').next() {
3390 collections
3391 .entry(collection_name.to_string())
3392 .or_default()
3393 .push((key.clone(), value.clone()));
3394 }
3395 }
3396
3397 // First, do the batch write to cold storage for all pairs
3398 self.cold.batch_set(pairs)?;
3399
3400 // Then, process each collection for in-memory updates
3401 for (collection_name, batch) in collections {
3402 // --- Optimized Batch Indexing ---
3403
3404 // 1. Get schema once for the entire collection batch
3405 let collection_obj = match self.schema_cache.get(&collection_name) {
3406 Some(cached_schema) => Arc::clone(cached_schema.value()),
3407 None => {
3408 let collection_key = format!("_collection:{}", collection_name);
3409 match self.get(&collection_key)? {
3410 Some(data) => {
3411 let obj: Collection = serde_json::from_slice(&data)?;
3412 let arc_obj = Arc::new(obj);
3413 self.schema_cache
3414 .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3415 arc_obj
3416 }
3417 None => continue,
3418 }
3419 }
3420 };
3421
3422 let indexed_fields: Vec<String> = collection_obj
3423 .fields
3424 .iter()
3425 .filter(|(_, def)| def.indexed || def.unique)
3426 .map(|(name, _)| name.clone())
3427 .collect();
3428
3429 let primary_index = self
3430 .primary_indices
3431 .entry(collection_name.to_string())
3432 .or_default();
3433
3434 for (key, value) in batch {
3435 // 2. Update hot cache
3436 self.hot.set(key.clone(), value.clone(), None);
3437
3438 // 3. Update primary index with metadata only
3439 let location = DiskLocation::new(value.len());
3440 primary_index.insert(key.clone(), location);
3441
3442 // 4. Update secondary indices
3443 if !indexed_fields.is_empty()
3444 && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
3445 for (field, field_value) in doc.data {
3446 if indexed_fields.contains(&field) {
3447 let value_str = match &field_value {
3448 Value::String(s) => s.clone(),
3449 _ => field_value.to_string(),
3450 };
3451 let index_key = format!("{}:{}", collection_name, field);
3452 let secondary_index = self
3453 .secondary_indices
3454 .entry(index_key)
3455 .or_default();
3456
3457 let max_entries = self.config.max_index_entries_per_field;
3458 secondary_index
3459 .entry(value_str)
3460 .and_modify(|doc_ids| {
3461 if doc_ids.len() < max_entries {
3462 doc_ids.push(key.to_string());
3463 }
3464 })
3465 .or_insert_with(|| vec![key.to_string()]);
3466 }
3467 }
3468 }
3469 }
3470 }
3471
3472 Ok(())
3473 }
3474
3475 /// Scan for keys with a specific prefix
3476 pub fn scan_with_prefix(
3477 &self,
3478 prefix: &str,
3479 ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
3480 self.cold.scan_prefix(prefix)
3481 }
3482
3483 /// Get storage efficiency metrics for the database
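    ///
    /// Returns per-collection document counts and byte sizes, served from the
    /// primary index when available. Sketch:
    ///
    /// ```
    /// for (name, s) in db.get_collection_stats()? {
    ///     println!("{}: {} docs, {} bytes (avg {})",
    ///         name, s.count, s.size_bytes, s.avg_doc_size);
    /// }
    /// ```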
    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
        let mut stats = HashMap::new();

        // Scan all collections
        let collections: Vec<String> = self
            .cold
            .scan()
            .filter_map(|r| r.ok())
            .map(|(k, _)| k)
            .filter(|k| k.starts_with("_collection:"))
            .map(|k| k.trim_start_matches("_collection:").to_string())
            .collect();

        for collection in collections {
            // Use primary index for fast stats (count + size from DiskLocation)
            let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
                let count = index.len();
                // Sum up sizes from DiskLocation metadata (much faster than disk scan)
                let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
                (count, size)
            } else {
                // Fallback: scan from cold storage if index not available
                let prefix = format!("{}:", collection);
                let count = self.cold.scan_prefix(&prefix).count();
                let size: usize = self
                    .cold
                    .scan_prefix(&prefix)
                    .filter_map(|r| r.ok())
                    .map(|(_, v)| v.len())
                    .sum();
                (count, size)
            };

            stats.insert(
                collection,
                CollectionStats {
                    count,
                    size_bytes: size,
                    avg_doc_size: if count > 0 { size / count } else { 0 },
                },
            );
        }

        Ok(stats)
    }

    /// Search for documents by exact value using an index
    ///
    /// This method performs a fast lookup using a pre-created index.
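    ///
    /// # Examples
    /// A sketch, assuming an indexed `status` field on a `users` collection:
    /// ```
    /// let active = db.search_by_value("users", "status", &Value::String("active".to_string()))?;
    /// ```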
    pub fn search_by_value(
        &self,
        collection: &str,
        field: &str,
        value: &Value,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
            let index = Index::new(index_def);

            // Look up matching document IDs via the index
            if let Some(doc_ids) = index.search(value) {
                // Load the documents by ID
                let mut docs = Vec::new();
                for id in doc_ids {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        // Return empty result if no index or no matches
        Ok(Vec::new())
    }

    /// Perform a full-text search on an indexed text field
    ///
    /// This provides more advanced text search capabilities, including
    /// relevance ranking of results.
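    ///
    /// # Examples
    /// A sketch, assuming a full-text index was created on an `articles.body`
    /// field (see `create_text_index`):
    /// ```
    /// let hits = db.full_text_search("articles", "body", "tiered storage")?;
    /// ```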
    pub fn full_text_search(
        &self,
        collection: &str,
        field: &str,
        query: &str,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;

            // Ensure this is a full-text index
            if !matches!(index_def.index_type, IndexType::FullText) {
                return Err(AuroraError::InvalidOperation(format!(
                    "Field '{}' is not indexed as full-text",
                    field
                )));
            }

            let index = Index::new(index_def);

            // Look up scored document IDs via the full-text index
            if let Some(doc_id_scores) = index.search_text(query) {
                // Load the documents by ID, preserving score order
                let mut docs = Vec::new();
                for (id, _score) in doc_id_scores {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        // Return empty result if no index or no matches
        Ok(Vec::new())
    }

    /// Create a full-text search index on a text field
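    ///
    /// The `_enable_stop_words` flag is currently ignored.
    ///
    /// # Examples
    /// A sketch, assuming an `articles` collection with a text `body` field:
    /// ```
    /// db.create_text_index("articles", "body", true)?;
    /// ```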
    pub fn create_text_index(
        &self,
        collection: &str,
        field: &str,
        _enable_stop_words: bool,
    ) -> Result<()> {
        // Check if collection exists
        if self.get(&format!("_collection:{}", collection))?.is_none() {
            return Err(AuroraError::CollectionNotFound(collection.to_string()));
        }

        // Create index definition
        let index_def = IndexDefinition {
            name: format!("{}_{}_fulltext", collection, field),
            collection: collection.to_string(),
            fields: vec![field.to_string()],
            index_type: IndexType::FullText,
            unique: false,
        };

        // Store index definition
        let index_key = format!("_index:{}:{}", collection, field);
        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;

        // Create the actual index
        let index = Index::new(index_def);

        // Index all existing documents in the collection
        let prefix = format!("{}:", collection);
        for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
            let doc: Document = serde_json::from_slice(&data)?;
            index.insert(&doc)?;
        }

        Ok(())
    }

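    /// Execute a `SimpleQueryBuilder` against its collection.
    ///
    /// Two execution paths are chosen at runtime: if a usable secondary index
    /// matches one of the filters, the matching document IDs are collected from
    /// the index and loaded directly; otherwise the collection is scanned,
    /// terminating early when a LIMIT without an ORDER BY makes that safe.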
    pub async fn execute_simple_query(
        &self,
        builder: &SimpleQueryBuilder,
    ) -> Result<Vec<Document>> {
        // Ensure indices are initialized
        self.ensure_indices_initialized().await?;

        // A place to store the IDs of the documents we need to fetch
        let mut doc_ids_to_load: Option<Vec<String>> = None;

        // --- The "Query Planner" ---
        // Smart heuristic: For range queries with small LIMITs, full scan can be faster
        // than collecting millions of IDs from secondary index
        let use_index_for_range = if let Some(limit) = builder.limit {
            // If limit is small (< 1000), prefer full scan for range queries
            // The secondary index would scan all entries anyway, might as well
            // scan documents directly and benefit from early termination
            limit >= 1000
        } else {
            // No limit? Index might still help if result set is small
            true
        };

        // Look for an opportunity to use an index
        for filter in &builder.filters {
            match filter {
                Filter::Eq(field, value) => {
                    let index_key = format!("{}:{}", &builder.collection, field);

                    // Do we have a secondary index for this field?
                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        // Yes! Let's use it.
                        if let Some(matching_ids) = index.get(&value.to_string()) {
                            doc_ids_to_load = Some(matching_ids.clone());
                            break; // Stop searching for other indexes for now
                        }
                    }
                }
                Filter::Gt(field, value)
                | Filter::Gte(field, value)
                | Filter::Lt(field, value)
                | Filter::Lte(field, value) => {
                    // Skip index for range queries with small LIMITs (see query planner heuristic above)
                    if !use_index_for_range {
                        continue;
                    }

                    let index_key = format!("{}:{}", &builder.collection, field);

                    // Do we have a secondary index for this field?
                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        // For range queries, we need to scan through the index values
                        let mut matching_ids = Vec::new();

                        for entry in index.iter() {
                            let index_value_str = entry.key();

                            // Try to parse the index value to compare with our filter value
                            if let Ok(index_value) =
                                self.parse_value_from_string(index_value_str, value)
                            {
                                let matches = match filter {
                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
                                    _ => false,
                                };

                                if matches {
                                    matching_ids.extend(entry.value().clone());
                                }
                            }
                        }

                        if !matching_ids.is_empty() {
                            doc_ids_to_load = Some(matching_ids);
                            break;
                        }
                    }
                }
                Filter::Contains(field, search_term) => {
                    let index_key = format!("{}:{}", &builder.collection, field);

                    // Do we have a secondary index for this field?
                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        let mut matching_ids = Vec::new();

                        for entry in index.iter() {
                            let index_value_str = entry.key();

                            // Check if this indexed value contains our search term
                            if index_value_str
                                .to_lowercase()
                                .contains(&search_term.to_lowercase())
                            {
                                matching_ids.extend(entry.value().clone());
                            }
                        }

                        if !matching_ids.is_empty() {
                            // Remove duplicates since a document could match multiple indexed values
                            matching_ids.sort();
                            matching_ids.dedup();

                            doc_ids_to_load = Some(matching_ids);
                            break;
                        }
                    }
                }
            }
        }

        let mut final_docs: Vec<Document>;

        if let Some(ids) = doc_ids_to_load {
            // Index path
            use std::io::Write;
            if let Ok(mut file) = std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open("/tmp/aurora_query_stats.log")
            {
                let _ = writeln!(
                    file,
                    "[INDEX PATH] IDs to load: {} | Collection: {}",
                    ids.len(),
                    builder.collection
                );
            }

            final_docs = Vec::with_capacity(ids.len());

            for id in ids {
                let doc_key = format!("{}:{}", &builder.collection, id);
                if let Some(data) = self.get(&doc_key)?
                    && let Ok(doc) = serde_json::from_slice::<Document>(&data)
                {
                    final_docs.push(doc);
                }
            }
        } else {
            // --- Path 2: Full Collection Scan with Early Termination ---

            // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
            // as soon as we have enough matching documents
            let early_termination_target = if builder.order_by.is_none() {
                builder.limit.map(|l| l.saturating_add(builder.offset.unwrap_or(0)))
            } else {
                // With ORDER BY, we need all matching docs to sort correctly
                None
            };

            // Smart scan with early termination support
            final_docs = Vec::new();
            let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)

            if let Some(index) = self.primary_indices.get(&builder.collection) {
                for entry in index.iter() {
                    let key = entry.key();
                    scan_stats.0 += 1; // keys scanned

                    // Early termination check
                    if let Some(target) = early_termination_target
                        && final_docs.len() >= target
                    {
                        break; // We have enough documents!
                    }

                    // Fetch and filter document
                    if let Some(data) = self.get(key)? {
                        scan_stats.1 += 1; // docs fetched

                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                            // Apply all filters
                            let matches_all_filters = builder.filters.iter().all(|filter| {
                                match filter {
                                    Filter::Eq(field, value) => doc.data.get(field) == Some(value),
                                    Filter::Gt(field, value) => {
                                        doc.data.get(field).is_some_and(|v| v > value)
                                    }
                                    Filter::Gte(field, value) => {
                                        doc.data.get(field).is_some_and(|v| v >= value)
                                    }
                                    Filter::Lt(field, value) => {
                                        doc.data.get(field).is_some_and(|v| v < value)
                                    }
                                    Filter::Lte(field, value) => {
                                        doc.data.get(field).is_some_and(|v| v <= value)
                                    }
                                    Filter::Contains(field, value_str) => {
                                        doc.data.get(field).is_some_and(|v| match v {
                                            Value::String(s) => s.contains(value_str),
                                            Value::Array(arr) => {
                                                arr.contains(&Value::String(value_str.clone()))
                                            }
                                            _ => false,
                                        })
                                    }
                                }
                            });

                            if matches_all_filters {
                                scan_stats.2 += 1; // matches found
                                final_docs.push(doc);
                            }
                        }
                    }
                }

                // Debug logging for query performance analysis
                use std::io::Write;
                if let Ok(mut file) = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open("/tmp/aurora_query_stats.log")
                {
                    let _ = writeln!(
                        file,
                        "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
                        scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
                    );
                }
            } else {
                // Fallback: scan from cold storage if index not initialized
                final_docs = self.get_all_collection(&builder.collection).await?;

                // Apply filters
                final_docs.retain(|doc| {
                    builder.filters.iter().all(|filter| match filter {
                        Filter::Eq(field, value) => doc.data.get(field) == Some(value),
                        Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
                        Filter::Gte(field, value) => {
                            doc.data.get(field).is_some_and(|v| v >= value)
                        }
                        Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
                        Filter::Lte(field, value) => {
                            doc.data.get(field).is_some_and(|v| v <= value)
                        }
                        Filter::Contains(field, value_str) => {
                            doc.data.get(field).is_some_and(|v| match v {
                                Value::String(s) => s.contains(value_str),
                                Value::Array(arr) => {
                                    arr.contains(&Value::String(value_str.clone()))
                                }
                                _ => false,
                            })
                        }
                    })
                });
            }
        }

        // Apply ordering
        if let Some((field, ascending)) = &builder.order_by {
            final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
                (Some(v1), Some(v2)) => {
                    let cmp = v1.cmp(v2);
                    if *ascending { cmp } else { cmp.reverse() }
                }
                (None, Some(_)) => std::cmp::Ordering::Less,
                (Some(_), None) => std::cmp::Ordering::Greater,
                (None, None) => std::cmp::Ordering::Equal,
            });
        }

        // Apply offset and limit
        let start = builder.offset.unwrap_or(0);
        let end = builder
            .limit
            .map(|l| start.saturating_add(l))
            .unwrap_or(final_docs.len());

        let end = end.min(final_docs.len());
        Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
    }

    /// Helper method to parse a string value back into a `Value` for comparison
    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
        match reference_value {
            Value::Int(_) => {
                if let Ok(i) = value_str.parse::<i64>() {
                    Ok(Value::Int(i))
                } else {
                    Err(AuroraError::InvalidOperation("Failed to parse int".into()))
                }
            }
            Value::Float(_) => {
                if let Ok(f) = value_str.parse::<f64>() {
                    Ok(Value::Float(f))
                } else {
                    Err(AuroraError::InvalidOperation(
                        "Failed to parse float".into(),
                    ))
                }
            }
            Value::String(_) => Ok(Value::String(value_str.to_string())),
            _ => Ok(Value::String(value_str.to_string())),
        }
    }

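    /// Execute a query described by an HTTP `QueryPayload`.
    ///
    /// The payload is applied in four stages: filtering, sorting, pagination
    /// (offset and limit), and field selection (projection).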
    pub async fn execute_dynamic_query(
        &self,
        collection: &str,
        payload: &QueryPayload,
    ) -> Result<Vec<Document>> {
        let mut docs = self.get_all_collection(collection).await?;

        // 1. Apply Filters
        if let Some(filters) = &payload.filters {
            docs.retain(|doc| {
                filters.iter().all(|filter| {
                    doc.data
                        .get(&filter.field)
                        .is_some_and(|doc_val| check_filter(doc_val, filter))
                })
            });
        }

        // 2. Apply Sorting
        if let Some(sort_options) = &payload.sort {
            docs.sort_by(|a, b| {
                let a_val = a.data.get(&sort_options.field);
                let b_val = b.data.get(&sort_options.field);
                let ordering = a_val
                    .partial_cmp(&b_val)
                    .unwrap_or(std::cmp::Ordering::Equal);
                if sort_options.ascending {
                    ordering
                } else {
                    ordering.reverse()
                }
            });
        }

        // 3. Apply Pagination
        if let Some(offset) = payload.offset {
            docs = docs.into_iter().skip(offset).collect();
        }
        if let Some(limit) = payload.limit {
            docs = docs.into_iter().take(limit).collect();
        }

        // 4. Apply Field Selection (Projection)
        if let Some(select_fields) = &payload.select
            && !select_fields.is_empty()
        {
            docs = docs
                .into_iter()
                .map(|mut doc| {
                    doc.data.retain(|key, _| select_fields.contains(key));
                    doc
                })
                .collect();
        }

        Ok(docs)
    }

    pub async fn process_network_request(
        &self,
        request: crate::network::protocol::Request,
    ) -> crate::network::protocol::Response {
        use crate::network::protocol::Response;

        match request {
            crate::network::protocol::Request::Get(key) => match self.get(&key) {
                Ok(value) => Response::Success(value),
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::Put(key, value) => {
                match self.put(key, value, None) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
                Ok(_) => Response::Done,
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::NewCollection { name, fields } => {
                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
                    .iter()
                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
                    .collect();

                match self.new_collection(&name, fields_for_db) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Insert { collection, data } => {
                match self.insert_map(&collection, data).await {
                    Ok(id) => Response::Message(id),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::GetDocument { collection, id } => {
                match self.get_document(&collection, &id) {
                    Ok(doc) => Response::Document(doc),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Query(builder) => {
                match self.execute_simple_query(&builder).await {
                    Ok(docs) => Response::Documents(docs),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::BeginTransaction => {
                let tx_id = self.begin_transaction();
                Response::TransactionId(tx_id.as_u64())
            }
            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
                match self.commit_transaction(tx_id) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
                match self.rollback_transaction(tx_id) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
        }
    }

    /// Create indices for commonly queried fields automatically
    ///
    /// This is a convenience method that creates indices for fields that are
    /// likely to be queried frequently, improving performance.
    ///
    /// # Arguments
    /// * `collection` - Name of the collection
    /// * `fields` - List of field names to create indices for
    ///
    /// # Examples
    /// ```
    /// // Create indices for commonly queried fields
    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
    /// ```
    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
        for field in fields {
            if let Err(e) = self.create_index(collection, field).await {
                eprintln!(
                    "Warning: Failed to create index for {}.{}: {}",
                    collection, field, e
                );
            } else {
                println!("Created index for {}.{}", collection, field);
            }
        }
        Ok(())
    }

    /// Get index statistics for a collection
    ///
    /// This helps understand which indices exist and how effective they are.
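    ///
    /// # Examples
    /// Inspect index selectivity (assumes `db` is an open handle):
    /// ```
    /// for (field, stats) in db.get_index_stats("users") {
    ///     println!("{}: {} unique values across {} docs", field, stats.unique_values, stats.total_documents);
    /// }
    /// ```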
    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
        let mut stats = HashMap::new();

        for entry in self.secondary_indices.iter() {
            let key = entry.key();
            if key.starts_with(&format!("{}:", collection)) {
                let field = key.split(':').nth(1).unwrap_or("unknown");
                let index = entry.value();

                let unique_values = index.len();
                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();

                stats.insert(
                    field.to_string(),
                    IndexStats {
                        unique_values,
                        total_documents,
                        avg_docs_per_value: if unique_values > 0 {
                            total_documents / unique_values
                        } else {
                            0
                        },
                    },
                );
            }
        }

        stats
    }

    /// Optimize a collection by creating indices on its fields
    ///
    /// Currently this creates an index for every field defined in the
    /// collection's schema.
    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
        if let Ok(collection_def) = self.get_collection_definition(collection) {
            let field_names: Vec<&str> =
                collection_def.fields.keys().map(|s| s.as_str()).collect();
            self.create_indices(collection, &field_names).await?;
        }

        Ok(())
    }

    // Helper method to get unique fields from a collection
    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
        collection
            .fields
            .iter()
            .filter(|(_, def)| def.unique)
            .map(|(name, _)| name.clone())
            .collect()
    }

    // Validate unique-field constraints using the helper above
    async fn validate_unique_constraints(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    // Get the raw string value without JSON formatting
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if index.contains_key(&value_str) {
                        return Err(AuroraError::UniqueConstraintViolation(
                            unique_field.clone(),
                            value_str,
                        ));
                    }
                }
            }
        }
        Ok(())
    }

    /// Validate unique constraints excluding a specific document ID (for updates)
    async fn validate_unique_constraints_excluding(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
        exclude_id: &str,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    // Get the raw string value without JSON formatting
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if let Some(doc_ids) = index.get(&value_str) {
                        // Check if any document other than the excluded one has this value
                        let exclude_key = format!("{}:{}", collection, exclude_id);
                        for doc_key in doc_ids.value() {
                            if doc_key != &exclude_key {
                                return Err(AuroraError::UniqueConstraintViolation(
                                    unique_field.clone(),
                                    value_str,
                                ));
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
}

impl Drop for Aurora {
    fn drop(&mut self) {
        // Signal the checkpoint task to shut down gracefully
        if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
            let _ = shutdown_tx.send(());
        }
        // Signal the compaction task to shut down gracefully
        if let Some(ref shutdown_tx) = self.compaction_shutdown {
            let _ = shutdown_tx.send(());
        }
    }
}

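/// Evaluate a single HTTP filter against a document value.
///
/// Returns `false` if the filter's JSON value cannot be converted into a
/// native `Value`.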
fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
    let filter_val = match json_to_value(&filter.value) {
        Ok(v) => v,
        Err(_) => return false,
    };

    match filter.operator {
        FilterOperator::Eq => doc_val == &filter_val,
        FilterOperator::Ne => doc_val != &filter_val,
        FilterOperator::Gt => doc_val > &filter_val,
        FilterOperator::Gte => doc_val >= &filter_val,
        FilterOperator::Lt => doc_val < &filter_val,
        FilterOperator::Lte => doc_val <= &filter_val,
        FilterOperator::Contains => match (doc_val, &filter_val) {
            (Value::String(s), Value::String(fv)) => s.contains(fv),
            (Value::Array(arr), _) => arr.contains(&filter_val),
            _ => false,
        },
    }
}

/// Results of importing a document
enum ImportResult {
    Imported,
    Skipped,
}

/// Statistics from an import operation
#[derive(Debug, Default)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}

/// Statistics for a specific collection
#[derive(Debug)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes
    pub avg_doc_size: usize,
}

/// Statistics for an index
#[derive(Debug)]
pub struct IndexStats {
    /// Number of unique values in the index
    pub unique_values: usize,
    /// Total number of documents covered by the index
    pub total_documents: usize,
    /// Average number of documents per unique value
    pub avg_docs_per_value: usize,
}

/// Combined database statistics
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection
    pub collections: HashMap<String, CollectionStats>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("age", FieldType::Int, false),
                ("email", FieldType::String, true),
            ],
        )?;

        // Test document insertion
        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection
        db.new_collection("test", vec![("field", FieldType::String, false)])?;

        // Start transaction
        let tx_id = db.begin_transaction();

        // Insert document
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id)?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )?;

        // Test document insertion
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Test query
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a test file that's too large (201MB)
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection with unique email field
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique field
                ("age", FieldType::Int, false),
            ],
        )?;

        // Insert first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Try to insert second document with same email - should fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
            assert_eq!(field, "email");
            assert_eq!(value, "john@example.com");
        } else {
            panic!("Expected UniqueConstraintViolation error");
        }

        // Test upsert with unique constraint
        // Should succeed for new document
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Should fail when trying to upsert with duplicate email
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Should succeed when updating existing document with same email (no change)
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
}