use crate::error::{AuroraError, Result};
use crate::index::{Index, IndexDefinition, IndexType};
use crate::network::http_models::{
    Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
};
use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
use crate::storage::{ColdStore, HotStore, WriteBuffer};
use crate::types::{AuroraConfig, Collection, Document, FieldDefinition, FieldType, Value};
use dashmap::DashMap;
use serde_json::Value as JsonValue;
use serde_json::from_str;
use std::collections::HashMap;
use std::fmt;
use std::fs::File as StdFile;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::read_to_string;
use tokio::io::AsyncReadExt;
use tokio::sync::OnceCell;
use uuid::Uuid;

/// Maps full document keys (`"collection:id"`) to their serialized bytes.
type PrimaryIndex = DashMap<String, Vec<u8>>;
/// Maps an indexed field value to the document keys that contain it.
type SecondaryIndex = DashMap<String, Vec<String>>;

#[derive(Debug)]
pub enum DataInfo {
    Data { size: usize, preview: String },
    Blob { size: usize },
    Compressed { size: usize },
}

impl DataInfo {
    pub fn size(&self) -> usize {
        match self {
            DataInfo::Data { size, .. } => *size,
            DataInfo::Blob { size } => *size,
            DataInfo::Compressed { size } => *size,
        }
    }
}

pub struct Aurora {
    hot: HotStore,
    cold: Arc<ColdStore>,
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    indices_initialized: Arc<OnceCell<()>>,
    transaction_manager: crate::transaction::TransactionManager,
    indices: Arc<DashMap<String, Index>>,
    schema_cache: Arc<DashMap<String, Arc<Collection>>>,
    config: AuroraConfig,
    write_buffer: Option<WriteBuffer>,
    pubsub: crate::pubsub::PubSubSystem,
}

impl fmt::Debug for Aurora {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Aurora")
            .field("hot", &"HotStore")
            .field("cold", &"ColdStore")
            .field("primary_indices_count", &self.primary_indices.len())
            .field("secondary_indices_count", &self.secondary_indices.len())
            .field(
                "active_transactions",
                &self.transaction_manager.active_count(),
            )
            .field("indices_count", &self.indices.len())
            .finish()
    }
}

impl Aurora {
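    /// Opens (or creates) a database at `path` using the default configuration.
    ///
    /// A minimal usage sketch (marked `ignore`; the path and error handling are
    /// illustrative):
    /// ```ignore
    /// let db = Aurora::open("./my_app.aurora")?;
    /// println!("{:?}", db.get_cache_stats());
    /// ```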
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let mut config = AuroraConfig::default();
        config.db_path = Self::resolve_path(path)?;
        Self::with_config(config)
    }

    fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
        let path = path.as_ref();

        if path.is_absolute() {
            return Ok(path.to_path_buf());
        }

        match std::env::current_dir() {
            Ok(current_dir) => Ok(current_dir.join(path)),
            Err(e) => Err(AuroraError::IoError(format!(
                "Failed to resolve current directory: {}",
                e
            ))),
        }
    }

    pub fn with_config(config: AuroraConfig) -> Result<Self> {
        let path = Self::resolve_path(&config.db_path)?;

        if config.create_dirs {
            if let Some(parent) = path.parent() {
                if !parent.exists() {
                    std::fs::create_dir_all(parent)?;
                }
            }
        }

        // Reject non-UTF-8 paths up front instead of panicking on unwrap.
        let path_str = path.to_str().ok_or_else(|| {
            AuroraError::InvalidOperation("Database path is not valid UTF-8".into())
        })?;

        let cold = Arc::new(ColdStore::with_config(
            path_str,
            config.cold_cache_capacity_mb,
            config.cold_flush_interval_ms,
            config.cold_mode,
        )?);

        let hot = HotStore::with_config_and_eviction(
            config.hot_cache_size_mb,
            config.hot_cache_cleanup_interval_secs,
            config.eviction_policy,
        );

        let write_buffer = if config.enable_write_buffering {
            Some(WriteBuffer::new(
                Arc::clone(&cold),
                config.write_buffer_size,
                config.write_buffer_flush_interval_ms,
            ))
        } else {
            None
        };

        let auto_compact = config.auto_compact;

        let pubsub = crate::pubsub::PubSubSystem::new(10000);

        let db = Self {
            hot,
            cold,
            primary_indices: Arc::new(DashMap::new()),
            secondary_indices: Arc::new(DashMap::new()),
            indices_initialized: Arc::new(OnceCell::new()),
            transaction_manager: crate::transaction::TransactionManager::new(),
            indices: Arc::new(DashMap::new()),
            schema_cache: Arc::new(DashMap::new()),
            config,
            write_buffer,
            pubsub,
        };

        if auto_compact {
            // Background auto-compaction startup (elided).
        }

        Ok(db)
    }

    pub async fn ensure_indices_initialized(&self) -> Result<()> {
        self.indices_initialized
            .get_or_init(|| async {
                println!("Initializing indices...");
                if let Err(e) = self.initialize_indices() {
                    eprintln!("Failed to initialize indices: {:?}", e);
                }
                println!("Indices initialized");
            })
            .await;
        Ok(())
    }

    fn initialize_indices(&self) -> Result<()> {
        for result in self.cold.scan() {
            let (key, value) = result?;
            let key_str = std::str::from_utf8(key.as_bytes())
                .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;

            if let Some(collection_name) = key_str.split(':').next() {
                self.index_value(collection_name, key_str, &value)?;
            }
        }
        Ok(())
    }

    pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
        // Fast path: the hot cache.
        if let Some(value) = self.hot.get(key) {
            return Ok(Some(value));
        }

        // Next, the in-memory primary index; promote hits into the hot cache.
        if let Some(collection) = key.split(':').next() {
            if let Some(index) = self.primary_indices.get(collection) {
                if let Some(value) = index.get(key) {
                    self.hot.set(key.to_string(), value.clone(), None);
                    return Ok(Some(value.clone()));
                }
            }
        }

        // Fall back to cold storage, again promoting any hit.
        let value = self.cold.get(key)?;
        if let Some(v) = &value {
            self.hot.set(key.to_string(), v.clone(), None);
        }
        Ok(value)
    }

    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }

    pub fn get_cache_stats(&self) -> crate::storage::hot::CacheStats {
        self.hot.get_stats()
    }

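    /// Subscribes to change events (inserts, updates, deletes) for one collection.
    ///
    /// A sketch of the intended usage; the exact receive API on
    /// `ChangeListener` is assumed here and may differ:
    /// ```ignore
    /// let mut users_feed = db.listen("users");
    /// tokio::spawn(async move {
    ///     while let Ok(event) = users_feed.recv().await {
    ///         println!("users changed: {:?}", event);
    ///     }
    /// });
    /// ```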
    pub fn listen(&self, collection: impl Into<String>) -> crate::pubsub::ChangeListener {
        self.pubsub.listen(collection)
    }

    pub fn listen_all(&self) -> crate::pubsub::ChangeListener {
        self.pubsub.listen_all()
    }

    pub fn listener_count(&self, collection: &str) -> usize {
        self.pubsub.listener_count(collection)
    }

    pub fn total_listeners(&self) -> usize {
        self.pubsub.total_listeners()
    }

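    /// Writes raw bytes under a `"collection:id"` key, with an optional hot-cache TTL.
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// db.put(
    ///     "sessions:abc123".to_string(),
    ///     b"session-token".to_vec(),
    ///     Some(Duration::from_secs(3600)), // expire from the hot cache after an hour
    /// )?;
    /// ```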
    pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        // Hard cap on value size: 50 MB.
        const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024;
        if value.len() > MAX_BLOB_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "Blob size {} MB exceeds maximum allowed size of {} MB",
                value.len() / (1024 * 1024),
                MAX_BLOB_SIZE / (1024 * 1024)
            )));
        }

        if let Some(ref write_buffer) = self.write_buffer {
            write_buffer.write(key.clone(), value.clone())?;
        } else {
            self.cold.set(key.clone(), value.clone())?;
        }

        self.hot.set(key.clone(), value.clone(), ttl);

        // Internal keys (prefixed with '_') are never indexed.
        if let Some(collection_name) = key.split(':').next() {
            if !collection_name.starts_with('_') {
                self.index_value(collection_name, &key, &value)?;
            }
        }

        Ok(())
    }

    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        self.primary_indices
            .entry(collection.to_string())
            .or_insert_with(DashMap::new)
            .insert(key.to_string(), value.to_vec());

        let collection_obj = match self.schema_cache.get(collection) {
            Some(cached_schema) => Arc::clone(cached_schema.value()),
            None => {
                let collection_key = format!("_collection:{}", collection);
                let schema_data = match self.get(&collection_key)? {
                    Some(data) => data,
                    None => return Ok(()), // no schema registered; nothing to index
                };

                let obj: Collection = match serde_json::from_slice(&schema_data) {
                    Ok(obj) => obj,
                    Err(_) => return Ok(()), // unreadable schema; skip secondary indexing
                };

                let arc_obj = Arc::new(obj);
                self.schema_cache
                    .insert(collection.to_string(), Arc::clone(&arc_obj));
                arc_obj
            }
        };

        let indexed_fields: Vec<String> = collection_obj
            .fields
            .iter()
            .filter(|(_, def)| def.indexed || def.unique)
            .map(|(name, _)| name.clone())
            .collect();

        if indexed_fields.is_empty() {
            return Ok(());
        }

        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, field_value) in doc.data {
                if !indexed_fields.contains(&field) {
                    continue;
                }

                let value_str = match &field_value {
                    Value::String(s) => s.clone(),
                    _ => field_value.to_string(),
                };

                let index_key = format!("{}:{}", collection, field);
                let secondary_index = self
                    .secondary_indices
                    .entry(index_key)
                    .or_insert_with(DashMap::new);

                let max_entries = self.config.max_index_entries_per_field;

                secondary_index
                    .entry(value_str)
                    .and_modify(|doc_ids| {
                        if doc_ids.len() < max_entries {
                            doc_ids.push(key.to_string());
                        }
                    })
                    .or_insert_with(|| vec![key.to_string()]);
            }
        }
        Ok(())
    }

    fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
        let _prefix = format!("{}:", collection);
        let mut documents = Vec::new();

        if let Some(index) = self.primary_indices.get(collection) {
            for entry in index.iter() {
                if let Ok(doc) = serde_json::from_slice(entry.value()) {
                    documents.push(doc);
                }
            }
        }

        Ok(documents)
    }

    pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
        // Same 50 MB cap as `put`, checked against file metadata before reading.
        const MAX_FILE_SIZE: usize = 50 * 1024 * 1024;
        let metadata = tokio::fs::metadata(file_path).await?;
        let file_size = metadata.len() as usize;

        if file_size > MAX_FILE_SIZE {
            return Err(AuroraError::InvalidOperation(format!(
                "File size {} MB exceeds maximum allowed size of {} MB",
                file_size / (1024 * 1024),
                MAX_FILE_SIZE / (1024 * 1024)
            )));
        }

        let mut file = File::open(file_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Tag the payload so readers can distinguish blobs from documents.
        let mut blob_data = Vec::with_capacity(5 + buffer.len());
        blob_data.extend_from_slice(b"BLOB:");
        blob_data.extend_from_slice(&buffer);

        self.put(key, blob_data, None)
    }

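    /// Defines a collection schema. Each field is `(name, type, unique)`;
    /// unique fields are indexed automatically, and re-creating an existing
    /// collection is a no-op.
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// db.new_collection(
    ///     "users",
    ///     vec![
    ///         ("name", FieldType::String, false),
    ///         ("email", FieldType::String, true), // unique
    ///     ],
    /// )?;
    /// ```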
    pub fn new_collection<S: Into<String>>(
        &self,
        name: &str,
        fields: Vec<(S, FieldType, bool)>,
    ) -> Result<()> {
        let collection_key = format!("_collection:{}", name);

        // Creating an existing collection is a no-op.
        if self.get(&collection_key)?.is_some() {
            return Ok(());
        }

        let mut field_definitions = HashMap::new();
        for (field_name, field_type, unique) in fields {
            field_definitions.insert(
                field_name.into(),
                FieldDefinition {
                    field_type,
                    unique,
                    indexed: unique, // unique fields are always indexed
                },
            );
        }

        let collection = Collection {
            name: name.to_string(),
            fields: field_definitions,
        };

        let collection_data = serde_json::to_vec(&collection)?;
        self.put(collection_key, collection_data, None)?;

        self.schema_cache.remove(name);

        Ok(())
    }

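    /// Inserts a document, enforcing unique constraints, and returns its
    /// generated UUID.
    ///
    /// Example (illustrative sketch, matching the tests below):
    /// ```ignore
    /// let id = db
    ///     .insert_into(
    ///         "users",
    ///         vec![
    ///             ("name", Value::String("Ada".to_string())),
    ///             ("email", Value::String("ada@example.com".to_string())),
    ///         ],
    ///     )
    ///     .await?;
    /// ```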
    pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
        let data_map: HashMap<String, Value> =
            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();

        self.validate_unique_constraints(collection, &data_map)
            .await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data: data_map,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

    pub async fn insert_map(
        &self,
        collection: &str,
        data: HashMap<String, Value>,
    ) -> Result<String> {
        self.validate_unique_constraints(collection, &data).await?;

        let doc_id = Uuid::new_v4().to_string();
        let document = Document {
            id: doc_id.clone(),
            data,
        };

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        let event = crate::pubsub::ChangeEvent::insert(collection, &doc_id, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(doc_id)
    }

    pub async fn update_document(
        &self,
        collection: &str,
        doc_id: &str,
        updates: Vec<(&str, Value)>,
    ) -> Result<()> {
        let mut document = self
            .get_document(collection, doc_id)?
            .ok_or_else(|| AuroraError::NotFound(format!("Document not found: {}", doc_id)))?;

        let old_document = document.clone();

        for (field, value) in updates {
            document.data.insert(field.to_string(), value);
        }

        self.validate_unique_constraints_excluding(collection, &document.data, doc_id)
            .await?;

        self.put(
            format!("{}:{}", collection, doc_id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        let event =
            crate::pubsub::ChangeEvent::update(collection, doc_id, old_document, document.clone());
        let _ = self.pubsub.publish(event);

        Ok(())
    }

    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
        self.ensure_indices_initialized().await?;
        self.scan_collection(collection)
    }

    pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
        let mut data = Vec::new();

        if let Some(index) = self
            .primary_indices
            .get(pattern.split(':').next().unwrap_or(""))
        {
            for entry in index.iter() {
                if entry.key().contains(pattern) {
                    let value = entry.value();
                    let info = if value.starts_with(b"BLOB:") {
                        DataInfo::Blob { size: value.len() }
                    } else {
                        DataInfo::Data {
                            size: value.len(),
                            preview: String::from_utf8_lossy(&value[..value.len().min(50)])
                                .into_owned(),
                        }
                    };

                    data.push((entry.key().clone(), info));
                }
            }
        }

        Ok(data)
    }

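    /// Begins a buffered transaction. Staged writes and deletes are applied on
    /// `commit_transaction` and discarded on `rollback_transaction`.
    ///
    /// A sketch of the commit/rollback flow (how writes are staged into the
    /// buffer is left out; that API lives in the transaction module):
    /// ```ignore
    /// let tx_id = db.begin_transaction();
    /// // ... stage writes/deletes against the transaction buffer ...
    /// if db.commit_transaction(tx_id).is_err() {
    ///     db.rollback_transaction(tx_id)?;
    /// }
    /// ```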
    pub fn begin_transaction(&self) -> crate::transaction::TransactionId {
        let buffer = self.transaction_manager.begin();
        buffer.id
    }

    pub fn commit_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
        let buffer = self
            .transaction_manager
            .active_transactions
            .get(&tx_id)
            .ok_or_else(|| {
                AuroraError::InvalidOperation("Transaction not found or already completed".into())
            })?;

        // Apply buffered writes to cold storage, the hot cache, and the indices.
        for item in buffer.writes.iter() {
            let key = item.key();
            let value = item.value();
            self.cold.set(key.clone(), value.clone())?;
            self.hot.set(key.clone(), value.clone(), None);
            if let Some(collection_name) = key.split(':').next() {
                if !collection_name.starts_with('_') {
                    self.index_value(collection_name, key, value)?;
                }
            }
        }

        // Apply buffered deletes, removing index entries first.
        for item in buffer.deletes.iter() {
            let key = item.key();
            if let Some((collection, id)) = key.split_once(':') {
                if let Ok(Some(doc)) = self.get_document(collection, id) {
                    self.remove_from_indices(collection, &doc)?;
                }
            }
            self.cold.delete(key)?;
            self.hot.delete(key);
        }

        self.transaction_manager.commit(tx_id)?;

        self.cold.compact()?;

        Ok(())
    }

    pub fn rollback_transaction(&self, tx_id: crate::transaction::TransactionId) -> Result<()> {
        self.transaction_manager.rollback(tx_id)
    }

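    /// Builds a persistent B-tree index over one field, backfilling it from
    /// existing documents.
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// db.create_index("users", "email").await?;
    /// ```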
    pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
        if self.get(&format!("_collection:{}", collection))?.is_none() {
            return Err(AuroraError::CollectionNotFound(collection.to_string()));
        }

        let index_name = format!("idx_{}_{}", collection, field);

        let definition = IndexDefinition {
            name: index_name.clone(),
            collection: collection.to_string(),
            fields: vec![field.to_string()],
            index_type: IndexType::BTree,
            unique: false,
        };

        let index = Index::new(definition.clone());

        // Backfill the index from every existing document in the collection.
        let prefix = format!("{}:", collection);
        for result in self.cold.scan_prefix(&prefix) {
            if let Ok((_, data)) = result {
                if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                    let _ = index.insert(&doc);
                }
            }
        }

        self.indices.insert(index_name, index);

        let index_key = format!("_index:{}:{}", collection, field);
        self.put(index_key, serde_json::to_vec(&definition)?, None)?;

        Ok(())
    }

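    /// Starts a fluent query over a collection.
    ///
    /// Example (illustrative sketch; the builder methods mirror the tests below):
    /// ```ignore
    /// let recent = db
    ///     .query("books")
    ///     .filter(|f| f.gt("year", Value::Int(2019)))
    ///     .order_by("year", true)
    ///     .collect()
    ///     .await?;
    /// ```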
    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        QueryBuilder::new(self, collection)
    }

    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
        SearchBuilder::new(self, collection)
    }

    pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
        let key = format!("{}:{}", collection, id);
        if let Some(data) = self.get(&key)? {
            Ok(Some(serde_json::from_slice(&data)?))
        } else {
            Ok(None)
        }
    }

    pub async fn delete(&self, key: &str) -> Result<()> {
        let (collection, id) = if let Some((coll, doc_id)) = key.split_once(':') {
            (coll, doc_id)
        } else {
            return Err(AuroraError::InvalidOperation(
                "Invalid key format, expected 'collection:id'".into(),
            ));
        };

        // Fetch the document first so its index entries can be removed.
        let document = self.get_document(collection, id)?;

        if self.hot.get(key).is_some() {
            self.hot.delete(key);
        }

        self.cold.delete(key)?;

        if let Some(doc) = document {
            self.remove_from_indices(collection, &doc)?;
        } else if let Some(index) = self.primary_indices.get_mut(collection) {
            // The primary index is keyed by the full "collection:id" key.
            index.remove(key);
        }

        let event = crate::pubsub::ChangeEvent::delete(collection, id);
        let _ = self.pubsub.publish(event);

        Ok(())
    }

    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
        let prefix = format!("{}:", collection);

        let keys: Vec<String> = self
            .cold
            .scan()
            .filter_map(|r| r.ok())
            .filter(|(k, _)| k.starts_with(&prefix))
            .map(|(k, _)| k)
            .collect();

        for key in keys {
            self.delete(&key).await?;
        }

        self.primary_indices.remove(collection);
        self.secondary_indices
            .retain(|k, _| !k.starts_with(&prefix));

        self.schema_cache.remove(collection);

        Ok(())
    }

    fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
        // Index entries are keyed by the full "collection:id" storage key, so
        // removal must use the same form rather than the bare document id.
        let full_key = format!("{}:{}", collection, doc.id);

        if let Some(index) = self.primary_indices.get(collection) {
            index.remove(&full_key);
        }

        for (field, value) in &doc.data {
            let index_key = format!("{}:{}", collection, field);
            if let Some(index) = self.secondary_indices.get(&index_key) {
                if let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
                    doc_ids.retain(|id| id != &full_key);
                }
            }
        }

        Ok(())
    }

    pub async fn search_text(
        &self,
        collection: &str,
        field: &str,
        query: &str,
    ) -> Result<Vec<Document>> {
        let mut results = Vec::new();
        let docs = self.get_all_collection(collection).await?;

        // Case-insensitive substring match over the whole collection.
        for doc in docs {
            if let Some(Value::String(text)) = doc.data.get(field) {
                if text.to_lowercase().contains(&query.to_lowercase()) {
                    results.push(doc);
                }
            }
        }

        Ok(results)
    }

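    /// Exports a collection to a pretty-printed JSON file; the `.json`
    /// extension is appended when missing.
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// db.export_as_json("users", "backup/users")?; // writes backup/users.json
    /// ```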
    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
        let output_path = if !output_path.ends_with(".json") {
            format!("{}.json", output_path)
        } else {
            output_path.to_string()
        };

        let mut docs = Vec::new();

        for result in self.cold.scan() {
            let (key, value) = result?;

            if let Some(key_collection) = key.split(':').next() {
                if key_collection == collection && !key.starts_with("_collection:") {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        // Convert internal values to plain JSON, dropping the document wrapper.
                        let mut clean_doc = serde_json::Map::new();
                        for (k, v) in doc.data {
                            match v {
                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
                                Value::Float(f) => {
                                    if let Some(n) = serde_json::Number::from_f64(f) {
                                        clean_doc.insert(k, JsonValue::Number(n))
                                    } else {
                                        clean_doc.insert(k, JsonValue::Null)
                                    }
                                }
                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
                                Value::Array(arr) => {
                                    let clean_arr: Vec<JsonValue> = arr
                                        .into_iter()
                                        .map(|v| match v {
                                            Value::String(s) => JsonValue::String(s),
                                            Value::Int(i) => JsonValue::Number(i.into()),
                                            Value::Float(f) => serde_json::Number::from_f64(f)
                                                .map(JsonValue::Number)
                                                .unwrap_or(JsonValue::Null),
                                            Value::Bool(b) => JsonValue::Bool(b),
                                            Value::Null => JsonValue::Null,
                                            _ => JsonValue::Null,
                                        })
                                        .collect();
                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
                                }
                                Value::Uuid(u) => {
                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
                                }
                                Value::Null => clean_doc.insert(k, JsonValue::Null),
                                Value::Object(_) => None, // nested objects are skipped on export
                            };
                        }
                        docs.push(JsonValue::Object(clean_doc));
                    }
                }
            }
        }

        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
            collection.to_string(),
            JsonValue::Array(docs),
        )]));

        let mut file = StdFile::create(&output_path)?;
        serde_json::to_writer_pretty(&mut file, &output)?;
        println!("Exported collection '{}' to {}", collection, &output_path);
        Ok(())
    }

    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
        let output_path = if !filename.ends_with(".csv") {
            format!("{}.csv", filename)
        } else {
            filename.to_string()
        };

        let mut writer = csv::Writer::from_path(&output_path)?;
        let mut headers = Vec::new();
        let mut first_doc = true;

        for result in self.cold.scan() {
            let (key, value) = result?;

            if let Some(key_collection) = key.split(':').next() {
                if key_collection == collection && !key.starts_with("_collection:") {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        // The first document determines the column order.
                        if first_doc && !doc.data.is_empty() {
                            headers = doc.data.keys().cloned().collect();
                            writer.write_record(&headers)?;
                            first_doc = false;
                        }

                        let values: Vec<String> = headers
                            .iter()
                            .map(|header| {
                                doc.data
                                    .get(header)
                                    .map(|v| v.to_string())
                                    .unwrap_or_default()
                            })
                            .collect();
                        writer.write_record(&values)?;
                    }
                }
            }
        }

        writer.flush()?;
        println!("Exported collection '{}' to {}", collection, &output_path);
        Ok(())
    }

    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }

    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
        self.query(collection)
            .filter(|f| f.eq("id", id))
            .first_one()
            .await
    }

    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
    where
        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
    {
        self.query(collection).filter(filter_fn).first_one().await
    }

    pub async fn find_by_field<T: Into<Value> + Clone + Send + Sync + 'static>(
        &self,
        collection: &str,
        field: &'static str,
        value: T,
    ) -> Result<Vec<Document>> {
        let value_clone = value.clone();
        self.query(collection)
            .filter(move |f| f.eq(field, value_clone.clone()))
            .collect()
            .await
    }

    pub async fn find_by_fields(
        &self,
        collection: &str,
        fields: Vec<(&str, Value)>,
    ) -> Result<Vec<Document>> {
        let mut query = self.query(collection);

        for (field, value) in fields {
            let field_owned = field.to_owned();
            let value_owned = value.clone();
            query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
        }

        query.collect().await
    }

    pub async fn find_in_range<T: Into<Value> + Clone + Send + Sync + 'static>(
        &self,
        collection: &str,
        field: &'static str,
        min: T,
        max: T,
    ) -> Result<Vec<Document>> {
        self.query(collection)
            .filter(move |f| f.between(field, min.clone(), max.clone()))
            .collect()
            .await
    }

    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }

    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
        self.search(collection)
    }

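    /// Inserts or updates a document under a caller-chosen id, enforcing
    /// unique constraints either way.
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// db.upsert(
    ///     "users",
    ///     "user-1",
    ///     vec![("name", Value::String("Ada".to_string()))],
    /// )
    /// .await?;
    /// ```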
    pub async fn upsert(
        &self,
        collection: &str,
        id: &str,
        data: Vec<(&str, Value)>,
    ) -> Result<String> {
        let data_map: HashMap<String, Value> =
            data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();

        if let Some(mut doc) = self.get_document(collection, id)? {
            // Update path: merge the new fields into the existing document.
            for (key, value) in data_map {
                doc.data.insert(key, value);
            }

            self.validate_unique_constraints_excluding(collection, &doc.data, id)
                .await?;

            self.put(
                format!("{}:{}", collection, id),
                serde_json::to_vec(&doc)?,
                None,
            )?;
            Ok(id.to_string())
        } else {
            // Insert path: create a fresh document under the given id.
            self.validate_unique_constraints(collection, &data_map)
                .await?;

            let document = Document {
                id: id.to_string(),
                data: data_map,
            };

            self.put(
                format!("{}:{}", collection, id),
                serde_json::to_vec(&document)?,
                None,
            )?;
            Ok(id.to_string())
        }
    }

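    /// Read-modify-write increment of an integer field; missing or
    /// non-integer fields are treated as 0.
    ///
    /// Example (illustrative sketch; `post_id` is a placeholder):
    /// ```ignore
    /// let views = db.increment("posts", &post_id, "views", 1).await?;
    /// ```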
    pub async fn increment(
        &self,
        collection: &str,
        id: &str,
        field: &str,
        amount: i64,
    ) -> Result<i64> {
        if let Some(mut doc) = self.get_document(collection, id)? {
            let current = match doc.data.get(field) {
                Some(Value::Int(i)) => *i,
                _ => 0,
            };

            let new_value = current + amount;
            doc.data.insert(field.to_string(), Value::Int(new_value));

            self.put(
                format!("{}:{}", collection, id),
                serde_json::to_vec(&doc)?,
                None,
            )?;

            Ok(new_value)
        } else {
            Err(AuroraError::NotFound(format!(
                "Document {}:{} not found",
                collection, id
            )))
        }
    }

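    /// Inserts many documents under one transaction, rolling back on the first
    /// failure and returning the new ids on success.
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// let ids = db
    ///     .batch_insert(
    ///         "users",
    ///         vec![
    ///             vec![("name", Value::String("Ada".to_string()))],
    ///             vec![("name", Value::String("Alan".to_string()))],
    ///         ],
    ///     )
    ///     .await?;
    /// ```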
    pub async fn batch_insert(
        &self,
        collection: &str,
        docs: Vec<Vec<(&str, Value)>>,
    ) -> Result<Vec<String>> {
        let doc_maps: Vec<HashMap<String, Value>> = docs
            .into_iter()
            .map(|doc| doc.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
            .collect();

        let tx_id = self.begin_transaction();

        let mut ids = Vec::with_capacity(doc_maps.len());

        for data_map in doc_maps {
            let data_vec: Vec<(&str, Value)> = data_map
                .iter()
                .map(|(k, v)| (k.as_str(), v.clone()))
                .collect();

            match self.insert_into(collection, data_vec).await {
                Ok(id) => ids.push(id),
                Err(e) => {
                    self.rollback_transaction(tx_id)?;
                    return Err(e);
                }
            }
        }

        self.commit_transaction(tx_id)?;

        Ok(ids)
    }

    pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
    where
        F: Fn(&FilterBuilder) -> bool + Send + Sync + 'static,
    {
        let docs = self.query(collection).filter(filter_fn).collect().await?;

        let mut deleted_count = 0;

        for doc in docs {
            let key = format!("{}:{}", collection, doc.id);
            self.delete(&key).await?;
            deleted_count += 1;
        }

        Ok(deleted_count)
    }

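    /// Imports a JSON array of objects, validating each field against the
    /// collection schema and skipping documents that already exist.
    ///
    /// Example (illustrative sketch; `users.json` holds `[{"name": ...}, ...]`):
    /// ```ignore
    /// let stats = db.import_from_json("users", "users.json").await?;
    /// println!(
    ///     "imported {}, skipped {}, failed {}",
    ///     stats.imported, stats.skipped, stats.failed
    /// );
    /// ```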
    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
        let collection_def = self.get_collection_definition(collection)?;

        let json_string = read_to_string(filename)
            .await
            .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;

        let documents: Vec<JsonValue> = from_str(&json_string)
            .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;

        let mut stats = ImportStats::default();

        for doc_json in documents {
            match self
                .import_document(collection, &collection_def, doc_json)
                .await
            {
                Ok(ImportResult::Imported) => stats.imported += 1,
                Ok(ImportResult::Skipped) => stats.skipped += 1,
                Err(_) => stats.failed += 1,
            }
        }

        Ok(stats)
    }

    async fn import_document(
        &self,
        collection: &str,
        collection_def: &Collection,
        doc_json: JsonValue,
    ) -> Result<ImportResult> {
        if !doc_json.is_object() {
            return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
        }

        // Honor an explicit "id" field; otherwise generate one.
        let doc_id = doc_json
            .get("id")
            .and_then(|id| id.as_str())
            .map(|s| s.to_string())
            .unwrap_or_else(|| Uuid::new_v4().to_string());

        if self.get_document(collection, &doc_id)?.is_some() {
            return Ok(ImportResult::Skipped);
        }

        let mut data_map = HashMap::new();

        if let Some(obj) = doc_json.as_object() {
            for (field_name, field_def) in &collection_def.fields {
                if let Some(json_value) = obj.get(field_name) {
                    if !self.validate_field_value(json_value, &field_def.field_type) {
                        return Err(AuroraError::InvalidOperation(format!(
                            "Field '{}' has invalid type",
                            field_name
                        )));
                    }

                    let value = self.json_to_value(json_value)?;
                    data_map.insert(field_name.clone(), value);
                } else if field_def.unique {
                    return Err(AuroraError::InvalidOperation(format!(
                        "Missing required unique field '{}'",
                        field_name
                    )));
                }
            }
        }

        // Skip documents whose unique field values already exist in the collection.
        let unique_fields = self.get_unique_fields(collection_def);
        for unique_field in &unique_fields {
            if let Some(value) = data_map.get(unique_field) {
                let query_results = self
                    .query(collection)
                    .filter(move |f| f.eq(unique_field, value.clone()))
                    .limit(1)
                    .collect()
                    .await?;

                if !query_results.is_empty() {
                    return Ok(ImportResult::Skipped);
                }
            }
        }

        let document = Document {
            id: doc_id,
            data: data_map,
        };

        self.put(
            format!("{}:{}", collection, document.id),
            serde_json::to_vec(&document)?,
            None,
        )?;

        Ok(ImportResult::Imported)
    }

    fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
        match field_type {
            FieldType::String => value.is_string(),
            FieldType::Int => value.is_i64() || value.is_u64(),
            FieldType::Float => value.is_number(),
            FieldType::Bool => value.is_boolean(),
            FieldType::Array => value.is_array(),
            FieldType::Object => value.is_object(),
            FieldType::Uuid => {
                value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
            }
        }
    }

    fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
        match json_value {
            JsonValue::Null => Ok(Value::Null),
            JsonValue::Bool(b) => Ok(Value::Bool(*b)),
            JsonValue::Number(n) => {
                if let Some(i) = n.as_i64() {
                    Ok(Value::Int(i))
                } else if let Some(f) = n.as_f64() {
                    Ok(Value::Float(f))
                } else {
                    Err(AuroraError::InvalidOperation("Invalid number value".into()))
                }
            }
            JsonValue::String(s) => {
                // Strings that parse as UUIDs are stored as Value::Uuid.
                if let Ok(uuid) = Uuid::parse_str(s) {
                    Ok(Value::Uuid(uuid))
                } else {
                    Ok(Value::String(s.clone()))
                }
            }
            JsonValue::Array(arr) => {
                let mut values = Vec::new();
                for item in arr {
                    values.push(self.json_to_value(item)?);
                }
                Ok(Value::Array(values))
            }
            JsonValue::Object(obj) => {
                let mut map = HashMap::new();
                for (k, v) in obj {
                    map.insert(k.clone(), self.json_to_value(v)?);
                }
                Ok(Value::Object(map))
            }
        }
    }

    fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
        if let Some(data) = self.get(&format!("_collection:{}", collection))? {
            let collection_def: Collection = serde_json::from_slice(&data)?;
            Ok(collection_def)
        } else {
            Err(AuroraError::CollectionNotFound(collection.to_string()))
        }
    }

    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
        let hot_stats = self.hot.get_stats();
        let cold_stats = self.cold.get_stats()?;

        Ok(DatabaseStats {
            hot_stats,
            cold_stats,
            estimated_size: self.cold.estimated_size(),
            collections: self.get_collection_stats()?,
        })
    }

    pub fn is_in_hot_cache(&self, key: &str) -> bool {
        self.hot.is_hot(key)
    }

    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
        let hot_store = Arc::new(self.hot.clone());
        hot_store.start_cleanup_with_interval(interval_secs).await;
    }

    pub fn clear_hot_cache(&self) {
        self.hot.clear();
        println!(
            "Hot cache cleared, current hit ratio: {:.2}%",
            self.hot.hit_ratio() * 100.0
        );
    }

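    /// Loads up to `limit` documents (default 1000) from cold storage into the
    /// hot cache so early reads avoid disk.
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// let loaded = db.prewarm_cache("users", Some(500)).await?;
    /// ```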
    pub async fn prewarm_cache(&self, collection: &str, limit: Option<usize>) -> Result<usize> {
        let limit = limit.unwrap_or(1000);
        let prefix = format!("{}:", collection);
        let mut loaded = 0;

        for entry in self.cold.scan_prefix(&prefix) {
            if loaded >= limit {
                break;
            }

            if let Ok((key, value)) = entry {
                self.hot.set(key.clone(), value, None);
                loaded += 1;
            }
        }

        println!("Prewarmed {} with {} documents", collection, loaded);
        Ok(loaded)
    }

    pub async fn prewarm_all_collections(
        &self,
        docs_per_collection: Option<usize>,
    ) -> Result<HashMap<String, usize>> {
        let mut stats = HashMap::new();

        let collections: Vec<String> = self
            .cold
            .scan()
            .filter_map(|r| r.ok())
            .map(|(k, _)| k)
            .filter(|k| k.starts_with("_collection:"))
            .map(|k| k.trim_start_matches("_collection:").to_string())
            .collect();

        for collection in collections {
            let loaded = self.prewarm_cache(&collection, docs_per_collection).await?;
            stats.insert(collection, loaded);
        }

        Ok(stats)
    }

    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
        // Group the pairs by collection so each schema is looked up only once.
        let mut collections: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
        for (key, value) in &pairs {
            if let Some(collection_name) = key.split(':').next() {
                collections
                    .entry(collection_name.to_string())
                    .or_default()
                    .push((key.clone(), value.clone()));
            }
        }

        self.cold.batch_set(pairs)?;

        for (collection_name, batch) in collections {
            let collection_obj = match self.schema_cache.get(&collection_name) {
                Some(cached_schema) => Arc::clone(cached_schema.value()),
                None => {
                    let collection_key = format!("_collection:{}", collection_name);
                    match self.get(&collection_key)? {
                        Some(data) => {
                            let obj: Collection = serde_json::from_slice(&data)?;
                            let arc_obj = Arc::new(obj);
                            self.schema_cache
                                .insert(collection_name.to_string(), Arc::clone(&arc_obj));
                            arc_obj
                        }
                        None => continue,
                    }
                }
            };

            let indexed_fields: Vec<String> = collection_obj
                .fields
                .iter()
                .filter(|(_, def)| def.indexed || def.unique)
                .map(|(name, _)| name.clone())
                .collect();

            let primary_index = self
                .primary_indices
                .entry(collection_name.to_string())
                .or_insert_with(DashMap::new);

            for (key, value) in batch {
                self.hot.set(key.clone(), value.clone(), None);

                primary_index.insert(key.clone(), value.clone());

                if !indexed_fields.is_empty() {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        for (field, field_value) in doc.data {
                            if indexed_fields.contains(&field) {
                                let value_str = match &field_value {
                                    Value::String(s) => s.clone(),
                                    _ => field_value.to_string(),
                                };
                                let index_key = format!("{}:{}", collection_name, field);
                                let secondary_index = self
                                    .secondary_indices
                                    .entry(index_key)
                                    .or_insert_with(DashMap::new);

                                let max_entries = self.config.max_index_entries_per_field;
                                secondary_index
                                    .entry(value_str)
                                    .and_modify(|doc_ids| {
                                        if doc_ids.len() < max_entries {
                                            doc_ids.push(key.to_string());
                                        }
                                    })
                                    .or_insert_with(|| vec![key.to_string()]);
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }

    pub fn scan_with_prefix(
        &self,
        prefix: &str,
    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
        self.cold.scan_prefix(prefix)
    }

    pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
        let mut stats = HashMap::new();

        let collections: Vec<String> = self
            .cold
            .scan()
            .filter_map(|r| r.ok())
            .map(|(k, _)| k)
            .filter(|k| k.starts_with("_collection:"))
            .map(|k| k.trim_start_matches("_collection:").to_string())
            .collect();

        for collection in collections {
            let prefix = format!("{}:", collection);

            let count = self.cold.scan_prefix(&prefix).count();

            let size: usize = self
                .cold
                .scan_prefix(&prefix)
                .filter_map(|r| r.ok())
                .map(|(_, v)| v.len())
                .sum();

            stats.insert(
                collection,
                CollectionStats {
                    count,
                    size_bytes: size,
                    avg_doc_size: if count > 0 { size / count } else { 0 },
                },
            );
        }

        Ok(stats)
    }

    pub fn search_by_value(
        &self,
        collection: &str,
        field: &str,
        value: &Value,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
            let index = Index::new(index_def);

            if let Some(doc_ids) = index.search(value) {
                let mut docs = Vec::new();
                for id in doc_ids {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        Ok(Vec::new())
    }

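    /// Searches a field that was indexed with `create_text_index`, returning
    /// matching documents (relevance scores are discarded).
    ///
    /// Example (illustrative sketch):
    /// ```ignore
    /// db.create_text_index("posts", "body", true)?;
    /// let hits = db.full_text_search("posts", "body", "aurora database")?;
    /// ```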
    pub fn full_text_search(
        &self,
        collection: &str,
        field: &str,
        query: &str,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;

            if !matches!(index_def.index_type, IndexType::FullText) {
                return Err(AuroraError::InvalidOperation(format!(
                    "Field '{}' is not indexed as full-text",
                    field
                )));
            }

            let index = Index::new(index_def);

            if let Some(doc_id_scores) = index.search_text(query) {
                let mut docs = Vec::new();
                for (id, _score) in doc_id_scores {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        Ok(Vec::new())
    }

    pub fn create_text_index(
        &self,
        collection: &str,
        field: &str,
        _enable_stop_words: bool,
    ) -> Result<()> {
        if self.get(&format!("_collection:{}", collection))?.is_none() {
            return Err(AuroraError::CollectionNotFound(collection.to_string()));
        }

        let index_def = IndexDefinition {
            name: format!("{}_{}_fulltext", collection, field),
            collection: collection.to_string(),
            fields: vec![field.to_string()],
            index_type: IndexType::FullText,
            unique: false,
        };

        let index_key = format!("_index:{}:{}", collection, field);
        self.put(index_key, serde_json::to_vec(&index_def)?, None)?;

        let index = Index::new(index_def);

        // Backfill the full-text index from existing documents.
        let prefix = format!("{}:", collection);
        for result in self.cold.scan_prefix(&prefix) {
            if let Ok((_, data)) = result {
                let doc: Document = serde_json::from_slice(&data)?;
                index.insert(&doc)?;
            }
        }

        Ok(())
    }

    pub async fn execute_simple_query(
        &self,
        builder: &SimpleQueryBuilder,
    ) -> Result<Vec<Document>> {
        self.ensure_indices_initialized().await?;

        // Try to satisfy one filter from a secondary index to narrow the
        // candidate set before scanning.
        let mut doc_ids_to_load: Option<Vec<String>> = None;
        let mut used_filter_index: Option<usize> = None;

        for (filter_idx, filter) in builder.filters.iter().enumerate() {
            match filter {
                Filter::Eq(field, value) => {
                    let index_key = format!("{}:{}", &builder.collection, field);

                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        if let Some(matching_ids) = index.get(&value.to_string()) {
                            doc_ids_to_load = Some(matching_ids.clone());
                            used_filter_index = Some(filter_idx);
                            break; // exact match is the cheapest plan; stop looking
                        }
                    }
                }
                Filter::Gt(field, value)
                | Filter::Gte(field, value)
                | Filter::Lt(field, value)
                | Filter::Lte(field, value) => {
                    let index_key = format!("{}:{}", &builder.collection, field);

                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        let mut matching_ids = Vec::new();

                        for entry in index.iter() {
                            let index_value_str = entry.key();

                            if let Ok(index_value) =
                                self.parse_value_from_string(index_value_str, value)
                            {
                                let matches = match filter {
                                    Filter::Gt(_, filter_val) => index_value > *filter_val,
                                    Filter::Gte(_, filter_val) => index_value >= *filter_val,
                                    Filter::Lt(_, filter_val) => index_value < *filter_val,
                                    Filter::Lte(_, filter_val) => index_value <= *filter_val,
                                    _ => false,
                                };

                                if matches {
                                    matching_ids.extend(entry.value().clone());
                                }
                            }
                        }

                        if !matching_ids.is_empty() {
                            doc_ids_to_load = Some(matching_ids);
                            used_filter_index = Some(filter_idx);
                            break;
                        }
                    }
                }
                Filter::Contains(field, search_term) => {
                    let index_key = format!("{}:{}", &builder.collection, field);

                    if let Some(index) = self.secondary_indices.get(&index_key) {
                        let mut matching_ids = Vec::new();

                        for entry in index.iter() {
                            let index_value_str = entry.key();

                            if index_value_str
                                .to_lowercase()
                                .contains(&search_term.to_lowercase())
                            {
                                matching_ids.extend(entry.value().clone());
                            }
                        }

                        if !matching_ids.is_empty() {
                            matching_ids.sort();
                            matching_ids.dedup();

                            doc_ids_to_load = Some(matching_ids);
                            used_filter_index = Some(filter_idx);
                            break;
                        }
                    }
                }
            }
        }

        let mut final_docs: Vec<Document>;

        if let Some(ids) = doc_ids_to_load {
            println!("📊 Loading {} documents via index", ids.len());
            final_docs = Vec::with_capacity(ids.len());

            for id in ids {
                let doc_key = format!("{}:{}", &builder.collection, id);
                if let Some(data) = self.get(&doc_key)? {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
                        final_docs.push(doc);
                    }
                }
            }
        } else {
            println!(
                "⚠️ INDEX MISS. Falling back to full collection scan for '{}'",
                &builder.collection
            );
            final_docs = self.get_all_collection(&builder.collection).await?;
        }

        // Apply the remaining filters in memory, skipping the one the index served.
        final_docs.retain(|doc| {
            builder.filters.iter().enumerate().all(|(idx, filter)| {
                if Some(idx) == used_filter_index {
                    return true;
                }

                match filter {
                    Filter::Eq(field, value) => doc.data.get(field).map_or(false, |v| v == value),
                    Filter::Gt(field, value) => doc.data.get(field).map_or(false, |v| v > value),
                    Filter::Gte(field, value) => doc.data.get(field).map_or(false, |v| v >= value),
                    Filter::Lt(field, value) => doc.data.get(field).map_or(false, |v| v < value),
                    Filter::Lte(field, value) => doc.data.get(field).map_or(false, |v| v <= value),
                    Filter::Contains(field, value_str) => {
                        doc.data.get(field).map_or(false, |v| match v {
                            Value::String(s) => s.contains(value_str),
                            Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
                            _ => false,
                        })
                    }
                }
            })
        });

        println!(
            "✅ Query completed. Returning {} documents",
            final_docs.len()
        );

        Ok(final_docs)
    }

    fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
        // Index keys are stored as strings; parse them back using the filter
        // value's type as a guide.
        match reference_value {
            Value::Int(_) => {
                if let Ok(i) = value_str.parse::<i64>() {
                    Ok(Value::Int(i))
                } else {
                    Err(AuroraError::InvalidOperation("Failed to parse int".into()))
                }
            }
            Value::Float(_) => {
                if let Ok(f) = value_str.parse::<f64>() {
                    Ok(Value::Float(f))
                } else {
                    Err(AuroraError::InvalidOperation(
                        "Failed to parse float".into(),
                    ))
                }
            }
            _ => Ok(Value::String(value_str.to_string())),
        }
    }

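    /// Runs a network-supplied `QueryPayload` (filters, sort, offset, limit,
    /// field selection) as a full in-memory scan.
    ///
    /// A sketch assuming `QueryPayload`'s fields can be set directly; real
    /// construction may go through the HTTP layer instead:
    /// ```ignore
    /// let payload = QueryPayload {
    ///     filters: None,
    ///     sort: None,
    ///     offset: Some(0),
    ///     limit: Some(25),
    ///     select: Some(vec!["name".to_string()]),
    /// };
    /// let page = db.execute_dynamic_query("users", &payload).await?;
    /// ```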
    pub async fn execute_dynamic_query(
        &self,
        collection: &str,
        payload: &QueryPayload,
    ) -> Result<Vec<Document>> {
        let mut docs = self.get_all_collection(collection).await?;

        if let Some(filters) = &payload.filters {
            docs.retain(|doc| {
                filters.iter().all(|filter| {
                    doc.data
                        .get(&filter.field)
                        .map_or(false, |doc_val| check_filter(doc_val, filter))
                })
            });
        }

        if let Some(sort_options) = &payload.sort {
            docs.sort_by(|a, b| {
                let a_val = a.data.get(&sort_options.field);
                let b_val = b.data.get(&sort_options.field);
                let ordering = a_val
                    .partial_cmp(&b_val)
                    .unwrap_or(std::cmp::Ordering::Equal);
                if sort_options.ascending {
                    ordering
                } else {
                    ordering.reverse()
                }
            });
        }

        if let Some(offset) = payload.offset {
            docs = docs.into_iter().skip(offset).collect();
        }
        if let Some(limit) = payload.limit {
            docs = docs.into_iter().take(limit).collect();
        }

        if let Some(select_fields) = &payload.select {
            if !select_fields.is_empty() {
                docs = docs
                    .into_iter()
                    .map(|mut doc| {
                        doc.data.retain(|key, _| select_fields.contains(key));
                        doc
                    })
                    .collect();
            }
        }

        Ok(docs)
    }

    pub async fn process_network_request(
        &self,
        request: crate::network::protocol::Request,
    ) -> crate::network::protocol::Response {
        use crate::network::protocol::Response;

        match request {
            crate::network::protocol::Request::Get(key) => match self.get(&key) {
                Ok(value) => Response::Success(value),
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::Put(key, value) => {
                match self.put(key, value, None) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
                Ok(_) => Response::Done,
                Err(e) => Response::Error(e.to_string()),
            },
            crate::network::protocol::Request::NewCollection { name, fields } => {
                let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
                    .iter()
                    .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
                    .collect();

                match self.new_collection(&name, fields_for_db) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Insert { collection, data } => {
                match self.insert_map(&collection, data).await {
                    Ok(id) => Response::Message(id),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::GetDocument { collection, id } => {
                match self.get_document(&collection, &id) {
                    Ok(doc) => Response::Document(doc),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::Query(builder) => {
                match self.execute_simple_query(&builder).await {
                    Ok(docs) => Response::Documents(docs),
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::BeginTransaction => {
                let tx_id = self.begin_transaction();
                Response::TransactionId(tx_id.as_u64())
            }
            crate::network::protocol::Request::CommitTransaction(tx_id_u64) => {
                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
                match self.commit_transaction(tx_id) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
            crate::network::protocol::Request::RollbackTransaction(tx_id_u64) => {
                let tx_id = crate::transaction::TransactionId::from_u64(tx_id_u64);
                match self.rollback_transaction(tx_id) {
                    Ok(_) => Response::Done,
                    Err(e) => Response::Error(e.to_string()),
                }
            }
        }
    }

    pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
        for field in fields {
            if let Err(e) = self.create_index(collection, field).await {
                eprintln!(
                    "Warning: Failed to create index for {}.{}: {}",
                    collection, field, e
                );
            } else {
                println!("✅ Created index for {}.{}", collection, field);
            }
        }
        Ok(())
    }

    pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
        let mut stats = HashMap::new();

        for entry in self.secondary_indices.iter() {
            let key = entry.key();
            if key.starts_with(&format!("{}:", collection)) {
                let field = key.split(':').nth(1).unwrap_or("unknown");
                let index = entry.value();

                let unique_values = index.len();
                let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();

                stats.insert(
                    field.to_string(),
                    IndexStats {
                        unique_values,
                        total_documents,
                        avg_docs_per_value: if unique_values > 0 {
                            total_documents / unique_values
                        } else {
                            0
                        },
                    },
                );
            }
        }

        stats
    }

    pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
        if let Ok(collection_def) = self.get_collection_definition(collection) {
            let field_names: Vec<&str> =
                collection_def.fields.keys().map(|s| s.as_str()).collect();
            self.create_indices(collection, &field_names).await?;
            println!(
                "🚀 Optimized collection '{}' with {} indices",
                collection,
                field_names.len()
            );
        }

        Ok(())
    }

    fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
        collection
            .fields
            .iter()
            .filter(|(_, def)| def.unique)
            .map(|(name, _)| name.clone())
            .collect()
    }

    async fn validate_unique_constraints(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if index.contains_key(&value_str) {
                        return Err(AuroraError::UniqueConstraintViolation(
                            unique_field.clone(),
                            value_str,
                        ));
                    }
                }
            }
        }
        Ok(())
    }

    async fn validate_unique_constraints_excluding(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
        exclude_id: &str,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if let Some(doc_ids) = index.get(&value_str) {
                        // Any holder of this value other than the excluded
                        // document is a violation.
                        let exclude_key = format!("{}:{}", collection, exclude_id);
                        for doc_key in doc_ids.value() {
                            if doc_key != &exclude_key {
                                return Err(AuroraError::UniqueConstraintViolation(
                                    unique_field.clone(),
                                    value_str,
                                ));
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
}

fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
    let filter_val = match json_to_value(&filter.value) {
        Ok(v) => v,
        Err(_) => return false,
    };

    match filter.operator {
        FilterOperator::Eq => doc_val == &filter_val,
        FilterOperator::Ne => doc_val != &filter_val,
        FilterOperator::Gt => doc_val > &filter_val,
        FilterOperator::Gte => doc_val >= &filter_val,
        FilterOperator::Lt => doc_val < &filter_val,
        FilterOperator::Lte => doc_val <= &filter_val,
        FilterOperator::Contains => match (doc_val, &filter_val) {
            (Value::String(s), Value::String(fv)) => s.contains(fv),
            (Value::Array(arr), _) => arr.contains(&filter_val),
            _ => false,
        },
    }
}

enum ImportResult {
    Imported,
    Skipped,
}

#[derive(Debug, Default)]
pub struct ImportStats {
    pub imported: usize,
    pub skipped: usize,
    pub failed: usize,
}

#[derive(Debug)]
pub struct CollectionStats {
    pub count: usize,
    pub size_bytes: usize,
    pub avg_doc_size: usize,
}

#[derive(Debug)]
pub struct IndexStats {
    pub unique_values: usize,
    pub total_documents: usize,
    pub avg_docs_per_value: usize,
}

#[derive(Debug)]
pub struct DatabaseStats {
    pub hot_stats: crate::storage::hot::CacheStats,
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    pub estimated_size: u64,
    pub collections: HashMap<String, CollectionStats>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("age", FieldType::Int, false),
                ("email", FieldType::String, true),
            ],
        )?;

        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        let tx_id = db.begin_transaction();

        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        db.commit_transaction(tx_id)?;

        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection(
            "books",
            vec![
                ("title", FieldType::String, false),
                ("author", FieldType::String, false),
                ("year", FieldType::Int, false),
            ],
        )?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        db.put_blob("test:blob".to_string(), &file_path).await?;

        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            // 13 bytes of content plus the 5-byte "BLOB:" prefix.
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5),
            _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // 201 MB, well over the 50 MB cap enforced by put_blob.
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }

    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection(
            "users",
            vec![
                ("name", FieldType::String, false),
                ("email", FieldType::String, true), // unique
                ("age", FieldType::Int, false),
            ],
        )?;

        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Inserting a second document with the same email must fail.
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), // duplicate
                    ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
            assert_eq!(field, "email");
            assert_eq!(value, "john@example.com");
        } else {
            panic!("Expected UniqueConstraintViolation error");
        }

        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // Upserting a different document with an existing email must fail.
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // duplicate
                    ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // Upserting the same document with its own email is allowed.
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), // unchanged
                    ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
}