1use crate::error::{AuroraError, Result};
47use crate::index::{Index, IndexDefinition, IndexType};
48use crate::network::http_models::{
49 document_to_json, json_to_value, Filter as HttpFilter, FilterOperator, QueryPayload,
50};
51use crate::query::{Filter, FilterBuilder, QueryBuilder, SearchBuilder, SimpleQueryBuilder};
52use crate::storage::{ColdStore, HotStore};
53use crate::types::{
54 AuroraConfig, Collection, Document, FieldDefinition, FieldType, InsertData, Value,
55};
56use dashmap::DashMap;
57use serde_json::from_str;
58use serde_json::Value as JsonValue;
59use std::collections::HashMap;
60use std::fs::File as StdFile;
61use std::path::{Path, PathBuf};
62use std::sync::Arc;
63use std::time::Duration;
64use tokio::fs::read_to_string;
65use tokio::fs::File;
66use tokio::io::AsyncReadExt;
67use tokio::sync::OnceCell;
68use uuid::Uuid;
/// Per-collection primary index: full document key ("collection:id") -> raw value bytes.
type PrimaryIndex = DashMap<String, Vec<u8>>;
/// Per-field secondary index: stringified field value -> keys of documents containing it.
type SecondaryIndex = DashMap<String, Vec<String>>;
72
/// Summary of a stored entry as reported by key-pattern scans.
#[derive(Debug)]
pub enum DataInfo {
    /// Plain data with a short textual preview.
    Data { size: usize, preview: String },
    /// Binary blob (stored with a "BLOB:" prefix).
    Blob { size: usize },
    /// Compressed payload.
    Compressed { size: usize },
}

impl DataInfo {
    /// Byte size of the stored entry, regardless of representation.
    pub fn size(&self) -> usize {
        // All variants carry a `size` field, so a single or-pattern suffices.
        match self {
            DataInfo::Data { size, .. }
            | DataInfo::Blob { size }
            | DataInfo::Compressed { size } => *size,
        }
    }
}
90
pub struct Aurora {
    // In-memory cache for recently accessed values (TTL-capable).
    hot: HotStore,
    // Persistent on-disk store; the source of truth.
    cold: ColdStore,
    // collection name -> (full "collection:id" key -> raw value bytes).
    primary_indices: Arc<DashMap<String, PrimaryIndex>>,
    // "collection:field" -> (stringified field value -> document keys).
    secondary_indices: Arc<DashMap<String, SecondaryIndex>>,
    // Guards the one-time lazy index build (see `ensure_indices_initialized`).
    indices_initialized: Arc<OnceCell<()>>,
    // True while a transaction is buffering writes.
    in_transaction: std::sync::atomic::AtomicBool,
    // Writes buffered during the current transaction (key -> value bytes).
    transaction_ops: DashMap<String, Vec<u8>>,
    // Named query indices built by `create_index`.
    indices: Arc<DashMap<String, Index>>,
}
125
126impl Aurora {
127 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
148 let path = Self::resolve_path(path)?;
149
150 if let Some(parent) = path.parent() {
152 if !parent.exists() {
153 std::fs::create_dir_all(parent)?;
154 }
155 }
156
157 let cold = ColdStore::new(path.to_str().unwrap())?;
159 let hot = HotStore::new();
160
161 let db = Self {
163 hot,
164 cold,
165 primary_indices: Arc::new(DashMap::new()),
166 secondary_indices: Arc::new(DashMap::new()),
167 indices_initialized: Arc::new(OnceCell::new()),
168 in_transaction: std::sync::atomic::AtomicBool::new(false),
169 transaction_ops: DashMap::new(),
170 indices: Arc::new(DashMap::new()),
171 };
172
173 Ok(db)
177 }
178
179 fn resolve_path<P: AsRef<Path>>(path: P) -> Result<PathBuf> {
181 let path = path.as_ref();
182
183 if path.is_absolute() {
185 return Ok(path.to_path_buf());
186 }
187
188 match std::env::current_dir() {
190 Ok(current_dir) => Ok(current_dir.join(path)),
191 Err(e) => Err(AuroraError::IoError(format!(
192 "Failed to resolve current directory: {}",
193 e
194 ))),
195 }
196 }
197
198 pub fn with_config(config: AuroraConfig) -> Result<Self> {
200 let path = Self::resolve_path(&config.db_path)?;
201
202 if config.create_dirs {
203 if let Some(parent) = path.parent() {
204 if !parent.exists() {
205 std::fs::create_dir_all(parent)?;
206 }
207 }
208 }
209
210 let cold = ColdStore::with_config(
212 path.to_str().unwrap(),
213 config.cold_cache_capacity_mb,
214 config.cold_flush_interval_ms,
215 config.cold_mode,
216 )?;
217
218 let hot = HotStore::with_config(
219 config.hot_cache_size_mb,
220 config.hot_cache_cleanup_interval_secs,
221 );
222
223 let db = Self {
225 hot,
226 cold,
227 primary_indices: Arc::new(DashMap::new()),
228 secondary_indices: Arc::new(DashMap::new()),
229 indices_initialized: Arc::new(OnceCell::new()),
230 in_transaction: std::sync::atomic::AtomicBool::new(false),
231 transaction_ops: DashMap::new(),
232 indices: Arc::new(DashMap::new()),
233 };
234
235 if config.auto_compact {
237 }
240
241 Ok(db)
242 }
243
244 async fn ensure_indices_initialized(&self) -> Result<()> {
246 self.indices_initialized
247 .get_or_init(|| async {
248 println!("Initializing indices...");
249 if let Err(e) = self.initialize_indices() {
250 eprintln!("Failed to initialize indices: {:?}", e);
251 }
252 println!("Indices initialized");
253 ()
254 })
255 .await;
256 Ok(())
257 }
258
259 fn initialize_indices(&self) -> Result<()> {
260 for result in self.cold.scan() {
262 let (key, value) = result?;
263 let key_str = std::str::from_utf8(&key.as_bytes())
264 .map_err(|_| AuroraError::InvalidKey("Invalid UTF-8".into()))?;
265
266 if let Some(collection_name) = key_str.split(':').next() {
267 self.index_value(collection_name, key_str, &value)?;
268 }
269 }
270 Ok(())
271 }
272
273 pub fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
275 if let Some(value) = self.hot.get(key) {
277 return Ok(Some(value));
278 }
279
280 if let Some(collection) = key.split(':').next() {
282 if let Some(index) = self.primary_indices.get(collection) {
283 if let Some(value) = index.get(key) {
284 self.hot.set(key.to_string(), value.clone(), None);
286 return Ok(Some(value.clone()));
287 }
288 }
289 }
290
291 let value = self.cold.get(key)?;
293 if let Some(v) = &value {
294 self.hot.set(key.to_string(), v.clone(), None);
295 }
296 Ok(value)
297 }
298
299 pub fn put(&self, key: String, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
300 const MAX_BLOB_SIZE: usize = 50 * 1024 * 1024; if value.len() > MAX_BLOB_SIZE {
303 return Err(AuroraError::InvalidOperation(format!(
304 "Blob size {} exceeds maximum allowed size of {}MB",
305 value.len() / (1024 * 1024),
306 MAX_BLOB_SIZE / (1024 * 1024)
307 )));
308 }
309
310 if self
312 .in_transaction
313 .load(std::sync::atomic::Ordering::SeqCst)
314 {
315 self.transaction_ops.insert(key.clone(), value.clone());
316 return Ok(());
317 }
318
319 self.cold.set(key.clone(), value.clone())?;
321 self.hot.set(key.clone(), value.clone(), ttl);
322
323 if let Some(collection_name) = key.split(':').next() {
325 if !collection_name.starts_with('_') {
327 self.index_value(collection_name, &key, &value)?;
328 }
329 }
330
331 Ok(())
332 }
333
334 fn index_value(&self, collection: &str, key: &str, value: &[u8]) -> Result<()> {
335 self.primary_indices
337 .entry(collection.to_string())
338 .or_insert_with(DashMap::new)
339 .insert(key.to_string(), value.to_vec());
340
341 if let Ok(doc) = serde_json::from_slice::<Document>(value) {
343 for (field, value) in doc.data {
344 self.secondary_indices
345 .entry(format!("{}:{}", collection, field))
346 .or_insert_with(DashMap::new)
347 .entry(value.to_string())
348 .or_insert_with(Vec::new)
349 .push(key.to_string());
350 }
351 }
352 Ok(())
353 }
354
355 fn scan_collection(&self, collection: &str) -> Result<Vec<Document>> {
357 let _prefix = format!("{}:", collection);
358 let mut documents = Vec::new();
359
360 if let Some(index) = self.primary_indices.get(collection) {
361 for entry in index.iter() {
362 if let Ok(doc) = serde_json::from_slice(entry.value()) {
363 documents.push(doc);
364 }
365 }
366 }
367
368 Ok(documents)
369 }
370
371 pub async fn put_blob(&self, key: String, file_path: &Path) -> Result<()> {
373 const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; let metadata = tokio::fs::metadata(file_path).await?;
377 let file_size = metadata.len() as usize;
378
379 if file_size > MAX_FILE_SIZE {
380 return Err(AuroraError::InvalidOperation(format!(
381 "File size {} MB exceeds maximum allowed size of {} MB",
382 file_size / (1024 * 1024),
383 MAX_FILE_SIZE / (1024 * 1024)
384 )));
385 }
386
387 let mut file = File::open(file_path).await?;
388 let mut buffer = Vec::new();
389 file.read_to_end(&mut buffer).await?;
390
391 self.put(key, buffer, None)
392 }
393
394 pub fn new_collection(&self, name: &str, fields: Vec<(String, FieldType, bool)>) -> Result<()> {
419 let collection = Collection {
420 name: name.to_string(),
421 fields: fields
422 .into_iter()
423 .map(|(name, field_type, unique)| {
424 (
425 name,
426 FieldDefinition {
427 field_type,
428 unique,
429 indexed: unique,
430 },
431 )
432 })
433 .collect(),
434 unique_fields: Vec::new(),
435 };
436
437 self.put(
438 format!("_collection:{}", name),
439 serde_json::to_vec(&collection)?,
440 None,
441 )
442 }
443
444 pub fn insert_into(&self, collection: &str, data: InsertData) -> Result<String> {
464 let data_map: HashMap<String, Value> =
465 data.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
466
467 let doc_id = Uuid::new_v4().to_string();
468 let document = Document {
469 id: doc_id.clone(),
470 data: data_map,
471 };
472
473 self.put(
474 format!("{}:{}", collection, doc_id),
475 serde_json::to_vec(&document)?,
476 None,
477 )?;
478
479 Ok(doc_id)
480 }
481
482 pub fn insert_map(&self, collection: &str, data: HashMap<String, Value>) -> Result<String> {
483 let doc_id = Uuid::new_v4().to_string();
484 let document = Document {
485 id: doc_id.clone(),
486 data,
487 };
488
489 self.put(
490 format!("{}:{}", collection, doc_id),
491 serde_json::to_vec(&document)?,
492 None,
493 )?;
494
495 Ok(doc_id)
496 }
497
    /// Returns every document in `collection`, lazily building the in-memory
    /// indices first if this is the first read since startup.
    pub async fn get_all_collection(&self, collection: &str) -> Result<Vec<Document>> {
        self.ensure_indices_initialized().await?;
        self.scan_collection(collection)
    }
502
503 pub fn get_data_by_pattern(&self, pattern: &str) -> Result<Vec<(String, DataInfo)>> {
504 let mut data = Vec::new();
505
506 if let Some(index) = self
507 .primary_indices
508 .get(pattern.split(':').next().unwrap_or(""))
509 {
510 for entry in index.iter() {
511 if entry.key().contains(pattern) {
512 let value = entry.value();
513 let info = if value.starts_with(b"BLOB:") {
514 DataInfo::Blob { size: value.len() }
515 } else {
516 DataInfo::Data {
517 size: value.len(),
518 preview: String::from_utf8_lossy(&value[..value.len().min(50)])
519 .into_owned(),
520 }
521 };
522
523 data.push((entry.key().clone(), info));
524 }
525 }
526 }
527
528 Ok(data)
529 }
530
531 pub fn begin_transaction(&self) -> Result<()> {
557 if self
558 .in_transaction
559 .load(std::sync::atomic::Ordering::SeqCst)
560 {
561 return Err(AuroraError::InvalidOperation(
562 "Transaction already in progress".into(),
563 ));
564 }
565
566 self.in_transaction
568 .store(true, std::sync::atomic::Ordering::SeqCst);
569 Ok(())
570 }
571
572 pub fn commit_transaction(&self) -> Result<()> {
579 if !self
580 .in_transaction
581 .load(std::sync::atomic::Ordering::SeqCst)
582 {
583 return Err(AuroraError::InvalidOperation(
584 "No transaction in progress".into(),
585 ));
586 }
587
588 for item in self.transaction_ops.iter() {
590 self.cold.set(item.key().clone(), item.value().clone())?;
591 self.hot.set(item.key().clone(), item.value().clone(), None);
592 }
593
594 self.transaction_ops.clear();
596 self.in_transaction
597 .store(false, std::sync::atomic::Ordering::SeqCst);
598
599 self.cold.compact()?;
601
602 Ok(())
603 }
604
605 pub fn rollback_transaction(&self) -> Result<()> {
612 if !self
613 .in_transaction
614 .load(std::sync::atomic::Ordering::SeqCst)
615 {
616 return Err(AuroraError::InvalidOperation(
617 "No transaction in progress".into(),
618 ));
619 }
620
621 self.transaction_ops.clear();
623 self.in_transaction
624 .store(false, std::sync::atomic::Ordering::SeqCst);
625
626 Ok(())
627 }
628
629 pub async fn create_index(&self, collection: &str, field: &str) -> Result<()> {
630 if self.get(&format!("_collection:{}", collection))?.is_none() {
632 return Err(AuroraError::CollectionNotFound(collection.to_string()));
633 }
634
635 let index_name = format!("idx_{}_{}", collection, field);
637
638 let definition = IndexDefinition {
640 name: index_name.clone(),
641 collection: collection.to_string(),
642 fields: vec![field.to_string()],
643 index_type: IndexType::BTree,
644 unique: false,
645 };
646
647 let index = Index::new(definition.clone());
649
650 let prefix = format!("{}:", collection);
652 for result in self.cold.scan_prefix(&prefix) {
653 if let Ok((_, data)) = result {
654 if let Ok(doc) = serde_json::from_slice::<Document>(&data) {
655 let _ = index.insert(&doc);
656 }
657 }
658 }
659
660 self.indices.insert(index_name, index);
662
663 let index_key = format!("_index:{}:{}", collection, field);
665 self.put(index_key, serde_json::to_vec(&definition)?, None)?;
666
667 Ok(())
668 }
669
670 pub fn query<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
690 QueryBuilder::new(self, collection)
691 }
692
693 pub fn search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
713 SearchBuilder::new(self, collection)
714 }
715
716 pub fn get_document(&self, collection: &str, id: &str) -> Result<Option<Document>> {
736 let key = format!("{}:{}", collection, id);
737 if let Some(data) = self.get(&key)? {
738 Ok(Some(serde_json::from_slice(&data)?))
739 } else {
740 Ok(None)
741 }
742 }
743
    /// Removes `key` from the hot cache, the cold store, and the primary index.
    ///
    /// NOTE(review): unlike `put`, this applies immediately even while a
    /// transaction is active, and it records an *empty-value* marker in
    /// `transaction_ops` — on commit that marker would be re-written as an
    /// empty value rather than a deletion. Confirm intended semantics.
    /// Secondary-index entries for the document are not cleaned up here.
    pub async fn delete(&self, key: &str) -> Result<()> {
        // Evict from the hot cache first so readers cannot resurrect it.
        if self.hot.get(key).is_some() {
            self.hot.delete(key);
        }

        self.cold.delete(key)?;

        // Drop the entry from the collection's primary index, if present.
        if let Some(collection) = key.split(':').next() {
            if let Some(index) = self.primary_indices.get_mut(collection) {
                index.remove(key);
            }
        }

        // Record the deletion in the transaction log (as an empty value).
        if self
            .in_transaction
            .load(std::sync::atomic::Ordering::SeqCst)
        {
            self.transaction_ops.insert(key.to_string(), Vec::new());
        }

        Ok(())
    }
784
    /// Deletes every document in `collection` and drops its in-memory indices.
    ///
    /// NOTE(review): the `_collection:{name}` schema entry is left in place,
    /// so the collection definition survives — confirm this is intentional.
    pub async fn delete_collection(&self, collection: &str) -> Result<()> {
        let prefix = format!("{}:", collection);

        // Collect keys first; deleting while scanning would mutate the store
        // underneath the iterator.
        let keys: Vec<String> = self
            .cold
            .scan()
            .filter_map(|r| r.ok())
            .filter(|(k, _)| k.starts_with(&prefix))
            .map(|(k, _)| k)
            .collect();

        for key in keys {
            self.delete(&key).await?;
        }

        // Drop the whole primary index and all "collection:field" secondary indices.
        self.primary_indices.remove(collection);
        self.secondary_indices
            .retain(|k, _| !k.starts_with(&prefix));

        Ok(())
    }
809
810 #[allow(dead_code)]
811 fn remove_from_indices(&self, collection: &str, doc: &Document) -> Result<()> {
812 if let Some(index) = self.primary_indices.get(collection) {
814 index.remove(&doc.id);
815 }
816
817 for (field, value) in &doc.data {
819 let index_key = format!("{}:{}", collection, field);
820 if let Some(index) = self.secondary_indices.get(&index_key) {
821 if let Some(mut doc_ids) = index.get_mut(&value.to_string()) {
822 doc_ids.retain(|id| id != &doc.id);
823 }
824 }
825 }
826
827 Ok(())
828 }
829
830 pub async fn search_text(
831 &self,
832 collection: &str,
833 field: &str,
834 query: &str,
835 ) -> Result<Vec<Document>> {
836 let mut results = Vec::new();
837 let docs = self.get_all_collection(collection).await?;
838
839 for doc in docs {
840 if let Some(Value::String(text)) = doc.data.get(field) {
841 if text.to_lowercase().contains(&query.to_lowercase()) {
842 results.push(doc);
843 }
844 }
845 }
846
847 Ok(results)
848 }
849
    /// Exports every document in `collection` to a pretty-printed JSON file.
    ///
    /// Output shape: `{ "<collection>": [ {field: value, ...}, ... ] }` —
    /// internal document ids are not included.
    ///
    /// NOTE(review): top-level `Value::Object` fields are silently dropped,
    /// and nested arrays/objects inside arrays become `null` — confirm this
    /// lossy flattening is intended.
    pub fn export_as_json(&self, collection: &str, output_path: &str) -> Result<()> {
        // Ensure a .json extension without duplicating it.
        let output_path = if !output_path.ends_with(".json") {
            format!("{}.json", output_path)
        } else {
            output_path.to_string()
        };

        let mut docs = Vec::new();

        // Full store scan: pick out keys belonging to this collection,
        // skipping the schema entry itself.
        for result in self.cold.scan() {
            let (key, value) = result?;

            if let Some(key_collection) = key.split(':').next() {
                if key_collection == collection && !key.starts_with("_collection:") {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        // Convert internal Values to plain JSON field by field.
                        let mut clean_doc = serde_json::Map::new();
                        for (k, v) in doc.data {
                            match v {
                                Value::String(s) => clean_doc.insert(k, JsonValue::String(s)),
                                Value::Int(i) => clean_doc.insert(k, JsonValue::Number(i.into())),
                                Value::Float(f) => {
                                    // NaN/infinity have no JSON representation; map to null.
                                    if let Some(n) = serde_json::Number::from_f64(f) {
                                        clean_doc.insert(k, JsonValue::Number(n))
                                    } else {
                                        clean_doc.insert(k, JsonValue::Null)
                                    }
                                }
                                Value::Bool(b) => clean_doc.insert(k, JsonValue::Bool(b)),
                                Value::Array(arr) => {
                                    // One level deep only: nested arrays/objects become null.
                                    let clean_arr: Vec<JsonValue> = arr
                                        .into_iter()
                                        .map(|v| match v {
                                            Value::String(s) => JsonValue::String(s),
                                            Value::Int(i) => JsonValue::Number(i.into()),
                                            Value::Float(f) => serde_json::Number::from_f64(f)
                                                .map(JsonValue::Number)
                                                .unwrap_or(JsonValue::Null),
                                            Value::Bool(b) => JsonValue::Bool(b),
                                            Value::Null => JsonValue::Null,
                                            _ => JsonValue::Null,
                                        })
                                        .collect();
                                    clean_doc.insert(k, JsonValue::Array(clean_arr))
                                }
                                Value::Uuid(u) => {
                                    clean_doc.insert(k, JsonValue::String(u.to_string()))
                                }
                                Value::Null => clean_doc.insert(k, JsonValue::Null),
                                Value::Object(_) => None, };
                        }
                        docs.push(JsonValue::Object(clean_doc));
                    }
                }
            }
        }

        let output = JsonValue::Object(serde_json::Map::from_iter(vec![(
            collection.to_string(),
            JsonValue::Array(docs),
        )]));

        let mut file = StdFile::create(&output_path)?;
        serde_json::to_writer_pretty(&mut file, &output)?;
        println!("Exported collection '{}' to {}", collection, &output_path);
        Ok(())
    }
935
    /// Exports every document in `collection` to a CSV file.
    ///
    /// Columns are taken from the first non-empty document; later documents
    /// contribute empty cells for missing fields and silently lose any extra
    /// fields.
    ///
    /// NOTE(review): `doc.data.keys()` appears to be map-backed, so column
    /// order may vary between runs — confirm whether deterministic output is
    /// required.
    pub fn export_as_csv(&self, collection: &str, filename: &str) -> Result<()> {
        // Ensure a .csv extension without duplicating it.
        let output_path = if !filename.ends_with(".csv") {
            format!("{}.csv", filename)
        } else {
            filename.to_string()
        };

        let mut writer = csv::Writer::from_path(&output_path)?;
        let mut headers = Vec::new();
        let mut first_doc = true;

        for result in self.cold.scan() {
            let (key, value) = result?;

            if let Some(key_collection) = key.split(':').next() {
                if key_collection == collection && !key.starts_with("_collection:") {
                    if let Ok(doc) = serde_json::from_slice::<Document>(&value) {
                        // The first non-empty document fixes the header row.
                        if first_doc && !doc.data.is_empty() {
                            headers = doc.data.keys().cloned().collect();
                            writer.write_record(&headers)?;
                            first_doc = false;
                        }

                        // Emit cells in header order; missing fields -> "".
                        let values: Vec<String> = headers
                            .iter()
                            .map(|header| {
                                doc.data
                                    .get(header)
                                    .map(|v| v.to_string())
                                    .unwrap_or_default()
                            })
                            .collect();
                        writer.write_record(&values)?;
                    }
                }
            }
        }

        writer.flush()?;
        println!("Exported collection '{}' to {}", collection, &output_path);
        Ok(())
    }
983
984 pub fn find<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
986 self.query(collection)
987 }
988
    /// Finds a document whose `data["id"]` field equals `id`.
    ///
    /// NOTE(review): this matches an "id" *field* inside the data map, not
    /// the document's own `Document::id` — documents inserted without an
    /// explicit "id" field will not be found (compare `upsert`, which does
    /// store one). Confirm intended behavior.
    pub async fn find_by_id(&self, collection: &str, id: &str) -> Result<Option<Document>> {
        self.query(collection)
            .filter(|f| f.eq("id", id))
            .first_one()
            .await
    }
997
    /// Returns the first document for which `filter_fn` evaluates true, or
    /// `None` when nothing matches.
    pub async fn find_one<F>(&self, collection: &str, filter_fn: F) -> Result<Option<Document>>
    where
        F: Fn(&FilterBuilder) -> bool + 'static,
    {
        self.query(collection).filter(filter_fn).first_one().await
    }
1004
1005 pub async fn find_by_field<T: Into<Value> + Clone + 'static>(
1006 &self,
1007 collection: &str,
1008 field: &'static str,
1009 value: T,
1010 ) -> Result<Vec<Document>> {
1011 let value_clone = value.clone();
1012 self.query(collection)
1013 .filter(move |f| f.eq(field, value_clone.clone()))
1014 .collect()
1015 .await
1016 }
1017
1018 pub async fn find_by_fields(
1019 &self,
1020 collection: &str,
1021 fields: Vec<(&str, Value)>,
1022 ) -> Result<Vec<Document>> {
1023 let mut query = self.query(collection);
1024
1025 for (field, value) in fields {
1026 let field_owned = field.to_owned();
1027 let value_owned = value.clone();
1028 query = query.filter(move |f| f.eq(&field_owned, value_owned.clone()));
1029 }
1030
1031 query.collect().await
1032 }
1033
    /// Returns documents whose `field` lies between `min` and `max`
    /// (boundary inclusivity is determined by `FilterBuilder::between` —
    /// confirm against its implementation).
    pub async fn find_in_range<T: Into<Value> + Clone + 'static>(
        &self,
        collection: &str,
        field: &'static str,
        min: T,
        max: T,
    ) -> Result<Vec<Document>> {
        self.query(collection)
            .filter(move |f| f.between(field, min.clone(), max.clone()))
            .collect()
            .await
    }
1047
    /// Alias for `query`.
    ///
    /// NOTE(review): the `async` is unnecessary (nothing is awaited) but is
    /// kept because removing it would break callers that `.await` the result.
    pub async fn find_complex<'a>(&'a self, collection: &str) -> QueryBuilder<'a> {
        self.query(collection)
    }
1052
1053 pub fn advanced_search<'a>(&'a self, collection: &str) -> SearchBuilder<'a> {
1055 self.search(collection)
1056 }
1057
1058 pub async fn upsert(&self, collection: &str, id: &str, data: InsertData) -> Result<String> {
1060 if let Some(mut doc) = self.get_document(collection, id)? {
1062 for (key, value) in data {
1064 doc.data.insert(key.to_string(), value);
1065 }
1066
1067 self.put(
1069 format!("{}:{}", collection, id),
1070 serde_json::to_vec(&doc)?,
1071 None,
1072 )?;
1073
1074 Ok(id.to_string())
1075 } else {
1076 let mut data_with_id = data;
1078 data_with_id.push(("id", Value::String(id.to_string())));
1079 self.insert_into(collection, data_with_id)
1080 }
1081 }
1082
1083 pub async fn increment(
1085 &self,
1086 collection: &str,
1087 id: &str,
1088 field: &str,
1089 amount: i64,
1090 ) -> Result<i64> {
1091 if let Some(mut doc) = self.get_document(collection, id)? {
1092 let current = match doc.data.get(field) {
1094 Some(Value::Int(i)) => *i,
1095 _ => 0,
1096 };
1097
1098 let new_value = current + amount;
1100 doc.data.insert(field.to_string(), Value::Int(new_value));
1101
1102 self.put(
1104 format!("{}:{}", collection, id),
1105 serde_json::to_vec(&doc)?,
1106 None,
1107 )?;
1108
1109 Ok(new_value)
1110 } else {
1111 Err(AuroraError::NotFound(format!(
1112 "Document {}:{} not found",
1113 collection, id
1114 )))
1115 }
1116 }
1117
    /// Inserts many documents inside one transaction.
    ///
    /// On the first failed insert the transaction is rolled back and that
    /// error is returned; on success the generated ids are returned in input
    /// order.
    pub async fn batch_insert(
        &self,
        collection: &str,
        docs: Vec<InsertData>,
    ) -> Result<Vec<String>> {
        self.begin_transaction()?;

        let mut ids = Vec::with_capacity(docs.len());

        for data in docs {
            match self.insert_into(collection, data) {
                Ok(id) => ids.push(id),
                Err(e) => {
                    // Undo everything buffered so far before surfacing the error.
                    self.rollback_transaction()?;
                    return Err(e);
                }
            }
        }

        self.commit_transaction()?;

        Ok(ids)
    }
1146
1147 pub async fn delete_by_query<F>(&self, collection: &str, filter_fn: F) -> Result<usize>
1149 where
1150 F: Fn(&FilterBuilder) -> bool + 'static,
1151 {
1152 let docs = self.query(collection).filter(filter_fn).collect().await?;
1153
1154 let mut deleted_count = 0;
1155
1156 for doc in docs {
1157 let key = format!("{}:{}", collection, doc.id);
1158 self.delete(&key).await?;
1159 deleted_count += 1;
1160 }
1161
1162 Ok(deleted_count)
1163 }
1164
    /// Imports documents from a JSON file containing a top-level array of
    /// objects, validating each against the collection's stored schema.
    ///
    /// Returns per-document counts of imported / skipped / failed.
    ///
    /// NOTE(review): the expected input shape (a bare array) does not match
    /// what `export_as_json` writes (`{ "<collection>": [...] }`) — confirm
    /// whether round-tripping is supposed to work.
    pub async fn import_from_json(&self, collection: &str, filename: &str) -> Result<ImportStats> {
        // The collection must already exist; its schema drives validation.
        let collection_def = self.get_collection_definition(collection)?;

        let json_string = read_to_string(filename)
            .await
            .map_err(|e| AuroraError::IoError(format!("Failed to read import file: {}", e)))?;

        let documents: Vec<JsonValue> = from_str(&json_string)
            .map_err(|e| AuroraError::SerializationError(format!("Failed to parse JSON: {}", e)))?;

        let mut stats = ImportStats::default();

        // Each document is imported independently; failures do not abort the run.
        for doc_json in documents {
            match self
                .import_document(collection, &collection_def, doc_json)
                .await
            {
                Ok(ImportResult::Imported) => stats.imported += 1,
                Ok(ImportResult::Skipped) => stats.skipped += 1,
                Err(_) => stats.failed += 1,
            }
        }

        Ok(stats)
    }
1214
1215 async fn import_document(
1217 &self,
1218 collection: &str,
1219 collection_def: &Collection,
1220 doc_json: JsonValue,
1221 ) -> Result<ImportResult> {
1222 if !doc_json.is_object() {
1223 return Err(AuroraError::InvalidOperation("Expected JSON object".into()));
1224 }
1225
1226 let doc_id = doc_json
1228 .get("id")
1229 .and_then(|id| id.as_str())
1230 .map(|s| s.to_string())
1231 .unwrap_or_else(|| Uuid::new_v4().to_string());
1232
1233 if let Some(_) = self.get_document(collection, &doc_id)? {
1235 return Ok(ImportResult::Skipped);
1236 }
1237
1238 let mut data_map = HashMap::new();
1240
1241 if let Some(obj) = doc_json.as_object() {
1242 for (field_name, field_def) in &collection_def.fields {
1243 if let Some(json_value) = obj.get(field_name) {
1244 if !self.validate_field_value(json_value, &field_def.field_type) {
1246 return Err(AuroraError::InvalidOperation(format!(
1247 "Field '{}' has invalid type",
1248 field_name
1249 )));
1250 }
1251
1252 let value = self.json_to_value(json_value)?;
1254 data_map.insert(field_name.clone(), value);
1255 } else if field_def.unique {
1256 return Err(AuroraError::InvalidOperation(format!(
1258 "Missing required unique field '{}'",
1259 field_name
1260 )));
1261 }
1262 }
1263 }
1264
1265 for unique_field in &collection_def.unique_fields {
1267 if let Some(value) = data_map.get(unique_field) {
1268 let query_results = self
1270 .query(collection)
1271 .filter(move |f| f.eq(unique_field, value.clone()))
1272 .limit(1)
1273 .collect()
1274 .await?;
1275
1276 if !query_results.is_empty() {
1277 return Ok(ImportResult::Skipped);
1279 }
1280 }
1281 }
1282
1283 let document = Document {
1285 id: doc_id,
1286 data: data_map,
1287 };
1288
1289 self.put(
1290 format!("{}:{}", collection, document.id),
1291 serde_json::to_vec(&document)?,
1292 None,
1293 )?;
1294
1295 Ok(ImportResult::Imported)
1296 }
1297
1298 fn validate_field_value(&self, value: &JsonValue, field_type: &FieldType) -> bool {
1300 match field_type {
1301 FieldType::String => value.is_string(),
1302 FieldType::Int => value.is_i64() || value.is_u64(),
1303 FieldType::Float => value.is_number(),
1304 FieldType::Boolean => value.is_boolean(),
1305 FieldType::Array => value.is_array(),
1306 FieldType::Uuid => {
1307 value.is_string() && Uuid::parse_str(value.as_str().unwrap_or("")).is_ok()
1308 }
1309 }
1310 }
1311
1312 fn json_to_value(&self, json_value: &JsonValue) -> Result<Value> {
1314 match json_value {
1315 JsonValue::Null => Ok(Value::Null),
1316 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
1317 JsonValue::Number(n) => {
1318 if let Some(i) = n.as_i64() {
1319 Ok(Value::Int(i))
1320 } else if let Some(f) = n.as_f64() {
1321 Ok(Value::Float(f))
1322 } else {
1323 Err(AuroraError::InvalidOperation("Invalid number value".into()))
1324 }
1325 }
1326 JsonValue::String(s) => {
1327 if let Ok(uuid) = Uuid::parse_str(s) {
1329 Ok(Value::Uuid(uuid))
1330 } else {
1331 Ok(Value::String(s.clone()))
1332 }
1333 }
1334 JsonValue::Array(arr) => {
1335 let mut values = Vec::new();
1336 for item in arr {
1337 values.push(self.json_to_value(item)?);
1338 }
1339 Ok(Value::Array(values))
1340 }
1341 JsonValue::Object(obj) => {
1342 let mut map = HashMap::new();
1343 for (k, v) in obj {
1344 map.insert(k.clone(), self.json_to_value(v)?);
1345 }
1346 Ok(Value::Object(map))
1347 }
1348 }
1349 }
1350
1351 fn get_collection_definition(&self, collection: &str) -> Result<Collection> {
1353 if let Some(data) = self.get(&format!("_collection:{}", collection))? {
1354 let collection_def: Collection = serde_json::from_slice(&data)?;
1355 Ok(collection_def)
1356 } else {
1357 Err(AuroraError::CollectionNotFound(collection.to_string()))
1358 }
1359 }
1360
    /// Aggregates hot-cache, cold-store, size, and per-collection statistics
    /// into one snapshot.
    pub fn get_database_stats(&self) -> Result<DatabaseStats> {
        let hot_stats = self.hot.get_stats();
        let cold_stats = self.cold.get_stats()?;

        Ok(DatabaseStats {
            hot_stats,
            cold_stats,
            estimated_size: self.cold.estimated_size(),
            collections: self.get_collection_stats()?,
        })
    }
1373
    /// Returns a shared reference to a hot-cache entry without copying the
    /// bytes, or `None` when the key is not cached.
    pub fn get_hot_ref(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        self.hot.get_ref(key)
    }
1378
    /// Returns true when `key` is currently resident in the hot cache.
    pub fn is_in_hot_cache(&self, key: &str) -> bool {
        self.hot.is_hot(key)
    }
1383
    /// Starts a periodic cleanup task for the hot cache.
    ///
    /// NOTE(review): this clones the `HotStore` before wrapping it in an
    /// `Arc`; whether the clone shares state with `self.hot` depends on
    /// `HotStore`'s `Clone` impl — confirm it is a shallow/shared clone,
    /// otherwise the task maintains a detached copy.
    pub async fn start_hot_cache_maintenance(&self, interval_secs: u64) {
        let hot_store = Arc::new(self.hot.clone());
        hot_store.start_cleanup_with_interval(interval_secs).await;
    }
1389
1390 pub fn clear_hot_cache(&self) {
1392 self.hot.clear();
1393 println!(
1394 "Hot cache cleared, current hit ratio: {:.2}%",
1395 self.hot.hit_ratio() * 100.0
1396 );
1397 }
1398
    /// Writes many key/value pairs straight to the cold store in one batch.
    /// Note: bypasses the hot cache and the in-memory indices.
    pub fn batch_write(&self, pairs: Vec<(String, Vec<u8>)>) -> Result<()> {
        self.cold.batch_set(pairs)
    }
1403
    /// Streams `(key, value)` pairs from the cold store whose keys start
    /// with `prefix`.
    pub fn scan_with_prefix(
        &self,
        prefix: &str,
    ) -> impl Iterator<Item = Result<(String, Vec<u8>)>> + '_ {
        self.cold.scan_prefix(prefix)
    }
1411
1412 pub fn get_collection_stats(&self) -> Result<HashMap<String, CollectionStats>> {
1414 let mut stats = HashMap::new();
1415
1416 let collections: Vec<String> = self
1418 .cold
1419 .scan()
1420 .filter_map(|r| r.ok())
1421 .map(|(k, _)| k)
1422 .filter(|k| k.starts_with("_collection:"))
1423 .map(|k| k.trim_start_matches("_collection:").to_string())
1424 .collect();
1425
1426 for collection in collections {
1427 let prefix = format!("{}:", collection);
1428
1429 let count = self.cold.scan_prefix(&prefix).count();
1431
1432 let size: usize = self
1434 .cold
1435 .scan_prefix(&prefix)
1436 .filter_map(|r| r.ok())
1437 .map(|(_, v)| v.len())
1438 .sum();
1439
1440 stats.insert(
1441 collection,
1442 CollectionStats {
1443 count,
1444 size_bytes: size,
1445 avg_doc_size: if count > 0 { size / count } else { 0 },
1446 },
1447 );
1448 }
1449
1450 Ok(stats)
1451 }
1452
    /// Looks up documents by exact field value via a persisted index
    /// definition.
    ///
    /// NOTE(review): this constructs a *fresh* `Index` from the stored
    /// definition and searches it without backfilling any documents; unless
    /// `Index::new` loads persisted entries itself this always yields an
    /// empty result. Consider using the populated index in `self.indices`
    /// (registered by `create_index`) instead — confirm before changing.
    pub fn search_by_value(
        &self,
        collection: &str,
        field: &str,
        value: &Value,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;
            let index = Index::new(index_def);

            if let Some(doc_ids) = index.search(value) {
                // Resolve each matching id back into a full document.
                let mut docs = Vec::new();
                for id in doc_ids {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        // No index definition or no matches.
        Ok(Vec::new())
    }
1485
    /// Runs a full-text query against a field that has a persisted
    /// `FullText` index definition.
    ///
    /// NOTE(review): like `search_by_value`, this searches a freshly
    /// constructed `Index` that was never backfilled with documents; unless
    /// `Index::new` loads persisted entries itself this always yields an
    /// empty result — confirm against `Index`'s implementation.
    ///
    /// # Errors
    /// Fails when the field's index exists but is not a full-text index.
    pub fn full_text_search(
        &self,
        collection: &str,
        field: &str,
        query: &str,
    ) -> Result<Vec<Document>> {
        let index_key = format!("_index:{}:{}", collection, field);

        if let Some(index_data) = self.get(&index_key)? {
            let index_def: IndexDefinition = serde_json::from_slice(&index_data)?;

            if !matches!(index_def.index_type, IndexType::FullText) {
                return Err(AuroraError::InvalidOperation(format!(
                    "Field '{}' is not indexed as full-text",
                    field
                )));
            }

            let index = Index::new(index_def);

            if let Some(doc_id_scores) = index.search_text(query) {
                // Scores are ignored; ids are resolved back into documents.
                let mut docs = Vec::new();
                for (id, _score) in doc_id_scores {
                    if let Some(doc_data) = self.get(&format!("{}:{}", collection, id))? {
                        let doc: Document = serde_json::from_slice(&doc_data)?;
                        docs.push(doc);
                    }
                }
                return Ok(docs);
            }
        }

        // No index definition or no matches.
        Ok(Vec::new())
    }
1528
1529 pub fn create_text_index(
1531 &self,
1532 collection: &str,
1533 field: &str,
1534 _enable_stop_words: bool,
1535 ) -> Result<()> {
1536 if self.get(&format!("_collection:{}", collection))?.is_none() {
1538 return Err(AuroraError::CollectionNotFound(collection.to_string()));
1539 }
1540
1541 let index_def = IndexDefinition {
1543 name: format!("{}_{}_fulltext", collection, field),
1544 collection: collection.to_string(),
1545 fields: vec![field.to_string()],
1546 index_type: IndexType::FullText,
1547 unique: false,
1548 };
1549
1550 let index_key = format!("_index:{}:{}", collection, field);
1552 self.put(index_key, serde_json::to_vec(&index_def)?, None)?;
1553
1554 let index = Index::new(index_def);
1556
1557 let prefix = format!("{}:", collection);
1559 for result in self.cold.scan_prefix(&prefix) {
1560 if let Ok((_, data)) = result {
1561 let doc: Document = serde_json::from_slice(&data)?;
1562 index.insert(&doc)?;
1563 }
1564 }
1565
1566 Ok(())
1567 }
1568
1569 pub async fn execute_simple_query(
1570 &self,
1571 builder: &SimpleQueryBuilder,
1572 ) -> Result<Vec<Document>> {
1573 let mut docs = self.get_all_collection(&builder.collection).await?;
1574
1575 docs.retain(|doc| {
1576 builder.filters.iter().all(|f| match f {
1577 Filter::Eq(field, value) => doc.data.get(field).map_or(false, |v| v == value),
1578 Filter::Gt(field, value) => doc.data.get(field).map_or(false, |v| v > value),
1579 Filter::Lt(field, value) => doc.data.get(field).map_or(false, |v| v < value),
1580 Filter::Contains(field, value_str) => {
1581 doc.data.get(field).map_or(false, |v| match v {
1582 Value::String(s) => s.contains(value_str),
1583 Value::Array(arr) => arr.contains(&Value::String(value_str.clone())),
1584 _ => false,
1585 })
1586 }
1587 })
1588 });
1589
1590 Ok(docs)
1592 }
1593
1594 pub async fn execute_dynamic_query(
1595 &self,
1596 collection: &str,
1597 payload: &QueryPayload,
1598 ) -> Result<Vec<Document>> {
1599 let mut docs = self.get_all_collection(collection).await?;
1600
1601 if let Some(filters) = &payload.filters {
1603 docs.retain(|doc| {
1604 filters.iter().all(|filter| {
1605 doc.data
1606 .get(&filter.field)
1607 .map_or(false, |doc_val| check_filter(doc_val, filter))
1608 })
1609 });
1610 }
1611
1612 if let Some(sort_options) = &payload.sort {
1614 docs.sort_by(|a, b| {
1615 let a_val = a.data.get(&sort_options.field);
1616 let b_val = b.data.get(&sort_options.field);
1617 let ordering = a_val
1618 .partial_cmp(&b_val)
1619 .unwrap_or(std::cmp::Ordering::Equal);
1620 if sort_options.ascending {
1621 ordering
1622 } else {
1623 ordering.reverse()
1624 }
1625 });
1626 }
1627
1628 let mut docs = docs;
1630 if let Some(offset) = payload.offset {
1631 docs = docs.into_iter().skip(offset).collect();
1632 }
1633 if let Some(limit) = payload.limit {
1634 docs = docs.into_iter().take(limit).collect();
1635 }
1636
1637 if let Some(select_fields) = &payload.select {
1639 if !select_fields.is_empty() {
1640 docs = docs
1641 .into_iter()
1642 .map(|mut doc| {
1643 doc.data.retain(|key, _| select_fields.contains(key));
1644 doc
1645 })
1646 .collect();
1647 }
1648 }
1649
1650 Ok(docs)
1651 }
1652
1653 pub async fn process_network_request(
1654 &self,
1655 request: crate::network::protocol::Request,
1656 ) -> crate::network::protocol::Response {
1657 use crate::network::protocol::Response;
1658
1659 match request {
1660 crate::network::protocol::Request::Get(key) => match self.get(&key) {
1661 Ok(value) => Response::Success(value),
1662 Err(e) => Response::Error(e.to_string()),
1663 },
1664 crate::network::protocol::Request::Put(key, value) => {
1665 match self.put(key, value, None) {
1666 Ok(_) => Response::Done,
1667 Err(e) => Response::Error(e.to_string()),
1668 }
1669 }
1670 crate::network::protocol::Request::Delete(key) => match self.delete(&key).await {
1671 Ok(_) => Response::Done,
1672 Err(e) => Response::Error(e.to_string()),
1673 },
1674 crate::network::protocol::Request::NewCollection { name, fields } => {
1675 let fields_for_db: Vec<(String, crate::types::FieldType, bool)> = fields
1676 .iter()
1677 .map(|(name, ft, unique)| (name.clone(), ft.clone(), *unique))
1678 .collect();
1679
1680 match self.new_collection(&name, fields_for_db) {
1681 Ok(_) => Response::Done,
1682 Err(e) => Response::Error(e.to_string()),
1683 }
1684 }
1685 crate::network::protocol::Request::Insert { collection, data } => {
1686 match self.insert_map(&collection, data) {
1687 Ok(id) => Response::Message(id),
1688 Err(e) => Response::Error(e.to_string()),
1689 }
1690 }
1691 crate::network::protocol::Request::GetDocument { collection, id } => {
1692 match self.get_document(&collection, &id) {
1693 Ok(doc) => Response::Document(doc),
1694 Err(e) => Response::Error(e.to_string()),
1695 }
1696 }
1697 crate::network::protocol::Request::Query(builder) => {
1698 match self.execute_simple_query(&builder).await {
1699 Ok(docs) => Response::Documents(docs),
1700 Err(e) => Response::Error(e.to_string()),
1701 }
1702 }
1703 crate::network::protocol::Request::BeginTransaction => match self.begin_transaction() {
1704 Ok(_) => Response::Done,
1705 Err(e) => Response::Error(e.to_string()),
1706 },
1707 crate::network::protocol::Request::CommitTransaction => match self.commit_transaction()
1708 {
1709 Ok(_) => Response::Done,
1710 Err(e) => Response::Error(e.to_string()),
1711 },
1712 crate::network::protocol::Request::RollbackTransaction => {
1713 match self.rollback_transaction() {
1714 Ok(_) => Response::Done,
1715 Err(e) => Response::Error(e.to_string()),
1716 }
1717 }
1718 }
1719 }
1720}
1721
1722fn check_filter(doc_val: &Value, filter: &HttpFilter) -> bool {
1723 let filter_val = match json_to_value(&filter.value) {
1724 Ok(v) => v,
1725 Err(_) => return false,
1726 };
1727
1728 match filter.operator {
1729 FilterOperator::Eq => doc_val == &filter_val,
1730 FilterOperator::Ne => doc_val != &filter_val,
1731 FilterOperator::Gt => doc_val > &filter_val,
1732 FilterOperator::Gte => doc_val >= &filter_val,
1733 FilterOperator::Lt => doc_val < &filter_val,
1734 FilterOperator::Lte => doc_val <= &filter_val,
1735 FilterOperator::Contains => match (doc_val, &filter_val) {
1736 (Value::String(s), Value::String(fv)) => s.contains(fv),
1737 (Value::Array(arr), _) => arr.contains(&filter_val),
1738 _ => false,
1739 },
1740 }
1741}
1742
/// Per-document outcome of an import operation.
/// NOTE(review): the import code is not visible in this chunk — presumably
/// `Skipped` marks documents that already existed or failed a precondition.
enum ImportResult {
    // The document was written to the store.
    Imported,
    // The document was not written.
    Skipped,
}
1748
/// Aggregate counters produced by a bulk import.
#[derive(Debug, Default)]
pub struct ImportStats {
    /// Number of documents successfully imported.
    pub imported: usize,
    /// Number of documents skipped.
    pub skipped: usize,
    /// Number of documents that failed to import.
    pub failed: usize,
}
1759
/// Size and count statistics for a single collection.
#[derive(Debug)]
pub struct CollectionStats {
    /// Number of documents in the collection.
    pub count: usize,
    /// Total serialized size of the collection's documents, in bytes.
    pub size_bytes: usize,
    /// Average serialized document size, in bytes.
    pub avg_doc_size: usize,
}
1770
/// Database-wide statistics combining hot-cache and cold-store metrics.
#[derive(Debug)]
pub struct DatabaseStats {
    /// Statistics reported by the in-memory hot store.
    pub hot_stats: crate::storage::hot::CacheStats,
    /// Statistics reported by the on-disk cold store.
    pub cold_stats: crate::storage::cold::ColdStoreStats,
    /// Estimated total database size — presumably bytes; confirm with producer.
    pub estimated_size: u64,
    /// Per-collection statistics, keyed by collection name.
    pub collections: HashMap<String, CollectionStats>,
}
1783
#[cfg(test)]
mod tests {
    // Integration tests exercising the Aurora facade end-to-end against a
    // temporary on-disk database (one tempdir per test).
    use super::*;
    use tempfile::tempdir;

    // Happy path: create a collection, insert a document, read it back.
    #[tokio::test]
    async fn test_basic_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection(
            "users",
            vec![
                ("name".to_string(), FieldType::String, false),
                ("age".to_string(), FieldType::Int, false),
                // third tuple element = unique constraint
                ("email".to_string(), FieldType::String, true),
            ],
        )?;

        let doc_id = db.insert_into(
            "users",
            vec![
                ("name", Value::String("John Doe".to_string())),
                ("age", Value::Int(30)),
                ("email", Value::String("john@example.com".to_string())),
            ],
        )?;

        let doc = db.get_document("users", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("name").unwrap(),
            &Value::String("John Doe".to_string())
        );
        assert_eq!(doc.data.get("age").unwrap(), &Value::Int(30));

        Ok(())
    }

    // An insert inside a committed transaction must be visible afterwards.
    // NOTE(review): inserts into "test" without calling new_collection first —
    // presumably insert_into auto-creates the collection; confirm.
    #[tokio::test]
    async fn test_transactions() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.begin_transaction()?;

        let doc_id = db.insert_into("test", vec![("field", Value::String("value".to_string()))])?;

        db.commit_transaction()?;

        let doc = db.get_document("test", &doc_id)?.unwrap();
        assert_eq!(
            doc.data.get("field").unwrap(),
            &Value::String("value".to_string())
        );

        Ok(())
    }

    // Filter + order_by through the query builder returns matching documents
    // sorted ascending by the requested field.
    #[tokio::test]
    async fn test_query_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        db.new_collection(
            "books",
            vec![
                ("title".to_string(), FieldType::String, false),
                ("author".to_string(), FieldType::String, false),
                ("year".to_string(), FieldType::Int, false),
            ],
        )?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 1".to_string())),
                ("author", Value::String("Author 1".to_string())),
                ("year", Value::Int(2020)),
            ],
        )?;

        db.insert_into(
            "books",
            vec![
                ("title", Value::String("Book 2".to_string())),
                ("author", Value::String("Author 2".to_string())),
                ("year", Value::Int(2021)),
            ],
        )?;

        // year > 2019 matches both documents; true = ascending order.
        let results = db
            .query("books")
            .filter(|f| f.gt("year", Value::Int(2019)))
            .order_by("year", true)
            .collect()
            .await?;

        assert_eq!(results.len(), 2);
        assert!(results[0].data.get("year").unwrap() < results[1].data.get("year").unwrap());

        Ok(())
    }

    // Storing a small file as a blob and finding it via key-pattern lookup.
    #[tokio::test]
    async fn test_blob_operations() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        let file_path = temp_dir.path().join("test.txt");
        std::fs::write(&file_path, b"Hello, World!")?;

        db.put_blob("test:blob".to_string(), &file_path).await?;

        let data = db.get_data_by_pattern("test:blob")?;
        assert_eq!(data.len(), 1);
        match &data[0].1 {
            // 13 payload bytes + 5 extra — presumably blob framing/metadata
            // overhead added by put_blob; TODO confirm the constant's origin.
            DataInfo::Blob { size } => assert_eq!(*size, 13 + 5), _ => panic!("Expected Blob type"),
        }

        Ok(())
    }

    // Blobs above the size cap (201 MiB here) must be rejected with
    // InvalidOperation rather than stored.
    #[tokio::test]
    async fn test_blob_size_limit() -> Result<()> {
        let temp_dir = tempdir()?;
        let db_path = temp_dir.path().join("test.aurora");
        let db = Aurora::open(db_path.to_str().unwrap())?;

        let large_file_path = temp_dir.path().join("large_file.bin");
        let large_data = vec![0u8; 201 * 1024 * 1024];
        std::fs::write(&large_file_path, &large_data)?;

        let result = db
            .put_blob("test:large_blob".to_string(), &large_file_path)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            AuroraError::InvalidOperation(_)
        ));

        Ok(())
    }
}