aurora_db/db.rs
1//! # Aurora Core Engine
2//!
3//! This module contains the `Aurora` struct, which is the primary handle for
4//! interacting with the database. It coordinates the tiered storage system,
5//! index management, transaction lifecycle, and AQL execution.
6//!
7//! ## Key Responsibilities
8//! - **Collection Management**: Creating and deleting collections with defined schemas.
9//! - **CRUD Operations**: High-performance insertion, retrieval, update, and deletion of documents.
10//! - **Transaction Coordination**: Managing ACID-compliant transaction buffers and commits.
11//! - **Index Coordination**: Maintaining primary and secondary indices (including Roaring Bitmaps).
12//! - **PubSub Integration**: Publishing change events for real-time listeners.
13//!
14//! ## Thread Safety
15//! `Aurora` is designed to be wrapped in an `Arc` and shared across multiple
16//! threads or async tasks. Internally, it uses highly concurrent data structures
17//! (like `DashMap`) and fine-grained locking to ensure safe parallel access.
18
19use crate::error::{AqlError, ErrorCode, Result};
20use crate::index::{Index, IndexDefinition, IndexType};
21use crate::network::http_models::{
22 Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
23};
24use crate::parser;
25use crate::parser::executor::{ExecutionOptions, ExecutionPlan, ExecutionResult};
26use crate::query::FilterBuilder;
27use crate::query::{Filter, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
28use crate::storage::{ColdStore, HotStore, IndexStorage, WriteBuffer};
29use crate::types::{
30 AuroraConfig, Collection, Document, DurabilityMode, FieldDefinition, FieldType, Value,
31};
32use crate::wal::{Operation, WriteAheadLog};
33use roaring::RoaringBitmap;
34
35use dashmap::DashMap;
36use moka::sync::Cache as MokaCache;
37use serde_json::Value as JsonValue;
38use serde_json::from_str;
39use std::collections::HashMap;
40use std::fmt;
41use std::fs::File as StdFile;
42use std::path::{Path, PathBuf};
43use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
44use std::sync::{Arc, RwLock};
45use std::time::Duration;
46use tokio::fs::File;
47use tokio::fs::read_to_string;
48use tokio::io::AsyncReadExt;
49use tokio::sync::OnceCell;
50use uuid::Uuid;
51
/// Disk location metadata stored in the primary index.
///
/// Instead of keeping each document's bytes in memory, the primary index
/// records only this small `Copy` summary of the stored value.
#[derive(Debug, Clone, Copy)]
pub(crate) struct DiskLocation {
    /// Size of the serialized value in bytes (useful for statistics),
    /// saturated at `u32::MAX`.
    size: u32,
}

impl DiskLocation {
    /// Builds the metadata for a value that is `size` bytes long.
    ///
    /// Sizes that do not fit in a `u32` are clamped to `u32::MAX`. A plain
    /// `as u32` cast would silently wrap, making a 4 GiB value report 0 bytes.
    fn new(size: usize) -> Self {
        Self {
            size: u32::try_from(size).unwrap_or(u32::MAX),
        }
    }
}
64
/// Per-collection primary index: document key -> size-only disk metadata.
type PrimaryIndex = DashMap<String, DiskLocation>;
/// Per-field secondary index: stringified field value -> posting set of internal IDs,
/// held in adaptive storage (Single -> List -> Bitmap) behind its own lock.
type SecondaryIndex = DashMap<String, Arc<RwLock<IndexStorage>>>;
69
/// Summary of a stored value: which kind it is and its size (see [`DataInfo::size`]).
// NOTE(review): `size` is presumably in bytes — confirm at the construction sites.
#[derive(Debug)]
pub enum DataInfo {
    /// Regular data, with its size and a short textual preview.
    Data { size: usize, preview: String },
    /// Binary blob, with its size.
    Blob { size: usize },
    /// Compressed data, with its size.
    Compressed { size: usize },
}
77
/// Conversion of a field specification into a named [`FieldDefinition`].
///
/// Implemented for:
/// - `(name, FieldType, unique)`: `nullable` defaults to `false`;
/// - `(name, FieldType, unique, nullable)`: explicit nullability;
/// - `(name, FieldDefinition)`: the definition is passed through unchanged.
///
/// In both tuple forms `indexed` is set equal to `unique`, and `validations` starts empty.
pub trait IntoFieldDefinition {
    /// Returns the field name together with its definition.
    fn into_field_definition(self) -> (String, FieldDefinition);
}
83
84// 3-tuple: (field_name, field_type, unique) - defaults nullable to false
85impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool) {
86 fn into_field_definition(self) -> (String, FieldDefinition) {
87 let (name, field_type, unique) = self;
88 (
89 name.into(),
90 FieldDefinition {
91 field_type,
92 unique,
93 indexed: unique,
94 nullable: false, // Default to false
95 validations: vec![],
96 ..Default::default()
97 },
98 )
99 }
100}
101
102// 4-tuple: (field_name, field_type, unique, nullable) - explicit control
103impl<S: Into<String>> IntoFieldDefinition for (S, FieldType, bool, bool) {
104 fn into_field_definition(self) -> (String, FieldDefinition) {
105 let (name, field_type, unique, nullable) = self;
106 (
107 name.into(),
108 FieldDefinition {
109 field_type,
110 unique,
111 indexed: unique,
112 nullable,
113 validations: vec![],
114 ..Default::default()
115 },
116 )
117 }
118}
119
120// Direct FieldDefinition: (field_name, field_definition)
121impl<S: Into<String>> IntoFieldDefinition for (S, FieldDefinition) {
122 fn into_field_definition(self) -> (String, FieldDefinition) {
123 (self.0.into(), self.1)
124 }
125}
126
/// Operation sent to the asynchronous WAL writer over the `wal_writer` channel.
// NOTE(review): `dead_code` is allowed presumably because some variants or fields
// are never read in this file — confirm and narrow the allow if possible.
#[derive(Debug)]
#[allow(dead_code)]
enum WalOperation {
    /// Write `value` under `key` (both `Arc`-shared, presumably so sending does not copy them).
    Put {
        key: Arc<String>,
        value: Arc<Vec<u8>>,
    },
    /// Remove `key`.
    Delete {
        key: String,
    },
}
138
139impl DataInfo {
140 pub fn size(&self) -> usize {
141 match self {
142 DataInfo::Data { size, .. } => *size,
143 DataInfo::Blob { size } => *size,
144 DataInfo::Compressed { size } => *size,
145 }
146 }
147}
148
/// The main database engine handle.
///
/// Most state sits behind `Arc`s, which the `Clone` impl shares, so clones of a
/// handle observe the same indices, caches, and ID dictionaries.
pub struct Aurora {
    /// Hot tier of the tiered storage system.
    pub(crate) hot: HotStore,
    /// Cold store; scanned by key prefix for documents, `_collection:` definitions
    /// and `_sys:` metadata.
    pub(crate) cold: Arc<ColdStore>,
    /// collection -> (document key -> size-only `DiskLocation`).
    pub(crate) primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    /// "collection:field" -> (stringified value -> posting set of internal IDs).
    pub(crate) secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    /// Persistent map: big-endian external u128 ID -> big-endian internal u32 ID.
    pub(crate) sys_id_mapping: sled::Tree,
    /// Memory map of `aurora_indices.bin`, once a checkpoint has been loaded or written.
    pub(crate) mmap_index: Arc<RwLock<Option<memmap2::Mmap>>>,
    /// Checkpoint manifest: "collection:field:value" -> (byte offset, byte length) in `mmap_index`.
    pub(crate) index_manifest: Arc<DashMap<String, (usize, usize)>>,
    /// Mapping from external u128 ID to internal u32 ID.
    _sid_dictionary: Arc<crossbeam_skiplist::SkipMap<u128, u32>>,
    /// Reverse mapping indexed by internal u32 ID; a slot holding 0 is treated as unmapped.
    reverse_sid_dictionary: Arc<RwLock<Vec<u128>>>,
    /// Deleted internal IDs not yet masked out of an on-disk checkpoint. After a
    /// successful checkpoint they are moved to `recyclable_ids`.
    deleted_ids: Arc<RwLock<RoaringBitmap>>,
    /// IDs already purged from checkpointed bitmaps and safe to reuse; persisted
    /// under `_sys:recyclable_ids`.
    recyclable_ids: Arc<RwLock<RoaringBitmap>>,
    /// Next never-used internal ID, handed out when nothing is recyclable.
    next_internal_id: Arc<AtomicU32>,
    /// Guards the one-time checkpoint load in `ensure_indices_initialized`.
    indices_initialized: Arc<OnceCell<()>>,
    pub(crate) transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    // NOTE(review): presumably collection name -> cached definition — confirm.
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    /// Engine configuration; `db_path` locates the index checkpoint files.
    config: AuroraConfig,
    write_buffer: Option<Arc<WriteBuffer>>,
    pub pubsub: crate::pubsub::PubSubSystem,
    // Write-ahead log for durability
    wal: Option<Arc<RwLock<WriteAheadLog>>>,
    // Background task shutdown senders
    checkpoint_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    compaction_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    wal_shutdown: Option<tokio::sync::mpsc::UnboundedSender<()>>,
    // Worker system for background jobs
    pub workers: Option<Arc<crate::workers::WorkerSystem>>,
    // Asynchronous WAL writer channel
    wal_writer: Option<tokio::sync::mpsc::UnboundedSender<WalOperation>>,
    /// Set to true when the WAL writer task hits a terminal error. put() checks
    /// this before sending so it skips the closed channel and degrades gracefully.
    wal_disabled: Arc<AtomicBool>,
    // Computed fields manager
    pub computed: Arc<RwLock<crate::computed::ComputedFields>>,
    /// Cache of parsed AQL documents.
    ast_cache: Arc<MokaCache<u64, crate::parser::ast::Document>>,
    /// Cache of query plans.
    pub plan_cache: Arc<MokaCache<u64, Arc<crate::parser::executor::QueryPlan>>>,
    /// Set of "collection:field" index keys currently being rebuilt in the background.
    /// Queries hitting a building index fall back to full scan instead of returning
    /// incorrect empty results from a half-populated index.
    pub(crate) building_indices: Arc<DashMap<String, ()>>,
}
198
199impl fmt::Debug for Aurora {
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201 f.debug_struct("Aurora")
202 .field("hot", &"HotStore")
203 .field("cold", &"ColdStore")
204 .field("primary_indices_count", &self.primary_indices.len())
205 .field("secondary_indices_count", &self.secondary_indices.len())
206 .field(
207 "active_transactions",
208 &self.transaction_manager.active_count(),
209 )
210 .field("indices_count", &self.indices.len())
211 .finish()
212 }
213}
214
215impl Clone for Aurora {
216 fn clone(&self) -> Self {
217 Self {
218 hot: self.hot.clone(),
219 cold: Arc::clone(&self.cold),
220 primary_indices: Arc::clone(&self.primary_indices),
221 secondary_indices: Arc::clone(&self.secondary_indices),
222 sys_id_mapping: self.sys_id_mapping.clone(),
223 mmap_index: Arc::clone(&self.mmap_index),
224 index_manifest: Arc::clone(&self.index_manifest),
225 _sid_dictionary: Arc::clone(&self._sid_dictionary),
226 reverse_sid_dictionary: Arc::clone(&self.reverse_sid_dictionary),
227 deleted_ids: Arc::clone(&self.deleted_ids),
228 recyclable_ids: Arc::clone(&self.recyclable_ids),
229 next_internal_id: Arc::clone(&self.next_internal_id),
230 indices_initialized: Arc::clone(&self.indices_initialized),
231 transaction_manager: self.transaction_manager.clone(),
232 indices: Arc::clone(&self.indices),
233 schema_cache: Arc::clone(&self.schema_cache),
234 config: self.config.clone(),
235 write_buffer: self.write_buffer.clone(),
236 pubsub: self.pubsub.clone(),
237 wal: self.wal.clone(),
238 checkpoint_shutdown: self.checkpoint_shutdown.clone(),
239 compaction_shutdown: self.compaction_shutdown.clone(),
240 wal_shutdown: self.wal_shutdown.clone(),
241 workers: self.workers.clone(),
242 wal_writer: self.wal_writer.clone(),
243 wal_disabled: Arc::clone(&self.wal_disabled),
244 computed: Arc::clone(&self.computed),
245 ast_cache: Arc::clone(&self.ast_cache),
246 plan_cache: Arc::clone(&self.plan_cache),
247 building_indices: Arc::clone(&self.building_indices),
248 }
249 }
250}
251
/// Conversion of the inputs accepted by `execute`/`explain` into AQL text plus
/// [`ExecutionOptions`].
///
/// Implemented for `&str`, `String`, `(String, ExecutionOptions)`,
/// `(&str, ExecutionOptions)`, and `(&str, V)` where `V: Into<serde_json::Value>`
/// supplies query variables.
pub trait ToExecParams {
    /// Splits `self` into the query text and its execution options.
    fn into_params(self) -> (String, ExecutionOptions);
}
256
257impl<'a> ToExecParams for &'a str {
258 fn into_params(self) -> (String, ExecutionOptions) {
259 (self.to_string(), ExecutionOptions::default())
260 }
261}
262
263impl ToExecParams for String {
264 fn into_params(self) -> (String, ExecutionOptions) {
265 (self, ExecutionOptions::default())
266 }
267}
268
269impl ToExecParams for (String, ExecutionOptions) {
270 fn into_params(self) -> (String, ExecutionOptions) {
271 self
272 }
273}
274
275impl<'a> ToExecParams for (&'a str, ExecutionOptions) {
276 fn into_params(self) -> (String, ExecutionOptions) {
277 (self.0.to_string(), self.1)
278 }
279}
280
281impl<'a, V> ToExecParams for (&'a str, V)
282where
283 V: Into<serde_json::Value>,
284{
285 fn into_params(self) -> (String, ExecutionOptions) {
286 let (aql, vars) = self;
287 let json_vars = vars.into();
288 let mut map = std::collections::HashMap::new();
289 if let serde_json::Value::Object(obj) = json_vars {
290 for (k, v) in obj {
291 map.insert(k, crate::parser::executor::json_to_aql_value(v));
292 }
293 }
294 (
295 aql.to_string(),
296 ExecutionOptions {
297 variables: map,
298 ..Default::default()
299 },
300 )
301 }
302}
303
304impl Aurora {
305 /// Get reference to AST cache for parsed AQL queries
306 pub fn ast_cache(&self) -> &MokaCache<u64, crate::parser::ast::Document> {
307 &self.ast_cache
308 }
309
310 /// Convert an external String ID back to an internal ID, creating if not found
311 fn get_or_create_internal_id(&self, external_id: &str) -> u32 {
312 let u = self.parse_external_id(external_id);
313
314 // 1. Check dictionary for existing mapping
315 if let Some(entry) = self._sid_dictionary.get(&u) {
316 return *entry.value();
317 }
318
319 // 2. Try to recycle a deleted ID
320 let recycled_id = if let Ok(mut recyclable) = self.recyclable_ids.write() {
321 if !recyclable.is_empty() {
322 let id = recyclable.min().unwrap();
323 recyclable.remove(id);
324 Some(id)
325 } else {
326 None
327 }
328 } else {
329 None
330 };
331
332 let internal_id = if let Some(id) = recycled_id {
333 id
334 } else {
335 // 3. Allocate new ID
336 self.next_internal_id.fetch_add(1, Ordering::SeqCst)
337 };
338
339 // 4. Update dictionaries
340 self._sid_dictionary.insert(u, internal_id);
341
342 // 5. Persist mapping to Sled for cross-session stability
343 let _ = self
344 .sys_id_mapping
345 .insert(u.to_be_bytes(), &internal_id.to_be_bytes());
346
347 if let Ok(mut reverse) = self.reverse_sid_dictionary.write() {
348 if (internal_id as usize) >= reverse.len() {
349 reverse.resize((internal_id as usize) + 1, 0);
350 }
351 reverse[internal_id as usize] = u;
352 }
353
354 internal_id
355 }
356
357 /// Load persisted internal ID mappings from Sled at startup
358 fn load_id_mapping_from_sled(&self) -> Result<()> {
359 let mut max_id = 0u32;
360 let mut reverse_guard = self.reverse_sid_dictionary.write().unwrap();
361
362 for result in self.sys_id_mapping.iter() {
363 let (k, v) = result?;
364 if k.len() != 16 || v.len() != 4 {
365 continue;
366 }
367
368 let u = u128::from_be_bytes(k.as_ref().try_into().map_err(|_| {
369 AqlError::new(
370 ErrorCode::InternalError,
371 "ID mapping key corrupt".to_string(),
372 )
373 })?);
374 let id = u32::from_be_bytes(v.as_ref().try_into().map_err(|_| {
375 AqlError::new(
376 ErrorCode::InternalError,
377 "ID mapping value corrupt".to_string(),
378 )
379 })?);
380
381 self._sid_dictionary.insert(u, id);
382
383 if (id as usize) >= reverse_guard.len() {
384 reverse_guard.resize((id as usize) + 1, 0);
385 }
386 reverse_guard[id as usize] = u;
387
388 max_id = max_id.max(id);
389 }
390
391 // Ensure the counter starts AFTER the highest loaded ID
392 if max_id > 0 || !self._sid_dictionary.is_empty() {
393 self.next_internal_id.store(max_id + 1, Ordering::SeqCst);
394 }
395
396 Ok(())
397 }
398
399 /// Get a secondary index by name
400 pub fn get_secondary_index(&self, key: &str) -> Option<SecondaryIndex> {
401 self.secondary_indices.get(key).map(|e| e.value().clone())
402 }
403
404 /// Look up a single IndexStorage entry without cloning the whole inner DashMap.
405 /// This is O(1) vs the O(N) clone in `get_secondary_index`.
406 pub fn get_indexed_storage(
407 &self,
408 index_key: &str,
409 val_str: &str,
410 ) -> Option<Arc<RwLock<IndexStorage>>> {
411 self.secondary_indices
412 .get(index_key)?
413 .value()
414 .get(val_str)
415 .map(|e| e.value().clone())
416 }
417
    /// Returns `true` if the hot map has a secondary index for `index_key`
    /// (`"collection:field"`).
    ///
    /// Entries are created by `index_value` and also hydrated from the on-disk
    /// checkpoint, so presence does not imply the key was written this session.
    pub fn has_index_key(&self, index_key: &str) -> bool {
        self.secondary_indices.contains_key(index_key)
    }
423
424 /// Convert an internal u32 ID back to an external String ID
425 pub fn get_external_id(&self, internal_id: u32) -> Option<String> {
426 let reverse_dict = self.reverse_sid_dictionary.read().ok()?;
427 reverse_dict
428 .get(internal_id as usize)
429 .map(|u| {
430 if *u == 0 {
431 return String::new(); // Or handle as None if 0 is used for empty
432 }
433 self.format_external_id(*u)
434 })
435 .filter(|s| !s.is_empty())
436 }
437
438 /// Convert an external String ID to its binary u128 representation
439 pub fn parse_external_id(&self, id: &str) -> u128 {
440 uuid::Uuid::parse_str(id)
441 .map(|u| u.as_u128())
442 .unwrap_or_else(|_| {
443 // Fallback to hashing if not a UUID
444 use std::collections::hash_map::DefaultHasher;
445 use std::hash::{Hash, Hasher};
446 let mut s = DefaultHasher::new();
447 id.hash(&mut s);
448 s.finish() as u128
449 })
450 }
451
452 /// Convert binary u128 back to UUID string
453 pub fn format_external_id(&self, id: u128) -> String {
454 uuid::Uuid::from_u128(id).to_string()
455 }
456
457 /// Ensure secondary indices are initialized from checkpoint
458 pub async fn ensure_indices_initialized(&self) -> Result<()> {
459 self.indices_initialized
460 .get_or_try_init(|| async { self.load_index_checkpoint() })
461 .await?;
462 // Cross-check schema against checkpoint: spawn rebuilds for any index
463 // that was missing from the checkpoint (e.g. after a crash).
464 self.init_schema_indices().await;
465 Ok(())
466 }
467
468 /// Load index checkpoint from disk
469 fn load_index_checkpoint(&self) -> Result<()> {
470 let path = &self.config.db_path;
471 let index_bin = path.join("aurora_indices.bin");
472 let index_json = path.join("aurora_indices.json");
473
474 if !index_bin.exists() || !index_json.exists() {
475 return Ok(());
476 }
477
478 // 1. Load manifest
479 let manifest_data = std::fs::read_to_string(index_json)?;
480 let manifest: HashMap<String, (usize, usize)> = serde_json::from_str(&manifest_data)?;
481 for (k, v) in manifest {
482 self.index_manifest.insert(k, v);
483 }
484
485 // 2. Memory map indices
486 let file = StdFile::open(index_bin)?;
487 let mmap = unsafe { memmap2::Mmap::map(&file)? };
488 if let Ok(mut guard) = self.mmap_index.write() {
489 *guard = Some(mmap);
490 }
491
492 // 3. Hydrate secondary_indices from checkpoint so the hot map is not
493 // empty after a crash/restart. Without this every indexed lookup
494 // falls through to a full scan until the next checkpoint fires.
495 //
496 // Manifest key format: "collection:field:value"
497 // index_key = "collection:field" (split on 2nd colon)
498 let mmap_guard = self.mmap_index.read().ok();
499 if let Some(ref guard) = mmap_guard {
500 if let Some(ref mmap) = **guard {
501 for entry in self.index_manifest.iter() {
502 let full_key = entry.key();
503 let (offset, len) = *entry.value();
504
505 // Split on the second ':' to recover index_key vs value_str
506 let colon2 = full_key.match_indices(':').nth(1).map(|(i, _)| i);
507 let (index_key, val_str) = match colon2 {
508 Some(pos) => (&full_key[..pos], &full_key[pos + 1..]),
509 None => continue,
510 };
511
512 if let Some(bytes) = mmap.get(offset..offset + len) {
513 if let Ok(bitmap) = RoaringBitmap::deserialize_from(bytes) {
514 let index = self
515 .secondary_indices
516 .entry(index_key.to_string())
517 .or_default();
518 index.insert(
519 val_str.to_string(),
520 Arc::new(std::sync::RwLock::new(IndexStorage::Bitmap(bitmap))),
521 );
522 }
523 }
524 }
525 }
526 }
527
528 Ok(())
529 }
530
531 /// Returns true if the secondary index for `collection:field` is currently
532 /// being rebuilt in the background. Callers should fall back to a full scan.
533 pub fn is_index_building(&self, collection: &str, field: &str) -> bool {
534 let key = format!("{}:{}", collection, field);
535 self.building_indices.contains_key(&key)
536 }
537
538 /// Scan every collection defined in the schema. For any indexed/unique field
539 /// whose index is absent from the checkpoint, mark it as building and spawn
540 /// a background task to rebuild it from the cold store.
541 ///
542 /// Called once during `ensure_indices_initialized`, after `load_index_checkpoint`.
543 async fn init_schema_indices(&self) {
544 let collection_names: Vec<String> = self
545 .cold
546 .scan_prefix("_collection:")
547 .filter_map(|r| r.ok())
548 .map(|(key, _)| key.trim_start_matches("_collection:").to_string())
549 .collect();
550
551 // Group missing index keys by collection so we do ONE scan per collection,
552 // not one scan per field (which would re-scan the same docs N times).
553 let mut missing_by_collection: HashMap<String, Vec<String>> = HashMap::new();
554
555 for collection_name in collection_names {
556 let Ok(col_def) = self.get_collection_definition(&collection_name) else {
557 continue;
558 };
559
560 // Always include the auto-indexed "id" field — it's not schema-derived
561 // but index_value() indexes it automatically if present in doc.data.
562 let auto_id_key = format!("{}:id", collection_name);
563 if !self.has_index(&collection_name, "id") {
564 self.building_indices.insert(auto_id_key.clone(), ());
565 missing_by_collection
566 .entry(collection_name.clone())
567 .or_default()
568 .push(auto_id_key);
569 }
570
571 for (field_name, field_def) in &col_def.fields {
572 if !field_def.indexed && !field_def.unique {
573 continue;
574 }
575 let index_key = format!("{}:{}", collection_name, field_name);
576 if self.has_index(&collection_name, field_name) {
577 continue;
578 }
579 self.building_indices.insert(index_key.clone(), ());
580 missing_by_collection
581 .entry(collection_name.clone())
582 .or_default()
583 .push(index_key);
584 }
585 }
586
587 // One rebuild task per collection — single scan, indexes all missing fields.
588 for (collection_name, index_keys) in missing_by_collection {
589 let db = self.clone();
590 tokio::spawn(async move {
591 db.rebuild_collection_indices(&collection_name, index_keys)
592 .await;
593 });
594 }
595 }
596
597 /// Background rebuild: one full collection scan that rebuilds ALL missing index keys.
598 /// Using a single scan avoids redundant I/O when multiple fields are missing.
599 async fn rebuild_collection_indices(&self, collection: &str, index_keys: Vec<String>) {
600 let db = self.clone();
601 let coll = collection.to_string();
602 let keys_clone = index_keys.clone();
603
604 let result = tokio::task::spawn_blocking(move || {
605 let prefix = format!("{}:", coll);
606 for entry in db.cold.scan_prefix(&prefix).flatten() {
607 let (key, value) = entry;
608 if key.starts_with('_') {
609 continue;
610 }
611 // index_value is idempotent and covers all indexed fields in one pass.
612 let _ = db.index_value(&coll, &key, &value, None);
613 }
614 })
615 .await;
616
617 // Clear all building flags for this collection's keys regardless of outcome.
618 for key in &index_keys {
619 self.building_indices.remove(key);
620 }
621
622 if result.is_ok() {
623 let _ = self.save_index_checkpoint();
624 } else {
625 eprintln!(
626 "Index rebuild panicked for collection '{}' (keys: {:?}); will retry on next query cycle.",
627 collection, keys_clone
628 );
629 }
630 }
631
632 /// Persist dense secondary indices to disk and re-mmap the result.
633 ///
634 /// ## Dense-Bitmap-Only rule
635 /// Only `IndexStorage::Bitmap` entries are written. `Single` and `List` are
636 /// high-cardinality (one entry per unique email/UUID) — checkpointing them would
637 /// produce a manifest with millions of keys (50-100 MB of JSON). Skipping them
638 /// keeps the manifest in the low-kilobyte range regardless of document count.
639 ///
640 /// ## Hot / Cold merge
641 /// `secondary_indices` only accumulates writes since the last boot. The previous
642 /// checkpoint (in `mmap_index`) holds everything before that. Each merged bitmap
643 /// is `cold | hot` so no data is lost across checkpoints. Deleted IDs are masked
644 /// out using the `deleted_ids` bitmap before serializing.
645 ///
646 /// ## Format
647 /// aurora_indices.bin — concatenated native-serialized RoaringBitmaps
648 /// aurora_indices.json — { "col:field:value" -> [byte_offset, byte_len] }
649 fn save_index_checkpoint(&self) -> Result<()> {
650 use std::io::Write as IoWrite;
651
652 if self.secondary_indices.is_empty() {
653 return Ok(());
654 }
655
656 let path = &self.config.db_path;
657 let index_bin = path.join("aurora_indices.bin");
658 let index_json = path.join("aurora_indices.json");
659
660 // Snapshot deleted IDs once; used to mask ghost IDs from every bitmap
661 let deleted_snap = self
662 .deleted_ids
663 .read()
664 .map(|g| g.clone())
665 .unwrap_or_default();
666
667 let mut bin_data: Vec<u8> = Vec::new();
668 let mut manifest: HashMap<String, (usize, usize)> = HashMap::new();
669
670 // Hold the mmap read-lock for the whole loop so cold lookups are stable
671 let mmap_guard = self.mmap_index.read().ok();
672 let mmap_ref = mmap_guard.as_ref().and_then(|g| g.as_ref());
673
674 for col_entry in self.secondary_indices.iter() {
675 let index_key = col_entry.key(); // e.g. "users:status"
676
677 // Unique fields are always Single(u32) — they never accumulate
678 // enough docs per value to promote to Bitmap. We must checkpoint
679 // them explicitly or the uniqueness guard becomes blind after a
680 // restart (the hot index is empty and the check is silently skipped).
681 let is_unique_field = if let Some((coll, field)) = index_key.split_once(':') {
682 self.get_collection_definition(coll)
683 .ok()
684 .and_then(|def| def.fields.get(field).cloned())
685 .map(|f| f.unique)
686 .unwrap_or(false)
687 } else {
688 false
689 };
690
691 for val_entry in col_entry.value().iter() {
692 let Ok(storage) = val_entry.value().read() else {
693 continue;
694 };
695
696 // Dense-Bitmap-Only for non-unique fields: skip Single / List.
697 // Unique fields are always checkpointed regardless of storage type.
698 let hot_bitmap: RoaringBitmap = match &*storage {
699 IndexStorage::Bitmap(b) => b.clone(),
700 _ if is_unique_field => storage.to_bitmap(),
701 _ => continue,
702 };
703
704 let val_str = val_entry.key();
705 let full_key = format!("{}:{}", index_key, val_str);
706
707 // Merge with any previously checkpointed cold bitmap for this key
708 let mut merged = if let Some(loc) = self.index_manifest.get(&full_key) {
709 let (offset, len) = *loc.value();
710 mmap_ref
711 .and_then(|m| m.get(offset..offset + len))
712 .and_then(|bytes| RoaringBitmap::deserialize_from(bytes).ok())
713 .unwrap_or_default()
714 } else {
715 RoaringBitmap::new()
716 };
717
718 merged |= hot_bitmap;
719
720 // Strip IDs that have been deleted since the last checkpoint
721 if !deleted_snap.is_empty() {
722 merged -= &deleted_snap;
723 }
724
725 if merged.is_empty() {
726 continue;
727 }
728
729 let offset = bin_data.len();
730 merged.serialize_into(&mut bin_data).map_err(|e| {
731 AqlError::new(ErrorCode::IoError, format!("bitmap serialize: {}", e))
732 })?;
733 manifest.insert(full_key, (offset, bin_data.len() - offset));
734 }
735 }
736
737 if manifest.is_empty() {
738 return Ok(());
739 }
740
741 // Atomic write: temp → rename so a mid-write crash leaves the old checkpoint intact
742 let tmp_bin = path.join("aurora_indices.bin.tmp");
743 let tmp_json = path.join("aurora_indices.json.tmp");
744
745 {
746 let mut f = StdFile::create(&tmp_bin)?;
747 f.write_all(&bin_data)?;
748 f.flush()?;
749 }
750
751 let manifest_json = serde_json::to_string(&manifest)
752 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?;
753 std::fs::write(&tmp_json, &manifest_json)?;
754
755 std::fs::rename(&tmp_bin, &index_bin)?;
756 std::fs::rename(&tmp_json, &index_json)?;
757
758 // Drop mmap guard before acquiring the write lock below
759 drop(mmap_guard);
760
761 // Refresh live manifest so the current process reads from the new checkpoint
762 self.index_manifest.clear();
763 for (k, v) in manifest {
764 self.index_manifest.insert(k, v);
765 }
766
767 // Re-mmap the new binary so queries hit the updated cold index immediately
768 let file = StdFile::open(&index_bin)?;
769 let mmap = unsafe { memmap2::Mmap::map(&file)? };
770 if let Ok(mut guard) = self.mmap_index.write() {
771 *guard = Some(mmap);
772 }
773
774 // 4. Finalize ID recycling: move from pending (deleted_ids) to recyclable
775 // This ensures IDs are only reused AFTER they have been wiped from all cold bitmaps
776 if !deleted_snap.is_empty() {
777 if let Ok(mut pending) = self.deleted_ids.write() {
778 *pending -= &deleted_snap;
779 }
780 if let Ok(mut recyclable) = self.recyclable_ids.write() {
781 *recyclable |= &deleted_snap;
782 // Persist recyclable IDs so they survive restart
783 let _ = self.save_recyclable_ids(&recyclable);
784 }
785 }
786
787 Ok(())
788 }
789
790 fn save_recyclable_ids(&self, bitmap: &RoaringBitmap) -> Result<()> {
791 let mut buf = Vec::new();
792 bitmap
793 .serialize_into(&mut buf)
794 .map_err(|e| AqlError::new(ErrorCode::IoError, e.to_string()))?;
795 self.cold.set("_sys:recyclable_ids".to_string(), buf)?;
796 Ok(())
797 }
798
799 fn load_recyclable_ids(&self) -> Result<()> {
800 if let Some(data) = self.cold.get("_sys:recyclable_ids")? {
801 if let Ok(bitmap) = RoaringBitmap::deserialize_from(&data[..]) {
802 if let Ok(mut recyclable) = self.recyclable_ids.write() {
803 *recyclable = bitmap;
804 }
805 }
806 }
807 Ok(())
808 }
809
810 /// Internal serialization helper
811 pub fn deserialize_internal<T: serde::de::DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
812 if data.starts_with(b"AUR\x01") {
813 Ok(rmp_serde::from_slice(&data[4..])
814 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?)
815 } else {
816 Ok(serde_json::from_slice(data)
817 .map_err(|e| AqlError::new(ErrorCode::SerializationError, e.to_string()))?)
818 }
819 }
820
821 /// Optimized value indexing using adaptive storage (Ticket 3)
822 fn index_value(
823 &self,
824 collection: &str,
825 key: &str,
826 value: &[u8],
827 _tx_id: Option<u64>,
828 ) -> Result<()> {
829 // Update primary index with metadata only
830 let location = DiskLocation::new(value.len());
831 self.primary_indices
832 .entry(collection.to_string())
833 .or_default()
834 .insert(key.to_string(), location);
835
836 if let Ok(doc) = self.deserialize_internal::<Document>(value) {
837 let internal_id = self.get_or_create_internal_id(&doc._sid);
838 let collection_def = self.get_collection_definition(collection)?;
839
840 // PERFORMANCE: Automatically index 'id' if present in data
841 if let Some(id_val) = doc.data.get("id") {
842 let index_key = format!("{}:id", collection);
843 let val_str = match id_val {
844 Value::String(s) => s.clone(),
845 _ => id_val.to_string(),
846 };
847 let index = self.secondary_indices.entry(index_key).or_default();
848 index
849 .entry(val_str)
850 .and_modify(|existing| {
851 if let Ok(mut storage) = existing.write() {
852 storage.add(internal_id);
853 }
854 })
855 .or_insert_with(|| Arc::new(RwLock::new(IndexStorage::Single(internal_id))));
856 }
857
858 for (field_name, field_val) in doc.data {
859 if collection_def
860 .fields
861 .get(&field_name)
862 .map_or(false, |f| f.indexed)
863 {
864 let index_key = format!("{}:{}", collection, field_name);
865 let val_str = match &field_val {
866 Value::String(s) => s.clone(),
867 _ => field_val.to_string(),
868 };
869
870 let index = self.secondary_indices.entry(index_key).or_default();
871 // Use and_modify+or_insert_with so newly-created entries start as
872 // Single(id) without an extra add() call that would corrupt them
873 // into List([id, id]).
874 index
875 .entry(val_str)
876 .and_modify(|existing| {
877 if let Ok(mut storage) = existing.write() {
878 storage.add(internal_id);
879 }
880 })
881 .or_insert_with(|| {
882 Arc::new(RwLock::new(IndexStorage::Single(internal_id)))
883 });
884 }
885 }
886 }
887 Ok(())
888 }
889
890 /// Get an iterator over all documents in a collection
891 pub fn stream_collection(&self, collection: &str) -> Result<impl Iterator<Item = Document>> {
892 let mut docs = Vec::new();
893 if let Some(index) = self.primary_indices.get(collection) {
894 for entry in index.iter() {
895 let id = entry.key();
896 if let Some(doc) = self.get_document(collection, id)? {
897 docs.push(doc);
898 }
899 }
900 }
901 Ok(docs.into_iter())
902 }
903
904 /// Get first document matching filter
905 pub async fn first_one(
906 &self,
907 collection: &str,
908 filter_fn: impl Fn(&Document) -> bool,
909 ) -> Result<Option<Document>> {
910 let docs = self.scan_and_filter(collection, filter_fn, Some(1))?;
911 Ok(docs.into_iter().next())
912 }
913
914 /// Executes an AQL (Aurora Query Language) operation against the database.
915 ///
916 /// This is the primary entry point for all database operations including queries,
917 /// mutations, schema definitions, and migrations.
918 ///
919 /// # Arguments
920 /// * `input` - Anything that implements `ToExecParams`. This includes:
921 /// - A simple query string: `"query { users { id } }"`
922 /// - A query string with variables: `("query($id: String) { users(where: {id: {eq: $id}}) { name } }", json!({"id": "u1"}))`
923 /// - The `doc!` macro output: `doc!("query...", {"var": 123})`
924 ///
925 /// # Returns
926 /// An `ExecutionResult` which can be bound to Rust types using `.bind()` or `.bind_first()`.
927 ///
928 /// # Examples
929 ///
930 /// ```
931 /// # use aurora_db::{Aurora, doc, Value};
932 /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
933 /// // Simple query
934 /// let res = db.execute("query { users { name } }").await?;
935 ///
936 /// // Query with variables using doc! macro
937 /// let res = db.execute(doc!(
938 /// "query($minAge: Int) { users(where: { age: { gte: $minAge } }) { name } }",
939 /// { "minAge": 18 }
940 /// )).await?;
941 /// # Ok(())
942 /// # }
943 /// ```
944 pub async fn execute<I: ToExecParams>(&self, input: I) -> Result<ExecutionResult> {
945 let (aql, options) = input.into_params();
946 parser::executor::execute(self, &aql, options).await
947 }
948
949 /// Subscribes to real-time changes using AQL subscription syntax.
950 ///
951 /// This method returns a `ChangeListener` that yields events whenever documents
952 /// matching the subscription criteria are inserted, updated, or deleted.
953 ///
954 /// # Arguments
955 /// * `aql` - An AQL subscription string.
956 ///
957 /// # Returns
958 /// A `ChangeListener` (stream-like) for receiving real-time `ChangeEvent`s.
959 ///
960 /// # Example
961 ///
962 /// ```ignore
963 /// let mut listener = db.stream("subscription { users { id name } }").await?;
964 /// while let Ok(event) = listener.recv().await {
965 /// println!("Change detected: {:?}", event);
966 /// }
967 /// ```
968 pub async fn stream(&self, aql: &str) -> Result<crate::pubsub::ChangeListener> {
969 let result = self.execute(aql).await?;
970
971 match result {
972 ExecutionResult::Subscription(sub) => sub.stream.ok_or_else(|| {
973 crate::error::AqlError::new(
974 crate::error::ErrorCode::QueryError,
975 "Subscription did not return a stream".to_string(),
976 )
977 }),
978 _ => Err(crate::error::AqlError::new(
979 crate::error::ErrorCode::QueryError,
980 "Expected a subscription query, got a different operation type".to_string(),
981 )),
982 }
983 }
984
985 /// Explains the execution plan for an AQL query without executing it.
986 ///
987 /// This is useful for debugging performance issues and understanding how Aurora
988 /// will process a given query (e.g., whether it will use an index or a full scan).
989 ///
990 /// # Arguments
991 /// * `input` - The query to explain (accepts same formats as `execute`).
992 ///
993 /// # Returns
994 /// An `ExecutionPlan` containing the sequence of operations and estimated cost.
995 pub async fn explain<I: ToExecParams>(&self, input: I) -> Result<ExecutionPlan> {
996 let (aql, options) = input.into_params();
997
998 // Parse and analyze without executing
999 let doc = parser::parse_with_variables(&aql, &options.variables)?;
1000
1001 self.analyze_execution_plan(&doc).await
1002 }
1003
1004 /// Analyzes a parsed AQL document to produce an execution plan.
1005 ///
1006 /// Internal helper used by `explain`.
1007 pub async fn analyze_execution_plan(
1008 &self,
1009 doc: &crate::parser::ast::Document,
1010 ) -> Result<ExecutionPlan> {
1011 let mut operations = Vec::new();
1012 for op in &doc.operations {
1013 operations.push(format!("{:?}", op));
1014 }
1015
1016 Ok(ExecutionPlan {
1017 operations,
1018 estimated_cost: 1.0,
1019 })
1020 }
1021
1022 /// Removes stale lock files from a database directory.
1023 ///
1024 /// If Aurora crashes or is forcefully terminated, it may leave behind lock files
1025 /// that prevent the database from being reopened. This method safely removes
1026 /// those lock files.
1027 ///
1028 /// # Safety
1029 /// Only call this when you're certain no other Aurora instance is using the database.
1030 /// Removing lock files while another process is running could cause data corruption.
1031 pub fn remove_stale_lock<P: AsRef<Path>>(path: P) -> Result<bool> {
1032 let resolved_path = Self::resolve_path(path)?;
1033 crate::storage::cold::ColdStore::try_remove_stale_lock(resolved_path.to_str().unwrap())
1034 }
1035
1036 /// Opens or creates a database at the specified path.
1037 ///
1038 /// If the database doesn't exist, it will be created. If it does exist, it will
1039 /// be opened and any pending WAL (Write-Ahead Log) entries will be replayed
1040 /// to ensure consistency.
1041 ///
1042 /// # Arguments
1043 /// * `path` - The filesystem path to the database.
1044 ///
1045 /// # Examples
1046 ///
1047 /// ```no_run
1048 /// # use aurora_db::Aurora;
1049 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1050 /// let db = Aurora::open("my_database.db").await?;
1051 /// # Ok(())
1052 /// # }
1053 /// ```
1054 pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
1055 let config = AuroraConfig {
1056 db_path: Self::resolve_path(path)?,
1057 ..Default::default()
1058 };
1059 Self::with_config(config).await
1060 }
1061
1062 /// Helper method to resolve database path
1063 fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
1064 let path = path.as_ref();
1065
1066 // If it's an absolute path, use it directly
1067 if path.is_absolute() {
1068 return Ok(path.to_path_buf());
1069 }
1070
1071 // Otherwise, resolve relative to current directory
1072 match std::env::current_dir() {
1073 Ok(current_dir) => Ok(current_dir.join(path)),
1074 Err(e) => Err(AqlError::new(
1075 ErrorCode::IoError,
1076 format!("Failed to resolve current directory: {}", e),
1077 )),
1078 }
1079 }
1080
1081 /// Opens a database with a custom configuration.
1082 ///
1083 /// This allows fine-tuning performance parameters like cache sizes,
1084 /// durability modes, and worker settings.
1085 ///
1086 /// # Arguments
1087 /// * `config` - The `AuroraConfig` to use.
1088 ///
1089 /// # Examples
1090 ///
1091 /// ```no_run
1092 /// # use aurora_db::{Aurora, AuroraConfig};
1093 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1094 /// let config = AuroraConfig {
1095 /// db_path: "custom.db".into(),
1096 /// hot_cache_size_mb: 1024,
1097 /// enable_wal: true,
1098 /// ..Default::default()
1099 /// };
1100 /// let db = Aurora::with_config(config).await?;
1101 /// # Ok(())
1102 /// # }
1103 /// ```
1104 pub async fn with_config(config: AuroraConfig) -> Result<Self> {
1105 let path = Self::resolve_path(&config.db_path)?;
1106
1107 if config.create_dirs
1108 && let Some(parent) = path.parent()
1109 && !parent.exists()
1110 {
1111 std::fs::create_dir_all(parent)?;
1112 }
1113
1114 let cold = Arc::new(ColdStore::with_config(
1115 path.to_str().unwrap(),
1116 config.cold_cache_capacity_mb,
1117 config.cold_flush_interval_ms,
1118 config.cold_mode.clone(),
1119 )?);
1120
1121 let hot = HotStore::with_config_and_eviction(
1122 config.hot_cache_size_mb,
1123 config.hot_cache_cleanup_interval_secs,
1124 config.eviction_policy,
1125 );
1126
1127 let write_buffer = if config.enable_write_buffering {
1128 Some(Arc::new(WriteBuffer::new(
1129 Arc::clone(&cold),
1130 config.write_buffer_size,
1131 config.write_buffer_flush_interval_ms,
1132 )))
1133 } else {
1134 None
1135 };
1136
1137 let auto_compact = config.auto_compact;
1138 let enable_wal = config.enable_wal;
1139 let pubsub = crate::pubsub::PubSubSystem::new(10000);
1140
1141 // Initialize WAL
1142 let (wal, wal_entries) = if enable_wal {
1143 let wal_path = path.to_str().unwrap();
1144 match WriteAheadLog::new(wal_path) {
1145 Ok(mut wal_log) => {
1146 let entries = wal_log.recover().unwrap_or_else(|_| Vec::new());
1147 (Some(Arc::new(RwLock::new(wal_log))), entries)
1148 }
1149 Err(_) => (None, Vec::new()),
1150 }
1151 } else {
1152 (None, Vec::new())
1153 };
1154
1155 // --- FIX: Initialize Background WAL Writer ---
1156 // Only enable WAL writer if WAL was successfully initialized
1157 let wal_disabled_flag = Arc::new(AtomicBool::new(false));
1158 let wal_writer = if enable_wal && wal.is_some() {
1159 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<WalOperation>();
1160 let wal_clone = wal.clone();
1161 let wal_disabled_task = Arc::clone(&wal_disabled_flag);
1162
1163 // Spawn a single dedicated thread/task for writing WAL sequentially
1164 tokio::spawn(async move {
1165 if let Some(wal) = wal_clone {
1166 while let Some(op) = rx.recv().await {
1167 // If WAL has been disabled due to a prior error, drain messages silently.
1168 if wal_disabled_task.load(Ordering::Relaxed) {
1169 continue;
1170 }
1171
1172 let wal = wal.clone();
1173 // Move blocking I/O to a blocking thread
1174 let result = tokio::task::spawn_blocking(
1175 move || -> std::result::Result<(), String> {
1176 match wal.write() {
1177 Ok(mut guard) => {
1178 let wal_result = match op {
1179 WalOperation::Put { key, value } => guard.append(
1180 Operation::Put,
1181 &key,
1182 Some(value.as_ref()),
1183 ),
1184 WalOperation::Delete { key } => {
1185 guard.append(Operation::Delete, &key, None)
1186 }
1187 };
1188 wal_result.map_err(|e| format!("WAL append failed: {}", e))
1189 }
1190 Err(e) => {
1191 // Lock is poisoned — degrade gracefully, do not kill the process.
1192 Err(format!("WAL lock poisoned: {}", e))
1193 }
1194 }
1195 },
1196 )
1197 .await;
1198
1199 match result {
1200 Err(e) if e.is_cancelled() => break,
1201 Err(e) => {
1202 eprintln!(
1203 "WAL task panicked: {}. Durability disabled for this session.",
1204 e
1205 );
1206 wal_disabled_task.store(true, Ordering::SeqCst);
1207 // Keep draining so callers never see a closed-channel error.
1208 }
1209 Ok(Err(msg)) => {
1210 eprintln!(
1211 "CRITICAL WAL write error: {}. Durability disabled for this session.",
1212 msg
1213 );
1214 wal_disabled_task.store(true, Ordering::SeqCst);
1215 }
1216 Ok(Ok(())) => {}
1217 }
1218 }
1219 }
1220 });
1221 Some(tx)
1222 } else {
1223 None
1224 };
1225
1226 let checkpoint_shutdown = if wal.is_some() {
1227 let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1228 let cold_clone = Arc::clone(&cold);
1229 let wal_clone = wal.clone();
1230 let checkpoint_interval = config.checkpoint_interval_ms;
1231
1232 tokio::spawn(async move {
1233 let mut interval =
1234 tokio::time::interval(Duration::from_millis(checkpoint_interval));
1235 loop {
1236 tokio::select! {
1237 _ = interval.tick() => {
1238 if let Err(e) = cold_clone.flush() {
1239 eprintln!("Background checkpoint flush error: {}", e);
1240 }
1241 if let Some(ref wal) = wal_clone
1242 && let Ok(mut wal_guard) = wal.write() {
1243 let _ = wal_guard.truncate();
1244 }
1245 }
1246 _ = shutdown_rx.recv() => {
1247 let _ = cold_clone.flush();
1248 if let Some(ref wal) = wal_clone
1249 && let Ok(mut wal_guard) = wal.write() {
1250 let _ = wal_guard.truncate();
1251 }
1252 break;
1253 }
1254 }
1255 }
1256 });
1257
1258 Some(shutdown_tx)
1259 } else {
1260 None
1261 };
1262
1263 let compaction_shutdown = if auto_compact {
1264 let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1265 let cold_clone = Arc::clone(&cold);
1266 let compact_interval = config.compact_interval_mins;
1267
1268 tokio::spawn(async move {
1269 let mut interval =
1270 tokio::time::interval(Duration::from_secs(compact_interval * 60));
1271 loop {
1272 tokio::select! {
1273 _ = interval.tick() => {
1274 if let Err(e) = cold_clone.compact() {
1275 eprintln!("Background compaction error: {}", e);
1276 }
1277 }
1278 _ = shutdown_rx.recv() => {
1279 let _ = cold_clone.compact();
1280 break;
1281 }
1282 }
1283 }
1284 });
1285
1286 Some(shutdown_tx)
1287 } else {
1288 None
1289 };
1290
1291 let workers = if config.workers_enabled {
1292 let workers_path = path.join("workers.db");
1293 let worker_config = crate::workers::WorkerConfig {
1294 storage_path: workers_path.to_string_lossy().into_owned(),
1295 concurrency: config.worker_threads,
1296 ..Default::default()
1297 };
1298 crate::workers::WorkerSystem::new(worker_config)
1299 .map(Arc::new)
1300 .ok()
1301 } else {
1302 None
1303 };
1304
1305 let sys_id_mapping = cold.open_tree("_sys_id_mapping")?;
1306
1307 let db = Self {
1308 hot,
1309 cold,
1310 primary_indices: Arc::new(DashMap::new()),
1311 secondary_indices: Arc::new(DashMap::new()),
1312 sys_id_mapping,
1313 mmap_index: Arc::new(RwLock::new(None)),
1314 index_manifest: Arc::new(DashMap::new()),
1315 _sid_dictionary: Arc::new(crossbeam_skiplist::SkipMap::new()),
1316 reverse_sid_dictionary: Arc::new(RwLock::new(Vec::new())),
1317 deleted_ids: Arc::new(RwLock::new(RoaringBitmap::new())),
1318 recyclable_ids: Arc::new(RwLock::new(RoaringBitmap::new())),
1319 next_internal_id: Arc::new(AtomicU32::new(0)),
1320 indices_initialized: Arc::new(OnceCell::new()),
1321 transaction_manager: crate::transaction::TransactionManager::new(),
1322 indices: Arc::new(DashMap::new()),
1323 schema_cache: Arc::new(DashMap::new()),
1324 config,
1325 write_buffer,
1326 pubsub,
1327 wal,
1328 checkpoint_shutdown,
1329 compaction_shutdown,
1330 wal_shutdown: None,
1331 workers,
1332 wal_writer,
1333 computed: Arc::new(RwLock::new(crate::computed::ComputedFields::new())),
1334 ast_cache: Arc::new(MokaCache::new(1000)),
1335 plan_cache: Arc::new(MokaCache::new(1000)),
1336 building_indices: Arc::new(DashMap::new()),
1337 wal_disabled: wal_disabled_flag,
1338 };
1339
1340 // 0. Load persisted ID mappings from Sled so that internal IDs are stable
1341 db.load_id_mapping_from_sled()?;
1342
1343 // 0.1 Load recyclable IDs
1344 db.load_recyclable_ids()?;
1345
1346 // 1. Load index checkpoint immediately from disk
1347 db.load_index_checkpoint()?;
1348
1349 // 2. Replay WAL entries on top of the checkpoint state
1350 if !wal_entries.is_empty() {
1351 db.replay_wal(wal_entries).await?;
1352 }
1353
1354 // 3. Mark indices as initialized so lazy calls don't re-run load_index_checkpoint
1355 let _ = db.indices_initialized.set(());
1356
1357 // 4. Trigger background rebuilds for indices missing from checkpoint
1358 // (Done after WAL replay so Sled is up to date)
1359 db.init_schema_indices().await;
1360
1361 // 5. Always rebuild primary index from cold storage after startup.
1362 // WAL replay only covers entries written since the last flush; documents
1363 // that were already checkpointed to cold storage (WAL truncated) must be
1364 // reloaded here so queries see all persisted data.
1365 db.rebuild_primary_index_from_cold()?;
1366
1367 Ok(db)
1368 }
1369 // Fast key-value operations with index support
1370 /// Get a value by key (low-level key-value access)
1371 ///
1372 /// This is the low-level method. For document access, use `get_document()` instead.
1373 /// Checks hot cache first, then falls back to cold storage for maximum performance.
1374 ///
1375 /// # Performance
1376 /// - Hot cache hit: ~1M reads/sec (instant)
1377 /// - Cold storage: ~500K reads/sec (disk I/O)
1378 /// - Cache hit rate: typically 95%+ at scale
1379 ///
1380 /// # Examples
1381 ///
1382 /// ```
1383 /// // Low-level key-value access
1384 /// let data = db.get("users:12345")?;
1385 /// if let Some(bytes) = data {
1386 /// let doc: Document = serde_json::from_slice(&bytes)?;
1387 /// println!("Found: {:?}", doc);
1388 /// }
1389 ///
1390 /// // Better: use get_document() for documents
1391 /// let user = db.get_document("users", "12345")?;
1392 /// ```ignore
1393 pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
1394 // If inside a transaction, check the buffer first.
1395 // Provides read-your-own-writes within the transaction while keeping
1396 // changes invisible to all other readers until commit.
1397 if let Ok(tx_id) = crate::transaction::ACTIVE_TRANSACTION_ID.try_with(|id| *id) {
1398 if let Some(buffer) = self.transaction_manager.active_transactions.get(&tx_id) {
1399 if buffer.deletes.contains_key(key) {
1400 return Ok(None);
1401 }
1402 if let Some(value) = buffer.read(key) {
1403 return Ok(Some(value));
1404 }
1405 // Not in buffer → fall through to storage (read-committed for
1406 // values not yet touched by this transaction)
1407 }
1408 }
1409
1410 // Check hot cache first
1411 if let Some(value) = self.hot.get(key) {
1412 // Check if this is a blob reference (pointer to cold storage)
1413 if value.starts_with(b"BLOBREF:") {
1414 // It's a blob ref - fetch actual data from cold storage
1415 return self.cold.get(key);
1416 }
1417 return Ok(Some(value));
1418 }
1419
1420 // Fetch from cold storage
1421 let value = self.cold.get(key)?;
1422
1423 if let Some(v) = &value {
1424 if self.should_cache_key(key) {
1425 self.hot
1426 .set(Arc::new(key.to_string()), Arc::new(v.clone()), None);
1427 }
1428 }
1429
1430 Ok(value)
1431 }
1432
1433 /// Get value with zero-copy Arc reference (10-100x faster than get!)
1434 /// Only checks hot cache - returns None if not cached
1435 pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
1436 self.hot.get_ref(key)
1437 }
1438
1439 /// Helper to decide if a key should be cached.
1440 fn should_cache_key(&self, key: &str) -> bool {
1441 if let Some(collection_name) = key.split(':').next() {
1442 if !collection_name.starts_with('_') {
1443 if let Ok(collection_def) = self.get_collection_definition(collection_name) {
1444 if collection_def
1445 .fields
1446 .values()
1447 .any(|def| def.field_type == FieldType::Any)
1448 {
1449 return false; // Don't cache if collection has Any field
1450 }
1451 }
1452 }
1453 }
1454 true // Cache by default
1455 }
1456
1457 /// Retrieves metrics about hot cache performance.
1458 ///
1459 /// # Example
1460 /// ```rust
1461 /// # use aurora_db::Aurora;
1462 /// # async fn example(db: Aurora) {
1463 /// let stats = db.get_cache_stats();
1464 /// println!("Hit ratio: {:.2}%", stats.hit_rate * 100.0);
1465 /// # }
1466 /// ```
1467 pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
1468 self.hot.get_stats()
1469 }
1470
1471 pub fn has_index(&self, collection: &str, field: &str) -> bool {
1472 // _sid is the primary key — always an O(1) lookup, no secondary index needed.
1473 if field == "_sid" {
1474 return true;
1475 }
1476 // For all other fields (including auto-indexed "id"), only claim the index
1477 // exists if there is actual data in the hot map or cold manifest.
1478 // This prevents the indexed fast-path from firing on a post-crash empty index
1479 // and returning zero results instead of falling back to a full scan.
1480 let index_key = format!("{}:{}", collection, field);
1481 if self.secondary_indices.contains_key(&index_key) {
1482 return true;
1483 }
1484 let cold_prefix = format!("{}:", index_key);
1485 if self
1486 .index_manifest
1487 .iter()
1488 .any(|e| e.key().starts_with(&cold_prefix))
1489 {
1490 return true;
1491 }
1492 false
1493 }
1494
1495 pub fn get_ids_from_index(&self, collection: &str, field: &str, value: &Value) -> Vec<String> {
1496 let mut bitmap = RoaringBitmap::new();
1497 let ik = format!("{}:{}", collection, field);
1498 let vstr = match value {
1499 Value::String(s) => s.clone(),
1500 _ => value.to_string(),
1501 };
1502
1503 // 1. Check Cold Index (mmap)
1504 if let Some(loc) = self.index_manifest.get(&format!("{}:{}", ik, vstr)) {
1505 if let Ok(g) = self.mmap_index.read()
1506 && let Some(m) = g.as_ref()
1507 {
1508 if let Ok(cb) = RoaringBitmap::deserialize_from(&m[loc.0..(loc.0 + loc.1)]) {
1509 bitmap |= cb;
1510 }
1511 }
1512 }
1513
1514 // 2. Check Hot Index (RAM)
1515 if let Some(index_map) = self.secondary_indices.get(&ik) {
1516 if let Some(storage_arc) = index_map.get(&vstr) {
1517 if let Ok(storage) = storage_arc.value().read() {
1518 bitmap |= storage.to_bitmap();
1519 }
1520 }
1521 }
1522
1523 bitmap
1524 .iter()
1525 .filter_map(|id| self.get_external_id(id))
1526 .collect()
1527 }
1528
1529 /// Register a computed field definition
1530 pub async fn register_computed_field(
1531 &self,
1532 collection: &str,
1533 field: &str,
1534 expression: crate::computed::ComputedExpression,
1535 ) -> Result<()> {
1536 let mut computed = self.computed.write().unwrap();
1537 computed.register(collection, field, expression);
1538 Ok(())
1539 }
1540
1541 // ============================================
1542 // PubSub API - Real-time Change Notifications
1543 // ============================================
1544
1545 /// Listen for real-time changes in a collection
1546 ///
1547 /// Returns a stream of change events (inserts, updates, deletes) that you can subscribe to.
1548 /// Perfect for building reactive UIs, cache invalidation, audit logging, webhooks, and
1549 /// data synchronization systems.
1550 ///
1551 /// # Performance
1552 /// - Zero overhead when no listeners are active
1553 /// - Events are broadcast to all listeners asynchronously
1554 /// - Non-blocking - doesn't slow down write operations
1555 /// - Multiple listeners can watch the same collection
1556 ///
1557 /// # Examples
1558 ///
1559
1560 /// use aurora_db::{Aurora, types::Value};
1561 ///
1562 /// let db = Aurora::open("mydb.db")?;
1563 ///
1564 /// // Basic listener
1565 /// let mut listener = db.listen("users");
1566 ///
1567 /// tokio::spawn(async move {
1568 /// while let Ok(event) = listener.recv().await {
1569 /// match event.change_type {
1570 /// ChangeType::Insert => println!("New user: {:?}", event.document),
1571 /// ChangeType::Update => println!("Updated user: {:?}", event.document),
1572 /// ChangeType::Delete => println!("Deleted user ID: {}", event._sid),
1573 /// }
1574 /// }
1575 /// });
1576 ///
1577 /// // Now any insert/update/delete will trigger the listener
1578 /// db.insert_into("users", vec![("name", Value::String("Alice".into()))]).await?;
1579 /// ```ignore
1580 ///
1581 /// # Real-World Use Cases
1582 ///
1583 /// **Cache Invalidation:**
1584
1585 /// use std::sync::Arc;
1586 /// use tokio::sync::RwLock;
1587 /// use std::collections::HashMap;
1588 ///
1589 /// let cache = Arc::new(RwLock::new(HashMap::new()));
1590 /// let cache_clone = Arc::clone(&cache);
1591 ///
1592 /// let mut listener = db.listen("products");
1593 ///
1594 /// tokio::spawn(async move {
1595 /// while let Ok(event) = listener.recv().await {
1596 /// // Invalidate cache entry when product changes
1597 /// cache_clone.write().await.remove(&event._sid);
1598 /// println!("Cache invalidated for product: {}", event._sid);
1599 /// }
1600 /// });
1601 /// ```
1602 ///
1603 /// **Webhook Notifications:**
1604 /// ```ignore
1605 /// let mut listener = db.listen("orders");
1606 ///
1607 /// tokio::spawn(async move {
1608 /// while let Ok(event) = listener.recv().await {
1609 /// if event.change_type == ChangeType::Insert {
1610 /// // Send webhook for new orders
1611 /// send_webhook("https://api.example.com/webhooks/order", &event).await;
1612 /// }
1613 /// }
1614 /// });
1615 /// ```
1616 ///
1617 /// **Audit Logging:**
1618 /// ```ignore
1619 /// let mut listener = db.listen("sensitive_data");
1620 ///
1621 /// tokio::spawn(async move {
1622 /// while let Ok(event) = listener.recv().await {
1623 /// // Log all changes to audit trail
1624 /// db.insert_into("audit_log", vec![
1625 /// ("collection", Value::String("sensitive_data".into())),
1626 /// ("action", Value::String(format!("{:?}", event.change_type))),
1627 /// ("document_id", Value::String(event._sid.clone())),
1628 /// ("timestamp", Value::String(chrono::Utc::now().to_rfc3339())),
1629 /// ]).await?;
1630 /// }
1631 /// });
1632 /// ```
1633 ///
1634 /// **Data Synchronization:**
1635 /// ```ignore
1636 /// let mut listener = db.listen("users");
1637 ///
1638 /// tokio::spawn(async move {
1639 /// while let Ok(event) = listener.recv().await {
1640 /// // Sync changes to external system
1641 /// match event.change_type {
1642 /// ChangeType::Insert | ChangeType::Update => {
1643 /// if let Some(doc) = event.document {
1644 /// external_api.upsert_user(&doc).await?;
1645 /// }
1646 /// },
1647 /// ChangeType::Delete => {
1648 /// external_api.delete_user(&event._sid).await?;
1649 /// },
1650 /// }
1651 /// }
1652 /// });
1653 /// ```
1654 ///
1655 /// **Real-Time Notifications:**
1656 /// ```ignore
1657 /// let mut listener = db.listen("messages");
1658 ///
1659 /// tokio::spawn(async move {
1660 /// while let Ok(event) = listener.recv().await {
1661 /// if event.change_type == ChangeType::Insert {
1662 /// if let Some(msg) = event.document {
1663 /// // Push notification to connected websockets
1664 /// if let Some(recipient) = msg.data.get("recipient_id") {
1665 /// websocket_manager.send_to_user(recipient, &msg).await;
1666 /// }
1667 /// }
1668 /// }
1669 /// }
1670 /// });
1671 /// ```
1672 ///
1673 /// **Filtered Listener:**
1674 /// ```ignore
1675 /// use aurora_db::pubsub::EventFilter;
1676 ///
1677 /// // Only listen for inserts
1678 /// let mut listener = db.listen("users")
1679 /// .filter(EventFilter::ChangeType(ChangeType::Insert));
1680 ///
1681 /// // Only listen for documents with specific field value
1682 /// let mut listener = db.listen("users")
1683 /// .filter(EventFilter::FieldEquals("role".to_string(), Value::String("admin".into())));
1684 /// ```
1685 ///
1686 /// Listens for real-time changes in a specific collection.
1687 ///
1688 /// Returns a `ChangeListener` stream that yields events for inserts, updates, and deletes.
1689 ///
1690 /// # Example
1691 /// ```rust
1692 /// # use aurora_db::{Aurora, Value};
1693 /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
1694 /// let mut listener = db.listen("users");
1695 ///
1696 /// tokio::spawn(async move {
1697 /// while let Ok(event) = listener.recv().await {
1698 /// println!("Change detected: {:?}", event);
1699 /// }
1700 /// });
1701 /// # Ok(())
1702 /// # }
1703 /// ```
1704 pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
1705 self.pubsub.listen(collection)
1706 }
1707
1708 /// Listens for all changes across all collections in the database.
1709 pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
1710 self.pubsub.listen_all()
1711 }
1712
1713 /// Get the number of active listeners for a collection
1714 pub fn listener_count(&self, collection: &str) -> usize {
1715 self.pubsub.listener_count(collection)
1716 }
1717
1718 /// Get total number of active listeners
1719 pub fn total_listeners(&self) -> usize {
1720 self.pubsub.total_listeners()
1721 }
1722
1723 /// Flushes all buffered writes to disk to ensure durability.
1724 ///
1725 /// This method forces all pending writes from:
1726 /// - Write buffer (if enabled)
1727 /// - Cold storage internal buffers
1728 /// - Write-ahead log (if enabled)
1729 ///
1730 /// Call this when you need to ensure data persistence before
1731 /// a critical operation or shutdown. After flush() completes,
1732 /// all data is guaranteed to be on disk even if power fails.
1733 ///
1734 /// # Performance
1735 /// - Flush time: ~10-50ms depending on buffered data
1736 /// - Triggers OS-level fsync() for durability guarantee
1737 /// - Truncates WAL after successful flush
1738 /// - Not needed for every write (WAL provides durability)
1739 ///
1740 /// # Examples
1741 ///
1742 /// ```ignore
1743 /// use aurora_db::Aurora;
1744 ///
1745 /// let db = Aurora::open("mydb.db")?;
1746 ///
1747 /// // Basic flush after critical write
1748 /// db.insert_into("users", data).await?;
1749 /// db.flush()?; // Ensure data is persisted to disk
1750 ///
1751 /// // Graceful shutdown pattern
1752 /// fn shutdown(db: &Aurora) -> Result<()> {
1753 /// println!("Flushing pending writes...");
1754 /// db.flush()?;
1755 /// println!("Shutdown complete - all data persisted");
1756 /// Ok(())
1757 /// }
1758 ///
1759 /// // Periodic checkpoint pattern
1760 /// use std::time::Duration;
1761 /// use std::thread;
1762 ///
1763 /// let db = db.clone();
1764 /// thread::spawn(move || {
1765 /// loop {
1766 /// thread::sleep(Duration::from_secs(60));
1767 /// if let Err(e) = db.flush() {
1768 /// eprintln!("Flush error: {}", e);
1769 /// } else {
1770 /// println!("Checkpoint: data flushed to disk");
1771 /// }
1772 /// }
1773 /// });
1774 ///
1775 /// // Critical transaction pattern
1776 /// let tx_id = db.begin_transaction();
1777 ///
1778 /// // Multiple operations
1779 /// db.insert_into("orders", order_data).await?;
1780 /// db.update_document("inventory", product_id, updates).await?;
1781 /// db.insert_into("audit_log", audit_data).await?;
1782 ///
1783 /// // Commit and flush immediately
1784 /// db.commit_transaction(tx_id)?;
1785 /// db.flush()?; // Critical: ensure transaction is on disk
1786 ///
1787 /// // Backup preparation
1788 /// println!("Preparing backup...");
1789 /// db.flush()?; // Ensure all data is written
1790 /// std::fs::copy("mydb.db", "backup.db")?;
1791 /// println!("Backup complete");
1792 /// ```
1793 ///
1794 /// # When to Use
1795 /// - Before graceful shutdown
1796 /// - After critical transactions
1797 /// - Before creating backups
1798 /// - Periodic checkpoints (every 30-60 seconds)
1799 /// - Before risky operations
1800 ///
1801 /// # When NOT to Use
1802 /// - After every single write (too slow, WAL provides durability)
1803 /// - In high-throughput loops (batch instead)
1804 /// - When durability mode is already Immediate
1805 ///
1806 /// # Important Notes
1807 /// - WAL provides durability even without explicit flush()
1808 /// - flush() adds latency (~10-50ms) so use strategically
1809 /// - Automatic flush happens during graceful shutdown
1810 /// - After flush(), WAL is truncated (data is in main storage)
1811 ///
1812 /// # See Also
1813 /// Flushes all buffered writes to disk to ensure durability.
1814 ///
1815 /// This method forces all pending writes from the write buffer and cold storage
1816 /// internal buffers to the physical disk.
1817 ///
1818 /// # Performance
1819 /// Adding explicit flushes can significantly slow down write performance.
1820 /// Aurora typically manages flushes automatically via the WAL.
1821 pub fn flush(&self) -> Result<()> {
1822 // Flush write buffer if present
1823 if let Some(ref write_buffer) = self.write_buffer {
1824 write_buffer.flush()?;
1825 }
1826
1827 // Flush cold storage
1828 self.cold.flush()?;
1829
1830 // Truncate WAL after successful flush (data is now in cold storage)
1831 if let Some(ref wal) = self.wal
1832 && let Ok(mut wal_lock) = wal.write()
1833 {
1834 wal_lock.truncate()?;
1835 }
1836
1837 // Persist secondary indices so next boot loads from mmap instead of scanning
1838 self.save_index_checkpoint()?;
1839
1840 Ok(())
1841 }
1842
1843 /// Asynchronously flushes all buffered writes to disk.
1844 ///
1845 /// This is the async-friendly version of `flush()`.
1846 pub async fn sync(&self) -> Result<()> {
1847 self.flush()
1848 }
1849
1850 /// Store a key-value pair (low-level storage)
1851 ///
1852 /// This is the low-level method. For documents, use `insert_into()` instead.
1853 /// Writes are buffered and batched for performance.
1854 ///
1855 /// # Arguments
1856 /// * `key` - Unique key (format: "collection:id" for documents)
1857 /// * `value` - Raw bytes to store
1858 /// * `ttl` - Optional time-to-live (None = permanent)
1859 ///
1860 /// # Performance
1861 /// - Buffered writes: ~15-30K docs/sec
1862 /// - Batching improves throughput significantly
1863 /// - Call `flush()` to ensure data is persisted
1864 ///
1865 /// # Examples
1866 ///
1867 /// ```ignore
1868 /// use std::time::Duration;
1869 ///
1870 /// // Permanent storage
1871 /// let data = serde_json::to_vec(&my_struct)?;
1872 /// db.put("mykey".to_string(), data, None)?;
1873 ///
1874 /// // With TTL (expires after 1 hour)
1875 /// db.put("session:abc".to_string(), session_data, Some(Duration::from_secs(3600)))?;
1876 ///
1877 /// // Better: use insert_into() for documents
1878 /// db.insert_into("users", vec![("name", Value::String("Alice".into()))])?;
1879 /// ```
1880 pub async fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
1881 const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;
1882
1883 if value.len() > MAX_BLOB_SIZE {
1884 return Err(AqlError::invalid_operation(format!(
1885 "Blob size {} exceeds maximum allowed size of {}MB",
1886 value.len() / (1024 * 1024),
1887 MAX_BLOB_SIZE / (1024 * 1024)
1888 )));
1889 }
1890
1891 // If inside a transaction scope, buffer the write instead of touching
1892 // storage. The buffer is flushed atomically on commit or discarded on
1893 // rollback — nothing lands in WAL/cold/hot until then.
1894 if let Ok(tx_id) = crate::transaction::ACTIVE_TRANSACTION_ID.try_with(|id| *id) {
1895 if let Some(buffer) = self.transaction_manager.active_transactions.get(&tx_id) {
1896 buffer.write(key, value);
1897 return Ok(());
1898 }
1899 }
1900
1901 // --- OPTIMIZATION: Wrap in Arc ONCE ---
1902 let key_arc = Arc::new(key);
1903 let value_arc = Arc::new(value);
1904
1905 // Check if this is a blob (blobs bypass write buffer and hot cache)
1906 let is_blob = value_arc.starts_with(b"BLOB:");
1907
1908 // --- 1. WAL Write (Non-blocking send of Arcs) ---
1909 if let Some(ref sender) = self.wal_writer
1910 && self.config.durability_mode != DurabilityMode::None
1911 && !self.wal_disabled.load(Ordering::Relaxed)
1912 {
1913 sender
1914 .send(WalOperation::Put {
1915 key: Arc::clone(&key_arc),
1916 value: Arc::clone(&value_arc),
1917 })
1918 .map_err(|_| {
1919 AqlError::new(
1920 ErrorCode::InternalError,
1921 "WAL writer channel closed".to_string(),
1922 )
1923 })?;
1924 }
1925
1926 // Check if this key should be cached (false for Any-field collections)
1927 let should_cache = self.should_cache_key(&key_arc);
1928
1929 // --- 2. Cold Store Write ---
1930 if is_blob || !should_cache {
1931 // Blobs and Any-field docs write directly to cold storage
1932 // (blobs: avoid memory pressure, Any-fields: ensure immediate queryability)
1933 self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
1934 } else if let Some(ref write_buffer) = self.write_buffer {
1935 write_buffer.write(Arc::clone(&key_arc), Arc::clone(&value_arc))?;
1936 } else {
1937 self.cold.set(key_arc.to_string(), value_arc.to_vec())?;
1938 }
1939
1940 // --- 3. Hot Cache Write ---
1941 if should_cache {
1942 if is_blob {
1943 // For blobs: cache only a lightweight reference (not the actual data)
1944 // Format: "BLOBREF:<size>" - just 16-24 bytes instead of potentially MB
1945 let blob_ref = format!("BLOBREF:{}", value_arc.len());
1946 self.hot
1947 .set(Arc::clone(&key_arc), Arc::new(blob_ref.into_bytes()), ttl);
1948 } else {
1949 self.hot
1950 .set(Arc::clone(&key_arc), Arc::clone(&value_arc), ttl);
1951 }
1952 }
1953
1954 // --- 4. Indexing (skip for blobs and system keys) ---
1955 if !is_blob {
1956 if let Some(collection_name) = key_arc.split(':').next()
1957 && !collection_name.starts_with('_')
1958 {
1959 self.index_value(collection_name, &key_arc, &value_arc, None)?;
1960 }
1961 }
1962
1963 Ok(())
1964 }
1965 /// Rebuild primary index from cold storage.
1966 ///
1967 /// Scans all `_collection:<name>` keys to discover collections, then for
1968 /// each collection scans `<name>:<id>` keys and inserts them into the
1969 /// in-memory primary index. This is idempotent — entries already present
1970 /// (e.g. from WAL replay) are left unchanged.
1971 fn rebuild_primary_index_from_cold(&self) -> Result<()> {
1972 let collection_prefix = "_collection:";
1973 let collection_names: Vec<String> = self
1974 .cold
1975 .scan_prefix(collection_prefix)
1976 .filter_map(|r| r.ok())
1977 .map(|(key, _)| key.trim_start_matches(collection_prefix).to_string())
1978 .collect();
1979
1980 for collection_name in collection_names {
1981 let doc_prefix = format!("{}:", collection_name);
1982 for result in self.cold.scan_prefix(&doc_prefix) {
1983 if let Ok((key, value)) = result {
1984 let primary_index = self
1985 .primary_indices
1986 .entry(collection_name.clone())
1987 .or_default();
1988 primary_index
1989 .entry(key)
1990 .or_insert_with(|| DiskLocation::new(value.len()));
1991
1992 // Rebuild the _sid ↔ internal-u32 mapping so that bitmaps
1993 // loaded from the checkpoint can be translated back to real
1994 // external IDs via get_external_id(). Without this, every
1995 // u32 in a persisted bitmap maps to None after restart.
1996 if let Ok(doc) = self.deserialize_internal::<Document>(&value) {
1997 let _ = self.get_or_create_internal_id(&doc._sid);
1998 }
1999 }
2000 }
2001 }
2002
2003 Ok(())
2004 }
2005
2006 /// Replay WAL entries to recover from crash
2007 ///
2008 /// Handles transaction boundaries:
2009 /// - Operations within a committed transaction are applied
2010 /// - Operations within a rolled-back transaction are discarded
2011 /// - Operations within an uncommitted transaction (crash during tx) are discarded
2012 async fn replay_wal(&self, entries: Vec<crate::wal::LogEntry>) -> Result<()> {
2013 // Buffer for operations within a transaction
2014 let mut tx_buffer: Vec<crate::wal::LogEntry> = Vec::new();
2015 let mut in_transaction = false;
2016
2017 for entry in entries {
2018 match entry.operation {
2019 Operation::BeginTx => {
2020 // Start buffering operations
2021 in_transaction = true;
2022 tx_buffer.clear();
2023 }
2024 Operation::CommitTx => {
2025 // Apply all buffered operations
2026 for buffered_entry in tx_buffer.drain(..) {
2027 self.apply_wal_entry(buffered_entry).await?;
2028 }
2029 in_transaction = false;
2030 }
2031 Operation::RollbackTx => {
2032 // Discard buffered operations
2033 tx_buffer.clear();
2034 in_transaction = false;
2035 }
2036 Operation::Put | Operation::Delete => {
2037 if in_transaction {
2038 // Buffer the operation for later
2039 tx_buffer.push(entry);
2040 } else {
2041 // Apply immediately (not in transaction)
2042 self.apply_wal_entry(entry).await?;
2043 }
2044 }
2045 }
2046 }
2047
2048 // If we end with in_transaction = true, it means we crashed mid-transaction
2049 // Those operations in tx_buffer are discarded (not committed)
2050 if in_transaction {
2051 eprintln!(
2052 "WAL replay: Discarding {} uncommitted transaction operations",
2053 tx_buffer.len()
2054 );
2055 }
2056
2057 // Flush after replay and truncate WAL
2058 self.cold.flush()?;
2059 if let Some(ref wal) = self.wal {
2060 wal.write().unwrap().truncate()?;
2061 }
2062
2063 Ok(())
2064 }
2065
2066 /// Apply a single WAL entry to storage
2067 async fn apply_wal_entry(&self, entry: crate::wal::LogEntry) -> Result<()> {
2068 match entry.operation {
2069 Operation::Put => {
2070 if let Some(value) = entry.value {
2071 // Write directly to cold storage (skip WAL, already logged)
2072 self.cold.set(entry.key.clone(), value.clone())?;
2073
2074 // Update hot cache
2075 if self.should_cache_key(&entry.key) {
2076 self.hot
2077 .set(Arc::new(entry.key.clone()), Arc::new(value.clone()), None);
2078 }
2079
2080 // Rebuild indices
2081 if let Some(collection) = entry.key.split(':').next()
2082 && !collection.starts_with('_')
2083 {
2084 self.index_value(collection, &entry.key, &value, None)?;
2085 }
2086 }
2087 }
2088 Operation::Delete => {
2089 // Remove from indices before deleting the data
2090 if let Some(collection) = entry.key.split(':').next()
2091 && !collection.starts_with('_')
2092 {
2093 let id = entry.key.split(':').nth(1).unwrap_or("");
2094 if let Ok(Some(doc)) = self.get_document(collection, id) {
2095 let _ = self.remove_from_indices(collection, &doc);
2096 } else if let Some(index) = self.primary_indices.get_mut(collection) {
2097 index.remove(&entry.key);
2098 }
2099 }
2100 self.cold.delete(&entry.key)?;
2101 self.hot.delete(&entry.key);
2102 }
2103 _ => {} // Transaction markers handled in replay_wal
2104 }
2105 Ok(())
2106 }
2107
2108 #[cfg(test)]
2109 fn ensure_schema_hot(&self, collection: &str) -> Result<Arc<Collection>> {
2110 // 1. Check the high-performance object cache first (O(1))
2111 if let Some(schema) = self.schema_cache.get(collection) {
2112 return Ok(schema.value().clone());
2113 }
2114
2115 let collection_key = format!("_collection:{}", collection);
2116
2117 // 2. Fallback to Hot Byte Cache (parse if found)
2118 if let Some(data) = self.hot.get(&collection_key) {
2119 return serde_json::from_slice::<Collection>(&data)
2120 .map(|s| {
2121 let arc_s = Arc::new(s);
2122 // Populate object cache to avoid this parse next time
2123 self.schema_cache
2124 .insert(collection.to_string(), arc_s.clone());
2125 arc_s
2126 })
2127 .map_err(|_| {
2128 AqlError::new(
2129 ErrorCode::SchemaError,
2130 "Failed to parse schema from hot cache",
2131 )
2132 });
2133 }
2134
2135 // 3. Fallback to Cold Storage
2136 if let Some(data) = self.get(&collection_key)? {
2137 // Update Hot Cache
2138 self.hot.set(
2139 Arc::new(collection_key.clone()),
2140 Arc::new(data.clone()),
2141 None,
2142 );
2143
2144 // Parse and update Schema Cache
2145 let schema = serde_json::from_slice::<Collection>(&data)?;
2146 let arc_schema = Arc::new(schema);
2147 self.schema_cache
2148 .insert(collection.to_string(), arc_schema.clone());
2149
2150 return Ok(arc_schema);
2151 }
2152
2153 Err(AqlError::new(
2154 ErrorCode::SchemaError,
2155 format!("Failed to load schema for collection '{}'", collection),
2156 ))
2157 }
2158
2159 /// Smart collection scan that uses the primary index as a key directory
2160 /// Avoids forced flushes and leverages hot cache for better performance
2161 fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
2162 // Use the primary index as a "key directory" - it contains all document keys
2163 if let Some(index) = self.primary_indices.get(collection) {
2164 // Pre-allocate based on index size to avoid reallocations
2165 let mut documents = Vec::with_capacity(index.len());
2166
2167 // Iterate through all keys in the primary index (fast, in-memory)
2168 for entry in index.iter() {
2169 let key = entry.key();
2170
2171 // Fetch document via hot cache -> cold storage fallback
2172 if let Some(data) = self.get(key)? {
2173 if let Ok(mut doc) = serde_json::from_slice::<Document>(&data) {
2174 // Apply computed fields
2175 if let Ok(computed) = self.computed.read() {
2176 let _ = computed.apply(collection, &mut doc);
2177 }
2178 documents.push(doc);
2179 }
2180 }
2181 }
2182 Ok(documents)
2183 } else {
2184 // Fallback: scan from cold storage if primary index not yet initialized
2185 let prefix = format!("{}:", collection);
2186 let mut documents = Vec::new();
2187 for result in self.cold.scan_prefix(&prefix) {
2188 if let Ok((_key, value)) = result {
2189 if let Ok(mut doc) = serde_json::from_slice::<Document>(&value) {
2190 // Apply computed fields
2191 if let Ok(computed) = self.computed.read() {
2192 let _ = computed.apply(collection, &mut doc);
2193 }
2194 documents.push(doc);
2195 }
2196 }
2197 }
2198 Ok(documents)
2199 }
2200 }
2201
2202 /// Scan collection with filter and early termination support
2203 /// Used by QueryBuilder for optimized queries with LIMIT
2204 pub fn scan_and_filter<F>(
2205 &self,
2206 collection: &str,
2207 filter_fn: F,
2208 limit: Option<usize>,
2209 ) -> Result<Vec<Document>>
2210 where
2211 F: Fn(&Document) -> bool,
2212 {
2213 let mut results = Vec::new();
2214 if let Some(index) = self.primary_indices.get(collection) {
2215 for entry in index.iter() {
2216 if let Some(data) = self.get(entry.key())? {
2217 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
2218 if filter_fn(&doc) {
2219 results.push(doc);
2220 if let Some(l) = limit {
2221 if results.len() >= l {
2222 break;
2223 }
2224 }
2225 }
2226 }
2227 }
2228 }
2229 } else {
2230 // No primary index (e.g. system collections like _migrations): fall back to cold scan.
2231 // Flush the write buffer first so any recently buffered writes land in cold storage
2232 // and are visible to the prefix scan below.
2233 if let Some(ref wb) = self.write_buffer {
2234 let _ = wb.flush();
2235 }
2236 let prefix = format!("{}:", collection);
2237 for result in self.cold.scan_prefix(&prefix) {
2238 if let Ok((_key, value)) = result {
2239 if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
2240 if filter_fn(&doc) {
2241 results.push(doc);
2242 if let Some(l) = limit {
2243 if results.len() >= l {
2244 break;
2245 }
2246 }
2247 }
2248 }
2249 }
2250 }
2251 }
2252 Ok(results)
2253 }
2254
2255 // Restore missing methods
2256 pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
2257 const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; // 50MB limit
2258
2259 // Get file metadata to check size before reading
2260 let metadata = tokio::fs::metadata(file_path).await?;
2261 let file_size = metadata.len() as usize;
2262
2263 if file_size > MAX_FILE_SIZE {
2264 return Err(AqlError::invalid_operation(format!(
2265 "File size {} MB exceeds maximum allowed size of {} MB",
2266 file_size / (1024 * 1024),
2267 MAX_FILE_SIZE / (1024 * 1024)
2268 )));
2269 }
2270
2271 let mut file = File::open(file_path).await?;
2272 let mut buffer = Vec::new();
2273 file.read_to_end(&mut buffer).await?;
2274
2275 // Add BLOB: prefix to mark this as blob data
2276 let mut blob_data = Vec::with_capacity(5 + buffer.len());
2277 blob_data.extend_from_slice(b"BLOB:");
2278 blob_data.extend_from_slice(&buffer);
2279
2280 self.put(key, blob_data, None).await
2281 }
2282
2283 /// Create a new collection with schema definition
2284 ///
2285 /// Collections are like tables in SQL - they define the structure of your documents.
2286 /// The third boolean parameter indicates if the field should be indexed for fast lookups.
2287 ///
2288 /// # Arguments
2289 /// * `name` - Collection name
2290 /// * `fields` - Vector of (field_name, field_type, indexed) tuples
2291 /// - Field name (accepts both &str and String)
2292 /// - Field type (String, Int, Float, Bool, etc.)
2293 /// - Indexed: true for fast lookups, false for no index
2294 ///
2295 /// # Performance
2296 /// - Indexed fields: Fast equality queries (O(1) lookup)
2297 /// - Non-indexed fields: Full scan required for queries
2298 /// - Unique fields are automatically indexed
2299 ///
2300 /// # Examples
2301 ///
2302 /// ```ignore
2303 /// use aurora_db::{Aurora, types::FieldType};
2304 ///
2305 /// let db = Aurora::open("mydb.db")?;
2306 ///
2307 /// // Create a users collection
2308 /// db.new_collection("users", vec![
2309 /// ("name", FieldType::Scalar(crate::types::ScalarType::String), false), // Not indexed
2310 /// ("email", FieldType::Scalar(crate::types::ScalarType::String), true), // Indexed - fast lookups
2311 /// ("age", FieldType::Scalar(crate::types::ScalarType::Int), false),
2312 /// ("active", FieldType::Scalar(crate::types::ScalarType::Bool), true), // Indexed
2313 /// ("score", FieldType::Float, false),
2314 /// ])?;
2315 ///
2316 /// // Idempotent - calling again is safe
2317 /// db.new_collection("users", vec![/* ... */])?.await; // OK!
2318 /// ```
2319 pub async fn new_collection<F: IntoFieldDefinition>(
2320 &self,
2321 name: &str,
2322 fields: Vec<F>,
2323 ) -> Result<()> {
2324 println!("DB: Creating collection: {}", name);
2325 let collection_key = format!("_collection:{}", name);
2326
2327 // Check if collection already exists - if so, just return Ok (idempotent)
2328 if self.get(&collection_key)?.is_some() {
2329 return Ok(());
2330 }
2331
2332 // Create field definitions
2333 let mut field_definitions = HashMap::new();
2334 for field in fields {
2335 let (field_name, field_def) = field.into_field_definition();
2336
2337 if field_def.field_type == FieldType::Any && field_def.unique {
2338 return Err(AqlError::new(
2339 ErrorCode::InvalidDefinition,
2340 "Fields of type 'Any' cannot be unique or indexed.".to_string(),
2341 ));
2342 }
2343
2344 field_definitions.insert(field_name, field_def);
2345 }
2346
2347 let collection = Collection {
2348 name: name.to_string(),
2349 fields: field_definitions,
2350 // REMOVED: unique_fields is now derived from fields
2351 };
2352
2353 let collection_data = serde_json::to_vec(&collection)?;
2354 self.put(collection_key, collection_data, None).await?;
2355
2356 // Invalidate schema cache since we just created/updated the collection schema
2357 self.schema_cache.remove(name);
2358
2359 Ok(())
2360 }
2361
2362 /// Insert a document into a collection
2363 ///
2364 /// Automatically generates a UUID for the document and validates against
2365 /// collection schema and unique constraints. Returns the generated document ID.
2366 ///
2367 /// # Performance
2368 /// - Single insert: ~15,000 docs/sec
2369 /// - Bulk insert: Use `batch_insert()` for 10+ documents (~50,000 docs/sec)
2370 /// - Triggers PubSub events for real-time listeners
2371 ///
2372 /// # Arguments
2373 /// * `collection` - Name of the collection to insert into
2374 /// * `data` - Document fields and values to insert
2375 ///
2376 /// # Returns
2377 /// The auto-generated ID of the inserted document or an error
2378 ///
2379 /// # Errors
2380 /// - `CollectionNotFound`: Collection doesn't exist
2381 /// - `ValidationError`: Data violates schema or unique constraints
2382 /// - `SerializationError`: Invalid data format
2383 ///
2384 /// # Examples
2385 ///
2386
2387 /// use aurora_db::{Aurora, types::Value};
2388 ///
2389 /// let db = Aurora::open("mydb.db")?;
2390 ///
2391 /// // Basic insertion
2392 /// let user_id = db.insert_into("users", vec![
2393 /// ("name", Value::String("Alice Smith".to_string())),
2394 /// ("email", Value::String("alice@example.com".to_string())),
2395 /// ("age", Value::Int(28)),
2396 /// ("active", Value::Bool(true)),
2397 /// ]).await?;
2398 ///
2399 /// println!("Created user with ID: {}", user_id);
2400 ///
2401 /// // Inserting with nested data
2402 /// let order_id = db.insert_into("orders", vec![
2403 /// ("user_id", Value::String(user_id.clone())),
2404 /// ("total", Value::Float(99.99)),
2405 /// ("status", Value::String("pending".to_string())),
2406 /// ("items", Value::Array(vec![
2407 /// Value::String("item-123".to_string()),
2408 /// Value::String("item-456".to_string()),
2409 /// ])),
2410 /// ]).await?;
2411 ///
2412 /// // Error handling - unique constraint violation
2413 /// match db.insert_into("users", vec![
2414 /// ("email", Value::String("alice@example.com".to_string())), // Duplicate!
2415 /// ("name", Value::String("Alice Clone".to_string())),
2416 /// ]).await {
2417 /// Ok(id) => println!("Inserted: {}", id),
2418 /// Err(e) => println!("Failed: {} (email already exists)", e),
2419 /// }
2420 ///
2421 /// // For bulk inserts (10+ documents), use batch_insert() instead
2422 /// let users = vec![
2423 /// HashMap::from([
2424 /// ("name".to_string(), Value::String("Bob".to_string())),
2425 /// ("email".to_string(), Value::String("bob@example.com".to_string())),
2426 /// ]),
2427 /// HashMap::from([
2428 /// ("name".to_string(), Value::String("Carol".to_string())),
2429 /// ("email".to_string(), Value::String("carol@example.com".to_string())),
2430 /// ]),
2431 /// // ... more documents
2432 /// ];
2433 /// let ids = db.batch_insert("users", users).await?; // 3x faster!
2434 /// println!("Inserted {} users", ids.len());
2435 /// ```ignore
2436 pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
2437 // Convert Vec<(&str, Value)> to HashMap<String, Value>
2438 let data_map: HashMap<String, Value> =
2439 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
2440
2441 // Validate unique constraints before inserting
2442 self.validate_unique_constraints(collection, &data_map)
2443 .await?;
2444
2445 let doc_id = Uuid::now_v7().to_string();
2446 let document = Document {
2447 _sid: doc_id.clone(),
2448 data: data_map,
2449 };
2450
2451 self.put(
2452 format!("{}:{}", collection, doc_id),
2453 serde_json::to_vec(&document)?,
2454 None,
2455 )
2456 .await?;
2457
2458 // Publish insert event
2459 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
2460 let _ = self.pubsub.publish(event);
2461
2462 Ok(doc_id)
2463 }
2464
2465 pub async fn insert_map(
2466 &self,
2467 collection: &str,
2468 data: HashMap<String, Value>,
2469 ) -> Result<String> {
2470 // Validate unique constraints before inserting
2471 self.validate_unique_constraints(collection, &data).await?;
2472
2473 let doc_id = Uuid::now_v7().to_string();
2474 let document = Document {
2475 _sid: doc_id.clone(),
2476 data,
2477 };
2478
2479 self.put(
2480 format!("{}:{}", collection, doc_id),
2481 serde_json::to_vec(&document)?,
2482 None,
2483 )
2484 .await?;
2485
2486 // Publish insert event
2487 let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
2488 let _ = self.pubsub.publish(event);
2489
2490 Ok(doc_id)
2491 }
2492
2493 /// Batch insert multiple documents with optimized write path
2494 ///
2495 /// Inserts multiple documents in a single optimized operation, bypassing
2496 /// the write buffer for better performance. Ideal for bulk data loading,
2497 /// migrations, or initial database seeding. 3x faster than individual inserts.
2498 ///
2499 /// # Performance
2500 /// - Insert speed: ~50,000 docs/sec (vs ~15,000 for single inserts)
2501 /// - Batch writes to WAL and storage
2502 /// - Validates all unique constraints
2503 /// - Use for 10+ documents minimum
2504 ///
2505 /// # Arguments
2506 /// * `collection` - Name of the collection to insert into
2507 /// * `documents` - Vector of document data as HashMaps
2508 ///
2509 /// # Returns
2510 /// Vector of auto-generated document IDs or an error
2511 ///
2512 /// # Examples
2513 ///
2514
2515 /// use aurora_db::{Aurora, types::Value};
2516 /// use std::collections::HashMap;
2517 ///
2518 /// let db = Aurora::open("mydb.db")?;
2519 ///
2520 /// // Bulk user import
2521 /// let users = vec![
2522 /// HashMap::from([
2523 /// ("name".to_string(), Value::String("Alice".into())),
2524 /// ("email".to_string(), Value::String("alice@example.com".into())),
2525 /// ("age".to_string(), Value::Int(28)),
2526 /// ]),
2527 /// HashMap::from([
2528 /// ("name".to_string(), Value::String("Bob".into())),
2529 /// ("email".to_string(), Value::String("bob@example.com".into())),
2530 /// ("age".to_string(), Value::Int(32)),
2531 /// ]),
2532 /// HashMap::from([
2533 /// ("name".to_string(), Value::String("Carol".into())),
2534 /// ("email".to_string(), Value::String("carol@example.com".into())),
2535 /// ("age".to_string(), Value::Int(25)),
2536 /// ]),
2537 /// ];
2538 ///
2539 /// let ids = db.batch_insert("users", users).await?;
2540 /// println!("Inserted {} users", ids.len());
2541 ///
2542 /// // Seeding test data
2543 /// let test_products: Vec<HashMap<String, Value>> = (0..1000)
2544 /// .map(|i| HashMap::from([
2545 /// ("sku".to_string(), Value::String(format!("PROD-{:04}", i))),
2546 /// ("price".to_string(), Value::Float(9.99 + i as f64)),
2547 /// ("stock".to_string(), Value::Int(100)),
2548 /// ]))
2549 /// .collect();
2550 ///
2551 /// let ids = db.batch_insert("products", test_products).await?;
2552 /// // Much faster than 1000 individual insert_into() calls!
2553 ///
2554 /// // Migration from CSV data
2555 /// let mut csv_reader = csv::Reader::from_path("data.csv")?;
2556 /// let mut batch = Vec::new();
2557 ///
2558 /// for result in csv_reader.records() {
2559 /// let record = result?;
2560 /// let doc = HashMap::from([
2561 /// ("field1".to_string(), Value::String(record[0].to_string())),
2562 /// ("field2".to_string(), Value::String(record[1].to_string())),
2563 /// ]);
2564 /// batch.push(doc);
2565 ///
2566 /// // Insert in batches of 1000
2567 /// if batch.len() >= 1000 {
2568 /// db.batch_insert("imported_data", batch.clone()).await?;
2569 /// batch.clear();
2570 /// }
2571 /// }
2572 ///
2573 /// // Insert remaining
2574 /// if !batch.is_empty() {
2575 /// db.batch_insert("imported_data", batch).await?;
2576 /// }
2577 /// ```
2578 ///
2579 /// # Errors
2580 /// - `ValidationError`: Unique constraint violation on any document
2581 /// - `CollectionNotFound`: Collection doesn't exist
2582 /// - `IoError`: Storage write failure
2583 ///
2584 /// # Important Notes
2585 /// - All inserts are atomic - if one fails, none are inserted
2586 /// - UUIDs are auto-generated for all documents
2587 /// - PubSub events are published for each insert
2588 /// - For 10+ documents, this is 3x faster than individual inserts
2589 /// - For < 10 documents, use `insert_into()` instead
2590 ///
2591 /// # See Also
2592 /// - `insert_into()` for single document inserts
2593 /// - `import_from_json()` for file-based bulk imports
2594 /// - `batch_write()` for low-level batch operations
2595 pub async fn batch_insert(
2596 &self,
2597 collection: &str,
2598 documents: Vec<HashMap<String, Value>>,
2599 ) -> Result<Vec<String>> {
2600 let mut doc_ids = Vec::with_capacity(documents.len());
2601 let mut pairs = Vec::with_capacity(documents.len());
2602 // Keep Document objects alive so pubsub doesn't need to re-read from cold storage
2603 let mut doc_objects = Vec::with_capacity(documents.len());
2604
2605 // Prepare all documents
2606 for data in documents {
2607 // Validate unique constraints
2608 self.validate_unique_constraints(collection, &data).await?;
2609
2610 let doc_id = Uuid::now_v7().to_string();
2611 let document = Document {
2612 _sid: doc_id.clone(),
2613 data,
2614 };
2615
2616 let key = format!("{}:{}", collection, doc_id);
2617 let value = serde_json::to_vec(&document)?;
2618
2619 pairs.push((key, value));
2620 doc_ids.push(doc_id);
2621 doc_objects.push(document);
2622 }
2623
2624 // Write to WAL in batch (if enabled)
2625 if let Some(ref wal) = self.wal
2626 && self.config.durability_mode != DurabilityMode::None
2627 {
2628 let mut wal_lock = wal.write().unwrap();
2629 for (key, value) in &pairs {
2630 wal_lock.append(Operation::Put, key, Some(value))?;
2631 }
2632 }
2633
2634 // Bypass write buffer - go directly to cold storage batch API
2635 self.cold.batch_set(pairs.clone())?;
2636
2637 // Note: Durability is handled by background checkpoint process
2638
2639 // Update hot cache and indices
2640 for (key, value) in pairs {
2641 if self.should_cache_key(&key) {
2642 self.hot
2643 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2644 }
2645
2646 if let Some(collection_name) = key.split(':').next()
2647 && !collection_name.starts_with('_')
2648 {
2649 self.index_value(collection_name, &key, &value, None)?;
2650 }
2651 }
2652
2653 // Publish events — use already-in-memory Document objects, no cold storage re-read
2654 for (doc_id, doc) in doc_ids.iter().zip(doc_objects.into_iter()) {
2655 let event = crate::pubsub::ChangeEvent::insert(collection, doc_id, doc);
2656 let _ = self.pubsub.publish(event);
2657 }
2658
2659 Ok(doc_ids)
2660 }
2661
2662 /// Update a document by ID
2663 ///
2664 /// # Arguments
2665 /// * `collection` - Collection name
2666 /// * `doc_id` - Document ID to update
2667 /// * `data` - New field values to set
2668 ///
2669 /// # Returns
2670 /// Updates specific fields in an existing document.
2671 ///
2672 /// # Arguments
2673 /// * `collection` - The collection name.
2674 /// * `doc_id` - The ID of the document to update.
2675 /// * `updates` - a vector of field-value pairs to update.
2676 ///
2677 /// # Example
2678 /// ```rust
2679 /// # use aurora_db::{Aurora, Value};
2680 /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
2681 /// db.update_document("users", "u123", vec![
2682 /// ("active", Value::Bool(false)),
2683 /// ]).await?;
2684 /// # Ok(())
2685 /// # }
2686 /// ```
2687 pub async fn update_document(
2688 &self,
2689 collection: &str,
2690 doc_id: &str,
2691 updates: Vec<(&str, Value)>,
2692 ) -> Result<()> {
2693 // Get existing document
2694 let mut document = self.get_document(collection, doc_id)?.ok_or_else(|| {
2695 AqlError::new(
2696 ErrorCode::NotFound,
2697 format!("Document not found: {}", doc_id),
2698 )
2699 })?;
2700
2701 // Store old document for event
2702 let old_document = document.clone();
2703
2704 // Apply updates
2705 for (field, value) in updates {
2706 document.data.insert(field.to_string(), value);
2707 }
2708
2709 // Validate unique constraints after update (excluding current document)
2710 self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
2711 .await?;
2712
2713 // Save updated document
2714 self.put(
2715 format!("{}:{}", collection, doc_id),
2716 serde_json::to_vec(&document)?,
2717 None,
2718 )
2719 .await?;
2720
2721 // Remove stale index entries for modified fields
2722 let u = self.parse_external_id(doc_id);
2723 if let Some(internal_id) = self._sid_dictionary.get(&u).map(|e| *e.value()) {
2724 for (field, old_val) in &old_document.data {
2725 if let Some(new_val) = document.data.get(field) {
2726 if new_val == old_val {
2727 continue;
2728 }
2729 }
2730
2731 let index_key = format!("{}:{}", collection, field);
2732 if let Some(index_map) = self.secondary_indices.get(&index_key) {
2733 let val_str = match old_val {
2734 Value::String(s) => s.clone(),
2735 _ => old_val.to_string(),
2736 };
2737 if let Some(storage_arc) = index_map.get(&val_str) {
2738 if let Ok(mut storage) = storage_arc.value().write() {
2739 println!("REMOVING {} FROM INDEX {}", internal_id, index_key);
2740 storage.remove(internal_id);
2741 }
2742 } else {
2743 println!("STORAGE ARC NOT FOUND FOR {}", val_str);
2744 }
2745 } else {
2746 println!("INDEX MAP NOT FOUND FOR {}", index_key);
2747 }
2748 }
2749 } else {
2750 println!("INTERNAL ID NOT FOUND FOR {}", doc_id);
2751 }
2752
2753 // Publish update event
2754 let event =
2755 crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
2756 let _ = self.pubsub.publish(event);
2757
2758 Ok(())
2759 }
2760
2761 /// Remove a single field key from a document's data (used by field-rename migrations).
2762 pub async fn drop_field_from_document(
2763 &self,
2764 collection: &str,
2765 doc_id: &str,
2766 field: &str,
2767 ) -> Result<()> {
2768 let key = format!("{}:{}", collection, doc_id);
2769 let existing = self.get(&key)?.ok_or_else(|| {
2770 AqlError::new(
2771 ErrorCode::NotFound,
2772 format!("Document {} not found", doc_id),
2773 )
2774 })?;
2775 let mut document: Document = serde_json::from_slice(&existing)?;
2776 document.data.remove(field);
2777 self.put(key, serde_json::to_vec(&document)?, None).await?;
2778 Ok(())
2779 }
2780
2781 /// Retrieves all documents from a collection.
2782 ///
2783 /// # Arguments
2784 /// * `collection` - The name of the collection.
2785 ///
2786 /// # Returns
2787 /// A vector containing all documents in the collection.
2788 ///
2789 /// # Performance
2790 /// For large collections, consider using `query().limit().collect()` instead to
2791 /// avoid loading too much data into memory at once.
2792 pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
2793 self.ensure_indices_initialized().await?;
2794 self.scan_collection(collection)
2795 }
2796
2797 /// Searches for raw storage keys matching a pattern.
2798 ///
2799 /// Returns a list of (key, data_info) pairs. This is a low-level diagnostic tool.
2800 ///
2801 /// # Arguments
2802 /// * `pattern` - The string pattern to search for in keys.
2803 pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
2804 let mut data = Vec::new();
2805
2806 // Scan from cold storage instead of primary index
2807 for result in self.cold.scan() {
2808 if let Ok((key, value)) = result {
2809 if key.contains(pattern) {
2810 let info = if value.starts_with(b"BLOB:") {
2811 DataInfo::Blob { size: value.len() }
2812 } else {
2813 DataInfo::Data {
2814 size: value.len(),
2815 preview: String::from_utf8_lossy(&value[..value.len().min(50)])
2816 .into_owned(),
2817 }
2818 };
2819
2820 data.push((key.clone(), info));
2821 }
2822 }
2823 }
2824
2825 Ok(data)
2826 }
2827
2828 /// Begin a transaction
2829 ///
2830 /// All operations after beginning a transaction will be part of the transaction
2831 /// until either commit_transaction() or rollback_transaction() is called.
2832 ///
2833 /// # Returns
2834 /// Success or an error (e.g., if a transaction is already in progress)
2835 ///
2836 /// # Examples
2837 ///
2838
2839 /// // Start a transaction for atomic operations
2840 /// db.begin_transaction()?;
2841 ///
2842 /// // Perform multiple operations
2843 /// db.insert_into("accounts", vec![("user_id", Value::String(user_id)), ("balance", Value::Float(100.0))])?;
2844 /// db.insert_into("audit_log", vec![("action", Value::String("account_created".to_string()))])?;
2845 ///
2846 /// // Commit all changes or roll back if there's an error
2847 /// if all_ok {
2848 /// db.commit_transaction()?;
2849 /// } else {
2850 /// db.rollback_transaction()?;
2851 /// }
2852 /// ```ignore
2853 /// Starts a new ACID-compliant transaction.
2854 ///
2855 /// Transactions allow you to group multiple operations together. Either all
2856 /// operations succeed and are committed, or none of them are applied.
2857 ///
2858 /// # Returns
2859 /// A unique `TransactionId` to identify this transaction in subsequent calls.
2860 ///
2861 /// # Example
2862 /// ```rust
2863 /// # use aurora_db::{Aurora, Value};
2864 /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
2865 /// let tx_id = db.begin_transaction().await;
2866 ///
2867 /// // ... perform operations ...
2868 ///
2869 /// db.commit_transaction(tx_id).await?;
2870 /// # Ok(())
2871 /// # }
2872 /// ```
2873 pub async fn begin_transaction(&self) -> crate::transaction::TransactionId {
2874 let buffer = self.transaction_manager.begin();
2875 buffer._sid
2876 }
2877
2878 /// Commits a transaction, making all changes permanent.
2879 ///
2880 /// All operations within the transaction are atomically applied to the database.
2881 ///
2882 /// # Arguments
2883 /// * `tx_id` - The ID of the transaction to commit.
2884 pub async fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
2885 let buffer = self
2886 .transaction_manager
2887 .active_transactions
2888 .get(&tx_id)
2889 .ok_or_else(|| {
2890 AqlError::invalid_operation(
2891 "Transaction not found or already completed".to_string(),
2892 )
2893 })?;
2894
2895 for item in buffer.writes.iter() {
2896 let key = item.key();
2897 let value = item.value();
2898 // WAL write for crash-durability (skipped during transaction to
2899 // avoid writing uncommitted entries that could be replayed)
2900 if let Some(ref sender) = self.wal_writer
2901 && self.config.durability_mode != DurabilityMode::None
2902 {
2903 let _ = sender.send(WalOperation::Put {
2904 key: Arc::new(key.clone()),
2905 value: Arc::new(value.clone()),
2906 });
2907 }
2908 self.cold.set(key.clone(), value.clone())?;
2909 if self.should_cache_key(key) {
2910 self.hot
2911 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
2912 }
2913 if let Some(collection_name) = key.split(':').next()
2914 && !collection_name.starts_with('_')
2915 {
2916 self.index_value(collection_name, key, value, None)?;
2917 }
2918 }
2919
2920 for item in buffer.deletes.iter() {
2921 let key = item.key();
2922 if let Some((collection, id)) = key.split_once(':')
2923 && let Ok(Some(doc)) = self.get_document(collection, id)
2924 {
2925 self.remove_from_indices(collection, &doc)?;
2926 }
2927 self.cold.delete(key)?;
2928 self.hot.delete(key);
2929 }
2930
2931 // Drop the buffer reference to release the DashMap read lock
2932 // before calling commit which needs to remove the entry (write lock)
2933 drop(buffer);
2934
2935 self.transaction_manager.commit(tx_id)?;
2936
2937 self.cold.compact()?;
2938
2939 Ok(())
2940 }
2941
2942 /// Roll back a transaction, discarding all changes
2943 ///
2944 /// All operations within the transaction are discarded. The database state
2945 /// remains unchanged. Use this when an error occurs during transaction processing.
2946 ///
2947 /// # Arguments
2948 /// * `tx_id` - Transaction ID returned from begin_transaction()
2949 ///
2950 /// # Examples
2951 ///
2952
2953 /// use aurora_db::{Aurora, types::Value};
2954 ///
2955 /// let db = Aurora::open("mydb.db")?;
2956 ///
2957 /// // Attempt a transfer with validation
2958 /// let tx_id = db.begin_transaction();
2959 ///
2960 /// let result = async {
2961 /// // Deduct from Alice
2962 /// let alice = db.get_document("accounts", "alice").await?;
2963 /// let balance = alice.and_then(|doc| doc.data.get("balance"));
2964 ///
2965 /// if let Some(Value::Int(bal)) = balance {
2966 /// if *bal < 100 {
2967 /// return Err("Insufficient funds");
2968 /// }
2969 ///
2970 /// db.update_document("accounts", "alice", vec![
2971 /// ("balance", Value::Int(bal - 100)),
2972 /// ]).await?;
2973 ///
2974 /// db.update_document("accounts", "bob", vec![
2975 /// ("balance", Value::Int(600)),
2976 /// ]).await?;
2977 ///
2978 /// Ok(())
2979 /// } else {
2980 /// Err("Account not found")
2981 /// }
2982 /// }.await;
2983 ///
2984 /// match result {
2985 /// Ok(_) => {
2986 /// db.commit_transaction(tx_id)?;
2987 /// println!("Transfer completed");
2988 /// }
2989 /// Err(e) => {
2990 /// db.rollback_transaction(tx_id)?;
2991 /// println!("Transfer failed: {}, changes rolled back", e);
2992 /// }
2993 /// }
2994 /// ```
2995 /// Aborts a transaction and discards all pending changes.
2996 ///
2997 /// # Arguments
2998 /// * `tx_id` - The ID of the transaction to roll back.
2999 pub async fn rollback_transaction(
3000 &self,
3001 tx_id: crate::transaction::TransactionId,
3002 ) -> Result<()> {
3003 self.transaction_manager.rollback(tx_id)
3004 }
3005
3006 /// Create a secondary index on a field for faster queries
3007 ///
3008 /// Indexes dramatically improve query performance for frequently accessed fields,
3009 /// trading increased memory usage and slower writes for much faster reads.
3010 ///
3011 /// # When to Create Indexes
3012 /// - **Frequent queries**: Fields used in 80%+ of your queries
3013 /// - **High cardinality**: Fields with many unique values (user_id, email)
3014 /// - **Sorting/filtering**: Fields used in ORDER BY or WHERE clauses
3015 /// - **Large collections**: Most beneficial with 10,000+ documents
3016 ///
3017 /// # When NOT to Index
3018 /// - Low cardinality fields (e.g., boolean flags, small enums)
3019 /// - Rarely queried fields
3020 /// - Fields that change frequently (write-heavy workloads)
3021 /// - Small collections (<1,000 documents) - full scans are fast enough
3022 ///
3023 /// # Performance Characteristics
3024 /// - **Query speedup**: O(n) → O(1) for equality filters
3025 /// - **Memory cost**: ~100-200 bytes per document per index
3026 /// - **Write slowdown**: ~20-30% longer insert/update times
3027 /// - **Build time**: ~5,000 docs/sec for initial indexing
3028 /// Create a new collection with the given schema
3029 ///
3030 /// # Arguments
3031 /// * `name` - Collection name
3032 /// * `fields` - Field definitions as tuples of (name, type, unique)
3033 /// - The boolean indicates whether the field has a **unique constraint**
3034 /// - Unique fields are automatically indexed
3035 /// - Non-unique fields can be indexed separately using `create_index()`
3036 ///
3037 /// # Examples
3038 /// ```ignore
3039 /// # use aurora_db::{Aurora, types::FieldType};
3040 /// # async fn example(db: &Aurora) {
3041 /// db.new_collection("users", vec![
3042 /// .first_one()
3043 /// .await?;
3044 ///
3045 /// // DON'T index 'active' - low cardinality (only 2 values: true/false)
3046 /// // A full scan is fast enough for boolean fields
3047 ///
3048 /// // DO index 'age' if you frequently query age ranges
3049 /// db.create_index("users", "age").await?;
3050 ///
3051 /// let young_users = db.query("users")
3052 /// .filter(|f| f.lt("age", 30))
3053 /// .collect()
3054 /// .await?;
3055 /// ```
3056 ///
3057 /// # Real-World Example: E-commerce Orders
3058 ///
3059 /// ```ignore
3060 /// // Orders collection: 1 million documents
3061 /// db.new_collection("orders", vec![
3062 /// ("user_id", FieldType::Scalar(crate::types::ScalarType::String)), // High cardinality
3063 /// ("status", FieldType::Scalar(crate::types::ScalarType::String)), // Low cardinality (pending, shipped, delivered)
3064 /// ("created_at", FieldType::Scalar(crate::types::ScalarType::String)),
3065 /// ("total", FieldType::Float),
3066 /// ])?;
3067 ///
3068 /// // Index user_id - queries like "show me my orders" are common
3069 /// db.create_index("orders", "user_id").await?; // Good choice
3070 ///
3071 /// // Query speedup: 2.5s → 0.001s
3072 /// let my_orders = db.query("orders")
3073 /// .filter(|f| f.eq("user_id", user_id))
3074 /// .collect()
3075 /// .await?;
3076 ///
3077 /// // DON'T index 'status' - only 3 possible values
3078 /// // Scanning 1M docs takes ~100ms, indexing won't help much
3079 ///
3080 /// // Index created_at if you frequently query recent orders
3081 /// db.create_index("orders", "created_at").await?; // Good for time-based queries
3082 /// ```
3083 pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
3084 let collection_def = self.get_collection_definition(collection)?;
3085
3086 if let Some(field_def) = collection_def.fields.get(field) {
3087 if field_def.field_type == FieldType::Any {
3088 return Err(AqlError::new(
3089 ErrorCode::InvalidDefinition,
3090 "Cannot create an index on a field of type 'Any'.".to_string(),
3091 ));
3092 }
3093 } else {
3094 return Err(AqlError::new(
3095 ErrorCode::InvalidDefinition,
3096 format!(
3097 "Field '{}' not found in collection '{}'.",
3098 field, collection
3099 ),
3100 ));
3101 }
3102
3103 // Generate a default index name
3104 let index_name = format!("idx_{}_{}", collection, field);
3105
3106 // Create index definition
3107 let definition = IndexDefinition {
3108 name: index_name.clone(),
3109 collection: collection.to_string(),
3110 fields: vec![field.to_string()],
3111 index_type: IndexType::BTree,
3112 unique: false,
3113 };
3114
3115 // Create the index
3116 let index = Index::new(definition.clone());
3117
3118 // Index all existing documents in the collection
3119 let prefix = format!("{}:", collection);
3120 for result in self.cold.scan_prefix(&prefix) {
3121 if let Ok((_, data)) = result
3122 && let Ok(doc) = serde_json::from_slice::<Document>(&data)
3123 {
3124 let _ = index.insert(&doc);
3125 }
3126 }
3127
3128 // Store the index
3129 self.indices.insert(index_name, index);
3130
3131 // Store the index definition for persistence
3132 let index_key = format!("_index:{}:{}", collection, field);
3133 self.put(index_key, serde_json::to_vec(&definition)?, None)
3134 .await?;
3135
3136 Ok(())
3137 }
3138
3139 /// Query documents in a collection with filtering, sorting, and pagination
3140 ///
3141 /// Returns a `QueryBuilder` that allows fluent chaining of query operations.
3142 /// Queries use early termination for LIMIT clauses, making them extremely fast
3143 /// even on large collections (6,800x faster than naive implementations).
3144 ///
3145 /// # Performance
3146 /// - With LIMIT: O(k) where k = limit + offset (early termination!)
3147 /// - Without LIMIT: O(n) where n = matching documents
3148 /// - Uses secondary indices when available for equality filters
3149 /// - Hot cache: ~1M reads/sec, Cold storage: ~500K reads/sec
3150 ///
3151 /// # Examples
3152 ///
3153 /// ```ignore
3154 /// use aurora_db::{Aurora, types::Value};
3155 ///
3156 /// let db = Aurora::open("mydb.db")?;
3157 ///
3158 /// // Simple equality query
3159 /// let active_users = db.query("users")
3160 /// .filter(|f| f.eq("active", Value::Bool(true)))
3161 /// .collect()
3162 /// .await?;
3163 ///
3164 /// // Range query with pagination (FAST - uses early termination!)
3165 /// let top_scorers = db.query("users")
3166 /// .filter(|f| f.gt("score", Value::Int(1000)))
3167 /// .order_by("score", false) // descending
3168 /// .limit(10)
3169 /// .offset(20)
3170 /// .collect()
3171 /// .await?;
3172 ///
3173 /// // Multiple filters
3174 /// let premium_active = db.query("users")
3175 /// .filter(|f| f.eq("tier", Value::String("premium".into())))
3176 /// .filter(|f| f.eq("active", Value::Bool(true)))
3177 /// .limit(100) // Only scans ~200 docs, not all million!
3178 /// .collect()
3179 /// .await?;
3180 ///
3181 /// // Text search in a field
3182 /// Starts a fluent query builder for the specified collection.
3183 ///
3184 /// # Example
3185 /// ```rust
3186 /// # use aurora_db::{Aurora, Value};
3187 /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
3188 /// let users = db.query("users")
3189 /// .filter(|f| f.eq("active", true))
3190 /// .collect()
3191 /// .await?;
3192 /// # Ok(())
3193 /// # }
3194 /// ```
3195 pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3196 QueryBuilder::new(self, collection)
3197 }
3198
3199 /// Starts a full-text search builder for the specified collection.
3200 ///
3201 /// # Example
3202 /// ```rust
3203 /// # use aurora_db::{Aurora, Value};
3204 /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
3205 /// let results = db.search("products")
3206 /// .query("wireless headphones")
3207 /// .fuzzy(1)
3208 /// .collect()
3209 /// .await?;
3210 /// # Ok(())
3211 /// # }
3212 /// ```
3213 pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3214 SearchBuilder::new(self, collection)
3215 }
3216
3217 /// Retrieve a document by ID
3218 ///
3219 /// Fast direct lookup when you know the document ID. Significantly faster
3220 /// than querying with filters when ID is known.
3221 ///
3222 /// # Performance
3223 /// - Hot cache: ~1,000,000 reads/sec (instant)
3224 /// - Cold storage: ~500,000 reads/sec (disk I/O)
3225 /// - Complexity: O(1) - constant time lookup
3226 /// - Much faster than `.query().filter(|f| f.eq("id", ...))` which is O(n)
3227 ///
3228 /// # Arguments
3229 /// * `collection` - Name of the collection to query
3230 /// * `id` - ID of the document to retrieve
3231 ///
3232 /// # Returns
3233 /// The document if found, None if not found, or an error
3234 ///
3235 /// # Examples
3236 ///
3237
3238 /// use aurora_db::{Aurora, types::Value};
3239 ///
3240 /// let db = Aurora::open("mydb.db")?;
3241 ///
3242 /// // Basic retrieval
3243 /// if let Some(user) = db.get_document("users", &user_id)? {
3244 /// println!("Found user: {}", user._sid);
3245 ///
3246 /// // Access fields safely
3247 /// if let Some(Value::String(name)) = user.data.get("name") {
3248 /// println!("Name: {}", name);
3249 /// }
3250 ///
3251 /// if let Some(Value::Int(age)) = user.data.get("age") {
3252 /// println!("Age: {}", age);
3253 /// }
3254 /// } else {
3255 /// println!("User not found");
3256 /// }
3257 ///
3258 /// // Idiomatic error handling
3259 /// let user = db.get_document("users", &user_id)?
3260 /// .ok_or_else(|| AqlError::new(ErrorCode::NotFound,"User not found".into()))?;
3261 ///
3262 /// // Checking existence before operations
3263 /// if db.get_document("users", &user_id)?.is_some() {
3264 /// db.update_document("users", &user_id, vec![
3265 /// ("last_login", Value::String(chrono::Utc::now().to_rfc3339())),
3266 /// ]).await?;
3267 /// }
3268 ///
3269 /// // Batch retrieval (fetch multiple by ID)
3270 /// let user_ids = vec!["user-1", "user-2", "user-3"];
3271 /// let users: Vec<Document> = user_ids.iter()
3272 /// .filter_map(|id| db.get_document("users", id).ok().flatten())
3273 /// .collect();
3274 ///
3275 /// println!("Found {} out of {} users", users.len(), user_ids.len());
3276 /// ```ignore
3277 ///
3278 /// # When to Use
3279 /// - You know the document ID (from insert, previous query, or URL param)
3280 /// - Need fastest possible lookup (1M reads/sec)
3281 /// - Fetching a single document
3282 ///
3283 /// Retrieves a single document by its ID.
3284 ///
3285 /// # Arguments
3286 /// * `collection` - The collection name.
3287 /// * `sid` - The document system ID.
3288 ///
3289 /// # Returns
3290 /// `Ok(Some(Document))` if found, `Ok(None)` if not found, or an error.
3291 ///
3292 /// # Example
3293 /// ```rust
3294 /// # use aurora_db::{Aurora, Value};
3295 /// # async fn example(db: Aurora) -> Result<(), Box<dyn std::error::Error>> {
3296 /// if let Some(user) = db.get_document("users", "u123")? {
3297 /// println!("Found user: {:?}", user);
3298 /// }
3299 /// # Ok(())
3300 /// # }
3301 /// ```
3302 pub fn get_document(&self, collection: &str, sid: &str) -> Result<Option<Document>> {
3303 let key = format!("{}:{}", collection, sid);
3304 if let Some(data) = self.get(&key)? {
3305 Ok(Some(serde_json::from_slice(&data)?))
3306 } else {
3307 Ok(None)
3308 }
3309 }
3310
3311 /// Deletes a document by its full internal key (format: "collection:id").
3312 ///
3313 /// This is a low-level deletion method. It removes the document from storage,
3314 /// clears it from the hot cache, and updates all associated indices.
3315 ///
3316 /// # Arguments
3317 /// * `key` - The full key of the document (e.g., `"users:u123"`).
3318 pub async fn delete(&self, key: &str) -> Result<()> {
3319 // Extract collection and id from key (format: "collection:id")
3320 let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
3321 (coll, doc_id)
3322 } else {
3323 return Err(AqlError::invalid_operation(
3324 "Invalid key format, expected 'collection:id'".to_string(),
3325 ));
3326 };
3327
3328 // CRITICAL FIX: Get document BEFORE deletion to clean up secondary indices
3329 let document = self.get_document(collection, id)?;
3330
3331 // Delete from hot cache
3332 if self.hot.get(key).is_some() {
3333 self.hot.delete(key);
3334 }
3335
3336 // Delete from cold storage
3337 self.cold.delete(key)?;
3338
3339 // CRITICAL FIX: Clean up ALL indices (primary + secondary)
3340 if let Some(doc) = document {
3341 self.remove_from_indices(collection, &doc)?;
3342 } else {
3343 // Fallback: at least remove from primary index using the full sled key.
3344 if let Some(index) = self.primary_indices.get_mut(collection) {
3345 index.remove(key);
3346 }
3347 }
3348
3349 // Publish delete event
3350 let event = crate::pubsub::ChangeEvent::delete(collection, id);
3351 let _ = self.pubsub.publish(event);
3352
3353 Ok(())
3354 }
3355
3356 /// Deletes an entire collection and all documents within it.
3357 ///
3358 /// This operation permanently removes all documents in the collection and
3359 /// cleans up all associated primary and secondary indices.
3360 pub async fn delete_collection(&self, collection: &str) -> Result<()> {
3361 let prefix = format!("{}:", collection);
3362
3363 // Get all keys in collection
3364 let keys: Vec<String> = self
3365 .cold
3366 .scan()
3367 .filter_map(|r| r.ok())
3368 .filter(|(k, _)| k.starts_with(&prefix))
3369 .map(|(k, _)| k)
3370 .collect();
3371
3372 // Delete each key
3373 for key in keys {
3374 self.delete(&key).await?;
3375 }
3376
3377 // Remove collection indices
3378 self.primary_indices.remove(collection);
3379 self.secondary_indices
3380 .retain(|k, _| !k.starts_with(&prefix));
3381
3382 // Invalidate schema cache
3383 self.schema_cache.remove(collection);
3384
3385 Ok(())
3386 }
3387
3388 fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
3389 // Remove from primary index — key format is "collection:sid" (the full sled key).
3390 if let Some(index) = self.primary_indices.get(collection) {
3391 index.remove(&format!("{}:{}", collection, doc._sid));
3392 }
3393
3394 let u = self.parse_external_id(&doc._sid);
3395 let internal_id = self._sid_dictionary.get(&u).map(|e| *e.value());
3396
3397 // Remove from secondary indices
3398 for (field, value) in &doc.data {
3399 let index_key = format!("{}:{}", collection, field);
3400 if let Some(index_map) = self.secondary_indices.get(&index_key) {
3401 let val_str = match value {
3402 Value::String(s) => s.clone(),
3403 _ => value.to_string(),
3404 };
3405 if let Some(storage_arc) = index_map.get(&val_str) {
3406 if let Ok(mut storage) = storage_arc.value().write() {
3407 if let Some(id) = internal_id {
3408 storage.remove(id);
3409 }
3410 }
3411 }
3412 }
3413 }
3414
3415 // Return the internal ID to the recycling pool so it can be reused
3416 if let Some(id) = internal_id {
3417 // Remove mapping from dictionaries and Sled
3418 self._sid_dictionary.remove(&u);
3419 if let Ok(mut reverse) = self.reverse_sid_dictionary.write() {
3420 if let Some(r) = reverse.get_mut(id as usize) {
3421 *r = 0; // Clear the reverse mapping
3422 }
3423 }
3424 let _ = self.sys_id_mapping.remove(u.to_be_bytes());
3425
3426 if let Ok(mut deleted) = self.deleted_ids.write() {
3427 deleted.insert(id);
3428 }
3429 }
3430
3431 Ok(())
3432 }
3433
3434 pub async fn search_text(
3435 &self,
3436 collection: &str,
3437 field: &str,
3438 query: &str,
3439 ) -> Result<Vec<Document>> {
3440 let mut results = Vec::new();
3441 let docs = self.get_all_collection(collection).await?;
3442
3443 for doc in docs {
3444 if let Some(Value::String(text)) = doc.data.get(field)
3445 && text.to_lowercase().contains(&query.to_lowercase())
3446 {
3447 results.push(doc);
3448 }
3449 }
3450
3451 Ok(results)
3452 }
3453
3454 /// Export a collection to a JSON file
3455 ///
3456 /// Creates a JSON file containing all documents in the collection.
3457 /// Useful for backups, data migration, or sharing datasets.
3458 /// Automatically appends `.json` extension if not present.
3459 ///
3460 /// # Performance
3461 /// - Export speed: ~10,000 docs/sec
3462 /// - Scans entire collection from cold storage
3463 /// - Memory efficient: streams documents to file
3464 ///
3465 /// # Arguments
3466 /// * `collection` - Name of the collection to export
3467 /// * `output_path` - Path to the output JSON file (`.json` auto-appended)
3468 ///
3469 /// # Returns
3470 /// Success or an error
3471 ///
3472 /// # Examples
3473 ///
3474 /// ```
3475 /// use aurora_db::Aurora;
3476 ///
3477 /// let db = Aurora::open("mydb.db")?;
3478 ///
3479 /// // Basic export
3480 /// db.export_as_json("users", "./backups/users_2024-01-15")?;
3481 /// // Creates: ./backups/users_2024-01-15.json
3482 ///
3483 /// // Timestamped backup
3484 /// let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
3485 /// let backup_path = format!("./backups/users_{}", timestamp);
3486 /// db.export_as_json("users", &backup_path)?;
3487 ///
3488 /// // Export multiple collections
3489 /// for collection in &["users", "orders", "products"] {
3490 /// db.export_as_json(collection, &format!("./export/{}", collection))?;
3491 /// }
3492 /// ```ignore
3493 ///
3494 /// # Output Format
3495 ///
3496 /// The exported JSON has this structure:
3497 /// ```json
3498 /// {
3499 /// "users": [
3500 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
3501 /// { "id": "456", "name": "Bob", "email": "bob@example.com" }
3502 /// ]
3503 /// }
3504 /// ```
3505 ///
3506 /// # See Also
3507 /// - `export_as_csv()` for CSV format export
3508 /// - `import_from_json()` to restore exported data
3509 pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
3510 use std::io::{BufWriter, Write};
3511
3512 let output_path = if !output_path.ends_with(".json") {
3513 format!("{}.json", output_path)
3514 } else {
3515 output_path.to_string()
3516 };
3517
3518 // Flush write buffer so in-flight docs reach cold storage before we scan.
3519 self.flush()?;
3520
3521 let file = StdFile::create(&output_path)?;
3522 let mut writer = BufWriter::new(file);
3523
3524 // Stream as a JSON object: { "<collection>": [ ... ] }
3525 // Each document is serialised individually — O(1) memory regardless of
3526 // dataset size.
3527 write!(writer, "{{\"{}\": [\n", collection)?;
3528
3529 let prefix = format!("{}:", collection);
3530 let mut first = true;
3531
3532 for result in self.cold.scan_prefix(&prefix) {
3533 let (key, value) = result?;
3534 if key.starts_with("_collection:") {
3535 continue;
3536 }
3537 let Ok(doc) = serde_json::from_slice::<Document>(&value) else {
3538 continue;
3539 };
3540
3541 // Recursively convert every Value to a JsonValue — no silent drops.
3542 let mut obj = serde_json::Map::new();
3543 // Always include the document id field
3544 obj.insert("_id".to_string(), JsonValue::String(doc._sid));
3545 for (k, v) in doc.data {
3546 obj.insert(k, Self::value_to_json(v));
3547 }
3548
3549 if !first {
3550 write!(writer, ",\n")?;
3551 }
3552 serde_json::to_writer(&mut writer, &JsonValue::Object(obj))?;
3553 first = false;
3554 }
3555
3556 write!(writer, "\n]}}")?;
3557 writer.flush()?;
3558 println!("Exported collection '{}' to {}", collection, &output_path);
3559 Ok(())
3560 }
3561
3562 /// Recursively convert an Aurora `Value` to a `serde_json::Value` with no
3563 /// data loss — nested objects and arrays are handled at every depth.
3564 fn value_to_json(v: Value) -> JsonValue {
3565 match v {
3566 Value::String(s) => JsonValue::String(s),
3567 Value::Int(i) => JsonValue::Number(i.into()),
3568 Value::Float(f) => serde_json::Number::from_f64(f)
3569 .map(JsonValue::Number)
3570 .unwrap_or(JsonValue::Null),
3571 Value::Bool(b) => JsonValue::Bool(b),
3572 Value::Null => JsonValue::Null,
3573 Value::Uuid(u) => JsonValue::String(u.to_string()),
3574 Value::DateTime(dt) => JsonValue::String(dt.to_rfc3339()),
3575 Value::Array(arr) => {
3576 JsonValue::Array(arr.into_iter().map(Self::value_to_json).collect())
3577 }
3578 Value::Object(map) => {
3579 let mut obj = serde_json::Map::new();
3580 for (k, v) in map {
3581 obj.insert(k, Self::value_to_json(v));
3582 }
3583 JsonValue::Object(obj)
3584 }
3585 }
3586 }
3587
3588 /// Export a collection to a CSV file
3589 ///
3590 /// Creates a CSV file with headers from the first document and rows for each document.
3591 /// Useful for spreadsheet analysis, data science workflows, or reporting.
3592 /// Automatically appends `.csv` extension if not present.
3593 ///
3594 /// # Performance
3595 /// - Export speed: ~8,000 docs/sec
3596 /// - Memory efficient: streams rows to file
3597 /// - Headers determined from first document
3598 ///
3599 /// # Arguments
3600 /// * `collection` - Name of the collection to export
3601 /// * `filename` - Path to the output CSV file (`.csv` auto-appended)
3602 ///
3603 /// # Returns
3604 /// Success or an error
3605 ///
3606 /// # Examples
3607 ///
3608 /// ```ignore
3609 /// use aurora_db::Aurora;
3610 ///
3611 /// let db = Aurora::open("mydb.db")?;
3612 ///
3613 /// // Basic CSV export
3614 /// db.export_as_csv("users", "./reports/users")?;
3615 /// // Creates: ./reports/users.csv
3616 ///
3617 /// // Export for analysis in Excel/Google Sheets
3618 /// db.export_as_csv("orders", "./analytics/sales_data")?;
3619 ///
3620 /// // Monthly report generation
3621 /// let month = chrono::Utc::now().format("%Y-%m");
3622 /// db.export_as_csv("transactions", &format!("./reports/transactions_{}", month))?;
3623 /// ```
3624 ///
3625 /// # Output Format
3626 ///
3627 /// ```csv
3628 /// id,name,email,age
3629 /// 123,Alice,alice@example.com,28
3630 /// 456,Bob,bob@example.com,32
3631 /// ```ignore
3632 ///
3633 /// # Important Notes
3634 /// - Headers are taken from the first document's fields
3635 /// - Documents with different fields will have empty values for missing fields
3636 /// - Nested objects/arrays are converted to strings
3637 /// - Best for flat document structures
3638 ///
3639 /// # See Also
3640 /// - `export_as_json()` for JSON format (better for nested data)
3641 /// - For complex nested structures, use JSON export instead
3642 pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
3643 let output_path = if !filename.ends_with(".csv") {
3644 format!("{}.csv", filename)
3645 } else {
3646 filename.to_string()
3647 };
3648
3649 // Flush write buffer so in-flight docs are visible in cold storage.
3650 self.flush()?;
3651
3652 // Derive the canonical column order from the schema definition so that
3653 // every row has the same columns even when some docs are missing optional
3654 // fields. Fall back to an empty list; first-doc heuristic fills it in
3655 // that case (schema-less collections).
3656 let mut headers: Vec<String> = match self.get_collection_definition(collection) {
3657 Ok(coll) => {
3658 // _id first, then all schema fields in definition order
3659 let mut cols = vec!["_id".to_string()];
3660 cols.extend(coll.fields.into_keys());
3661 cols
3662 }
3663 Err(_) => vec!["_id".to_string()],
3664 };
3665
3666 let mut writer = csv::Writer::from_path(&output_path)?;
3667 let mut headers_written = false;
3668
3669 let prefix = format!("{}:", collection);
3670
3671 for result in self.cold.scan_prefix(&prefix) {
3672 let (key, value) = result?;
3673 if key.starts_with("_collection:") {
3674 continue;
3675 }
3676 let Ok(doc) = serde_json::from_slice::<Document>(&value) else {
3677 continue;
3678 };
3679
3680 // If we only have the _id column (schema-less path), discover
3681 // headers from the first document we encounter.
3682 if !headers_written && headers.len() == 1 {
3683 for k in doc.data.keys() {
3684 if !headers.contains(k) {
3685 headers.push(k.clone());
3686 }
3687 }
3688 }
3689
3690 if !headers_written {
3691 writer.write_record(&headers)?;
3692 headers_written = true;
3693 }
3694
3695 let row: Vec<String> = headers
3696 .iter()
3697 .map(|col| {
3698 if col == "_id" {
3699 return doc._sid.clone();
3700 }
3701 match doc.data.get(col) {
3702 None => String::new(),
3703 Some(Value::String(s)) => s.clone(),
3704 Some(Value::Int(i)) => i.to_string(),
3705 Some(Value::Float(f)) => f.to_string(),
3706 Some(Value::Bool(b)) => b.to_string(),
3707 Some(Value::Null) => String::new(),
3708 Some(Value::Uuid(u)) => u.to_string(),
3709 Some(Value::DateTime(dt)) => dt.to_rfc3339(),
3710 // Nested types: JSON-encode for lossless round-trip.
3711 // Consumers (pandas, DuckDB, etc.) can parse these back.
3712 Some(v) => serde_json::to_string(&Self::value_to_json(v.clone()))
3713 .unwrap_or_default(),
3714 }
3715 })
3716 .collect();
3717
3718 writer.write_record(&row)?;
3719 }
3720
3721 writer.flush()?;
3722 println!("Exported collection '{}' to {}", collection, &output_path);
3723 Ok(())
3724 }
3725
3726 /// Helper method to create filter-based queries. Alias for `query()`.
3727 pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3728 self.query(collection)
3729 }
3730
3731 // Convenience methods that build on top of the FilterBuilder
3732
3733 /// Finds a single document by its ID.
3734 pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
3735 self.query(collection)
3736 .filter(|f| f.eq("id", id))
3737 .first_one()
3738 .await
3739 }
3740
3741 /// Finds a single document matching the given filter.
3742 pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
3743 where
3744 F: FnOnce(&FilterBuilder) -> Filter + Send + Sync + 'static,
3745 {
3746 self.query(collection).filter(filter_fn).first_one().await
3747 }
3748
3749 /// Finds all documents where a specific field matches a value.
3750 pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
3751 &self,
3752 collection: &str,
3753 field: &'static str,
3754 value: T,
3755 ) -> Result<Vec<Document>> {
3756 let value_clone = value.clone();
3757 self.query(collection)
3758 .filter(move |f: &FilterBuilder| f.eq(field, value_clone.clone()))
3759 .collect()
3760 .await
3761 }
3762
3763 /// Finds documents matching multiple field-value pairs (equality).
3764 pub async fn find_by_fields(
3765 &self,
3766 collection: &str,
3767 fields: Vec<(&str, Value)>,
3768 ) -> Result<Vec<Document>> {
3769 let mut query = self.query(collection);
3770
3771 for (field, value) in fields {
3772 let field_owned = field.to_owned();
3773 let value_owned = value.clone();
3774 query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
3775 }
3776
3777 query.collect().await
3778 }
3779
3780 /// Finds documents where a field value is within a specified range (inclusive).
3781 pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
3782 &self,
3783 collection: &str,
3784 field: &'static str,
3785 min: T,
3786 max: T,
3787 ) -> Result<Vec<Document>> {
3788 self.query(collection)
3789 .filter(move |f| f.between(field, min.clone(), max.clone()))
3790 .collect()
3791 .await
3792 }
3793
3794 /// Complex query example: build with multiple combined filters.
3795 pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
3796 self.query(collection)
3797 }
3798
3799 /// Create a full-text search query with added filter options.
3800 pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
3801 self.search(collection)
3802 }
3803
3804 // Utility methods for common operations
3805 pub async fn upsert(
3806 &self,
3807 collection: &str,
3808 id: &str,
3809 data: Vec<(&str, Value)>,
3810 ) -> Result<String> {
3811 // Convert Vec<(&str, Value)> to HashMap<String, Value>
3812 let data_map: HashMap<String, Value> =
3813 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
3814
3815 // Check if document exists
3816 if let Some(mut doc) = self.get_document(collection, id)? {
3817 // Clone for notification
3818 let old_doc = doc.clone();
3819
3820 // Update existing document - merge new data
3821 for (key, value) in data_map {
3822 doc.data.insert(key, value);
3823 }
3824
3825 // Validate unique constraints for the updated document
3826 // We need to exclude the current document from the uniqueness check
3827 self.validate_unique_constraints_excluding(collection, &doc.data, id)
3828 .await?;
3829
3830 self.put(
3831 format!("{}:{}", collection, id),
3832 serde_json::to_vec(&doc)?,
3833 None,
3834 )
3835 .await?;
3836
3837 // Publish update event
3838 let event = crate::pubsub::ChangeEvent::update(collection, id, old_doc, doc);
3839 let _ = self.pubsub.publish(event);
3840
3841 Ok(id.to_string())
3842 } else {
3843 // Insert new document with specified ID - validate unique constraints
3844 self.validate_unique_constraints(collection, &data_map)
3845 .await?;
3846
3847 let document = Document {
3848 _sid: id.to_string(),
3849 data: data_map,
3850 };
3851
3852 self.put(
3853 format!("{}:{}", collection, id),
3854 serde_json::to_vec(&document)?,
3855 None,
3856 )
3857 .await?;
3858
3859 // Publish insert event
3860 let event = crate::pubsub::ChangeEvent::insert(collection, id, document);
3861 let _ = self.pubsub.publish(event);
3862
3863 Ok(id.to_string())
3864 }
3865 }
3866
3867 // Atomic increment/decrement
3868 pub async fn increment(
3869 &self,
3870 collection: &str,
3871 id: &str,
3872 field: &str,
3873 amount: i64,
3874 ) -> Result<i64> {
3875 if let Some(mut doc) = self.get_document(collection, id)? {
3876 // Get current value
3877 let current = match doc.data.get(field) {
3878 Some(Value::Int(i)) => *i,
3879 _ => 0,
3880 };
3881
3882 // Increment
3883 let new_value = current + amount;
3884 doc.data.insert(field.to_string(), Value::Int(new_value));
3885
3886 // Save changes
3887 self.put(
3888 format!("{}:{}", collection, id),
3889 serde_json::to_vec(&doc)?,
3890 None,
3891 )
3892 .await?;
3893
3894 Ok(new_value)
3895 } else {
3896 Err(AqlError::new(
3897 ErrorCode::NotFound,
3898 format!("Document {}:{} not found", collection, id),
3899 ))
3900 }
3901 }
3902
3903 // Delete documents by query
3904 pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
3905 where
3906 F: FnOnce(&FilterBuilder) -> Filter + Send + Sync + 'static,
3907 {
3908 let docs = self.query(collection).filter(filter_fn).collect().await?;
3909 let mut deleted_count = 0;
3910
3911 for doc in docs {
3912 let key = format!("{}:{}", collection, doc._sid);
3913 self.delete(&key).await?;
3914 deleted_count += 1;
3915 }
3916
3917 Ok(deleted_count)
3918 }
3919
3920 /// Import documents from a JSON file into a collection
3921 ///
3922 /// Validates each document against the collection schema, skips duplicates (by ID),
3923 /// and provides detailed statistics about the import operation. Useful for restoring
3924 /// backups, migrating data, or seeding development databases.
3925 ///
3926 /// # Performance
3927 /// - Import speed: ~5,000 docs/sec (with validation)
3928 /// - Memory efficient: processes documents one at a time
3929 /// - Validates schema and unique constraints
3930 ///
3931 /// # Arguments
3932 /// * `collection` - Name of the collection to import into
3933 /// * `filename` - Path to the JSON file containing documents (array format)
3934 ///
3935 /// # Returns
3936 /// `ImportStats` containing counts of imported, skipped, and failed documents
3937 ///
3938 /// # Examples
3939 ///
3940 /// ```
3941 /// use aurora_db::Aurora;
3942 ///
3943 /// let db = Aurora::open("mydb.db")?;
3944 ///
3945 /// // Basic import
3946 /// let stats = db.import_from_json("users", "./data/new_users.json").await?;
3947 /// println!("Imported: {}, Skipped: {}, Failed: {}",
3948 /// stats.imported, stats.skipped, stats.failed);
3949 ///
3950 /// // Restore from backup
3951 /// let backup_file = "./backups/users_2024-01-15.json";
3952 /// let stats = db.import_from_json("users", backup_file).await?;
3953 ///
3954 /// if stats.failed > 0 {
3955 /// eprintln!("Warning: {} documents failed validation", stats.failed);
3956 /// }
3957 ///
3958 /// // Idempotent import - duplicates are skipped
3959 /// let stats = db.import_from_json("users", "./data/users.json").await?;
3960 /// // Running again will skip all existing documents
3961 /// let stats2 = db.import_from_json("users", "./data/users.json").await?;
3962 /// assert_eq!(stats2.skipped, stats.imported);
3963 ///
3964 /// // Migration from another system
3965 /// db.new_collection("products", vec![
3966 /// ("sku", FieldType::Scalar(crate::types::ScalarType::String)),
3967 /// ("name", FieldType::Scalar(crate::types::ScalarType::String)),
3968 /// ("price", FieldType::Float),
3969 /// ])?;
3970 ///
3971 /// let stats = db.import_from_json("products", "./migration/products.json").await?;
3972 /// println!("Migration complete: {} products imported", stats.imported);
3973 /// ```ignore
3974 ///
3975 /// # Expected JSON Format
3976 ///
3977 /// The JSON file should contain an array of document objects:
3978 /// ```json
3979 /// [
3980 /// { "id": "123", "name": "Alice", "email": "alice@example.com" },
3981 /// { "id": "456", "name": "Bob", "email": "bob@example.com" },
3982 /// { "name": "Carol", "email": "carol@example.com" }
3983 /// ]
3984 /// ```
3985 ///
3986 /// # Behavior
3987 /// - Documents with existing IDs are skipped (duplicate detection)
3988 /// - Documents without IDs get auto-generated UUIDs
3989 /// - Schema validation is performed on all fields
3990 /// - Failed documents are counted but don't stop the import
3991 /// - Unique constraints are checked
3992 ///
3993 /// # See Also
3994 /// - `export_as_json()` to create compatible backup files
3995 /// - `batch_insert()` for programmatic bulk inserts
3996 pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
3997 // Validate that the collection exists
3998 let collection_def = self.get_collection_definition(collection)?;
3999
4000 // Load JSON file
4001 let json_string = read_to_string(filename).await.map_err(|e| {
4002 AqlError::new(
4003 ErrorCode::IoError,
4004 format!("Failed to read import file: {}", e),
4005 )
4006 })?;
4007
4008 // Parse JSON
4009 let documents: Vec<JsonValue> = from_str(&json_string).map_err(|e| {
4010 AqlError::new(
4011 ErrorCode::SerializationError,
4012 format!("Failed to parse JSON: {}", e),
4013 )
4014 })?;
4015
4016 let mut stats = ImportStats::default();
4017
4018 // Process each document
4019 for doc_json in documents {
4020 match self
4021 .import_document(collection, &collection_def, doc_json)
4022 .await
4023 {
4024 Ok(ImportResult::Imported) => stats.imported += 1,
4025 Ok(ImportResult::Skipped) => stats.skipped += 1,
4026 Err(_) => stats.failed += 1,
4027 }
4028 }
4029
4030 Ok(stats)
4031 }
4032
4033 /// Import a single document, performing schema validation and duplicate checking
4034 async fn import_document(
4035 &self,
4036 collection: &str,
4037 collection_def: &Collection,
4038 doc_json: JsonValue,
4039 ) -> Result<ImportResult> {
4040 if !doc_json.is_object() {
4041 return Err(AqlError::invalid_operation(
4042 "Expected JSON object".to_string(),
4043 ));
4044 }
4045
4046 // Extract document ID if present
4047 let doc_id = doc_json
4048 .get("id")
4049 .and_then(|id| id.as_str())
4050 .map(|s| s.to_string())
4051 .unwrap_or_else(|| Uuid::now_v7().to_string());
4052
4053 // Check if document with this ID already exists
4054 if self.get_document(collection, &doc_id)?.is_some() {
4055 return Ok(ImportResult::Skipped);
4056 }
4057
4058 // Convert JSON to our document format and validate against schema
4059 let mut data_map = HashMap::new();
4060
4061 if let Some(obj) = doc_json.as_object() {
4062 for (field_name, field_def) in &collection_def.fields {
4063 if let Some(json_value) = obj.get(field_name) {
4064 // Validate value against field type
4065 if !self.validate_field_value(json_value, &field_def.field_type) {
4066 return Err(AqlError::invalid_operation(format!(
4067 "Field '{}' has invalid type",
4068 field_name
4069 )));
4070 }
4071
4072 // Convert JSON value to our Value type
4073 let value = self.json_to_value(json_value)?;
4074 data_map.insert(field_name.clone(), value);
4075 } else if field_def.unique {
4076 // Missing required unique field
4077 return Err(AqlError::invalid_operation(format!(
4078 "Missing required unique field '{}'",
4079 field_name
4080 )));
4081 }
4082 }
4083 }
4084
4085 // Check for duplicates by unique fields
4086 let unique_fields = self.get_unique_fields(collection_def);
4087 for unique_field in &unique_fields {
4088 if let Some(value) = data_map.get(unique_field) {
4089 // Query for existing documents with this unique value
4090 let query_results = self
4091 .query(collection)
4092 .filter(move |f| f.eq(unique_field, value.clone()))
4093 .limit(1)
4094 .collect()
4095 .await?;
4096
4097 if !query_results.is_empty() {
4098 // Found duplicate by unique field
4099 return Ok(ImportResult::Skipped);
4100 }
4101 }
4102 }
4103
4104 // Create and insert document
4105 let document = Document {
4106 _sid: doc_id,
4107 data: data_map,
4108 };
4109
4110 self.put(
4111 format!("{}:{}", collection, document._sid),
4112 serde_json::to_vec(&document)?,
4113 None,
4114 )
4115 .await?;
4116
4117 Ok(ImportResult::Imported)
4118 }
4119
4120 /// Validate that a JSON value matches the expected field type
4121 fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
4122 use crate::types::ScalarType;
4123 match field_type {
4124 FieldType::Scalar(ScalarType::String) => value.is_string(),
4125 FieldType::Scalar(ScalarType::Int) => value.is_i64() || value.is_u64(),
4126 FieldType::Scalar(ScalarType::Float) => value.is_number(),
4127 FieldType::Scalar(ScalarType::Bool) => value.is_boolean(),
4128 FieldType::Scalar(ScalarType::Uuid) => {
4129 value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
4130 }
4131 FieldType::Scalar(ScalarType::Any) => true,
4132
4133 FieldType::Array(_) => value.is_array(),
4134 FieldType::Object | FieldType::Nested(_) => value.is_object(),
4135 // Legacy/Duplicate catches just in case
4136 _ => true,
4137 }
4138 }
4139
4140 /// Convert a JSON value to our internal Value type
4141 #[allow(clippy::only_used_in_recursion)]
4142 fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
4143 match json_value {
4144 JsonValue::Null => Ok(Value::Null),
4145 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
4146 JsonValue::Number(n) => {
4147 if let Some(i) = n.as_i64() {
4148 Ok(Value::Int(i))
4149 } else if let Some(f) = n.as_f64() {
4150 Ok(Value::Float(f))
4151 } else {
4152 Err(AqlError::invalid_operation(
4153 "Invalid number value".to_string(),
4154 ))
4155 }
4156 }
4157 JsonValue::String(s) => {
4158 // Try parsing as UUID first
4159 if let Ok(uuid) = Uuid::parse_str(s) {
4160 Ok(Value::Uuid(uuid))
4161 } else {
4162 Ok(Value::String(s.clone()))
4163 }
4164 }
4165 JsonValue::Array(arr) => {
4166 let mut values = Vec::new();
4167 for item in arr {
4168 values.push(self.json_to_value(item)?);
4169 }
4170 Ok(Value::Array(values))
4171 }
4172 JsonValue::Object(obj) => {
4173 let mut map = HashMap::new();
4174 for (k, v) in obj {
4175 map.insert(k.clone(), self.json_to_value(v)?);
4176 }
4177 Ok(Value::Object(map))
4178 }
4179 }
4180 }
4181
4182 /// Get collection definition
4183 pub fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
4184 println!("DB: Getting definition for: {}", collection);
4185 if let Some(data) = self.get(&format!("_collection:{}", collection))? {
4186 let collection_def: Collection = serde_json::from_slice(&data)?;
4187 Ok(collection_def)
4188 } else {
4189 Err(AqlError::new(
4190 ErrorCode::CollectionNotFound,
4191 collection.to_string(),
4192 ))
4193 }
4194 }
4195
4196 /// Returns the names of all registered collections in the database.
4197 pub fn list_collection_names(&self) -> Vec<String> {
4198 self.schema_cache.iter().map(|r| r.key().clone()).collect()
4199 }
4200
4201 /// Retrieves detailed statistics and information about the database storage and performance.
4202 ///
4203 /// # Returns
4204 /// A `DatabaseStats` struct containing hot cache, cold storage, and per-collection metrics.
4205 pub fn get_database_stats(&self) -> Result<DatabaseStats> {
4206 let hot_stats = self.hot.get_stats();
4207 let cold_stats = self.cold.get_stats()?;
4208
4209 Ok(DatabaseStats {
4210 hot_stats,
4211 cold_stats,
4212 estimated_size: self.cold.estimated_size(),
4213 collections: self.get_collection_stats()?,
4214 })
4215 }
4216
4217 /// Checks if a specific key is currently residing in the hot cache.
4218 ///
4219 /// # Arguments
4220 /// * `key` - The full key to check.
4221 pub fn is_in_hot_cache(&self, key: &str) -> bool {
4222 self.hot.is_hot(key)
4223 }
4224
4225 /// Clears all entries from the hot cache.
4226 ///
4227 /// This is useful for manual memory management or when testing performance
4228 /// from a cold start.
4229 pub fn clear_hot_cache(&self) {
4230 self.hot.clear();
4231 println!(
4232 "Hot cache cleared, current hit ratio: {:.2}%",
4233 self.hot.hit_ratio() * 100.0
4234 );
4235 }
4236
4237 /// Prewarm the cache by loading frequently accessed data from cold storage
4238 ///
4239 /// Loads documents from a collection into memory cache to eliminate cold-start
4240 /// latency. Dramatically improves initial query performance after database startup
4241 /// by preloading the most commonly accessed data.
4242 ///
4243 /// # Performance Impact
4244 /// - Prewarming speed: ~20,000 docs/sec
4245 /// - Improves subsequent read latency from ~2ms (disk) to ~0.001ms (memory)
4246 /// - Cache hit rate jumps from 0% to 95%+ for prewarmed data
4247 /// - Memory cost: ~500 bytes per document average
4248 ///
4249 /// # Arguments
4250 /// * `collection` - The collection to prewarm
4251 /// * `limit` - Maximum number of documents to load (default: 1000, None = all)
4252 ///
4253 /// # Returns
4254 /// Number of documents loaded into cache
4255 ///
4256 /// # Examples
4257 ///
4258 /// ```ignore
4259 /// use aurora_db::Aurora;
4260 ///
4261 /// let db = Aurora::open("mydb.db")?;
4262 ///
4263 /// // Prewarm frequently accessed collection
4264 /// let loaded = db.prewarm_cache("users", Some(1000)).await?;
4265 /// println!("Prewarmed {} user documents", loaded);
4266 ///
4267 /// // Now queries are fast from the start
4268 /// let stats_before = db.get_cache_stats();
4269 /// let users = db.query("users").collect().await?;
4270 /// let stats_after = db.get_cache_stats();
4271 ///
4272 /// // High hit rate thanks to prewarming
4273 /// assert!(stats_after.hit_rate > 0.95);
4274 ///
4275 /// // Startup optimization pattern
4276 /// async fn startup_prewarm(db: &Aurora) -> Result<()> {
4277 /// println!("Prewarming caches...");
4278 ///
4279 /// // Prewarm most frequently accessed collections
4280 /// db.prewarm_cache("users", Some(5000)).await?;
4281 /// db.prewarm_cache("sessions", Some(1000)).await?;
4282 /// db.prewarm_cache("products", Some(500)).await?;
4283 ///
4284 /// let stats = db.get_cache_stats();
4285 /// println!("Cache prewarmed: {} entries loaded", stats.size);
4286 ///
4287 /// Ok(())
4288 /// }
4289 ///
4290 /// // Web server startup
4291 /// #[tokio::main]
4292 /// async fn main() {
4293 /// let db = Aurora::open("app.db").unwrap();
4294 ///
4295 /// // Prewarm before accepting requests
4296 /// db.prewarm_cache("users", Some(10000)).await.unwrap();
4297 ///
4298 /// // Server is now ready with hot cache
4299 /// start_web_server(db).await;
4300 /// }
4301 ///
4302 /// // Prewarm all documents (for small collections)
4303 /// let all_loaded = db.prewarm_cache("config", None).await?;
4304 /// // All config documents now in memory
4305 ///
4306 /// // Selective prewarming based on access patterns
4307 /// async fn smart_prewarm(db: &Aurora) -> Result<()> {
4308 /// // Load recent users (they're accessed most)
4309 /// db.prewarm_cache("users", Some(1000)).await?;
4310 ///
4311 /// // Load active sessions only
4312 /// let active_sessions = db.query("sessions")
4313 /// .filter(|f| f.eq("active", Value::Bool(true)))
4314 /// .limit(500)
4315 /// .collect()
4316 /// .await?;
4317 ///
4318 /// // Manually populate cache with hot data
4319 /// for session in active_sessions {
4320 /// // Reading automatically caches
4321 /// db.get_document("sessions", &session._sid)?;
4322 /// }
4323 ///
4324 /// Ok(())
4325 /// }
4326 /// ```
4327 ///
4328 /// # Typical Prewarming Scenarios
4329 ///
4330 /// **Web Application Startup:**
4331 /// ```ignore
4332 /// // Load user data, sessions, and active content
4333 /// db.prewarm_cache("users", Some(5000)).await?;
4334 /// db.prewarm_cache("sessions", Some(2000)).await?;
4335 /// db.prewarm_cache("posts", Some(1000)).await?;
4336 /// ```
4337 ///
4338 /// **E-commerce Site:**
4339 /// ```ignore
4340 /// // Load products, categories, and user carts
4341 /// db.prewarm_cache("products", Some(500)).await?;
4342 /// db.prewarm_cache("categories", None).await?; // All categories
4343 /// db.prewarm_cache("active_carts", Some(1000)).await?;
4344 /// ```
4345 ///
4346 /// **API Server:**
4347 /// ```ignore
4348 /// // Load authentication data and rate limits
4349 /// db.prewarm_cache("api_keys", None).await?;
4350 /// db.prewarm_cache("rate_limits", Some(10000)).await?;
4351 /// ```
4352 ///
4353 /// # When to Use
4354 /// - At application startup to eliminate cold-start latency
4355 /// - After cache clear operations
4356 /// - Before high-traffic events (product launches, etc.)
4357 /// - When deploying new instances (load balancer warm-up)
4358 ///
4359 /// # Memory Considerations
4360 /// - 1,000 docs ≈ 500 KB memory
4361 /// - 10,000 docs ≈ 5 MB memory
4362 /// - 100,000 docs ≈ 50 MB memory
4363 /// - Stay within configured cache capacity
4364 ///
4365 /// # See Also
4366 /// - `get_cache_stats()` to monitor cache effectiveness
4367 /// - `prewarm_all_collections()` to prewarm all collections
4368 /// - `Aurora::with_config()` to adjust cache capacity
4369 pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
4370 let limit = limit.unwrap_or(1000);
4371 let prefix = format!("{}:", collection);
4372 let mut loaded = 0;
4373
4374 for entry in self.cold.scan_prefix(&prefix) {
4375 if loaded >= limit {
4376 break;
4377 }
4378
4379 if let Ok((key, value)) = entry {
4380 // Load into hot cache
4381 self.hot.set(Arc::new(key.clone()), Arc::new(value), None);
4382 loaded += 1;
4383 }
4384 }
4385
4386 println!("Prewarmed {} with {} documents", collection, loaded);
4387 Ok(loaded)
4388 }
4389
4390 /// Prewarm cache for all collections
4391 pub async fn prewarm_all_collections(
4392 &self,
4393 docs_per_collection: Option<usize>,
4394 ) -> Result<HashMap<String, usize>> {
4395 let mut stats = HashMap::new();
4396
4397 // Get all collections
4398 let collections: Vec<String> = self
4399 .cold
4400 .scan()
4401 .filter_map(|r| r.ok())
4402 .map(|(k, _)| k)
4403 .filter(|k| k.starts_with("_collection:"))
4404 .map(|k| k.trim_start_matches("_collection:").to_string())
4405 .collect();
4406
4407 for collection in collections {
4408 let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
4409 stats.insert(collection, loaded);
4410 }
4411
4412 Ok(stats)
4413 }
4414
4415 /// Store multiple key-value pairs efficiently in a single batch operation
4416 ///
4417 /// Low-level batch write operation that bypasses document validation and
4418 /// writes raw byte data directly to storage. Useful for advanced use cases,
4419 /// custom serialization, or maximum performance scenarios.
4420 ///
4421 /// # Performance
4422 /// - Write speed: ~100,000 writes/sec
4423 /// - Single disk fsync for entire batch
4424 /// - No validation or schema checking
4425 /// - Direct storage access
4426 ///
4427 /// # Arguments
4428 /// * `pairs` - Vector of (key, value) tuples where value is raw bytes
4429 ///
4430 /// # Returns
4431 /// Success or an error
4432 ///
4433 /// # Examples
4434 ///
4435 /// ```ignore
4436 /// use aurora_db::Aurora;
4437 ///
4438 /// let db = Aurora::open("mydb.db")?;
4439 ///
4440 /// // Low-level batch write
4441 /// let pairs = vec![
4442 /// ("users:123".to_string(), b"raw data 1".to_vec()),
4443 /// ("users:456".to_string(), b"raw data 2".to_vec()),
4444 /// ("cache:key1".to_string(), b"cached value".to_vec()),
4445 /// ];
4446 ///
4447 /// db.batch_write(pairs)?;
4448 ///
4449 /// // Custom binary serialization
4450 /// use bincode;
4451 ///
4452 /// #[derive(Serialize, Deserialize)]
4453 /// struct CustomData {
4454 /// id: u64,
4455 /// payload: Vec<u8>,
4456 /// }
4457 ///
4458 /// let custom_data = vec![
4459 /// CustomData { id: 1, payload: vec![1, 2, 3] },
4460 /// CustomData { id: 2, payload: vec![4, 5, 6] },
4461 /// ];
4462 ///
4463 /// let pairs: Vec<(String, Vec<u8>)> = custom_data
4464 /// .iter()
4465 /// .map(|data| {
4466 /// let key = format!("binary:{}", data._sid);
4467 /// let value = bincode::serialize(data).unwrap();
4468 /// (key, value)
4469 /// })
4470 /// .collect();
4471 ///
4472 /// db.batch_write(pairs)?;
4473 ///
4474 /// // Bulk cache population
4475 /// let cache_entries: Vec<(String, Vec<u8>)> = (0..10000)
4476 /// .map(|i| {
4477 /// let key = format!("cache:item_{}", i);
4478 /// let value = format!("value_{}", i).into_bytes();
4479 /// (key, value)
4480 /// })
4481 /// .collect();
4482 ///
4483 /// db.batch_write(cache_entries)?;
4484 /// // Writes 10,000 entries in ~100ms
4485 /// ```
4486 ///
4487 /// # Important Notes
4488 /// - No schema validation performed
4489 /// - No unique constraint checking
4490 /// - No automatic indexing
4491 /// - Keys must follow "collection:id" format for proper grouping
4492 /// - Values are raw bytes - you handle serialization
4493 /// - Use `batch_insert()` for validated document inserts
4494 ///
4495 /// # When to Use
4496 /// - Maximum write performance needed
4497 /// - Custom serialization formats (bincode, msgpack, etc.)
4498 /// - Cache population
4499 /// - Low-level database operations
4500 /// - You're bypassing the document model
4501 ///
4502 /// # When NOT to Use
4503 /// - Regular document inserts → Use `batch_insert()` instead
4504 /// - Need validation → Use `batch_insert()` instead
4505 /// - Need indexing → Use `batch_insert()` instead
4506 ///
4507 /// # See Also
4508 /// - `batch_insert()` for validated document batch inserts
4509 /// - `put()` for single key-value writes
4510 pub async fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
4511 // Group pairs by collection name
4512 let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
4513 for (key, value) in &pairs {
4514 if let Some(collection_name) = key.split(':').next() {
4515 collections
4516 .entry(collection_name.to_string())
4517 .or_default()
4518 .push((key.clone(), value.clone()));
4519 }
4520 }
4521
4522 // First, do the batch write to cold storage for all pairs
4523 self.cold.batch_set(pairs)?;
4524
4525 // Then, process each collection for in-memory updates
4526 for (collection_name, batch) in collections {
4527 // --- Optimized Batch Indexing ---
4528
4529 // 1. Get schema once for the entire collection batch
4530 let collection_obj = match self.schema_cache.get(&collection_name) {
4531 Some(cached_schema) => Arc::clone(cached_schema.value()),
4532 None => {
4533 let collection_key = format!("_collection:{}", collection_name);
4534 match self.get(&collection_key)? {
4535 Some(data) => {
4536 let obj: Collection = serde_json::from_slice(&data)?;
4537 let arc_obj = Arc::new(obj);
4538 self.schema_cache
4539 .insert(collection_name.to_string(), Arc::clone(&arc_obj));
4540 arc_obj
4541 }
4542 None => continue,
4543 }
4544 }
4545 };
4546
4547 let indexed_fields: Vec<String> = collection_obj
4548 .fields
4549 .iter()
4550 .filter(|(_, def)| def.indexed || def.unique)
4551 .map(|(name, _)| name.clone())
4552 .collect();
4553
4554 let primary_index = self
4555 .primary_indices
4556 .entry(collection_name.to_string())
4557 .or_default();
4558
4559 for (key, value) in batch {
4560 // 2. Update hot cache
4561 if self.should_cache_key(&key) {
4562 self.hot
4563 .set(Arc::new(key.clone()), Arc::new(value.clone()), None);
4564 }
4565
4566 // 3. Update primary index with metadata only
4567 let location = DiskLocation::new(value.len());
4568 primary_index.insert(key.clone(), location);
4569
4570 // 4. Update secondary indices
4571 if !indexed_fields.is_empty()
4572 && let Ok(doc) = serde_json::from_slice::<Document>(&value)
4573 {
4574 for (field, field_value) in doc.data {
4575 if indexed_fields.contains(&field) {
4576 let value_str = match &field_value {
4577 Value::String(s) => s.clone(),
4578 _ => field_value.to_string(),
4579 };
4580 let index_key = format!("{}:{}", collection_name, field);
4581 let secondary_index =
4582 self.secondary_indices.entry(index_key).or_default();
4583
4584 let id = key.split(':').nth(1).unwrap_or("");
4585 let internal_id = self.get_or_create_internal_id(id);
4586
4587 let index_entry =
4588 secondary_index.entry(value_str).or_insert_with(|| {
4589 Arc::new(RwLock::new(IndexStorage::Single(internal_id)))
4590 });
4591
4592 if let Ok(mut storage) = index_entry.value().write() {
4593 storage.add(internal_id);
4594 }
4595 }
4596 }
4597 }
4598 }
4599 }
4600
4601 Ok(())
4602 }
4603
4604 /// Scans the database for keys matching a specific prefix.
4605 ///
4606 /// This is a low-level method that returns an iterator over raw byte values.
4607 ///
4608 /// # Arguments
4609 /// * `prefix` - The key prefix to scan for.
4610 pub fn scan_with_prefix(
4611 &self,
4612 prefix: &str,
4613 ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
4614 self.cold.scan_prefix(prefix)
4615 }
4616
4617 /// Returns storage efficiency and document count metrics for each collection.
4618 pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
4619 let mut stats = HashMap::new();
4620
4621 // Scan all collections
4622 let collections: Vec<String> = self
4623 .cold
4624 .scan()
4625 .filter_map(|r| r.ok())
4626 .map(|(k, _)| k)
4627 .filter(|k| k.starts_with("_collection:"))
4628 .map(|k| k.trim_start_matches("_collection:").to_string())
4629 .collect();
4630
4631 for collection in collections {
4632 // Use primary index for fast stats (count + size from DiskLocation)
4633 let (count, size) = if let Some(index) = self.primary_indices.get(&collection) {
4634 let count = index.len();
4635 // Sum up sizes from DiskLocation metadata (much faster than disk scan)
4636 let size: usize = index.iter().map(|entry| entry.value().size as usize).sum();
4637 (count, size)
4638 } else {
4639 // Fallback: scan from cold storage if index not available
4640 let prefix = format!("{}:", collection);
4641 let count = self.cold.scan_prefix(&prefix).count();
4642 let size: usize = self
4643 .cold
4644 .scan_prefix(&prefix)
4645 .filter_map(|r| r.ok())
4646 .map(|(_, v)| v.len())
4647 .sum();
4648 (count, size)
4649 };
4650
4651 stats.insert(
4652 collection,
4653 CollectionStats {
4654 count,
4655 size_bytes: size,
4656 avg_doc_size: if count > 0 { size / count } else { 0 },
4657 },
4658 );
4659 }
4660
4661 Ok(stats)
4662 }
4663
4664 /// Searches for documents where a field exactly matches a value using a secondary index.
4665 ///
4666 /// This is an optimized O(1) lookup path.
4667 pub fn search_by_value(
4668 &self,
4669 collection: &str,
4670 field: &str,
4671 value: &Value,
4672 ) -> Result<Vec<Document>> {
4673 let index_key = format!("_index:{}:{}", collection, field);
4674
4675 if let Some(index_data) = self.get(&index_key)? {
4676 let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4677 let index = Index::new(index_def);
4678
4679 // Use the previously unused search method
4680 if let Some(doc_ids) = index.search(value) {
4681 // Load the documents by ID
4682 let mut docs = Vec::new();
4683 for id in doc_ids {
4684 if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4685 let doc: Document = serde_json::from_slice(&doc_data)?;
4686 docs.push(doc);
4687 }
4688 }
4689 return Ok(docs);
4690 }
4691 }
4692
4693 // Return empty result if no index or no matches
4694 Ok(Vec::new())
4695 }
4696
4697 /// Performs a full-text search on an indexed text field.
4698 ///
4699 /// Returns results ranked by relevance if supported by the underlying index.
4700 pub fn full_text_search(
4701 &self,
4702 collection: &str,
4703 field: &str,
4704 query: &str,
4705 ) -> Result<Vec<Document>> {
4706 let index_key = format!("_index:{}:{}", collection, field);
4707
4708 if let Some(index_data) = self.get(&index_key)? {
4709 let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
4710
4711 // Ensure this is a full-text index
4712 if !matches!(index_def.index_type, IndexType::FullText) {
4713 return Err(AqlError::invalid_operation(format!(
4714 "Field '{}' is not indexed as full-text",
4715 field
4716 )));
4717 }
4718
4719 let index = Index::new(index_def);
4720
4721 // Use the previously unused search_text method
4722 if let Some(doc_id_scores) = index.search_text(query) {
4723 // Load the documents by ID, preserving score order
4724 let mut docs = Vec::new();
4725 for (id, _score) in doc_id_scores {
4726 if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
4727 let doc: Document = serde_json::from_slice(&doc_data)?;
4728 docs.push(doc);
4729 }
4730 }
4731 return Ok(docs);
4732 }
4733 }
4734
4735 // Return empty result if no index or no matches
4736 Ok(Vec::new())
4737 }
4738
4739 /// Creates a full-text search index on a text field.
4740 ///
4741 /// This enables high-performance keyword searching and relevance ranking.
4742 pub async fn create_text_index(
4743 &self,
4744 collection: &str,
4745 field: &str,
4746 _enable_stop_words: bool,
4747 ) -> Result<()> {
4748 // Check if collection exists
4749 if self.get(&format!("_collection:{}", collection))?.is_none() {
4750 return Err(AqlError::new(
4751 ErrorCode::CollectionNotFound,
4752 collection.to_string(),
4753 ));
4754 }
4755
4756 // Create index definition
4757 let index_def = IndexDefinition {
4758 name: format!("{}_{}_fulltext", collection, field),
4759 collection: collection.to_string(),
4760 fields: vec![field.to_string()],
4761 index_type: IndexType::FullText,
4762 unique: false,
4763 };
4764
4765 // Store index definition
4766 let index_key = format!("_index:{}:{}", collection, field);
4767 self.put(index_key, serde_json::to_vec(&index_def)?, None)
4768 .await?;
4769
4770 // Create the actual index
4771 let index = Index::new(index_def);
4772
4773 // Index all existing documents in the collection
4774 let prefix = format!("{}:", collection);
4775 for (_, data) in self.cold.scan_prefix(&prefix).flatten() {
4776 let doc: Document = serde_json::from_slice(&data)?;
4777 index.insert(&doc)?;
4778 }
4779
4780 Ok(())
4781 }
4782
4783 pub async fn execute_simple_query(
4784 &self,
4785 builder: &SimpleQueryBuilder,
4786 ) -> Result<Vec<Document>> {
4787 // Ensure indices are initialized
4788 self.ensure_indices_initialized().await?;
4789
4790 // A place to store the IDs of the documents we need to fetch
4791 let mut doc_ids_to_load: Option<Vec<String>> = None;
4792
4793 // --- The "Query Planner" ---
4794 // Smart heuristic: For range queries with small LIMITs, full scan can be faster
4795 // than collecting millions of IDs from secondary index
4796 let use_index_for_range = if let Some(limit) = builder.limit {
4797 // If limit is small (< 1000), prefer full scan for range queries
4798 // The secondary index would scan all entries anyway, might as well
4799 // scan documents directly and benefit from early termination
4800 limit >= 1000
4801 } else {
4802 // No limit? Index might still help if result set is small
4803 true
4804 };
4805
4806 // Look for an opportunity to use an index
4807 for (_filter_idx, filter) in builder.filters.iter().enumerate() {
4808 match filter {
4809 Filter::Eq(field, value) => {
4810 let index_key = format!("{}:{}", &builder.collection, field);
4811 if let Some(index) = self.secondary_indices.get(&index_key) {
4812 if let Some(matching_ids_arc) = index.get(&value.to_string()) {
4813 if let Ok(storage) = matching_ids_arc.value().read() {
4814 let ids = storage
4815 .iter()
4816 .filter_map(|id| self.get_external_id(id))
4817 .collect();
4818 doc_ids_to_load = Some(ids);
4819 break;
4820 }
4821 }
4822 }
4823 }
4824 Filter::Gt(field, value)
4825 | Filter::Gte(field, value)
4826 | Filter::Lt(field, value)
4827 | Filter::Lte(field, value) => {
4828 // Skip index for range queries with small LIMITs (see query planner heuristic above)
4829 if !use_index_for_range {
4830 continue;
4831 }
4832
4833 let index_key = format!("{}:{}", &builder.collection, field);
4834
4835 // Do we have a secondary index for this field?
4836 if let Some(index) = self.secondary_indices.get(&index_key) {
4837 // For range queries, we need to scan through the index values
4838 let mut matching_ids = Vec::new();
4839
4840 for entry in index.iter() {
4841 let index_value_str = entry.key();
4842
4843 // Try to parse the index value to compare with our filter value
4844 if let Ok(index_value) =
4845 self.parse_value_from_string(index_value_str, value)
4846 {
4847 let matches = match filter {
4848 Filter::Gt(_, filter_val) => index_value > *filter_val,
4849 Filter::Gte(_, filter_val) => index_value >= *filter_val,
4850 Filter::Lt(_, filter_val) => index_value < *filter_val,
4851 Filter::Lte(_, filter_val) => index_value <= *filter_val,
4852 _ => false,
4853 };
4854
4855 if matches {
4856 if let Ok(storage) = entry.value().read() {
4857 matching_ids.extend(
4858 storage
4859 .iter()
4860 .filter_map(|id| self.get_external_id(id)),
4861 );
4862 }
4863 }
4864 }
4865 }
4866
4867 if !matching_ids.is_empty() {
4868 doc_ids_to_load = Some(matching_ids);
4869 break;
4870 }
4871 }
4872 }
4873 Filter::Contains(field, search_term) => {
4874 let index_key = format!("{}:{}", &builder.collection, field);
4875
4876 // Do we have a secondary index for this field?
4877 if let Some(index) = self.secondary_indices.get(&index_key) {
4878 let mut matching_ids = Vec::new();
4879
4880 for entry in index.iter() {
4881 let index_value_str = entry.key();
4882
4883 // Check if this indexed value contains our search term
4884 if index_value_str
4885 .to_lowercase()
4886 .contains(&search_term.to_lowercase())
4887 {
4888 if let Ok(storage) = entry.value().read() {
4889 matching_ids.extend(
4890 storage.iter().filter_map(|id| self.get_external_id(id)),
4891 );
4892 }
4893 }
4894 }
4895
4896 if !matching_ids.is_empty() {
4897 // Remove duplicates since a document could match multiple indexed values
4898 matching_ids.sort();
4899 matching_ids.dedup();
4900
4901 doc_ids_to_load = Some(matching_ids);
4902 break;
4903 }
4904 }
4905 }
4906 Filter::And(_) => {
4907 // For compound filters, we can't easily use a single index
4908 // This would require more complex query planning
4909 continue;
4910 }
4911 Filter::Or(sub_filters) => {
4912 // For OR filters, collect union of all matching IDs from each sub-filter
4913 let mut union_ids: Vec<String> = Vec::new();
4914 let mut used_index = false;
4915
4916 for sub_filter in sub_filters {
4917 match sub_filter {
4918 Filter::Eq(field, value) => {
4919 let index_key = format!("{}:{}", &builder.collection, field);
4920 if let Some(index) = self.secondary_indices.get(&index_key) {
4921 if let Some(matching_ids_arc) = index.get(&value.to_string()) {
4922 if let Ok(storage) = matching_ids_arc.value().read() {
4923 union_ids.extend(
4924 storage
4925 .iter()
4926 .filter_map(|id| self.get_external_id(id)),
4927 );
4928 used_index = true;
4929 }
4930 }
4931 }
4932 }
4933 // For other filter types in OR, we fall back to full scan
4934 _ => continue,
4935 }
4936 }
4937
4938 if used_index && !union_ids.is_empty() {
4939 // Remove duplicates
4940 union_ids.sort();
4941 union_ids.dedup();
4942 doc_ids_to_load = Some(union_ids);
4943 break;
4944 }
4945 // If no index was used, continue to full scan
4946 }
4947 _ => continue,
4948 }
4949 }
4950
4951 let mut final_docs: Vec<Document>;
4952
4953 if let Some(ids) = doc_ids_to_load {
4954 // Index path
4955 use std::io::Write;
4956 if let Ok(mut file) = std::fs::OpenOptions::new()
4957 .create(true)
4958 .append(true)
4959 .open("/tmp/aurora_query_stats.log")
4960 {
4961 let _ = writeln!(
4962 file,
4963 "[INDEX PATH] IDs to load: {} | Collection: {}",
4964 ids.len(),
4965 builder.collection
4966 );
4967 }
4968
4969 final_docs = Vec::with_capacity(ids.len());
4970
4971 for id in ids {
4972 let doc_key = format!("{}:{}", &builder.collection, id);
4973 if let Some(data) = self.get(&doc_key)?
4974 && let Ok(doc) = serde_json::from_slice::<Document>(&data)
4975 {
4976 final_docs.push(doc);
4977 }
4978 }
4979 } else {
4980 // --- Path 2: Full Collection Scan with Early Termination ---
4981
4982 // Optimization: If we have a LIMIT but no ORDER BY, we can stop scanning
4983 // as soon as we have enough matching documents
4984 let early_termination_target = if builder.order_by.is_none() {
4985 builder.limit.map(|l| l + builder.offset.unwrap_or(0))
4986 } else {
4987 // With ORDER BY, we need all matching docs to sort correctly
4988 None
4989 };
4990
4991 // Smart scan with early termination support
4992 final_docs = Vec::new();
4993 let mut scan_stats = (0usize, 0usize, 0usize); // (keys_scanned, docs_fetched, matches_found)
4994
4995 if let Some(index) = self.primary_indices.get(&builder.collection) {
4996 for entry in index.iter() {
4997 let key = entry.key();
4998 scan_stats.0 += 1; // keys scanned
4999
5000 // Early termination check
5001 if let Some(target) = early_termination_target {
5002 if final_docs.len() >= target {
5003 break; // We have enough documents!
5004 }
5005 }
5006
5007 // Fetch and filter document
5008 if let Some(data) = self.get(key)? {
5009 scan_stats.1 += 1; // docs fetched
5010
5011 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
5012 // Apply all filters (Ticket 4 Optimized Path)
5013 if builder.filters.iter().all(|filter| filter.matches(&doc)) {
5014 scan_stats.2 += 1; // matches found
5015 final_docs.push(doc);
5016 }
5017 }
5018 }
5019 }
5020
5021 // Debug logging for query performance analysis
5022 use std::io::Write;
5023 if let Ok(mut file) = std::fs::OpenOptions::new()
5024 .create(true)
5025 .append(true)
5026 .open("/tmp/aurora_query_stats.log")
5027 {
5028 let _ = writeln!(
5029 file,
5030 "[SCAN PATH] Scanned: {} keys | Fetched: {} docs | Matched: {} | Collection: {}",
5031 scan_stats.0, scan_stats.1, scan_stats.2, builder.collection
5032 );
5033 }
5034 } else {
5035 // Fallback: scan from cold storage if index not initialized
5036 final_docs = self.get_all_collection(&builder.collection).await?;
5037
5038 // Apply filters
5039 final_docs.retain(|doc| builder.filters.iter().all(|filter| filter.matches(doc)));
5040 }
5041 }
5042
5043 // Apply ordering
5044 if let Some((field, ascending)) = &builder.order_by {
5045 final_docs.sort_by(|a, b| match (a.data.get(field), b.data.get(field)) {
5046 (Some(v1), Some(v2)) => {
5047 let cmp = v1.cmp(v2);
5048 if *ascending { cmp } else { cmp.reverse() }
5049 }
5050 (None, Some(_)) => std::cmp::Ordering::Less,
5051 (Some(_), None) => std::cmp::Ordering::Greater,
5052 (None, None) => std::cmp::Ordering::Equal,
5053 });
5054 }
5055
5056 // Apply offset and limit
5057 let start = builder.offset.unwrap_or(0);
5058 let end = builder
5059 .limit
5060 .map(|l| start.saturating_add(l))
5061 .unwrap_or(final_docs.len());
5062
5063 let end = end.min(final_docs.len());
5064 Ok(final_docs.get(start..end).unwrap_or(&[]).to_vec())
5065 }
5066
5067 /// Helper method to parse a string value back to a Value for comparison
5068 fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
5069 match reference_value {
5070 Value::Int(_) => {
5071 if let Ok(i) = value_str.parse::<i64>() {
5072 Ok(Value::Int(i))
5073 } else {
5074 Err(AqlError::invalid_operation(
5075 "Failed to parse int".to_string(),
5076 ))
5077 }
5078 }
5079 Value::Float(_) => {
5080 if let Ok(f) = value_str.parse::<f64>() {
5081 Ok(Value::Float(f))
5082 } else {
5083 Err(AqlError::invalid_operation(
5084 "Failed to parse float".to_string(),
5085 ))
5086 }
5087 }
5088 Value::String(_) => Ok(Value::String(value_str.to_string())),
5089 _ => Ok(Value::String(value_str.to_string())),
5090 }
5091 }
5092
5093 pub async fn execute_dynamic_query(
5094 &self,
5095 collection: &str,
5096 payload: &QueryPayload,
5097 ) -> Result<Vec<Document>> {
5098 let mut docs = self.get_all_collection(collection).await?;
5099
5100 // 1. Apply Filters
5101 if let Some(filters) = &payload.filters {
5102 docs.retain(|doc| {
5103 filters.iter().all(|filter| {
5104 doc.data
5105 .get(&filter.field)
5106 .is_some_and(|doc_val| check_filter(doc_val, filter))
5107 })
5108 });
5109 }
5110
5111 // 2. Apply Sorting
5112 if let Some(sort_options) = &payload.sort {
5113 docs.sort_by(|a, b| {
5114 let a_val = a.data.get(&sort_options.field);
5115 let b_val = b.data.get(&sort_options.field);
5116 let ordering = a_val
5117 .partial_cmp(&b_val)
5118 .unwrap_or(std::cmp::Ordering::Equal);
5119 if sort_options.ascending {
5120 ordering
5121 } else {
5122 ordering.reverse()
5123 }
5124 });
5125 }
5126
5127 // 3. Apply Pagination
5128 if let Some(offset) = payload.offset {
5129 docs = docs.into_iter().skip(offset).collect();
5130 }
5131 if let Some(limit) = payload.limit {
5132 docs = docs.into_iter().take(limit).collect();
5133 }
5134
5135 // 4. Apply Field Selection (Projection)
5136 if let Some(select_fields) = &payload.select
5137 && !select_fields.is_empty()
5138 {
5139 docs = docs
5140 .into_iter()
5141 .map(|mut doc| {
5142 doc.data.retain(|key, _| select_fields.contains(key));
5143 doc
5144 })
5145 .collect();
5146 }
5147
5148 Ok(docs)
5149 }
5150
5151 pub async fn process_network_request(
5152 &self,
5153 request: crate::network::protocol::Request,
5154 ) -> crate::network::protocol::Response {
5155 use crate::network::protocol::Response;
5156
5157 match request {
5158 crate::network::protocol::Request::Get(key) => match self.get(&key) {
5159 Ok(value) => Response::Success(value),
5160 Err(e) => Response::Error(e.to_string()),
5161 },
5162 crate::network::protocol::Request::Put(key, value) => {
5163 match self.put(key, value, None).await {
5164 Ok(_) => Response::Done,
5165 Err(e) => Response::Error(e.to_string()),
5166 }
5167 }
5168 crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
5169 Ok(_) => Response::Done,
5170 Err(e) => Response::Error(e.to_string()),
5171 },
5172 crate::network::protocol::Request::NewCollection { name, fields } => {
5173 let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
5174 .iter()
5175 .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
5176 .collect();
5177
5178 match self.new_collection(&name, fields_for_db).await {
5179 Ok(_) => Response::Done,
5180 Err(e) => Response::Error(e.to_string()),
5181 }
5182 }
5183 crate::network::protocol::Request::Insert { collection, data } => {
5184 match self.insert_map(&collection, data).await {
5185 Ok(id) => Response::Message(id),
5186 Err(e) => Response::Error(e.to_string()),
5187 }
5188 }
5189 crate::network::protocol::Request::GetDocument { collection, id } => {
5190 match self.get_document(&collection, &id) {
5191 Ok(doc) => Response::Document(doc),
5192 Err(e) => Response::Error(e.to_string()),
5193 }
5194 }
5195 crate::network::protocol::Request::Query(builder) => {
5196 match self.execute_simple_query(&builder).await {
5197 Ok(docs) => Response::Documents(docs),
5198 Err(e) => Response::Error(e.to_string()),
5199 }
5200 }
5201 crate::network::protocol::Request::BeginTransaction => {
5202 let tx_id = self.begin_transaction().await;
5203 Response::TransactionId(tx_id.as_u64())
5204 }
5205 crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
5206 let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
5207 match self.commit_transaction(tx_id).await {
5208 Ok(_) => Response::Done,
5209 Err(e) => Response::Error(e.to_string()),
5210 }
5211 }
5212 crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
5213 let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
5214 match self.rollback_transaction(tx_id).await {
5215 Ok(_) => Response::Done,
5216 Err(e) => Response::Error(e.to_string()),
5217 }
5218 }
5219 }
5220 }
5221
5222 /// Create indices for commonly queried fields automatically
5223 ///
5224 /// This is a convenience method that creates indices for fields that are
5225 /// likely to be queried frequently, improving performance.
5226 ///
5227 /// # Arguments
5228 /// * `collection` - Name of the collection
5229 /// * `fields` - List of field names to create indices for
5230 ///
5231 /// # Examples
5232 /// ```ignore
5233 /// // Create indices for commonly queried fields
5234 /// db.create_indices("users", &["email", "status", "created_at"]).await?;
5235 /// ```
5236 pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
5237 for field in fields {
5238 if let Err(e) = self.create_index(collection, field).await {
5239 eprintln!(
5240 "Warning: Failed to create index for {}.{}: {}",
5241 collection, field, e
5242 );
5243 } else {
5244 println!("Created index for {}.{}", collection, field);
5245 }
5246 }
5247 Ok(())
5248 }
5249
5250 /// Get index statistics for a collection
5251 ///
5252 /// This helps understand which indices exist and how effective they are.
5253 pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
5254 let mut stats = HashMap::new();
5255
5256 for entry in self.secondary_indices.iter() {
5257 let key = entry.key();
5258 if key.starts_with(&format!("{}:", collection)) {
5259 let field = key.split(':').nth(1).unwrap_or("unknown");
5260 let index = entry.value();
5261
5262 let unique_values = index.len();
5263 let total_documents: usize = index
5264 .iter()
5265 .map(|entry| {
5266 if let Ok(storage) = entry.value().read() {
5267 storage.to_bitmap().len() as usize
5268 } else {
5269 0
5270 }
5271 })
5272 .sum();
5273
5274 stats.insert(
5275 field.to_string(),
5276 IndexStats {
5277 unique_values,
5278 total_documents,
5279 avg_docs_per_value: if unique_values > 0 {
5280 total_documents / unique_values
5281 } else {
5282 0
5283 },
5284 },
5285 );
5286 }
5287 }
5288
5289 stats
5290 }
5291
5292 /// Optimize a collection by creating indices for frequently filtered fields
5293 ///
5294 /// This analyzes common query patterns and suggests/creates optimal indices.
5295 pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
5296 if let Ok(collection_def) = self.get_collection_definition(collection) {
5297 let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
5298 self.create_indices(collection, &field_names).await?;
5299 }
5300
5301 Ok(())
5302 }
5303
5304 // Helper method to get unique fields from a collection
5305 fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
5306 collection
5307 .fields
5308 .iter()
5309 .filter(|(_, def)| def.unique)
5310 .map(|(name, _)| name.clone())
5311 .collect()
5312 }
5313
5314 // Update the validation method to use the helper
5315 async fn validate_unique_constraints(
5316 &self,
5317 collection: &str,
5318 data: &HashMap<String, Value>,
5319 ) -> Result<()> {
5320 self.ensure_indices_initialized().await?;
5321 let collection_def = match self.get_collection_definition(collection) {
5322 Ok(def) => def,
5323 Err(e) if e.code == crate::error::ErrorCode::CollectionNotFound => return Ok(()),
5324 Err(e) => return Err(e),
5325 };
5326
5327 // 1. Unique Constraints
5328 let unique_fields = self.get_unique_fields(&collection_def);
5329 for unique_field in &unique_fields {
5330 if let Some(value) = data.get(unique_field) {
5331 let index_key = format!("{}:{}", collection, unique_field);
5332 if let Some(_index) = self.secondary_indices.get(&index_key) {
5333 // Get the raw string value without JSON formatting
5334 let value_str = match value {
5335 Value::String(s) => s.clone(),
5336 _ => value.to_string(),
5337 };
5338 if let Some(storage) = self.get_indexed_storage(&index_key, &value_str) {
5339 if let Ok(s) = storage.read() {
5340 if !s.is_empty() {
5341 return Err(AqlError::new(
5342 ErrorCode::UniqueConstraintViolation,
5343 format!(
5344 "Unique constraint violation on field '{}' with value '{}'",
5345 unique_field, value_str
5346 ),
5347 ));
5348 }
5349 }
5350 }
5351 }
5352 }
5353 }
5354
5355 // 2. Relation Constraints (Foreign Keys)
5356 for (field_name, field_def) in &collection_def.fields {
5357 if let Some(rel) = &field_def.relation {
5358 if let Some(val) = data.get(field_name) {
5359 if !matches!(val, Value::Null) {
5360 let fk_value = match val {
5361 Value::String(s) => s.clone(),
5362 Value::Uuid(u) => u.to_string(),
5363 _ => val.to_string(),
5364 };
5365
5366 // Check if the referenced ID exists in the target collection
5367 // If the key is 'id' or '_sid', we can use get_document directly
5368 let exists = if rel.key == "id" || rel.key == "_sid" {
5369 self.get_document(&rel.to, &fk_value)?.is_some()
5370 } else {
5371 // Otherwise, we need to check the index of the target collection
5372 let index_key = format!("{}:{}", rel.to, rel.key);
5373 if let Some(storage) = self.get_indexed_storage(&index_key, &fk_value) {
5374 if let Ok(s) = storage.read() {
5375 !s.is_empty()
5376 } else {
5377 false
5378 }
5379 } else {
5380 // Target field is not indexed, fallback to scan (slow)
5381 let docs = self.scan_and_filter(
5382 &rel.to,
5383 |d| {
5384 d.data.get(&rel.key).map(|v| v.to_string())
5385 == Some(fk_value.clone())
5386 },
5387 Some(1),
5388 )?;
5389 !docs.is_empty()
5390 }
5391 };
5392
5393 if !exists {
5394 return Err(AqlError::new(
5395 ErrorCode::ValidationError,
5396 format!(
5397 "Foreign key violation: value '{}' for field '{}' does not exist in collection '{}' (field '{}')",
5398 fk_value, field_name, rel.to, rel.key
5399 ),
5400 ));
5401 }
5402 }
5403 }
5404 }
5405 }
5406
5407 Ok(())
5408 }
5409
5410 /// Validate unique constraints excluding a specific document ID (for updates)
5411 async fn validate_unique_constraints_excluding(
5412 &self,
5413 collection: &str,
5414 data: &HashMap<String, Value>,
5415 exclude_id: &str,
5416 ) -> Result<()> {
5417 self.ensure_indices_initialized().await?;
5418 let collection_def = self.get_collection_definition(collection)?;
5419 let unique_fields = self.get_unique_fields(&collection_def);
5420
5421 for unique_field in &unique_fields {
5422 if let Some(value) = data.get(unique_field) {
5423 let index_key = format!("{}:{}", collection, unique_field);
5424 if let Some(index) = self.secondary_indices.get(&index_key) {
5425 // Get the raw string value without JSON formatting
5426 let value_str = match value {
5427 Value::String(s) => s.clone(),
5428 _ => value.to_string(),
5429 };
5430 if let Some(doc_ids_arc) = index.get(&value_str) {
5431 let exclude_internal = self
5432 ._sid_dictionary
5433 .get(&self.parse_external_id(exclude_id))
5434 .map(|e| *e.value());
5435
5436 if let Ok(storage) = doc_ids_arc.value().read() {
5437 let has_violation = if let Some(exc) = exclude_internal {
5438 // Normal path: dictionary populated, use bitmap exclusion.
5439 storage.iter().any(|id| id != exc)
5440 } else {
5441 // _sid_dictionary not yet populated (first update after
5442 // restart — the dictionary is rebuilt lazily, not from the
5443 // checkpoint). Fall back to a cold-store ownership check:
5444 // if the document being updated already holds this exact
5445 // unique value, it is NOT a violation.
5446 let cold_key = format!("{}:{}", collection, exclude_id);
5447 let doc_owns_value = self
5448 .get(&cold_key)
5449 .ok()
5450 .flatten()
5451 .and_then(|raw| serde_json::from_slice::<Document>(&raw).ok())
5452 .and_then(|doc| doc.data.get(unique_field).cloned())
5453 .map(|existing_val| match &existing_val {
5454 Value::String(s) => *s == value_str,
5455 _ => existing_val.to_string() == value_str,
5456 })
5457 .unwrap_or(false);
5458
5459 if doc_owns_value {
5460 // Warm up the dictionary so subsequent checks on this
5461 // document use the fast path instead of cold storage.
5462 self.get_or_create_internal_id(exclude_id);
5463 false
5464 } else {
5465 !storage.is_empty()
5466 }
5467 };
5468
5469 if has_violation {
5470 return Err(AqlError::new(
5471 ErrorCode::UniqueConstraintViolation,
5472 format!(
5473 "Unique constraint violation on field '{}' with value '{}'",
5474 unique_field, value_str
5475 ),
5476 ));
5477 }
5478 }
5479 }
5480 }
5481 }
5482 }
5483
5484 // 2. Relation Constraints (Foreign Keys)
5485 for (field_name, field_def) in &collection_def.fields {
5486 if let Some(rel) = &field_def.relation {
5487 if let Some(val) = data.get(field_name) {
5488 if !matches!(val, Value::Null) {
5489 let fk_value = match val {
5490 Value::String(s) => s.clone(),
5491 Value::Uuid(u) => u.to_string(),
5492 _ => val.to_string(),
5493 };
5494
5495 let exists = if rel.key == "id" || rel.key == "_sid" {
5496 self.get_document(&rel.to, &fk_value)?.is_some()
5497 } else {
5498 let index_key = format!("{}:{}", rel.to, rel.key);
5499 if let Some(storage) = self.get_indexed_storage(&index_key, &fk_value) {
5500 if let Ok(s) = storage.read() {
5501 !s.is_empty()
5502 } else {
5503 false
5504 }
5505 } else {
5506 let docs = self.scan_and_filter(
5507 &rel.to,
5508 |d| {
5509 d.data.get(&rel.key).map(|v| v.to_string())
5510 == Some(fk_value.clone())
5511 },
5512 Some(1),
5513 )?;
5514 !docs.is_empty()
5515 }
5516 };
5517
5518 if !exists {
5519 return Err(AqlError::new(
5520 ErrorCode::ValidationError,
5521 format!(
5522 "Foreign key violation: value '{}' for field '{}' does not exist in collection '{}' (field '{}')",
5523 fk_value, field_name, rel.to, rel.key
5524 ),
5525 ));
5526 }
5527 }
5528 }
5529 }
5530 }
5531
5532 Ok(())
5533 }
5534
5535 // =========================================================================
5536 // AQL Executor Helper Methods
5537 // These are wrapper methods for AQL executor integration.
5538 // They provide a simplified API compatible with the AQL query execution layer.
5539 // =========================================================================
5540
5541 /// Get all documents in a collection (AQL helper)
5542 ///
5543 /// This is a wrapper around the internal query system optimized for bulk retrieval.
5544 pub async fn aql_get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
5545 self.ensure_indices_initialized().await?;
5546
5547 let prefix = format!("{}:", collection);
5548 let mut docs = Vec::new();
5549
5550 for result in self.cold.scan_prefix(&prefix) {
5551 let (_, value) = result?;
5552 if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
5553 docs.push(doc);
5554 }
5555 }
5556
5557 Ok(docs)
5558 }
5559
5560 /// Insert a document from a HashMap (AQL helper)
5561 ///
5562 /// Returns the complete document (not just ID) for AQL executor
5563 pub async fn aql_insert(
5564 &self,
5565 collection: &str,
5566 data: HashMap<String, Value>,
5567 ) -> Result<Document> {
5568 // Validate unique constraints before inserting
5569 self.validate_unique_constraints(collection, &data).await?;
5570
5571 // Use 'id' from data as the internal system ID if provided and it's a string/uuid
5572 let sid = if let Some(v) = data.get("id") {
5573 match v {
5574 Value::String(s) => s.clone(),
5575 Value::Uuid(u) => u.to_string(),
5576 _ => Uuid::now_v7().to_string(),
5577 }
5578 } else {
5579 Uuid::now_v7().to_string()
5580 };
5581
5582 let document = Document {
5583 _sid: sid.clone(),
5584 data,
5585 };
5586
5587 self.put(
5588 format!("{}:{}", collection, sid),
5589 serde_json::to_vec(&document)?,
5590 None,
5591 )
5592 .await?;
5593
5594 // Publish insert event
5595 let event = crate::pubsub::ChangeEvent::insert(collection, &sid, document.clone());
5596 let _ = self.pubsub.publish(event);
5597
5598 Ok(document)
5599 }
5600
5601 /// Update a document by ID with new data (AQL helper)
5602 ///
5603 /// Merges new data with existing data and returns updated document
5604 pub async fn aql_update_document(
5605 &self,
5606 collection: &str,
5607 doc_id: &str,
5608 updates: HashMap<String, Value>,
5609 ) -> Result<Document> {
5610 let key = format!("{}:{}", collection, doc_id);
5611
5612 // Get existing document
5613 let existing = self.get(&key)?.ok_or_else(|| {
5614 AqlError::new(
5615 ErrorCode::NotFound,
5616 format!("Document {} not found", doc_id),
5617 )
5618 })?;
5619
5620 let mut doc: Document = serde_json::from_slice(&existing)?;
5621 let old_doc = doc.clone();
5622
5623 // Merge updates into existing data
5624 for (k, v) in updates {
5625 doc.data.insert(k, v);
5626 }
5627
5628 // Validate unique constraints excluding this document
5629 self.validate_unique_constraints_excluding(collection, &doc.data, doc_id)
5630 .await?;
5631
5632 // Save updated document
5633 self.put(key, serde_json::to_vec(&doc)?, None).await?;
5634
5635 // Remove stale secondary-index entries for fields whose values changed.
5636 // `put` adds the new value's bitmap entry but cannot know the previous
5637 // value, so without this the old entry lingers and blocks reuse of that
5638 // value under a unique constraint.
5639 let external_u128 = self.parse_external_id(doc_id);
5640 if let Some(internal_id) = self._sid_dictionary.get(&external_u128).map(|e| *e.value()) {
5641 for (field, old_val) in &old_doc.data {
5642 if let Some(new_val) = doc.data.get(field) {
5643 if new_val == old_val {
5644 continue;
5645 }
5646 }
5647
5648 let index_key = format!("{}:{}", collection, field);
5649 if let Some(index_map) = self.secondary_indices.get(&index_key) {
5650 let val_str = match old_val {
5651 Value::String(s) => s.clone(),
5652 _ => old_val.to_string(),
5653 };
5654 if let Some(storage_arc) = index_map.get(&val_str) {
5655 if let Ok(mut storage) = storage_arc.value().write() {
5656 storage.remove(internal_id);
5657 }
5658 }
5659 }
5660 }
5661 }
5662
5663 // Publish update event
5664 let event = crate::pubsub::ChangeEvent::update(collection, doc_id, old_doc, doc.clone());
5665 let _ = self.pubsub.publish(event);
5666
5667 Ok(doc)
5668 }
5669
5670 /// Delete a document by ID (AQL helper)
5671 ///
5672 /// Returns the deleted document
5673 pub async fn aql_delete_document(&self, collection: &str, doc_id: &str) -> Result<Document> {
5674 let key = format!("{}:{}", collection, doc_id);
5675
5676 // Get existing document first
5677 let existing = self.get(&key)?.ok_or_else(|| {
5678 AqlError::new(
5679 ErrorCode::NotFound,
5680 format!("Document {} not found", doc_id),
5681 )
5682 })?;
5683
5684 let doc: Document = serde_json::from_slice(&existing)?;
5685
5686 // Publish delete event before the transaction check so handlers fire
5687 // consistently regardless of whether this is inside an auto-transaction.
5688 // This mirrors aql_insert / aql_update_document which publish after put()
5689 // even when the underlying write is buffered in a transaction.
5690 let event = crate::pubsub::ChangeEvent::delete(collection, doc_id);
5691 let _ = self.pubsub.publish(event);
5692
5693 // If inside a transaction, buffer the delete instead of writing to storage.
5694 if let Ok(tx_id) = crate::transaction::ACTIVE_TRANSACTION_ID.try_with(|id| *id) {
5695 if let Some(buffer) = self.transaction_manager.active_transactions.get(&tx_id) {
5696 buffer.delete(key);
5697 return Ok(doc);
5698 }
5699 }
5700
5701 // Append to WAL for durability
5702 if self.config.durability_mode != DurabilityMode::None {
5703 if let Some(wal_writer) = &self.wal_writer {
5704 let op = WalOperation::Delete { key: key.clone() };
5705 wal_writer.send(op).map_err(|_| {
5706 AqlError::new(
5707 ErrorCode::InternalError,
5708 "Failed to send to WAL writer".to_string(),
5709 )
5710 })?;
5711 } else if let Some(wal) = &self.wal {
5712 wal.write()
5713 .map_err(|e| AqlError::new(ErrorCode::InternalError, e.to_string()))?
5714 .append(crate::wal::Operation::Delete, &key, None)?;
5715 }
5716 }
5717
5718 // Delete from storage
5719 self.cold.delete(&key)?;
5720 self.hot.delete(&key);
5721
5722 // Remove from primary index
5723 if let Some(index) = self.primary_indices.get_mut(collection) {
5724 index.remove(&key);
5725 }
5726
5727 // Remove from secondary indices
5728 for (field_name, value) in &doc.data {
5729 let index_key = format!("{}:{}", collection, field_name);
5730 if let Some(index) = self.secondary_indices.get(&index_key) {
5731 let value_str = match value {
5732 Value::String(s) => s.clone(),
5733 _ => value.to_string(),
5734 };
5735 if let Some(doc_ids_arc) = index.get(&value_str) {
5736 let external_u128 = self.parse_external_id(doc_id);
5737 if let Some(internal_id_entry) = self._sid_dictionary.get(&external_u128) {
5738 if let Ok(mut storage) = doc_ids_arc.value().write() {
5739 storage.remove(*internal_id_entry.value());
5740 }
5741 }
5742 }
5743 }
5744 }
5745
5746 // Return the internal ID to the recycling pool
5747 let external_u128 = self.parse_external_id(doc_id);
5748 if let Some(internal_id_entry) = self._sid_dictionary.get(&external_u128) {
5749 if let Ok(mut deleted) = self.deleted_ids.write() {
5750 deleted.insert(*internal_id_entry.value());
5751 }
5752 }
5753
5754 // Event was already published above (before transaction check).
5755
5756 Ok(doc)
5757 }
5758
5759 /// Get a single document by ID (AQL helper)
5760 pub async fn aql_get_document(
5761 &self,
5762 collection: &str,
5763 doc_id: &str,
5764 ) -> Result<Option<Document>> {
5765 let key = format!("{}:{}", collection, doc_id);
5766
5767 match self.get(&key)? {
5768 Some(data) => {
5769 let doc: Document = serde_json::from_slice(&data)?;
5770 Ok(Some(doc))
5771 }
5772 None => Ok(None),
5773 }
5774 }
5775
5776 /// Begin a transaction (AQL helper) - returns transaction ID
5777 pub async fn aql_begin_transaction(&self) -> Result<crate::transaction::TransactionId> {
5778 Ok(self.begin_transaction().await)
5779 }
5780
5781 /// Commit a transaction (AQL helper)
5782 pub async fn aql_commit_transaction(
5783 &self,
5784 tx_id: crate::transaction::TransactionId,
5785 ) -> Result<()> {
5786 self.commit_transaction(tx_id).await
5787 }
5788
5789 /// Rollback a transaction (AQL helper)
5790 pub async fn aql_rollback_transaction(
5791 &self,
5792 tx_id: crate::transaction::TransactionId,
5793 ) -> Result<()> {
5794 self.transaction_manager.rollback(tx_id)
5795 }
5796
5797 // ============================================
5798 // AQL Schema Management Wrappers
5799 // ============================================
5800
5801 /// Create a collection from AST schema definition
5802 pub async fn create_collection_schema(
5803 &self,
5804 name: &str,
5805 fields: HashMap<String, crate::types::FieldDefinition>,
5806 ) -> Result<()> {
5807 let collection_key = format!("_collection:{}", name);
5808
5809 // Check if collection already exists
5810 if self.get(&collection_key)?.is_some() {
5811 // Already exists
5812 return Ok(());
5813 }
5814
5815 let collection = Collection {
5816 name: name.to_string(),
5817 fields,
5818 };
5819
5820 let collection_data = serde_json::to_vec(&collection)?;
5821 self.put(collection_key, collection_data, None).await?;
5822 self.schema_cache.remove(name);
5823
5824 Ok(())
5825 }
5826
5827 /// Add a field to an existing collection schema
5828 pub async fn add_field_to_schema(
5829 &self,
5830 collection_name: &str,
5831 name: String,
5832 definition: crate::types::FieldDefinition,
5833 ) -> Result<()> {
5834 let mut collection = self
5835 .get_collection_definition(collection_name)
5836 .map_err(|_| {
5837 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5838 })?;
5839
5840 if collection.fields.contains_key(&name) {
5841 return Err(AqlError::new(
5842 ErrorCode::InvalidDefinition,
5843 format!(
5844 "Field '{}' already exists in collection '{}'",
5845 name, collection_name
5846 ),
5847 ));
5848 }
5849
5850 collection.fields.insert(name, definition);
5851
5852 let collection_key = format!("_collection:{}", collection_name);
5853 let collection_data = serde_json::to_vec(&collection)?;
5854 self.put(collection_key, collection_data, None).await?;
5855 self.schema_cache.remove(collection_name);
5856
5857 Ok(())
5858 }
5859
5860 /// Drop a field from an existing collection schema
5861 pub async fn drop_field_from_schema(
5862 &self,
5863 collection_name: &str,
5864 field_name: String,
5865 ) -> Result<()> {
5866 let mut collection = self
5867 .get_collection_definition(collection_name)
5868 .map_err(|_| {
5869 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5870 })?;
5871
5872 if !collection.fields.contains_key(&field_name) {
5873 return Err(AqlError::new(
5874 ErrorCode::InvalidDefinition,
5875 format!(
5876 "Field '{}' does not exist in collection '{}'",
5877 field_name, collection_name
5878 ),
5879 ));
5880 }
5881
5882 collection.fields.remove(&field_name);
5883
5884 let collection_key = format!("_collection:{}", collection_name);
5885 let collection_data = serde_json::to_vec(&collection)?;
5886 self.put(collection_key, collection_data, None).await?;
5887 self.schema_cache.remove(collection_name);
5888
5889 Ok(())
5890 }
5891
5892 /// Rename a field in an existing collection schema
5893 pub async fn rename_field_in_schema(
5894 &self,
5895 collection_name: &str,
5896 from: String,
5897 to: String,
5898 ) -> Result<()> {
5899 let mut collection = self
5900 .get_collection_definition(collection_name)
5901 .map_err(|_| {
5902 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5903 })?;
5904
5905 if let Some(def) = collection.fields.remove(&from) {
5906 if collection.fields.contains_key(&to) {
5907 return Err(AqlError::new(
5908 ErrorCode::InvalidDefinition,
5909 format!(
5910 "Target field name '{}' already exists in collection '{}'",
5911 to, collection_name
5912 ),
5913 ));
5914 }
5915 collection.fields.insert(to, def);
5916 } else {
5917 return Err(AqlError::new(
5918 ErrorCode::InvalidDefinition,
5919 format!("Field '{}' not found", from),
5920 ));
5921 }
5922
5923 let collection_key = format!("_collection:{}", collection_name);
5924 let collection_data = serde_json::to_vec(&collection)?;
5925 self.put(collection_key, collection_data, None).await?;
5926 self.schema_cache.remove(collection_name);
5927
5928 Ok(())
5929 }
5930
5931 /// Modify a field in an existing collection schema
5932 pub async fn modify_field_in_schema(
5933 &self,
5934 collection_name: &str,
5935 name: String,
5936 definition: crate::types::FieldDefinition,
5937 ) -> Result<()> {
5938 let mut collection = self
5939 .get_collection_definition(collection_name)
5940 .map_err(|_| {
5941 AqlError::new(ErrorCode::CollectionNotFound, collection_name.to_string())
5942 })?;
5943
5944 if !collection.fields.contains_key(&name) {
5945 return Err(AqlError::new(
5946 ErrorCode::InvalidDefinition,
5947 format!(
5948 "Field '{}' does not exist in collection '{}'",
5949 name, collection_name
5950 ),
5951 ));
5952 }
5953
5954 collection.fields.insert(name, definition);
5955
5956 let collection_key = format!("_collection:{}", collection_name);
5957 let collection_data = serde_json::to_vec(&collection)?;
5958 self.put(collection_key, collection_data, None).await?;
5959 self.schema_cache.remove(collection_name);
5960
5961 Ok(())
5962 }
5963
5964 /// Drop an entire collection definition
5965 pub async fn drop_collection_schema(&self, collection_name: &str) -> Result<()> {
5966 let collection_key = format!("_collection:{}", collection_name);
5967 // Write to WAL before cold delete so a crash doesn't leave orphaned docs
5968 // with no schema to describe them.
5969 if self.config.durability_mode != DurabilityMode::None {
5970 if let Some(wal_writer) = &self.wal_writer {
5971 let _ = wal_writer.send(WalOperation::Delete {
5972 key: collection_key.clone(),
5973 });
5974 }
5975 }
5976 self.cold.delete(&collection_key)?;
5977 self.schema_cache.remove(collection_name);
5978 Ok(())
5979 }
5980
5981 // ============================================
5982 // AQL Migration Wrappers
5983 // ============================================
5984
5985 /// Check if a migration version has been applied
5986 pub async fn is_migration_applied(&self, version: &str) -> Result<bool> {
5987 let migration_key = format!("_sys_migration:{}", version);
5988 Ok(self.get(&migration_key)?.is_some())
5989 }
5990
5991 /// Mark a migration version as applied
5992 pub async fn mark_migration_applied(&self, version: &str) -> Result<()> {
5993 let migration_key = format!("_sys_migration:{}", version);
5994 let timestamp = chrono::Utc::now().to_rfc3339();
5995 self.put(migration_key.clone(), timestamp.as_bytes().to_vec(), None)
5996 .await?;
5997
5998 // Also store as a document in _migrations so it's queryable via AQL
5999 let mut data = HashMap::new();
6000 data.insert("version".to_string(), Value::String(version.to_string()));
6001 data.insert("status".to_string(), Value::String("applied".to_string()));
6002 data.insert("applied_at".to_string(), Value::String(timestamp));
6003 let doc = Document {
6004 _sid: version.to_string(),
6005 data,
6006 };
6007 let doc_key = format!("_migrations:{}", version);
6008 self.put(doc_key, serde_json::to_vec(&doc)?, None).await?;
6009
6010 Ok(())
6011 }
6012}
6013
6014impl Drop for Aurora {
6015 fn drop(&mut self) {
6016 // Signal checkpoint task to shutdown gracefully
6017 if let Some(ref shutdown_tx) = self.checkpoint_shutdown {
6018 let _ = shutdown_tx.send(());
6019 }
6020 // Signal compaction task to shutdown gracefully
6021 if let Some(ref shutdown_tx) = self.compaction_shutdown {
6022 let _ = shutdown_tx.send(());
6023 }
6024 }
6025}
6026
6027fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
6028 let filter_val = match json_to_value(&filter.value) {
6029 Ok(v) => v,
6030 Err(_) => return false,
6031 };
6032
6033 match filter.operator {
6034 FilterOperator::Eq => doc_val == &filter_val,
6035 FilterOperator::Ne => doc_val != &filter_val,
6036 FilterOperator::Gt => doc_val > &filter_val,
6037 FilterOperator::Gte => doc_val >= &filter_val,
6038 FilterOperator::Lt => doc_val < &filter_val,
6039 FilterOperator::Lte => doc_val <= &filter_val,
6040 FilterOperator::Contains => match (doc_val, &filter_val) {
6041 (Value::String(s), Value::String(fv)) => s.contains(fv),
6042 (Value::Array(arr), _) => arr.contains(&filter_val),
6043 _ => false,
6044 },
6045 }
6046}
6047
/// Outcome of importing a single document.
///
/// A plain two-state flag, so it derives the cheap value traits (`Copy`, `Eq`) so
/// that callers can compare and pass it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ImportResult {
    /// The document was written.
    Imported,
    /// The document was skipped rather than written.
    Skipped,
}
6053
/// Statistics from an import operation.
///
/// All counters start at zero via `Default`. The struct is plain data, so it is
/// `Copy` and comparable.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ImportStats {
    /// Number of documents successfully imported
    pub imported: usize,
    /// Number of documents skipped (usually because they already exist)
    pub skipped: usize,
    /// Number of documents that failed to import
    pub failed: usize,
}
6064
/// Statistics for a specific collection.
///
/// Plain-data summary; `Default` yields an empty collection (all zeros).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct CollectionStats {
    /// Number of documents in the collection
    pub count: usize,
    /// Total size of the collection in bytes
    pub size_bytes: usize,
    /// Average document size in bytes
    pub avg_doc_size: usize,
}
6075
/// Statistics for an index.
///
/// Plain-data summary; `Default` yields an empty index (all zeros).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct IndexStats {
    /// Number of unique values in the index
    pub unique_values: usize,
    /// Total number of documents covered by the index
    pub total_documents: usize,
    /// Average number of documents per unique value
    pub avg_docs_per_value: usize,
}
6086
/// Combined database statistics.
///
/// Bundles the hot-cache and cold-store statistics with an overall size estimate
/// and per-collection figures. Only `Debug` is derived; whether more derives are
/// possible depends on the two storage stats types, which are declared elsewhere.
#[derive(Debug)]
pub struct DatabaseStats {
    /// Hot cache statistics
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Cold storage statistics
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size in bytes
    pub estimated_size: u64,
    /// Statistics for each collection; presumably keyed by collection name
    /// (confirm against the code that builds this struct).
    pub collections: HashMap<String, CollectionStats>,
}
6099
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Fires 50 concurrent inserts through cloned handles, then reads one back.
    #[tokio::test]
    async fn test_async_wal_integration() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test_wal_integration.db");
        let db = Aurora::open(db_path.to_str().unwrap()).await.unwrap();

        // 1. Create the collection schema. The WAL is left at whatever the
        //    default configuration chooses; this test does not toggle it.
        db.new_collection(
            "users",
            vec![
                (
                    "id",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                ),
                (
                    "counter",
                    FieldType::Scalar(crate::types::ScalarType::Int),
                    false,
                ),
            ],
        )
        .await
        .unwrap();

        let db_clone = db.clone();

        // 2. Spawn 50 concurrent writes
        let handles: Vec<_> = (0..50)
            .map(|i| {
                let db = db_clone.clone();
                tokio::spawn(async move {
                    let doc_id = db
                        .insert_into(
                            "users",
                            vec![
                                ("id", Value::String(format!("user-{}", i))),
                                ("counter", Value::Int(i)),
                            ],
                        )
                        .await
                        .unwrap();
                    (i, doc_id) // Return both the index and the document ID
                })
            })
            .collect();

        // 3. Wait for all to complete. A failed insert panics inside its task,
        //    which surfaces here as a join error.
        let results = futures::future::join_all(handles).await;
        assert!(results.iter().all(|r| r.is_ok()), "Some writes failed");

        // Extract the document IDs
        let doc_ids: Vec<(i64, String)> = results.into_iter().map(|r| r.unwrap()).collect();

        // Find the document ID for the one with counter=25
        let target_doc_id = doc_ids
            .iter()
            .find(|(counter, _)| *counter == 25i64)
            .map(|(_, doc_id)| doc_id)
            .unwrap();

        // 4. Verify data integrity
        let doc = db.get_document("users", target_doc_id).unwrap().unwrap();
        assert_eq!(doc.data.get("counter"), Some(&Value::Int(25)));
    }

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap()).await?;

        // Test collection creation
        db.new_collection(
            "users",
            vec![
                (
                    "name",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                ),
                (
                    "age",
                    FieldType::Scalar(crate::types::ScalarType::Int),
                    false,
                ),
                (
                    "email",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    true,
                ),
            ],
        )
        .await?;

        // Test document insertion
        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Test document retrieval
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap()).await?;

        // Create collection
        db.new_collection(
            "test",
            vec![(
                "field",
                FieldType::Scalar(crate::types::ScalarType::String),
                false,
            )],
        )
        .await?;

        // Start transaction
        let tx_id = db.begin_transaction().await;

        // Insert document. NOTE(review): `tx_id` is not passed to `insert_into`;
        // presumably the active transaction is tracked implicitly. Confirm.
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        // Commit transaction
        db.commit_transaction(tx_id).await?;

        // Verify document exists
        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap()).await?;

        // Test collection creation
        db.new_collection(
            "books",
            vec![
                (
                    "title",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                ),
                (
                    "author",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                ),
                (
                    "year",
                    FieldType::Scalar(crate::types::ScalarType::Int),
                    false,
                ),
            ],
        )
        .await?;

        // Test document insertion
        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Test query
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap()).await?;

        // Create test file
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        // Test blob storage
        db.put_blob("test:blob".to_string(), &file_path).await?;

        // Verify blob exists
        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), // content + "BLOB:" prefix
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap()).await?;

        // Create a test file that's too large (201MB)
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        // Attempt to store the large file
        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.code == ErrorCode::InvalidOperation,
            "oversized blob should be rejected with InvalidOperation"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap()).await?;

        // Create collection with unique email field
        db.new_collection(
            "users",
            vec![
                (
                    "name",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                ),
                (
                    "email",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    true,
                ), // unique field
                (
                    "age",
                    FieldType::Scalar(crate::types::ScalarType::Int),
                    false,
                ),
            ],
        )
        .await?;

        // Insert first document
        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Try to insert second document with same email - should fail
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate email
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        // The violation message must name both the field and the clashing value.
        match result {
            Err(e) if e.code == ErrorCode::UniqueConstraintViolation => {
                assert!(e.message.contains("email"));
                assert!(e.message.contains("john@example.com"));
            }
            Err(e) => panic!("Expected UniqueConstraintViolation, got {:?}", e),
            Ok(_) => panic!("Expected UniqueConstraintViolation, but the insert succeeded"),
        }

        // Test upsert with unique constraint
        // Should succeed for new document
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Should fail when trying to upsert with duplicate email
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Should succeed when updating existing document with same email (no change)
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // same email, same doc
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_nullable_field_definition() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap()).await?;

        // Test 3-tuple format (defaults nullable to false)
        db.new_collection(
            "products",
            vec![
                (
                    "name",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                ),
                (
                    "sku",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    true,
                ), // unique
            ],
        )
        .await?;

        // Test 4-tuple format with explicit nullable control
        db.new_collection(
            "users",
            vec![
                (
                    "name",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                    false,
                ), // not nullable
                (
                    "email",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    true,
                    false,
                ), // unique, not nullable
                (
                    "bio",
                    FieldType::Scalar(crate::types::ScalarType::String),
                    false,
                    true,
                ), // nullable
                (
                    "age",
                    FieldType::Scalar(crate::types::ScalarType::Int),
                    false,
                    true,
                ), // nullable
            ],
        )
        .await?;

        // Verify the schema was created correctly
        let schema = db.ensure_schema_hot("users")?;

        assert!(
            !schema.fields.get("name").unwrap().nullable,
            "name should not be nullable"
        );
        assert!(
            !schema.fields.get("email").unwrap().nullable,
            "email should not be nullable"
        );
        assert!(
            schema.fields.get("bio").unwrap().nullable,
            "bio should be nullable"
        );
        assert!(
            schema.fields.get("age").unwrap().nullable,
            "age should be nullable"
        );

        // Verify products collection (3-tuple defaults to false)
        let products_schema = db.ensure_schema_hot("products")?;
        assert!(!products_schema.fields.get("name").unwrap().nullable);
        assert!(!products_schema.fields.get("sku").unwrap().nullable);

        Ok(())
    }

    #[tokio::test]
    async fn test_upsert_unique_field_after_restart() {
        // Simulates the post-restart scenario:
        // _sid_dictionary is empty but secondary_indices have the bitmap entries.
        let dir = tempdir().unwrap();
        let db_path = dir.path().to_path_buf();

        // Session 1: insert a fact and capture its generated id
        let document_id = {
            let db = Aurora::with_config(AuroraConfig {
                db_path: db_path.clone(),
                enable_wal: false,
                ..Default::default()
            })
            .await
            .unwrap();

            db.new_collection(
                "facts",
                vec![
                    (
                        "key",
                        FieldDefinition {
                            field_type: FieldType::SCALAR_STRING,
                            unique: true,
                            indexed: true,
                            nullable: false,
                            validations: vec![],
                            relation: None,
                        },
                    ),
                    (
                        "value_json",
                        FieldDefinition {
                            field_type: FieldType::SCALAR_STRING,
                            unique: false,
                            indexed: false,
                            nullable: false,
                            validations: vec![],
                            relation: None,
                        },
                    ),
                ],
            )
            .await
            .unwrap();

            let aql = r#"
                mutation {
                    insertInto(collection: "facts", data: {
                        key: "last_session_at",
                        value_json: "\"2026-04-05T01:00:00Z\""
                    }) { id }
                }
            "#
            .to_string();
            let result = db.execute(aql).await.unwrap();

            // Extract the id from the mutation result. Any other result shape means
            // the setup failed, so stop here instead of updating an empty id later.
            let inserted_id = match result {
                crate::parser::executor::ExecutionResult::Mutation(mutres) => {
                    mutres.returned_documents[0]._sid.clone()
                }
                _ => panic!("expected a Mutation result from insertInto"),
            };

            // Explicitly flush to ensure secondary_indices are written to disk
            db.flush().unwrap();
            inserted_id
        };
        // db dropped at the end of the block above — simulates process restart
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;

        // Session 2: new Aurora instance, _sid_dictionary starts empty
        {
            let db = Aurora::with_config(AuroraConfig {
                db_path: db_path.clone(),
                enable_wal: false,
                ..Default::default()
            })
            .await
            .unwrap();

            // Update the same key with a new value directly by ID — must NOT error
            let aql = format!(
                r#"
                mutation {{
                    update(
                        collection: "facts",
                        id: "{}",
                        data: {{ key: "last_session_at", value_json: "\"2026-04-05T03:00:00Z\"" }}
                    ) {{ id }}
                }}
            "#,
                document_id
            );

            db.execute(aql)
                .await
                .expect("update of existing unique key after restart should not fail");
        }
    }
}