1use crate::error::{AuroraError, Result};
47use crate::index::{Index, IndexDefinition, IndexType};
48use crate::network::http_models::{
49 Filter as HttpFilter, FilterOperator, QueryPayload, json_to_value,
50};
51use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
52use crate::storage::{ColdStore, HotStore};
53use crate::types::{AuroraConfig, Collection, Document, FieldDefinition, FieldType, Value};
54use dashmap::DashMap;
55use serde_json::Value as JsonValue;
56use serde_json::from_str;
57use std::collections::HashMap;
58use std::fs::File as StdFile;
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61use std::time::Duration;
62use tokio::fs::File;
63use tokio::fs::read_to_string;
64use tokio::io::AsyncReadExt;
65use tokio::sync::OnceCell;
66use uuid::Uuid;
/// In-memory primary index for one collection: full storage key
/// ("collection:id") -> raw serialized document bytes.
type PrimaryIndex = DashMap<String, Vec<u8>>;
/// Secondary index for one "collection:field" pair: stringified field
/// value -> storage keys of the documents holding that value.
type SecondaryIndex = DashMap<String, Vec<String>>;
70
#[derive(Debug)]
pub enum DataInfo {
    /// Regular value: total byte size plus a short preview of the contents.
    Data { size: usize, preview: String },
    /// Binary blob (stored with a "BLOB:" marker); only the size is reported.
    Blob { size: usize },
    /// Compressed payload; only the size is reported.
    Compressed { size: usize },
}
78
79impl DataInfo {
80 pub fn size(&self) -> usize {
81 match self {
82 DataInfo::Data { size, .. } => *size,
83 DataInfo::Blob { size } => *size,
84 DataInfo::Compressed { size } => *size,
85 }
86 }
87}
88
pub struct Aurora {
    // In-memory cache tier; consulted before cold storage on reads.
    hot: HotStore,
    // Persistent on-disk store; the source of truth.
    cold: ColdStore,
    // collection name -> PrimaryIndex (full "collection:id" key -> document bytes).
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    // "collection:field" -> SecondaryIndex (stringified value -> matching keys).
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    // Guards the one-time lazy index build (see ensure_indices_initialized).
    indices_initialized: Arc<OnceCell<()>>,
    // True while an explicit transaction is open.
    in_transaction: std::sync::atomic::AtomicBool,
    // Writes buffered during a transaction, applied on commit (key -> bytes).
    transaction_ops: DashMap<String, Vec<u8>>,
    // Named query indices registered by create_index.
    indices: Arc<DashMap<String, Index>>,
}
123
124impl Aurora {
125 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
146 let path = Self::resolve_path(path)?;
147
148 if let Some(parent) = path.parent() {
150 if !parent.exists() {
151 std::fs::create_dir_all(parent)?;
152 }
153 }
154
155 let cold = ColdStore::new(path.to_str().unwrap())?;
157 let hot = HotStore::new();
158
159 let db = Self {
161 hot,
162 cold,
163 primary_indices: Arc::new(DashMap::new()),
164 secondary_indices: Arc::new(DashMap::new()),
165 indices_initialized: Arc::new(OnceCell::new()),
166 in_transaction: std::sync::atomic::AtomicBool::new(false),
167 transaction_ops: DashMap::new(),
168 indices: Arc::new(DashMap::new()),
169 };
170
171 Ok(db)
175 }
176
177 fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
179 let path = path.as_ref();
180
181 if path.is_absolute() {
183 return Ok(path.to_path_buf());
184 }
185
186 match std::env::current_dir() {
188 Ok(current_dir) => Ok(current_dir.join(path)),
189 Err(e) => Err(AuroraError::IoError(format!(
190 "Failed to resolve current directory: {}",
191 e
192 ))),
193 }
194 }
195
196 pub fn with_config(config: AuroraConfig) -> Result<Self> {
198 let path = Self::resolve_path(&config.db_path)?;
199
200 if config.create_dirs {
201 if let Some(parent) = path.parent() {
202 if !parent.exists() {
203 std::fs::create_dir_all(parent)?;
204 }
205 }
206 }
207
208 let cold = ColdStore::with_config(
210 path.to_str().unwrap(),
211 config.cold_cache_capacity_mb,
212 config.cold_flush_interval_ms,
213 config.cold_mode,
214 )?;
215
216 let hot = HotStore::with_config(
217 config.hot_cache_size_mb,
218 config.hot_cache_cleanup_interval_secs,
219 );
220
221 let db = Self {
223 hot,
224 cold,
225 primary_indices: Arc::new(DashMap::new()),
226 secondary_indices: Arc::new(DashMap::new()),
227 indices_initialized: Arc::new(OnceCell::new()),
228 in_transaction: std::sync::atomic::AtomicBool::new(false),
229 transaction_ops: DashMap::new(),
230 indices: Arc::new(DashMap::new()),
231 };
232
233 if config.auto_compact {
235 }
238
239 Ok(db)
240 }
241
242 pub async fn ensure_indices_initialized(&self) -> Result<()> {
244 self.indices_initialized
245 .get_or_init(|| async {
246 println!("Initializing indices...");
247 if let Err(e) = self.initialize_indices() {
248 eprintln!("Failed to initialize indices: {:?}", e);
249 }
250 println!("Indices initialized");
251 ()
252 })
253 .await;
254 Ok(())
255 }
256
257 fn initialize_indices(&self) -> Result<()> {
258 for result in self.cold.scan() {
260 let (key, value) = result?;
261 let key_str = std::str::from_utf8(&key.as_bytes())
262 .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;
263
264 if let Some(collection_name) = key_str.split(':').next() {
265 self.index_value(collection_name, key_str, &value)?;
266 }
267 }
268 Ok(())
269 }
270
271 pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
273 if let Some(value) = self.hot.get(key) {
275 return Ok(Some(value));
276 }
277
278 if let Some(collection) = key.split(':').next() {
280 if let Some(index) = self.primary_indices.get(collection) {
281 if let Some(value) = index.get(key) {
282 self.hot.set(key.to_string(), value.clone(), None);
284 return Ok(Some(value.clone()));
285 }
286 }
287 }
288
289 let value = self.cold.get(key)?;
291 if let Some(v) = &value {
292 self.hot.set(key.to_string(), v.clone(), None);
293 }
294 Ok(value)
295 }
296
297 pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
298 const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024; if value.len() > MAX_BLOB_SIZE {
301 return Err(AuroraError::InvalidOperation(format!(
302 "Blob size {} exceeds maximum allowed size of {}MB",
303 value.len() / (1024 * 1024),
304 MAX_BLOB_SIZE / (1024 * 1024)
305 )));
306 }
307
308 if self
310 .in_transaction
311 .load(std::sync::atomic::Ordering::SeqCst)
312 {
313 self.transaction_ops.insert(key.clone(), value.clone());
314 return Ok(());
315 }
316
317 self.cold.set(key.clone(), value.clone())?;
319 self.hot.set(key.clone(), value.clone(), ttl);
320
321 if let Some(collection_name) = key.split(':').next() {
323 if !collection_name.starts_with('_') {
325 self.index_value(collection_name, &key, &value)?;
326 }
327 }
328
329 Ok(())
330 }
331
    /// Records `key` -> `value` in the collection's primary index and, when
    /// the value decodes as a `Document`, adds every field to the matching
    /// secondary index.
    ///
    /// Secondary-index buckets are keyed by the field value's string form:
    /// the raw string for `Value::String`, `Display` output for everything
    /// else. Any lookup or removal must use the same convention to hit the
    /// same bucket.
    fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
        self.primary_indices
            .entry(collection.to_string())
            .or_insert_with(DashMap::new)
            .insert(key.to_string(), value.to_vec());

        // Non-document payloads (e.g. blobs) simply skip secondary indexing.
        if let Ok(doc) = serde_json::from_slice::<Document>(value) {
            for (field, value) in doc.data {
                let value_str = match &value {
                    Value::String(s) => s.clone(),
                    _ => value.to_string(),
                };
                self.secondary_indices
                    .entry(format!("{}:{}", collection, field))
                    .or_insert_with(DashMap::new)
                    .entry(value_str)
                    .or_insert_with(Vec::new)
                    .push(key.to_string());
            }
        }
        Ok(())
    }
357
358 fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
360 let _prefix = format!("{}:", collection);
361 let mut documents = Vec::new();
362
363 if let Some(index) = self.primary_indices.get(collection) {
364 for entry in index.iter() {
365 if let Ok(doc) = serde_json::from_slice(entry.value()) {
366 documents.push(doc);
367 }
368 }
369 }
370
371 Ok(documents)
372 }
373
374 pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
376 const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; let metadata = tokio::fs::metadata(file_path).await?;
380 let file_size = metadata.len() as usize;
381
382 if file_size > MAX_FILE_SIZE {
383 return Err(AuroraError::InvalidOperation(format!(
384 "File size {} MB exceeds maximum allowed size of {} MB",
385 file_size / (1024 * 1024),
386 MAX_FILE_SIZE / (1024 * 1024)
387 )));
388 }
389
390 let mut file = File::open(file_path).await?;
391 let mut buffer = Vec::new();
392 file.read_to_end(&mut buffer).await?;
393
394 let mut blob_data = Vec::with_capacity(5 + buffer.len());
396 blob_data.extend_from_slice(b"BLOB:");
397 blob_data.extend_from_slice(&buffer);
398
399 self.put(key, blob_data, None)
400 }
401
402 pub fn new_collection(&self, name: &str, fields: Vec<(String, FieldType, bool)>) -> Result<()> {
427 let collection_key = format!("_collection:{}", name);
428
429 if self.get(&collection_key)?.is_some() {
431 return Err(AuroraError::CollectionAlreadyExists(name.to_string()));
432 }
433
434 let mut field_definitions = HashMap::new();
436 for (field_name, field_type, unique) in fields {
437 field_definitions.insert(
438 field_name,
439 FieldDefinition {
440 field_type,
441 unique,
442 indexed: unique, },
444 );
445 }
446
447 let collection = Collection {
448 name: name.to_string(),
449 fields: field_definitions,
450 };
452
453 let collection_data = serde_json::to_vec(&collection)?;
454 self.put(collection_key, collection_data, None)?;
455
456 Ok(())
457 }
458
459 pub async fn insert_into(&self, collection: &str, data: Vec<(&str, Value)>) -> Result<String> {
479 let data_map: HashMap<String, Value> =
481 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
482
483 self.validate_unique_constraints(collection, &data_map)
485 .await?;
486
487 let doc_id = Uuid::new_v4().to_string();
488 let document = Document {
489 id: doc_id.clone(),
490 data: data_map,
491 };
492
493 self.put(
494 format!("{}:{}", collection, doc_id),
495 serde_json::to_vec(&document)?,
496 None,
497 )?;
498
499 Ok(doc_id)
500 }
501
502 pub async fn insert_map(
503 &self,
504 collection: &str,
505 data: HashMap<String, Value>,
506 ) -> Result<String> {
507 self.validate_unique_constraints(collection, &data).await?;
509
510 let doc_id = Uuid::new_v4().to_string();
511 let document = Document {
512 id: doc_id.clone(),
513 data,
514 };
515
516 self.put(
517 format!("{}:{}", collection, doc_id),
518 serde_json::to_vec(&document)?,
519 None,
520 )?;
521
522 Ok(doc_id)
523 }
524
    /// Returns every document in `collection`, lazily building the
    /// in-memory indices first if this is the first read.
    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
        self.ensure_indices_initialized().await?;
        self.scan_collection(collection)
    }
529
530 pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
531 let mut data = Vec::new();
532
533 if let Some(index) = self
534 .primary_indices
535 .get(pattern.split(':').next().unwrap_or(""))
536 {
537 for entry in index.iter() {
538 if entry.key().contains(pattern) {
539 let value = entry.value();
540 let info = if value.starts_with(b"BLOB:") {
541 DataInfo::Blob { size: value.len() }
542 } else {
543 DataInfo::Data {
544 size: value.len(),
545 preview: String::from_utf8_lossy(&value[..value.len().min(50)])
546 .into_owned(),
547 }
548 };
549
550 data.push((entry.key().clone(), info));
551 }
552 }
553 }
554
555 Ok(data)
556 }
557
558 pub fn begin_transaction(&self) -> Result<()> {
584 if self
585 .in_transaction
586 .load(std::sync::atomic::Ordering::SeqCst)
587 {
588 return Err(AuroraError::InvalidOperation(
589 "Transaction already in progress".into(),
590 ));
591 }
592
593 self.in_transaction
595 .store(true, std::sync::atomic::Ordering::SeqCst);
596 Ok(())
597 }
598
599 pub fn commit_transaction(&self) -> Result<()> {
606 if !self
607 .in_transaction
608 .load(std::sync::atomic::Ordering::SeqCst)
609 {
610 return Err(AuroraError::InvalidOperation(
611 "No transaction in progress".into(),
612 ));
613 }
614
615 for item in self.transaction_ops.iter() {
617 self.cold.set(item.key().clone(), item.value().clone())?;
618 self.hot.set(item.key().clone(), item.value().clone(), None);
619 }
620
621 self.transaction_ops.clear();
623 self.in_transaction
624 .store(false, std::sync::atomic::Ordering::SeqCst);
625
626 self.cold.compact()?;
628
629 Ok(())
630 }
631
632 pub fn rollback_transaction(&self) -> Result<()> {
639 if !self
640 .in_transaction
641 .load(std::sync::atomic::Ordering::SeqCst)
642 {
643 return Err(AuroraError::InvalidOperation(
644 "No transaction in progress".into(),
645 ));
646 }
647
648 self.transaction_ops.clear();
650 self.in_transaction
651 .store(false, std::sync::atomic::Ordering::SeqCst);
652
653 Ok(())
654 }
655
656 pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
657 if self.get(&format!("_collection:{}", collection))?.is_none() {
659 return Err(AuroraError::CollectionNotFound(collection.to_string()));
660 }
661
662 let index_name = format!("idx_{}_{}", collection, field);
664
665 let definition = IndexDefinition {
667 name: index_name.clone(),
668 collection: collection.to_string(),
669 fields: vec![field.to_string()],
670 index_type: IndexType::BTree,
671 unique: false,
672 };
673
674 let index = Index::new(definition.clone());
676
677 let prefix = format!("{}:", collection);
679 for result in self.cold.scan_prefix(&prefix) {
680 if let Ok((_, data)) = result {
681 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
682 let _ = index.insert(&doc);
683 }
684 }
685 }
686
687 self.indices.insert(index_name, index);
689
690 let index_key = format!("_index:{}:{}", collection, field);
692 self.put(index_key, serde_json::to_vec(&definition)?, None)?;
693
694 Ok(())
695 }
696
    /// Starts a fluent query over `collection`.
    pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        QueryBuilder::new(self, collection)
    }
719
    /// Starts a fluent search over `collection`.
    pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
        SearchBuilder::new(self, collection)
    }
742
743 pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
763 let key = format!("{}:{}", collection, id);
764 if let Some(data) = self.get(&key)? {
765 Ok(Some(serde_json::from_slice(&data)?))
766 } else {
767 Ok(None)
768 }
769 }
770
    /// Deletes a key from the hot cache, cold storage, and the collection's
    /// in-memory primary index.
    ///
    /// NOTE(review): the removal is applied immediately even when a
    /// transaction is open; in that case an EMPTY value is also recorded in
    /// `transaction_ops`, which `commit_transaction` will write back as an
    /// empty entry under this key — verify this resurrection/tombstone
    /// behavior is intended.
    ///
    /// NOTE(review): secondary indices are not updated here, so stale
    /// value -> key mappings can linger until indices are rebuilt.
    pub async fn delete(&self, key: &str) -> Result<()> {
        if self.hot.get(key).is_some() {
            self.hot.delete(key);
        }

        self.cold.delete(key)?;

        if let Some(collection) = key.split(':').next() {
            if let Some(index) = self.primary_indices.get_mut(collection) {
                index.remove(key);
            }
        }

        if self
            .in_transaction
            .load(std::sync::atomic::Ordering::SeqCst)
        {
            self.transaction_ops.insert(key.to_string(), Vec::new());
        }

        Ok(())
    }
811
812 pub async fn delete_collection(&self, collection: &str) -> Result<()> {
813 let prefix = format!("{}:", collection);
814
815 let keys: Vec<String> = self
817 .cold
818 .scan()
819 .filter_map(|r| r.ok())
820 .filter(|(k, _)| k.starts_with(&prefix))
821 .map(|(k, _)| k)
822 .collect();
823
824 for key in keys {
826 self.delete(&key).await?;
827 }
828
829 self.primary_indices.remove(collection);
831 self.secondary_indices
832 .retain(|k, _| !k.starts_with(&prefix));
833
834 Ok(())
835 }
836
837 #[allow(dead_code)]
838 fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
839 if let Some(index) = self.primary_indices.get(collection) {
841 index.remove(&doc.id);
842 }
843
844 for (field, value) in &doc.data {
846 let index_key = format!("{}:{}", collection, field);
847 if let Some(index) = self.secondary_indices.get(&index_key) {
848 if let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
849 doc_ids.retain(|id| id != &doc.id);
850 }
851 }
852 }
853
854 Ok(())
855 }
856
857 pub async fn search_text(
858 &self,
859 collection: &str,
860 field: &str,
861 query: &str,
862 ) -> Result<Vec<Document>> {
863 let mut results = Vec::new();
864 let docs = self.get_all_collection(collection).await?;
865
866 for doc in docs {
867 if let Some(Value::String(text)) = doc.data.get(field) {
868 if text.to_lowercase().contains(&query.to_lowercase()) {
869 results.push(doc);
870 }
871 }
872 }
873
874 Ok(results)
875 }
876
    /// Exports every document of `collection` to a pretty-printed JSON file
    /// shaped as `{ "<collection>": [ {...}, ... ] }`. A ".json" extension
    /// is appended when missing.
    ///
    /// Note: the document id is not exported, nested `Value::Object` fields
    /// are dropped, and nested arrays/objects/uuids inside arrays become
    /// `null`.
    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
        let output_path = if !output_path.ends_with(".json") {
            format!("{}.json", output_path)
        } else {
            output_path.to_string()
        };

        let mut docs = Vec::new();

        // Full cold-store scan; keep only this collection's records,
        // excluding the "_collection:" schema record.
        for result in self.cold.scan() {
            let (key, value) = result?;

            if let Some(key_collection) = key.split(':').next() {
                if key_collection == collection && !key.starts_with("_collection:") {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        // Flatten Document.data into a plain JSON object.
                        let mut clean_doc = serde_json::Map::new();
                        for (k, v) in doc.data {
                            match v {
                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
                                Value::Float(f) => {
                                    // Non-finite floats have no JSON form; export null.
                                    if let Some(n) = serde_json::Number::from_f64(f) {
                                        clean_doc.insert(k, JsonValue::Number(n))
                                    } else {
                                        clean_doc.insert(k, JsonValue::Null)
                                    }
                                }
                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
                                Value::Array(arr) => {
                                    let clean_arr: Vec<JsonValue> = arr
                                        .into_iter()
                                        .map(|v| match v {
                                            Value::String(s) => JsonValue::String(s),
                                            Value::Int(i) => JsonValue::Number(i.into()),
                                            Value::Float(f) => serde_json::Number::from_f64(f)
                                                .map(JsonValue::Number)
                                                .unwrap_or(JsonValue::Null),
                                            Value::Bool(b) => JsonValue::Bool(b),
                                            Value::Null => JsonValue::Null,
                                            // Nested containers/uuids degrade to null.
                                            _ => JsonValue::Null,
                                        })
                                        .collect();
                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
                                }
                                Value::Uuid(u) => {
                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
                                }
                                Value::Null => clean_doc.insert(k, JsonValue::Null),
                                // Nested objects are skipped entirely.
                                Value::Object(_) => None,
                            };
                        }
                        docs.push(JsonValue::Object(clean_doc));
                    }
                }
            }
        }

        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
            collection.to_string(),
            JsonValue::Array(docs),
        )]));

        let mut file = StdFile::create(&output_path)?;
        serde_json::to_writer_pretty(&mut file, &output)?;
        println!("Exported collection '{}' to {}", collection, &output_path);
        Ok(())
    }
962
    /// Exports `collection` to CSV; a ".csv" extension is appended when
    /// missing. Columns come from the FIRST (non-empty) document scanned.
    ///
    /// NOTE(review): `Document.data` is a `HashMap`, so both column order
    /// and which document is "first" depend on iteration/scan order, and
    /// fields absent from that first document are silently dropped for all
    /// rows — confirm this is acceptable for the export format.
    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
        let output_path = if !filename.ends_with(".csv") {
            format!("{}.csv", filename)
        } else {
            filename.to_string()
        };

        let mut writer = csv::Writer::from_path(&output_path)?;
        let mut headers = Vec::new();
        let mut first_doc = true;

        for result in self.cold.scan() {
            let (key, value) = result?;

            if let Some(key_collection) = key.split(':').next() {
                if key_collection == collection && !key.starts_with("_collection:") {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        // Emit the header row once, from the first document's keys.
                        if first_doc && !doc.data.is_empty() {
                            headers = doc.data.keys().cloned().collect();
                            writer.write_record(&headers)?;
                            first_doc = false;
                        }

                        // Missing fields become empty cells.
                        let values: Vec<String> = headers
                            .iter()
                            .map(|header| {
                                doc.data
                                    .get(header)
                                    .map(|v| v.to_string())
                                    .unwrap_or_default()
                            })
                            .collect();
                        writer.write_record(&values)?;
                    }
                }
            }
        }

        writer.flush()?;
        println!("Exported collection '{}' to {}", collection, &output_path);
        Ok(())
    }
1010
    /// Convenience alias for [`Aurora::query`].
    pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }
1015
1016 pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
1019 self.query(collection)
1020 .filter(|f| f.eq("id", id))
1021 .first_one()
1022 .await
1023 }
1024
1025 pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
1026 where
1027 F: Fn(&FilterBuilder) -> bool + Send + 'static,
1028 {
1029 self.query(collection).filter(filter_fn).first_one().await
1030 }
1031
1032 pub async fn find_by_field<T: Into<Value> + Clone + Send + 'static>(
1033 &self,
1034 collection: &str,
1035 field: &'static str,
1036 value: T,
1037 ) -> Result<Vec<Document>> {
1038 let value_clone = value.clone();
1039 self.query(collection)
1040 .filter(move |f| f.eq(field, value_clone.clone()))
1041 .collect()
1042 .await
1043 }
1044
1045 pub async fn find_by_fields(
1046 &self,
1047 collection: &str,
1048 fields: Vec<(&str, Value)>,
1049 ) -> Result<Vec<Document>> {
1050 let mut query = self.query(collection);
1051
1052 for (field, value) in fields {
1053 let field_owned = field.to_owned();
1054 let value_owned = value.clone();
1055 query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
1056 }
1057
1058 query.collect().await
1059 }
1060
1061 pub async fn find_in_range<T: Into<Value> + Clone + Send + 'static>(
1063 &self,
1064 collection: &str,
1065 field: &'static str,
1066 min: T,
1067 max: T,
1068 ) -> Result<Vec<Document>> {
1069 self.query(collection)
1070 .filter(move |f| f.between(field, min.clone(), max.clone()))
1071 .collect()
1072 .await
1073 }
1074
    /// Entry point for building complex queries; identical to `query` but
    /// declared `async`, kept as-is for API compatibility.
    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }
1079
    /// Convenience alias for [`Aurora::search`].
    pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
        self.search(collection)
    }
1084
1085 pub async fn upsert(
1087 &self,
1088 collection: &str,
1089 id: &str,
1090 data: Vec<(&str, Value)>,
1091 ) -> Result<String> {
1092 let data_map: HashMap<String, Value> =
1094 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
1095
1096 if let Some(mut doc) = self.get_document(collection, id)? {
1098 for (key, value) in data_map {
1100 doc.data.insert(key, value);
1101 }
1102
1103 self.validate_unique_constraints_excluding(collection, &doc.data, id)
1106 .await?;
1107
1108 self.put(
1109 format!("{}:{}", collection, id),
1110 serde_json::to_vec(&doc)?,
1111 None,
1112 )?;
1113 Ok(id.to_string())
1114 } else {
1115 self.validate_unique_constraints(collection, &data_map)
1117 .await?;
1118
1119 let document = Document {
1120 id: id.to_string(),
1121 data: data_map,
1122 };
1123
1124 self.put(
1125 format!("{}:{}", collection, id),
1126 serde_json::to_vec(&document)?,
1127 None,
1128 )?;
1129 Ok(id.to_string())
1130 }
1131 }
1132
1133 pub async fn increment(
1135 &self,
1136 collection: &str,
1137 id: &str,
1138 field: &str,
1139 amount: i64,
1140 ) -> Result<i64> {
1141 if let Some(mut doc) = self.get_document(collection, id)? {
1142 let current = match doc.data.get(field) {
1144 Some(Value::Int(i)) => *i,
1145 _ => 0,
1146 };
1147
1148 let new_value = current + amount;
1150 doc.data.insert(field.to_string(), Value::Int(new_value));
1151
1152 self.put(
1154 format!("{}:{}", collection, id),
1155 serde_json::to_vec(&doc)?,
1156 None,
1157 )?;
1158
1159 Ok(new_value)
1160 } else {
1161 Err(AuroraError::NotFound(format!(
1162 "Document {}:{} not found",
1163 collection, id
1164 )))
1165 }
1166 }
1167
1168 pub async fn batch_insert(
1170 &self,
1171 collection: &str,
1172 docs: Vec<Vec<(&str, Value)>>,
1173 ) -> Result<Vec<String>> {
1174 let doc_maps: Vec<HashMap<String, Value>> = docs
1176 .into_iter()
1177 .map(|doc| doc.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
1178 .collect();
1179
1180 self.begin_transaction()?;
1182
1183 let mut ids = Vec::with_capacity(doc_maps.len());
1184
1185 for data_map in doc_maps {
1187 let data_vec: Vec<(&str, Value)> = data_map
1189 .iter()
1190 .map(|(k, v)| (k.as_str(), v.clone()))
1191 .collect();
1192
1193 match self.insert_into(collection, data_vec).await {
1194 Ok(id) => ids.push(id),
1195 Err(e) => {
1196 self.rollback_transaction()?;
1197 return Err(e);
1198 }
1199 }
1200 }
1201
1202 self.commit_transaction()?;
1204
1205 Ok(ids)
1206 }
1207
1208 pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
1210 where
1211 F: Fn(&FilterBuilder) -> bool + Send + 'static,
1212 {
1213 let docs = self.query(collection).filter(filter_fn).collect().await?;
1214
1215 let mut deleted_count = 0;
1216
1217 for doc in docs {
1218 let key = format!("{}:{}", collection, doc.id);
1219 self.delete(&key).await?;
1220 deleted_count += 1;
1221 }
1222
1223 Ok(deleted_count)
1224 }
1225
1226 pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
1247 let collection_def = self.get_collection_definition(collection)?;
1249
1250 let json_string = read_to_string(filename)
1252 .await
1253 .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;
1254
1255 let documents: Vec<JsonValue> = from_str(&json_string)
1257 .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;
1258
1259 let mut stats = ImportStats::default();
1260
1261 for doc_json in documents {
1263 match self
1264 .import_document(collection, &collection_def, doc_json)
1265 .await
1266 {
1267 Ok(ImportResult::Imported) => stats.imported += 1,
1268 Ok(ImportResult::Skipped) => stats.skipped += 1,
1269 Err(_) => stats.failed += 1,
1270 }
1271 }
1272
1273 Ok(stats)
1274 }
1275
1276 async fn import_document(
1278 &self,
1279 collection: &str,
1280 collection_def: &Collection,
1281 doc_json: JsonValue,
1282 ) -> Result<ImportResult> {
1283 if !doc_json.is_object() {
1284 return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
1285 }
1286
1287 let doc_id = doc_json
1289 .get("id")
1290 .and_then(|id| id.as_str())
1291 .map(|s| s.to_string())
1292 .unwrap_or_else(|| Uuid::new_v4().to_string());
1293
1294 if let Some(_) = self.get_document(collection, &doc_id)? {
1296 return Ok(ImportResult::Skipped);
1297 }
1298
1299 let mut data_map = HashMap::new();
1301
1302 if let Some(obj) = doc_json.as_object() {
1303 for (field_name, field_def) in &collection_def.fields {
1304 if let Some(json_value) = obj.get(field_name) {
1305 if !self.validate_field_value(json_value, &field_def.field_type) {
1307 return Err(AuroraError::InvalidOperation(format!(
1308 "Field '{}' has invalid type",
1309 field_name
1310 )));
1311 }
1312
1313 let value = self.json_to_value(json_value)?;
1315 data_map.insert(field_name.clone(), value);
1316 } else if field_def.unique {
1317 return Err(AuroraError::InvalidOperation(format!(
1319 "Missing required unique field '{}'",
1320 field_name
1321 )));
1322 }
1323 }
1324 }
1325
1326 let unique_fields = self.get_unique_fields(&collection_def);
1328 for unique_field in &unique_fields {
1329 if let Some(value) = data_map.get(unique_field) {
1330 let query_results = self
1332 .query(collection)
1333 .filter(move |f| f.eq(unique_field, value.clone()))
1334 .limit(1)
1335 .collect()
1336 .await?;
1337
1338 if !query_results.is_empty() {
1339 return Ok(ImportResult::Skipped);
1341 }
1342 }
1343 }
1344
1345 let document = Document {
1347 id: doc_id,
1348 data: data_map,
1349 };
1350
1351 self.put(
1352 format!("{}:{}", collection, document.id),
1353 serde_json::to_vec(&document)?,
1354 None,
1355 )?;
1356
1357 Ok(ImportResult::Imported)
1358 }
1359
1360 fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
1362 match field_type {
1363 FieldType::String => value.is_string(),
1364 FieldType::Int => value.is_i64() || value.is_u64(),
1365 FieldType::Float => value.is_number(),
1366 FieldType::Boolean => value.is_boolean(),
1367 FieldType::Array => value.is_array(),
1368 FieldType::Object => value.is_object(),
1369 FieldType::Uuid => {
1370 value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
1371 }
1372 }
1373 }
1374
1375 fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
1377 match json_value {
1378 JsonValue::Null => Ok(Value::Null),
1379 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
1380 JsonValue::Number(n) => {
1381 if let Some(i) = n.as_i64() {
1382 Ok(Value::Int(i))
1383 } else if let Some(f) = n.as_f64() {
1384 Ok(Value::Float(f))
1385 } else {
1386 Err(AuroraError::InvalidOperation("Invalid number value".into()))
1387 }
1388 }
1389 JsonValue::String(s) => {
1390 if let Ok(uuid) = Uuid::parse_str(s) {
1392 Ok(Value::Uuid(uuid))
1393 } else {
1394 Ok(Value::String(s.clone()))
1395 }
1396 }
1397 JsonValue::Array(arr) => {
1398 let mut values = Vec::new();
1399 for item in arr {
1400 values.push(self.json_to_value(item)?);
1401 }
1402 Ok(Value::Array(values))
1403 }
1404 JsonValue::Object(obj) => {
1405 let mut map = HashMap::new();
1406 for (k, v) in obj {
1407 map.insert(k.clone(), self.json_to_value(v)?);
1408 }
1409 Ok(Value::Object(map))
1410 }
1411 }
1412 }
1413
1414 fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
1416 if let Some(data) = self.get(&format!("_collection:{}", collection))? {
1417 let collection_def: Collection = serde_json::from_slice(&data)?;
1418 Ok(collection_def)
1419 } else {
1420 Err(AuroraError::CollectionNotFound(collection.to_string()))
1421 }
1422 }
1423
1424 pub fn get_database_stats(&self) -> Result<DatabaseStats> {
1426 let hot_stats = self.hot.get_stats();
1427 let cold_stats = self.cold.get_stats()?;
1428
1429 Ok(DatabaseStats {
1430 hot_stats,
1431 cold_stats,
1432 estimated_size: self.cold.estimated_size(),
1433 collections: self.get_collection_stats()?,
1434 })
1435 }
1436
    /// Returns a shared reference to a hot-cache entry, if cached.
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }
1441
    /// True when `key` currently resides in the hot cache.
    pub fn is_in_hot_cache(&self, key: &str) -> bool {
        self.hot.is_hot(key)
    }
1446
    /// Starts the periodic hot-cache cleanup with the given interval.
    ///
    /// NOTE(review): `self.hot.clone()` is wrapped in a fresh `Arc`; whether
    /// the clone shares the underlying cache state depends on `HotStore`'s
    /// `Clone` impl — confirm against the `HotStore` implementation.
    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
        let hot_store = Arc::new(self.hot.clone());
        hot_store.start_cleanup_with_interval(interval_secs).await;
    }
1452
1453 pub fn clear_hot_cache(&self) {
1455 self.hot.clear();
1456 println!(
1457 "Hot cache cleared, current hit ratio: {:.2}%",
1458 self.hot.hit_ratio() * 100.0
1459 );
1460 }
1461
    /// Writes many key/value pairs directly to cold storage, bypassing the
    /// hot cache and the in-memory indices.
    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
        self.cold.batch_set(pairs)
    }
1466
    /// Streams `(key, value)` pairs from cold storage whose keys start with
    /// `prefix`.
    pub fn scan_with_prefix(
        &self,
        prefix: &str,
    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
        self.cold.scan_prefix(prefix)
    }
1474
1475 pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
1477 let mut stats = HashMap::new();
1478
1479 let collections: Vec<String> = self
1481 .cold
1482 .scan()
1483 .filter_map(|r| r.ok())
1484 .map(|(k, _)| k)
1485 .filter(|k| k.starts_with("_collection:"))
1486 .map(|k| k.trim_start_matches("_collection:").to_string())
1487 .collect();
1488
1489 for collection in collections {
1490 let prefix = format!("{}:", collection);
1491
1492 let count = self.cold.scan_prefix(&prefix).count();
1494
1495 let size: usize = self
1497 .cold
1498 .scan_prefix(&prefix)
1499 .filter_map(|r| r.ok())
1500 .map(|(_, v)| v.len())
1501 .sum();
1502
1503 stats.insert(
1504 collection,
1505 CollectionStats {
1506 count,
1507 size_bytes: size,
1508 avg_doc_size: if count > 0 { size / count } else { 0 },
1509 },
1510 );
1511 }
1512
1513 Ok(stats)
1514 }
1515
    /// Looks up documents by exact field value through a persisted index
    /// definition.
    ///
    /// NOTE(review): this constructs a brand-new `Index` from the stored
    /// definition and immediately searches it without backfilling any
    /// documents (compare `create_index`, which inserts every stored
    /// document after `Index::new`). Unless `Index::new` loads its own data,
    /// this search will always come up empty — verify against the `Index`
    /// implementation.
    pub fn search_by_value(
        &self,
        collection: &str,
        field: &str,
        value: &Value,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
            let index = Index::new(index_def);

            if let Some(doc_ids) = index.search(value) {
                // Resolve each matching id back to its full document.
                let mut docs = Vec::new();
                for id in doc_ids {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        Ok(Vec::new())
    }
1548
    /// Full-text search over a field that has a persisted FullText index
    /// definition; ranking scores are discarded before returning.
    ///
    /// NOTE(review): like `search_by_value`, this builds a fresh `Index`
    /// from the stored definition without backfilling documents; unless
    /// `Index::new` self-loads, `search_text` will find nothing — verify
    /// against the `Index` implementation.
    pub fn full_text_search(
        &self,
        collection: &str,
        field: &str,
        query: &str,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;

            // Only FullText indices are eligible here.
            if !matches!(index_def.index_type, IndexType::FullText) {
                return Err(AuroraError::InvalidOperation(format!(
                    "Field '{}' is not indexed as full-text",
                    field
                )));
            }

            let index = Index::new(index_def);

            if let Some(doc_id_scores) = index.search_text(query) {
                // Resolve ids to documents, dropping the scores.
                let mut docs = Vec::new();
                for (id, _score) in doc_id_scores {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        Ok(Vec::new())
    }
1591
1592 pub fn create_text_index(
1594 &self,
1595 collection: &str,
1596 field: &str,
1597 _enable_stop_words: bool,
1598 ) -> Result<()> {
1599 if self.get(&format!("_collection:{}", collection))?.is_none() {
1601 return Err(AuroraError::CollectionNotFound(collection.to_string()));
1602 }
1603
1604 let index_def = IndexDefinition {
1606 name: format!("{}_{}_fulltext", collection, field),
1607 collection: collection.to_string(),
1608 fields: vec![field.to_string()],
1609 index_type: IndexType::FullText,
1610 unique: false,
1611 };
1612
1613 let index_key = format!("_index:{}:{}", collection, field);
1615 self.put(index_key, serde_json::to_vec(&index_def)?, None)?;
1616
1617 let index = Index::new(index_def);
1619
1620 let prefix = format!("{}:", collection);
1622 for result in self.cold.scan_prefix(&prefix) {
1623 if let Ok((_, data)) = result {
1624 let doc: Document = serde_json::from_slice(&data)?;
1625 index.insert(&doc)?;
1626 }
1627 }
1628
1629 Ok(())
1630 }
1631
1632 pub async fn execute_simple_query(
1633 &self,
1634 builder: &SimpleQueryBuilder,
1635 ) -> Result<Vec<Document>> {
1636 self.ensure_indices_initialized().await?;
1638
1639 let mut doc_ids_to_load: Option<Vec<String>> = None;
1641 let mut used_filter_index: Option<usize> = None;
1642
1643 for (filter_idx, filter) in builder.filters.iter().enumerate() {
1646 match filter {
1647 Filter::Eq(field, value) => {
1648 let index_key = format!("{}:{}", &builder.collection, field);
1649
1650 if let Some(index) = self.secondary_indices.get(&index_key) {
1652 if let Some(matching_ids) = index.get(&value.to_string()) {
1654 doc_ids_to_load = Some(matching_ids.clone());
1655 used_filter_index = Some(filter_idx);
1656 break; }
1658 }
1659 }
1660 Filter::Gt(field, value)
1661 | Filter::Gte(field, value)
1662 | Filter::Lt(field, value)
1663 | Filter::Lte(field, value) => {
1664 let index_key = format!("{}:{}", &builder.collection, field);
1665
1666 if let Some(index) = self.secondary_indices.get(&index_key) {
1668 let mut matching_ids = Vec::new();
1670
1671 for entry in index.iter() {
1672 let index_value_str = entry.key();
1673
1674 if let Ok(index_value) =
1676 self.parse_value_from_string(index_value_str, value)
1677 {
1678 let matches = match filter {
1679 Filter::Gt(_, filter_val) => index_value > *filter_val,
1680 Filter::Gte(_, filter_val) => index_value >= *filter_val,
1681 Filter::Lt(_, filter_val) => index_value < *filter_val,
1682 Filter::Lte(_, filter_val) => index_value <= *filter_val,
1683 _ => false,
1684 };
1685
1686 if matches {
1687 matching_ids.extend(entry.value().clone());
1688 }
1689 }
1690 }
1691
1692 if !matching_ids.is_empty() {
1693 doc_ids_to_load = Some(matching_ids);
1694 used_filter_index = Some(filter_idx);
1695 break;
1696 }
1697 }
1698 }
1699 Filter::Contains(field, search_term) => {
1700 let index_key = format!("{}:{}", &builder.collection, field);
1701
1702 if let Some(index) = self.secondary_indices.get(&index_key) {
1704 let mut matching_ids = Vec::new();
1707
1708 for entry in index.iter() {
1709 let index_value_str = entry.key();
1710
1711 if index_value_str
1713 .to_lowercase()
1714 .contains(&search_term.to_lowercase())
1715 {
1716 matching_ids.extend(entry.value().clone());
1717 }
1718 }
1719
1720 if !matching_ids.is_empty() {
1721 matching_ids.sort();
1723 matching_ids.dedup();
1724
1725 doc_ids_to_load = Some(matching_ids);
1726 used_filter_index = Some(filter_idx);
1727 break;
1728 }
1729 }
1730 }
1731 }
1732 }
1733
1734 let mut final_docs: Vec<Document>;
1735
1736 if let Some(ids) = doc_ids_to_load {
1737 println!("📊 Loading {} documents via index", ids.len());
1740 final_docs = Vec::with_capacity(ids.len());
1741
1742 for id in ids {
1743 let doc_key = format!("{}:{}", &builder.collection, id);
1744 if let Some(data) = self.get(&doc_key)? {
1745 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
1746 final_docs.push(doc);
1747 }
1748 }
1749 }
1750 } else {
1751 println!(
1754 "⚠️ INDEX MISS. Falling back to full collection scan for '{}'",
1755 &builder.collection
1756 );
1757 final_docs = self.get_all_collection(&builder.collection).await?;
1758 }
1759
1760 final_docs.retain(|doc| {
1764 builder.filters.iter().enumerate().all(|(idx, filter)| {
1765 if Some(idx) == used_filter_index {
1767 return true;
1768 }
1769
1770 match filter {
1771 Filter::Eq(field, value) => doc.data.get(field).map_or(false, |v| v == value),
1772 Filter::Gt(field, value) => doc.data.get(field).map_or(false, |v| v > value),
1773 Filter::Gte(field, value) => doc.data.get(field).map_or(false, |v| v >= value),
1774 Filter::Lt(field, value) => doc.data.get(field).map_or(false, |v| v < value),
1775 Filter::Lte(field, value) => doc.data.get(field).map_or(false, |v| v <= value),
1776 Filter::Contains(field, value_str) => {
1777 doc.data.get(field).map_or(false, |v| match v {
1778 Value::String(s) => s.contains(value_str),
1779 Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
1780 _ => false,
1781 })
1782 }
1783 }
1784 })
1785 });
1786
1787 println!(
1788 "✅ Query completed. Returning {} documents",
1789 final_docs.len()
1790 );
1791
1792 Ok(final_docs)
1795 }
1796
1797 fn parse_value_from_string(&self, value_str: &str, reference_value: &Value) -> Result<Value> {
1799 match reference_value {
1800 Value::Int(_) => {
1801 if let Ok(i) = value_str.parse::<i64>() {
1802 Ok(Value::Int(i))
1803 } else {
1804 Err(AuroraError::InvalidOperation("Failed to parse int".into()))
1805 }
1806 }
1807 Value::Float(_) => {
1808 if let Ok(f) = value_str.parse::<f64>() {
1809 Ok(Value::Float(f))
1810 } else {
1811 Err(AuroraError::InvalidOperation(
1812 "Failed to parse float".into(),
1813 ))
1814 }
1815 }
1816 Value::String(_) => Ok(Value::String(value_str.to_string())),
1817 _ => Ok(Value::String(value_str.to_string())),
1818 }
1819 }
1820
1821 pub async fn execute_dynamic_query(
1822 &self,
1823 collection: &str,
1824 payload: &QueryPayload,
1825 ) -> Result<Vec<Document>> {
1826 let mut docs = self.get_all_collection(collection).await?;
1827
1828 if let Some(filters) = &payload.filters {
1830 docs.retain(|doc| {
1831 filters.iter().all(|filter| {
1832 doc.data
1833 .get(&filter.field)
1834 .map_or(false, |doc_val| check_filter(doc_val, filter))
1835 })
1836 });
1837 }
1838
1839 if let Some(sort_options) = &payload.sort {
1841 docs.sort_by(|a, b| {
1842 let a_val = a.data.get(&sort_options.field);
1843 let b_val = b.data.get(&sort_options.field);
1844 let ordering = a_val
1845 .partial_cmp(&b_val)
1846 .unwrap_or(std::cmp::Ordering::Equal);
1847 if sort_options.ascending {
1848 ordering
1849 } else {
1850 ordering.reverse()
1851 }
1852 });
1853 }
1854
1855 let mut docs = docs;
1857 if let Some(offset) = payload.offset {
1858 docs = docs.into_iter().skip(offset).collect();
1859 }
1860 if let Some(limit) = payload.limit {
1861 docs = docs.into_iter().take(limit).collect();
1862 }
1863
1864 if let Some(select_fields) = &payload.select {
1866 if !select_fields.is_empty() {
1867 docs = docs
1868 .into_iter()
1869 .map(|mut doc| {
1870 doc.data.retain(|key, _| select_fields.contains(key));
1871 doc
1872 })
1873 .collect();
1874 }
1875 }
1876
1877 Ok(docs)
1878 }
1879
1880 pub async fn process_network_request(
1881 &self,
1882 request: crate::network::protocol::Request,
1883 ) -> crate::network::protocol::Response {
1884 use crate::network::protocol::Response;
1885
1886 match request {
1887 crate::network::protocol::Request::Get(key) => match self.get(&key) {
1888 Ok(value) => Response::Success(value),
1889 Err(e) => Response::Error(e.to_string()),
1890 },
1891 crate::network::protocol::Request::Put(key, value) => {
1892 match self.put(key, value, None) {
1893 Ok(_) => Response::Done,
1894 Err(e) => Response::Error(e.to_string()),
1895 }
1896 }
1897 crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
1898 Ok(_) => Response::Done,
1899 Err(e) => Response::Error(e.to_string()),
1900 },
1901 crate::network::protocol::Request::NewCollection { name, fields } => {
1902 let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
1903 .iter()
1904 .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
1905 .collect();
1906
1907 match self.new_collection(&name, fields_for_db) {
1908 Ok(_) => Response::Done,
1909 Err(e) => Response::Error(e.to_string()),
1910 }
1911 }
1912 crate::network::protocol::Request::Insert { collection, data } => {
1913 match self.insert_map(&collection, data).await {
1914 Ok(id) => Response::Message(id),
1915 Err(e) => Response::Error(e.to_string()),
1916 }
1917 }
1918 crate::network::protocol::Request::GetDocument { collection, id } => {
1919 match self.get_document(&collection, &id) {
1920 Ok(doc) => Response::Document(doc),
1921 Err(e) => Response::Error(e.to_string()),
1922 }
1923 }
1924 crate::network::protocol::Request::Query(builder) => {
1925 match self.execute_simple_query(&builder).await {
1926 Ok(docs) => Response::Documents(docs),
1927 Err(e) => Response::Error(e.to_string()),
1928 }
1929 }
1930 crate::network::protocol::Request::BeginTransaction => match self.begin_transaction() {
1931 Ok(_) => Response::Done,
1932 Err(e) => Response::Error(e.to_string()),
1933 },
1934 crate::network::protocol::Request::CommitTransaction => match self.commit_transaction()
1935 {
1936 Ok(_) => Response::Done,
1937 Err(e) => Response::Error(e.to_string()),
1938 },
1939 crate::network::protocol::Request::RollbackTransaction => {
1940 match self.rollback_transaction() {
1941 Ok(_) => Response::Done,
1942 Err(e) => Response::Error(e.to_string()),
1943 }
1944 }
1945 }
1946 }
1947
1948 pub async fn create_indices(&self, collection: &str, fields: &[&str]) -> Result<()> {
1963 for field in fields {
1964 if let Err(e) = self.create_index(collection, field).await {
1965 eprintln!(
1966 "Warning: Failed to create index for {}.{}: {}",
1967 collection, field, e
1968 );
1969 } else {
1970 println!("✅ Created index for {}.{}", collection, field);
1971 }
1972 }
1973 Ok(())
1974 }
1975
1976 pub fn get_index_stats(&self, collection: &str) -> HashMap<String, IndexStats> {
1980 let mut stats = HashMap::new();
1981
1982 for entry in self.secondary_indices.iter() {
1983 let key = entry.key();
1984 if key.starts_with(&format!("{}:", collection)) {
1985 let field = key.split(':').nth(1).unwrap_or("unknown");
1986 let index = entry.value();
1987
1988 let unique_values = index.len();
1989 let total_documents: usize = index.iter().map(|entry| entry.value().len()).sum();
1990
1991 stats.insert(
1992 field.to_string(),
1993 IndexStats {
1994 unique_values,
1995 total_documents,
1996 avg_docs_per_value: if unique_values > 0 {
1997 total_documents / unique_values
1998 } else {
1999 0
2000 },
2001 },
2002 );
2003 }
2004 }
2005
2006 stats
2007 }
2008
2009 pub async fn optimize_collection(&self, collection: &str) -> Result<()> {
2013 if let Ok(collection_def) = self.get_collection_definition(collection) {
2017 let field_names: Vec<&str> = collection_def.fields.keys().map(|s| s.as_str()).collect();
2018 self.create_indices(collection, &field_names).await?;
2019 println!(
2020 "🚀 Optimized collection '{}' with {} indices",
2021 collection,
2022 field_names.len()
2023 );
2024 }
2025
2026 Ok(())
2027 }
2028
2029 fn get_unique_fields(&self, collection: &Collection) -> Vec<String> {
2031 collection
2032 .fields
2033 .iter()
2034 .filter(|(_, def)| def.unique)
2035 .map(|(name, _)| name.clone())
2036 .collect()
2037 }
2038
2039 async fn validate_unique_constraints(
2041 &self,
2042 collection: &str,
2043 data: &HashMap<String, Value>,
2044 ) -> Result<()> {
2045 self.ensure_indices_initialized().await?;
2046 let collection_def = self.get_collection_definition(collection)?;
2047 let unique_fields = self.get_unique_fields(&collection_def);
2048
2049 for unique_field in &unique_fields {
2050 if let Some(value) = data.get(unique_field) {
2051 let index_key = format!("{}:{}", collection, unique_field);
2052 if let Some(index) = self.secondary_indices.get(&index_key) {
2053 let value_str = match value {
2055 Value::String(s) => s.clone(),
2056 _ => value.to_string(),
2057 };
2058 if index.contains_key(&value_str) {
2059 return Err(AuroraError::UniqueConstraintViolation(
2060 unique_field.clone(),
2061 value_str,
2062 ));
2063 }
2064 }
2065 }
2066 }
2067 Ok(())
2068 }
2069
    /// Like `validate_unique_constraints`, but tolerates matches belonging to
    /// the document identified by `exclude_id` — used on update/upsert so a
    /// document does not collide with its own existing values.
    ///
    /// # Errors
    /// Returns [`AuroraError::UniqueConstraintViolation`] if any *other*
    /// document already holds one of the unique values in `data`.
    async fn validate_unique_constraints_excluding(
        &self,
        collection: &str,
        data: &HashMap<String, Value>,
        exclude_id: &str,
    ) -> Result<()> {
        self.ensure_indices_initialized().await?;
        let collection_def = self.get_collection_definition(collection)?;
        let unique_fields = self.get_unique_fields(&collection_def);

        for unique_field in &unique_fields {
            if let Some(value) = data.get(unique_field) {
                let index_key = format!("{}:{}", collection, unique_field);
                if let Some(index) = self.secondary_indices.get(&index_key) {
                    // Normalize the candidate the same way index keys are
                    // stored: raw text for strings, Display for the rest.
                    let value_str = match value {
                        Value::String(s) => s.clone(),
                        _ => value.to_string(),
                    };
                    if let Some(doc_ids) = index.get(&value_str) {
                        // NOTE(review): entries are compared against the full
                        // "collection:id" key here, while `execute_simple_query`
                        // treats index entries as bare ids — confirm which
                        // format the index actually stores.
                        let exclude_key = format!("{}:{}", collection, exclude_id);
                        for doc_key in doc_ids.value() {
                            // Any holder of this value other than the excluded
                            // document is a violation.
                            if doc_key != &exclude_key {
                                return Err(AuroraError::UniqueConstraintViolation(
                                    unique_field.clone(),
                                    value_str,
                                ));
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
2107}
2108
2109fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
2110 let filter_val = match json_to_value(&filter.value) {
2111 Ok(v) => v,
2112 Err(_) => return false,
2113 };
2114
2115 match filter.operator {
2116 FilterOperator::Eq => doc_val == &filter_val,
2117 FilterOperator::Ne => doc_val != &filter_val,
2118 FilterOperator::Gt => doc_val > &filter_val,
2119 FilterOperator::Gte => doc_val >= &filter_val,
2120 FilterOperator::Lt => doc_val < &filter_val,
2121 FilterOperator::Lte => doc_val <= &filter_val,
2122 FilterOperator::Contains => match (doc_val, &filter_val) {
2123 (Value::String(s), Value::String(fv)) => s.contains(fv),
2124 (Value::Array(arr), _) => arr.contains(&filter_val),
2125 _ => false,
2126 },
2127 }
2128}
2129
/// Per-document outcome of a bulk import (tallied into `ImportStats`).
enum ImportResult {
    /// The document was written to the database.
    Imported,
    /// The document was not imported (reason decided by the import call
    /// site, which is not visible in this chunk).
    Skipped,
}
2135
/// Counters accumulated while bulk-importing documents.
#[derive(Debug, Default)]
pub struct ImportStats {
    // Documents successfully written.
    pub imported: usize,
    // Documents intentionally left out (see `ImportResult::Skipped`).
    pub skipped: usize,
    // Documents that errored during import.
    pub failed: usize,
}
2146
/// Aggregate statistics for a single collection.
#[derive(Debug)]
pub struct CollectionStats {
    // Number of documents in the collection.
    pub count: usize,
    // Total stored size of the collection's documents, in bytes.
    pub size_bytes: usize,
    // Average document size (presumably size_bytes / count; the producer
    // is not visible in this chunk).
    pub avg_doc_size: usize,
}
2157
/// Statistics for one secondary index, as produced by `get_index_stats`.
#[derive(Debug)]
pub struct IndexStats {
    // Number of distinct indexed values (index entries).
    pub unique_values: usize,
    // Total document references summed across all indexed values.
    pub total_documents: usize,
    // Integer average total_documents / unique_values; 0 for an empty index.
    pub avg_docs_per_value: usize,
}
2168
/// Snapshot of database-wide statistics.
#[derive(Debug)]
pub struct DatabaseStats {
    // Cache statistics from the in-memory hot store.
    pub hot_stats: crate::storage::hot::CacheStats,
    // Statistics from the on-disk cold store.
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    // Estimated total database size (presumably bytes; the producer is not
    // visible in this chunk — confirm).
    pub estimated_size: u64,
    // Per-collection statistics, keyed by collection name.
    pub collections: HashMap<String, CollectionStats>,
}
2181
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// End-to-end smoke test: open a database in a temp dir, define a
    /// collection, insert one document, and read it back.
    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // Third tuple element marks a field unique — only `email` here.
        db.new_collection(
            "users",
            vec![
                ("name".to_string(), FieldType::String, false),
                ("age".to_string(), FieldType::Int, false),
                ("email".to_string(), FieldType::String, true),
            ],
        )?;

        let doc_id = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("age", Value::Int(30)),
                    ("email", Value::String("john@example.com".to_string())),
                ],
            )
            .await?;

        // Round-trip check: stored values must equal the inserted values.
        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    /// A document inserted inside a transaction must be visible after commit.
    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.begin_transaction()?;

        // NOTE(review): inserts into the undeclared collection "test" —
        // apparently collections need not be pre-declared for insert_into;
        // confirm against insert_into's implementation.
        let doc_id = db
            .insert_into("test", vec![("field", Value::String("value".to_string()))])
            .await?;

        db.commit_transaction()?;

        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    /// Query builder: range filter plus ascending sort over two documents.
    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection(
            "books",
            vec![
                ("title".to_string(), FieldType::String, false),
                ("author".to_string(), FieldType::String, false),
                ("year".to_string(), FieldType::Int, false),
            ],
        )?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )
        .await?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )
        .await?;

        // Both books have year > 2019; order_by(..., true) sorts ascending.
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    /// Storing a small file as a blob and inspecting it via pattern lookup.
    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // "Hello, World!" is 13 bytes on disk.
        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        db.put_blob("test:blob".to_string(), &file_path).await?;

        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            // 13 payload bytes + 5 extra — presumably framing overhead added
            // by put_blob; confirm against its encoding.
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    /// Oversized blobs must be rejected with `InvalidOperation`.
    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // 201 MiB of zeros — just above what appears to be a 200 MiB cap
        // (the limit itself lives in put_blob, not visible here).
        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }

    /// Unique-field enforcement across insert_into and upsert: duplicate
    /// values are rejected, but a document may keep (re-submit) its own value.
    #[tokio::test]
    async fn test_unique_constraints() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        // `email` is the unique field in this schema.
        db.new_collection(
            "users",
            vec![
                ("name".to_string(), FieldType::String, false),
                ("email".to_string(), FieldType::String, true), ("age".to_string(), FieldType::Int, false),
            ],
        )?;

        let _doc_id1 = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("John Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())),
                    ("age", Value::Int(30)),
                ],
            )
            .await?;

        // Second insert reuses john@example.com — must be rejected.
        let result = db
            .insert_into(
                "users",
                vec![
                    ("name", Value::String("Jane Doe".to_string())),
                    ("email", Value::String("john@example.com".to_string())), ("age", Value::Int(25)),
                ],
            )
            .await;

        assert!(result.is_err());
        // The violation must name the offending field and value.
        if let Err(AuroraError::UniqueConstraintViolation(field, value)) = result {
            assert_eq!(field, "email");
            assert_eq!(value, "john@example.com");
        } else {
            panic!("Expected UniqueConstraintViolation error");
        }

        // Seed a document via upsert with a fresh unique value.
        let _doc_id2 = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Smith".to_string())),
                    ("email", Value::String("alice@example.com".to_string())),
                    ("age", Value::Int(28)),
                ],
            )
            .await?;

        // A DIFFERENT document (user3) taking alice@example.com must fail.
        let result = db
            .upsert(
                "users",
                "user3",
                vec![
                    ("name", Value::String("Bob Wilson".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), ("age", Value::Int(35)),
                ],
            )
            .await;

        assert!(result.is_err());

        // The SAME document (user2) re-submitting its own email must succeed
        // (exclusion path in validate_unique_constraints_excluding).
        let result = db
            .upsert(
                "users",
                "user2",
                vec![
                    ("name", Value::String("Alice Updated".to_string())),
                    ("email", Value::String("alice@example.com".to_string())), ("age", Value::Int(29)),
                ],
            )
            .await;

        assert!(result.is_ok());

        Ok(())
    }
}