aurora_db/db.rs
//! # Aurora Database
//!
//! Aurora is an embedded document database with a tiered storage architecture.
//! It provides document storage, querying, indexing, and search capabilities
//! while optimizing for both performance and durability.
//!
//! ## Key Features
//!
//! * **Tiered Storage**: Hot in-memory cache + persistent cold storage
//! * **Document Model**: Schema-flexible JSON-like document storage
//! * **Querying**: Rich query capabilities with filtering and sorting
//! * **Full-text Search**: Built-in search engine with relevance ranking
//! * **Transactions**: ACID-compliant transaction support
//! * **Blob Storage**: Efficient storage for large binary objects
//!
//! ## Usage Example
//!
//! ```rust
//! use aurora_db::{Aurora, types::{FieldType, Value}};
//!
//! // Open a database
//! let db = Aurora::open("my_database.db")?;
//!
//! // Create a collection with schema
//! db.new_collection("users", vec![
//!     ("name", FieldType::String, false),
//!     ("email", FieldType::String, true), // unique field
//!     ("age", FieldType::Int, false),
//! ])?;
//!
//! // Insert a document
//! let user_id = db.insert_into("users", vec![
//!     ("name", Value::String("Jane Doe".to_string())),
//!     ("email", Value::String("jane@example.com".to_string())),
//!     ("age", Value::Int(28)),
//! ]).await?;
//!
//! // Query for documents
//! let adult_users = db.query("users")
//!     .filter(|f| f.gt("age", 18))
//!     .order_by("name", true)
//!     .collect()
//!     .await?;
//! ```

use crate::error::{AuroraError, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{
    AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
};
use crate::wal::{Operation, WriteAheadLog};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

// Disk location metadata for primary index
// Instead of storing full Vec<u8> values, we store minimal metadata
#[derive(Debug, Clone, Copy)]
struct DiskLocation {
    size: u32, // Size in bytes (useful for statistics)
}

impl DiskLocation {
    fn new(size: usize) -> Self {
        Self {
            size: size as u32,
        }
    }
}

// Index types for faster lookups
type PrimaryIndex = DashMap<String, DiskLocation>;
type SecondaryIndex = DashMap<String, Vec<String>>;

// Summary information about a stored value, as returned by `get_data_by_pattern`
#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

/// The main database engine
///
/// Aurora combines a tiered storage architecture with document-oriented database features:
/// - Hot tier: In-memory cache for frequently accessed data
/// - Cold tier: Persistent disk storage for durability
/// - Primary indices: Fast key-based access
/// - Secondary indices: Fast field-based queries
///
/// # Examples
///
/// ```
/// // Open a database (creates if doesn't exist)
/// let db = Aurora::open("my_app.db")?;
///
/// // Insert a document
/// let doc_id = db.insert_into("users", vec![
///     ("name", Value::String("Alice".to_string())),
///     ("age", Value::Int(32)),
/// ]).await?;
///
/// // Retrieve a document
/// let user = db.get_document("users", &doc_id)?;
/// ```
pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability (optional, based on config)
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background checkpoint task
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Background compaction task
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

impl Aurora {
    /// Open or create a database at the specified location
    ///
    /// # Arguments
    /// * `path` - Path to the database file or directory
    ///   - Absolute paths (like `/data/myapp.db`) are used as-is
    ///   - Relative paths (like `./data/myapp.db`) are resolved relative to the current directory
    ///   - Simple names (like `myapp.db`) use the current directory
    ///
    /// # Returns
    /// An initialized `Aurora` database instance
    ///
    /// # Examples
    ///
    /// ```
    /// // Use a specific location
    /// let db = Aurora::open("./data/my_application.db")?;
    ///
    /// // Just use a name (creates in current directory)
    /// let db = Aurora::open("customer_data.db")?;
    /// ```
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = AuroraConfig {
            db_path: Self::resolve_path(path)?,
            ..Default::default()
        };
        Self::with_config(config)
    }

    /// Helper method to resolve database path
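    ///
    /// A minimal sketch of the resolution rule (illustrative only; this helper is
    /// private, so the example uses only `std`):
    ///
    /// ```
    /// use std::path::{Path, PathBuf};
    ///
    /// // Absolute paths pass through unchanged; relative paths are joined
    /// // onto the current working directory.
    /// let p = Path::new("data/app.db");
    /// let resolved: PathBuf = if p.is_absolute() {
    ///     p.to_path_buf()
    /// } else {
    ///     std::env::current_dir().unwrap().join(p)
    /// };
    /// assert!(resolved.is_absolute());
    /// ```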
    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        // If it's an absolute path, use it directly
        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        // Otherwise, resolve relative to current directory
        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AuroraError::IoError(format!(
                "Failed to resolve current directory: {}",
                e
            ))),
        }
    }

    /// Open a database with custom configuration
    ///
    /// # Arguments
    /// * `config` - Database configuration settings
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::AuroraConfig};
    /// use std::time::Duration;
    ///
    /// let config = AuroraConfig {
    ///     db_path: "my_data.db".into(),
    ///     hot_cache_size_mb: 512,       // 512 MB cache
    ///     enable_write_buffering: true, // Batch writes for speed
    ///     enable_wal: true,             // Durability
    ///     auto_compact: true,           // Background compaction
    ///     compact_interval_mins: 60,    // Compact every hour
    ///     ..Default::default()
    /// };
    ///
    /// let db = Aurora::with_config(config)?;
    /// ```
    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs
            && let Some(parent) = path.parent()
            && !parent.exists() {
            std::fs::create_dir_all(parent)?;
        }

        // Initialize cold storage with the configured cache capacity, flush interval, and mode
        let cold = Arc::new(ColdStore::with_config(
            path.to_str().unwrap(),
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        // Initialize write buffer if enabled
        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        // Store auto_compact before moving config
        let auto_compact = config.auto_compact;
        let enable_wal = config.enable_wal;

        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        // Initialize WAL if enabled and check for recovery
        let (wal, wal_entries) = if enable_wal {
            let wal_path = path.to_str().unwrap();
            match WriteAheadLog::new(wal_path) {
                Ok(mut wal_log) => {
                    let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
                    (Some(Arc::new(RwLock::new(wal_log))), entries)
                }
                Err(_) => (None, Vec::new()),
            }
        } else {
            (None, Vec::new())
        };

        // Spawn background checkpoint task if WAL is enabled
        let checkpoint_shutdown = if wal.is_some() {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let wal_clone = wal.clone();
            let checkpoint_interval = config.checkpoint_interval_ms;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_millis(checkpoint_interval));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            // Checkpoint: flush cold storage
                            if let Err(e) = cold_clone.flush() {
                                eprintln!("Background checkpoint flush error: {}", e);
                            }
                            // Truncate WAL after successful flush
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                let _ = wal_guard.truncate();
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            // Final checkpoint before shutdown
                            let _ = cold_clone.flush();
                            if let Some(ref wal) = wal_clone
                                && let Ok(mut wal_guard) = wal.write() {
                                let _ = wal_guard.truncate();
                            }
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Spawn background compaction task if auto_compact is enabled
        let compaction_shutdown = if auto_compact {
            let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
            let cold_clone = Arc::clone(&cold);
            let compact_interval = config.compact_interval_mins;

            tokio::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(compact_interval * 60));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Err(e) = cold_clone.compact() {
                                eprintln!("Background compaction error: {}", e);
                            }
                        }
                        _ = shutdown_rx.recv() => {
                            let _ = cold_clone.compact();
                            break;
                        }
                    }
                }
            });

            Some(shutdown_tx)
        } else {
            None
        };

        // Initialize the database
        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
            wal,
            checkpoint_shutdown,
            compaction_shutdown,
        };

        // Replay WAL entries if any
        if !wal_entries.is_empty() {
            db.replay_wal(wal_entries)?;
        }

        Ok(db)
    }

    /// Lazily initialize the primary and secondary indices.
    ///
    /// Initialization runs at most once (guarded by a `OnceCell`); concurrent
    /// callers wait for the first run to finish.
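    ///
    /// A minimal usage sketch (assumes an async context):
    ///
    /// ```
    /// # async fn demo(db: &aurora_db::Aurora) -> Result<(), Box<dyn std::error::Error>> {
    /// // Safe to call repeatedly; only the first call does the work.
    /// db.ensure_indices_initialized().await?;
    /// # Ok(())
    /// # }
    /// ```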
    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    // Fast key-value operations with index support

    /// Get a value by key (low-level key-value access)
    ///
    /// This is the low-level method. For document access, use `get_document()` instead.
    /// Checks hot cache first, then falls back to cold storage for maximum performance.
    ///
    /// # Performance
    /// - Hot cache hit: ~1M reads/sec (instant)
    /// - Cold storage: ~500K reads/sec (disk I/O)
    /// - Cache hit rate: typically 95%+ at scale
    ///
    /// # Examples
    ///
    /// ```
    /// // Low-level key-value access
    /// let data = db.get("users:12345")?;
    /// if let Some(bytes) = data {
    ///     let doc: Document = serde_json::from_slice(&bytes)?;
    ///     println!("Found: {:?}", doc);
    /// }
    ///
    /// // Better: use get_document() for documents
    /// let user = db.get_document("users", "12345")?;
    /// ```
    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Check hot cache first
        if let Some(value) = self.hot.get(key) {
            return Ok(Some(value));
        }

        // Check if key exists in primary index
        if let Some(collection) = key.split(':').next()
            && let Some(index) = self.primary_indices.get(collection)
            && index.contains_key(key) {
            // Key exists in index, fetch from cold storage
            if let Some(value) = self.cold.get(key)? {
                // Promote to hot cache for future fast access
                self.hot.set(key.to_string(), value.clone(), None);
                return Ok(Some(value));
            }
        }

        // Fallback to cold storage
        let value = self.cold.get(key)?;
        if let Some(v) = &value {
            self.hot.set(key.to_string(), v.clone(), None);
        }
        Ok(value)
    }

    /// Get a value as a zero-copy `Arc` reference (10-100x faster than `get()`!)
    ///
    /// Only checks the hot cache; returns `None` if the value is not cached.
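    ///
    /// A minimal sketch (assumes the key was written or read recently enough
    /// to still be resident in the hot cache):
    ///
    /// ```
    /// # fn demo(db: &aurora_db::Aurora) {
    /// if let Some(bytes) = db.get_hot_ref("users:12345") {
    ///     // `bytes` is an `Arc<Vec<u8>>`; no copy of the cached value is made.
    ///     println!("cached value: {} bytes", bytes.len());
    /// }
    /// # }
    /// ```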
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    /// Get cache statistics
    ///
    /// Returns detailed metrics about cache performance including hit/miss rates,
    /// memory usage, and access patterns. Useful for monitoring, optimization,
    /// and understanding database performance characteristics.
    ///
    /// # Returns
    /// `CacheStats` struct containing:
    /// - `hits`: Number of cache hits (data found in memory)
    /// - `misses`: Number of cache misses (had to read from disk)
    /// - `hit_rate`: Fraction of requests served from cache (0.0-1.0)
    /// - `size`: Current number of entries in cache
    /// - `capacity`: Maximum cache capacity
    /// - `evictions`: Number of entries evicted due to capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Check cache performance
    /// let stats = db.get_cache_stats();
    /// println!("Cache hit rate: {:.1}%", stats.hit_rate * 100.0);
    /// println!("Cache size: {} / {} entries", stats.size, stats.capacity);
    /// println!("Total hits: {}, misses: {}", stats.hits, stats.misses);
    ///
    /// // Monitor performance during operations
    /// let before = db.get_cache_stats();
    ///
    /// // Perform many reads
    /// for i in 0..1000 {
    ///     db.get_document("users", &format!("user-{}", i))?;
    /// }
    ///
    /// let after = db.get_cache_stats();
    /// let hit_rate = (after.hits - before.hits) as f64 / 1000.0;
    /// println!("Read hit rate: {:.1}%", hit_rate * 100.0);
    ///
    /// // Performance tuning
    /// let stats = db.get_cache_stats();
    /// if stats.hit_rate < 0.80 {
    ///     println!("Low cache hit rate! Consider:");
    ///     println!("- Increasing cache size in config");
    ///     println!("- Prewarming cache with prewarm_cache()");
    ///     println!("- Reviewing query patterns");
    /// }
    ///
    /// if stats.evictions > stats.size {
    ///     println!("High eviction rate! Cache may be too small.");
    ///     println!("Consider increasing cache capacity.");
    /// }
    ///
    /// // Production monitoring
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// loop {
    ///     let stats = db.get_cache_stats();
    ///
    ///     // Log to monitoring system
    ///     if stats.hit_rate < 0.90 {
    ///         eprintln!("Warning: Cache hit rate dropped to {:.1}%",
    ///             stats.hit_rate * 100.0);
    ///     }
    ///
    ///     thread::sleep(Duration::from_secs(60));
    /// }
    /// ```
    ///
    /// # Typical Performance Metrics
    /// - **Excellent**: 95%+ hit rate (most reads from memory)
    /// - **Good**: 80-95% hit rate (acceptable performance)
    /// - **Poor**: <80% hit rate (consider cache tuning)
    ///
    /// # See Also
    /// - `prewarm_cache()` to improve hit rates by preloading data
    /// - `Aurora::with_config()` to adjust cache capacity
    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

    // ============================================
    // PubSub API - Real-time Change Notifications
    // ============================================

    /// Listen for real-time changes in a collection
    ///
    /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
    /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
    /// data synchronization systems.
    ///
    /// # Performance
    /// - Zero overhead when no listeners are active
    /// - Events are broadcast to all listeners asynchronously
    /// - Non-blocking - doesn't slow down write operations
    /// - Multiple listeners can watch the same collection
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, pubsub::ChangeType, types::Value};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic listener
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         match event.change_type {
    ///             ChangeType::Insert => println!("New user: {:?}", event.document),
    ///             ChangeType::Update => println!("Updated user: {:?}", event.document),
    ///             ChangeType::Delete => println!("Deleted user ID: {}", event.id),
    ///         }
    ///     }
    /// });
    ///
    /// // Now any insert/update/delete will trigger the listener
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Cache Invalidation:**
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    /// use std::collections::HashMap;
    ///
    /// let cache = Arc::new(RwLock::new(HashMap::new()));
    /// let cache_clone = Arc::clone(&cache);
    ///
    /// let mut listener = db.listen("products");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Invalidate cache entry when product changes
    ///         cache_clone.write().await.remove(&event.id);
    ///         println!("Cache invalidated for product: {}", event.id);
    ///     }
    /// });
    /// ```
    ///
    /// **Webhook Notifications:**
    /// ```
    /// let mut listener = db.listen("orders");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             // Send webhook for new orders
    ///             send_webhook("https://api.example.com/webhooks/order", &event).await;
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Audit Logging:**
    /// ```
    /// let mut listener = db.listen("sensitive_data");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log all changes to audit trail
    ///         db.insert_into("audit_log", vec![
    ///             ("collection", Value::String("sensitive_data".into())),
    ///             ("action", Value::String(format!("{:?}", event.change_type))),
    ///             ("document_id", Value::String(event.id.clone())),
    ///             ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
    ///         ]).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Data Synchronization:**
    /// ```
    /// let mut listener = db.listen("users");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Sync changes to external system
    ///         match event.change_type {
    ///             ChangeType::Insert | ChangeType::Update => {
    ///                 if let Some(doc) = event.document {
    ///                     external_api.upsert_user(&doc).await?;
    ///                 }
    ///             },
    ///             ChangeType::Delete => {
    ///                 external_api.delete_user(&event.id).await?;
    ///             },
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Real-Time Notifications:**
    /// ```
    /// let mut listener = db.listen("messages");
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         if event.change_type == ChangeType::Insert {
    ///             if let Some(msg) = event.document {
    ///                 // Push notification to connected websockets
    ///                 if let Some(recipient) = msg.data.get("recipient_id") {
    ///                     websocket_manager.send_to_user(recipient, &msg).await;
    ///                 }
    ///             }
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// **Filtered Listener:**
    /// ```
    /// use aurora_db::pubsub::EventFilter;
    ///
    /// // Only listen for inserts
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::ChangeType(ChangeType::Insert));
    ///
    /// // Only listen for documents with a specific field value
    /// let mut listener = db.listen("users")
    ///     .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
    /// ```
    ///
    /// # Important Notes
    /// - Listener stays active until dropped
    /// - Events are delivered in order
    /// - Each listener has its own event stream
    /// - Use filters to reduce unnecessary event processing
    /// - Listeners don't affect write performance
    ///
    /// # See Also
    /// - `listen_all()` to listen to all collections
    /// - `ChangeListener::filter()` to filter events
    /// - `query().watch()` for reactive queries with filtering
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    /// Listen for all changes across all collections
    ///
    /// Returns a stream of change events for every insert, update, and delete
    /// operation across the entire database. Useful for global audit logging,
    /// replication, and monitoring systems.
    ///
    /// # Performance
    /// - Same performance as a single-collection listener
    /// - Filter events by collection in your handler
    /// - Consider using `listen(collection)` if only watching specific collections
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Listen to everything
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         println!("Change in {}: {:?}", event.collection, event.change_type);
    ///     }
    /// });
    /// ```
    ///
    /// # Real-World Use Cases
    ///
    /// **Global Audit Trail:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Log every database change
    ///         audit_logger.log(AuditEntry {
    ///             timestamp: chrono::Utc::now(),
    ///             collection: event.collection,
    ///             action: event.change_type,
    ///             document_id: event.id,
    ///             user_id: get_current_user_id(),
    ///         }).await;
    ///     }
    /// });
    /// ```
    ///
    /// **Database Replication:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Replicate to secondary database
    ///         replica_db.apply_change(event).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Change Data Capture (CDC):**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Stream changes to Kafka/RabbitMQ
    ///         kafka_producer.send(
    ///             &format!("cdc.{}", event.collection),
    ///             serde_json::to_string(&event)?
    ///         ).await?;
    ///     }
    /// });
    /// ```
    ///
    /// **Monitoring & Metrics:**
    /// ```
    /// use std::sync::Arc;
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    /// use std::time::Duration;
    ///
    /// let write_counter = Arc::new(AtomicUsize::new(0));
    /// let counter_clone = Arc::clone(&write_counter);
    ///
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(_event) = listener.recv().await {
    ///         counter_clone.fetch_add(1, Ordering::Relaxed);
    ///     }
    /// });
    ///
    /// // Report metrics every 60 seconds
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(Duration::from_secs(60)).await;
    ///         let count = write_counter.swap(0, Ordering::Relaxed);
    ///         println!("Writes per minute: {}", count);
    ///     }
    /// });
    /// ```
    ///
    /// **Selective Processing:**
    /// ```
    /// let mut listener = db.listen_all();
    ///
    /// tokio::spawn(async move {
    ///     while let Ok(event) = listener.recv().await {
    ///         // Handle different collections differently
    ///         match event.collection.as_str() {
    ///             "users" => handle_user_change(event).await,
    ///             "orders" => handle_order_change(event).await,
    ///             "payments" => handle_payment_change(event).await,
    ///             _ => {} // Ignore others
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # When to Use
    /// - Global audit logging
    /// - Database replication
    /// - Change data capture (CDC)
    /// - Monitoring and metrics
    /// - Event sourcing systems
    ///
    /// # When NOT to Use
    /// - Only need to watch 1-2 collections → Use `listen(collection)` instead
    /// - High write volume with selective interest → Use collection-specific listeners
    /// - Need complex filtering → Use `query().watch()` instead
    ///
    /// # See Also
    /// - `listen()` for single collection listening
    /// - `listener_count()` to check active listeners
    /// - `query().watch()` for filtered reactive queries
    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    /// Get the number of active listeners for a collection
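    ///
    /// A minimal sketch (assumes no other listeners are active):
    ///
    /// ```
    /// # fn demo(db: &aurora_db::Aurora) {
    /// let _users = db.listen("users"); // keep the listener alive
    /// assert_eq!(db.listener_count("users"), 1);
    /// assert_eq!(db.total_listeners(), 1);
    /// # }
    /// ```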
    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    /// Get total number of active listeners
    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

    /// Flushes all buffered writes to disk to ensure durability.
    ///
    /// This method forces all pending writes from:
    /// - Write buffer (if enabled)
    /// - Cold storage internal buffers
    /// - Write-ahead log (if enabled)
    ///
    /// Call this when you need to ensure data persistence before
    /// a critical operation or shutdown. After flush() completes,
    /// all data is guaranteed to be on disk even if power fails.
    ///
    /// # Performance
    /// - Flush time: ~10-50ms depending on buffered data
    /// - Triggers OS-level fsync() for durability guarantee
    /// - Truncates WAL after successful flush
    /// - Not needed for every write (WAL provides durability)
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::Aurora;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic flush after critical write
    /// db.insert_into("users", data).await?;
    /// db.flush()?; // Ensure data is persisted to disk
    ///
    /// // Graceful shutdown pattern
    /// fn shutdown(db: &Aurora) -> Result<()> {
    ///     println!("Flushing pending writes...");
    ///     db.flush()?;
    ///     println!("Shutdown complete - all data persisted");
    ///     Ok(())
    /// }
    ///
    /// // Periodic checkpoint pattern (Aurora is not Clone; share it via Arc)
    /// use std::sync::Arc;
    /// use std::time::Duration;
    /// use std::thread;
    ///
    /// let db = Arc::new(db);
    /// let db_bg = Arc::clone(&db);
    /// thread::spawn(move || {
    ///     loop {
    ///         thread::sleep(Duration::from_secs(60));
    ///         if let Err(e) = db_bg.flush() {
    ///             eprintln!("Flush error: {}", e);
    ///         } else {
    ///             println!("Checkpoint: data flushed to disk");
    ///         }
    ///     }
    /// });
    ///
    /// // Critical transaction pattern
    /// let tx_id = db.begin_transaction();
    ///
    /// // Multiple operations
    /// db.insert_into("orders", order_data).await?;
    /// db.update_document("inventory", product_id, updates).await?;
    /// db.insert_into("audit_log", audit_data).await?;
    ///
    /// // Commit and flush immediately
    /// db.commit_transaction(tx_id)?;
    /// db.flush()?; // Critical: ensure transaction is on disk
    ///
    /// // Backup preparation
    /// println!("Preparing backup...");
    /// db.flush()?; // Ensure all data is written
    /// std::fs::copy("mydb.db", "backup.db")?;
    /// println!("Backup complete");
    /// ```
    ///
    /// # When to Use
    /// - Before graceful shutdown
    /// - After critical transactions
    /// - Before creating backups
    /// - Periodic checkpoints (every 30-60 seconds)
    /// - Before risky operations
    ///
    /// # When NOT to Use
    /// - After every single write (too slow, WAL provides durability)
    /// - In high-throughput loops (batch instead)
    /// - When durability mode is already Immediate
    ///
    /// # Important Notes
    /// - WAL provides durability even without explicit flush()
    /// - flush() adds latency (~10-50ms) so use strategically
    /// - Automatic flush happens during graceful shutdown
    /// - After flush(), WAL is truncated (data is in main storage)
    ///
    /// # See Also
    /// - `Aurora::with_config()` to set durability mode
    /// - WAL (Write-Ahead Log) provides durability without explicit flushes
    pub fn flush(&self) -> Result<()> {
        // Flush write buffer if present
        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.flush()?;
        }

        // Flush cold storage
        self.cold.flush()?;

        // Truncate WAL after successful flush (data is now in cold storage)
        if let Some(ref wal) = self.wal
            && let Ok(mut wal_lock) = wal.write() {
            wal_lock.truncate()?;
        }

        Ok(())
    }

    /// Store a key-value pair (low-level storage)
    ///
    /// This is the low-level method. For documents, use `insert_into()` instead.
    /// Writes are buffered and batched for performance.
    ///
    /// # Arguments
    /// * `key` - Unique key (format: "collection:id" for documents)
    /// * `value` - Raw bytes to store
    /// * `ttl` - Optional time-to-live (None = permanent)
    ///
    /// # Performance
    /// - Buffered writes: ~15-30K docs/sec
    /// - Batching improves throughput significantly
    /// - Call `flush()` to ensure data is persisted
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Permanent storage
    /// let data = serde_json::to_vec(&my_struct)?;
    /// db.put("mykey".to_string(), data, None)?;
    ///
    /// // With TTL (expires after 1 hour)
    /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600)))?;
    ///
    /// // Better: use insert_into() for documents
    /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
    /// ```
    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;

        if value.len() > MAX_BLOB_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "Blob size {} MB exceeds maximum allowed size of {} MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        if let Some(ref wal) = self.wal
            && self.config.durability_mode != DurabilityMode::None {
            wal.write()
                .unwrap()
                .append(Operation::Put, &key, Some(&value))?;
        }

        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(key.clone(), value.clone())?;
        } else {
            self.cold.set(key.clone(), value.clone())?;
        }

        self.hot.set(key.clone(), value.clone(), ttl);

        if let Some(collection_name) = key.split(':').next()
            && !collection_name.starts_with('_') {
            self.index_value(collection_name, &key, &value)?;
        }

        Ok(())
    }

    /// Replay WAL entries to recover from crash
    fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
        for entry in entries {
            match entry.operation {
                Operation::Put => {
                    if let Some(value) = entry.value {
                        // Write directly to cold storage (skip WAL, already logged)
                        self.cold.set(entry.key.clone(), value.clone())?;

                        // Update hot cache
                        self.hot.set(entry.key.clone(), value.clone(), None);

                        // Rebuild indices
                        if let Some(collection) = entry.key.split(':').next()
                            && !collection.starts_with('_') {
                            self.index_value(collection, &entry.key, &value)?;
                        }
                    }
                }
                Operation::Delete => {
                    self.cold.delete(&entry.key)?;
                    self.hot.delete(&entry.key);
                    // TODO: Remove from indices
                }
                Operation::BeginTx | Operation::CommitTx | Operation::RollbackTx => {
                    // Transaction boundaries - future implementation
                }
            }
        }

        // Flush after replay and truncate WAL
        self.cold.flush()?;
        if let Some(ref wal) = self.wal {
            wal.write().unwrap().truncate()?;
        }

        Ok(())
    }

    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        // Update primary index with metadata only (not the full value)
        let location = DiskLocation::new(value.len());
        self.primary_indices
            .entry(collection.to_string())
            .or_default()
            .insert(key.to_string(), location);

        // Try to get schema from cache first, otherwise load and cache it
        let collection_obj = match self.schema_cache.get(collection) {
            Some(cached_schema) => Arc::clone(cached_schema.value()),
            None => {
                // Schema not in cache - load it
                let collection_key = format!("_collection:{}", collection);
                let schema_data = match self.get(&collection_key)? {
                    Some(data) => data,
                    None => return Ok(()), // No schema = no secondary indices
                };

                let obj: Collection = match serde_json::from_slice(&schema_data) {
                    Ok(obj) => obj,
                    Err(_) => return Ok(()), // Invalid schema = skip indexing
                };

                // Cache the schema for future use
                let arc_obj = Arc::new(obj);
                self.schema_cache
                    .insert(collection.to_string(), Arc::clone(&arc_obj));
                arc_obj
            }
        };

        // Build list of fields that should be indexed (unique or explicitly indexed)
        let indexed_fields: Vec<String> = collection_obj
            .fields
            .iter()
            .filter(|(_, def)| def.indexed || def.unique)
            .map(|(name, _)| name.clone())
            .collect();

        // If no fields need indexing, we're done
        if indexed_fields.is_empty() {
            return Ok(());
        }

        // Update secondary indices - ONLY for indexed/unique fields
        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, field_value) in doc.data {
                // Skip fields that aren't indexed
                if !indexed_fields.contains(&field) {
                    continue;
                }

                // Use consistent string formatting for indexing
                let value_str = match &field_value {
                    Value::String(s) => s.clone(),
                    _ => field_value.to_string(),
                };

                let index_key = format!("{}:{}", collection, field);
                let secondary_index = self
                    .secondary_indices
                    .entry(index_key)
                    .or_default();

                // Check if we're at the index limit
                let max_entries = self.config.max_index_entries_per_field;

                secondary_index
                    .entry(value_str)
                    .and_modify(|doc_ids| {
                        // Only add if we haven't exceeded the limit per value
                        if doc_ids.len() < max_entries {
                            doc_ids.push(key.to_string());
                        }
                    })
                    .or_insert_with(|| vec![key.to_string()]);
            }
        }
        Ok(())
    }

    /// Scan collection with filter and early termination support
    /// Used by QueryBuilder for optimized queries with LIMIT
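    ///
    /// A minimal sketch (assumes a `users` collection whose documents have an
    /// integer `age` field):
    ///
    /// ```
    /// # fn demo(db: &aurora_db::Aurora) -> Result<(), Box<dyn std::error::Error>> {
    /// use aurora_db::types::Value;
    ///
    /// // Collect at most 10 matching documents, stopping the scan early.
    /// let adults = db.scan_and_filter(
    ///     "users",
    ///     |doc| matches!(doc.data.get("age"), Some(Value::Int(age)) if *age >= 18),
    ///     Some(10),
    /// )?;
    /// # Ok(())
    /// # }
    /// ```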
    pub fn scan_and_filter<F>(
        &self,
        collection: &str,
        filter: F,
        limit: Option<usize>,
    ) -> Result<Vec<Document>>
    where
        F: Fn(&Document) -> bool,
    {
        let mut documents = Vec::new();

        if let Some(index) = self.primary_indices.get(collection) {
            for entry in index.iter() {
                // Early termination
                if let Some(max) = limit {
                    if documents.len() >= max {
                        break;
                    }
                }

                let key = entry.key();
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        if filter(&doc) {
                            documents.push(doc);
                        }
                    }
                }
            }
        } else {
            // Fallback
            documents = self.scan_collection(collection)?;
            documents.retain(|doc| filter(doc));
            if let Some(max) = limit {
                documents.truncate(max);
            }
        }

        Ok(documents)
    }

    /// Smart collection scan that uses the primary index as a key directory
    /// Avoids forced flushes and leverages hot cache for better performance
    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
        let mut documents = Vec::new();

        // Use the primary index as a "key directory" - it contains all document keys
        if let Some(index) = self.primary_indices.get(collection) {
            // Iterate through all keys in the primary index (fast, in-memory)
            for entry in index.iter() {
                let key = entry.key();

                // Fetch each document using get() which checks:
                // 1. Hot cache first (instant)
                // 2. Cold storage if not cached (disk I/O only for uncached items)
                if let Some(data) = self.get(key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        documents.push(doc);
                    }
                }
            }
        } else {
            // Fallback: scan from cold storage if primary index not yet initialized
            let prefix = format!("{}:", collection);
            for result in self.cold.scan_prefix(&prefix) {
                if let Ok((_key, value)) = result {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        documents.push(doc);
                    }
                }
            }
        }

        Ok(documents)
    }

    /// Store a file as a binary blob (maximum size: 50 MB).
    ///
    /// The file is read asynchronously and stored with a `BLOB:` prefix so blob
    /// data can be distinguished from document data.
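    ///
    /// A minimal sketch (assumes an async context and a file at `./avatar.png`):
    ///
    /// ```
    /// # async fn demo(db: &aurora_db::Aurora) -> Result<(), Box<dyn std::error::Error>> {
    /// use std::path::Path;
    ///
    /// db.put_blob("blobs:avatar-123".to_string(), Path::new("./avatar.png")).await?;
    /// # Ok(())
    /// # }
    /// ```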
    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit

        // Get file metadata to check size before reading
        let metadata = tokio::fs::metadata(file_path).await?;
        let file_size = metadata.len() as usize;

        if file_size > MAX_FILE_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "File size {} MB exceeds maximum allowed size of {} MB",
                file_size / (1024 * 1024),
                MAX_FILE_SIZE / (1024 * 1024)
            )));
        }

        let mut file = File::open(file_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Add BLOB: prefix to mark this as blob data
        let mut blob_data = Vec::with_capacity(5 + buffer.len());
        blob_data.extend_from_slice(b"BLOB:");
        blob_data.extend_from_slice(&buffer);

        self.put(key, blob_data, None)
    }

    /// Create a new collection with schema definition
    ///
    /// Collections are like tables in SQL - they define the structure of your documents.
    /// The third boolean parameter marks the field as unique; unique fields are
    /// automatically indexed for fast lookups.
    ///
    /// # Arguments
    /// * `name` - Collection name
    /// * `fields` - Vector of (field_name, field_type, unique) tuples
    ///   - Field name (accepts both &str and String)
    ///   - Field type (String, Int, Float, Bool, etc.)
    ///   - Unique: true to enforce uniqueness (and auto-index), false otherwise
    ///
    /// # Performance
    /// - Indexed fields: Fast equality queries (O(1) lookup)
    /// - Non-indexed fields: Full scan required for queries
    /// - Unique fields are automatically indexed
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Create a users collection
    /// db.new_collection("users", vec![
    ///     ("name", FieldType::String, false),
    ///     ("email", FieldType::String, true), // Unique - automatically indexed
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    ///     ("score", FieldType::Float, false),
    /// ])?;
    ///
    /// // Idempotent - calling again is safe
    /// db.new_collection("users", vec![/* ... */])?; // OK!
    /// ```
    pub fn new_collection<S: Into<String>>(
        &self,
        name: &str,
        fields: Vec<(S, FieldType, bool)>,
    ) -> Result<()> {
        let collection_key = format!("_collection:{}", name);

        // Check if collection already exists - if so, just return Ok (idempotent)
        if self.get(&collection_key)?.is_some() {
            return Ok(());
        }

        // Create field definitions
        let mut field_definitions = HashMap::new();
        for (field_name, field_type, unique) in fields {
            field_definitions.insert(
                field_name.into(),
                FieldDefinition {
                    field_type,
                    unique,
                    indexed: unique, // Auto-index unique fields
                },
            );
        }

        let collection = Collection {
            name: name.to_string(),
            fields: field_definitions,
            // Note: unique fields are derived from `fields`
        };

        let collection_data = serde_json::to_vec(&collection)?;
        self.put(collection_key, collection_data, None)?;

        // Invalidate schema cache since we just created/updated the collection schema
        self.schema_cache.remove(name);

        Ok(())
    }

    /// Insert a document into a collection
    ///
    /// Automatically generates a UUID for the document and validates against
    /// collection schema and unique constraints. Returns the generated document ID.
    ///
    /// # Performance
    /// - Single insert: ~15,000 docs/sec
    /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
    /// - Triggers PubSub events for real-time listeners
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to insert into
    /// * `data` - Document fields and values to insert
    ///
    /// # Returns
    /// The auto-generated ID of the inserted document or an error
    ///
    /// # Errors
    /// - `CollectionNotFound`: Collection doesn't exist
    /// - `ValidationError`: Data violates schema or unique constraints
    /// - `SerializationError`: Invalid data format
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Basic insertion
    /// let user_id = db.insert_into("users", vec![
    ///     ("name", Value::String("Alice Smith".to_string())),
    ///     ("email", Value::String("alice@example.com".to_string())),
    ///     ("age", Value::Int(28)),
    ///     ("active", Value::Bool(true)),
    /// ]).await?;
    ///
    /// println!("Created user with ID: {}", user_id);
    ///
    /// // Inserting with nested data
    /// let order_id = db.insert_into("orders", vec![
    ///     ("user_id", Value::String(user_id.clone())),
    ///     ("total", Value::Float(99.99)),
    ///     ("status", Value::String("pending".to_string())),
    ///     ("items", Value::Array(vec![
    ///         Value::String("item-123".to_string()),
    ///         Value::String("item-456".to_string()),
    ///     ])),
    /// ]).await?;
    ///
    /// // Error handling - unique constraint violation
    /// match db.insert_into("users", vec![
    ///     ("email", Value::String("alice@example.com".to_string())), // Duplicate!
    ///     ("name", Value::String("Alice Clone".to_string())),
    /// ]).await {
    ///     Ok(id) => println!("Inserted: {}", id),
    ///     Err(e) => println!("Failed: {} (email already exists)", e),
    /// }
    ///
    /// // For bulk inserts (10+ documents), use batch_insert() instead
    /// let users = vec![
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Bob".to_string())),
    ///         ("email".to_string(), Value::String("bob@example.com".to_string())),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Carol".to_string())),
    ///         ("email".to_string(), Value::String("carol@example.com".to_string())),
    ///     ]),
    ///     // ... more documents
    /// ];
    /// let ids = db.batch_insert("users", users).await?; // 3x faster!
    /// println!("Inserted {} users", ids.len());
    /// ```
    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
        // Convert Vec<(&str, Value)> to HashMap<String, Value>
        let data_map: HashMap<String, Value> =
            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();

        // Validate unique constraints before inserting
        self.validate_unique_constraints(collection, &data_map)
            .await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data: data_map,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish insert event
        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

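    /// Insert a document from a prebuilt map of fields.
    ///
    /// Behaves like `insert_into()` but skips the tuple-to-map conversion when
    /// you already hold a `HashMap`.
    ///
    /// A minimal sketch (assumes an async context):
    ///
    /// ```
    /// # async fn demo(db: &aurora_db::Aurora) -> Result<(), Box<dyn std::error::Error>> {
    /// use std::collections::HashMap;
    /// use aurora_db::types::Value;
    ///
    /// let mut data = HashMap::new();
    /// data.insert("name".to_string(), Value::String("Alice".into()));
    /// let id = db.insert_map("users", data).await?;
    /// # Ok(())
    /// # }
    /// ```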
    pub async fn insert_map(
        &self,
        collection: &str,
        data: HashMap<String, Value>,
    ) -> Result<String> {
        // Validate unique constraints before inserting
        self.validate_unique_constraints(collection, &data).await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish insert event
        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

    /// Batch insert multiple documents with optimized write path
    ///
    /// Inserts multiple documents in a single optimized operation, bypassing
    /// the write buffer for better performance. Ideal for bulk data loading,
    /// migrations, or initial database seeding. 3x faster than individual inserts.
    ///
    /// # Performance
    /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
    /// - Batch writes to WAL and storage
    /// - Validates all unique constraints
    /// - Use for 10+ documents minimum
    ///
    /// # Arguments
    /// * `collection` - Name of the collection to insert into
    /// * `documents` - Vector of document data as HashMaps
    ///
    /// # Returns
    /// Vector of auto-generated document IDs or an error
    ///
    /// # Examples
    ///
    /// ```
    /// use aurora_db::{Aurora, types::Value};
    /// use std::collections::HashMap;
    ///
    /// let db = Aurora::open("mydb.db")?;
    ///
    /// // Bulk user import
    /// let users = vec![
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Alice".into())),
    ///         ("email".to_string(), Value::String("alice@example.com".into())),
    ///         ("age".to_string(), Value::Int(28)),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Bob".into())),
    ///         ("email".to_string(), Value::String("bob@example.com".into())),
    ///         ("age".to_string(), Value::Int(32)),
    ///     ]),
    ///     HashMap::from([
    ///         ("name".to_string(), Value::String("Carol".into())),
    ///         ("email".to_string(), Value::String("carol@example.com".into())),
    ///         ("age".to_string(), Value::Int(25)),
    ///     ]),
    /// ];
    ///
    /// let ids = db.batch_insert("users", users).await?;
    /// println!("Inserted {} users", ids.len());
    ///
    /// // Seeding test data
    /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
    ///     .map(|i| HashMap::from([
    ///         ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
    ///         ("price".to_string(), Value::Float(9.99 + i as f64)),
    ///         ("stock".to_string(), Value::Int(100)),
    ///     ]))
    ///     .collect();
    ///
    /// let ids = db.batch_insert("products", test_products).await?;
    /// // Much faster than 1000 individual insert_into() calls!
    ///
    /// // Migration from CSV data
    /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
    /// let mut batch = Vec::new();
    ///
    /// for result in csv_reader.records() {
    ///     let record = result?;
    ///     let doc = HashMap::from([
    ///         ("field1".to_string(), Value::String(record[0].to_string())),
    ///         ("field2".to_string(), Value::String(record[1].to_string())),
    ///     ]);
    ///     batch.push(doc);
    ///
    ///     // Insert in batches of 1000
    ///     if batch.len() >= 1000 {
    ///         db.batch_insert("imported_data", batch.clone()).await?;
    ///         batch.clear();
    ///     }
    /// }
    ///
    /// // Insert remaining
    /// if !batch.is_empty() {
    ///     db.batch_insert("imported_data", batch).await?;
    /// }
    /// ```
    ///
    /// # Errors
    /// - `ValidationError`: Unique constraint violation on any document
    /// - `CollectionNotFound`: Collection doesn't exist
    /// - `IoError`: Storage write failure
    ///
    /// # Important Notes
    /// - All inserts are atomic - if one fails, none are inserted
    /// - UUIDs are auto-generated for all documents
    /// - PubSub events are published for each insert
    /// - For 10+ documents, this is 3x faster than individual inserts
    /// - For < 10 documents, use `insert_into()` instead
    ///
    /// # See Also
    /// - `insert_into()` for single document inserts
    /// - `import_from_json()` for file-based bulk imports
    /// - `batch_write()` for low-level batch operations
    pub async fn batch_insert(
        &self,
        collection: &str,
        documents: Vec<HashMap<String, Value>>,
    ) -> Result<Vec<String>> {
        let mut doc_ids = Vec::with_capacity(documents.len());
        let mut pairs = Vec::with_capacity(documents.len());

        // Prepare all documents
        for data in documents {
            // Validate unique constraints
            self.validate_unique_constraints(collection, &data).await?;

            let doc_id = Uuid::new_v4().to_string();
            let document = Document {
                id: doc_id.clone(),
                data,
            };

            let key = format!("{}:{}", collection, doc_id);
            let value = serde_json::to_vec(&document)?;

            pairs.push((key, value));
            doc_ids.push(doc_id);
        }

        // Write to WAL in batch (if enabled)
        if let Some(ref wal) = self.wal
            && self.config.durability_mode != DurabilityMode::None {
            let mut wal_lock = wal.write().unwrap();
            for (key, value) in &pairs {
                wal_lock.append(Operation::Put, key, Some(value))?;
            }
        }

        // Bypass write buffer - go directly to cold storage batch API
        self.cold.batch_set(pairs.clone())?;

        // Note: Durability is handled by background checkpoint process

        // Update hot cache and indices
        for (key, value) in pairs {
            self.hot.set(key.clone(), value.clone(), None);

            if let Some(collection_name) = key.split(':').next()
                && !collection_name.starts_with('_') {
                self.index_value(collection_name, &key, &value)?;
            }
        }

        // Publish events
        for doc_id in &doc_ids {
            if let Ok(Some(doc)) = self.get_document(collection, doc_id) {
                let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
                let _ = self.pubsub.publish(event);
            }
        }

        Ok(doc_ids)
    }

    /// Update a document by ID
    ///
    /// # Arguments
    /// * `collection` - Collection name
    /// * `doc_id` - Document ID to update
    /// * `updates` - New field values to set
    ///
    /// # Returns
    /// Ok(()) on success, or an error if the document doesn't exist
    ///
    /// # Examples
    ///
    /// ```
    /// db.update_document("users", &user_id, vec![
    ///     ("status", Value::String("active".to_string())),
    ///     ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
    /// ]).await?;
    /// ```
    pub async fn update_document(
        &self,
        collection: &str,
        doc_id: &str,
        updates: Vec<(&str, Value)>,
    ) -> Result<()> {
        // Get existing document
        let mut document = self
            .get_document(collection, doc_id)?
            .ok_or_else(|| AuroraError::NotFound(format!("Document not found: {}", doc_id)))?;

        // Store old document for event
        let old_document = document.clone();

        // Apply updates
        for (field, value) in updates {
            document.data.insert(field.to_string(), value);
        }

        // Validate unique constraints after update (excluding current document)
        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
            .await?;

        // Save updated document
        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        // Publish update event
        let event =
            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(())
    }

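    /// Return every document in a collection.
    ///
    /// Ensures indices are initialized, then scans via the primary index
    /// (hot cache first, cold storage for uncached items).
    ///
    /// A minimal sketch (assumes an async context):
    ///
    /// ```
    /// # async fn demo(db: &aurora_db::Aurora) -> Result<(), Box<dyn std::error::Error>> {
    /// let users = db.get_all_collection("users").await?;
    /// println!("{} documents", users.len());
    /// # Ok(())
    /// # }
    /// ```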
    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
        self.ensure_indices_initialized().await?;
        self.scan_collection(collection)
    }

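    /// List keys whose name contains `pattern`, with summary info for each value.
    ///
    /// A minimal sketch (assumes some `users:*` keys exist):
    ///
    /// ```
    /// # fn demo(db: &aurora_db::Aurora) -> Result<(), Box<dyn std::error::Error>> {
    /// for (key, info) in db.get_data_by_pattern("users:")? {
    ///     println!("{}: {} bytes", key, info.size());
    /// }
    /// # Ok(())
    /// # }
    /// ```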
    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
        let mut data = Vec::new();

        // Scan from cold storage instead of primary index
        for result in self.cold.scan() {
            if let Ok((key, value)) = result {
                if key.contains(pattern) {
                    let info = if value.starts_with(b"BLOB:") {
                        DataInfo::Blob { size: value.len() }
                    } else {
                        DataInfo::Data {
                            size: value.len(),
                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
                                .into_owned(),
                        }
                    };

                    data.push((key.clone(), info));
                }
            }
        }

        Ok(data)
    }

1714 /// Begin a transaction
1715 ///
1716 /// All operations after beginning a transaction will be part of the transaction
1717 /// until either commit_transaction() or rollback_transaction() is called.
1718 ///
1719 /// # Returns
1720 /// Success or an error (e.g., if a transaction is already in progress)
1721 ///
1722 /// # Examples
1723 ///
1724 /// ```
1725 /// // Start a transaction for atomic operations
1726 /// db.begin_transaction()?;
1727 ///
1728 /// // Perform multiple operations
1729 /// db.insert_into("accounts", vec![("user_id", Value::String(user_id)), ("balance", Value::Float(100.0))])?;
1730 /// db.insert_into("audit_log", vec![("action", Value::String("account_created".to_string()))])?;
1731 ///
1732 /// // Commit all changes or roll back if there's an error
1733 /// if all_ok {
1734 /// db.commit_transaction()?;
1735 /// } else {
1736 /// db.rollback_transaction()?;
1737 /// }
1738 /// ```
1739 /// Begin a new transaction for atomic operations
1740 ///
1741 /// Transactions ensure all-or-nothing execution: either all operations succeed,
1742 /// or none of them are applied. Perfect for maintaining data consistency.
1743 ///
1744 /// # Examples
1745 ///
1746 /// ```
1747 /// use aurora_db::{Aurora, types::Value};
1748 ///
1749 /// let db = Aurora::open("mydb.db")?;
1750 ///
1751 /// // Start transaction
1752 /// let tx_id = db.begin_transaction();
1753 ///
1754 /// // Perform multiple operations
1755 /// db.insert_into("accounts", vec![
1756 /// ("user_id", Value::String("alice".into())),
1757 /// ("balance", Value::Int(1000)),
1758 /// ]).await?;
1759 ///
1760 /// db.insert_into("accounts", vec![
1761 /// ("user_id", Value::String("bob".into())),
1762 /// ("balance", Value::Int(500)),
    /// ]).await?;
1764 ///
1765 /// // Commit if all succeeded
1766 /// db.commit_transaction(tx_id)?;
1767 ///
1768 /// // Or rollback on error
1769 /// // db.rollback_transaction(tx_id)?;
1770 /// ```
1771 pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
1772 let buffer = self.transaction_manager.begin();
1773 buffer.id
1774 }
1775
1776 /// Commit a transaction, making all changes permanent
1777 ///
1778 /// All operations within the transaction are atomically applied to the database.
1779 /// If any operation fails, none are applied.
1780 ///
1781 /// # Arguments
1782 /// * `tx_id` - Transaction ID returned from begin_transaction()
1783 ///
1784 /// # Examples
1785 ///
1786 /// ```
1787 /// use aurora_db::{Aurora, types::Value};
1788 ///
1789 /// let db = Aurora::open("mydb.db")?;
1790 ///
1791 /// // Transfer money between accounts
1792 /// let tx_id = db.begin_transaction();
1793 ///
1794 /// // Deduct from Alice
1795 /// db.update_document("accounts", "alice", vec![
1796 /// ("balance", Value::Int(900)), // Was 1000
1797 /// ]).await?;
1798 ///
1799 /// // Add to Bob
1800 /// db.update_document("accounts", "bob", vec![
1801 /// ("balance", Value::Int(600)), // Was 500
1802 /// ]).await?;
1803 ///
1804 /// // Both updates succeed - commit them
1805 /// db.commit_transaction(tx_id)?;
1806 ///
1807 /// println!("Transfer completed!");
1808 /// ```
1809 pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1810 let buffer = self
1811 .transaction_manager
1812 .active_transactions
1813 .get(&tx_id)
1814 .ok_or_else(|| {
1815 AuroraError::InvalidOperation("Transaction not found or already completed".into())
1816 })?;
1817
1818 for item in buffer.writes.iter() {
1819 let key = item.key();
1820 let value = item.value();
1821 self.cold.set(key.clone(), value.clone())?;
1822 self.hot.set(key.clone(), value.clone(), None);
1823 if let Some(collection_name) = key.split(':').next()
1824 && !collection_name.starts_with('_') {
1825 self.index_value(collection_name, key, value)?;
1826 }
1827 }
1828
1829 for item in buffer.deletes.iter() {
1830 let key = item.key();
1831 if let Some((collection, id)) = key.split_once(':')
1832 && let Ok(Some(doc)) = self.get_document(collection, id) {
1833 self.remove_from_indices(collection, &doc)?;
1834 }
1835 self.cold.delete(key)?;
1836 self.hot.delete(key);
1837 }
1838
1839 // Drop the buffer reference to release the DashMap read lock
1840 // before calling commit which needs to remove the entry (write lock)
1841 drop(buffer);
1842
1843 self.transaction_manager.commit(tx_id)?;
1844
1845 self.cold.compact()?;
1846
1847 Ok(())
1848 }
1849
1850 /// Roll back a transaction, discarding all changes
1851 ///
1852 /// All operations within the transaction are discarded. The database state
1853 /// remains unchanged. Use this when an error occurs during transaction processing.
1854 ///
1855 /// # Arguments
1856 /// * `tx_id` - Transaction ID returned from begin_transaction()
1857 ///
1858 /// # Examples
1859 ///
1860 /// ```
1861 /// use aurora_db::{Aurora, types::Value};
1862 ///
1863 /// let db = Aurora::open("mydb.db")?;
1864 ///
1865 /// // Attempt a transfer with validation
1866 /// let tx_id = db.begin_transaction();
1867 ///
    /// let result = async {
    ///     // Look up Alice's balance (get_document is synchronous)
    ///     let alice = db.get_document("accounts", "alice").map_err(|e| e.to_string())?;
    ///     let balance = alice.as_ref().and_then(|doc| doc.data.get("balance"));
    ///
    ///     if let Some(Value::Int(bal)) = balance {
    ///         if *bal < 100 {
    ///             return Err("Insufficient funds".to_string());
    ///         }
    ///
    ///         // Deduct from Alice
    ///         db.update_document("accounts", "alice", vec![
    ///             ("balance", Value::Int(bal - 100)),
    ///         ]).await.map_err(|e| e.to_string())?;
    ///
    ///         // Add to Bob
    ///         db.update_document("accounts", "bob", vec![
    ///             ("balance", Value::Int(600)),
    ///         ]).await.map_err(|e| e.to_string())?;
    ///
    ///         Ok(())
    ///     } else {
    ///         Err("Account not found".to_string())
    ///     }
    /// }.await;
1891 ///
1892 /// match result {
1893 /// Ok(_) => {
1894 /// db.commit_transaction(tx_id)?;
1895 /// println!("Transfer completed");
1896 /// }
1897 /// Err(e) => {
1898 /// db.rollback_transaction(tx_id)?;
1899 /// println!("Transfer failed: {}, changes rolled back", e);
1900 /// }
1901 /// }
1902 /// ```
1903 pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
1904 self.transaction_manager.rollback(tx_id)
1905 }
1906
1907 /// Create a secondary index on a field for faster queries
1908 ///
1909 /// Indexes dramatically improve query performance for frequently accessed fields,
1910 /// trading increased memory usage and slower writes for much faster reads.
1911 ///
1912 /// # When to Create Indexes
1913 /// - **Frequent queries**: Fields used in 80%+ of your queries
1914 /// - **High cardinality**: Fields with many unique values (user_id, email)
1915 /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
1916 /// - **Large collections**: Most beneficial with 10,000+ documents
1917 ///
1918 /// # When NOT to Index
1919 /// - Low cardinality fields (e.g., boolean flags, small enums)
1920 /// - Rarely queried fields
1921 /// - Fields that change frequently (write-heavy workloads)
1922 /// - Small collections (<1,000 documents) - full scans are fast enough
1923 ///
1924 /// # Performance Characteristics
1925 /// - **Query speedup**: O(n) → O(1) for equality filters
1926 /// - **Memory cost**: ~100-200 bytes per document per index
1927 /// - **Write slowdown**: ~20-30% longer insert/update times
1928 /// - **Build time**: ~5,000 docs/sec for initial indexing
1929 ///
1930 /// # Arguments
1931 /// * `collection` - Name of the collection to index
1932 /// * `field` - Name of the field to index
1933 ///
1934 /// # Examples
1935 ///
1936 /// ```
    /// use aurora_db::{Aurora, types::FieldType};
    ///
    /// let db = Aurora::open("mydb.db")?;
    /// db.new_collection("users", vec![
    ///     ("email", FieldType::String, false),  // (name, type, unique)
    ///     ("age", FieldType::Int, false),
    ///     ("active", FieldType::Bool, false),
    /// ])?;
1945 ///
1946 /// // Index email - frequently queried, high cardinality
1947 /// db.create_index("users", "email").await?;
1948 ///
1949 /// // Now this query is FAST (O(1) instead of O(n))
1950 /// let user = db.query("users")
1951 /// .filter(|f| f.eq("email", "alice@example.com"))
1952 /// .first_one()
1953 /// .await?;
1954 ///
1955 /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
1956 /// // A full scan is fast enough for boolean fields
1957 ///
1958 /// // DO index 'age' if you frequently query age ranges
1959 /// db.create_index("users", "age").await?;
1960 ///
1961 /// let young_users = db.query("users")
1962 /// .filter(|f| f.lt("age", 30))
1963 /// .collect()
1964 /// .await?;
1965 /// ```
1966 ///
1967 /// # Real-World Example: E-commerce Orders
1968 ///
1969 /// ```
1970 /// // Orders collection: 1 million documents
    /// db.new_collection("orders", vec![
    ///     ("user_id", FieldType::String, false),    // High cardinality
    ///     ("status", FieldType::String, false),     // Low cardinality (pending, shipped, delivered)
    ///     ("created_at", FieldType::String, false),
    ///     ("total", FieldType::Float, false),
    /// ])?;
1977 ///
1978 /// // Index user_id - queries like "show me my orders" are common
1979 /// db.create_index("orders", "user_id").await?; // Good choice
1980 ///
1981 /// // Query speedup: 2.5s → 0.001s
1982 /// let my_orders = db.query("orders")
1983 /// .filter(|f| f.eq("user_id", user_id))
1984 /// .collect()
1985 /// .await?;
1986 ///
1987 /// // DON'T index 'status' - only 3 possible values
1988 /// // Scanning 1M docs takes ~100ms, indexing won't help much
1989 ///
1990 /// // Index created_at if you frequently query recent orders
1991 /// db.create_index("orders", "created_at").await?; // Good for time-based queries
1992 /// ```
1993 pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
1994 // Check if collection exists
1995 if self.get(&format!("_collection:{}", collection))?.is_none() {
1996 return Err(AuroraError::CollectionNotFound(collection.to_string()));
1997 }
1998
1999 // Generate a default index name
2000 let index_name = format!("idx_{}_{}", collection, field);
2001
2002 // Create index definition
2003 let definition = IndexDefinition {
2004 name: index_name.clone(),
2005 collection: collection.to_string(),
2006 fields: vec![field.to_string()],
2007 index_type: IndexType::BTree,
2008 unique: false,
2009 };
2010
2011 // Create the index
2012 let index = Index::new(definition.clone());
2013
2014 // Index all existing documents in the collection
2015 let prefix = format!("{}:", collection);
2016 for result in self.cold.scan_prefix(&prefix) {
2017 if let Ok((_, data)) = result
2018 && let Ok(doc) = serde_json::from_slice::<Document>(&data) {
2019 let _ = index.insert(&doc);
2020 }
2021 }
2022
2023 // Store the index
2024 self.indices.insert(index_name, index);
2025
2026 // Store the index definition for persistence
2027 let index_key = format!("_index:{}:{}", collection, field);
2028 self.put(index_key, serde_json::to_vec(&definition)?, None)?;
2029
2030 Ok(())
2031 }
2032
2033 /// Query documents in a collection with filtering, sorting, and pagination
2034 ///
2035 /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
2036 /// Queries use early termination for LIMIT clauses, making them extremely fast
2037 /// even on large collections (6,800x faster than naive implementations).
2038 ///
2039 /// # Performance
2040 /// - With LIMIT: O(k) where k = limit + offset (early termination!)
2041 /// - Without LIMIT: O(n) where n = matching documents
2042 /// - Uses secondary indices when available for equality filters
2043 /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
2044 ///
2045 /// # Examples
2046 ///
2047 /// ```
2048 /// use aurora_db::{Aurora, types::Value};
2049 ///
2050 /// let db = Aurora::open("mydb.db")?;
2051 ///
2052 /// // Simple equality query
2053 /// let active_users = db.query("users")
2054 /// .filter(|f| f.eq("active", Value::Bool(true)))
2055 /// .collect()
2056 /// .await?;
2057 ///
2058 /// // Range query with pagination (FAST - uses early termination!)
2059 /// let top_scorers = db.query("users")
2060 /// .filter(|f| f.gt("score", Value::Int(1000)))
2061 /// .order_by("score", false) // descending
2062 /// .limit(10)
2063 /// .offset(20)
2064 /// .collect()
2065 /// .await?;
2066 ///
2067 /// // Multiple filters
2068 /// let premium_active = db.query("users")
2069 /// .filter(|f| f.eq("tier", Value::String("premium".into())))
2070 /// .filter(|f| f.eq("active", Value::Bool(true)))
2071 /// .limit(100) // Only scans ~200 docs, not all million!
2072 /// .collect()
2073 /// .await?;
2074 ///
2075 /// // Text search in a field
2076 /// let matching = db.query("articles")
2077 /// .filter(|f| f.contains("title", "rust"))
2078 /// .collect()
2079 /// .await?;
2080 /// ```
2081 pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2082 QueryBuilder::new(self, collection)
2083 }
2084
2085 /// Create a search builder for full-text search
2086 ///
2087 /// # Arguments
2088 /// * `collection` - Name of the collection to search
2089 ///
2090 /// # Returns
2091 /// A `SearchBuilder` for configuring and executing searches
2092 ///
2093 /// # Examples
2094 ///
2095 /// ```
2096 /// // Search for documents containing text
2097 /// let search_results = db.search("articles")
2098 /// .field("content")
2099 /// .matching("quantum computing")
2100 /// .fuzzy(true) // Enable fuzzy matching for typo tolerance
2101 /// .collect()
2102 /// .await?;
2103 /// ```
2104 pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2105 SearchBuilder::new(self, collection)
2106 }
2107
2108 /// Retrieve a document by ID
2109 ///
2110 /// Fast direct lookup when you know the document ID. Significantly faster
2111 /// than querying with filters when ID is known.
2112 ///
2113 /// # Performance
2114 /// - Hot cache: ~1,000,000 reads/sec (instant)
2115 /// - Cold storage: ~500,000 reads/sec (disk I/O)
2116 /// - Complexity: O(1) - constant time lookup
2117 /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
2118 ///
2119 /// # Arguments
2120 /// * `collection` - Name of the collection to query
2121 /// * `id` - ID of the document to retrieve
2122 ///
2123 /// # Returns
2124 /// The document if found, None if not found, or an error
2125 ///
2126 /// # Examples
2127 ///
2128 /// ```
2129 /// use aurora_db::{Aurora, types::Value};
2130 ///
2131 /// let db = Aurora::open("mydb.db")?;
2132 ///
2133 /// // Basic retrieval
2134 /// if let Some(user) = db.get_document("users", &user_id)? {
2135 /// println!("Found user: {}", user.id);
2136 ///
2137 /// // Access fields safely
2138 /// if let Some(Value::String(name)) = user.data.get("name") {
2139 /// println!("Name: {}", name);
2140 /// }
2141 ///
2142 /// if let Some(Value::Int(age)) = user.data.get("age") {
2143 /// println!("Age: {}", age);
2144 /// }
2145 /// } else {
2146 /// println!("User not found");
2147 /// }
2148 ///
2149 /// // Idiomatic error handling
2150 /// let user = db.get_document("users", &user_id)?
2151 /// .ok_or_else(|| AuroraError::NotFound("User not found".into()))?;
2152 ///
2153 /// // Checking existence before operations
2154 /// if db.get_document("users", &user_id)?.is_some() {
2155 /// db.update_document("users", &user_id, vec![
2156 /// ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
2157 /// ]).await?;
2158 /// }
2159 ///
2160 /// // Batch retrieval (fetch multiple by ID)
2161 /// let user_ids = vec!["user-1", "user-2", "user-3"];
2162 /// let users: Vec<Document> = user_ids.iter()
2163 /// .filter_map(|id| db.get_document("users", id).ok().flatten())
2164 /// .collect();
2165 ///
2166 /// println!("Found {} out of {} users", users.len(), user_ids.len());
2167 /// ```
2168 ///
2169 /// # When to Use
2170 /// - You know the document ID (from insert, previous query, or URL param)
2171 /// - Need fastest possible lookup (1M reads/sec)
2172 /// - Fetching a single document
2173 ///
2174 /// # When NOT to Use
2175 /// - Searching by other fields → Use `query().filter()` instead
2176 /// - Need multiple documents by criteria → Use `query().collect()` instead
2177 /// - Don't know the ID → Use `find_by_field()` or `query()` instead
2178 pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2179 let key = format!("{}:{}", collection, id);
2180 if let Some(data) = self.get(&key)? {
2181 Ok(Some(serde_json::from_slice(&data)?))
2182 } else {
2183 Ok(None)
2184 }
2185 }
2186
2187 /// Delete a document by ID
2188 ///
2189 /// Permanently removes a document from storage, cache, and all indices.
2190 /// Publishes a delete event for PubSub subscribers. This operation cannot be undone.
2191 ///
2192 /// # Performance
2193 /// - Delete speed: ~50,000 deletes/sec
2194 /// - Cleans up hot cache, cold storage, primary + secondary indices
2195 /// - Triggers PubSub events for listeners
2196 ///
2197 /// # Arguments
2198 /// * `key` - Full key in format "collection:id" (e.g., "users:123")
2199 ///
2200 /// # Returns
2201 /// Success or an error
2202 ///
2203 /// # Errors
2204 /// - `InvalidOperation`: Invalid key format (must be "collection:id")
2205 /// - `IoError`: Storage deletion failed
2206 ///
2207 /// # Examples
2208 ///
2209 /// ```
    /// use aurora_db::{Aurora, types::Value};
2211 ///
2212 /// let db = Aurora::open("mydb.db")?;
2213 ///
2214 /// // Basic deletion (note: requires "collection:id" format)
2215 /// db.delete("users:abc123").await?;
2216 ///
2217 /// // Delete with existence check
2218 /// let user_id = "user-456";
2219 /// if db.get_document("users", user_id)?.is_some() {
2220 /// db.delete(&format!("users:{}", user_id)).await?;
2221 /// println!("User deleted");
2222 /// } else {
2223 /// println!("User not found");
2224 /// }
2225 ///
2226 /// // Error handling
2227 /// match db.delete("users:nonexistent").await {
2228 /// Ok(_) => println!("Deleted successfully"),
2229 /// Err(e) => println!("Delete failed: {}", e),
2230 /// }
2231 ///
2232 /// // Batch deletion using query
    /// let inactive_count = db.delete_by_query("users", |f| {
    ///     f.eq("active", Value::Bool(false))
    /// }).await?;
2236 /// println!("Deleted {} inactive users", inactive_count);
2237 ///
2238 /// // Delete with cascading (manual cascade pattern)
2239 /// let user_id = "user-123";
2240 ///
2241 /// // Delete user's orders first
2242 /// let orders = db.query("orders")
2243 /// .filter(|f| f.eq("user_id", user_id))
2244 /// .collect()
2245 /// .await?;
2246 ///
2247 /// for order in orders {
2248 /// db.delete(&format!("orders:{}", order.id)).await?;
2249 /// }
2250 ///
2251 /// // Then delete the user
2252 /// db.delete(&format!("users:{}", user_id)).await?;
2253 /// println!("User and all orders deleted");
2254 /// ```
2255 ///
2256 /// # Alternative: Soft Delete Pattern
2257 ///
2258 /// For recoverable deletions, use soft deletes instead:
2259 ///
2260 /// ```
2261 /// // Soft delete - mark as deleted instead of removing
2262 /// db.update_document("users", &user_id, vec![
2263 /// ("deleted", Value::Bool(true)),
2264 /// ("deleted_at", Value::String(chrono::Utc::now().to_rfc3339())),
2265 /// ]).await?;
2266 ///
2267 /// // Query excludes soft-deleted items
2268 /// let active_users = db.query("users")
2269 /// .filter(|f| f.eq("deleted", Value::Bool(false)))
2270 /// .collect()
2271 /// .await?;
2272 ///
2273 /// // Later: hard delete after retention period
2274 /// let old_deletions = db.query("users")
2275 /// .filter(|f| f.eq("deleted", Value::Bool(true)))
2276 /// .filter(|f| f.lt("deleted_at", thirty_days_ago))
2277 /// .collect()
2278 /// .await?;
2279 ///
2280 /// for user in old_deletions {
2281 /// db.delete(&format!("users:{}", user.id)).await?;
2282 /// }
2283 /// ```
2284 ///
2285 /// # Important Notes
2286 /// - Deletion is permanent - no undo/recovery
2287 /// - Consider soft deletes for recoverable operations
2288 /// - Use transactions for multi-document deletions
2289 /// - PubSub subscribers will receive delete events
2290 /// - All indices are automatically cleaned up
2291 pub async fn delete(&self, key: &str) -> Result<()> {
2292 // Extract collection and id from key (format: "collection:id")
2293 let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
2294 (coll, doc_id)
2295 } else {
2296 return Err(AuroraError::InvalidOperation(
2297 "Invalid key format, expected 'collection:id'".into(),
2298 ));
2299 };
2300
2301 // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
2302 let document = self.get_document(collection, id)?;
2303
2304 // Delete from hot cache
2305 if self.hot.get(key).is_some() {
2306 self.hot.delete(key);
2307 }
2308
2309 // Delete from cold storage
2310 self.cold.delete(key)?;
2311
2312 // CRITICAL FIX: Clean up ALL indices (primary + secondary)
2313 if let Some(doc) = document {
2314 self.remove_from_indices(collection, &doc)?;
2315 } else {
2316 // Fallback: at least remove from primary index
2317 if let Some(index) = self.primary_indices.get_mut(collection) {
2318 index.remove(id);
2319 }
2320 }
2321
2322 // Publish delete event
2323 let event = crate::pubsub::ChangeEvent::delete(collection, id);
2324 let _ = self.pubsub.publish(event);
2325
2326 Ok(())
2327 }
2328
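    /// Delete an entire collection and all of its documents
    ///
    /// Removes every document under the collection's prefix, drops the
    /// collection's primary and secondary indices, and invalidates its cached
    /// schema. Like `delete()`, this cannot be undone.
    ///
    /// # Examples
    ///
    /// A minimal sketch; the collection name is illustrative:
    ///
    /// ```
    /// db.delete_collection("temp_imports").await?;
    /// ```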
2329 pub async fn delete_collection(&self, collection: &str) -> Result<()> {
2330 let prefix = format!("{}:", collection);
2331
2332 // Get all keys in collection
2333 let keys: Vec<String> = self
2334 .cold
2335 .scan()
2336 .filter_map(|r| r.ok())
2337 .filter(|(k, _)| k.starts_with(&prefix))
2338 .map(|(k, _)| k)
2339 .collect();
2340
2341 // Delete each key
2342 for key in keys {
2343 self.delete(&key).await?;
2344 }
2345
2346 // Remove collection indices
2347 self.primary_indices.remove(collection);
2348 self.secondary_indices
2349 .retain(|k, _| !k.starts_with(&prefix));
2350
2351 // Invalidate schema cache
2352 self.schema_cache.remove(collection);
2353
2354 Ok(())
2355 }
2356
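    /// Remove a document's entries from the collection's primary index and
    /// from any secondary (field-value) indices it appears in.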
2357 fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
2358 // Remove from primary index
2359 if let Some(index) = self.primary_indices.get(collection) {
2360 index.remove(&doc.id);
2361 }
2362
2363 // Remove from secondary indices
2364 for (field, value) in &doc.data {
2365 let index_key = format!("{}:{}", collection, field);
2366 if let Some(index) = self.secondary_indices.get(&index_key)
2367 && let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
2368 doc_ids.retain(|id| id != &doc.id);
2369 }
2370 }
2371
2372 Ok(())
2373 }
2374
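    /// Case-insensitive substring search over a single string field
    ///
    /// Scans the whole collection, so it is best suited to small collections;
    /// for indexed, relevance-ranked search see `full_text_search()`.
    ///
    /// # Examples
    ///
    /// A minimal sketch; collection and field names are illustrative:
    ///
    /// ```
    /// let hits = db.search_text("articles", "title", "rust").await?;
    /// println!("{} matching articles", hits.len());
    /// ```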
2375 pub async fn search_text(
2376 &self,
2377 collection: &str,
2378 field: &str,
2379 query: &str,
2380 ) -> Result<Vec<Document>> {
2381 let mut results = Vec::new();
2382 let docs = self.get_all_collection(collection).await?;
2383
2384 for doc in docs {
2385 if let Some(Value::String(text)) = doc.data.get(field)
2386 && text.to_lowercase().contains(&query.to_lowercase()) {
2387 results.push(doc);
2388 }
2389 }
2390
2391 Ok(results)
2392 }
2393
2394 /// Export a collection to a JSON file
2395 ///
2396 /// Creates a JSON file containing all documents in the collection.
2397 /// Useful for backups, data migration, or sharing datasets.
2398 /// Automatically appends `.json` extension if not present.
2399 ///
2400 /// # Performance
2401 /// - Export speed: ~10,000 docs/sec
2402 /// - Scans entire collection from cold storage
2403 /// - Memory efficient: streams documents to file
2404 ///
2405 /// # Arguments
2406 /// * `collection` - Name of the collection to export
2407 /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
2408 ///
2409 /// # Returns
2410 /// Success or an error
2411 ///
2412 /// # Examples
2413 ///
2414 /// ```
2415 /// use aurora_db::Aurora;
2416 ///
2417 /// let db = Aurora::open("mydb.db")?;
2418 ///
2419 /// // Basic export
2420 /// db.export_as_json("users", "./backups/users_2024-01-15")?;
2421 /// // Creates: ./backups/users_2024-01-15.json
2422 ///
2423 /// // Timestamped backup
2424 /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
2425 /// let backup_path = format!("./backups/users_{}", timestamp);
2426 /// db.export_as_json("users", &backup_path)?;
2427 ///
2428 /// // Export multiple collections
2429 /// for collection in &["users", "orders", "products"] {
2430 /// db.export_as_json(collection, &format!("./export/{}", collection))?;
2431 /// }
2432 /// ```
2433 ///
2434 /// # Output Format
2435 ///
2436 /// The exported JSON has this structure:
2437 /// ```json
2438 /// {
2439 /// "users": [
2440 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
2441 /// { "id": "456", "name": "Bob", "email": "bob@example.com" }
2442 /// ]
2443 /// }
2444 /// ```
2445 ///
2446 /// # See Also
2447 /// - `export_as_csv()` for CSV format export
2448 /// - `import_from_json()` to restore exported data
2449 pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
2450 let output_path = if !output_path.ends_with(".json") {
2451 format!("{}.json", output_path)
2452 } else {
2453 output_path.to_string()
2454 };
2455
2456 let mut docs = Vec::new();
2457
2458 // Get all documents from the specified collection
2459 for result in self.cold.scan() {
2460 let (key, value) = result?;
2461
2462 // Only process documents from the specified collection
2463 if let Some(key_collection) = key.split(':').next()
2464 && key_collection == collection && !key.starts_with("_collection:")
2465 && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2466 // Convert Value enum to raw JSON values
2467 let mut clean_doc = serde_json::Map::new();
2468 for (k, v) in doc.data {
2469 match v {
2470 Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
2471 Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
2472 Value::Float(f) => {
2473 if let Some(n) = serde_json::Number::from_f64(f) {
2474 clean_doc.insert(k, JsonValue::Number(n))
2475 } else {
2476 clean_doc.insert(k, JsonValue::Null)
2477 }
2478 }
2479 Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
2480 Value::Array(arr) => {
2481 let clean_arr: Vec<JsonValue> = arr
2482 .into_iter()
2483 .map(|v| match v {
2484 Value::String(s) => JsonValue::String(s),
2485 Value::Int(i) => JsonValue::Number(i.into()),
2486 Value::Float(f) => serde_json::Number::from_f64(f)
2487 .map(JsonValue::Number)
2488 .unwrap_or(JsonValue::Null),
2489 Value::Bool(b) => JsonValue::Bool(b),
2490 Value::Null => JsonValue::Null,
2491 _ => JsonValue::Null,
2492 })
2493 .collect();
2494 clean_doc.insert(k, JsonValue::Array(clean_arr))
2495 }
2496 Value::Uuid(u) => {
2497 clean_doc.insert(k, JsonValue::String(u.to_string()))
2498 }
2499 Value::Null => clean_doc.insert(k, JsonValue::Null),
                        Value::Object(_) => None, // Nested objects are not exported; extend here if needed
2501 };
2502 }
2503 docs.push(JsonValue::Object(clean_doc));
2504 }
2505 }
2506
2507 let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
2508 collection.to_string(),
2509 JsonValue::Array(docs),
2510 )]));
2511
2512 let mut file = StdFile::create(&output_path)?;
2513 serde_json::to_writer_pretty(&mut file, &output)?;
2514 println!("Exported collection '{}' to {}", collection, &output_path);
2515 Ok(())
2516 }
2517
2518 /// Export a collection to a CSV file
2519 ///
2520 /// Creates a CSV file with headers from the first document and rows for each document.
2521 /// Useful for spreadsheet analysis, data science workflows, or reporting.
2522 /// Automatically appends `.csv` extension if not present.
2523 ///
2524 /// # Performance
2525 /// - Export speed: ~8,000 docs/sec
2526 /// - Memory efficient: streams rows to file
2527 /// - Headers determined from first document
2528 ///
2529 /// # Arguments
2530 /// * `collection` - Name of the collection to export
2531 /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
2532 ///
2533 /// # Returns
2534 /// Success or an error
2535 ///
2536 /// # Examples
2537 ///
2538 /// ```
2539 /// use aurora_db::Aurora;
2540 ///
2541 /// let db = Aurora::open("mydb.db")?;
2542 ///
2543 /// // Basic CSV export
2544 /// db.export_as_csv("users", "./reports/users")?;
2545 /// // Creates: ./reports/users.csv
2546 ///
2547 /// // Export for analysis in Excel/Google Sheets
2548 /// db.export_as_csv("orders", "./analytics/sales_data")?;
2549 ///
2550 /// // Monthly report generation
2551 /// let month = chrono::Utc::now().format("%Y-%m");
2552 /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
2553 /// ```
2554 ///
2555 /// # Output Format
2556 ///
2557 /// ```csv
2558 /// id,name,email,age
2559 /// 123,Alice,alice@example.com,28
2560 /// 456,Bob,bob@example.com,32
2561 /// ```
2562 ///
2563 /// # Important Notes
2564 /// - Headers are taken from the first document's fields
2565 /// - Documents with different fields will have empty values for missing fields
2566 /// - Nested objects/arrays are converted to strings
2567 /// - Best for flat document structures
2568 ///
2569 /// # See Also
2570 /// - `export_as_json()` for JSON format (better for nested data)
2571 /// - For complex nested structures, use JSON export instead
2572 pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
2573 let output_path = if !filename.ends_with(".csv") {
2574 format!("{}.csv", filename)
2575 } else {
2576 filename.to_string()
2577 };
2578
2579 let mut writer = csv::Writer::from_path(&output_path)?;
2580 let mut headers = Vec::new();
2581 let mut first_doc = true;
2582
2583 // Get all documents from the specified collection
2584 for result in self.cold.scan() {
2585 let (key, value) = result?;
2586
2587 // Only process documents from the specified collection
2588 if let Some(key_collection) = key.split(':').next()
2589 && key_collection == collection && !key.starts_with("_collection:")
2590 && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2591 // Write headers from first document
2592 if first_doc && !doc.data.is_empty() {
2593 headers = doc.data.keys().cloned().collect();
2594 writer.write_record(&headers)?;
2595 first_doc = false;
2596 }
2597
2598 // Write the document values
2599 let values: Vec<String> = headers
2600 .iter()
2601 .map(|header| {
2602 doc.data
2603 .get(header)
2604 .map(|v| v.to_string())
2605 .unwrap_or_default()
2606 })
2607 .collect();
2608 writer.write_record(&values)?;
2609 }
2610 }
2611
2612 writer.flush()?;
2613 println!("Exported collection '{}' to {}", collection, &output_path);
2614 Ok(())
2615 }
2616
2617 // Helper method to create filter-based queries
2618 pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2619 self.query(collection)
2620 }
2621
2622 // Convenience methods that build on top of the FilterBuilder
2623
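    /// Find a document whose `id` field equals the given value
    ///
    /// Note: this runs a filtered query. When you already know the document ID,
    /// `get_document()` is the faster direct lookup.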
2624 pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
2625 self.query(collection)
2626 .filter(|f| f.eq("id", id))
2627 .first_one()
2628 .await
2629 }
2630
2631 pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
2632 where
2633 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2634 {
2635 self.query(collection).filter(filter_fn).first_one().await
2636 }
2637
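    /// Find all documents where a single field equals the given value
    ///
    /// # Examples
    ///
    /// A minimal sketch; collection and field names are illustrative:
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// let admins = db
    ///     .find_by_field("users", "role", Value::String("admin".into()))
    ///     .await?;
    /// ```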
2638 pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
2639 &self,
2640 collection: &str,
2641 field: &'static str,
2642 value: T,
2643 ) -> Result<Vec<Document>> {
2644 let value_clone = value.clone();
2645 self.query(collection)
2646 .filter(move |f| f.eq(field, value_clone.clone()))
2647 .collect()
2648 .await
2649 }
2650
2651 pub async fn find_by_fields(
2652 &self,
2653 collection: &str,
2654 fields: Vec<(&str, Value)>,
2655 ) -> Result<Vec<Document>> {
2656 let mut query = self.query(collection);
2657
2658 for (field, value) in fields {
2659 let field_owned = field.to_owned();
2660 let value_owned = value.clone();
2661 query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
2662 }
2663
2664 query.collect().await
2665 }
2666
2667 // Advanced example: find documents with a field value in a specific range
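    /// Find documents where `field` lies between `min` and `max` (delegates to
    /// the `between` filter)
    ///
    /// # Examples
    ///
    /// A minimal sketch; collection and field names are illustrative:
    ///
    /// ```
    /// let adults = db.find_in_range("users", "age", 18i64, 65i64).await?;
    /// ```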
2668 pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
2669 &self,
2670 collection: &str,
2671 field: &'static str,
2672 min: T,
2673 max: T,
2674 ) -> Result<Vec<Document>> {
2675 self.query(collection)
2676 .filter(move |f| f.between(field, min.clone(), max.clone()))
2677 .collect()
2678 .await
2679 }
2680
2681 // Complex query example: build with multiple combined filters
2682 pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
2683 self.query(collection)
2684 }
2685
2686 // Create a full-text search query with added filter options
2687 pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
2688 self.search(collection)
2689 }
2690
2691 // Utility methods for common operations
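    /// Insert a document with a known ID, or merge the given fields into the
    /// existing document ("update or insert")
    ///
    /// Unique constraints are validated in both paths; on update, the current
    /// document is excluded from the uniqueness check.
    ///
    /// # Examples
    ///
    /// A minimal sketch; collection, ID, and field names are illustrative:
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// db.upsert("settings", "theme", vec![
    ///     ("value", Value::String("dark".into())),
    /// ]).await?;
    /// ```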
2692 pub async fn upsert(
2693 &self,
2694 collection: &str,
2695 id: &str,
2696 data: Vec<(&str, Value)>,
2697 ) -> Result<String> {
2698 // Convert Vec<(&str, Value)> to HashMap<String, Value>
2699 let data_map: HashMap<String, Value> =
2700 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
2701
2702 // Check if document exists
2703 if let Some(mut doc) = self.get_document(collection, id)? {
2704 // Update existing document - merge new data
2705 for (key, value) in data_map {
2706 doc.data.insert(key, value);
2707 }
2708
2709 // Validate unique constraints for the updated document
2710 // We need to exclude the current document from the uniqueness check
2711 self.validate_unique_constraints_excluding(collection, &doc.data, id)
2712 .await?;
2713
2714 self.put(
2715 format!("{}:{}", collection, id),
2716 serde_json::to_vec(&doc)?,
2717 None,
2718 )?;
2719 Ok(id.to_string())
2720 } else {
2721 // Insert new document with specified ID - validate unique constraints
2722 self.validate_unique_constraints(collection, &data_map)
2723 .await?;
2724
2725 let document = Document {
2726 id: id.to_string(),
2727 data: data_map,
2728 };
2729
2730 self.put(
2731 format!("{}:{}", collection, id),
2732 serde_json::to_vec(&document)?,
2733 None,
2734 )?;
2735 Ok(id.to_string())
2736 }
2737 }
2738
2739 // Atomic increment/decrement
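    /// Add `amount` (which may be negative) to an integer field and return the
    /// new value; a missing or non-integer field is treated as 0
    ///
    /// # Examples
    ///
    /// A minimal sketch; collection, ID, and field names are illustrative:
    ///
    /// ```
    /// let views = db.increment("posts", "post-1", "view_count", 1).await?;
    /// println!("Post now has {} views", views);
    /// ```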
2740 pub async fn increment(
2741 &self,
2742 collection: &str,
2743 id: &str,
2744 field: &str,
2745 amount: i64,
2746 ) -> Result<i64> {
2747 if let Some(mut doc) = self.get_document(collection, id)? {
2748 // Get current value
2749 let current = match doc.data.get(field) {
2750 Some(Value::Int(i)) => *i,
2751 _ => 0,
2752 };
2753
2754 // Increment
2755 let new_value = current + amount;
2756 doc.data.insert(field.to_string(), Value::Int(new_value));
2757
2758 // Save changes
2759 self.put(
2760 format!("{}:{}", collection, id),
2761 serde_json::to_vec(&doc)?,
2762 None,
2763 )?;
2764
2765 Ok(new_value)
2766 } else {
2767 Err(AuroraError::NotFound(format!(
2768 "Document {}:{} not found",
2769 collection, id
2770 )))
2771 }
2772 }
2773
2774 // Delete documents by query
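    /// Delete every document matching the filter and return the number removed
    ///
    /// Each deletion goes through `delete()`, so indices are cleaned up and
    /// delete events are published for each document.
    ///
    /// # Examples
    ///
    /// A minimal sketch; collection and field names are illustrative:
    ///
    /// ```
    /// use aurora_db::types::Value;
    ///
    /// let removed = db.delete_by_query("sessions", |f| {
    ///     f.eq("expired", Value::Bool(true))
    /// }).await?;
    /// println!("Removed {} expired sessions", removed);
    /// ```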
2775 pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
2776 where
2777 F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
2778 {
2779 let docs = self.query(collection).filter(filter_fn).collect().await?;
2780
2781 let mut deleted_count = 0;
2782
2783 for doc in docs {
2784 let key = format!("{}:{}", collection, doc.id);
2785 self.delete(&key).await?;
2786 deleted_count += 1;
2787 }
2788
2789 Ok(deleted_count)
2790 }
2791
2792 /// Import documents from a JSON file into a collection
2793 ///
2794 /// Validates each document against the collection schema, skips duplicates (by ID),
2795 /// and provides detailed statistics about the import operation. Useful for restoring
2796 /// backups, migrating data, or seeding development databases.
2797 ///
2798 /// # Performance
2799 /// - Import speed: ~5,000 docs/sec (with validation)
2800 /// - Memory efficient: processes documents one at a time
2801 /// - Validates schema and unique constraints
2802 ///
2803 /// # Arguments
2804 /// * `collection` - Name of the collection to import into
2805 /// * `filename` - Path to the JSON file containing documents (array format)
2806 ///
2807 /// # Returns
2808 /// `ImportStats` containing counts of imported, skipped, and failed documents
2809 ///
2810 /// # Examples
2811 ///
2812 /// ```
    /// use aurora_db::{Aurora, types::FieldType};
2814 ///
2815 /// let db = Aurora::open("mydb.db")?;
2816 ///
2817 /// // Basic import
2818 /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
2819 /// println!("Imported: {}, Skipped: {}, Failed: {}",
2820 /// stats.imported, stats.skipped, stats.failed);
2821 ///
2822 /// // Restore from backup
2823 /// let backup_file = "./backups/users_2024-01-15.json";
2824 /// let stats = db.import_from_json("users", backup_file).await?;
2825 ///
2826 /// if stats.failed > 0 {
2827 /// eprintln!("Warning: {} documents failed validation", stats.failed);
2828 /// }
2829 ///
2830 /// // Idempotent import - duplicates are skipped
2831 /// let stats = db.import_from_json("users", "./data/users.json").await?;
2832 /// // Running again will skip all existing documents
2833 /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
2834 /// assert_eq!(stats2.skipped, stats.imported);
2835 ///
2836 /// // Migration from another system
    /// db.new_collection("products", vec![
    ///     ("sku", FieldType::String, true),    // unique
    ///     ("name", FieldType::String, false),
    ///     ("price", FieldType::Float, false),
    /// ])?;
2842 ///
2843 /// let stats = db.import_from_json("products", "./migration/products.json").await?;
2844 /// println!("Migration complete: {} products imported", stats.imported);
2845 /// ```
2846 ///
2847 /// # Expected JSON Format
2848 ///
2849 /// The JSON file should contain an array of document objects:
2850 /// ```json
2851 /// [
2852 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
2853 /// { "id": "456", "name": "Bob", "email": "bob@example.com" },
2854 /// { "name": "Carol", "email": "carol@example.com" }
2855 /// ]
2856 /// ```
2857 ///
2858 /// # Behavior
2859 /// - Documents with existing IDs are skipped (duplicate detection)
2860 /// - Documents without IDs get auto-generated UUIDs
2861 /// - Schema validation is performed on all fields
2862 /// - Failed documents are counted but don't stop the import
2863 /// - Unique constraints are checked
2864 ///
2865 /// # See Also
2866 /// - `export_as_json()` to create compatible backup files
2867 /// - `batch_insert()` for programmatic bulk inserts
2868 pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
2869 // Validate that the collection exists
2870 let collection_def = self.get_collection_definition(collection)?;
2871
2872 // Load JSON file
2873 let json_string = read_to_string(filename)
2874 .await
2875 .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
2876
2877 // Parse JSON
2878 let documents: Vec<JsonValue> = from_str(&json_string)
2879 .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
2880
2881 let mut stats = ImportStats::default();
2882
2883 // Process each document
2884 for doc_json in documents {
2885 match self
2886 .import_document(collection, &collection_def, doc_json)
2887 .await
2888 {
2889 Ok(ImportResult::Imported) => stats.imported += 1,
2890 Ok(ImportResult::Skipped) => stats.skipped += 1,
2891 Err(_) => stats.failed += 1,
2892 }
2893 }
2894
2895 Ok(stats)
2896 }
2897
2898 /// Import a single document, performing schema validation and duplicate checking
2899 async fn import_document(
2900 &self,
2901 collection: &str,
2902 collection_def: &Collection,
2903 doc_json: JsonValue,
2904 ) -> Result<ImportResult> {
2905 if !doc_json.is_object() {
2906 return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
2907 }
2908
2909 // Extract document ID if present
2910 let doc_id = doc_json
2911 .get("id")
2912 .and_then(|id| id.as_str())
2913 .map(|s| s.to_string())
2914 .unwrap_or_else(|| Uuid::new_v4().to_string());
2915
2916 // Check if document with this ID already exists
2917 if self.get_document(collection, &doc_id)?.is_some() {
2918 return Ok(ImportResult::Skipped);
2919 }
2920
2921 // Convert JSON to our document format and validate against schema
2922 let mut data_map = HashMap::new();
2923
2924 if let Some(obj) = doc_json.as_object() {
2925 for (field_name, field_def) in &collection_def.fields {
2926 if let Some(json_value) = obj.get(field_name) {
2927 // Validate value against field type
2928 if !self.validate_field_value(json_value, &field_def.field_type) {
2929 return Err(AuroraError::InvalidOperation(format!(
2930 "Field '{}' has invalid type",
2931 field_name
2932 )));
2933 }
2934
2935 // Convert JSON value to our Value type
2936 let value = self.json_to_value(json_value)?;
2937 data_map.insert(field_name.clone(), value);
2938 } else if field_def.unique {
2939 // Missing required unique field
2940 return Err(AuroraError::InvalidOperation(format!(
2941 "Missing required unique field '{}'",
2942 field_name
2943 )));
2944 }
2945 }
2946 }
2947
2948 // Check for duplicates by unique fields
2949 let unique_fields = self.get_unique_fields(collection_def);
2950 for unique_field in &unique_fields {
2951 if let Some(value) = data_map.get(unique_field) {
2952 // Query for existing documents with this unique value
2953 let query_results = self
2954 .query(collection)
2955 .filter(move |f| f.eq(unique_field, value.clone()))
2956 .limit(1)
2957 .collect()
2958 .await?;
2959
2960 if !query_results.is_empty() {
2961 // Found duplicate by unique field
2962 return Ok(ImportResult::Skipped);
2963 }
2964 }
2965 }
2966
2967 // Create and insert document
2968 let document = Document {
2969 id: doc_id,
2970 data: data_map,
2971 };
2972
2973 self.put(
2974 format!("{}:{}", collection, document.id),
2975 serde_json::to_vec(&document)?,
2976 None,
2977 )?;
2978
2979 Ok(ImportResult::Imported)
2980 }
2981
2982 /// Validate that a JSON value matches the expected field type
2983 fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
2984 match field_type {
2985 FieldType::String => value.is_string(),
2986 FieldType::Int => value.is_i64() || value.is_u64(),
2987 FieldType::Float => value.is_number(),
2988 FieldType::Bool => value.is_boolean(),
2989 FieldType::Array => value.is_array(),
2990 FieldType::Object => value.is_object(),
2991 FieldType::Uuid => {
2992 value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
2993 }
2994 }
2995 }
2996
2997 /// Convert a JSON value to our internal Value type
2998 #[allow(clippy::only_used_in_recursion)]
2999 fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
3000 match json_value {
3001 JsonValue::Null => Ok(Value::Null),
3002 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
3003 JsonValue::Number(n) => {
3004 if let Some(i) = n.as_i64() {
3005 Ok(Value::Int(i))
3006 } else if let Some(f) = n.as_f64() {
3007 Ok(Value::Float(f))
3008 } else {
3009 Err(AuroraError::InvalidOperation("Invalid number value".into()))
3010 }
3011 }
3012 JsonValue::String(s) => {
3013 // Try parsing as UUID first
3014 if let Ok(uuid) = Uuid::parse_str(s) {
3015 Ok(Value::Uuid(uuid))
3016 } else {
3017 Ok(Value::String(s.clone()))
3018 }
3019 }
3020 JsonValue::Array(arr) => {
3021 let mut values = Vec::new();
3022 for item in arr {
3023 values.push(self.json_to_value(item)?);
3024 }
3025 Ok(Value::Array(values))
3026 }
3027 JsonValue::Object(obj) => {
3028 let mut map = HashMap::new();
3029 for (k, v) in obj {
3030 map.insert(k.clone(), self.json_to_value(v)?);
3031 }
3032 Ok(Value::Object(map))
3033 }
3034 }
3035 }
3036
3037 /// Get collection definition
3038 fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
3039 if let Some(data) = self.get(&format!("_collection:{}", collection))? {
3040 let collection_def: Collection = serde_json::from_slice(&data)?;
3041 Ok(collection_def)
3042 } else {
3043 Err(AuroraError::CollectionNotFound(collection.to_string()))
3044 }
3045 }
3046
3047 /// Get storage statistics and information about the database
3048 pub fn get_database_stats(&self) -> Result<DatabaseStats> {
3049 let hot_stats = self.hot.get_stats();
3050 let cold_stats = self.cold.get_stats()?;
3051
3052 Ok(DatabaseStats {
3053 hot_stats,
3054 cold_stats,
3055 estimated_size: self.cold.estimated_size(),
3056 collections: self.get_collection_stats()?,
3057 })
3058 }
3059
3060 /// Check if a key is currently stored in the hot cache
3061 pub fn is_in_hot_cache(&self, key: &str) -> bool {
3062 self.hot.is_hot(key)
3063 }
3064
3065 /// Start background cleanup of hot cache with specified interval
3066 pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
3067 let hot_store = Arc::new(self.hot.clone());
3068 hot_store.start_cleanup_with_interval(interval_secs).await;
3069 }
3070
3071 /// Clear the hot cache (useful when memory needs to be freed)
3072 pub fn clear_hot_cache(&self) {
3073 self.hot.clear();
3074 println!(
3075 "Hot cache cleared, current hit ratio: {:.2}%",
3076 self.hot.hit_ratio() * 100.0
3077 );
3078 }
3079
3080 /// Prewarm the cache by loading frequently accessed data from cold storage
3081 ///
3082 /// Loads documents from a collection into memory cache to eliminate cold-start
3083 /// latency. Dramatically improves initial query performance after database startup
3084 /// by preloading the most commonly accessed data.
3085 ///
3086 /// # Performance Impact
3087 /// - Prewarming speed: ~20,000 docs/sec
3088 /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
3089 /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
3090 /// - Memory cost: ~500 bytes per document average
3091 ///
3092 /// # Arguments
3093 /// * `collection` - The collection to prewarm
3094 /// * `limit` - Maximum number of documents to load (default: 1000, None = all)
3095 ///
3096 /// # Returns
3097 /// Number of documents loaded into cache
3098 ///
3099 /// # Examples
3100 ///
3101 /// ```
3102 /// use aurora_db::Aurora;
3103 ///
3104 /// let db = Aurora::open("mydb.db")?;
3105 ///
3106 /// // Prewarm frequently accessed collection
3107 /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
3108 /// println!("Prewarmed {} user documents", loaded);
3109 ///
3110 /// // Now queries are fast from the start
3111 /// let stats_before = db.get_cache_stats();
3112 /// let users = db.query("users").collect().await?;
3113 /// let stats_after = db.get_cache_stats();
3114 ///
3115 /// // High hit rate thanks to prewarming
3116 /// assert!(stats_after.hit_rate > 0.95);
3117 ///
3118 /// // Startup optimization pattern
3119 /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
3120 /// println!("Prewarming caches...");
3121 ///
3122 /// // Prewarm most frequently accessed collections
3123 /// db.prewarm_cache("users", Some(5000)).await?;
3124 /// db.prewarm_cache("sessions", Some(1000)).await?;
3125 /// db.prewarm_cache("products", Some(500)).await?;
3126 ///
3127 /// let stats = db.get_cache_stats();
3128 /// println!("Cache prewarmed: {} entries loaded", stats.size);
3129 ///
3130 /// Ok(())
3131 /// }
3132 ///
3133 /// // Web server startup
3134 /// #[tokio::main]
3135 /// async fn main() {
3136 /// let db = Aurora::open("app.db").unwrap();
3137 ///
3138 /// // Prewarm before accepting requests
3139 /// db.prewarm_cache("users", Some(10000)).await.unwrap();
3140 ///
3141 /// // Server is now ready with hot cache
3142 /// start_web_server(db).await;
3143 /// }
3144 ///
3145 /// // Prewarm all documents (for small collections)
3146 /// let all_loaded = db.prewarm_cache("config", None).await?;
3147 /// // All config documents now in memory
3148 ///
3149 /// // Selective prewarming based on access patterns
3150 /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
3151 /// // Load recent users (they're accessed most)
3152 /// db.prewarm_cache("users", Some(1000)).await?;
3153 ///
3154 /// // Load active sessions only
3155 /// let active_sessions = db.query("sessions")
3156 /// .filter(|f| f.eq("active", Value::Bool(true)))
3157 /// .limit(500)
3158 /// .collect()
3159 /// .await?;
3160 ///
3161 /// // Manually populate cache with hot data
3162 /// for session in active_sessions {
3163 /// // Reading automatically caches
3164 /// db.get_document("sessions", &session.id)?;
3165 /// }
3166 ///
3167 /// Ok(())
3168 /// }
3169 /// ```
3170 ///
3171 /// # Typical Prewarming Scenarios
3172 ///
3173 /// **Web Application Startup:**
3174 /// ```
3175 /// // Load user data, sessions, and active content
3176 /// db.prewarm_cache("users", Some(5000)).await?;
3177 /// db.prewarm_cache("sessions", Some(2000)).await?;
3178 /// db.prewarm_cache("posts", Some(1000)).await?;
3179 /// ```
3180 ///
3181 /// **E-commerce Site:**
3182 /// ```
3183 /// // Load products, categories, and user carts
3184 /// db.prewarm_cache("products", Some(500)).await?;
3185 /// db.prewarm_cache("categories", None).await?; // All categories
3186 /// db.prewarm_cache("active_carts", Some(1000)).await?;
3187 /// ```
3188 ///
3189 /// **API Server:**
3190 /// ```
3191 /// // Load authentication data and rate limits
3192 /// db.prewarm_cache("api_keys", None).await?;
3193 /// db.prewarm_cache("rate_limits", Some(10000)).await?;
3194 /// ```
3195 ///
3196 /// # When to Use
3197 /// - At application startup to eliminate cold-start latency
3198 /// - After cache clear operations
3199 /// - Before high-traffic events (product launches, etc.)
3200 /// - When deploying new instances (load balancer warm-up)
3201 ///
3202 /// # Memory Considerations
3203 /// - 1,000 docs ≈ 500 KB memory
3204 /// - 10,000 docs ≈ 5 MB memory
3205 /// - 100,000 docs ≈ 50 MB memory
3206 /// - Stay within configured cache capacity
3207 ///
3208 /// # See Also
3209 /// - `get_cache_stats()` to monitor cache effectiveness
3210 /// - `prewarm_all_collections()` to prewarm all collections
3211 /// - `Aurora::with_config()` to adjust cache capacity
3212 pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
3213 let limit = limit.unwrap_or(1000);
3214 let prefix = format!("{}:", collection);
3215 let mut loaded = 0;
3216
3217 for entry in self.cold.scan_prefix(&prefix) {
3218 if loaded >= limit {
3219 break;
3220 }
3221
3222 if let Ok((key, value)) = entry {
3223 // Load into hot cache
3224 self.hot.set(key.clone(), value, None);
3225 loaded += 1;
3226 }
3227 }
3228
3229 println!("Prewarmed {} with {} documents", collection, loaded);
3230 Ok(loaded)
3231 }
3232
3233 /// Prewarm cache for all collections
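    ///
    /// Returns a map from collection name to the number of documents loaded.
    ///
    /// # Examples
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// // Load up to 1,000 documents from every collection
    /// let stats = db.prewarm_all_collections(Some(1000)).await?;
    /// for (collection, loaded) in &stats {
    ///     println!("{}: {} documents prewarmed", collection, loaded);
    /// }
    /// ```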
3234 pub async fn prewarm_all_collections(
3235 &self,
3236 docs_per_collection: Option<usize>,
3237 ) -> Result<HashMap<String, usize>> {
3238 let mut stats = HashMap::new();
3239
3240 // Get all collections
3241 let collections: Vec<String> = self
3242 .cold
3243 .scan()
3244 .filter_map(|r| r.ok())
3245 .map(|(k, _)| k)
3246 .filter(|k| k.starts_with("_collection:"))
3247 .map(|k| k.trim_start_matches("_collection:").to_string())
3248 .collect();
3249
3250 for collection in collections {
3251 let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
3252 stats.insert(collection, loaded);
3253 }
3254
3255 Ok(stats)
3256 }
3257
3258 /// Store multiple key-value pairs efficiently in a single batch operation
3259 ///
3260 /// Low-level batch write operation that bypasses document validation and
3261 /// writes raw byte data directly to storage. Useful for advanced use cases,
3262 /// custom serialization, or maximum performance scenarios.
3263 ///
3264 /// # Performance
3265 /// - Write speed: ~100,000 writes/sec
3266 /// - Single disk fsync for entire batch
3267 /// - No validation or schema checking
3268 /// - Direct storage access
3269 ///
3270 /// # Arguments
3271 /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
3272 ///
3273 /// # Returns
3274 /// Success or an error
3275 ///
3276 /// # Examples
3277 ///
3278 /// ```
3279 /// use aurora_db::Aurora;
3280 ///
3281 /// let db = Aurora::open("mydb.db")?;
3282 ///
3283 /// // Low-level batch write
3284 /// let pairs = vec![
3285 /// ("users:123".to_string(), b"raw data 1".to_vec()),
3286 /// ("users:456".to_string(), b"raw data 2".to_vec()),
3287 /// ("cache:key1".to_string(), b"cached value".to_vec()),
3288 /// ];
3289 ///
3290 /// db.batch_write(pairs)?;
3291 ///
3292 /// // Custom binary serialization
3293 /// use bincode;
3294 ///
3295 /// #[derive(Serialize, Deserialize)]
3296 /// struct CustomData {
3297 /// id: u64,
3298 /// payload: Vec<u8>,
3299 /// }
3300 ///
3301 /// let custom_data = vec![
3302 /// CustomData { id: 1, payload: vec![1, 2, 3] },
3303 /// CustomData { id: 2, payload: vec![4, 5, 6] },
3304 /// ];
3305 ///
3306 /// let pairs: Vec<(String, Vec<u8>)> = custom_data
3307 /// .iter()
3308 /// .map(|data| {
3309 /// let key = format!("binary:{}", data.id);
3310 /// let value = bincode::serialize(data).unwrap();
3311 /// (key, value)
3312 /// })
3313 /// .collect();
3314 ///
3315 /// db.batch_write(pairs)?;
3316 ///
3317 /// // Bulk cache population
3318 /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
3319 /// .map(|i| {
3320 /// let key = format!("cache:item_{}", i);
3321 /// let value = format!("value_{}", i).into_bytes();
3322 /// (key, value)
3323 /// })
3324 /// .collect();
3325 ///
3326 /// db.batch_write(cache_entries)?;
3327 /// // Writes 10,000 entries in ~100ms
3328 /// ```
3329 ///
3330 /// # Important Notes
3331 /// - No schema validation performed
3332 /// - No unique constraint checking
3333 /// - No automatic indexing
3334 /// - Keys must follow "collection:id" format for proper grouping
3335 /// - Values are raw bytes - you handle serialization
3336 /// - Use `batch_insert()` for validated document inserts
3337 ///
3338 /// # When to Use
3339 /// - Maximum write performance needed
3340 /// - Custom serialization formats (bincode, msgpack, etc.)
3341 /// - Cache population
3342 /// - Low-level database operations
3343 /// - You're bypassing the document model
3344 ///
3345 /// # When NOT to Use
3346 /// - Regular document inserts → Use `batch_insert()` instead
3347 /// - Need validation → Use `batch_insert()` instead
3348 /// - Need indexing → Use `batch_insert()` instead
3349 ///
3350 /// # See Also
3351 /// - `batch_insert()` for validated document batch inserts
3352 /// - `put()` for single key-value writes
3353 pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
3354 // Group pairs by collection name
3355 let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
3356 for (key, value) in &pairs {
3357 if let Some(collection_name) = key.split(':').next() {
3358 collections
3359 .entry(collection_name.to_string())
3360 .or_default()
3361 .push((key.clone(), value.clone()));
3362 }
3363 }
3364
3365 // First, do the batch write to cold storage for all pairs
3366 self.cold.batch_set(pairs)?;
3367
3368 // Then, process each collection for in-memory updates
3369 for (collection_name, batch) in collections {
3370 // --- Optimized Batch Indexing ---
3371
3372 // 1. Get schema once for the entire collection batch
3373 let collection_obj = match self.schema_cache.get(&collection_name) {
3374 Some(cached_schema) => Arc::clone(cached_schema.value()),
3375 None => {
3376 let collection_key = format!("_collection:{}", collection_name);
3377 match self.get(&collection_key)? {
3378 Some(data) => {
3379 let obj: Collection = serde_json::from_slice(&data)?;
3380 let arc_obj = Arc::new(obj);
3381 self.schema_cache
3382 .insert(collection_name.to_string(), Arc::clone(&arc_obj));
3383 arc_obj
3384 }
3385 None => continue,
3386 }
3387 }
3388 };
3389
3390 let indexed_fields: Vec<String> = collection_obj
3391 .fields
3392 .iter()
3393 .filter(|(_, def)| def.indexed || def.unique)
3394 .map(|(name, _)| name.clone())
3395 .collect();
3396
3397 let primary_index = self
3398 .primary_indices
3399 .entry(collection_name.to_string())
3400 .or_default();
3401
3402 for (key, value) in batch {
3403 // 2. Update hot cache
3404 self.hot.set(key.clone(), value.clone(), None);
3405
3406 // 3. Update primary index with metadata only
3407 let location = DiskLocation::new(value.len());
3408 primary_index.insert(key.clone(), location);
3409
3410 // 4. Update secondary indices
3411 if !indexed_fields.is_empty()
3412 && let Ok(doc) = serde_json::from_slice::<Document>(&value) {
3413 for (field, field_value) in doc.data {
3414 if indexed_fields.contains(&field) {
3415 let value_str = match &field_value {
3416 Value::String(s) => s.clone(),
3417 _ => field_value.to_string(),
3418 };
3419 let index_key = format!("{}:{}", collection_name, field);
3420 let secondary_index = self
3421 .secondary_indices
3422 .entry(index_key)
3423 .or_default();
3424
3425 let max_entries = self.config.max_index_entries_per_field;
3426 secondary_index
3427 .entry(value_str)
3428 .and_modify(|doc_ids| {
3429 if doc_ids.len() < max_entries {
3430 doc_ids.push(key.to_string());
3431 }
3432 })
3433 .or_insert_with(|| vec![key.to_string()]);
3434 }
3435 }
3436 }
3437 }
3438 }
3439
3440 Ok(())
3441 }
3442
3443 /// Scan for keys with a specific prefix
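    ///
    /// Returns raw key/value pairs from cold storage; values are the stored bytes.
    ///
    /// # Examples
    ///
    /// A minimal sketch; the prefix is illustrative:
    ///
    /// ```
    /// for entry in db.scan_with_prefix("users:") {
    ///     let (key, bytes) = entry?;
    ///     println!("{}: {} bytes", key, bytes.len());
    /// }
    /// ```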
3444 pub fn scan_with_prefix(
3445 &self,
3446 prefix: &str,
3447 ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
3448 self.cold.scan_prefix(prefix)
3449 }
3450
3451 /// Get storage efficiency metrics for the database
    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
        let mut stats = HashMap::new();

        // Scan all collections
        let collections: Vec<String> = self
            .cold
            .scan()
            .filter_map(|r| r.ok())
            .map(|(k, _)| k)
            .filter(|k| k.starts_with("_collection:"))
            .map(|k| k.trim_start_matches("_collection:").to_string())
            .collect();

        for collection in collections {
            // Use primary index for fast stats (count + size from DiskLocation)
            let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
                let count = index.len();
                // Sum up sizes from DiskLocation metadata (much faster than a disk scan)
                let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
                (count, size)
            } else {
                // Fallback: scan from cold storage if index not available
                let prefix = format!("{}:", collection);
                let count = self.cold.scan_prefix(&prefix).count();
                let size: usize = self
                    .cold
                    .scan_prefix(&prefix)
                    .filter_map(|r| r.ok())
                    .map(|(_, v)| v.len())
                    .sum();
                (count, size)
            };

            stats.insert(
                collection,
                CollectionStats {
                    count,
                    size_bytes: size,
                    avg_doc_size: if count > 0 { size / count } else { 0 },
                },
            );
        }

        Ok(stats)
    }

    /// Search for documents by exact value using an index
    ///
    /// Performs a fast lookup against a previously created index; returns an
    /// empty list when no index exists for the field or nothing matches.
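    ///
    /// # Examples
    ///
    /// A sketch assuming an index exists on `users.email` (field and value are
    /// illustrative):
    ///
    /// ```
    /// let matches = db.search_by_value(
    ///     "users",
    ///     "email",
    ///     &Value::String("jane@example.com".to_string()),
    /// )?;
    /// ```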
    pub fn search_by_value(
        &self,
        collection: &str,
        field: &str,
        value: &Value,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
            let index = Index::new(index_def);

            // Look up the matching document IDs in the index
            if let Some(doc_ids) = index.search(value) {
                // Load the documents by ID
                let mut docs = Vec::new();
                for id in doc_ids {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        // Return empty result if no index or no matches
        Ok(Vec::new())
    }

    /// Perform a full-text search on an indexed text field
    ///
    /// Provides more advanced text search, including relevance ranking of
    /// results. Fails if the field is not covered by a full-text index.
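    ///
    /// # Examples
    ///
    /// A sketch assuming a full-text index already exists on `articles.body`
    /// (collection and field names are illustrative):
    ///
    /// ```
    /// let hits = db.full_text_search("articles", "body", "tiered storage")?;
    /// ```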
    pub fn full_text_search(
        &self,
        collection: &str,
        field: &str,
        query: &str,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;

            // Ensure this is a full-text index
            if !matches!(index_def.index_type, IndexType::FullText) {
                return Err(AuroraError::InvalidOperation(format!(
                    "Field '{}' is not indexed as full-text",
                    field
                )));
            }

            let index = Index::new(index_def);

            // Rank matching document IDs by relevance score
            if let Some(doc_id_scores) = index.search_text(query) {
                // Load the documents by ID, preserving score order
                let mut docs = Vec::new();
                for (id, _score) in doc_id_scores {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        // Return empty result if no index or no matches
        Ok(Vec::new())
    }

    /// Create a full-text search index on a text field
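    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a `posts` collection with a `content` field:
    ///
    /// ```
    /// db.create_text_index("posts", "content", true)?;
    /// let hits = db.full_text_search("posts", "content", "aurora")?;
    /// ```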
    pub fn create_text_index(
        &self,
        collection: &str,
        field: &str,
        _enable_stop_words: bool,
    ) -> Result<()> {
        // Check if collection exists
        if self.get(&format!("_collection:{}", collection))?.is_none() {
            return Err(AuroraError::CollectionNotFound(collection.to_string()));
        }

        // Create index definition
        let index_def = IndexDefinition {
            name: format!("{}_{}_fulltext", collection, field),
            collection: collection.to_string(),
            fields: vec![field.to_string()],
            index_type: IndexType::FullText,
            unique: false,
        };

        // Store index definition
        let index_key = format!("_index:{}:{}", collection, field);
        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;

        // Create the actual index
        let index = Index::new(index_def);

        // Index all existing documents in the collection
        let prefix = format!("{}:", collection);
        for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
            let doc: Document = serde_json::from_slice(&data)?;
            index.insert(&doc)?;
        }

        Ok(())
    }

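    /// Execute a [`SimpleQueryBuilder`] against its collection.
    ///
    /// The planner prefers a secondary index for `Eq`/`Contains` filters (and
    /// for range filters with large or absent LIMITs), then falls back to a
    /// primary-index scan with early termination, and finally to a cold-storage
    /// scan.
    ///
    /// A minimal sketch, assuming a builder produced elsewhere in the crate:
    ///
    /// ```
    /// let docs = db.execute_simple_query(&builder).await?;
    /// ```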
    pub async fn execute_simple_query(
        &self,
        builder: &SimpleQueryBuilder,
    ) -> Result<Vec<Document>> {
        // Ensure indices are initialized
        self.ensure_indices_initialized().await?;

        // A place to store the IDs of the documents we need to fetch
        let mut doc_ids_to_load: Option<Vec<String>> = None;

        // --- The "Query Planner" ---
        // Heuristic: for range queries with small LIMITs, a full scan can be faster
        // than collecting millions of IDs from the secondary index
        let use_index_for_range = if let Some(limit) = builder.limit {
            // If the limit is small (< 1000), prefer a full scan for range queries.
            // The secondary index would scan all entries anyway, so we might as well
            // scan documents directly and benefit from early termination.
            limit >= 1000
        } else {
            // No limit? The index might still help if the result set is small
            true
        };

        // Look for an opportunity to use an index
        for filter in builder.filters.iter() {
            match filter {
                Filter::Eq(field, value) => {
                    let index_key = format!("{}:{}", &builder.collection, field);

                    // Do we have a secondary index for this field?
                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        // Yes! Let's use it.
                        if let Some(matching_ids) = index.get(&value.to_string()) {
                            doc_ids_to_load = Some(matching_ids.clone());
                            break; // Stop searching for other indices for now
                        }
                    }
                }
                Filter::Gt(field, value)
                | Filter::Gte(field, value)
                | Filter::Lt(field, value)
                | Filter::Lte(field, value) => {
                    // Skip index for range queries with small LIMITs (see query planner heuristic above)
                    if !use_index_for_range {
                        continue;
                    }

                    let index_key = format!("{}:{}", &builder.collection, field);

                    // Do we have a secondary index for this field?
                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        // For range queries, we need to scan through the index values
                        let mut matching_ids = Vec::new();

                        for entry in index.iter() {
                            let index_value_str = entry.key();

                            // Try to parse the index value to compare with our filter value
                            if let Ok(index_value) =
                                self.parse_value_from_string(index_value_str, value)
                            {
                                let matches = match filter {
                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
                                    _ => false,
                                };

                                if matches {
                                    matching_ids.extend(entry.value().clone());
                                }
                            }
                        }

                        if !matching_ids.is_empty() {
                            doc_ids_to_load = Some(matching_ids);
                            break;
                        }
                    }
                }
                Filter::Contains(field, search_term) => {
                    let index_key = format!("{}:{}", &builder.collection, field);

                    // Do we have a secondary index for this field?
                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        let mut matching_ids = Vec::new();

                        for entry in index.iter() {
                            let index_value_str = entry.key();

                            // Check if this indexed value contains our search term
                            if index_value_str
                                .to_lowercase()
                                .contains(&search_term.to_lowercase())
                            {
                                matching_ids.extend(entry.value().clone());
                            }
                        }

                        if !matching_ids.is_empty() {
                            // Remove duplicates since a document could match multiple indexed values
                            matching_ids.sort();
                            matching_ids.dedup();

                            doc_ids_to_load = Some(matching_ids);
                            break;
                        }
                    }
                }
            }
        }

        let mut final_docs: Vec<Document>;

        if let Some(ids) = doc_ids_to_load {
            // --- Path 1: Index Lookup ---
            // Best-effort debug instrumentation; logging failures are ignored.
            use std::io::Write;
            if let Ok(mut file) = std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open("/tmp/aurora_query_stats.log")
            {
                let _ = writeln!(
                    file,
                    "[INDEX PATH] IDs to load: {} | Collection: {}",
                    ids.len(),
                    builder.collection
                );
            }

            final_docs = Vec::with_capacity(ids.len());

            // The batch index loader stores full storage keys ("collection:id")
            // in secondary indices; normalize to a full key without
            // double-prefixing, while still accepting bare document IDs.
            let key_prefix = format!("{}:", &builder.collection);
            for id in ids {
                let doc_key = if id.starts_with(&key_prefix) {
                    id
                } else {
                    format!("{}{}", key_prefix, id)
                };
                if let Some(data) = self.get(&doc_key)?
                    && let Ok(doc) = serde_json::from_slice::<Document>(&data)
                {
                    final_docs.push(doc);
                }
            }
        } else {
            // --- Path 2: Full Collection Scan with Early Termination ---

            // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
            // as soon as we have enough matching documents
            let early_termination_target = if builder.order_by.is_none() {
                builder.limit.map(|l| l + builder.offset.unwrap_or(0))
            } else {
                // With ORDER BY, we need all matching docs to sort correctly
                None
            };

            // Smart scan with early termination support
            final_docs = Vec::new();
            let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)

            if let Some(index) = self.primary_indices.get(&builder.collection) {
                for entry in index.iter() {
                    let key = entry.key();
                    scan_stats.0 += 1; // keys scanned

                    // Early termination check
                    if let Some(target) = early_termination_target
                        && final_docs.len() >= target
                    {
                        break; // We have enough documents!
                    }

                    // Fetch and filter document
                    if let Some(data) = self.get(key)? {
                        scan_stats.1 += 1; // docs fetched

                        if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                            // Apply all filters
                            let matches_all_filters = builder.filters.iter().all(|filter| {
                                match filter {
                                    Filter::Eq(field, value) => doc.data.get(field) == Some(value),
                                    Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
                                    Filter::Gte(field, value) => doc.data.get(field).is_some_and(|v| v >= value),
                                    Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
                                    Filter::Lte(field, value) => doc.data.get(field).is_some_and(|v| v <= value),
                                    Filter::Contains(field, value_str) => {
                                        doc.data.get(field).is_some_and(|v| match v {
                                            Value::String(s) => s.contains(value_str),
                                            Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
                                            _ => false,
                                        })
                                    }
                                }
                            });

                            if matches_all_filters {
                                scan_stats.2 += 1; // matches found
                                final_docs.push(doc);
                            }
                        }
                    }
                }

                // Best-effort debug instrumentation for query performance analysis
                use std::io::Write;
                if let Ok(mut file) = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open("/tmp/aurora_query_stats.log")
                {
                    let _ = writeln!(
                        file,
                        "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
                        scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
                    );
                }
            } else {
                // Fallback: scan from cold storage if index not initialized
                final_docs = self.get_all_collection(&builder.collection).await?;

                // Apply filters
                final_docs.retain(|doc| {
                    builder.filters.iter().all(|filter| {
                        match filter {
                            Filter::Eq(field, value) => doc.data.get(field) == Some(value),
                            Filter::Gt(field, value) => doc.data.get(field).is_some_and(|v| v > value),
                            Filter::Gte(field, value) => doc.data.get(field).is_some_and(|v| v >= value),
                            Filter::Lt(field, value) => doc.data.get(field).is_some_and(|v| v < value),
                            Filter::Lte(field, value) => doc.data.get(field).is_some_and(|v| v <= value),
                            Filter::Contains(field, value_str) => {
                                doc.data.get(field).is_some_and(|v| match v {
                                    Value::String(s) => s.contains(value_str),
                                    Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
                                    _ => false,
                                })
                            }
                        }
                    })
                });
            }
        }

        // Apply ordering
        if let Some((field, ascending)) = &builder.order_by {
            final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
                (Some(v1), Some(v2)) => {
                    let cmp = v1.cmp(v2);
                    if *ascending {
                        cmp
                    } else {
                        cmp.reverse()
                    }
                }
                (None, Some(_)) => std::cmp::Ordering::Less,
                (Some(_), None) => std::cmp::Ordering::Greater,
                (None, None) => std::cmp::Ordering::Equal,
            });
        }

        // Apply offset and limit
        let start = builder.offset.unwrap_or(0);
        let end = builder
            .limit
            .map(|l| start.saturating_add(l))
            .unwrap_or(final_docs.len());

        let end = end.min(final_docs.len());
        Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
    }

    /// Helper method to parse a string value back to a Value for comparison
    ///
    /// The reference value's type decides how the indexed string is parsed.
    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
        match reference_value {
            Value::Int(_) => {
                if let Ok(i) = value_str.parse::<i64>() {
                    Ok(Value::Int(i))
                } else {
                    Err(AuroraError::InvalidOperation("Failed to parse int".into()))
                }
            }
            Value::Float(_) => {
                if let Ok(f) = value_str.parse::<f64>() {
                    Ok(Value::Float(f))
                } else {
                    Err(AuroraError::InvalidOperation(
                        "Failed to parse float".into(),
                    ))
                }
            }
            Value::String(_) => Ok(Value::String(value_str.to_string())),
            // Fall back to lexicographic string comparison for any other type
            _ => Ok(Value::String(value_str.to_string())),
        }
    }

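    /// Execute an ad-hoc HTTP query payload: filter, then sort, then paginate,
    /// then project selected fields.
    ///
    /// A minimal sketch, assuming a `QueryPayload` deserialized from a request
    /// body:
    ///
    /// ```
    /// let docs = db.execute_dynamic_query("users", &payload).await?;
    /// ```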
    pub async fn execute_dynamic_query(
        &self,
        collection: &str,
        payload: &QueryPayload,
    ) -> Result<Vec<Document>> {
        let mut docs = self.get_all_collection(collection).await?;

        // 1. Apply Filters
        if let Some(filters) = &payload.filters {
            docs.retain(|doc| {
                filters.iter().all(|filter| {
                    doc.data
                        .get(&filter.field)
                        .is_some_and(|doc_val| check_filter(doc_val, filter))
                })
            });
        }

        // 2. Apply Sorting
        if let Some(sort_options) = &payload.sort {
            docs.sort_by(|a, b| {
                let a_val = a.data.get(&sort_options.field);
                let b_val = b.data.get(&sort_options.field);
                let ordering = a_val
                    .partial_cmp(&b_val)
                    .unwrap_or(std::cmp::Ordering::Equal);
                if sort_options.ascending {
                    ordering
                } else {
                    ordering.reverse()
                }
            });
        }

        // 3. Apply Pagination
        if let Some(offset) = payload.offset {
            docs = docs.into_iter().skip(offset).collect();
        }
        if let Some(limit) = payload.limit {
            docs = docs.into_iter().take(limit).collect();
        }

        // 4. Apply Field Selection (Projection)
        if let Some(select_fields) = &payload.select
            && !select_fields.is_empty()
        {
            docs = docs
                .into_iter()
                .map(|mut doc| {
                    doc.data.retain(|key, _| select_fields.contains(key));
                    doc
                })
                .collect();
        }

        Ok(docs)
    }

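    /// Dispatch a wire-protocol request to the corresponding engine call and
    /// wrap the outcome in a protocol response.
    ///
    /// A minimal sketch, assuming the `Request`/`Response` types from
    /// `crate::network::protocol`:
    ///
    /// ```
    /// use crate::network::protocol::Request;
    ///
    /// let response = db.process_network_request(Request::Get("users:1".into())).await;
    /// ```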
    pub async fn process_network_request(
        &self,
        request: crate::network::protocol::Request,
    ) -> crate::network::protocol::Response {
        use crate::network::protocol::Response;

        match request {
            crate::network::protocol::Request::Get(key) => match self.get(&key) {
                Ok(value) => Response::Success(value),
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::Put(key, value) => {
                match self.put(key, value, None) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
                Ok(_) => Response::Done,
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::NewCollection { name, fields } => {
                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
                    .iter()
                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
                    .collect();

                match self.new_collection(&name, fields_for_db) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Insert { collection, data } => {
                match self.insert_map(&collection, data).await {
                    Ok(id) => Response::Message(id),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::GetDocument { collection, id } => {
                match self.get_document(&collection, &id) {
                    Ok(doc) => Response::Document(doc),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Query(builder) => {
                match self.execute_simple_query(&builder).await {
                    Ok(docs) => Response::Documents(docs),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::BeginTransaction => {
                let tx_id = self.begin_transaction();
                Response::TransactionId(tx_id.as_u64())
            }
            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
                match self.commit_transaction(tx_id) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
                match self.rollback_transaction(tx_id) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
        }
    }

    /// Create indices for commonly queried fields automatically
    ///
    /// This is a convenience method that creates indices for fields that are
    /// likely to be queried frequently, improving performance.
    ///
    /// # Arguments
    /// * `collection` - Name of the collection
    /// * `fields` - List of field names to create indices for
    ///
    /// # Examples
    /// ```
    /// // Create indices for commonly queried fields
    /// db.create_indices("users", &["email", "status", "created_at"]).await?;
    /// ```
    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
        for field in fields {
            if let Err(e) = self.create_index(collection, field).await {
                eprintln!(
                    "Warning: Failed to create index for {}.{}: {}",
                    collection, field, e
                );
            } else {
                println!("Created index for {}.{}", collection, field);
            }
        }
        Ok(())
    }

    /// Get index statistics for a collection
    ///
    /// This helps understand which indices exist and how effective they are.
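    ///
    /// A minimal sketch, assuming an open `db: Aurora`:
    ///
    /// ```
    /// for (field, s) in db.get_index_stats("users") {
    ///     println!("{}: {} unique values", field, s.unique_values);
    /// }
    /// ```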
    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
        let mut stats = HashMap::new();

        for entry in self.secondary_indices.iter() {
            let key = entry.key();
            if key.starts_with(&format!("{}:", collection)) {
                let field = key.split(':').nth(1).unwrap_or("unknown");
                let index = entry.value();

                let unique_values = index.len();
                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();

                stats.insert(
                    field.to_string(),
                    IndexStats {
                        unique_values,
                        total_documents,
                        avg_docs_per_value: if unique_values > 0 {
                            total_documents / unique_values
                        } else {
                            0
                        },
                    },
                );
            }
        }

        stats
    }

    /// Optimize a collection by creating indices for its schema fields
    ///
    /// Currently this creates an index for every field defined in the collection
    /// schema; analyzing observed query patterns to index more selectively is
    /// future work.
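    ///
    /// A minimal sketch:
    ///
    /// ```
    /// db.optimize_collection("users").await?;
    /// ```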
    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
        if let Ok(collection_def) = self.get_collection_definition(collection) {
            let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
            self.create_indices(collection, &field_names).await?;
        }

        Ok(())
    }

    /// Helper method to get the names of unique fields from a collection schema
    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
        collection
            .fields
            .iter()
            .filter(|(_, def)| def.unique)
            .map(|(name, _)| name.clone())
            .collect()
    }

    /// Validate unique constraints for an insert, failing if any unique field's
    /// value is already present in the collection's secondary indices.
    async fn validate_unique_constraints(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    // Get the raw string value without JSON formatting
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if index.contains_key(&value_str) {
                        return Err(AuroraError::UniqueConstraintViolation(
                            unique_field.clone(),
                            value_str,
                        ));
                    }
                }
            }
        }
        Ok(())
    }

    /// Validate unique constraints excluding a specific document ID (for updates)
    async fn validate_unique_constraints_excluding(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
        exclude_id: &str,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    // Get the raw string value without JSON formatting
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if let Some(doc_ids) = index.get(&value_str) {
                        // Check if any document other than the excluded one has this value
                        let exclude_key = format!("{}:{}", collection, exclude_id);
                        for doc_key in doc_ids.value() {
                            if doc_key != &exclude_key {
                                return Err(AuroraError::UniqueConstraintViolation(
                                    unique_field.clone(),
                                    value_str,
                                ));
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
}

impl Drop for Aurora {
    fn drop(&mut self) {
        // Signal the checkpoint task to shut down gracefully
        if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
            let _ = shutdown_tx.send(());
        }
        // Signal the compaction task to shut down gracefully
        if let Some(ref shutdown_tx) = self.compaction_shutdown {
            let _ = shutdown_tx.send(());
        }
    }
}

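/// Evaluate a single HTTP filter against a document value.
///
/// The filter's JSON value is converted with `json_to_value` first; a value
/// that fails conversion never matches.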
fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
    let filter_val = match json_to_value(&filter.value) {
        Ok(v) => v,
        Err(_) => return false,
    };

    match filter.operator {
        FilterOperator::Eq => doc_val == &filter_val,
        FilterOperator::Ne => doc_val != &filter_val,
        FilterOperator::Gt => doc_val > &filter_val,
        FilterOperator::Gte => doc_val >= &filter_val,
        FilterOperator::Lt => doc_val < &filter_val,
        FilterOperator::Lte => doc_val <= &filter_val,
        FilterOperator::Contains => match (doc_val, &filter_val) {
            (Value::String(s), Value::String(fv)) => s.contains(fv),
            (Value::Array(arr), _) => arr.contains(&filter_val),
            _ => false,
        },
    }
}

/// Results of importing a document
enum ImportResult {
    Imported,
    Skipped,
}

/// Statistics from an import operation
#[derive(Debug, Default)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}

/// Statistics for a specific collection
#[derive(Debug)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes
    pub avg_doc_size: usize,
}

/// Statistics for an index
#[derive(Debug)]
pub struct IndexStats {
    /// Number of unique values in the index
    pub unique_values: usize,
    /// Total number of documents covered by the index
    pub total_documents: usize,
    /// Average number of documents per unique value
    pub avg_docs_per_value: usize,
}

/// Combined database statistics
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection
    pub collections: HashMap<String, CollectionStats>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("age", FieldType::Int, false),
                ("email", FieldType::String, true),
            ],
        )?;

        // Test document insertion
        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection
        db.new_collection("test", vec![("field", FieldType::String, false)])?;

        // Start transaction
        let tx_id = db.begin_transaction();

        // Insert document
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id)?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Test collection creation
        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )?;

        // Test document insertion
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Test query
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create a test file that's too large (201MB)
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Create collection with unique email field
        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique field
                ("age", FieldType::Int, false),
            ],
        )?;

        // Insert first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Try to insert second document with same email - should fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
            assert_eq!(field, "email");
            assert_eq!(value, "john@example.com");
        } else {
            panic!("Expected UniqueConstraintViolation error");
        }

        // Test upsert with unique constraint
        // Should succeed for new document
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Should fail when trying to upsert with duplicate email
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Should succeed when updating existing document with same email (no change)
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
}