#![allow(warnings)]
#![doc = include_str!("../README.md")]

use chrono;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Document, Value};
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter};
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::thread;
use thiserror::Error;
use uuid::Uuid;

pub mod config;
pub mod encryption;

131#[derive(Error, Debug)]
133pub enum DbError {
135 #[error("Document not found")]
136 NotFound,
138 #[error("Invalid query: {0}")]
139 InvalidQuery(String),
141 #[error("Serialization error: {0}")]
142 Serialization(#[from] serde_json::Error),
144 #[error("IO error: {0}")]
145 IoError(#[from] std::io::Error),
147 #[error("Other error: {0}")]
148 Other(String),
150 #[error("Index error: {0}")]
151 IndexError(String),
153 #[error("Update error: {0}")]
154 UpdateError(String),
156 #[error("Validation error: {0}")]
157 ValidationError(String),
159 #[error("Connection error: {0}")]
160 ConnectionError(String),
162 #[error("Protocol error: {0}")]
163 ProtocolError(String),
165 #[error("Encryption error: {0}")]
166 EncryptionError(String),
168 #[error("Key management error: {0}")]
169 KeyManagementError(String),
171 #[error("Configuration error: {0}")]
172 ConfigurationError(String),
174}
175pub type Result<T> = std::result::Result<T, DbError>;
176
177#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
179pub struct DocId(String);
180impl DocId {
181 pub fn new() -> Self {
182 Self(Uuid::new_v4().to_string())
183 }
184
185 pub fn from_str(s: &str) -> Self {
186 Self(s.to_string())
187 }
188
189 pub fn as_str(&self) -> &str {
190 &self.0
191 }
192}
193impl std::fmt::Display for DocId {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 write!(f, "{}", self.0)
196 }
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
201pub enum QueryOperator {
202 Eq(Value),
203 Ne(Value),
204 Gt(Value),
205 Gte(Value),
206 Lt(Value),
207 Lte(Value),
208 In(Vec<Value>),
209 Nin(Vec<Value>),
210 Exists(bool),
211 Regex(String),
212 And(Vec<Query>),
213 Or(Vec<Query>),
214 Nor(Vec<Query>),
215 Not(Box<QueryOperator>),
216 All(Vec<Value>),
218 ElemMatch(Query),
219 Size(usize),
220 Near {
222 point: (f64, f64),
223 max_distance: Option<f64>,
224 },
225 Within {
226 shape: GeoShape,
227 },
228 Intersects {
229 shape: GeoShape,
230 },
231}
232
233#[derive(Debug, Clone, Serialize, Deserialize)]
234pub enum GeoShape {
235 Box {
236 bottom_left: (f64, f64),
237 top_right: (f64, f64),
238 },
239 Polygon {
240 points: Vec<(f64, f64)>,
241 },
242 Center {
243 center: (f64, f64),
244 radius: f64,
245 },
246}
247
248#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct QueryCondition {
251 field: String,
252 operator: QueryOperator,
253}
254
255#[derive(Debug, Clone, Serialize, Deserialize)]
257pub struct Query {
282 conditions: Vec<QueryCondition>,
284 logical_ops: Vec<(LogicalOp, Vec<Query>)>,
286}
287
288#[derive(Debug, Clone, Serialize, Deserialize)]
289pub enum LogicalOp {
290 And,
291 Or,
292 Nor,
293 Not,
294}
295impl Query {
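    /// Builds a `Query` from a Mongo-style filter object. Plain values become
    /// equality checks and operator objects (`$eq`, `$ne`, `$gt`, `$gte`, `$lt`,
    /// `$lte`, `$in`, `$nin`, `$exists`, `$regex`) map to the corresponding
    /// builder methods. Illustrative sketch:
    ///
    /// ```ignore
    /// let filter = serde_json::json!({ "age": { "$gte": 18 }, "name": "Alice" });
    /// let query = Query::from_value(filter)?;
    /// ```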
297 pub fn from_value(value: Value) -> Result<Self> {
310 match value {
311 Value::Object(map) => {
312 let mut query = Query::new();
313
314 for (field, op_value) in map {
315 match op_value {
316 Value::Object(op_map) => {
317 for (op, val) in op_map {
318 match op.as_str() {
319 "$eq" => query = query.eq(&field, val.clone()),
320 "$ne" => query = query.ne(&field, val.clone()),
321 "$gt" => query = query.gt(&field, val.clone()),
322 "$gte" => query = query.gte(&field, val.clone()),
323 "$lt" => query = query.lt(&field, val.clone()),
324 "$lte" => query = query.lte(&field, val.clone()),
325 "$in" => {
326 if let Value::Array(arr) = val {
327 query = query.in_(&field, arr.clone());
328 }
329 }
330 "$nin" => {
331 if let Value::Array(arr) = val {
332 query = query.nin(&field, arr.clone());
333 }
334 }
335 "$exists" => {
336 if let Value::Bool(exists) = val {
337 query = query.exists(&field, exists);
338 }
339 }
340 "$regex" => {
341 if let Value::String(pattern) = val {
342 query = query.regex(&field, &pattern);
343 }
344 }
345 _ => {
346 return Err(DbError::InvalidQuery(format!(
347 "Unknown operator: {}",
348 op
349 )))
350 }
351 }
352 }
353 }
354 _ => query = query.eq(&field, op_value.clone()),
355 }
356 }
357
358 Ok(query)
359 }
360 _ => Err(DbError::InvalidQuery("Query must be an object".to_string())),
361 }
362 }
363}
364
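// Queries are assembled with a fluent builder; conditions added this way are
// ANDed together, while `and`/`or`/`nor` attach sub-queries. A minimal sketch:
//
//     let q = Query::new()
//         .eq("status", serde_json::json!("active"))
//         .gt("age", serde_json::json!(21));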
365impl Query {
366 pub fn new() -> Self {
370 Self {
371 conditions: Vec::new(),
372 logical_ops: Vec::new(),
373 }
374 }
375
376 pub fn eq(mut self, key: &str, value: Value) -> Self {
387 self.conditions.push(QueryCondition {
388 field: key.to_string(),
389 operator: QueryOperator::Eq(value),
390 });
391 self
392 }
393
394 pub fn ne(mut self, key: &str, value: Value) -> Self {
405 self.conditions.push(QueryCondition {
406 field: key.to_string(),
407 operator: QueryOperator::Ne(value),
408 });
409 self
410 }
411
412 pub fn gt(mut self, key: &str, value: Value) -> Self {
423 self.conditions.push(QueryCondition {
424 field: key.to_string(),
425 operator: QueryOperator::Gt(value),
426 });
427 self
428 }
429
430 pub fn gte(mut self, key: &str, value: Value) -> Self {
441 self.conditions.push(QueryCondition {
442 field: key.to_string(),
443 operator: QueryOperator::Gte(value),
444 });
445 self
446 }
447
448 pub fn lt(mut self, key: &str, value: Value) -> Self {
459 self.conditions.push(QueryCondition {
460 field: key.to_string(),
461 operator: QueryOperator::Lt(value),
462 });
463 self
464 }
465
466 pub fn lte(mut self, key: &str, value: Value) -> Self {
477 self.conditions.push(QueryCondition {
478 field: key.to_string(),
479 operator: QueryOperator::Lte(value),
480 });
481 self
482 }
483
484 pub fn in_(mut self, key: &str, values: Vec<Value>) -> Self {
497 self.conditions.push(QueryCondition {
498 field: key.to_string(),
499 operator: QueryOperator::In(values),
500 });
501 self
502 }
503
504 pub fn nin(mut self, key: &str, values: Vec<Value>) -> Self {
517 self.conditions.push(QueryCondition {
518 field: key.to_string(),
519 operator: QueryOperator::Nin(values),
520 });
521 self
522 }
523
524 pub fn exists(mut self, key: &str, exists: bool) -> Self {
537 self.conditions.push(QueryCondition {
538 field: key.to_string(),
539 operator: QueryOperator::Exists(exists),
540 });
541 self
542 }
543
544 pub fn regex(mut self, key: &str, pattern: &str) -> Self {
557 self.conditions.push(QueryCondition {
558 field: key.to_string(),
559 operator: QueryOperator::Regex(pattern.to_string()),
560 });
561 self
562 }
563
564 pub fn and(mut self, queries: Vec<Query>) -> Self {
576 self.logical_ops.push((LogicalOp::And, queries));
577 self
578 }
579
580 pub fn or(mut self, queries: Vec<Query>) -> Self {
592 self.logical_ops.push((LogicalOp::Or, queries));
593 self
594 }
595
596 pub fn nor(mut self, queries: Vec<Query>) -> Self {
608 self.logical_ops.push((LogicalOp::Nor, queries));
609 self
610 }
611
612 pub fn not(mut self, query: Query) -> Self {
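        // Only the first condition of `query` is negated; call `not` once per
        // condition to negate several of them.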
625 if let Some(cond) = query.conditions.first() {
627 self.conditions.push(QueryCondition {
628 field: cond.field.clone(),
629 operator: QueryOperator::Not(Box::new(cond.operator.clone())),
630 });
631 }
632 self
633 }
634
635 pub fn all(mut self, key: &str, values: Vec<Value>) -> Self {
646 self.conditions.push(QueryCondition {
647 field: key.to_string(),
648 operator: QueryOperator::All(values),
649 });
650 self
651 }
652
653 pub fn elem_match(mut self, key: &str, query: Query) -> Self {
664 self.conditions.push(QueryCondition {
665 field: key.to_string(),
666 operator: QueryOperator::ElemMatch(query),
667 });
668 self
669 }
670
671 pub fn size(mut self, key: &str, size: usize) -> Self {
682 self.conditions.push(QueryCondition {
683 field: key.to_string(),
684 operator: QueryOperator::Size(size),
685 });
686 self
687 }
688
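    /// Geospatial proximity filter: matches documents whose `[x, y]` coordinate
    /// array lies within `max_distance` (straight-line/Euclidean distance) of
    /// `point`; with no `max_distance`, any coordinate array matches.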
689 pub fn near(mut self, key: &str, point: (f64, f64), max_distance: Option<f64>) -> Self {
701 self.conditions.push(QueryCondition {
702 field: key.to_string(),
703 operator: QueryOperator::Near {
704 point,
705 max_distance,
706 },
707 });
708 self
709 }
710
711 pub fn within(mut self, key: &str, shape: GeoShape) -> Self {
722 self.conditions.push(QueryCondition {
723 field: key.to_string(),
724 operator: QueryOperator::Within { shape },
725 });
726 self
727 }
728
729 pub fn intersects(mut self, key: &str, shape: GeoShape) -> Self {
740 self.conditions.push(QueryCondition {
741 field: key.to_string(),
742 operator: QueryOperator::Intersects { shape },
743 });
744 self
745 }
746
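    /// Returns `true` when every condition and every logical combinator matches
    /// the given document. Dotted field names such as `"address.city"` are
    /// resolved against nested objects.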
747 pub fn matches(&self, doc: &Document) -> bool {
760 for cond in &self.conditions {
762 if !self.matches_condition(doc, cond) {
763 return false;
764 }
765 }
766
767 for (op, queries) in &self.logical_ops {
769 match op {
770 LogicalOp::And => {
771 if !queries.iter().all(|q| q.matches(doc)) {
772 return false;
773 }
774 }
775 LogicalOp::Or => {
776 if !queries.iter().any(|q| q.matches(doc)) {
777 return false;
778 }
779 }
780 LogicalOp::Nor => {
781 if queries.iter().any(|q| q.matches(doc)) {
782 return false;
783 }
784 }
785 LogicalOp::Not => {
786 if queries.iter().any(|q| q.matches(doc)) {
787 return false;
788 }
789 }
790 }
791 }
792
793 true
794 }
795
796 fn matches_condition(&self, doc: &Document, cond: &QueryCondition) -> bool {
797 let field_parts: Vec<&str> = cond.field.split('.').collect();
798 let value = self.get_nested_value(doc, &field_parts);
799
800 match &cond.operator {
801 QueryOperator::Eq(expected) => match value {
802 Some(v) => v == expected,
803 None => false,
804 },
805 QueryOperator::Ne(expected) => match value {
806 Some(v) => v != expected,
807 None => true,
808 },
809 QueryOperator::Gt(expected) => match (value, expected) {
810 (Some(Value::Number(a)), Value::Number(b)) => {
811 a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
812 }
813 _ => false,
814 },
815 QueryOperator::Gte(expected) => match (value, expected) {
816 (Some(Value::Number(a)), Value::Number(b)) => {
817 a.as_f64().unwrap_or(0.0) >= b.as_f64().unwrap_or(0.0)
818 }
819 _ => false,
820 },
821 QueryOperator::Lt(expected) => match (value, expected) {
822 (Some(Value::Number(a)), Value::Number(b)) => {
823 a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
824 }
825 _ => false,
826 },
827 QueryOperator::Lte(expected) => match (value, expected) {
828 (Some(Value::Number(a)), Value::Number(b)) => {
829 a.as_f64().unwrap_or(0.0) <= b.as_f64().unwrap_or(0.0)
830 }
831 _ => false,
832 },
833 QueryOperator::In(values) => match value {
834 Some(v) => values.contains(v),
835 None => false,
836 },
837 QueryOperator::Nin(values) => match value {
838 Some(v) => !values.contains(v),
839 None => true,
840 },
841 QueryOperator::Exists(exists) => value.is_some() == *exists,
            QueryOperator::Regex(pattern) => {
                // Note: this is a plain substring check, not full regular
                // expression matching.
                match value {
                    Some(Value::String(s)) => s.contains(pattern),
                    _ => false,
                }
            }
851 QueryOperator::All(values) => match value {
852 Some(Value::Array(arr)) => values.iter().all(|v| arr.contains(v)),
853 _ => false,
854 },
855 QueryOperator::ElemMatch(query) => match value {
856 Some(Value::Array(arr)) => arr.iter().any(|elem| query.matches(elem)),
857 _ => false,
858 },
859 QueryOperator::Size(size) => match value {
860 Some(Value::Array(arr)) => arr.len() == *size,
861 _ => false,
862 },
863 QueryOperator::Near {
864 point,
865 max_distance,
866 } => {
867 match value {
869 Some(Value::Array(arr)) if arr.len() >= 2 => {
870 if let (Some(Value::Number(x)), Some(Value::Number(y))) =
871 (arr.get(0), arr.get(1))
872 {
873 let doc_x = x.as_f64().unwrap_or(0.0);
874 let doc_y = y.as_f64().unwrap_or(0.0);
875
876 let distance =
878 ((doc_x - point.0).powi(2) + (doc_y - point.1).powi(2)).sqrt();
879
880 if let Some(max) = max_distance {
881 distance <= *max
882 } else {
883 true
884 }
885 } else {
886 false
887 }
888 }
889 _ => false,
890 }
891 }
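            // Within/Intersects currently evaluate only `GeoShape::Box`; other
            // shapes fall through to `false`.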
892 QueryOperator::Within { shape } => {
893 match (value, shape) {
895 (
896 Some(Value::Array(arr)),
897 GeoShape::Box {
898 bottom_left,
899 top_right,
900 },
901 ) if arr.len() >= 2 => {
902 if let (Some(Value::Number(x)), Some(Value::Number(y))) =
903 (arr.get(0), arr.get(1))
904 {
905 let doc_x = x.as_f64().unwrap_or(0.0);
906 let doc_y = y.as_f64().unwrap_or(0.0);
907
908 doc_x >= bottom_left.0
909 && doc_x <= top_right.0
910 && doc_y >= bottom_left.1
911 && doc_y <= top_right.1
912 } else {
913 false
914 }
915 }
916 _ => false,
917 }
918 }
919 QueryOperator::Intersects { shape } => {
920 match (value, shape) {
922 (
923 Some(Value::Array(arr)),
924 GeoShape::Box {
925 bottom_left,
926 top_right,
927 },
928 ) if arr.len() >= 2 => {
929 if let (Some(Value::Number(x)), Some(Value::Number(y))) =
930 (arr.get(0), arr.get(1))
931 {
932 let doc_x = x.as_f64().unwrap_or(0.0);
933 let doc_y = y.as_f64().unwrap_or(0.0);
934
935 doc_x >= bottom_left.0
936 && doc_x <= top_right.0
937 && doc_y >= bottom_left.1
938 && doc_y <= top_right.1
939 } else {
940 false
941 }
942 }
943 _ => false,
944 }
945 }
            // A negated condition is evaluated by inverting the wrapped operator
            // against the same field.
            QueryOperator::Not(inner) => !self.matches_condition(
                doc,
                &QueryCondition {
                    field: cond.field.clone(),
                    operator: (**inner).clone(),
                },
            ),
            // And/Or/Nor only appear as query-level combinators and are handled
            // in `matches` via `logical_ops`.
            QueryOperator::And(_) | QueryOperator::Or(_) | QueryOperator::Nor(_) => true,
953 }
954 }
955
956 fn get_nested_value<'a>(&self, doc: &'a Value, path: &[&str]) -> Option<&'a Value> {
957 if path.is_empty() {
958 return Some(doc);
959 }
960
961 let key = path[0];
962 let rest = &path[1..];
963
964 match doc {
965 Value::Object(map) => {
966 if let Some(next_value) = map.get(key) {
967 self.get_nested_value(next_value, rest)
968 } else {
969 None
970 }
971 }
972 _ => None,
973 }
974 }
975}
976
977#[derive(Debug, Clone, Serialize, Deserialize)]
979pub enum UpdateOperator {
980 Set(Map<String, Value>),
981 Unset(Vec<String>),
982 Inc(Map<String, Value>),
983 Mul(Map<String, Value>),
984 Rename(Vec<(String, String)>),
985 SetOnInsert(Map<String, Value>),
986 Min(Map<String, Value>),
987 Max(Map<String, Value>),
988 CurrentDate(Map<String, Value>),
989 Push(Map<String, Value>),
991 PushAll(BTreeMap<String, Vec<Value>>),
992 AddToSet(Map<String, Value>),
993 Pop(BTreeMap<String, i64>),
994 Pull(Map<String, Value>),
995 PullAll(BTreeMap<String, Vec<Value>>),
996 Bit(Map<String, Value>),
997}
998
999#[derive(Debug, Clone, Serialize, Deserialize)]
1001pub struct UpdateDocument {
1002 operators: Vec<UpdateOperator>,
1003}
1004
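// Updates are composed from `$`-style operators via a builder. A minimal
// sketch of a combined update:
//
//     let update = UpdateDocument::new()
//         .set("name", serde_json::json!("Bob"))
//         .inc("age", serde_json::json!(1))
//         .push("tags", serde_json::json!("new"));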
1005impl UpdateDocument {
1006 pub fn new() -> Self {
1007 Self {
1008 operators: Vec::new(),
1009 }
1010 }
1011
1012 pub fn set(mut self, key: &str, value: Value) -> Self {
1013 let mut map = Map::new();
1014 map.insert(key.to_string(), value);
1015 self.operators.push(UpdateOperator::Set(map));
1016 self
1017 }
1018
1019 pub fn unset(mut self, key: &str) -> Self {
1020 self.operators
1021 .push(UpdateOperator::Unset(vec![key.to_string()]));
1022 self
1023 }
1024
1025 pub fn inc(mut self, key: &str, value: Value) -> Self {
1026 let mut map = Map::new();
1027 map.insert(key.to_string(), value);
1028 self.operators.push(UpdateOperator::Inc(map));
1029 self
1030 }
1031
1032 pub fn mul(mut self, key: &str, value: Value) -> Self {
1033 let mut map = Map::new();
1034 map.insert(key.to_string(), value);
1035 self.operators.push(UpdateOperator::Mul(map));
1036 self
1037 }
1038
1039 pub fn rename(mut self, old_key: &str, new_key: &str) -> Self {
1040 self.operators.push(UpdateOperator::Rename(vec![(
1041 old_key.to_string(),
1042 new_key.to_string(),
1043 )]));
1044 self
1045 }
1046
1047 pub fn set_on_insert(mut self, key: &str, value: Value) -> Self {
1048 let mut map = Map::new();
1049 map.insert(key.to_string(), value);
1050 self.operators.push(UpdateOperator::SetOnInsert(map));
1051 self
1052 }
1053
1054 pub fn min(mut self, key: &str, value: Value) -> Self {
1055 let mut map = Map::new();
1056 map.insert(key.to_string(), value);
1057 self.operators.push(UpdateOperator::Min(map));
1058 self
1059 }
1060
1061 pub fn max(mut self, key: &str, value: Value) -> Self {
1062 let mut map = Map::new();
1063 map.insert(key.to_string(), value);
1064 self.operators.push(UpdateOperator::Max(map));
1065 self
1066 }
1067
1068 pub fn current_date(mut self, key: &str, type_spec: Value) -> Self {
1069 let mut map = Map::new();
1070 map.insert(key.to_string(), type_spec);
1071 self.operators.push(UpdateOperator::CurrentDate(map));
1072 self
1073 }
1074
1075 pub fn push(mut self, key: &str, value: Value) -> Self {
1077 let mut map = Map::new();
1078 map.insert(key.to_string(), value);
1079 self.operators.push(UpdateOperator::Push(map));
1080 self
1081 }
1082
1083 pub fn push_all(mut self, key: &str, values: Vec<Value>) -> Self {
1084 let mut btree_map = BTreeMap::new();
1085 btree_map.insert(key.to_string(), values);
1086 self.operators.push(UpdateOperator::PushAll(btree_map));
1087 self
1088 }
1089
1090 pub fn add_to_set(mut self, key: &str, value: Value) -> Self {
1091 let mut map = Map::new();
1092 map.insert(key.to_string(), value);
1093 self.operators.push(UpdateOperator::AddToSet(map));
1094 self
1095 }
1096
1097 pub fn pop(mut self, key: &str, pos: i64) -> Self {
1098 let mut btree_map = BTreeMap::new();
1099 btree_map.insert(key.to_string(), pos);
1100 self.operators.push(UpdateOperator::Pop(btree_map));
1101 self
1102 }
1103
1104 pub fn pull(mut self, key: &str, condition: Value) -> Self {
1105 let mut map = Map::new();
1106 map.insert(key.to_string(), condition);
1107 self.operators.push(UpdateOperator::Pull(map));
1108 self
1109 }
1110
1111 pub fn pull_all(mut self, key: &str, values: Vec<Value>) -> Self {
1112 let mut btree_map = BTreeMap::new();
1113 btree_map.insert(key.to_string(), values);
1114 self.operators.push(UpdateOperator::PullAll(btree_map));
1115 self
1116 }
1117
1118 pub fn bit(mut self, key: &str, operation: Value) -> Self {
1120 let mut map = Map::new();
1121 map.insert(key.to_string(), operation);
1122 self.operators.push(UpdateOperator::Bit(map));
1123 self
1124 }
1125}
1126
1127#[derive(Debug, Clone, Serialize, Deserialize)]
1129pub enum IndexType {
1130 Ascending,
1131 Descending,
1132 Geospatial,
1133 Text,
1134 Hashed,
1135}
1136
1137#[derive(Debug, Clone, Serialize, Deserialize)]
1138pub struct Index {
1139 name: String,
1140 key: Vec<(String, IndexType)>,
1141 unique: bool,
1142 sparse: bool,
1143 background: bool,
1144}
1145
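// Example (illustrative): a unique ascending index on a single field.
//
//     let email_idx = Index::new(
//         "email_idx".to_string(),
//         vec![("email".to_string(), IndexType::Ascending)],
//     )
//     .unique(true);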
1146impl Index {
1147 pub fn new(name: String, key: Vec<(String, IndexType)>) -> Self {
1148 Self {
1149 name,
1150 key,
1151 unique: false,
1152 sparse: false,
1153 background: false,
1154 }
1155 }
1156
1157 pub fn unique(mut self, unique: bool) -> Self {
1158 self.unique = unique;
1159 self
1160 }
1161
1162 pub fn sparse(mut self, sparse: bool) -> Self {
1163 self.sparse = sparse;
1164 self
1165 }
1166
1167 pub fn background(mut self, background: bool) -> Self {
1168 self.background = background;
1169 self
1170 }
1171}
1172
1173#[derive(Debug, Clone, Serialize, Deserialize)]
1175struct IndexEntry {
1176 doc_id: DocId,
1177}
1178
1179#[derive(Debug, Clone, Serialize, Deserialize)]
1180struct OrdValue(Value);
1181
1182impl PartialEq for OrdValue {
1183 fn eq(&self, other: &Self) -> bool {
1184 self.0 == other.0
1185 }
1186}
1187
1188impl Eq for OrdValue {}
1189
1190impl PartialOrd for OrdValue {
1191 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1192 Some(self.cmp(other))
1193 }
1194}
1195
impl Ord for OrdValue {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare numbers numerically; the Debug-string fallback would order "10" before "9".
        match (&self.0, &other.0) {
            (Value::Number(a), Value::Number(b)) => {
                let (a, b) = (a.as_f64().unwrap_or(0.0), b.as_f64().unwrap_or(0.0));
                a.partial_cmp(&b).unwrap_or(Ordering::Equal)
            }
            _ => format!("{:?}", self.0).cmp(&format!("{:?}", other.0)),
        }
    }
}
1202
1203#[derive(Debug, Serialize, Clone, Deserialize)]
1205struct IndexData {
1206 entries: BTreeMap<OrdValue, Vec<IndexEntry>>,
1207 unique: bool,
1208 sparse: bool,
1209}
1210
1211impl IndexData {
1212 fn new(unique: bool, sparse: bool) -> Self {
1213 Self {
1214 entries: BTreeMap::new(),
1215 unique,
1216 sparse,
1217 }
1218 }
1219
1220 fn insert(&mut self, value: Value, doc_id: DocId) -> Result<()> {
1221 let ord_value = OrdValue(value.clone());
1222
1223 if self.unique && self.entries.contains_key(&ord_value) {
1224 return Err(DbError::IndexError(format!(
1225 "Duplicate key error: {:?}",
1226 value
1227 )));
1228 }
1229
1230 self.entries
1231 .entry(ord_value)
1232 .or_insert_with(Vec::new)
1233 .push(IndexEntry { doc_id });
1234 Ok(())
1235 }
1236
1237 fn remove(&mut self, value: &Value, doc_id: &DocId) -> Result<()> {
1238 let ord_value = OrdValue(value.clone());
1239
1240 if let Some(entries) = self.entries.get_mut(&ord_value) {
1241 entries.retain(|entry| entry.doc_id != *doc_id);
1242 if entries.is_empty() {
1243 self.entries.remove(&ord_value);
1244 }
1245 }
1246 Ok(())
1247 }
1248
1249 fn find(&self, value: &Value) -> Vec<&IndexEntry> {
1250 let ord_value = OrdValue(value.clone());
1251 self.entries
1252 .get(&ord_value)
1253 .map_or(Vec::new(), |entries| entries.iter().collect())
1254 }
1255
1256 fn find_range(&self, min: &Value, max: &Value) -> Vec<&IndexEntry> {
1257 let mut result = Vec::new();
1258 let ord_min = OrdValue(min.clone());
1259 let ord_max = OrdValue(max.clone());
1260
1261 for (key, entries) in self.entries.range((Included(&ord_min), Included(&ord_max))) {
1262 result.extend(entries.iter());
1263 }
1264 result
1265 }
1266}
1267
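// Options controlling `Collection::find`. A minimal sketch using the defaults:
//
//     let opts = FindOptions {
//         sort: Some(vec![("age".to_string(), SortOrder::Descending)]),
//         limit: Some(10),
//         ..Default::default()
//     };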
1268#[derive(Debug, Clone, Serialize, Deserialize)]
1270pub struct FindOptions {
1271 pub projection: Option<Map<String, Value>>,
1272 pub sort: Option<Vec<(String, SortOrder)>>,
1273 pub skip: Option<usize>,
1274 pub limit: Option<usize>,
1275 pub max_time_ms: Option<u64>,
1276 pub allow_partial_results: bool,
1277}
1278
1279#[derive(Debug, Clone, Serialize, Deserialize)]
1280pub enum SortOrder {
1281 Ascending,
1282 Descending,
1283}
1284
1285impl Default for FindOptions {
1286 fn default() -> Self {
1287 Self {
1288 projection: None,
1289 sort: None,
1290 skip: None,
1291 limit: None,
1292 max_time_ms: None,
1293 allow_partial_results: false,
1294 }
1295 }
1296}
1297
1298#[derive(Debug, Clone, Serialize, Deserialize)]
1300pub struct Collection {
1320 name: String,
1322 docs: HashMap<DocId, Document>,
1324 indexes: HashMap<String, IndexData>,
1326 index_definitions: HashMap<String, Index>,
1328}
1329
1330#[derive(Debug, Serialize, Deserialize)]
1332pub struct PersistentCollection {
1333 name: String,
1334 docs: HashMap<DocId, Document>,
1335 indexes: HashMap<String, IndexData>,
1336 index_definitions: HashMap<String, Index>,
1337}
1338
1339impl From<Collection> for PersistentCollection {
1340 fn from(collection: Collection) -> Self {
1341 Self {
1342 name: collection.name,
1343 docs: collection.docs,
1344 indexes: collection.indexes,
1345 index_definitions: collection.index_definitions,
1346 }
1347 }
1348}
1349
1350impl From<PersistentCollection> for Collection {
1351 fn from(persistent: PersistentCollection) -> Self {
1352 Self {
1353 name: persistent.name,
1354 docs: persistent.docs,
1355 indexes: persistent.indexes,
1356 index_definitions: persistent.index_definitions,
1357 }
1358 }
1359}
1360
1361#[derive(Debug, Serialize, Deserialize)]
1362pub struct PersistentDatabase {
1363 name: String,
1364 collections: HashMap<String, PersistentCollection>,
1365}
1366
1367#[derive(Debug, Serialize, Deserialize)]
1368pub struct PersistentStorage {
1369 databases: HashMap<String, PersistentDatabase>,
1370}
1371
1372impl PersistentStorage {
1373 pub fn new() -> Self {
1374 Self {
1375 databases: HashMap::new(),
1376 }
1377 }
1378
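    /// Serializes the whole storage tree to pretty JSON and writes it to `path`,
    /// encrypting the payload when a key is supplied. Illustrative sketch:
    ///
    /// ```ignore
    /// let storage = PersistentStorage::new();
    /// storage.save_to_file(Path::new("db.json"), None)?;
    /// let loaded = PersistentStorage::load_from_file(Path::new("db.json"), None)?;
    /// ```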
1379 pub fn save_to_file(
1380 &self,
1381 path: &Path,
1382 encryption_key: Option<&crate::encryption::EncryptionKey>,
1383 ) -> Result<()> {
1384 let data = serde_json::to_string_pretty(self)?;
1385
1386 let final_data = if let Some(key) = encryption_key {
1387 crate::encryption::AesEncryption::encrypt_string(&data, key)?
1388 } else {
1389 data
1390 };
1391
1392 std::fs::write(path, final_data)?;
1393 Ok(())
1394 }
1395
1396 pub fn load_from_file(
1397 path: &Path,
1398 encryption_key: Option<&crate::encryption::EncryptionKey>,
1399 ) -> Result<Self> {
1400 if !path.exists() {
1401 return Ok(Self::new());
1402 }
1403
1404 let mut file = File::open(path)?;
1405 let mut contents = String::new();
1406 file.read_to_string(&mut contents)?;
1407
        let final_data = if let Some(key) = encryption_key {
            match crate::encryption::AesEncryption::decrypt_string(&contents, key) {
                Ok(decrypted) => decrypted,
                // If decryption fails, assume the file was written without
                // encryption and fall back to the raw contents.
                Err(_) => contents,
            }
        } else {
            contents
        };
1420
1421 let storage = serde_json::from_str(&final_data)?;
1422 Ok(storage)
1423 }
1424}
1425
1426impl Collection {
1428 pub fn new(name: String) -> Self {
1434 Self {
1435 name,
1436 docs: HashMap::new(),
1437 indexes: HashMap::new(),
1438 index_definitions: HashMap::new(),
1439 }
1440 }
1441
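    /// Inserts a document, assigning a freshly generated `_id`, and updates all
    /// indexes. Illustrative sketch (assuming a mutable `collection`):
    ///
    /// ```ignore
    /// let id = collection.insert(serde_json::json!({ "name": "Alice", "age": 30 }))?;
    /// ```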
1442 pub fn insert(&mut self, mut doc: Document) -> Result<DocId> {
1456 let id = DocId::new();
1457
1458 if let Some(obj) = doc.as_object_mut() {
1460 obj.insert("_id".to_string(), Value::String(id.to_string()));
1461 } else {
1462 return Err(DbError::Other("Document must be an object".into()));
1463 }
1464
1465 self.docs.insert(id.clone(), doc.clone());
1466
1467 self.update_indexes_on_insert(&id, &doc)?;
1469
1470 Ok(id)
1471 }
1472
1473 pub fn insert_many(&mut self, docs: Vec<Document>) -> Result<Vec<DocId>> {
1487 let mut ids = Vec::new();
1488
1489 for mut doc in docs {
1490 let id = DocId::new();
1491
1492 if let Some(obj) = doc.as_object_mut() {
1494 obj.insert("_id".to_string(), Value::String(id.to_string()));
1495 } else {
1496 return Err(DbError::Other("Document must be an object".into()));
1497 }
1498
1499 self.docs.insert(id.clone(), doc.clone());
1500 ids.push(id.clone());
1501
1502 self.update_indexes_on_insert(&id, &doc)?;
1504 }
1505
1506 Ok(ids)
1507 }
1508
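    /// Finds all documents matching `query`. A single-field index is used when
    /// one covers a queried field, otherwise the collection is scanned; sort,
    /// skip, limit and projection are applied afterwards. Illustrative sketch:
    ///
    /// ```ignore
    /// let adults = collection.find(
    ///     Query::new().gte("age", serde_json::json!(18)),
    ///     Some(FindOptions { limit: Some(50), ..Default::default() }),
    /// )?;
    /// ```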
1509 pub fn find(
1520 &self,
1521 query: Query,
1522 options: Option<FindOptions>,
1523 ) -> Result<Vec<(DocId, Document)>> {
1524 let mut results = Vec::new();
1525
1526 let indexed_results = self.try_indexed_query(&query)?;
1528
1529 if !indexed_results.is_empty() {
1530 for (id, doc) in indexed_results {
1532 if query.matches(&doc) {
1533 results.push((id, doc));
1534 }
1535 }
1536 } else {
1537 for (id, doc) in self.docs.iter() {
1539 if query.matches(doc) {
1540 results.push((id.clone(), doc.clone()));
1541 }
1542 }
1543 }
1544
1545 if let Some(opts) = options {
1547 if let Some(sort_spec) = opts.sort {
1549 self.sort_results(&mut results, &sort_spec);
1550 }
1551
1552 if let Some(skip) = opts.skip {
1554 if skip < results.len() {
1555 results.drain(0..skip);
1556 } else {
1557 results.clear();
1558 }
1559 }
1560
1561 if let Some(limit) = opts.limit {
1563 results.truncate(limit);
1564 }
1565
1566 if let Some(proj) = opts.projection {
1568 self.apply_projection(&mut results, &proj);
1569 }
1570 }
1571
1572 Ok(results)
1573 }
1574
1575 pub fn find_one(
1590 &self,
1591 query: Query,
1592 options: Option<FindOptions>,
1593 ) -> Result<(DocId, Document)> {
1594 let mut opts = options.unwrap_or_default();
1595 opts.limit = Some(1);
1596
1597 let mut results = self.find(query, Some(opts))?;
1598 results.pop().ok_or(DbError::NotFound)
1599 }
1600
1601 pub fn find_one_and_update(
1613 &mut self,
1614 query: Query,
1615 update: UpdateDocument,
1616 options: Option<FindOneAndUpdateOptions>,
1617 ) -> Result<Option<(DocId, Document)>> {
1618 let opts = options.unwrap_or_default();
1619
1620 let result = self.find_one(query.clone(), None);
1622
1623 match result {
1624 Ok((id, mut doc)) => {
1625 let original_doc = doc.clone();
1626
1627 self.apply_update(&mut doc, &update, opts.upsert)?;
1629
1630 self.docs.insert(id.clone(), doc.clone());
1632
1633 self.update_indexes_on_update(&id, &original_doc, &doc)?;
1635
1636 if opts.return_document == ReturnDocument::After {
1637 Ok(Some((id, doc)))
1638 } else {
1639 Ok(Some((id, original_doc)))
1640 }
1641 }
1642 Err(DbError::NotFound) if opts.upsert => {
1643 let mut new_doc = Map::new();
1645
1646 for cond in &query.conditions {
1648 if let QueryOperator::Eq(value) = &cond.operator {
1649 new_doc.insert(cond.field.clone(), value.clone());
1650 }
1651 }
1652
1653 let mut doc = Value::Object(new_doc);
1655 self.apply_update(&mut doc, &update, true)?;
1656
1657 let id = self.insert(doc)?;
1659
1660 if opts.return_document == ReturnDocument::After {
1662 self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1663 .map(Some)
1664 } else {
1665 Ok(None)
1666 }
1667 }
1668 Err(_) => Ok(None),
1669 }
1670 }
1671
1672 pub fn find_one_and_replace(
1674 &mut self,
1675 query: Query,
1676 replacement: Document,
1677 options: Option<FindOneAndReplaceOptions>,
1678 ) -> Result<Option<(DocId, Document)>> {
1679 let opts = options.unwrap_or_default();
1680
1681 let result = self.find_one(query.clone(), None);
1683
1684 match result {
1685 Ok((id, original_doc)) => {
1686 let mut replacement = replacement;
1688 if let Some(obj) = replacement.as_object_mut() {
1689 obj.insert("_id".to_string(), Value::String(id.to_string()));
1690 } else {
1691 return Err(DbError::Other("Replacement must be an object".into()));
1692 }
1693
1694 self.docs.insert(id.clone(), replacement.clone());
1696
1697 self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1699
1700 if opts.return_document == ReturnDocument::After {
1701 Ok(Some((id, replacement)))
1702 } else {
1703 Ok(Some((id, original_doc)))
1704 }
1705 }
1706 Err(DbError::NotFound) if opts.upsert => {
1707 let id = self.insert(replacement)?;
1709
1710 if opts.return_document == ReturnDocument::After {
1712 self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1713 .map(Some)
1714 } else {
1715 Ok(None)
1716 }
1717 }
1718 Err(_) => Ok(None),
1719 }
1720 }
1721
1722 pub fn find_one_and_delete(
1724 &mut self,
1725 query: Query,
1726 options: Option<FindOneAndDeleteOptions>,
1727 ) -> Result<Option<(DocId, Document)>> {
1728 let opts = options.unwrap_or_default();
1729
1730 let result = self.find_one(query.clone(), None);
1732
1733 match result {
1734 Ok((id, doc)) => {
1735 self.docs.remove(&id);
1737
1738 self.update_indexes_on_delete(&id, &doc)?;
1740
1741 Ok(Some((id, doc)))
1742 }
1743 Err(_) => Ok(None),
1744 }
1745 }
1746
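    /// Applies `update` to the first document matching `query`. With `upsert`
    /// set and no match, a new document is seeded from the query's equality
    /// conditions and inserted. Illustrative sketch:
    ///
    /// ```ignore
    /// collection.update_one(
    ///     Query::new().eq("name", serde_json::json!("Alice")),
    ///     UpdateDocument::new().inc("logins", serde_json::json!(1)),
    ///     false,
    /// )?;
    /// ```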
1747 pub fn update_one(
1759 &mut self,
1760 query: Query,
1761 update: UpdateDocument,
1762 upsert: bool,
1763 ) -> Result<usize> {
1764 let result = self.find_one(query.clone(), None);
1765
1766 match result {
1767 Ok((id, mut doc)) => {
1768 let original_doc = doc.clone();
1769
1770 self.apply_update(&mut doc, &update, upsert)?;
1772
1773 self.docs.insert(id.clone(), doc.clone());
1775
1776 self.update_indexes_on_update(&id, &original_doc, &doc)?;
1778
1779 Ok(1)
1780 }
1781 Err(DbError::NotFound) if upsert => {
1782 let mut new_doc = Map::new();
1784
1785 for cond in &query.conditions {
1787 if let QueryOperator::Eq(value) = &cond.operator {
1788 new_doc.insert(cond.field.clone(), value.clone());
1789 }
1790 }
1791
1792 let mut doc = Value::Object(new_doc);
1794 self.apply_update(&mut doc, &update, true)?;
1795
1796 self.insert(doc)?;
1798
1799 Ok(1)
1800 }
1801 Err(_) => Ok(0),
1802 }
1803 }
1804
1805 pub fn update_many(&mut self, query: Query, update: UpdateDocument) -> Result<usize> {
1816 let docs = self.find(query, None)?;
1817 let mut count = 0;
1818
1819 for (id, doc) in docs {
1820 let mut doc = doc;
1821 let original_doc = doc.clone();
1822
1823 self.apply_update(&mut doc, &update, false)?;
1825
1826 self.docs.insert(id.clone(), doc.clone());
1828
1829 self.update_indexes_on_update(&id, &original_doc, &doc)?;
1831
1832 count += 1;
1833 }
1834
1835 Ok(count)
1836 }
1837
1838 pub fn replace_one(
1840 &mut self,
1841 query: Query,
1842 replacement: Document,
1843 upsert: bool,
1844 ) -> Result<usize> {
1845 let result = self.find_one(query.clone(), None);
1846
1847 match result {
1848 Ok((id, original_doc)) => {
1849 let mut replacement = replacement;
1851 if let Some(obj) = replacement.as_object_mut() {
1852 obj.insert("_id".to_string(), Value::String(id.to_string()));
1853 } else {
1854 return Err(DbError::Other("Replacement must be an object".into()));
1855 }
1856
1857 self.docs.insert(id.clone(), replacement.clone());
1859
1860 self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1862
1863 Ok(1)
1864 }
1865 Err(DbError::NotFound) if upsert => {
1866 self.insert(replacement)?;
1868 Ok(1)
1869 }
1870 Err(_) => Ok(0),
1871 }
1872 }
1873
1874 pub fn delete_one(&mut self, query: Query) -> Result<usize> {
1884 let result = self.find_one(query, None);
1885
1886 match result {
1887 Ok((id, doc)) => {
1888 self.docs.remove(&id);
1890
1891 self.update_indexes_on_delete(&id, &doc)?;
1893
1894 Ok(1)
1895 }
1896 Err(_) => Ok(0),
1897 }
1898 }
1899
1900 pub fn delete_many(&mut self, query: Query) -> Result<usize> {
1910 let docs = self.find(query, None)?;
1911 let mut count = 0;
1912
1913 for (id, doc) in docs {
1914 self.docs.remove(&id);
1916
1917 self.update_indexes_on_delete(&id, &doc)?;
1919
1920 count += 1;
1921 }
1922
1923 Ok(count)
1924 }
1925
1926 pub fn count_documents(&self, query: Query) -> Result<usize> {
1936 let indexed_results = self.try_indexed_query(&query)?;
1938
1939 if !indexed_results.is_empty() {
1940 let mut count = 0;
1942 for (_, doc) in indexed_results {
1943 if query.matches(&doc) {
1944 count += 1;
1945 }
1946 }
1947 Ok(count)
1948 } else {
1949 let mut count = 0;
1951 for doc in self.docs.values() {
1952 if query.matches(doc) {
1953 count += 1;
1954 }
1955 }
1956 Ok(count)
1957 }
1958 }
1959
1960 pub fn estimated_document_count(&self) -> Result<usize> {
1966 Ok(self.docs.len())
1967 }
1968
1969 pub fn create_index(&mut self, index: Index) -> Result<()> {
1980 let index_name = index.name.clone();
1981
1982 if self.index_definitions.contains_key(&index_name) {
1984 return Err(DbError::IndexError(format!(
1985 "Index {} already exists",
1986 index_name
1987 )));
1988 }
1989
1990 let index_data = IndexData::new(index.unique, index.sparse);
1992
1993 self.indexes.insert(index_name.clone(), index_data);
1995
1996 self.index_definitions
1998 .insert(index_name.clone(), index.clone());
1999
2000 let docs_to_index: Vec<(DocId, Document)> = self
2002 .docs
2003 .iter()
2004 .map(|(id, doc)| (id.clone(), doc.clone()))
2005 .collect();
2006
2007 for (id, doc) in docs_to_index {
2009 self.add_to_indexes(&index_name, &id, &doc)?;
2010 }
2011
2012 Ok(())
2013 }
2014
2015 pub fn drop_index(&mut self, name: &str) -> Result<()> {
2021 self.indexes.remove(name);
2022 self.index_definitions.remove(name);
2023 Ok(())
2024 }
2025
2026 pub fn list_indexes(&self) -> Result<Vec<Index>> {
2032 Ok(self.index_definitions.values().cloned().collect())
2033 }
2034
2035 pub fn drop_indexes(&mut self) -> Result<()> {
2037 self.indexes.clear();
2038 self.index_definitions.clear();
2039 Ok(())
2040 }
2041
2042 pub fn stats(&self) -> Result<CollectionStats> {
2044 Ok(CollectionStats {
2045 count: self.docs.len(),
2046 size: self.calculate_size(),
2047 avg_obj_size: if self.docs.is_empty() {
2048 0
2049 } else {
2050 self.calculate_size() / self.docs.len()
2051 },
2052 index_count: self.index_definitions.len(),
2053 index_size: self.calculate_index_size(),
2054 })
2055 }
2056
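    /// Runs every document of the collection through `pipeline`, applying the
    /// stages in order. Illustrative sketch using the `Match` and `Limit` stages:
    ///
    /// ```ignore
    /// let active = collection.aggregate(vec![
    ///     AggregationStage::Match(Query::new().eq("status", serde_json::json!("active"))),
    ///     AggregationStage::Limit(10),
    /// ])?;
    /// ```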
2057 pub fn aggregate(&self, pipeline: Vec<AggregationStage>) -> Result<Vec<Document>> {
2059 let mut results = Vec::new();
2060
2061 let mut docs: Vec<Document> = self.docs.values().cloned().collect();
2063
2064 for stage in pipeline {
2066 docs = self.apply_aggregation_stage(docs, stage)?;
2067 }
2068
2069 results.extend(docs);
2070 Ok(results)
2071 }
2072
2073 pub fn distinct(&self, field: &str, query: Option<Query>) -> Result<Vec<Value>> {
2075 let mut values = HashSet::new();
2076
2077 for doc in self.docs.values() {
2078 if let Some(q) = &query {
2079 if !q.matches(doc) {
2080 continue;
2081 }
2082 }
2083
2084 let field_parts: Vec<&str> = field.split('.').collect();
2085 if let Some(value) = self.get_nested_value(doc, &field_parts) {
2086 values.insert(value.clone());
2087 }
2088 }
2089
2090 Ok(values.into_iter().collect())
2091 }
2092
2093 pub fn count(&self) -> usize {
2095 self.docs.len()
2096 }
2097
2098 fn apply_update(
2101 &self,
2102 doc: &mut Document,
2103 update: &UpdateDocument,
2104 is_upsert: bool,
2105 ) -> Result<()> {
2106 for op in &update.operators {
2107 match op {
2108 UpdateOperator::Set(fields) => {
2109 if let Some(obj) = doc.as_object_mut() {
2110 for (key, value) in fields {
2111 obj.insert(key.clone(), value.clone());
2112 }
2113 }
2114 }
2115 UpdateOperator::Unset(fields) => {
2116 if let Some(obj) = doc.as_object_mut() {
2117 for key in fields {
2118 obj.remove(key);
2119 }
2120 }
2121 }
2122 UpdateOperator::Inc(fields) => {
2123 if let Some(obj) = doc.as_object_mut() {
2124 for (key, value) in fields {
2125 if let Some(Value::Number(n)) = obj.get(key) {
2126 if let (Some(current), Some(inc)) = (n.as_f64(), value.as_f64()) {
2127 obj.insert(
2128 key.clone(),
2129 Value::Number(
2130 serde_json::Number::from_f64(current + inc).unwrap(),
2131 ),
2132 );
2133 }
2134 } else {
2135 if let Some(inc) = value.as_f64() {
2137 obj.insert(
2138 key.clone(),
2139 Value::Number(serde_json::Number::from_f64(inc).unwrap()),
2140 );
2141 }
2142 }
2143 }
2144 }
2145 }
2146 UpdateOperator::Mul(fields) => {
2147 if let Some(obj) = doc.as_object_mut() {
2148 for (key, value) in fields {
2149 if let Some(Value::Number(n)) = obj.get(key) {
2150 if let (Some(current), Some(mul)) = (n.as_f64(), value.as_f64()) {
2151 obj.insert(
2152 key.clone(),
2153 Value::Number(
2154 serde_json::Number::from_f64(current * mul).unwrap(),
2155 ),
2156 );
2157 }
2158 }
2159 }
2160 }
2161 }
2162 UpdateOperator::Rename(fields) => {
2163 if let Some(obj) = doc.as_object_mut() {
2164 for (old_key, new_key) in fields {
2165 if let Some(value) = obj.remove(old_key) {
2166 obj.insert(new_key.clone(), value);
2167 }
2168 }
2169 }
2170 }
2171 UpdateOperator::SetOnInsert(fields) => {
2172 if is_upsert {
2173 if let Some(obj) = doc.as_object_mut() {
2174 for (key, value) in fields {
2175 obj.insert(key.clone(), value.clone());
2176 }
2177 }
2178 }
2179 }
2180 UpdateOperator::Min(fields) => {
2181 if let Some(obj) = doc.as_object_mut() {
2182 for (key, value) in fields {
2183 if let Some(current) = obj.get(key) {
2184 if self.compare_values(value, current) == Ordering::Less {
2186 obj.insert(key.clone(), value.clone());
2187 }
2188 } else {
2189 obj.insert(key.clone(), value.clone());
2190 }
2191 }
2192 }
2193 }
2194 UpdateOperator::Max(fields) => {
2195 if let Some(obj) = doc.as_object_mut() {
2196 for (key, value) in fields {
2197 if let Some(current) = obj.get(key) {
2198 if self.compare_values(value, current) == Ordering::Greater {
2200 obj.insert(key.clone(), value.clone());
2201 }
2202 } else {
2203 obj.insert(key.clone(), value.clone());
2204 }
2205 }
2206 }
2207 }
2208 UpdateOperator::CurrentDate(fields) => {
2209 if let Some(obj) = doc.as_object_mut() {
2210 for (key, type_spec) in fields {
2211 let now = chrono::Utc::now();
2212
2213 match type_spec {
2214 Value::String(type_str) if type_str == "date" => {
2215 obj.insert(key.clone(), Value::String(now.to_rfc3339()));
2216 }
2217 Value::String(type_str) if type_str == "timestamp" => {
2218 obj.insert(
2219 key.clone(),
2220 Value::Number(serde_json::Number::from(now.timestamp())),
2221 );
2222 }
2223 Value::Object(spec) => {
2224 if let Some(Value::String(type_str)) = spec.get("$type") {
2225 match type_str.as_str() {
2226 "date" => {
2227 obj.insert(
2228 key.clone(),
2229 Value::String(now.to_rfc3339()),
2230 );
2231 }
2232 "timestamp" => {
2233 obj.insert(
2234 key.clone(),
2235 Value::Number(serde_json::Number::from(
2236 now.timestamp(),
2237 )),
2238 );
2239 }
2240 _ => {
2241 return Err(DbError::UpdateError(format!(
2242 "Invalid date type specification: {}",
2243 type_str
2244 )));
2245 }
2246 }
2247 }
2248 }
2249 _ => {
2250 return Err(DbError::UpdateError(
2251 "Invalid date type specification".into(),
2252 ));
2253 }
2254 }
2255 }
2256 }
2257 }
2258 UpdateOperator::Push(fields) => {
2259 if let Some(obj) = doc.as_object_mut() {
2260 for (key, value) in fields {
2261 if let Some(Value::Array(arr)) = obj.get_mut(key) {
2262 arr.push(value.clone());
2263 } else {
2264 obj.insert(key.clone(), Value::Array(vec![value.clone()]));
2266 }
2267 }
2268 }
2269 }
2270 UpdateOperator::PushAll(fields) => {
2271 if let Some(obj) = doc.as_object_mut() {
2272 for (key, values) in fields {
2273 if let Some(Value::Array(arr)) = obj.get_mut(key) {
2274 arr.extend(values.iter().cloned());
2275 } else {
2276 obj.insert(key.clone(), Value::Array(values.clone()));
2278 }
2279 }
2280 }
2281 }
2282 UpdateOperator::AddToSet(fields) => {
2283 if let Some(obj) = doc.as_object_mut() {
2284 for (key, value) in fields {
2285 if let Some(Value::Array(arr)) = obj.get_mut(key) {
2286 if !arr.contains(value) {
2287 arr.push(value.clone());
2288 }
2289 } else {
2290 obj.insert(key.clone(), Value::Array(vec![value.clone()]));
2292 }
2293 }
2294 }
2295 }
2296 UpdateOperator::Pop(fields) => {
2297 if let Some(obj) = doc.as_object_mut() {
2298 for (key, pos) in fields {
2299 if let Some(Value::Array(arr)) = obj.get_mut(key) {
2300 if *pos == 1 {
2301 arr.pop();
2302 } else if *pos == -1 {
2303 arr.remove(0);
2304 }
2305 }
2306 }
2307 }
2308 }
                UpdateOperator::Pull(fields) => {
                    if let Some(obj) = doc.as_object_mut() {
                        for (key, condition) in fields {
                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
                                arr.retain(|elem| match condition {
                                    // Object conditions are parsed into a query and
                                    // matching elements are pulled from the array.
                                    Value::Object(_) => match Query::from_value(condition.clone()) {
                                        Ok(query) => !query.matches(elem),
                                        Err(_) => true,
                                    },
                                    // Plain values are pulled by equality.
                                    _ => elem != condition,
                                });
                            }
                        }
                    }
                }
2329 UpdateOperator::PullAll(fields) => {
2330 if let Some(obj) = doc.as_object_mut() {
2331 for (key, values) in fields {
2332 if let Some(Value::Array(arr)) = obj.get_mut(key) {
2333 arr.retain(|elem| !values.contains(elem));
2334 }
2335 }
2336 }
2337 }
2338 UpdateOperator::Bit(fields) => {
2339 if let Some(obj) = doc.as_object_mut() {
2340 for (key, operation) in fields {
2341 if let Value::Object(op) = operation {
2342 if let Some(Value::Number(current)) = obj.get(key) {
2343 let mut current_num = current.as_i64().unwrap_or(0);
2344
2345 if let Some(Value::Number(and)) = op.get("and") {
2346 current_num &= and.as_i64().unwrap_or(0);
2347 }
2348 if let Some(Value::Number(or)) = op.get("or") {
2349 current_num |= or.as_i64().unwrap_or(0);
2350 }
2351 if let Some(Value::Number(xor)) = op.get("xor") {
2352 current_num ^= xor.as_i64().unwrap_or(0);
2353 }
2354
2355 obj.insert(
2356 key.clone(),
2357 Value::Number(serde_json::Number::from(current_num)),
2358 );
2359 }
2360 }
2361 }
2362 }
2363 }
2364 }
2365 }
2366
2367 Ok(())
2368 }
2369
2370 fn compare_values(&self, a: &Value, b: &Value) -> Ordering {
2371 match (a, b) {
2372 (Value::Number(a_num), Value::Number(b_num)) => a_num
2373 .as_f64()
2374 .unwrap_or(0.0)
2375 .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2376 .unwrap_or(Ordering::Equal),
2377 (Value::String(a_str), Value::String(b_str)) => a_str.cmp(b_str),
2378 (Value::Bool(a_bool), Value::Bool(b_bool)) => a_bool.cmp(b_bool),
2379 (Value::Array(a_arr), Value::Array(b_arr)) => a_arr.len().cmp(&b_arr.len()),
            (Value::Object(_), Value::Object(_)) => Ordering::Equal,
2383 _ => Ordering::Equal,
2384 }
2385 }
2386
2387 fn update_indexes_on_insert(&mut self, id: &DocId, doc: &Document) -> Result<()> {
2388 for (name, index_def) in self.index_definitions.iter() {
2389 let mut field_values = Vec::new();
2391 for (field, _) in &index_def.key {
2392 let field_parts: Vec<&str> = field.split('.').collect();
2393 let value = self.get_nested_value(doc, &field_parts);
2394 field_values.push((field, value));
2395 }
2396
2397 if let Some(index_data) = self.indexes.get_mut(name) {
2398 for (field, value) in field_values {
2399 if let Some(value) = value {
2400 index_data.insert(value.clone(), id.clone())?;
2401 } else if index_data.sparse {
2402 continue;
2404 } else {
2405 return Err(DbError::IndexError(format!(
2406 "Field {} not found in document for non-sparse index",
2407 field
2408 )));
2409 }
2410 }
2411 }
2412 }
2413
2414 Ok(())
2415 }
2416
2417 fn update_indexes_on_update(
2418 &mut self,
2419 id: &DocId,
2420 old_doc: &Document,
2421 new_doc: &Document,
2422 ) -> Result<()> {
2423 self.update_indexes_on_delete(id, old_doc)?;
2425
2426 self.update_indexes_on_insert(id, new_doc)?;
2428
2429 Ok(())
2430 }
2431 fn update_indexes_on_delete(&mut self, id: &DocId, doc: &Document) -> Result<()> {
2433 let index_names: Vec<String> = self.index_definitions.keys().cloned().collect();
2434
2435 for name in index_names {
2436 if let Some(index_def) = self.index_definitions.get(&name) {
2437 let mut field_values = Vec::new();
2439 for (field, _) in &index_def.key {
2440 let field_parts: Vec<&str> = field.split('.').collect();
2441 let value = self.get_nested_value(doc, &field_parts);
2442 field_values.push(value.cloned());
2443 }
2444
2445 if let Some(index_data) = self.indexes.get_mut(&name) {
2447 for (i, (_, _)) in index_def.key.iter().enumerate() {
2448 if let Some(val) = &field_values[i] {
2449 index_data.remove(val, id)?;
2450 }
2451 }
2452 }
2453 }
2454 }
2455
2456 Ok(())
2457 }
2458
2459 fn add_to_indexes(&mut self, index_name: &str, id: &DocId, doc: &Document) -> Result<()> {
2461 if let Some(index_def) = self.index_definitions.get(index_name).cloned() {
2462 let mut field_values = Vec::new();
2464 let field_names: Vec<String> = index_def
2465 .key
2466 .iter()
2467 .map(|(field, _)| field.clone())
2468 .collect();
2469
2470 for field in &field_names {
2471 let field_parts: Vec<&str> = field.split('.').collect();
2472 let value = self.get_nested_value(doc, &field_parts);
2473 field_values.push(value.cloned());
2474 }
2475
2476 if let Some(index_data) = self.indexes.get_mut(index_name) {
2478 for (i, field) in field_names.iter().enumerate() {
2479 if let Some(val) = &field_values[i] {
2480 index_data.insert(val.clone(), id.clone())?;
2481 } else if index_data.sparse {
2482 continue;
2484 } else {
2485 return Err(DbError::IndexError(format!(
2486 "Field {} not found in document for non-sparse index",
2487 field
2488 )));
2489 }
2490 }
2491 }
2492 }
2493
2494 Ok(())
2495 }
2496
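    // Attempts to answer the query from a single-field index: the first index whose
    // leading field appears in an Eq or range condition is scanned; an empty result
    // makes `find`/`count_documents` fall back to a full scan.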
2497 fn try_indexed_query(&self, query: &Query) -> Result<Vec<(DocId, Document)>> {
2498 for (index_name, index_def) in self.index_definitions.iter() {
2500 if let Some((field, _)) = index_def.key.first() {
2502 for cond in &query.conditions {
2503 if cond.field == *field {
2504 let mut results = Vec::new();
2506
2507 if let Some(index_data) = self.indexes.get(index_name) {
2508 match &cond.operator {
2509 QueryOperator::Eq(value) => {
2510 let entries = index_data.find(value);
2511 for entry in entries {
2512 if let Some(doc) = self.docs.get(&entry.doc_id) {
2513 results.push((entry.doc_id.clone(), doc.clone()));
2514 }
2515 }
2516 }
2517 QueryOperator::Gt(value) => {
2518 let ord_value = OrdValue(value.clone());
2520 for (key, entries) in
2521 index_data.entries.range((Excluded(&ord_value), Unbounded))
2522 {
2523 for entry in entries {
2524 if let Some(doc) = self.docs.get(&entry.doc_id) {
2525 results.push((entry.doc_id.clone(), doc.clone()));
2526 }
2527 }
2528 }
2529 }
2530 QueryOperator::Gte(value) => {
2531 let ord_value = OrdValue(value.clone());
2533 for (key, entries) in
2534 index_data.entries.range((Included(&ord_value), Unbounded))
2535 {
2536 for entry in entries {
2537 if let Some(doc) = self.docs.get(&entry.doc_id) {
2538 results.push((entry.doc_id.clone(), doc.clone()));
2539 }
2540 }
2541 }
2542 }
2543 QueryOperator::Lt(value) => {
2544 let ord_value = OrdValue(value.clone());
2546 for (key, entries) in
2547 index_data.entries.range((Unbounded, Excluded(&ord_value)))
2548 {
2549 for entry in entries {
2550 if let Some(doc) = self.docs.get(&entry.doc_id) {
2551 results.push((entry.doc_id.clone(), doc.clone()));
2552 }
2553 }
2554 }
2555 }
2556 QueryOperator::Lte(value) => {
2557 let ord_value = OrdValue(value.clone());
2559 for (key, entries) in
2560 index_data.entries.range((Unbounded, Included(&ord_value)))
2561 {
2562 for entry in entries {
2563 if let Some(doc) = self.docs.get(&entry.doc_id) {
2564 results.push((entry.doc_id.clone(), doc.clone()));
2565 }
2566 }
2567 }
2568 }
2569 _ => {
2570 continue;
2572 }
2573 }
2574 }
2575
2576 return Ok(results);
2577 }
2578 }
2579 }
2580 }
2581
2582 Ok(Vec::new())
2584 }
2585
2586 fn sort_results(
2587 &self,
2588 results: &mut Vec<(DocId, Document)>,
2589 sort_spec: &[(String, SortOrder)],
2590 ) {
2591 results.sort_by(|a, b| {
2592 for (field, order) in sort_spec {
2593 let field_parts: Vec<&str> = field.split('.').collect();
2594 let a_val = self.get_nested_value(&a.1, &field_parts);
2595 let b_val = self.get_nested_value(&b.1, &field_parts);
2596
2597 let cmp = match (a_val, b_val) {
2598 (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2599 .as_f64()
2600 .unwrap_or(0.0)
2601 .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2602 .unwrap_or(Ordering::Equal),
2603 (Some(Value::String(a_str)), Some(Value::String(b_str))) => a_str.cmp(b_str),
2604 (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => a_bool.cmp(b_bool),
2605 (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2606 a_arr.len().cmp(&b_arr.len())
2607 }
2608 (Some(_), None) => Ordering::Greater,
2609 (None, Some(_)) => Ordering::Less,
2610 (None, None) => Ordering::Equal,
2611 _ => Ordering::Equal,
2612 };
2613
2614 if cmp != Ordering::Equal {
2615 return match order {
2616 SortOrder::Ascending => cmp,
2617 SortOrder::Descending => cmp.reverse(),
2618 };
2619 }
2620 }
2621
2622 Ordering::Equal
2623 });
2624 }
2625
2626 fn apply_projection(
2627 &self,
2628 results: &mut Vec<(DocId, Document)>,
2629 projection: &Map<String, Value>,
2630 ) {
2631 let is_inclusive = projection.values().any(|v| match v {
2632 Value::Bool(b) => *b,
2633 Value::Number(n) => n.as_i64().unwrap_or(0) == 1,
2634 _ => false,
2635 });
2636
2637 for (_, doc) in results.iter_mut() {
2638 if let Some(obj) = doc.as_object_mut() {
2639 let mut new_obj = Map::new();
2640
2641 if is_inclusive {
2642 for (key, value) in projection {
2644 if obj.contains_key(key) {
2645 new_obj.insert(key.clone(), obj.get(key).unwrap().clone());
2646 }
2647 }
2648
2649 if obj.contains_key("_id") && !projection.contains_key("_id") {
2651 new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2652 }
2653 } else {
2654 for (key, value) in obj {
2656 if !projection.contains_key(key) || key == "_id" {
2657 new_obj.insert(key.clone(), value.clone());
2658 }
2659 }
2660 }
2661
2662 *doc = Value::Object(new_obj);
2663 }
2664 }
2665 }
2666
2667 fn get_nested_value<'a>(&self, doc: &'a Document, path: &[&str]) -> Option<&'a Value> {
2668 if path.is_empty() {
2669 return Some(doc);
2670 }
2671
2672 let current = path[0];
2673 let rest = &path[1..];
2674
2675 match doc.get(current) {
2676 Some(value) => {
2677 if rest.is_empty() {
2678 Some(value)
2679 } else {
2680 match value {
                    Value::Object(_) => self.get_nested_value(value, rest),
2682 _ => None,
2683 }
2684 }
2685 }
2686 None => None,
2687 }
2688 }
2689
2690 fn calculate_size(&self) -> usize {
2691 self.docs
2693 .values()
2694 .map(|doc| serde_json::to_string(doc).unwrap_or_default().len())
2695 .sum()
2696 }
2697
    fn calculate_index_size(&self) -> usize {
        // Rough estimate: a fixed per-entry overhead rather than the real size.
        self.indexes
            .values()
            .map(|index_data| index_data.entries.len() * 16)
            .sum()
    }
2707
2708 fn apply_aggregation_stage(
2709 &self,
2710 docs: Vec<Document>,
2711 stage: AggregationStage,
2712 ) -> Result<Vec<Document>> {
2713 match stage {
2714 AggregationStage::Match(query) => {
2715 let mut result = Vec::new();
2716 for doc in docs {
2717 if query.matches(&doc) {
2718 result.push(doc);
2719 }
2720 }
2721 Ok(result)
2722 }
2723 AggregationStage::Project(projection) => {
2724 let mut result = Vec::new();
2725 for doc in docs {
2726 if let Some(obj) = doc.as_object() {
2727 let mut new_obj = Map::new();
2728
2729 for (key, value) in &projection {
2730 if value.as_object().and_then(|o| o.get("$")).is_some() {
2731 if let Some(expr) = value.as_object().and_then(|o| o.get("$")) {
2733 if let Some(field_name) = expr.as_str() {
2735 if let Some(field_value) =
2736 obj.get(field_name.trim_start_matches('$'))
2737 {
2738 new_obj.insert(key.clone(), field_value.clone());
2739 }
2740 }
2741 }
2742 } else if value.as_object().and_then(|o| o.get("$concat")).is_some() {
2743 if let Some(Value::Array(fields)) =
2745 value.as_object().and_then(|o| o.get("$concat"))
2746 {
2747 let mut result_str = String::new();
2748 for field in fields {
2749 if let Some(field_name) = field.as_str() {
2750 if let Some(field_value) =
2751 obj.get(field_name.trim_start_matches('$'))
2752 {
2753 if let Some(s) = field_value.as_str() {
2754 result_str.push_str(s);
2755 }
2756 }
2757 }
2758 }
2759 new_obj.insert(key.clone(), Value::String(result_str));
2760 }
                            } else {
                                let include = match value {
                                    Value::Bool(b) => *b,
                                    Value::Number(n) => n.as_i64().unwrap_or(0) == 1,
                                    _ => false,
                                };

                                // Only inclusive projections are applied here; fields
                                // marked with `0`/`false` are simply not copied.
                                if include {
                                    if let Some(field_value) = obj.get(key.as_str()) {
                                        new_obj.insert(key.clone(), field_value.clone());
                                    }
                                }
2780 }
2781 }
2782
2783 if obj.contains_key("_id") && !projection.contains_key("_id") {
2785 new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2786 }
2787
2788 result.push(Value::Object(new_obj));
2789 }
2790 }
2791 Ok(result)
2792 }
2793 AggregationStage::Sort(sort_spec) => {
2794 let mut result = docs;
2795
2796 result.sort_by(|a, b| {
2797 for (field, order) in &sort_spec {
2798 let field_parts: Vec<&str> = field.split('.').collect();
2799 let a_val = self.get_nested_value(a, &field_parts);
2800 let b_val = self.get_nested_value(b, &field_parts);
2801
2802 let cmp = match (a_val, b_val) {
2803 (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2804 .as_f64()
2805 .unwrap_or(0.0)
2806 .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2807 .unwrap_or(Ordering::Equal),
2808 (Some(Value::String(a_str)), Some(Value::String(b_str))) => {
2809 a_str.cmp(b_str)
2810 }
2811 (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => {
2812 a_bool.cmp(b_bool)
2813 }
2814 (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2815 a_arr.len().cmp(&b_arr.len())
2816 }
2817 (Some(_), None) => Ordering::Greater,
2818 (None, Some(_)) => Ordering::Less,
2819 (None, None) => Ordering::Equal,
2820 _ => Ordering::Equal,
2821 };
2822
2823 if cmp != Ordering::Equal {
2824 return match order {
2825 SortOrder::Ascending => cmp,
2826 SortOrder::Descending => cmp.reverse(),
2827 };
2828 }
2829 }
2830
2831 Ordering::Equal
2832 });
2833
2834 Ok(result)
2835 }
2836 AggregationStage::Skip(n) => {
2837 let mut result = docs;
2838 if n < result.len() {
2839 result.drain(0..n);
2840 } else {
2841 result.clear();
2842 }
2843 Ok(result)
2844 }
2845 AggregationStage::Limit(n) => {
2846 let mut result = docs;
2847 result.truncate(n);
2848 Ok(result)
2849 }
2850 AggregationStage::Group(group_spec) => {
2851 let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
2852
2853 for doc in docs {
2855 let group_key = self.calculate_group_key(&doc, &group_spec.id)?;
2856 groups.entry(group_key).or_insert_with(Vec::new).push(doc);
2857 }
2858
2859 let mut result = Vec::new();
2861 for (key, group_docs) in groups {
2862 let mut group_obj = Map::new();
2863
2864 group_obj.insert("_id".to_string(), Value::String(key));
2866
2867 for (field, op) in &group_spec.operations {
2869 match op {
2870 GroupOperation::Sum(expr) => {
2871 let sum = self.calculate_sum(&group_docs, expr)?;
2872 group_obj.insert(field.clone(), sum);
2873 }
2874 GroupOperation::Avg(expr) => {
2875 let avg = self.calculate_avg(&group_docs, expr)?;
2876 group_obj.insert(field.clone(), avg);
2877 }
2878 GroupOperation::Min(expr) => {
2879 let min = self.calculate_min(&group_docs, expr)?;
2880 group_obj.insert(field.clone(), min);
2881 }
2882 GroupOperation::Max(expr) => {
2883 let max = self.calculate_max(&group_docs, expr)?;
2884 group_obj.insert(field.clone(), max);
2885 }
2886 GroupOperation::First(expr) => {
2887 if let Some(doc) = group_docs.first() {
2888 if let Some(value) = self.evaluate_expression(doc, expr) {
2889 group_obj.insert(field.clone(), value);
2890 }
2891 }
2892 }
2893 GroupOperation::Last(expr) => {
2894 if let Some(doc) = group_docs.last() {
2895 if let Some(value) = self.evaluate_expression(doc, expr) {
2896 group_obj.insert(field.clone(), value);
2897 }
2898 }
2899 }
2900 GroupOperation::Push(expr) => {
2901 let mut values = Vec::new();
2902 for doc in &group_docs {
2903 if let Some(value) = self.evaluate_expression(doc, expr) {
2904 values.push(value);
2905 }
2906 }
2907 group_obj.insert(field.clone(), Value::Array(values));
2908 }
GroupOperation::AddToSet(expr) => {
    // serde_json::Value implements neither Hash nor Eq, so it cannot live in a
    // HashSet; deduplicate with a Vec and PartialEq instead.
    let mut values: Vec<Value> = Vec::new();
    for doc in &group_docs {
        if let Some(value) = self.evaluate_expression(doc, expr) {
            if !values.contains(&value) {
                values.push(value);
            }
        }
    }
    group_obj.insert(field.clone(), Value::Array(values));
}
2921 GroupOperation::StdDevPop(expr) => {
2922 let std_dev = self.calculate_std_dev_pop(&group_docs, expr)?;
2923 group_obj.insert(field.clone(), std_dev);
2924 }
2925 GroupOperation::StdDevSamp(expr) => {
2926 let std_dev = self.calculate_std_dev_samp(&group_docs, expr)?;
2927 group_obj.insert(field.clone(), std_dev);
2928 }
2929 }
2930 }
2931
2932 result.push(Value::Object(group_obj));
2933 }
2934
2935 Ok(result)
2936 }
2937 AggregationStage::Unwind(field) => {
2938 let mut result = Vec::new();
2939
2940 for doc in docs {
2941 if let Some(obj) = doc.as_object() {
2942 if let Some(value) = obj.get(&field) {
2943 if let Value::Array(arr) = value {
2944 for item in arr {
2945 let mut new_obj = obj.clone();
2946 new_obj.insert(field.clone(), item.clone());
2947 result.push(Value::Object(new_obj));
2948 }
2949 } else {
2950 result.push(doc.clone());
2952 }
2953 } else {
2954 result.push(doc.clone());
2956 }
2957 }
2958 }
2959
2960 Ok(result)
2961 }
AggregationStage::Lookup(lookup_spec) => {
    // A collection has no handle to the foreign collection named in
    // `lookup_spec.from`, so the join cannot be executed here; the `as` field
    // is populated with an empty array as a placeholder.
    let mut result = Vec::new();

    for doc in docs {
        if let Some(obj) = doc.as_object() {
            let mut new_obj = obj.clone();
            new_obj.insert(lookup_spec.as_field.clone(), Value::Array(Vec::new()));
            result.push(Value::Object(new_obj));
        }
    }

    Ok(result)
}
AggregationStage::Out(_collection_name) => {
    // Writing the pipeline output to another collection is not implemented here;
    // the documents are passed through unchanged.
    Ok(docs)
}
2996 }
2997 }
2998
2999 fn calculate_group_key(&self, doc: &Document, group_id: &GroupId) -> Result<String> {
3000 match group_id {
3001 GroupId::Field(field) => {
3002 let field_parts: Vec<&str> = field.split('.').collect();
3003 if let Some(value) = self.get_nested_value(doc, &field_parts) {
3004 Ok(value.to_string())
3005 } else {
3006 Ok("null".to_string())
3007 }
3008 }
3009 GroupId::Expression(expr) => {
3010 if let Some(value) = self.evaluate_expression(doc, expr) {
3011 Ok(value.to_string())
3012 } else {
3013 Ok("null".to_string())
3014 }
3015 }
3016 GroupId::Null => Ok("null".to_string()),
3017 }
3018 }
3019
3020 fn evaluate_expression(&self, doc: &Document, expr: &Value) -> Option<Value> {
3021 match expr {
3022 Value::String(s) if s.starts_with('$') => {
3023 let field = &s[1..];
3024 let field_parts: Vec<&str> = field.split('.').collect();
3025 self.get_nested_value(doc, &field_parts).cloned()
3026 }
3027 Value::Object(obj) => {
3028 if let Some(sum_expr) = obj.get("$sum") {
3030 if let Some(field) = sum_expr.as_str() {
3031 if field.starts_with('$') {
3032 let field_name = &field[1..];
3033 let field_parts: Vec<&str> = field_name.split('.').collect();
3034 return self.get_nested_value(doc, &field_parts).cloned();
3035 }
3036 }
3037 }
3038 None
3039 }
3040 _ => Some(expr.clone()),
3041 }
3042 }
3043
3044 fn calculate_sum(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3045 let mut sum = 0.0;
3046
3047 for doc in docs {
3048 if let Some(value) = self.evaluate_expression(doc, expr) {
3049 if let Some(num) = value.as_f64() {
3050 sum += num;
3051 }
3052 }
3053 }
3054
// from_f64 returns None for NaN or infinite sums; fall back to null instead of panicking.
Ok(serde_json::Number::from_f64(sum)
    .map(Value::Number)
    .unwrap_or(Value::Null))
3056 }
3057
3058 fn calculate_avg(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3059 if docs.is_empty() {
3060 return Ok(Value::Number(serde_json::Number::from(0)));
3061 }
3062
3063 let sum = self.calculate_sum(docs, expr)?;
3064 if let Some(sum_num) = sum.as_f64() {
3065 let avg = sum_num / docs.len() as f64;
Ok(serde_json::Number::from_f64(avg)
    .map(Value::Number)
    .unwrap_or(Value::Null))
3067 } else {
3068 Ok(Value::Number(serde_json::Number::from(0)))
3069 }
3070 }
3071
3072 fn calculate_min(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3073 let mut min_value: Option<Value> = None;
3074
3075 for doc in docs {
3076 if let Some(value) = self.evaluate_expression(doc, expr) {
3077 if let Some(ref mut min) = min_value {
3078 if self.compare_values(&value, min) == Ordering::Less {
3079 *min = value;
3080 }
3081 } else {
3082 min_value = Some(value);
3083 }
3084 }
3085 }
3086
3087 Ok(min_value.unwrap_or(Value::Null))
3088 }
3089
3090 fn calculate_max(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3091 let mut max_value: Option<Value> = None;
3092
3093 for doc in docs {
3094 if let Some(value) = self.evaluate_expression(doc, expr) {
3095 if let Some(ref mut max) = max_value {
3096 if self.compare_values(&value, max) == Ordering::Greater {
3097 *max = value;
3098 }
3099 } else {
3100 max_value = Some(value);
3101 }
3102 }
3103 }
3104
3105 Ok(max_value.unwrap_or(Value::Null))
3106 }
3107
3108 fn calculate_std_dev_pop(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3109 if docs.is_empty() {
3110 return Ok(Value::Number(serde_json::Number::from(0)));
3111 }
3112
3113 let avg = self.calculate_avg(docs, expr)?;
3114 let avg_num = avg.as_f64().unwrap_or(0.0);
3115
3116 let mut sum_sq_diff = 0.0;
3117 for doc in docs {
3118 if let Some(value) = self.evaluate_expression(doc, expr) {
3119 if let Some(num) = value.as_f64() {
3120 let diff = num - avg_num;
3121 sum_sq_diff += diff * diff;
3122 }
3123 }
3124 }
3125
3126 let variance = sum_sq_diff / docs.len() as f64;
3127 let std_dev = variance.sqrt();
3128
Ok(serde_json::Number::from_f64(std_dev)
    .map(Value::Number)
    .unwrap_or(Value::Null))
3132 }
3133
3134 fn calculate_std_dev_samp(&self, docs: &[Document], expr: &Value) -> Result<Value> {
3135 if docs.len() <= 1 {
3136 return Ok(Value::Number(serde_json::Number::from(0)));
3137 }
3138
3139 let avg = self.calculate_avg(docs, expr)?;
3140 let avg_num = avg.as_f64().unwrap_or(0.0);
3141
3142 let mut sum_sq_diff = 0.0;
3143 for doc in docs {
3144 if let Some(value) = self.evaluate_expression(doc, expr) {
3145 if let Some(num) = value.as_f64() {
3146 let diff = num - avg_num;
3147 sum_sq_diff += diff * diff;
3148 }
3149 }
3150 }
3151
3152 let variance = sum_sq_diff / (docs.len() - 1) as f64;
3153 let std_dev = variance.sqrt();
3154
Ok(serde_json::Number::from_f64(std_dev)
    .map(Value::Number)
    .unwrap_or(Value::Null))
3158 }
3159}
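// Hedged sketch: a standalone check of the variance formulas mirrored by
// calculate_std_dev_pop and calculate_std_dev_samp above (population variance divides
// by n, sample variance by n - 1). The values are illustrative only.
#[cfg(test)]
mod std_dev_formula_sketch {
    #[test]
    fn population_vs_sample() {
        let values = [2.0_f64, 4.0, 4.0, 6.0];
        let n = values.len() as f64;
        let mean = values.iter().sum::<f64>() / n;
        let sum_sq_diff: f64 = values.iter().map(|v| (v - mean) * (v - mean)).sum();

        let population = (sum_sq_diff / n).sqrt();
        let sample = (sum_sq_diff / (n - 1.0)).sqrt();

        // sqrt(8 / 4) = sqrt(2); the sample estimate is always >= the population one.
        assert!((population - 2.0_f64.sqrt()).abs() < 1e-9);
        assert!(sample >= population);
    }
}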
3160
3161#[derive(Debug, Clone, Serialize, Deserialize)]
3163pub enum AggregationStage {
3164 Match(Query),
3165 Project(Map<String, Value>),
3166 Sort(Vec<(String, SortOrder)>),
3167 Skip(usize),
3168 Limit(usize),
3169 Group(GroupSpecification),
3170 Unwind(String),
3171 Lookup(LookupSpecification),
3172 Out(String),
3173}
3174
3175#[derive(Debug, Clone, Serialize, Deserialize)]
3176pub struct GroupSpecification {
3177 pub id: GroupId,
3178 pub operations: HashMap<String, GroupOperation>,
3179}
3180
3181#[derive(Debug, Clone, Serialize, Deserialize)]
3182pub enum GroupId {
3183 Field(String),
3184 Expression(Value),
3185 Null,
3186}
3187
3188#[derive(Debug, Clone, Serialize, Deserialize)]
3189pub enum GroupOperation {
3190 Sum(Value),
3191 Avg(Value),
3192 Min(Value),
3193 Max(Value),
3194 First(Value),
3195 Last(Value),
3196 Push(Value),
3197 AddToSet(Value),
3198 StdDevPop(Value),
3199 StdDevSamp(Value),
3200}
3201
3202#[derive(Debug, Clone, Serialize, Deserialize)]
3203pub struct LookupSpecification {
3204 pub from: String,
3205 pub local_field: String,
3206 pub foreign_field: String,
pub as_field: String,
}
3209
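// Hedged construction sketch: assembles a pipeline from the stage types above
// (group by a field, sort by the computed total, keep the top ten). It only builds the
// stage list; execution goes through the collection's aggregation entry point defined
// earlier in this file. Field names are illustrative.
#[cfg(test)]
mod aggregation_pipeline_sketch {
    use super::*;

    #[test]
    fn build_group_sort_limit_pipeline() {
        let mut operations = HashMap::new();
        operations.insert(
            "total".to_string(),
            GroupOperation::Sum(Value::String("$amount".to_string())),
        );

        let pipeline = vec![
            AggregationStage::Group(GroupSpecification {
                id: GroupId::Field("customer_id".to_string()),
                operations,
            }),
            AggregationStage::Sort(vec![("total".to_string(), SortOrder::Descending)]),
            AggregationStage::Limit(10),
        ];

        assert_eq!(pipeline.len(), 3);
    }
}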
3210#[derive(Debug, Clone, Serialize, Deserialize)]
3212pub struct FindOneAndUpdateOptions {
3213 pub upsert: bool,
3214 pub return_document: ReturnDocument,
3215 pub projection: Option<Map<String, Value>>,
3216 pub sort: Option<Vec<(String, SortOrder)>>,
3217 pub max_time_ms: Option<u64>,
3218}
3219
3220impl Default for FindOneAndUpdateOptions {
3221 fn default() -> Self {
3222 Self {
3223 upsert: false,
3224 return_document: ReturnDocument::Before,
3225 projection: None,
3226 sort: None,
3227 max_time_ms: None,
3228 }
3229 }
3230}
3231
3232#[derive(Debug, Clone, Serialize, Deserialize)]
3233pub struct FindOneAndReplaceOptions {
3234 pub upsert: bool,
3235 pub return_document: ReturnDocument,
3236 pub projection: Option<Map<String, Value>>,
3237 pub sort: Option<Vec<(String, SortOrder)>>,
3238 pub max_time_ms: Option<u64>,
3239}
3240
3241impl Default for FindOneAndReplaceOptions {
3242 fn default() -> Self {
3243 Self {
3244 upsert: false,
3245 return_document: ReturnDocument::Before,
3246 projection: None,
3247 sort: None,
3248 max_time_ms: None,
3249 }
3250 }
3251}
3252
3253#[derive(Debug, Clone, Serialize, Deserialize)]
3254pub struct FindOneAndDeleteOptions {
3255 pub projection: Option<Map<String, Value>>,
3256 pub sort: Option<Vec<(String, SortOrder)>>,
3257 pub max_time_ms: Option<u64>,
3258}
3259
3260impl Default for FindOneAndDeleteOptions {
3261 fn default() -> Self {
3262 Self {
3263 projection: None,
3264 sort: None,
3265 max_time_ms: None,
3266 }
3267 }
3268}
3269
3270#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
3271pub enum ReturnDocument {
3272 Before,
3273 After,
3274}
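// Hedged usage sketch: the find-and-modify option structs default to returning the
// pre-update document; this shows opting into the post-update view plus upsert.
#[cfg(test)]
mod find_and_modify_options_sketch {
    use super::*;

    #[test]
    fn return_document_after_update() {
        let options = FindOneAndUpdateOptions {
            upsert: true,
            return_document: ReturnDocument::After,
            ..Default::default()
        };

        assert!(options.upsert);
        assert_eq!(options.return_document, ReturnDocument::After);
    }
}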
3275
3276#[derive(Debug, Clone, Serialize, Deserialize)]
3278pub struct CollectionStats {
3279 pub count: usize,
3280 pub size: usize,
3281 pub avg_obj_size: usize,
3282 pub index_count: usize,
3283 pub index_size: usize,
3284}
3285
3286#[derive(Debug, Clone, Serialize, Deserialize)]
3288pub struct Database {
3305 name: String,
3307 collections: HashMap<String, Collection>,
3309}
3310
3311impl Database {
3312 pub fn new(name: String) -> Self {
3318 Self {
3319 name,
3320 collections: HashMap::new(),
3321 }
3322 }
3323
3324 pub fn collection(&mut self, name: &str) -> &mut Collection {
3334 self.collections
3335 .entry(name.to_string())
3336 .or_insert_with(|| Collection::new(name.to_string()))
3337 }
3338
3339 pub fn list_collection_names(&self) -> Vec<String> {
3345 self.collections.keys().cloned().collect()
3346 }
3347
3348 pub fn create_collection(
3359 &mut self,
3360 name: &str,
3361 options: Option<CreateCollectionOptions>,
3362 ) -> Result<()> {
3363 if self.collections.contains_key(name) {
3364 return Err(DbError::Other(format!(
3365 "Collection {} already exists",
3366 name
3367 )));
3368 }
3369
3370 let mut collection = Collection::new(name.to_string());
3371
3372 if let Some(opts) = options {
3374 for index in opts.indexes {
3376 collection.create_index(index)?;
3377 }
3378 }
3379
3380 self.collections.insert(name.to_string(), collection);
3381 Ok(())
3382 }
3383
3384 pub fn drop_collection(&mut self, name: &str) -> Result<()> {
3394 if self.collections.remove(name).is_none() {
3395 return Err(DbError::Other(format!(
3396 "Collection {} does not exist",
3397 name
3398 )));
3399 }
3400
3401 Ok(())
3402 }
3403
3404 pub fn stats(&self) -> Result<DatabaseStats> {
3410 let mut collections = Vec::new();
3411 let mut total_size = 0;
3412 let mut total_index_size = 0;
3413
3414 for (name, collection) in self.collections.iter() {
3415 let coll_stats = collection.stats()?;
3416 collections.push(CollectionStatsInfo {
3417 name: name.clone(),
3418 count: coll_stats.count,
3419 size: coll_stats.size,
3420 index_count: coll_stats.index_count,
3421 index_size: coll_stats.index_size,
3422 });
3423
3424 total_size += coll_stats.size;
3425 total_index_size += coll_stats.index_size;
3426 }
3427
3428 Ok(DatabaseStats {
3429 collections,
3430 total_size,
3431 total_index_size,
3432 })
3433 }
3434
3435 pub fn run_command(&mut self, command: &Document) -> Result<Document> {
3447 if let Some(obj) = command.as_object() {
3448 if let Some(cmd_name) = obj.keys().next() {
3449 match cmd_name.as_str() {
3450 "create" => {
3451 if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
3452 self.create_collection(coll_name, None)?;
3453 Ok(serde_json::json!({ "ok": 1 }))
3454 } else {
3455 Err(DbError::Other("Invalid create command".into()))
3456 }
3457 }
3458 "drop" => {
3459 if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
3460 self.drop_collection(coll_name)?;
3461 Ok(serde_json::json!({ "ok": 1 }))
3462 } else {
3463 Err(DbError::Other("Invalid drop command".into()))
3464 }
3465 }
3466 "listCollections" => {
3467 let coll_names = self.list_collection_names();
3468 let collections: Vec<_> = coll_names
3469 .into_iter()
3470 .map(|name| {
3471 serde_json::json!({
3472 "name": name,
3473 "type": "collection"
3474 })
3475 })
3476 .collect();
3477
3478 Ok(serde_json::json!({
3479 "cursor": {
3480 "id": 0,
3481 "ns": format!("{}.collections", self.name),
3482 "firstBatch": collections
3483 },
3484 "ok": 1
3485 }))
3486 }
"dbStats" => {
    let stats = self.stats()?;
    // Compute the document count once instead of re-summing it for each field.
    let objects: usize = stats.collections.iter().map(|c| c.count).sum();
    Ok(serde_json::json!({
        "db": self.name,
        "collections": stats.collections.len(),
        "objects": objects,
        "avgObjSize": if objects > 0 { stats.total_size / objects } else { 0 },
        "dataSize": stats.total_size,
        "indexSize": stats.total_index_size,
        "ok": 1
    }))
}
3503 _ => Err(DbError::Other(format!("Unknown command: {}", cmd_name))),
3504 }
3505 } else {
3506 Err(DbError::Other("Empty command".into()))
3507 }
3508 } else {
3509 Err(DbError::Other("Command must be an object".into()))
3510 }
3511 }
3512}
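// Hedged usage sketch: drives the command interface above with an in-memory database.
// The command shapes ("create", "listCollections") follow run_command's own match arms;
// nothing here touches disk, and the collection name is illustrative.
#[cfg(test)]
mod run_command_usage_sketch {
    use super::*;

    #[test]
    fn create_then_list_collections() {
        let mut db = Database::new("example".to_string());

        let created = db
            .run_command(&serde_json::json!({ "create": "users" }))
            .unwrap();
        assert_eq!(created["ok"], 1);

        let listed = db
            .run_command(&serde_json::json!({ "listCollections": 1 }))
            .unwrap();
        assert_eq!(listed["cursor"]["firstBatch"][0]["name"], "users");
    }
}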
3513
3514#[derive(Debug, Clone, Serialize, Deserialize)]
3516pub struct DatabaseStats {
3517 pub collections: Vec<CollectionStatsInfo>,
3518 pub total_size: usize,
3519 pub total_index_size: usize,
3520}
3521
3522#[derive(Debug, Clone, Serialize, Deserialize)]
3523pub struct CollectionStatsInfo {
3524 pub name: String,
3525 pub count: usize,
3526 pub size: usize,
3527 pub index_count: usize,
3528 pub index_size: usize,
3529}
3530
3531#[derive(Debug, Clone, Serialize, Deserialize)]
3533pub struct CreateCollectionOptions {
3534 pub capped: bool,
3535 pub size: Option<usize>,
3536 pub max: Option<usize>,
3537 pub storage_engine: Option<Map<String, Value>>,
3538 pub validator: Option<Document>,
3539 pub validation_level: Option<String>,
3540 pub validation_action: Option<String>,
3541 pub index_option_defaults: Option<Map<String, Value>>,
3542 pub view_on: Option<String>,
3543 pub pipeline: Option<Vec<Document>>,
3544 pub collation: Option<Map<String, Value>>,
3545 pub write_concern: Option<Map<String, Value>>,
3546 pub indexes: Vec<Index>,
3547}
3548
3549impl Default for CreateCollectionOptions {
3550 fn default() -> Self {
3551 Self {
3552 capped: false,
3553 size: None,
3554 max: None,
3555 storage_engine: None,
3556 validator: None,
3557 validation_level: None,
3558 validation_action: None,
3559 index_option_defaults: None,
3560 view_on: None,
3561 pipeline: None,
3562 collation: None,
3563 write_concern: None,
3564 indexes: Vec::new(),
3565 }
3566 }
3567}
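// Hedged sketch: the options structs here follow the struct-update pattern, so anything
// not spelled out falls back to Default. The index list is left empty because Index
// construction is defined elsewhere in this file; the sizes below are illustrative.
#[cfg(test)]
mod create_collection_options_sketch {
    use super::*;

    #[test]
    fn capped_collection_options() {
        let options = CreateCollectionOptions {
            capped: true,
            size: Some(1024 * 1024),
            max: Some(10_000),
            ..Default::default()
        };

        assert!(options.capped);
        assert_eq!(options.indexes.len(), 0);
    }
}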
3568
3569#[derive(Debug, Clone)]
3571pub struct Client {
3591 pub databases: HashMap<String, Database>,
3593 pub uri: String,
3595 pub config: config::DatabaseConfig,
3597 pub encryption_key: Option<encryption::EncryptionKey>,
3599}
3600
3601impl Client {
3602 pub fn new() -> Self {
3621 let config = config::DatabaseConfig::default();
3622 Self {
3623 databases: HashMap::new(),
uri: "luckdb://localhost:27017".to_string(),
encryption_key: None,
3626 config,
3627 }
3628 }
3629
3630 pub fn with_uri(uri: &str) -> Self {
3636 let config = config::DatabaseConfig::default();
3637 Self {
3638 databases: HashMap::new(),
3639 uri: uri.to_string(),
3640 encryption_key: None,
3641 config,
3642 }
3643 }
3644
3645 pub fn with_storage_path<P: AsRef<Path>>(uri: &str, path: P) -> Self {
3652 let mut config = config::DatabaseConfig::default();
3653 config.storage_path = Some(path.as_ref().to_path_buf());
3654 Self {
3655 databases: HashMap::new(),
3656 uri: uri.to_string(),
3657 encryption_key: None,
3658 config,
3659 }
3660 }
3661
3662 pub fn with_config(config: config::DatabaseConfig) -> Result<Self> {
3693 config.validate()?;
3694
3695 let encryption_key = if config.is_encryption_configured() {
3696 Some(encryption::EncryptionKey::from_password(
3697 config.encryption_password.as_ref().unwrap(),
b"luckdb_salt_2024",
))
3700 } else {
3701 None
3702 };
3703
3704 Ok(Self {
3705 databases: HashMap::new(),
3706 uri: "luckdb://localhost:27017".to_string(),
3707 config,
3708 encryption_key,
3709 })
3710 }
3711
3712 pub fn with_config_file<P: AsRef<Path>>(config_path: P) -> Result<Self> {
3754 let config = config::DatabaseConfig::load_from_file(config_path)?;
3755 Self::with_config(config)
3756 }
3757
3758 pub fn with_encryption<S: Into<String>>(mut self, password: S) -> Result<Self> {
3764 self.config.encryption_enabled = true;
3765 self.config.encryption_password = Some(password.into());
3766
3767 let key = encryption::EncryptionKey::from_password(
3768 self.config.encryption_password.as_ref().unwrap(),
3769 b"luckdb_salt_2024",
3770 );
3771
3772 self.encryption_key = Some(key);
3773 self.config.validate()?;
3774 Ok(self)
3775 }
3776
3777 pub fn db(&mut self, name: &str) -> &mut Database {
3787 self.databases
3788 .entry(name.to_string())
3789 .or_insert_with(|| Database::new(name.to_string()))
3790 }
3791
3792 pub fn list_database_names(&self) -> Vec<String> {
3798 self.databases.keys().cloned().collect()
3799 }
3800
3801 pub fn drop_database(&mut self, name: &str) -> Result<()> {
3811 if self.databases.remove(name).is_none() {
3812 return Err(DbError::Other(format!("Database {} does not exist", name)));
3813 }
3814
3815 Ok(())
3816 }
3817
3818 pub fn uri(&self) -> &str {
3824 &self.uri
3825 }
3826
3827 pub fn save(&self) -> Result<()> {
3833 let storage_path = self.config.get_or_create_storage_path()?;
3834
3835 if let Some(parent) = storage_path.parent() {
3837 fs::create_dir_all(parent)?;
3838 }
3839
3840 let mut persistent_storage = PersistentStorage::new();
3841
3842 for (name, db) in &self.databases {
3843 let mut persistent_db = PersistentDatabase {
3844 name: name.clone(),
3845 collections: HashMap::new(),
3846 };
3847
3848 for (coll_name, coll) in &db.collections {
3849 let persistent_coll = PersistentCollection::from(coll.clone());
3850 persistent_db
3851 .collections
3852 .insert(coll_name.clone(), persistent_coll);
3853 }
3854
3855 persistent_storage
3856 .databases
3857 .insert(name.clone(), persistent_db);
3858 }
3859
3860 persistent_storage.save_to_file(&storage_path, self.encryption_key.as_ref())?;
3862 Ok(())
3863 }
3864
3865 pub fn load(&mut self) -> Result<()> {
3871 let storage_path = self.config.get_or_create_storage_path()?;
3872
3873 if storage_path.exists() {
3874 let persistent_storage =
3875 PersistentStorage::load_from_file(&storage_path, self.encryption_key.as_ref())?;
3876
3877 for (name, persistent_db) in persistent_storage.databases {
3878 let mut db = Database::new(name.clone());
3879
3880 for (coll_name, persistent_coll) in persistent_db.collections {
3881 let coll = Collection::from(persistent_coll);
3882 db.collections.insert(coll_name, coll);
3883 }
3884
3885 self.databases.insert(name, db);
3886 }
3887 }
3888 Ok(())
3889 }
3890
3891 pub fn config(&self) -> &config::DatabaseConfig {
3893 &self.config
3894 }
3895
3896 pub fn update_config(&mut self, config: config::DatabaseConfig) -> Result<()> {
3898 config.validate()?;
3899 self.config = config;
3900
3901 if self.config.is_encryption_configured() {
3903 self.encryption_key = Some(encryption::EncryptionKey::from_password(
3904 self.config.encryption_password.as_ref().unwrap(),
3905 b"luckdb_salt_2024",
3906 ));
3907 } else {
3908 self.encryption_key = None;
3909 }
3910
3911 Ok(())
3912 }
3913}
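// Hedged usage sketch: a purely in-memory round trip through the client API above.
// Persistence (save/load) is skipped so no storage path or encryption is needed; the
// database, collection, and field names are illustrative.
#[cfg(test)]
mod client_usage_sketch {
    use super::*;

    #[test]
    fn insert_into_named_database() {
        let mut client = Client::new();

        let users = client.db("app").collection("users");
        let id = users
            .insert(serde_json::json!({ "name": "Ada", "active": true }))
            .unwrap();
        assert!(!id.to_string().is_empty());

        assert_eq!(client.list_database_names(), vec!["app".to_string()]);
        assert_eq!(
            client.db("app").list_collection_names(),
            vec!["users".to_string()]
        );
    }
}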
3914
3915#[derive(Debug, Clone, Serialize, Deserialize)]
3917pub enum BulkWriteOperation {
3918 InsertOne {
3919 document: Document,
3920 },
3921 UpdateOne {
3922 filter: Query,
3923 update: UpdateDocument,
3924 upsert: bool,
3925 },
3926 UpdateMany {
3927 filter: Query,
3928 update: UpdateDocument,
3929 upsert: bool,
3930 },
3931 ReplaceOne {
3932 filter: Query,
3933 replacement: Document,
3934 upsert: bool,
3935 },
3936 DeleteOne {
3937 filter: Query,
3938 },
3939 DeleteMany {
3940 filter: Query,
3941 },
3942}
3943
3944#[derive(Debug, Clone, Serialize, Deserialize)]
3945pub struct BulkWriteOptions {
3946 pub ordered: bool,
3947 pub bypass_document_validation: bool,
3948 pub write_concern: Option<Map<String, Value>>,
3949}
3950
3951impl Default for BulkWriteOptions {
3952 fn default() -> Self {
3953 Self {
3954 ordered: true,
3955 bypass_document_validation: false,
3956 write_concern: None,
3957 }
3958 }
3959}
3960
3961#[derive(Debug, Clone, Serialize, Deserialize)]
3962pub struct BulkWriteResult {
3963 pub inserted_count: usize,
3964 pub matched_count: usize,
3965 pub modified_count: usize,
3966 pub deleted_count: usize,
3967 pub upserted_count: usize,
3968 pub upserted_ids: HashMap<usize, DocId>,
3969}
3970
3971impl Collection {
3972 pub fn bulk_write(
3974 &mut self,
3975 operations: Vec<BulkWriteOperation>,
3976 options: Option<BulkWriteOptions>,
3977 ) -> Result<BulkWriteResult> {
3978 let opts = options.unwrap_or_default();
3979 let mut result = BulkWriteResult {
3980 inserted_count: 0,
3981 matched_count: 0,
3982 modified_count: 0,
3983 deleted_count: 0,
3984 upserted_count: 0,
3985 upserted_ids: HashMap::new(),
3986 };
3987
3988 let mut index = 0;
3989
3990 for op in operations {
3991 match op {
3992 BulkWriteOperation::InsertOne { document } => {
3993 let id = self.insert(document)?;
3994 result.inserted_count += 1;
3995 result.upserted_ids.insert(index, id);
3996 }
3997 BulkWriteOperation::UpdateOne {
3998 filter,
3999 update,
4000 upsert,
4001 } => {
4002 let count = self.update_one(filter, update, upsert)?;
4003 if count > 0 {
4004 result.matched_count += 1;
4005 result.modified_count += 1;
4006 }
4007 if upsert && count > 0 {
4008 result.upserted_count += 1;
4009 }
4010 }
4011 BulkWriteOperation::UpdateMany {
4012 filter,
4013 update,
4014 upsert,
4015 } => {
4016 let count = self.update_many(filter, update)?;
4017 result.matched_count += count;
4018 result.modified_count += count;
4019 if upsert && count > 0 {
4020 result.upserted_count += count;
4021 }
4022 }
4023 BulkWriteOperation::ReplaceOne {
4024 filter,
4025 replacement,
4026 upsert,
4027 } => {
4028 let count = self.replace_one(filter, replacement, upsert)?;
4029 if count > 0 {
4030 result.matched_count += 1;
4031 result.modified_count += 1;
4032 }
4033 if upsert && count > 0 {
4034 result.upserted_count += 1;
4035 }
4036 }
4037 BulkWriteOperation::DeleteOne { filter } => {
4038 let count = self.delete_one(filter)?;
4039 result.deleted_count += count;
4040 }
4041 BulkWriteOperation::DeleteMany { filter } => {
4042 let count = self.delete_many(filter)?;
4043 result.deleted_count += count;
4044 }
4045 }
4046
4047 index += 1;
4048
4049 }
4052
4053 Ok(result)
4054 }
4055}
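// Hedged usage sketch: a small ordered batch against a fresh collection, using the
// BulkWriteOperation variants handled above. Field names and values are illustrative.
#[cfg(test)]
mod bulk_write_usage_sketch {
    use super::*;

    #[test]
    fn insert_then_delete_in_one_batch() {
        let mut coll = Collection::new("events".to_string());

        let operations = vec![
            BulkWriteOperation::InsertOne {
                document: serde_json::json!({ "kind": "login" }),
            },
            BulkWriteOperation::InsertOne {
                document: serde_json::json!({ "kind": "logout" }),
            },
            BulkWriteOperation::DeleteMany {
                filter: Query::new().eq("kind", serde_json::json!("logout")),
            },
        ];

        let result = coll.bulk_write(operations, None).unwrap();
        assert_eq!(result.inserted_count, 2);
        assert_eq!(result.deleted_count, 1);
    }
}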
4056
4057#[derive(Debug, Clone)]
4059pub struct RemoteClient {
4060 addr: SocketAddr,
4061}
4062
4063impl RemoteClient {
4064 pub fn new(addr: SocketAddr) -> Self {
4065 Self { addr }
4066 }
4067
4068 pub fn connect(&self) -> Result<RemoteConnection> {
4069 let stream = TcpStream::connect(self.addr)?;
4070 Ok(RemoteConnection { stream })
4071 }
4072}
4073
4074#[derive(Debug)]
4075pub struct RemoteConnection {
4076 stream: TcpStream,
4077}
4078
4079impl RemoteConnection {
4080 pub fn send_command(&mut self, command: &str) -> Result<String> {
4081 let mut writer = BufWriter::new(&self.stream);
4082 writer.write_all(command.as_bytes())?;
4083 writer.write_all(b"\n")?;
4084 writer.flush()?;
4085
4086 let mut reader = BufReader::new(&self.stream);
4087 let mut response = String::new();
4088 reader.read_line(&mut response)?;
4089
4090 Ok(response)
4091 }
4092
4093 pub fn close(self) -> Result<()> {
4094 Ok(())
4096 }
4097}
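// Hedged wire sketch: the connection above speaks a line-oriented text protocol; these
// are the command shapes the Server below understands (AUTH, INSERT, FIND, EXIT). The
// address and credentials are placeholders and a live server is required, so this
// helper is not exercised automatically.
#[allow(dead_code)]
fn remote_client_usage_sketch() -> Result<()> {
    let addr: SocketAddr = "127.0.0.1:4000".parse().expect("valid socket address");
    let client = RemoteClient::new(addr);
    let mut conn = client.connect()?;

    // Only needed when the server was configured with credentials.
    let _auth = conn.send_command("AUTH admin secret")?;

    let _inserted = conn.send_command(r#"INSERT app users {"name": "Ada"}"#)?;
    let _found = conn.send_command(r#"FIND app users {"name": "Ada"}"#)?;

    let _bye = conn.send_command("EXIT")?;
    conn.close()
}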
4098
4099#[derive(Debug)]
4101pub struct Server {
4102 addr: SocketAddr,
4103 client: Client,
4104}
4105
4106impl Server {
4107 pub fn new(addr: SocketAddr, storage_path: Option<PathBuf>) -> Self {
4109 let mut config = config::DatabaseConfig::default();
4110 config.storage_path = storage_path;
4111
4112 let client = Client::with_config(config).unwrap();
4113 Self { addr, client }
4114 }
4115
4116 pub fn with_config(addr: SocketAddr, config: config::DatabaseConfig) -> Result<Self> {
4118 let client = Client::with_config(config)?;
4119 Ok(Self { addr, client })
4120 }
4121
4122 pub fn with_config_file<P: AsRef<Path>>(addr: SocketAddr, config_path: P) -> Result<Self> {
4124 let client = Client::with_config_file(config_path)?;
4125 Ok(Self { addr, client })
4126 }
4127
4128 #[deprecated(note = "Use with_config() instead")]
4130 pub fn with_auth(mut self, username: String, password: String) -> Self {
4131 let mut config = self.client.config().clone();
4132 config.auth_username = Some(username);
4133 config.auth_password = Some(password);
4134 let _ = self.client.update_config(config);
4135 self
4136 }
4137
4138 pub fn with_encryption(mut self, password: String) -> Result<Self> {
4140 self.client = self.client.with_encryption(password)?;
4141 Ok(self)
4142 }
4143
4144 pub fn start(&mut self) -> Result<()> {
4145 if self.client.config().storage_path.is_some() {
4147 self.client.load()?;
4148 }
4149 let listener = TcpListener::bind(self.addr)?;
4150 println!("Server listening on {}", self.addr);
4151
4152 for stream in listener.incoming() {
4153 match stream {
4154 Ok(stream) => {
4155 let config = self.client.config().clone();
4156 let encryption_key = self.client.encryption_key.clone();
4157 thread::spawn(move || {
4158 if let Err(e) = Self::handle_client(stream, config, encryption_key) {
4159 eprintln!("Error handling client: {}", e);
4160 }
4161 });
4162 }
4163 Err(e) => {
4164 eprintln!("Failed to accept connection: {}", e);
4165 }
4166 }
4167 }
4168 Ok(())
4169 }
4170
4171 fn handle_client(
4172 mut stream: TcpStream,
4173 config: config::DatabaseConfig,
4174 encryption_key: Option<encryption::EncryptionKey>,
4175 ) -> Result<()> {
4176 let mut client = Client::with_config(config)?;
4177 client.encryption_key = encryption_key;
4178
4179 let mut reader = BufReader::new(&stream);
4180 let mut writer = BufWriter::new(&stream);
// No AUTH handshake is required when the server has no credentials configured.
let mut authenticated = !client.config().is_auth_configured();

loop {
4184 let mut command = String::new();
4185 match reader.read_line(&mut command) {
Ok(0) => break, // connection closed by the client
Ok(_) => {
4188 let command = command.trim();
4190 if command.is_empty() {
4191 continue;
4192 }
4193
4194 if !authenticated {
4196 if command.starts_with("AUTH") {
4197 let parts: Vec<&str> = command.split_whitespace().collect();
4198 if parts.len() != 3 {
4199 let response =
4200 "ERROR: Usage: AUTH <username> <password>".to_string();
4201 writer.write_all(response.as_bytes())?;
4202 writer.write_all(b"\n")?;
4203 writer.flush()?;
4204 continue;
4205 }
4206
4207 let username = parts[1];
4208 let password = parts[2];
4209
4210 if let (Some(auth_user), Some(auth_pass)) = (
4211 &client.config().auth_username,
4212 &client.config().auth_password,
4213 ) {
4214 if username == auth_user && password == auth_pass {
4215 authenticated = true;
4216 let response = "OK: Authenticated".to_string();
4217 writer.write_all(response.as_bytes())?;
4218 writer.write_all(b"\n")?;
4219 writer.flush()?;
4220 continue;
4221 }
4222 }
4223
4224 let response = "ERROR: Authentication failed".to_string();
4225 writer.write_all(response.as_bytes())?;
4226 writer.write_all(b"\n")?;
4227 writer.flush()?;
4228 continue;
4229 } else {
4230 let response =
4231 "ERROR: Not authenticated. Use AUTH <username> <password>"
4232 .to_string();
4233 writer.write_all(response.as_bytes())?;
4234 writer.write_all(b"\n")?;
4235 writer.flush()?;
4236 continue;
4237 }
4238 }
4239
4240 let response = match Self::process_command(command, &mut client) {
4242 Ok(response) => response,
4243 Err(e) => format!("ERROR: {}", e),
4244 };
4245
4246 writer.write_all(response.as_bytes())?;
4248 writer.write_all(b"\n")?;
4249 writer.flush()?;
4250
4251 if command == "EXIT" {
4253 break;
4254 }
4255 }
4256 Err(e) => {
4257 eprintln!("Error reading from client: {}", e);
4258 break;
4259 }
4260 }
4261 }
4262 Ok(())
4263 }
4264
4265 fn process_command(command: &str, client: &mut Client) -> Result<String> {
4266 let parts: Vec<&str> = command.split_whitespace().collect();
4267 if parts.is_empty() {
4268 return Ok("ERROR: Empty command".to_string());
4269 }
4270 match parts[0] {
4271 "INSERT" => {
4272 if parts.len() < 3 {
4273 return Ok(
4274 "ERROR: Usage: INSERT <database> <collection> <document>".to_string()
4275 );
4276 }
4277 let db_name = parts[1];
4278 let coll_name = parts[2];
4279 let doc_json = parts[3..].join(" ");
4280 let doc: Document = serde_json::from_str(&doc_json)?;
4281 let db = client.db(db_name);
4282 let coll = db.collection(coll_name);
4283 let id = coll.insert(doc)?;
4284 Ok(format!("OK: Document inserted with ID: {}", id))
4285 }
4286 "FIND" => {
4287 if parts.len() < 3 {
4288 return Ok("ERROR: Usage: FIND <database> <collection> [query]".to_string());
4289 }
4290 let db_name = parts[1];
4291 let coll_name = parts[2];
4292 let query = if parts.len() > 3 {
4293 let query_json = parts[3..].join(" ");
4294 let query_value: Value = serde_json::from_str(&query_json)?;
4295 Query::from_value(query_value)?
4296 } else {
4297 Query::new()
4298 };
4299 let db = client.db(db_name);
4300 let coll = db.collection(coll_name);
4301 let results = coll.find(query, None)?;
4302 let results_json = serde_json::to_string(&results)?;
4303 Ok(format!("OK: {}", results_json))
4304 }
4305 "UPDATE" => {
4306 if parts.len() < 4 {
4307 return Ok(
4308 "ERROR: Usage: UPDATE <database> <collection> <query> <update>".to_string(),
4309 );
4310 }
4311 let db_name = parts[1];
4312 let coll_name = parts[2];
4313 let mut query_end = 3;
4315 let mut brace_count = 0;
4316 for (i, c) in command.char_indices() {
4317 if i < parts[0].len() + parts[1].len() + parts[2].len() + 2 {
4318 continue;
4319 }
4320 if c == '{' {
4321 brace_count += 1;
4322 } else if c == '}' {
4323 brace_count -= 1;
4324 if brace_count == 0 {
4325 query_end = i + 1;
4326 break;
4327 }
4328 }
4329 }
4330 let query_json =
4331 command[parts[0].len() + parts[1].len() + parts[2].len() + 3..query_end].trim();
4332 let update_json = command[query_end..].trim();
// Parse the filter the same way FIND/DELETE do: as a Mongo-style document.
let query_value: Value = serde_json::from_str(query_json)?;
let query = Query::from_value(query_value)?;
4334 let update: UpdateDocument = serde_json::from_str(update_json)?;
4335 let db = client.db(db_name);
4336 let coll = db.collection(coll_name);
4337 let count = coll.update_one(query, update, false)?;
4338 Ok(format!("OK: Updated {} documents", count))
4339 }
4340 "DELETE" => {
4341 if parts.len() < 3 {
4342 return Ok("ERROR: Usage: DELETE <database> <collection> [query]".to_string());
4343 }
4344 let db_name = parts[1];
4345 let coll_name = parts[2];
4346 let query = if parts.len() > 3 {
4347 let query_json = parts[3..].join(" ");
4348 let query_value: Value = serde_json::from_str(&query_json)?;
4349 Query::from_value(query_value)?
4350 } else {
4351 Query::new()
4352 };
4353 let db = client.db(db_name);
4354 let coll = db.collection(coll_name);
4355 let count = coll.delete_one(query)?;
4356 Ok(format!("OK: Deleted {} documents", count))
4357 }
4358 "SAVE" => {
4359 client.save()?;
4360 Ok("OK: Data saved to disk".to_string())
4361 }
4362 "LOAD" => {
4363 client.load()?;
4364 Ok("OK: Data loaded from disk".to_string())
4365 }
4366 "LIST_DB" => {
4367 let dbs = client.list_database_names();
4368 Ok(format!("OK: {}", serde_json::to_string(&dbs)?))
4369 }
4370 "LIST_COLL" => {
4371 if parts.len() < 2 {
4372 return Ok("ERROR: Usage: LIST_COLL <database>".to_string());
4373 }
4374 let db_name = parts[1];
4375 let db = client.db(db_name);
4376 let colls = db.list_collection_names();
4377 Ok(format!("OK: {}", serde_json::to_string(&colls)?))
4378 }
4379 "STATS" => {
4380 if parts.len() < 2 {
4381 return Ok("ERROR: Usage: STATS <database>".to_string());
4382 }
4383 let db_name = parts[1];
4384 let db = client.db(db_name);
4385 let stats = db.stats()?;
4386 Ok(format!("OK: {}", serde_json::to_string(&stats)?))
4387 }
4388 "EXIT" => {
4389 if client.config().storage_path.is_some() {
4391 if let Err(e) = client.save() {
4392 return Ok(format!("ERROR: Failed to save data: {}", e));
4393 }
4394 }
4395 Ok("OK: Connection closing".to_string())
4396 }
4397 _ => Ok("ERROR: Unknown command".to_string()),
4398 }
4399 }
4400}
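// Hedged startup sketch: binds the TCP server above to a local port with file-backed
// storage. The path and port are placeholders; start() blocks, handling each client
// connection on its own thread, so this helper is not called from tests.
#[allow(dead_code)]
fn server_startup_sketch() -> Result<()> {
    let addr: SocketAddr = "127.0.0.1:4000".parse().expect("valid socket address");
    let mut server = Server::new(addr, Some(PathBuf::from("./data/luckdb.bin")));
    server.start()
}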