#![feature(slice_as_array)]
#![allow(warnings)]
use chrono;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Document, Value};
use std::clone;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter};
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::thread;
use thiserror::Error;
use uuid::Uuid;
19
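/// Errors returned by database, query, update, index, and wire-protocol operations.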
#[derive(Error, Debug)]
pub enum DbError {
    #[error("Document not found")]
    NotFound,
    #[error("Invalid query: {0}")]
    InvalidQuery(String),
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("Other error: {0}")]
    Other(String),
    #[error("Index error: {0}")]
    IndexError(String),
    #[error("Update error: {0}")]
    UpdateError(String),
    #[error("Validation error: {0}")]
    ValidationError(String),
    #[error("Connection error: {0}")]
    ConnectionError(String),
    #[error("Protocol error: {0}")]
    ProtocolError(String),
}

pub type Result<T> = std::result::Result<T, DbError>;
45
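/// Unique document identifier, backed by a UUIDv4 rendered as a string.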
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DocId(String);

impl DocId {
    pub fn new() -> Self {
        Self(Uuid::new_v4().to_string())
    }

    pub fn from_str(s: &str) -> Self {
        Self(s.to_string())
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl std::fmt::Display for DocId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
67
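/// MongoDB-style operators a condition can apply to a single field: comparisons,
/// set membership, existence, regex, logical combinators, array tests, and
/// geospatial predicates.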
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub enum QueryOperator {
71 Eq(Value),
72 Ne(Value),
73 Gt(Value),
74 Gte(Value),
75 Lt(Value),
76 Lte(Value),
77 In(Vec<Value>),
78 Nin(Vec<Value>),
79 Exists(bool),
80 Regex(String),
81 And(Vec<Query>),
82 Or(Vec<Query>),
83 Nor(Vec<Query>),
84 Not(Box<QueryOperator>),
85 All(Vec<Value>),
87 ElemMatch(Query),
88 Size(usize),
89 Near {
91 point: (f64, f64),
92 max_distance: Option<f64>,
93 },
94 Within {
95 shape: GeoShape,
96 },
97 Intersects {
98 shape: GeoShape,
99 },
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum GeoShape {
104 Box {
105 bottom_left: (f64, f64),
106 top_right: (f64, f64),
107 },
108 Polygon {
109 points: Vec<(f64, f64)>,
110 },
111 Center {
112 center: (f64, f64),
113 radius: f64,
114 },
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct QueryCondition {
120 field: String,
121 operator: QueryOperator,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct Query {
127 conditions: Vec<QueryCondition>,
128 logical_ops: Vec<(LogicalOp, Vec<Query>)>,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub enum LogicalOp {
133 And,
134 Or,
135 Nor,
136 Not,
137}
138impl Query {
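    /// Parses a MongoDB-style filter document into a Query, e.g. (illustrative)
    /// `{"age": {"$gt": 30}, "name": "Alice"}`. Plain values become `$eq` conditions.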
140 pub fn from_value(value: Value) -> Result<Self> {
141 match value {
142 Value::Object(map) => {
143 let mut query = Query::new();
144
145 for (field, op_value) in map {
146 match op_value {
147 Value::Object(op_map) => {
148 for (op, val) in op_map {
149 match op.as_str() {
150 "$eq" => query = query.eq(&field, val.clone()),
151 "$ne" => query = query.ne(&field, val.clone()),
152 "$gt" => query = query.gt(&field, val.clone()),
153 "$gte" => query = query.gte(&field, val.clone()),
154 "$lt" => query = query.lt(&field, val.clone()),
155 "$lte" => query = query.lte(&field, val.clone()),
156 "$in" => {
157 if let Value::Array(arr) = val {
158 query = query.in_(&field, arr.clone());
159 }
160 }
161 "$nin" => {
162 if let Value::Array(arr) = val {
163 query = query.nin(&field, arr.clone());
164 }
165 }
166 "$exists" => {
167 if let Value::Bool(exists) = val {
168 query = query.exists(&field, exists);
169 }
170 }
171 "$regex" => {
172 if let Value::String(pattern) = val {
173 query = query.regex(&field, &pattern);
174 }
175 }
176 _ => {
177 return Err(DbError::InvalidQuery(format!(
178 "Unknown operator: {}",
179 op
180 )))
181 }
182 }
183 }
184 }
185 _ => query = query.eq(&field, op_value.clone()),
186 }
187 }
188
189 Ok(query)
190 }
191 _ => Err(DbError::InvalidQuery("Query must be an object".to_string())),
192 }
193 }
194}
195
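// Fluent builder API for composing queries. A minimal illustrative sketch of the
// intended usage (field names and values below are made-up examples):
//
//     let q = Query::new()
//         .eq("status", Value::String("active".into()))
//         .gt("age", Value::from(30));
//     let matches = collection.find(q, None)?;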
196impl Query {
197 pub fn new() -> Self {
198 Self {
199 conditions: Vec::new(),
200 logical_ops: Vec::new(),
201 }
202 }
203
204 pub fn eq(mut self, key: &str, value: Value) -> Self {
205 self.conditions.push(QueryCondition {
206 field: key.to_string(),
207 operator: QueryOperator::Eq(value),
208 });
209 self
210 }
211
212 pub fn ne(mut self, key: &str, value: Value) -> Self {
213 self.conditions.push(QueryCondition {
214 field: key.to_string(),
215 operator: QueryOperator::Ne(value),
216 });
217 self
218 }
219
220 pub fn gt(mut self, key: &str, value: Value) -> Self {
221 self.conditions.push(QueryCondition {
222 field: key.to_string(),
223 operator: QueryOperator::Gt(value),
224 });
225 self
226 }
227
228 pub fn gte(mut self, key: &str, value: Value) -> Self {
229 self.conditions.push(QueryCondition {
230 field: key.to_string(),
231 operator: QueryOperator::Gte(value),
232 });
233 self
234 }
235
236 pub fn lt(mut self, key: &str, value: Value) -> Self {
237 self.conditions.push(QueryCondition {
238 field: key.to_string(),
239 operator: QueryOperator::Lt(value),
240 });
241 self
242 }
243
244 pub fn lte(mut self, key: &str, value: Value) -> Self {
245 self.conditions.push(QueryCondition {
246 field: key.to_string(),
247 operator: QueryOperator::Lte(value),
248 });
249 self
250 }
251
252 pub fn in_(mut self, key: &str, values: Vec<Value>) -> Self {
253 self.conditions.push(QueryCondition {
254 field: key.to_string(),
255 operator: QueryOperator::In(values),
256 });
257 self
258 }
259
260 pub fn nin(mut self, key: &str, values: Vec<Value>) -> Self {
261 self.conditions.push(QueryCondition {
262 field: key.to_string(),
263 operator: QueryOperator::Nin(values),
264 });
265 self
266 }
267
268 pub fn exists(mut self, key: &str, exists: bool) -> Self {
269 self.conditions.push(QueryCondition {
270 field: key.to_string(),
271 operator: QueryOperator::Exists(exists),
272 });
273 self
274 }
275
276 pub fn regex(mut self, key: &str, pattern: &str) -> Self {
277 self.conditions.push(QueryCondition {
278 field: key.to_string(),
279 operator: QueryOperator::Regex(pattern.to_string()),
280 });
281 self
282 }
283
284 pub fn and(mut self, queries: Vec<Query>) -> Self {
285 self.logical_ops.push((LogicalOp::And, queries));
286 self
287 }
288
289 pub fn or(mut self, queries: Vec<Query>) -> Self {
290 self.logical_ops.push((LogicalOp::Or, queries));
291 self
292 }
293
294 pub fn nor(mut self, queries: Vec<Query>) -> Self {
295 self.logical_ops.push((LogicalOp::Nor, queries));
296 self
297 }
298
299 pub fn not(mut self, query: Query) -> Self {
300 if let Some(cond) = query.conditions.first() {
302 self.conditions.push(QueryCondition {
303 field: cond.field.clone(),
304 operator: QueryOperator::Not(Box::new(cond.operator.clone())),
305 });
306 }
307 self
308 }
309
310 pub fn all(mut self, key: &str, values: Vec<Value>) -> Self {
312 self.conditions.push(QueryCondition {
313 field: key.to_string(),
314 operator: QueryOperator::All(values),
315 });
316 self
317 }
318
319 pub fn elem_match(mut self, key: &str, query: Query) -> Self {
320 self.conditions.push(QueryCondition {
321 field: key.to_string(),
322 operator: QueryOperator::ElemMatch(query),
323 });
324 self
325 }
326
327 pub fn size(mut self, key: &str, size: usize) -> Self {
328 self.conditions.push(QueryCondition {
329 field: key.to_string(),
330 operator: QueryOperator::Size(size),
331 });
332 self
333 }
334
335 pub fn near(mut self, key: &str, point: (f64, f64), max_distance: Option<f64>) -> Self {
337 self.conditions.push(QueryCondition {
338 field: key.to_string(),
339 operator: QueryOperator::Near {
340 point,
341 max_distance,
342 },
343 });
344 self
345 }
346
347 pub fn within(mut self, key: &str, shape: GeoShape) -> Self {
348 self.conditions.push(QueryCondition {
349 field: key.to_string(),
350 operator: QueryOperator::Within { shape },
351 });
352 self
353 }
354
355 pub fn intersects(mut self, key: &str, shape: GeoShape) -> Self {
356 self.conditions.push(QueryCondition {
357 field: key.to_string(),
358 operator: QueryOperator::Intersects { shape },
359 });
360 self
361 }
362
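    /// Returns true when every condition and every logical combinator registered
    /// on this query is satisfied by the given document.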
363 pub fn matches(&self, doc: &Document) -> bool {
365 for cond in &self.conditions {
367 if !self.matches_condition(doc, cond) {
368 return false;
369 }
370 }
371
372 for (op, queries) in &self.logical_ops {
374 match op {
375 LogicalOp::And => {
376 if !queries.iter().all(|q| q.matches(doc)) {
377 return false;
378 }
379 }
380 LogicalOp::Or => {
381 if !queries.iter().any(|q| q.matches(doc)) {
382 return false;
383 }
384 }
385 LogicalOp::Nor => {
386 if queries.iter().any(|q| q.matches(doc)) {
387 return false;
388 }
389 }
390 LogicalOp::Not => {
391 if queries.iter().any(|q| q.matches(doc)) {
392 return false;
393 }
394 }
395 }
396 }
397
398 true
399 }
400
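    /// Evaluates a single field condition, resolving dotted paths such as "a.b.c".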
401 fn matches_condition(&self, doc: &Document, cond: &QueryCondition) -> bool {
402 let field_parts: Vec<&str> = cond.field.split('.').collect();
403 let value = self.get_nested_value(doc, &field_parts);
404
405 match &cond.operator {
406 QueryOperator::Eq(expected) => match value {
407 Some(v) => v == expected,
408 None => false,
409 },
410 QueryOperator::Ne(expected) => match value {
411 Some(v) => v != expected,
412 None => true,
413 },
414 QueryOperator::Gt(expected) => match (value, expected) {
415 (Some(Value::Number(a)), Value::Number(b)) => {
416 a.as_f64().unwrap_or(0.0) > b.as_f64().unwrap_or(0.0)
417 }
418 _ => false,
419 },
420 QueryOperator::Gte(expected) => match (value, expected) {
421 (Some(Value::Number(a)), Value::Number(b)) => {
422 a.as_f64().unwrap_or(0.0) >= b.as_f64().unwrap_or(0.0)
423 }
424 _ => false,
425 },
426 QueryOperator::Lt(expected) => match (value, expected) {
427 (Some(Value::Number(a)), Value::Number(b)) => {
428 a.as_f64().unwrap_or(0.0) < b.as_f64().unwrap_or(0.0)
429 }
430 _ => false,
431 },
432 QueryOperator::Lte(expected) => match (value, expected) {
433 (Some(Value::Number(a)), Value::Number(b)) => {
434 a.as_f64().unwrap_or(0.0) <= b.as_f64().unwrap_or(0.0)
435 }
436 _ => false,
437 },
438 QueryOperator::In(values) => match value {
439 Some(v) => values.contains(v),
440 None => false,
441 },
442 QueryOperator::Nin(values) => match value {
443 Some(v) => !values.contains(v),
444 None => true,
445 },
446 QueryOperator::Exists(exists) => value.is_some() == *exists,
            // NOTE: no regex engine is wired in here, so $regex degrades to a plain
            // substring match on string values.
            QueryOperator::Regex(pattern) => match value {
                Some(Value::String(s)) => s.contains(pattern),
                _ => false,
            },
456 QueryOperator::All(values) => match value {
457 Some(Value::Array(arr)) => values.iter().all(|v| arr.contains(v)),
458 _ => false,
459 },
460 QueryOperator::ElemMatch(query) => match value {
461 Some(Value::Array(arr)) => arr.iter().any(|elem| query.matches(elem)),
462 _ => false,
463 },
464 QueryOperator::Size(size) => match value {
465 Some(Value::Array(arr)) => arr.len() == *size,
466 _ => false,
467 },
468 QueryOperator::Near {
469 point,
470 max_distance,
471 } => {
472 match value {
474 Some(Value::Array(arr)) if arr.len() >= 2 => {
475 if let (Some(Value::Number(x)), Some(Value::Number(y))) =
476 (arr.get(0), arr.get(1))
477 {
478 let doc_x = x.as_f64().unwrap_or(0.0);
479 let doc_y = y.as_f64().unwrap_or(0.0);
480
481 let distance =
483 ((doc_x - point.0).powi(2) + (doc_y - point.1).powi(2)).sqrt();
484
485 if let Some(max) = max_distance {
486 distance <= *max
487 } else {
488 true
489 }
490 } else {
491 false
492 }
493 }
494 _ => false,
495 }
496 }
497 QueryOperator::Within { shape } => {
498 match (value, shape) {
500 (
501 Some(Value::Array(arr)),
502 GeoShape::Box {
503 bottom_left,
504 top_right,
505 },
506 ) if arr.len() >= 2 => {
507 if let (Some(Value::Number(x)), Some(Value::Number(y))) =
508 (arr.get(0), arr.get(1))
509 {
510 let doc_x = x.as_f64().unwrap_or(0.0);
511 let doc_y = y.as_f64().unwrap_or(0.0);
512
513 doc_x >= bottom_left.0
514 && doc_x <= top_right.0
515 && doc_y >= bottom_left.1
516 && doc_y <= top_right.1
517 } else {
518 false
519 }
520 }
521 _ => false,
522 }
523 }
524 QueryOperator::Intersects { shape } => {
525 match (value, shape) {
527 (
528 Some(Value::Array(arr)),
529 GeoShape::Box {
530 bottom_left,
531 top_right,
532 },
533 ) if arr.len() >= 2 => {
534 if let (Some(Value::Number(x)), Some(Value::Number(y))) =
535 (arr.get(0), arr.get(1))
536 {
537 let doc_x = x.as_f64().unwrap_or(0.0);
538 let doc_y = y.as_f64().unwrap_or(0.0);
539
540 doc_x >= bottom_left.0
541 && doc_x <= top_right.0
542 && doc_y >= bottom_left.1
543 && doc_y <= top_right.1
544 } else {
545 false
546 }
547 }
548 _ => false,
549 }
550 }
            // $not negates the wrapped operator for the same field.
            QueryOperator::Not(inner) => {
                let negated = QueryCondition {
                    field: cond.field.clone(),
                    operator: (**inner).clone(),
                };
                !self.matches_condition(doc, &negated)
            }
            // $and / $or / $nor are handled through Query::logical_ops rather than
            // as per-field conditions, so they are vacuously true here.
            QueryOperator::And(_) | QueryOperator::Or(_) | QueryOperator::Nor(_) => true,
558 }
559 }
560
561 fn get_nested_value<'a>(&self, doc: &'a Value, path: &[&str]) -> Option<&'a Value> {
562 if path.is_empty() {
563 return Some(doc);
564 }
565
566 let key = path[0];
567 let rest = &path[1..];
568
569 match doc {
570 Value::Object(map) => {
571 if let Some(next_value) = map.get(key) {
572 self.get_nested_value(next_value, rest)
573 } else {
574 None
575 }
576 }
577 _ => None,
578 }
579 }
580}
581
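/// MongoDB-style update operators ($set, $unset, $inc, $mul, $rename, $push,
/// $addToSet, $pop, $pull, $bit, ...), each carrying its field/value payload.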
582#[derive(Debug, Clone, Serialize, Deserialize)]
584pub enum UpdateOperator {
585 Set(Map<String, Value>),
586 Unset(Vec<String>),
587 Inc(Map<String, Value>),
588 Mul(Map<String, Value>),
589 Rename(Vec<(String, String)>),
590 SetOnInsert(Map<String, Value>),
591 Min(Map<String, Value>),
592 Max(Map<String, Value>),
593 CurrentDate(Map<String, Value>),
594 Push(Map<String, Value>),
596 PushAll(BTreeMap<String, Vec<Value>>),
597 AddToSet(Map<String, Value>),
598 Pop(BTreeMap<String, i64>),
599 Pull(Map<String, Value>),
600 PullAll(BTreeMap<String, Vec<Value>>),
601 Bit(Map<String, Value>),
602}
603
604#[derive(Debug, Clone, Serialize, Deserialize)]
606pub struct UpdateDocument {
607 operators: Vec<UpdateOperator>,
608}
609
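// Builder for update documents. Illustrative usage (field names are examples only):
//
//     let update = UpdateDocument::new()
//         .set("status", Value::String("archived".into()))
//         .inc("version", Value::from(1));
//     collection.update_one(query, update, false)?;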
610impl UpdateDocument {
611 pub fn new() -> Self {
612 Self {
613 operators: Vec::new(),
614 }
615 }
616
617 pub fn set(mut self, key: &str, value: Value) -> Self {
618 let mut map = Map::new();
619 map.insert(key.to_string(), value);
620 self.operators.push(UpdateOperator::Set(map));
621 self
622 }
623
624 pub fn unset(mut self, key: &str) -> Self {
625 self.operators
626 .push(UpdateOperator::Unset(vec![key.to_string()]));
627 self
628 }
629
630 pub fn inc(mut self, key: &str, value: Value) -> Self {
631 let mut map = Map::new();
632 map.insert(key.to_string(), value);
633 self.operators.push(UpdateOperator::Inc(map));
634 self
635 }
636
637 pub fn mul(mut self, key: &str, value: Value) -> Self {
638 let mut map = Map::new();
639 map.insert(key.to_string(), value);
640 self.operators.push(UpdateOperator::Mul(map));
641 self
642 }
643
644 pub fn rename(mut self, old_key: &str, new_key: &str) -> Self {
645 self.operators.push(UpdateOperator::Rename(vec![(
646 old_key.to_string(),
647 new_key.to_string(),
648 )]));
649 self
650 }
651
652 pub fn set_on_insert(mut self, key: &str, value: Value) -> Self {
653 let mut map = Map::new();
654 map.insert(key.to_string(), value);
655 self.operators.push(UpdateOperator::SetOnInsert(map));
656 self
657 }
658
659 pub fn min(mut self, key: &str, value: Value) -> Self {
660 let mut map = Map::new();
661 map.insert(key.to_string(), value);
662 self.operators.push(UpdateOperator::Min(map));
663 self
664 }
665
666 pub fn max(mut self, key: &str, value: Value) -> Self {
667 let mut map = Map::new();
668 map.insert(key.to_string(), value);
669 self.operators.push(UpdateOperator::Max(map));
670 self
671 }
672
673 pub fn current_date(mut self, key: &str, type_spec: Value) -> Self {
674 let mut map = Map::new();
675 map.insert(key.to_string(), type_spec);
676 self.operators.push(UpdateOperator::CurrentDate(map));
677 self
678 }
679
680 pub fn push(mut self, key: &str, value: Value) -> Self {
682 let mut map = Map::new();
683 map.insert(key.to_string(), value);
684 self.operators.push(UpdateOperator::Push(map));
685 self
686 }
687
688 pub fn push_all(mut self, key: &str, values: Vec<Value>) -> Self {
689 let mut btree_map = BTreeMap::new();
690 btree_map.insert(key.to_string(), values);
691 self.operators.push(UpdateOperator::PushAll(btree_map));
692 self
693 }
694
695 pub fn add_to_set(mut self, key: &str, value: Value) -> Self {
696 let mut map = Map::new();
697 map.insert(key.to_string(), value);
698 self.operators.push(UpdateOperator::AddToSet(map));
699 self
700 }
701
702 pub fn pop(mut self, key: &str, pos: i64) -> Self {
703 let mut btree_map = BTreeMap::new();
704 btree_map.insert(key.to_string(), pos);
705 self.operators.push(UpdateOperator::Pop(btree_map));
706 self
707 }
708
709 pub fn pull(mut self, key: &str, condition: Value) -> Self {
710 let mut map = Map::new();
711 map.insert(key.to_string(), condition);
712 self.operators.push(UpdateOperator::Pull(map));
713 self
714 }
715
716 pub fn pull_all(mut self, key: &str, values: Vec<Value>) -> Self {
717 let mut btree_map = BTreeMap::new();
718 btree_map.insert(key.to_string(), values);
719 self.operators.push(UpdateOperator::PullAll(btree_map));
720 self
721 }
722
723 pub fn bit(mut self, key: &str, operation: Value) -> Self {
725 let mut map = Map::new();
726 map.insert(key.to_string(), operation);
727 self.operators.push(UpdateOperator::Bit(map));
728 self
729 }
730}
731
732#[derive(Debug, Clone, Serialize, Deserialize)]
734pub enum IndexType {
735 Ascending,
736 Descending,
737 Geospatial,
738 Text,
739 Hashed,
740}
741
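/// Secondary index definition: `key` lists the indexed fields and their index type;
/// `unique`, `sparse`, and `background` mirror the usual MongoDB index options.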
742#[derive(Debug, Clone, Serialize, Deserialize)]
743pub struct Index {
744 name: String,
745 key: Vec<(String, IndexType)>,
746 unique: bool,
747 sparse: bool,
748 background: bool,
749}
750
751impl Index {
752 pub fn new(name: String, key: Vec<(String, IndexType)>) -> Self {
753 Self {
754 name,
755 key,
756 unique: false,
757 sparse: false,
758 background: false,
759 }
760 }
761
762 pub fn unique(mut self, unique: bool) -> Self {
763 self.unique = unique;
764 self
765 }
766
767 pub fn sparse(mut self, sparse: bool) -> Self {
768 self.sparse = sparse;
769 self
770 }
771
772 pub fn background(mut self, background: bool) -> Self {
773 self.background = background;
774 self
775 }
776}
777
778#[derive(Debug, Clone, Serialize, Deserialize)]
780struct IndexEntry {
781 doc_id: DocId,
782}
783
784#[derive(Debug, Clone, Serialize, Deserialize)]
785struct OrdValue(Value);
786
787impl PartialEq for OrdValue {
788 fn eq(&self, other: &Self) -> bool {
789 self.0 == other.0
790 }
791}
792
793impl Eq for OrdValue {}
794
795impl PartialOrd for OrdValue {
796 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
797 Some(self.cmp(other))
798 }
799}
800
impl Ord for OrdValue {
    fn cmp(&self, other: &Self) -> Ordering {
        // Order numbers numerically so indexed range scans sort correctly; ties and
        // all other types fall back to comparing the Debug representation, which
        // keeps the ordering consistent with PartialEq.
        match (&self.0, &other.0) {
            (Value::Number(a), Value::Number(b)) if a != b => a
                .as_f64()
                .unwrap_or(0.0)
                .partial_cmp(&b.as_f64().unwrap_or(0.0))
                .unwrap_or(Ordering::Equal)
                .then_with(|| format!("{:?}", self.0).cmp(&format!("{:?}", other.0))),
            _ => format!("{:?}", self.0).cmp(&format!("{:?}", other.0)),
        }
    }
}
807
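// In-memory index payload: a BTreeMap from indexed value to the documents that hold it.
// Note: because OrdValue wraps an arbitrary JSON value and is used as a map key,
// persisting an index whose keys are not JSON strings may fail at serialization time
// (JSON object keys must be strings).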
808#[derive(Debug, Serialize, Clone, Deserialize)]
810struct IndexData {
811 entries: BTreeMap<OrdValue, Vec<IndexEntry>>,
812 unique: bool,
813 sparse: bool,
814}
815
816impl IndexData {
817 fn new(unique: bool, sparse: bool) -> Self {
818 Self {
819 entries: BTreeMap::new(),
820 unique,
821 sparse,
822 }
823 }
824
825 fn insert(&mut self, value: Value, doc_id: DocId) -> Result<()> {
826 let ord_value = OrdValue(value.clone());
827
828 if self.unique && self.entries.contains_key(&ord_value) {
829 return Err(DbError::IndexError(format!(
830 "Duplicate key error: {:?}",
831 value
832 )));
833 }
834
835 self.entries
836 .entry(ord_value)
837 .or_insert_with(Vec::new)
838 .push(IndexEntry { doc_id });
839 Ok(())
840 }
841
842 fn remove(&mut self, value: &Value, doc_id: &DocId) -> Result<()> {
843 let ord_value = OrdValue(value.clone());
844
845 if let Some(entries) = self.entries.get_mut(&ord_value) {
846 entries.retain(|entry| entry.doc_id != *doc_id);
847 if entries.is_empty() {
848 self.entries.remove(&ord_value);
849 }
850 }
851 Ok(())
852 }
853
854 fn find(&self, value: &Value) -> Vec<&IndexEntry> {
855 let ord_value = OrdValue(value.clone());
856 self.entries
857 .get(&ord_value)
858 .map_or(Vec::new(), |entries| entries.iter().collect())
859 }
860
861 fn find_range(&self, min: &Value, max: &Value) -> Vec<&IndexEntry> {
862 let mut result = Vec::new();
863 let ord_min = OrdValue(min.clone());
864 let ord_max = OrdValue(max.clone());
865
866 for (key, entries) in self.entries.range((Included(&ord_min), Included(&ord_max))) {
867 result.extend(entries.iter());
868 }
869 result
870 }
871}
872
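/// Options controlling find(): projection, sort order, skip/limit, and time limits.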
873#[derive(Debug, Clone, Serialize, Deserialize)]
875pub struct FindOptions {
876 pub projection: Option<Map<String, Value>>,
877 pub sort: Option<Vec<(String, SortOrder)>>,
878 pub skip: Option<usize>,
879 pub limit: Option<usize>,
880 pub max_time_ms: Option<u64>,
881 pub allow_partial_results: bool,
882}
883
884#[derive(Debug, Clone, Serialize, Deserialize)]
885pub enum SortOrder {
886 Ascending,
887 Descending,
888}
889
890impl Default for FindOptions {
891 fn default() -> Self {
892 Self {
893 projection: None,
894 sort: None,
895 skip: None,
896 limit: None,
897 max_time_ms: None,
898 allow_partial_results: false,
899 }
900 }
901}
902
903#[derive(Debug, Clone, Serialize, Deserialize)]
905pub struct Collection {
906 name: String,
907 docs: HashMap<DocId, Document>,
908 indexes: HashMap<String, IndexData>,
909 index_definitions: HashMap<String, Index>,
910}
911
912#[derive(Debug, Serialize, Deserialize)]
914pub struct PersistentCollection {
915 name: String,
916 docs: HashMap<DocId, Document>,
917 indexes: HashMap<String, IndexData>,
918 index_definitions: HashMap<String, Index>,
919}
920
921impl From<Collection> for PersistentCollection {
922 fn from(collection: Collection) -> Self {
923 Self {
924 name: collection.name,
925 docs: collection.docs,
926 indexes: collection.indexes,
927 index_definitions: collection.index_definitions,
928 }
929 }
930}
931
932impl From<PersistentCollection> for Collection {
933 fn from(persistent: PersistentCollection) -> Self {
934 Self {
935 name: persistent.name,
936 docs: persistent.docs,
937 indexes: persistent.indexes,
938 index_definitions: persistent.index_definitions,
939 }
940 }
941}
942
943#[derive(Debug, Serialize, Deserialize)]
944pub struct PersistentDatabase {
945 name: String,
946 collections: HashMap<String, PersistentCollection>,
947}
948
949#[derive(Debug, Serialize, Deserialize)]
950pub struct PersistentStorage {
951 databases: HashMap<String, PersistentDatabase>,
952}
953
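// Snapshot persistence: the whole storage tree is serialized as pretty-printed JSON.
// Illustrative usage (the path is an example):
//
//     let storage = PersistentStorage::load_from_file(Path::new("data/db.json"))?;
//     storage.save_to_file(Path::new("data/db.json"))?;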
954impl PersistentStorage {
955 pub fn new() -> Self {
956 Self {
957 databases: HashMap::new(),
958 }
959 }
960
961 pub fn save_to_file(&self, path: &Path) -> Result<()> {
962 let data = serde_json::to_string_pretty(self)?;
963 let mut file = File::create(path)?;
964 file.write_all(data.as_bytes())?;
965 Ok(())
966 }
967
968 pub fn load_from_file(path: &Path) -> Result<Self> {
969 if !path.exists() {
970 return Ok(Self::new());
971 }
972
973 let mut file = File::open(path)?;
974 let mut contents = String::new();
975 file.read_to_string(&mut contents)?;
976 let storage = serde_json::from_str(&contents)?;
977 Ok(storage)
978 }
979}
980
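// Core collection API. Illustrative end-to-end usage (all names and values are examples):
//
//     let mut users = Collection::new("users".to_string());
//     let id = users.insert(serde_json::json!({"name": "Alice", "age": 33}))?;
//     let adults = users.find(Query::new().gt("age", Value::from(30)), None)?;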
981impl Collection {
983 pub fn new(name: String) -> Self {
984 Self {
985 name,
986 docs: HashMap::new(),
987 indexes: HashMap::new(),
988 index_definitions: HashMap::new(),
989 }
990 }
991
992 pub fn insert(&mut self, mut doc: Document) -> Result<DocId> {
994 let id = DocId::new();
995
996 if let Some(obj) = doc.as_object_mut() {
998 obj.insert("_id".to_string(), Value::String(id.to_string()));
999 } else {
1000 return Err(DbError::Other("Document must be an object".into()));
1001 }
1002
1003 self.docs.insert(id.clone(), doc.clone());
1004
1005 self.update_indexes_on_insert(&id, &doc)?;
1007
1008 Ok(id)
1009 }
1010
1011 pub fn insert_many(&mut self, docs: Vec<Document>) -> Result<Vec<DocId>> {
1013 let mut ids = Vec::new();
1014
1015 for mut doc in docs {
1016 let id = DocId::new();
1017
1018 if let Some(obj) = doc.as_object_mut() {
1020 obj.insert("_id".to_string(), Value::String(id.to_string()));
1021 } else {
1022 return Err(DbError::Other("Document must be an object".into()));
1023 }
1024
1025 self.docs.insert(id.clone(), doc.clone());
1026 ids.push(id.clone());
1027
1028 self.update_indexes_on_insert(&id, &doc)?;
1030 }
1031
1032 Ok(ids)
1033 }
1034
1035 pub fn find(
1037 &self,
1038 query: Query,
1039 options: Option<FindOptions>,
1040 ) -> Result<Vec<(DocId, Document)>> {
1041 let mut results = Vec::new();
1042
1043 let indexed_results = self.try_indexed_query(&query)?;
1045
1046 if !indexed_results.is_empty() {
1047 for (id, doc) in indexed_results {
1049 if query.matches(&doc) {
1050 results.push((id, doc));
1051 }
1052 }
1053 } else {
1054 for (id, doc) in self.docs.iter() {
1056 if query.matches(doc) {
1057 results.push((id.clone(), doc.clone()));
1058 }
1059 }
1060 }
1061
1062 if let Some(opts) = options {
1064 if let Some(sort_spec) = opts.sort {
1066 self.sort_results(&mut results, &sort_spec);
1067 }
1068
1069 if let Some(skip) = opts.skip {
1071 if skip < results.len() {
1072 results.drain(0..skip);
1073 } else {
1074 results.clear();
1075 }
1076 }
1077
1078 if let Some(limit) = opts.limit {
1080 results.truncate(limit);
1081 }
1082
1083 if let Some(proj) = opts.projection {
1085 self.apply_projection(&mut results, &proj);
1086 }
1087 }
1088
1089 Ok(results)
1090 }
1091
1092 pub fn find_one(
1094 &self,
1095 query: Query,
1096 options: Option<FindOptions>,
1097 ) -> Result<(DocId, Document)> {
1098 let mut opts = options.unwrap_or_default();
1099 opts.limit = Some(1);
1100
1101 let mut results = self.find(query, Some(opts))?;
1102 results.pop().ok_or(DbError::NotFound)
1103 }
1104
1105 pub fn find_one_and_update(
1107 &mut self,
1108 query: Query,
1109 update: UpdateDocument,
1110 options: Option<FindOneAndUpdateOptions>,
1111 ) -> Result<Option<(DocId, Document)>> {
1112 let opts = options.unwrap_or_default();
1113
1114 let result = self.find_one(query.clone(), None);
1116
1117 match result {
1118 Ok((id, mut doc)) => {
1119 let original_doc = doc.clone();
1120
1121 self.apply_update(&mut doc, &update, opts.upsert)?;
1123
1124 self.docs.insert(id.clone(), doc.clone());
1126
1127 self.update_indexes_on_update(&id, &original_doc, &doc)?;
1129
1130 if opts.return_document == ReturnDocument::After {
1131 Ok(Some((id, doc)))
1132 } else {
1133 Ok(Some((id, original_doc)))
1134 }
1135 }
1136 Err(DbError::NotFound) if opts.upsert => {
1137 let mut new_doc = Map::new();
1139
1140 for cond in &query.conditions {
1142 if let QueryOperator::Eq(value) = &cond.operator {
1143 new_doc.insert(cond.field.clone(), value.clone());
1144 }
1145 }
1146
1147 let mut doc = Value::Object(new_doc);
1149 self.apply_update(&mut doc, &update, true)?;
1150
1151 let id = self.insert(doc)?;
1153
1154 if opts.return_document == ReturnDocument::After {
1156 self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1157 .map(Some)
1158 } else {
1159 Ok(None)
1160 }
1161 }
1162 Err(_) => Ok(None),
1163 }
1164 }
1165
1166 pub fn find_one_and_replace(
1168 &mut self,
1169 query: Query,
1170 replacement: Document,
1171 options: Option<FindOneAndReplaceOptions>,
1172 ) -> Result<Option<(DocId, Document)>> {
1173 let opts = options.unwrap_or_default();
1174
1175 let result = self.find_one(query.clone(), None);
1177
1178 match result {
1179 Ok((id, original_doc)) => {
1180 let mut replacement = replacement;
1182 if let Some(obj) = replacement.as_object_mut() {
1183 obj.insert("_id".to_string(), Value::String(id.to_string()));
1184 } else {
1185 return Err(DbError::Other("Replacement must be an object".into()));
1186 }
1187
1188 self.docs.insert(id.clone(), replacement.clone());
1190
1191 self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1193
1194 if opts.return_document == ReturnDocument::After {
1195 Ok(Some((id, replacement)))
1196 } else {
1197 Ok(Some((id, original_doc)))
1198 }
1199 }
1200 Err(DbError::NotFound) if opts.upsert => {
1201 let id = self.insert(replacement)?;
1203
1204 if opts.return_document == ReturnDocument::After {
1206 self.find_one(Query::new().eq("_id", Value::String(id.to_string())), None)
1207 .map(Some)
1208 } else {
1209 Ok(None)
1210 }
1211 }
1212 Err(_) => Ok(None),
1213 }
1214 }
1215
1216 pub fn find_one_and_delete(
1218 &mut self,
1219 query: Query,
1220 options: Option<FindOneAndDeleteOptions>,
1221 ) -> Result<Option<(DocId, Document)>> {
1222 let opts = options.unwrap_or_default();
1223
1224 let result = self.find_one(query.clone(), None);
1226
1227 match result {
1228 Ok((id, doc)) => {
1229 self.docs.remove(&id);
1231
1232 self.update_indexes_on_delete(&id, &doc)?;
1234
1235 Ok(Some((id, doc)))
1236 }
1237 Err(_) => Ok(None),
1238 }
1239 }
1240
1241 pub fn update_one(
1243 &mut self,
1244 query: Query,
1245 update: UpdateDocument,
1246 upsert: bool,
1247 ) -> Result<usize> {
1248 let result = self.find_one(query.clone(), None);
1249
1250 match result {
1251 Ok((id, mut doc)) => {
1252 let original_doc = doc.clone();
1253
1254 self.apply_update(&mut doc, &update, upsert)?;
1256
1257 self.docs.insert(id.clone(), doc.clone());
1259
1260 self.update_indexes_on_update(&id, &original_doc, &doc)?;
1262
1263 Ok(1)
1264 }
1265 Err(DbError::NotFound) if upsert => {
1266 let mut new_doc = Map::new();
1268
1269 for cond in &query.conditions {
1271 if let QueryOperator::Eq(value) = &cond.operator {
1272 new_doc.insert(cond.field.clone(), value.clone());
1273 }
1274 }
1275
1276 let mut doc = Value::Object(new_doc);
1278 self.apply_update(&mut doc, &update, true)?;
1279
1280 self.insert(doc)?;
1282
1283 Ok(1)
1284 }
1285 Err(_) => Ok(0),
1286 }
1287 }
1288
1289 pub fn update_many(&mut self, query: Query, update: UpdateDocument) -> Result<usize> {
1291 let docs = self.find(query, None)?;
1292 let mut count = 0;
1293
1294 for (id, doc) in docs {
1295 let mut doc = doc;
1296 let original_doc = doc.clone();
1297
1298 self.apply_update(&mut doc, &update, false)?;
1300
1301 self.docs.insert(id.clone(), doc.clone());
1303
1304 self.update_indexes_on_update(&id, &original_doc, &doc)?;
1306
1307 count += 1;
1308 }
1309
1310 Ok(count)
1311 }
1312
1313 pub fn replace_one(
1315 &mut self,
1316 query: Query,
1317 replacement: Document,
1318 upsert: bool,
1319 ) -> Result<usize> {
1320 let result = self.find_one(query.clone(), None);
1321
1322 match result {
1323 Ok((id, original_doc)) => {
1324 let mut replacement = replacement;
1326 if let Some(obj) = replacement.as_object_mut() {
1327 obj.insert("_id".to_string(), Value::String(id.to_string()));
1328 } else {
1329 return Err(DbError::Other("Replacement must be an object".into()));
1330 }
1331
1332 self.docs.insert(id.clone(), replacement.clone());
1334
1335 self.update_indexes_on_update(&id, &original_doc, &replacement)?;
1337
1338 Ok(1)
1339 }
1340 Err(DbError::NotFound) if upsert => {
1341 self.insert(replacement)?;
1343 Ok(1)
1344 }
1345 Err(_) => Ok(0),
1346 }
1347 }
1348
1349 pub fn delete_one(&mut self, query: Query) -> Result<usize> {
1351 let result = self.find_one(query, None);
1352
1353 match result {
1354 Ok((id, doc)) => {
1355 self.docs.remove(&id);
1357
1358 self.update_indexes_on_delete(&id, &doc)?;
1360
1361 Ok(1)
1362 }
1363 Err(_) => Ok(0),
1364 }
1365 }
1366
1367 pub fn delete_many(&mut self, query: Query) -> Result<usize> {
1369 let docs = self.find(query, None)?;
1370 let mut count = 0;
1371
1372 for (id, doc) in docs {
1373 self.docs.remove(&id);
1375
1376 self.update_indexes_on_delete(&id, &doc)?;
1378
1379 count += 1;
1380 }
1381
1382 Ok(count)
1383 }
1384
1385 pub fn count_documents(&self, query: Query) -> Result<usize> {
1387 let indexed_results = self.try_indexed_query(&query)?;
1389
1390 if !indexed_results.is_empty() {
1391 let mut count = 0;
1393 for (_, doc) in indexed_results {
1394 if query.matches(&doc) {
1395 count += 1;
1396 }
1397 }
1398 Ok(count)
1399 } else {
1400 let mut count = 0;
1402 for doc in self.docs.values() {
1403 if query.matches(doc) {
1404 count += 1;
1405 }
1406 }
1407 Ok(count)
1408 }
1409 }
1410
1411 pub fn estimated_document_count(&self) -> Result<usize> {
1413 Ok(self.docs.len())
1414 }
1415
1416 pub fn create_index(&mut self, index: Index) -> Result<()> {
1419 let index_name = index.name.clone();
1420
1421 if self.index_definitions.contains_key(&index_name) {
1423 return Err(DbError::IndexError(format!(
1424 "Index {} already exists",
1425 index_name
1426 )));
1427 }
1428
1429 let index_data = IndexData::new(index.unique, index.sparse);
1431
1432 self.indexes.insert(index_name.clone(), index_data);
1434
1435 self.index_definitions
1437 .insert(index_name.clone(), index.clone());
1438
1439 let docs_to_index: Vec<(DocId, Document)> = self
1441 .docs
1442 .iter()
1443 .map(|(id, doc)| (id.clone(), doc.clone()))
1444 .collect();
1445
1446 for (id, doc) in docs_to_index {
1448 self.add_to_indexes(&index_name, &id, &doc)?;
1449 }
1450
1451 Ok(())
1452 }
1453
1454 pub fn drop_index(&mut self, name: &str) -> Result<()> {
1456 self.indexes.remove(name);
1457 self.index_definitions.remove(name);
1458 Ok(())
1459 }
1460
1461 pub fn list_indexes(&self) -> Result<Vec<Index>> {
1463 Ok(self.index_definitions.values().cloned().collect())
1464 }
1465
1466 pub fn drop_indexes(&mut self) -> Result<()> {
1468 self.indexes.clear();
1469 self.index_definitions.clear();
1470 Ok(())
1471 }
1472
1473 pub fn stats(&self) -> Result<CollectionStats> {
1475 Ok(CollectionStats {
1476 count: self.docs.len(),
1477 size: self.calculate_size(),
1478 avg_obj_size: if self.docs.is_empty() {
1479 0
1480 } else {
1481 self.calculate_size() / self.docs.len()
1482 },
1483 index_count: self.index_definitions.len(),
1484 index_size: self.calculate_index_size(),
1485 })
1486 }
1487
1488 pub fn aggregate(&self, pipeline: Vec<AggregationStage>) -> Result<Vec<Document>> {
1490 let mut results = Vec::new();
1491
1492 let mut docs: Vec<Document> = self.docs.values().cloned().collect();
1494
1495 for stage in pipeline {
1497 docs = self.apply_aggregation_stage(docs, stage)?;
1498 }
1499
1500 results.extend(docs);
1501 Ok(results)
1502 }
1503
    pub fn distinct(&self, field: &str, query: Option<Query>) -> Result<Vec<Value>> {
        // serde_json::Value does not implement Hash/Eq, so deduplicate with a
        // Vec + contains check instead of a HashSet.
        let mut values: Vec<Value> = Vec::new();

        for doc in self.docs.values() {
            if let Some(q) = &query {
                if !q.matches(doc) {
                    continue;
                }
            }

            let field_parts: Vec<&str> = field.split('.').collect();
            if let Some(value) = self.get_nested_value(doc, &field_parts) {
                if !values.contains(value) {
                    values.push(value.clone());
                }
            }
        }

        Ok(values)
    }
1523
1524 pub fn count(&self) -> usize {
1526 self.docs.len()
1527 }
1528
1529 fn apply_update(
1532 &self,
1533 doc: &mut Document,
1534 update: &UpdateDocument,
1535 is_upsert: bool,
1536 ) -> Result<()> {
1537 for op in &update.operators {
1538 match op {
1539 UpdateOperator::Set(fields) => {
1540 if let Some(obj) = doc.as_object_mut() {
1541 for (key, value) in fields {
1542 obj.insert(key.clone(), value.clone());
1543 }
1544 }
1545 }
1546 UpdateOperator::Unset(fields) => {
1547 if let Some(obj) = doc.as_object_mut() {
1548 for key in fields {
1549 obj.remove(key);
1550 }
1551 }
1552 }
1553 UpdateOperator::Inc(fields) => {
1554 if let Some(obj) = doc.as_object_mut() {
1555 for (key, value) in fields {
1556 if let Some(Value::Number(n)) = obj.get(key) {
1557 if let (Some(current), Some(inc)) = (n.as_f64(), value.as_f64()) {
1558 obj.insert(
1559 key.clone(),
1560 Value::Number(
1561 serde_json::Number::from_f64(current + inc).unwrap(),
1562 ),
1563 );
1564 }
1565 } else {
1566 if let Some(inc) = value.as_f64() {
1568 obj.insert(
1569 key.clone(),
1570 Value::Number(serde_json::Number::from_f64(inc).unwrap()),
1571 );
1572 }
1573 }
1574 }
1575 }
1576 }
1577 UpdateOperator::Mul(fields) => {
1578 if let Some(obj) = doc.as_object_mut() {
1579 for (key, value) in fields {
1580 if let Some(Value::Number(n)) = obj.get(key) {
1581 if let (Some(current), Some(mul)) = (n.as_f64(), value.as_f64()) {
1582 obj.insert(
1583 key.clone(),
1584 Value::Number(
1585 serde_json::Number::from_f64(current * mul).unwrap(),
1586 ),
1587 );
1588 }
1589 }
1590 }
1591 }
1592 }
1593 UpdateOperator::Rename(fields) => {
1594 if let Some(obj) = doc.as_object_mut() {
1595 for (old_key, new_key) in fields {
1596 if let Some(value) = obj.remove(old_key) {
1597 obj.insert(new_key.clone(), value);
1598 }
1599 }
1600 }
1601 }
1602 UpdateOperator::SetOnInsert(fields) => {
1603 if is_upsert {
1604 if let Some(obj) = doc.as_object_mut() {
1605 for (key, value) in fields {
1606 obj.insert(key.clone(), value.clone());
1607 }
1608 }
1609 }
1610 }
1611 UpdateOperator::Min(fields) => {
1612 if let Some(obj) = doc.as_object_mut() {
1613 for (key, value) in fields {
1614 if let Some(current) = obj.get(key) {
1615 if self.compare_values(value, current) == Ordering::Less {
1617 obj.insert(key.clone(), value.clone());
1618 }
1619 } else {
1620 obj.insert(key.clone(), value.clone());
1621 }
1622 }
1623 }
1624 }
1625 UpdateOperator::Max(fields) => {
1626 if let Some(obj) = doc.as_object_mut() {
1627 for (key, value) in fields {
1628 if let Some(current) = obj.get(key) {
1629 if self.compare_values(value, current) == Ordering::Greater {
1631 obj.insert(key.clone(), value.clone());
1632 }
1633 } else {
1634 obj.insert(key.clone(), value.clone());
1635 }
1636 }
1637 }
1638 }
1639 UpdateOperator::CurrentDate(fields) => {
1640 if let Some(obj) = doc.as_object_mut() {
1641 for (key, type_spec) in fields {
1642 let now = chrono::Utc::now();
1643
1644 match type_spec {
1645 Value::String(type_str) if type_str == "date" => {
1646 obj.insert(key.clone(), Value::String(now.to_rfc3339()));
1647 }
1648 Value::String(type_str) if type_str == "timestamp" => {
1649 obj.insert(
1650 key.clone(),
1651 Value::Number(serde_json::Number::from(now.timestamp())),
1652 );
1653 }
1654 Value::Object(spec) => {
1655 if let Some(Value::String(type_str)) = spec.get("$type") {
1656 match type_str.as_str() {
1657 "date" => {
1658 obj.insert(
1659 key.clone(),
1660 Value::String(now.to_rfc3339()),
1661 );
1662 }
1663 "timestamp" => {
1664 obj.insert(
1665 key.clone(),
1666 Value::Number(serde_json::Number::from(
1667 now.timestamp(),
1668 )),
1669 );
1670 }
1671 _ => {
1672 return Err(DbError::UpdateError(format!(
1673 "Invalid date type specification: {}",
1674 type_str
1675 )));
1676 }
1677 }
1678 }
1679 }
1680 _ => {
1681 return Err(DbError::UpdateError(
1682 "Invalid date type specification".into(),
1683 ));
1684 }
1685 }
1686 }
1687 }
1688 }
1689 UpdateOperator::Push(fields) => {
1690 if let Some(obj) = doc.as_object_mut() {
1691 for (key, value) in fields {
1692 if let Some(Value::Array(arr)) = obj.get_mut(key) {
1693 arr.push(value.clone());
1694 } else {
1695 obj.insert(key.clone(), Value::Array(vec![value.clone()]));
1697 }
1698 }
1699 }
1700 }
1701 UpdateOperator::PushAll(fields) => {
1702 if let Some(obj) = doc.as_object_mut() {
1703 for (key, values) in fields {
1704 if let Some(Value::Array(arr)) = obj.get_mut(key) {
1705 arr.extend(values.iter().cloned());
1706 } else {
1707 obj.insert(key.clone(), Value::Array(values.clone()));
1709 }
1710 }
1711 }
1712 }
1713 UpdateOperator::AddToSet(fields) => {
1714 if let Some(obj) = doc.as_object_mut() {
1715 for (key, value) in fields {
1716 if let Some(Value::Array(arr)) = obj.get_mut(key) {
1717 if !arr.contains(value) {
1718 arr.push(value.clone());
1719 }
1720 } else {
1721 obj.insert(key.clone(), Value::Array(vec![value.clone()]));
1723 }
1724 }
1725 }
1726 }
                UpdateOperator::Pop(fields) => {
                    if let Some(obj) = doc.as_object_mut() {
                        for (key, pos) in fields {
                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
                                // $pop: 1 removes the last element, -1 the first.
                                if *pos == 1 {
                                    arr.pop();
                                } else if *pos == -1 && !arr.is_empty() {
                                    arr.remove(0);
                                }
                            }
                        }
                    }
                }
                UpdateOperator::Pull(fields) => {
                    if let Some(obj) = doc.as_object_mut() {
                        for (key, condition) in fields {
                            if let Some(Value::Array(arr)) = obj.get_mut(key) {
                                arr.retain(|elem| match condition {
                                    // Object conditions are interpreted as a sub-query
                                    // that each element is matched against.
                                    Value::Object(_) => match Query::from_value(condition.clone()) {
                                        Ok(query) => !query.matches(elem),
                                        Err(_) => true,
                                    },
                                    // Scalar conditions remove exact matches.
                                    _ => elem != condition,
                                });
                            }
                        }
                    }
                }
1760 UpdateOperator::PullAll(fields) => {
1761 if let Some(obj) = doc.as_object_mut() {
1762 for (key, values) in fields {
1763 if let Some(Value::Array(arr)) = obj.get_mut(key) {
1764 arr.retain(|elem| !values.contains(elem));
1765 }
1766 }
1767 }
1768 }
1769 UpdateOperator::Bit(fields) => {
1770 if let Some(obj) = doc.as_object_mut() {
1771 for (key, operation) in fields {
1772 if let Value::Object(op) = operation {
1773 if let Some(Value::Number(current)) = obj.get(key) {
1774 let mut current_num = current.as_i64().unwrap_or(0);
1775
1776 if let Some(Value::Number(and)) = op.get("and") {
1777 current_num &= and.as_i64().unwrap_or(0);
1778 }
1779 if let Some(Value::Number(or)) = op.get("or") {
1780 current_num |= or.as_i64().unwrap_or(0);
1781 }
1782 if let Some(Value::Number(xor)) = op.get("xor") {
1783 current_num ^= xor.as_i64().unwrap_or(0);
1784 }
1785
1786 obj.insert(
1787 key.clone(),
1788 Value::Number(serde_json::Number::from(current_num)),
1789 );
1790 }
1791 }
1792 }
1793 }
1794 }
1795 }
1796 }
1797
1798 Ok(())
1799 }
1800
1801 fn compare_values(&self, a: &Value, b: &Value) -> Ordering {
1802 match (a, b) {
1803 (Value::Number(a_num), Value::Number(b_num)) => a_num
1804 .as_f64()
1805 .unwrap_or(0.0)
1806 .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
1807 .unwrap_or(Ordering::Equal),
1808 (Value::String(a_str), Value::String(b_str)) => a_str.cmp(b_str),
1809 (Value::Bool(a_bool), Value::Bool(b_bool)) => a_bool.cmp(b_bool),
1810 (Value::Array(a_arr), Value::Array(b_arr)) => a_arr.len().cmp(&b_arr.len()),
            (Value::Object(_), Value::Object(_)) => Ordering::Equal,
            _ => Ordering::Equal,
1815 }
1816 }
1817
1818 fn update_indexes_on_insert(&mut self, id: &DocId, doc: &Document) -> Result<()> {
1819 for (name, index_def) in self.index_definitions.iter() {
1820 let mut field_values = Vec::new();
1822 for (field, _) in &index_def.key {
1823 let field_parts: Vec<&str> = field.split('.').collect();
1824 let value = self.get_nested_value(doc, &field_parts);
1825 field_values.push((field, value));
1826 }
1827
1828 if let Some(index_data) = self.indexes.get_mut(name) {
1829 for (field, value) in field_values {
1830 if let Some(value) = value {
1831 index_data.insert(value.clone(), id.clone())?;
1832 } else if index_data.sparse {
1833 continue;
1835 } else {
1836 return Err(DbError::IndexError(format!(
1837 "Field {} not found in document for non-sparse index",
1838 field
1839 )));
1840 }
1841 }
1842 }
1843 }
1844
1845 Ok(())
1846 }
1847
1848 fn update_indexes_on_update(
1849 &mut self,
1850 id: &DocId,
1851 old_doc: &Document,
1852 new_doc: &Document,
1853 ) -> Result<()> {
1854 self.update_indexes_on_delete(id, old_doc)?;
1856
1857 self.update_indexes_on_insert(id, new_doc)?;
1859
1860 Ok(())
1861 }
1862 fn update_indexes_on_delete(&mut self, id: &DocId, doc: &Document) -> Result<()> {
1864 let index_names: Vec<String> = self.index_definitions.keys().cloned().collect();
1865
1866 for name in index_names {
1867 if let Some(index_def) = self.index_definitions.get(&name) {
1868 let mut field_values = Vec::new();
1870 for (field, _) in &index_def.key {
1871 let field_parts: Vec<&str> = field.split('.').collect();
1872 let value = self.get_nested_value(doc, &field_parts);
1873 field_values.push(value.cloned());
1874 }
1875
1876 if let Some(index_data) = self.indexes.get_mut(&name) {
1878 for (i, (_, _)) in index_def.key.iter().enumerate() {
1879 if let Some(val) = &field_values[i] {
1880 index_data.remove(val, id)?;
1881 }
1882 }
1883 }
1884 }
1885 }
1886
1887 Ok(())
1888 }
1889
1890 fn add_to_indexes(&mut self, index_name: &str, id: &DocId, doc: &Document) -> Result<()> {
1892 if let Some(index_def) = self.index_definitions.get(index_name).cloned() {
1893 let mut field_values = Vec::new();
1895 let field_names: Vec<String> = index_def
1896 .key
1897 .iter()
1898 .map(|(field, _)| field.clone())
1899 .collect();
1900
1901 for field in &field_names {
1902 let field_parts: Vec<&str> = field.split('.').collect();
1903 let value = self.get_nested_value(doc, &field_parts);
1904 field_values.push(value.cloned());
1905 }
1906
1907 if let Some(index_data) = self.indexes.get_mut(index_name) {
1909 for (i, field) in field_names.iter().enumerate() {
1910 if let Some(val) = &field_values[i] {
1911 index_data.insert(val.clone(), id.clone())?;
1912 } else if index_data.sparse {
1913 continue;
1915 } else {
1916 return Err(DbError::IndexError(format!(
1917 "Field {} not found in document for non-sparse index",
1918 field
1919 )));
1920 }
1921 }
1922 }
1923 }
1924
1925 Ok(())
1926 }
1927
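    // Attempts to answer the query from a single-field index: only the first key of
    // each index is consulted, and only Eq/Gt/Gte/Lt/Lte conditions are accelerated.
    // When this returns an empty set, find()/count_documents() fall back to a full scan.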
1928 fn try_indexed_query(&self, query: &Query) -> Result<Vec<(DocId, Document)>> {
1929 for (index_name, index_def) in self.index_definitions.iter() {
1931 if let Some((field, _)) = index_def.key.first() {
1933 for cond in &query.conditions {
1934 if cond.field == *field {
1935 let mut results = Vec::new();
1937
1938 if let Some(index_data) = self.indexes.get(index_name) {
1939 match &cond.operator {
1940 QueryOperator::Eq(value) => {
1941 let entries = index_data.find(value);
1942 for entry in entries {
1943 if let Some(doc) = self.docs.get(&entry.doc_id) {
1944 results.push((entry.doc_id.clone(), doc.clone()));
1945 }
1946 }
1947 }
1948 QueryOperator::Gt(value) => {
1949 let ord_value = OrdValue(value.clone());
1951 for (key, entries) in
1952 index_data.entries.range((Excluded(&ord_value), Unbounded))
1953 {
1954 for entry in entries {
1955 if let Some(doc) = self.docs.get(&entry.doc_id) {
1956 results.push((entry.doc_id.clone(), doc.clone()));
1957 }
1958 }
1959 }
1960 }
1961 QueryOperator::Gte(value) => {
1962 let ord_value = OrdValue(value.clone());
1964 for (key, entries) in
1965 index_data.entries.range((Included(&ord_value), Unbounded))
1966 {
1967 for entry in entries {
1968 if let Some(doc) = self.docs.get(&entry.doc_id) {
1969 results.push((entry.doc_id.clone(), doc.clone()));
1970 }
1971 }
1972 }
1973 }
1974 QueryOperator::Lt(value) => {
1975 let ord_value = OrdValue(value.clone());
1977 for (key, entries) in
1978 index_data.entries.range((Unbounded, Excluded(&ord_value)))
1979 {
1980 for entry in entries {
1981 if let Some(doc) = self.docs.get(&entry.doc_id) {
1982 results.push((entry.doc_id.clone(), doc.clone()));
1983 }
1984 }
1985 }
1986 }
1987 QueryOperator::Lte(value) => {
1988 let ord_value = OrdValue(value.clone());
1990 for (key, entries) in
1991 index_data.entries.range((Unbounded, Included(&ord_value)))
1992 {
1993 for entry in entries {
1994 if let Some(doc) = self.docs.get(&entry.doc_id) {
1995 results.push((entry.doc_id.clone(), doc.clone()));
1996 }
1997 }
1998 }
1999 }
2000 _ => {
2001 continue;
2003 }
2004 }
2005 }
2006
2007 return Ok(results);
2008 }
2009 }
2010 }
2011 }
2012
2013 Ok(Vec::new())
2015 }
2016
2017 fn sort_results(
2018 &self,
2019 results: &mut Vec<(DocId, Document)>,
2020 sort_spec: &[(String, SortOrder)],
2021 ) {
2022 results.sort_by(|a, b| {
2023 for (field, order) in sort_spec {
2024 let field_parts: Vec<&str> = field.split('.').collect();
2025 let a_val = self.get_nested_value(&a.1, &field_parts);
2026 let b_val = self.get_nested_value(&b.1, &field_parts);
2027
2028 let cmp = match (a_val, b_val) {
2029 (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2030 .as_f64()
2031 .unwrap_or(0.0)
2032 .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2033 .unwrap_or(Ordering::Equal),
2034 (Some(Value::String(a_str)), Some(Value::String(b_str))) => a_str.cmp(b_str),
2035 (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => a_bool.cmp(b_bool),
2036 (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2037 a_arr.len().cmp(&b_arr.len())
2038 }
2039 (Some(_), None) => Ordering::Greater,
2040 (None, Some(_)) => Ordering::Less,
2041 (None, None) => Ordering::Equal,
2042 _ => Ordering::Equal,
2043 };
2044
2045 if cmp != Ordering::Equal {
2046 return match order {
2047 SortOrder::Ascending => cmp,
2048 SortOrder::Descending => cmp.reverse(),
2049 };
2050 }
2051 }
2052
2053 Ordering::Equal
2054 });
2055 }
2056
2057 fn apply_projection(
2058 &self,
2059 results: &mut Vec<(DocId, Document)>,
2060 projection: &Map<String, Value>,
2061 ) {
2062 let is_inclusive = projection.values().any(|v| match v {
2063 Value::Bool(b) => *b,
2064 Value::Number(n) => n.as_i64().unwrap_or(0) == 1,
2065 _ => false,
2066 });
2067
2068 for (_, doc) in results.iter_mut() {
2069 if let Some(obj) = doc.as_object_mut() {
2070 let mut new_obj = Map::new();
2071
2072 if is_inclusive {
2073 for (key, value) in projection {
2075 if obj.contains_key(key) {
2076 new_obj.insert(key.clone(), obj.get(key).unwrap().clone());
2077 }
2078 }
2079
2080 if obj.contains_key("_id") && !projection.contains_key("_id") {
2082 new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2083 }
2084 } else {
2085 for (key, value) in obj {
2087 if !projection.contains_key(key) || key == "_id" {
2088 new_obj.insert(key.clone(), value.clone());
2089 }
2090 }
2091 }
2092
2093 *doc = Value::Object(new_obj);
2094 }
2095 }
2096 }
2097
2098 fn get_nested_value<'a>(&self, doc: &'a Document, path: &[&str]) -> Option<&'a Value> {
2099 if path.is_empty() {
2100 return Some(doc);
2101 }
2102
2103 let current = path[0];
2104 let rest = &path[1..];
2105
2106 match doc.get(current) {
2107 Some(value) => {
2108 if rest.is_empty() {
2109 Some(value)
2110 } else {
2111 match value {
2112 Value::Object(obj) => self.get_nested_value(value, rest),
2113 _ => None,
2114 }
2115 }
2116 }
2117 None => None,
2118 }
2119 }
2120
2121 fn calculate_size(&self) -> usize {
2122 self.docs
2124 .values()
2125 .map(|doc| serde_json::to_string(doc).unwrap_or_default().len())
2126 .sum()
2127 }
2128
    fn calculate_index_size(&self) -> usize {
        // Rough estimate: a fixed 16-byte overhead per distinct indexed key.
        self.indexes
            .values()
            .map(|index_data| index_data.entries.len() * 16)
            .sum()
    }
2138
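    // Applies one aggregation pipeline stage ($match, $project, $sort, $skip, $limit,
    // $group, $unwind, $lookup, $out) to the current set of documents.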
2139 fn apply_aggregation_stage(
2140 &self,
2141 docs: Vec<Document>,
2142 stage: AggregationStage,
2143 ) -> Result<Vec<Document>> {
2144 match stage {
2145 AggregationStage::Match(query) => {
2146 let mut result = Vec::new();
2147 for doc in docs {
2148 if query.matches(&doc) {
2149 result.push(doc);
2150 }
2151 }
2152 Ok(result)
2153 }
2154 AggregationStage::Project(projection) => {
2155 let mut result = Vec::new();
2156 for doc in docs {
2157 if let Some(obj) = doc.as_object() {
2158 let mut new_obj = Map::new();
2159
2160 for (key, value) in &projection {
2161 if value.as_object().and_then(|o| o.get("$")).is_some() {
2162 if let Some(expr) = value.as_object().and_then(|o| o.get("$")) {
2164 if let Some(field_name) = expr.as_str() {
2166 if let Some(field_value) =
2167 obj.get(field_name.trim_start_matches('$'))
2168 {
2169 new_obj.insert(key.clone(), field_value.clone());
2170 }
2171 }
2172 }
2173 } else if value.as_object().and_then(|o| o.get("$concat")).is_some() {
2174 if let Some(Value::Array(fields)) =
2176 value.as_object().and_then(|o| o.get("$concat"))
2177 {
2178 let mut result_str = String::new();
2179 for field in fields {
2180 if let Some(field_name) = field.as_str() {
2181 if let Some(field_value) =
2182 obj.get(field_name.trim_start_matches('$'))
2183 {
2184 if let Some(s) = field_value.as_str() {
2185 result_str.push_str(s);
2186 }
2187 }
2188 }
2189 }
2190 new_obj.insert(key.clone(), Value::String(result_str));
2191 }
                            } else {
                                let include = match value {
                                    Value::Bool(b) => *b,
                                    Value::Number(n) => n.as_i64().unwrap_or(0) == 1,
                                    _ => false,
                                };

                                // Only explicitly included fields are copied; exclusion-style
                                // projections ({field: 0}) are not reconstructed in this stage.
                                if include {
                                    if let Some(v) = obj.get(key.as_str()) {
                                        new_obj.insert(key.clone(), v.clone());
                                    }
                                }
                            }
2212 }
2213
2214 if obj.contains_key("_id") && !projection.contains_key("_id") {
2216 new_obj.insert("_id".to_string(), obj.get("_id").unwrap().clone());
2217 }
2218
2219 result.push(Value::Object(new_obj));
2220 }
2221 }
2222 Ok(result)
2223 }
2224 AggregationStage::Sort(sort_spec) => {
2225 let mut result = docs;
2226
2227 result.sort_by(|a, b| {
2228 for (field, order) in &sort_spec {
2229 let field_parts: Vec<&str> = field.split('.').collect();
2230 let a_val = self.get_nested_value(a, &field_parts);
2231 let b_val = self.get_nested_value(b, &field_parts);
2232
2233 let cmp = match (a_val, b_val) {
2234 (Some(Value::Number(a_num)), Some(Value::Number(b_num))) => a_num
2235 .as_f64()
2236 .unwrap_or(0.0)
2237 .partial_cmp(&b_num.as_f64().unwrap_or(0.0))
2238 .unwrap_or(Ordering::Equal),
2239 (Some(Value::String(a_str)), Some(Value::String(b_str))) => {
2240 a_str.cmp(b_str)
2241 }
2242 (Some(Value::Bool(a_bool)), Some(Value::Bool(b_bool))) => {
2243 a_bool.cmp(b_bool)
2244 }
2245 (Some(Value::Array(a_arr)), Some(Value::Array(b_arr))) => {
2246 a_arr.len().cmp(&b_arr.len())
2247 }
2248 (Some(_), None) => Ordering::Greater,
2249 (None, Some(_)) => Ordering::Less,
2250 (None, None) => Ordering::Equal,
2251 _ => Ordering::Equal,
2252 };
2253
2254 if cmp != Ordering::Equal {
2255 return match order {
2256 SortOrder::Ascending => cmp,
2257 SortOrder::Descending => cmp.reverse(),
2258 };
2259 }
2260 }
2261
2262 Ordering::Equal
2263 });
2264
2265 Ok(result)
2266 }
2267 AggregationStage::Skip(n) => {
2268 let mut result = docs;
2269 if n < result.len() {
2270 result.drain(0..n);
2271 } else {
2272 result.clear();
2273 }
2274 Ok(result)
2275 }
2276 AggregationStage::Limit(n) => {
2277 let mut result = docs;
2278 result.truncate(n);
2279 Ok(result)
2280 }
2281 AggregationStage::Group(group_spec) => {
2282 let mut groups: HashMap<String, Vec<Document>> = HashMap::new();
2283
2284 for doc in docs {
2286 let group_key = self.calculate_group_key(&doc, &group_spec.id)?;
2287 groups.entry(group_key).or_insert_with(Vec::new).push(doc);
2288 }
2289
2290 let mut result = Vec::new();
2292 for (key, group_docs) in groups {
2293 let mut group_obj = Map::new();
2294
2295 group_obj.insert("_id".to_string(), Value::String(key));
2297
2298 for (field, op) in &group_spec.operations {
2300 match op {
2301 GroupOperation::Sum(expr) => {
2302 let sum = self.calculate_sum(&group_docs, expr)?;
2303 group_obj.insert(field.clone(), sum);
2304 }
2305 GroupOperation::Avg(expr) => {
2306 let avg = self.calculate_avg(&group_docs, expr)?;
2307 group_obj.insert(field.clone(), avg);
2308 }
2309 GroupOperation::Min(expr) => {
2310 let min = self.calculate_min(&group_docs, expr)?;
2311 group_obj.insert(field.clone(), min);
2312 }
2313 GroupOperation::Max(expr) => {
2314 let max = self.calculate_max(&group_docs, expr)?;
2315 group_obj.insert(field.clone(), max);
2316 }
2317 GroupOperation::First(expr) => {
2318 if let Some(doc) = group_docs.first() {
2319 if let Some(value) = self.evaluate_expression(doc, expr) {
2320 group_obj.insert(field.clone(), value);
2321 }
2322 }
2323 }
2324 GroupOperation::Last(expr) => {
2325 if let Some(doc) = group_docs.last() {
2326 if let Some(value) = self.evaluate_expression(doc, expr) {
2327 group_obj.insert(field.clone(), value);
2328 }
2329 }
2330 }
2331 GroupOperation::Push(expr) => {
2332 let mut values = Vec::new();
2333 for doc in &group_docs {
2334 if let Some(value) = self.evaluate_expression(doc, expr) {
2335 values.push(value);
2336 }
2337 }
2338 group_obj.insert(field.clone(), Value::Array(values));
2339 }
                        GroupOperation::AddToSet(expr) => {
                            // Value is not hashable, so dedup with contains() instead
                            // of a HashSet.
                            let mut values: Vec<Value> = Vec::new();
                            for doc in &group_docs {
                                if let Some(value) = self.evaluate_expression(doc, expr) {
                                    if !values.contains(&value) {
                                        values.push(value);
                                    }
                                }
                            }
                            group_obj.insert(field.clone(), Value::Array(values));
                        }
2352 GroupOperation::StdDevPop(expr) => {
2353 let std_dev = self.calculate_std_dev_pop(&group_docs, expr)?;
2354 group_obj.insert(field.clone(), std_dev);
2355 }
2356 GroupOperation::StdDevSamp(expr) => {
2357 let std_dev = self.calculate_std_dev_samp(&group_docs, expr)?;
2358 group_obj.insert(field.clone(), std_dev);
2359 }
2360 }
2361 }
2362
2363 result.push(Value::Object(group_obj));
2364 }
2365
2366 Ok(result)
2367 }
2368 AggregationStage::Unwind(field) => {
2369 let mut result = Vec::new();
2370
2371 for doc in docs {
2372 if let Some(obj) = doc.as_object() {
2373 if let Some(value) = obj.get(&field) {
2374 if let Value::Array(arr) = value {
2375 for item in arr {
2376 let mut new_obj = obj.clone();
2377 new_obj.insert(field.clone(), item.clone());
2378 result.push(Value::Object(new_obj));
2379 }
2380 } else {
2381 result.push(doc.clone());
2383 }
2384 } else {
2385 result.push(doc.clone());
2387 }
2388 }
2389 }
2390
2391 Ok(result)
2392 }
            AggregationStage::Lookup(lookup_spec) => {
                // A single Collection has no access to the collection named in
                // `lookup_spec.from`, so the join cannot be resolved here: every
                // document simply gets an empty array under `as_field`.
                let mut result = Vec::new();

                for doc in docs {
                    if let Some(obj) = doc.as_object() {
                        let mut new_obj = obj.clone();
                        new_obj.insert(lookup_spec.as_field.clone(), Value::Array(Vec::new()));
                        result.push(Value::Object(new_obj));
                    }
                }

                Ok(result)
            }
            AggregationStage::Out(_collection_name) => {
                // $out is not persisted by this in-memory executor; the documents
                // are passed through unchanged.
                Ok(docs)
            }
2427 }
2428 }
2429
2430 fn calculate_group_key(&self, doc: &Document, group_id: &GroupId) -> Result<String> {
2431 match group_id {
2432 GroupId::Field(field) => {
2433 let field_parts: Vec<&str> = field.split('.').collect();
2434 if let Some(value) = self.get_nested_value(doc, &field_parts) {
2435 Ok(value.to_string())
2436 } else {
2437 Ok("null".to_string())
2438 }
2439 }
2440 GroupId::Expression(expr) => {
2441 if let Some(value) = self.evaluate_expression(doc, expr) {
2442 Ok(value.to_string())
2443 } else {
2444 Ok("null".to_string())
2445 }
2446 }
2447 GroupId::Null => Ok("null".to_string()),
2448 }
2449 }
2450
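    /// Evaluates an accumulator expression against a single document:
    /// `"$field"` (including dotted paths) resolves to that field's value,
    /// an object such as `{"$sum": "$field"}` also resolves to the field's value
    /// (the actual summation happens in `calculate_sum`), and any other literal
    /// is returned unchanged.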
2451 fn evaluate_expression(&self, doc: &Document, expr: &Value) -> Option<Value> {
2452 match expr {
2453 Value::String(s) if s.starts_with('$') => {
2454 let field = &s[1..];
2455 let field_parts: Vec<&str> = field.split('.').collect();
2456 self.get_nested_value(doc, &field_parts).cloned()
2457 }
2458 Value::Object(obj) => {
2459 if let Some(sum_expr) = obj.get("$sum") {
2461 if let Some(field) = sum_expr.as_str() {
2462 if field.starts_with('$') {
2463 let field_name = &field[1..];
2464 let field_parts: Vec<&str> = field_name.split('.').collect();
2465 return self.get_nested_value(doc, &field_parts).cloned();
2466 }
2467 }
2468 }
2469 None
2470 }
2471 _ => Some(expr.clone()),
2472 }
2473 }
2474
2475 fn calculate_sum(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2476 let mut sum = 0.0;
2477
2478 for doc in docs {
2479 if let Some(value) = self.evaluate_expression(doc, expr) {
2480 if let Some(num) = value.as_f64() {
2481 sum += num;
2482 }
2483 }
2484 }
2485
        // Number::from_f64 returns None for NaN/infinity; return Null instead of panicking.
        Ok(serde_json::Number::from_f64(sum)
            .map(Value::Number)
            .unwrap_or(Value::Null))
2487 }
2488
2489 fn calculate_avg(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2490 if docs.is_empty() {
2491 return Ok(Value::Number(serde_json::Number::from(0)));
2492 }
2493
2494 let sum = self.calculate_sum(docs, expr)?;
2495 if let Some(sum_num) = sum.as_f64() {
2496 let avg = sum_num / docs.len() as f64;
            Ok(serde_json::Number::from_f64(avg)
                .map(Value::Number)
                .unwrap_or(Value::Null))
2498 } else {
2499 Ok(Value::Number(serde_json::Number::from(0)))
2500 }
2501 }
2502
2503 fn calculate_min(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2504 let mut min_value: Option<Value> = None;
2505
2506 for doc in docs {
2507 if let Some(value) = self.evaluate_expression(doc, expr) {
2508 if let Some(ref mut min) = min_value {
2509 if self.compare_values(&value, min) == Ordering::Less {
2510 *min = value;
2511 }
2512 } else {
2513 min_value = Some(value);
2514 }
2515 }
2516 }
2517
2518 Ok(min_value.unwrap_or(Value::Null))
2519 }
2520
2521 fn calculate_max(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2522 let mut max_value: Option<Value> = None;
2523
2524 for doc in docs {
2525 if let Some(value) = self.evaluate_expression(doc, expr) {
2526 if let Some(ref mut max) = max_value {
2527 if self.compare_values(&value, max) == Ordering::Greater {
2528 *max = value;
2529 }
2530 } else {
2531 max_value = Some(value);
2532 }
2533 }
2534 }
2535
2536 Ok(max_value.unwrap_or(Value::Null))
2537 }
2538
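    /// Population standard deviation: sqrt(sum((x - mean)^2) / n).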
2539 fn calculate_std_dev_pop(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2540 if docs.is_empty() {
2541 return Ok(Value::Number(serde_json::Number::from(0)));
2542 }
2543
2544 let avg = self.calculate_avg(docs, expr)?;
2545 let avg_num = avg.as_f64().unwrap_or(0.0);
2546
2547 let mut sum_sq_diff = 0.0;
2548 for doc in docs {
2549 if let Some(value) = self.evaluate_expression(doc, expr) {
2550 if let Some(num) = value.as_f64() {
2551 let diff = num - avg_num;
2552 sum_sq_diff += diff * diff;
2553 }
2554 }
2555 }
2556
2557 let variance = sum_sq_diff / docs.len() as f64;
2558 let std_dev = variance.sqrt();
2559
        Ok(serde_json::Number::from_f64(std_dev)
            .map(Value::Number)
            .unwrap_or(Value::Null))
2563 }
2564
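    /// Sample standard deviation: sqrt(sum((x - mean)^2) / (n - 1)).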
2565 fn calculate_std_dev_samp(&self, docs: &[Document], expr: &Value) -> Result<Value> {
2566 if docs.len() <= 1 {
2567 return Ok(Value::Number(serde_json::Number::from(0)));
2568 }
2569
2570 let avg = self.calculate_avg(docs, expr)?;
2571 let avg_num = avg.as_f64().unwrap_or(0.0);
2572
2573 let mut sum_sq_diff = 0.0;
2574 for doc in docs {
2575 if let Some(value) = self.evaluate_expression(doc, expr) {
2576 if let Some(num) = value.as_f64() {
2577 let diff = num - avg_num;
2578 sum_sq_diff += diff * diff;
2579 }
2580 }
2581 }
2582
2583 let variance = sum_sq_diff / (docs.len() - 1) as f64;
2584 let std_dev = variance.sqrt();
2585
        Ok(serde_json::Number::from_f64(std_dev)
            .map(Value::Number)
            .unwrap_or(Value::Null))
2589 }
2590}
2591
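/// Stages understood by the aggregation executor above. A pipeline is an
/// ordered `Vec<AggregationStage>`; each stage consumes the documents produced
/// by the previous one. Minimal sketch of building a pipeline (field names and
/// values are illustrative; the stages are run by the executor defined earlier
/// in this file):
///
/// ```ignore
/// let pipeline = vec![
///     AggregationStage::Match(Query::from_value(serde_json::json!({
///         "age": { "$gte": 18 }
///     }))?),
///     AggregationStage::Sort(vec![("age".to_string(), SortOrder::Descending)]),
///     AggregationStage::Limit(10),
/// ];
/// ```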
2592#[derive(Debug, Clone, Serialize, Deserialize)]
2594pub enum AggregationStage {
2595 Match(Query),
2596 Project(Map<String, Value>),
2597 Sort(Vec<(String, SortOrder)>),
2598 Skip(usize),
2599 Limit(usize),
2600 Group(GroupSpecification),
2601 Unwind(String),
2602 Lookup(LookupSpecification),
2603 Out(String),
2604}
2605
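/// Specification for a `Group` stage, mirroring MongoDB's `$group`: `id`
/// selects the grouping key and `operations` maps output field names to
/// accumulators. Illustrative sketch (field names are hypothetical):
///
/// ```ignore
/// let spec = GroupSpecification {
///     id: GroupId::Field("city".to_string()),
///     operations: HashMap::from([
///         ("total".to_string(), GroupOperation::Sum(serde_json::json!("$amount"))),
///         ("average".to_string(), GroupOperation::Avg(serde_json::json!("$amount"))),
///     ]),
/// };
/// let stage = AggregationStage::Group(spec);
/// ```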
2606#[derive(Debug, Clone, Serialize, Deserialize)]
2607pub struct GroupSpecification {
2608 pub id: GroupId,
2609 pub operations: HashMap<String, GroupOperation>,
2610}
2611
2612#[derive(Debug, Clone, Serialize, Deserialize)]
2613pub enum GroupId {
2614 Field(String),
2615 Expression(Value),
2616 Null,
2617}
2618
2619#[derive(Debug, Clone, Serialize, Deserialize)]
2620pub enum GroupOperation {
2621 Sum(Value),
2622 Avg(Value),
2623 Min(Value),
2624 Max(Value),
2625 First(Value),
2626 Last(Value),
2627 Push(Value),
2628 AddToSet(Value),
2629 StdDevPop(Value),
2630 StdDevSamp(Value),
2631}
2632
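/// Specification for a `Lookup` stage, mirroring MongoDB's `$lookup`
/// (`from` / `localField` / `foreignField` / `as`). Note that the stage
/// executor above cannot reach other collections from a single `Collection`,
/// so the `as_field` array is currently always left empty.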
2633#[derive(Debug, Clone, Serialize, Deserialize)]
2634pub struct LookupSpecification {
2635 pub from: String,
2636 pub local_field: String,
2637 pub foreign_field: String,
    pub as_field: String,
}
2640
2641#[derive(Debug, Clone, Serialize, Deserialize)]
2643pub struct FindOneAndUpdateOptions {
2644 pub upsert: bool,
2645 pub return_document: ReturnDocument,
2646 pub projection: Option<Map<String, Value>>,
2647 pub sort: Option<Vec<(String, SortOrder)>>,
2648 pub max_time_ms: Option<u64>,
2649}
2650
2651impl Default for FindOneAndUpdateOptions {
2652 fn default() -> Self {
2653 Self {
2654 upsert: false,
2655 return_document: ReturnDocument::Before,
2656 projection: None,
2657 sort: None,
2658 max_time_ms: None,
2659 }
2660 }
2661}
2662
2663#[derive(Debug, Clone, Serialize, Deserialize)]
2664pub struct FindOneAndReplaceOptions {
2665 pub upsert: bool,
2666 pub return_document: ReturnDocument,
2667 pub projection: Option<Map<String, Value>>,
2668 pub sort: Option<Vec<(String, SortOrder)>>,
2669 pub max_time_ms: Option<u64>,
2670}
2671
2672impl Default for FindOneAndReplaceOptions {
2673 fn default() -> Self {
2674 Self {
2675 upsert: false,
2676 return_document: ReturnDocument::Before,
2677 projection: None,
2678 sort: None,
2679 max_time_ms: None,
2680 }
2681 }
2682}
2683
2684#[derive(Debug, Clone, Serialize, Deserialize)]
2685pub struct FindOneAndDeleteOptions {
2686 pub projection: Option<Map<String, Value>>,
2687 pub sort: Option<Vec<(String, SortOrder)>>,
2688 pub max_time_ms: Option<u64>,
2689}
2690
2691impl Default for FindOneAndDeleteOptions {
2692 fn default() -> Self {
2693 Self {
2694 projection: None,
2695 sort: None,
2696 max_time_ms: None,
2697 }
2698 }
2699}
2700
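/// Which version of the document the `find_one_and_*` family of operations
/// returns: its state before or after the modification was applied.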
2701#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
2702pub enum ReturnDocument {
2703 Before,
2704 After,
2705}
2706
2707#[derive(Debug, Clone, Serialize, Deserialize)]
2709pub struct CollectionStats {
2710 pub count: usize,
2711 pub size: usize,
2712 pub avg_obj_size: usize,
2713 pub index_count: usize,
2714 pub index_size: usize,
2715}
2716
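/// A named database: a map from collection name to `Collection`. Collections
/// are created lazily by `collection()`. Usage sketch (names and document
/// contents are illustrative):
///
/// ```ignore
/// let mut db = Database::new("app".to_string());
/// db.collection("users")
///     .insert(serde_json::json!({ "name": "Ada", "age": 36 }))?;
/// assert!(db.list_collection_names().contains(&"users".to_string()));
/// ```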
2717#[derive(Debug, Clone, Serialize, Deserialize)]
2719pub struct Database {
2720 name: String,
2721 collections: HashMap<String, Collection>,
2722}
2723
2724impl Database {
2725 pub fn new(name: String) -> Self {
2726 Self {
2727 name,
2728 collections: HashMap::new(),
2729 }
2730 }
2731
2732 pub fn collection(&mut self, name: &str) -> &mut Collection {
2733 self.collections
2734 .entry(name.to_string())
2735 .or_insert_with(|| Collection::new(name.to_string()))
2736 }
2737
2738 pub fn list_collection_names(&self) -> Vec<String> {
2740 self.collections.keys().cloned().collect()
2741 }
2742
2743 pub fn create_collection(
2745 &mut self,
2746 name: &str,
2747 options: Option<CreateCollectionOptions>,
2748 ) -> Result<()> {
2749 if self.collections.contains_key(name) {
2750 return Err(DbError::Other(format!(
2751 "Collection {} already exists",
2752 name
2753 )));
2754 }
2755
2756 let mut collection = Collection::new(name.to_string());
2757
2758 if let Some(opts) = options {
2760 for index in opts.indexes {
2762 collection.create_index(index)?;
2763 }
2764 }
2765
2766 self.collections.insert(name.to_string(), collection);
2767 Ok(())
2768 }
2769
2770 pub fn drop_collection(&mut self, name: &str) -> Result<()> {
2772 if self.collections.remove(name).is_none() {
2773 return Err(DbError::Other(format!(
2774 "Collection {} does not exist",
2775 name
2776 )));
2777 }
2778
2779 Ok(())
2780 }
2781
2782 pub fn stats(&self) -> Result<DatabaseStats> {
2784 let mut collections = Vec::new();
2785 let mut total_size = 0;
2786 let mut total_index_size = 0;
2787
2788 for (name, collection) in self.collections.iter() {
2789 let coll_stats = collection.stats()?;
2790 collections.push(CollectionStatsInfo {
2791 name: name.clone(),
2792 count: coll_stats.count,
2793 size: coll_stats.size,
2794 index_count: coll_stats.index_count,
2795 index_size: coll_stats.index_size,
2796 });
2797
2798 total_size += coll_stats.size;
2799 total_index_size += coll_stats.index_size;
2800 }
2801
2802 Ok(DatabaseStats {
2803 collections,
2804 total_size,
2805 total_index_size,
2806 })
2807 }
2808
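    /// Runs a command expressed as a JSON document, in the spirit of MongoDB's
    /// `runCommand`. Supported commands: `create`, `drop`, `listCollections`,
    /// and `dbStats`. Illustrative sketch:
    ///
    /// ```ignore
    /// let reply = db.run_command(&serde_json::json!({ "create": "users" }))?;
    /// assert_eq!(reply["ok"], 1);
    /// ```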
2809 pub fn run_command(&mut self, command: &Document) -> Result<Document> {
2811 if let Some(obj) = command.as_object() {
2812 if let Some(cmd_name) = obj.keys().next() {
2813 match cmd_name.as_str() {
2814 "create" => {
2815 if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
2816 self.create_collection(coll_name, None)?;
2817 Ok(serde_json::json!({ "ok": 1 }))
2818 } else {
2819 Err(DbError::Other("Invalid create command".into()))
2820 }
2821 }
2822 "drop" => {
2823 if let Some(Value::String(coll_name)) = obj.get(cmd_name) {
2824 self.drop_collection(coll_name)?;
2825 Ok(serde_json::json!({ "ok": 1 }))
2826 } else {
2827 Err(DbError::Other("Invalid drop command".into()))
2828 }
2829 }
2830 "listCollections" => {
2831 let coll_names = self.list_collection_names();
2832 let collections: Vec<_> = coll_names
2833 .into_iter()
2834 .map(|name| {
2835 serde_json::json!({
2836 "name": name,
2837 "type": "collection"
2838 })
2839 })
2840 .collect();
2841
2842 Ok(serde_json::json!({
2843 "cursor": {
2844 "id": 0,
2845 "ns": format!("{}.collections", self.name),
2846 "firstBatch": collections
2847 },
2848 "ok": 1
2849 }))
2850 }
2851 "dbStats" => {
2852 let stats = self.stats()?;
2853 Ok(serde_json::json!({
2854 "db": self.name,
2855 "collections": stats.collections.len(),
2856 "objects": stats.collections.iter().map(|c| c.count).sum::<usize>(),
2857 "avgObjSize": if stats.collections.iter().map(|c| c.count).sum::<usize>() > 0 {
2858 stats.total_size / stats.collections.iter().map(|c| c.count).sum::<usize>()
2859 } else {
2860 0
2861 },
2862 "dataSize": stats.total_size,
2863 "indexSize": stats.total_index_size,
2864 "ok": 1
2865 }))
2866 }
2867 _ => Err(DbError::Other(format!("Unknown command: {}", cmd_name))),
2868 }
2869 } else {
2870 Err(DbError::Other("Empty command".into()))
2871 }
2872 } else {
2873 Err(DbError::Other("Command must be an object".into()))
2874 }
2875 }
2876}
2877
2878#[derive(Debug, Clone, Serialize, Deserialize)]
2880pub struct DatabaseStats {
2881 pub collections: Vec<CollectionStatsInfo>,
2882 pub total_size: usize,
2883 pub total_index_size: usize,
2884}
2885
2886#[derive(Debug, Clone, Serialize, Deserialize)]
2887pub struct CollectionStatsInfo {
2888 pub name: String,
2889 pub count: usize,
2890 pub size: usize,
2891 pub index_count: usize,
2892 pub index_size: usize,
2893}
2894
2895#[derive(Debug, Clone, Serialize, Deserialize)]
2897pub struct CreateCollectionOptions {
2898 pub capped: bool,
2899 pub size: Option<usize>,
2900 pub max: Option<usize>,
2901 pub storage_engine: Option<Map<String, Value>>,
2902 pub validator: Option<Document>,
2903 pub validation_level: Option<String>,
2904 pub validation_action: Option<String>,
2905 pub index_option_defaults: Option<Map<String, Value>>,
2906 pub view_on: Option<String>,
2907 pub pipeline: Option<Vec<Document>>,
2908 pub collation: Option<Map<String, Value>>,
2909 pub write_concern: Option<Map<String, Value>>,
2910 pub indexes: Vec<Index>,
2911}
2912
2913impl Default for CreateCollectionOptions {
2914 fn default() -> Self {
2915 Self {
2916 capped: false,
2917 size: None,
2918 max: None,
2919 storage_engine: None,
2920 validator: None,
2921 validation_level: None,
2922 validation_action: None,
2923 index_option_defaults: None,
2924 view_on: None,
2925 pipeline: None,
2926 collation: None,
2927 write_concern: None,
2928 indexes: Vec::new(),
2929 }
2930 }
2931}
2932
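/// In-process client owning a set of databases, with optional single-file
/// persistence via `save`/`load`. Usage sketch (the URI and path are
/// illustrative):
///
/// ```ignore
/// let mut client = Client::with_storage_path("mongodb://localhost", "data/db.json");
/// client.db("app").collection("users")
///     .insert(serde_json::json!({ "name": "Ada" }))?;
/// client.save()?;
/// ```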
2933#[derive(Debug, Clone)]
2935pub struct Client {
2936 databases: HashMap<String, Database>,
2937 uri: String,
2938 storage_path: Option<PathBuf>,
2939}
2940
2941impl Client {
2942 pub fn new() -> Self {
2943 Self {
2944 databases: HashMap::new(),
2945 uri: "mongodb://localhost:27017".to_string(),
2946 storage_path: None,
2947 }
2948 }
2949
2950 pub fn with_uri(uri: &str) -> Self {
2951 Self {
2952 databases: HashMap::new(),
2953 uri: uri.to_string(),
2954 storage_path: None,
2955 }
2956 }
2957
2958 pub fn with_storage_path<P: AsRef<Path>>(uri: &str, path: P) -> Self {
2959 Self {
2960 databases: HashMap::new(),
2961 uri: uri.to_string(),
2962 storage_path: Some(path.as_ref().to_path_buf()),
2963 }
2964 }
2965
2966 pub fn db(&mut self, name: &str) -> &mut Database {
2967 self.databases
2968 .entry(name.to_string())
2969 .or_insert_with(|| Database::new(name.to_string()))
2970 }
2971
2972 pub fn list_database_names(&self) -> Vec<String> {
2974 self.databases.keys().cloned().collect()
2975 }
2976
2977 pub fn drop_database(&mut self, name: &str) -> Result<()> {
2979 if self.databases.remove(name).is_none() {
2980 return Err(DbError::Other(format!("Database {} does not exist", name)));
2981 }
2982
2983 Ok(())
2984 }
2985
2986 pub fn uri(&self) -> &str {
2988 &self.uri
2989 }
2990
2991 pub fn save(&self) -> Result<()> {
2993 if let Some(ref path) = self.storage_path {
2994 if let Some(parent) = path.parent() {
2996 fs::create_dir_all(parent)?;
2997 }
2998
2999 let mut persistent_storage = PersistentStorage::new();
3000
3001 for (name, db) in &self.databases {
3002 let mut persistent_db = PersistentDatabase {
3003 name: name.clone(),
3004 collections: HashMap::new(),
3005 };
3006
3007 for (coll_name, coll) in &db.collections {
3008 let persistent_coll = PersistentCollection::from(coll.clone());
3009 persistent_db
3010 .collections
3011 .insert(coll_name.clone(), persistent_coll);
3012 }
3013
3014 persistent_storage
3015 .databases
3016 .insert(name.clone(), persistent_db);
3017 }
3018
3019 persistent_storage.save_to_file(path)?;
3020 Ok(())
3021 } else {
3022 Err(DbError::Other("No storage path configured".into()))
3023 }
3024 }
3025
3026 pub fn load(&mut self) -> Result<()> {
3028 if let Some(ref path) = self.storage_path {
3029 if path.exists() {
3030 let persistent_storage = PersistentStorage::load_from_file(path)?;
3031
3032 for (name, persistent_db) in persistent_storage.databases {
3033 let mut db = Database::new(name.clone());
3034
3035 for (coll_name, persistent_coll) in persistent_db.collections {
3036 let coll = Collection::from(persistent_coll);
3037 db.collections.insert(coll_name, coll);
3038 }
3039
3040 self.databases.insert(name, db);
3041 }
3042 }
3043 Ok(())
3044 } else {
3045 Err(DbError::Other("No storage path configured".into()))
3046 }
3047 }
3048}
3049
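/// A single operation in a `Collection::bulk_write` batch. Illustrative sketch
/// (filters and documents are hypothetical):
///
/// ```ignore
/// let ops = vec![
///     BulkWriteOperation::InsertOne {
///         document: serde_json::json!({ "name": "Ada" }),
///     },
///     BulkWriteOperation::DeleteMany {
///         filter: Query::from_value(serde_json::json!({ "retired": { "$eq": true } }))?,
///     },
/// ];
/// let result = collection.bulk_write(ops, None)?;
/// println!("inserted {}, deleted {}", result.inserted_count, result.deleted_count);
/// ```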
3050#[derive(Debug, Clone, Serialize, Deserialize)]
3052pub enum BulkWriteOperation {
3053 InsertOne {
3054 document: Document,
3055 },
3056 UpdateOne {
3057 filter: Query,
3058 update: UpdateDocument,
3059 upsert: bool,
3060 },
3061 UpdateMany {
3062 filter: Query,
3063 update: UpdateDocument,
3064 upsert: bool,
3065 },
3066 ReplaceOne {
3067 filter: Query,
3068 replacement: Document,
3069 upsert: bool,
3070 },
3071 DeleteOne {
3072 filter: Query,
3073 },
3074 DeleteMany {
3075 filter: Query,
3076 },
3077}
3078
3079#[derive(Debug, Clone, Serialize, Deserialize)]
3080pub struct BulkWriteOptions {
3081 pub ordered: bool,
3082 pub bypass_document_validation: bool,
3083 pub write_concern: Option<Map<String, Value>>,
3084}
3085
3086impl Default for BulkWriteOptions {
3087 fn default() -> Self {
3088 Self {
3089 ordered: true,
3090 bypass_document_validation: false,
3091 write_concern: None,
3092 }
3093 }
3094}
3095
3096#[derive(Debug, Clone, Serialize, Deserialize)]
3097pub struct BulkWriteResult {
3098 pub inserted_count: usize,
3099 pub matched_count: usize,
3100 pub modified_count: usize,
3101 pub deleted_count: usize,
3102 pub upserted_count: usize,
3103 pub upserted_ids: HashMap<usize, DocId>,
3104}
3105
3106impl Collection {
3107 pub fn bulk_write(
3109 &mut self,
3110 operations: Vec<BulkWriteOperation>,
3111 options: Option<BulkWriteOptions>,
3112 ) -> Result<BulkWriteResult> {
        // Bulk-write options (ordered, write concern, ...) are accepted for API
        // compatibility but are not yet enforced by this implementation.
        let _opts = options.unwrap_or_default();
3114 let mut result = BulkWriteResult {
3115 inserted_count: 0,
3116 matched_count: 0,
3117 modified_count: 0,
3118 deleted_count: 0,
3119 upserted_count: 0,
3120 upserted_ids: HashMap::new(),
3121 };
3122
3123 let mut index = 0;
3124
3125 for op in operations {
3126 match op {
3127 BulkWriteOperation::InsertOne { document } => {
3128 let id = self.insert(document)?;
3129 result.inserted_count += 1;
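                    // BulkWriteResult has no dedicated inserted_ids map, so the new
                    // id is recorded in upserted_ids keyed by operation index.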
3130 result.upserted_ids.insert(index, id);
3131 }
3132 BulkWriteOperation::UpdateOne {
3133 filter,
3134 update,
3135 upsert,
3136 } => {
3137 let count = self.update_one(filter, update, upsert)?;
3138 if count > 0 {
3139 result.matched_count += 1;
3140 result.modified_count += 1;
3141 }
3142 if upsert && count > 0 {
3143 result.upserted_count += 1;
3144 }
3145 }
3146 BulkWriteOperation::UpdateMany {
3147 filter,
3148 update,
3149 upsert,
3150 } => {
3151 let count = self.update_many(filter, update)?;
3152 result.matched_count += count;
3153 result.modified_count += count;
3154 if upsert && count > 0 {
3155 result.upserted_count += count;
3156 }
3157 }
3158 BulkWriteOperation::ReplaceOne {
3159 filter,
3160 replacement,
3161 upsert,
3162 } => {
3163 let count = self.replace_one(filter, replacement, upsert)?;
3164 if count > 0 {
3165 result.matched_count += 1;
3166 result.modified_count += 1;
3167 }
3168 if upsert && count > 0 {
3169 result.upserted_count += 1;
3170 }
3171 }
3172 BulkWriteOperation::DeleteOne { filter } => {
3173 let count = self.delete_one(filter)?;
3174 result.deleted_count += count;
3175 }
3176 BulkWriteOperation::DeleteMany { filter } => {
3177 let count = self.delete_many(filter)?;
3178 result.deleted_count += count;
3179 }
3180 }
3181
3182 index += 1;
3183
3184 }
3187
3188 Ok(result)
3189 }
3190}
3191
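/// Thin client for the line-oriented protocol served by `Server`. Sketch
/// (the address and command are illustrative):
///
/// ```ignore
/// let client = RemoteClient::new("127.0.0.1:4321".parse().unwrap());
/// let mut conn = client.connect()?;
/// let reply = conn.send_command("LIST_DB")?;
/// println!("{}", reply);
/// ```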
3192#[derive(Debug, Clone)]
3194pub struct RemoteClient {
3195 addr: SocketAddr,
3196}
3197
3198impl RemoteClient {
3199 pub fn new(addr: SocketAddr) -> Self {
3200 Self { addr }
3201 }
3202
3203 pub fn connect(&self) -> Result<RemoteConnection> {
3204 let stream = TcpStream::connect(self.addr)?;
3205 Ok(RemoteConnection { stream })
3206 }
3207}
3208
3209#[derive(Debug)]
3210pub struct RemoteConnection {
3211 stream: TcpStream,
3212}
3213
3214impl RemoteConnection {
3215 pub fn send_command(&mut self, command: &str) -> Result<String> {
3216 let mut writer = BufWriter::new(&self.stream);
3217 writer.write_all(command.as_bytes())?;
3218 writer.write_all(b"\n")?;
3219 writer.flush()?;
3220
3221 let mut reader = BufReader::new(&self.stream);
3222 let mut response = String::new();
3223 reader.read_line(&mut response)?;
3224
3225 Ok(response)
3226 }
3227
3228 pub fn close(self) -> Result<()> {
3229 Ok(())
3231 }
3232}
3233
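/// Line-oriented TCP server exposing the database over a simple text protocol:
/// each request is one newline-terminated line, each reply is one line starting
/// with `OK:` or `ERROR:`. Supported commands: `AUTH`, `INSERT`, `FIND`,
/// `UPDATE`, `DELETE`, `SAVE`, `LOAD`, `LIST_DB`, `LIST_COLL`, `STATS`, `EXIT`.
/// Startup sketch (address and storage path are illustrative):
///
/// ```ignore
/// let addr: SocketAddr = "127.0.0.1:4321".parse().unwrap();
/// let mut server = Server::new(addr, Some(PathBuf::from("data/db.json")))
///     .with_auth("admin".to_string(), "secret".to_string());
/// server.start()?;
/// ```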
3234#[derive(Debug)]
3236pub struct Server {
3237 addr: SocketAddr,
3238 client: Client,
3239 auth_username: Option<String>,
3240 auth_password: Option<String>,
3241}
3242impl Server {
3243 pub fn new(addr: SocketAddr, storage_path: Option<PathBuf>) -> Self {
3244 let client = if let Some(path) = storage_path {
3245 Client::with_storage_path("mongodb://localhost", path)
3246 } else {
3247 Client::new()
3248 };
3249 Self {
3250 addr,
3251 client,
3252 auth_username: None,
3253 auth_password: None,
3254 }
3255 }
3256
3257 pub fn with_auth(mut self, username: String, password: String) -> Self {
3258 self.auth_username = Some(username);
3259 self.auth_password = Some(password);
3260 self
3261 }
3262
3263 pub fn start(&mut self) -> Result<()> {
3264 if self.client.storage_path.is_some() {
3266 self.client.load()?;
3267 }
3268 let listener = TcpListener::bind(self.addr)?;
3269 println!("Server listening on {}", self.addr);
3270
3271 for stream in listener.incoming() {
3272 match stream {
3273 Ok(stream) => {
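                    // NOTE: each connection operates on its own clone of the Client,
                    // so writes made by one connection are neither visible to other
                    // connections nor retained by the server once the handler exits;
                    // sharing state would require something like Arc<RwLock<Client>>.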
3274 let client = self.client.clone();
3275 let auth_username = self.auth_username.clone();
3276 let auth_password = self.auth_password.clone();
3277 thread::spawn(move || {
3278 if let Err(e) = Self::handle_client(stream, client, auth_username, auth_password) {
3279 eprintln!("Error handling client: {}", e);
3280 }
3281 });
3282 }
3283 Err(e) => {
3284 eprintln!("Failed to accept connection: {}", e);
3285 }
3286 }
3287 }
3288 Ok(())
3289 }
3290
3291 fn handle_client(
3292 mut stream: TcpStream,
3293 mut client: Client,
3294 auth_username: Option<String>,
3295 auth_password: Option<String>
3296 ) -> Result<()> {
3297 let mut reader = BufReader::new(&stream);
3298 let mut writer = BufWriter::new(&stream);
        let mut authenticated = auth_username.is_none();

        loop {
3302 let mut command = String::new();
3303 match reader.read_line(&mut command) {
                Ok(0) => break,
                Ok(_) => {
3306 let command = command.trim();
3308 if command.is_empty() {
3309 continue;
3310 }
3311
3312 if !authenticated {
3314 if command.starts_with("AUTH") {
3315 let parts: Vec<&str> = command.split_whitespace().collect();
3316 if parts.len() != 3 {
3317 let response = "ERROR: Usage: AUTH <username> <password>".to_string();
3318 writer.write_all(response.as_bytes())?;
3319 writer.write_all(b"\n")?;
3320 writer.flush()?;
3321 continue;
3322 }
3323
3324 let username = parts[1];
3325 let password = parts[2];
3326
3327 if let (Some(auth_user), Some(auth_pass)) = (&auth_username, &auth_password) {
3328 if username == auth_user && password == auth_pass {
3329 authenticated = true;
3330 let response = "OK: Authenticated".to_string();
3331 writer.write_all(response.as_bytes())?;
3332 writer.write_all(b"\n")?;
3333 writer.flush()?;
3334 continue;
3335 }
3336 }
3337
3338 let response = "ERROR: Authentication failed".to_string();
3339 writer.write_all(response.as_bytes())?;
3340 writer.write_all(b"\n")?;
3341 writer.flush()?;
3342 continue;
3343 } else {
3344 let response = "ERROR: Not authenticated. Use AUTH <username> <password>".to_string();
3345 writer.write_all(response.as_bytes())?;
3346 writer.write_all(b"\n")?;
3347 writer.flush()?;
3348 continue;
3349 }
3350 }
3351
3352 let response = match Self::process_command(command, &mut client) {
3354 Ok(response) => response,
3355 Err(e) => format!("ERROR: {}", e),
3356 };
3357
3358 writer.write_all(response.as_bytes())?;
3360 writer.write_all(b"\n")?;
3361 writer.flush()?;
3362
3363 if command == "EXIT" {
3365 break;
3366 }
3367 }
3368 Err(e) => {
3369 eprintln!("Error reading from client: {}", e);
3370 break;
3371 }
3372 }
3373 }
3374 Ok(())
3375 }
3376
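    /// Parses one protocol line and executes it against `client`, returning a
    /// single-line reply beginning with `OK:` or `ERROR:`. Example lines
    /// (database, collection, and documents are illustrative):
    ///
    /// ```text
    /// INSERT app users {"name": "Ada"}
    /// FIND app users {"age": {"$gte": 18}}
    /// DELETE app users {"name": {"$eq": "Ada"}}
    /// ```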
3377 fn process_command(command: &str, client: &mut Client) -> Result<String> {
3378 let parts: Vec<&str> = command.split_whitespace().collect();
3379 if parts.is_empty() {
3380 return Ok("ERROR: Empty command".to_string());
3381 }
3382 match parts[0] {
3383 "INSERT" => {
                if parts.len() < 4 {
3385 return Ok(
3386 "ERROR: Usage: INSERT <database> <collection> <document>".to_string()
3387 );
3388 }
3389 let db_name = parts[1];
3390 let coll_name = parts[2];
3391 let doc_json = parts[3..].join(" ");
3392 let doc: Document = serde_json::from_str(&doc_json)?;
3393 let db = client.db(db_name);
3394 let coll = db.collection(coll_name);
3395 let id = coll.insert(doc)?;
3396 Ok(format!("OK: Document inserted with ID: {}", id))
3397 }
3398 "FIND" => {
3399 if parts.len() < 3 {
3400 return Ok("ERROR: Usage: FIND <database> <collection> [query]".to_string());
3401 }
3402 let db_name = parts[1];
3403 let coll_name = parts[2];
3404 let query = if parts.len() > 3 {
3405 let query_json = parts[3..].join(" ");
3406 let query_value: Value = serde_json::from_str(&query_json)?;
3407 Query::from_value(query_value)?
3408 } else {
3409 Query::new()
3410 };
3411 let db = client.db(db_name);
3412 let coll = db.collection(coll_name);
3413 let results = coll.find(query, None)?;
3414 let results_json = serde_json::to_string(&results)?;
3415 Ok(format!("OK: {}", results_json))
3416 }
3417 "UPDATE" => {
                if parts.len() < 5 {
3419 return Ok(
3420 "ERROR: Usage: UPDATE <database> <collection> <query> <update>".to_string(),
3421 );
3422 }
3423 let db_name = parts[1];
3424 let coll_name = parts[2];
3425 let mut query_end = 3;
3427 let mut brace_count = 0;
3428 for (i, c) in command.char_indices() {
3429 if i < parts[0].len() + parts[1].len() + parts[2].len() + 2 {
3430 continue;
3431 }
3432 if c == '{' {
3433 brace_count += 1;
3434 } else if c == '}' {
3435 brace_count -= 1;
3436 if brace_count == 0 {
3437 query_end = i + 1;
3438 break;
3439 }
3440 }
3441 }
3442 let query_json =
3443 command[parts[0].len() + parts[1].len() + parts[2].len() + 3..query_end].trim();
3444 let update_json = command[query_end..].trim();
                // Parse the filter the same way FIND and DELETE do: as a MongoDB-style
                // document converted through Query::from_value.
                let query_value: Value = serde_json::from_str(query_json)?;
                let query = Query::from_value(query_value)?;
3446 let update: UpdateDocument = serde_json::from_str(update_json)?;
3447 let db = client.db(db_name);
3448 let coll = db.collection(coll_name);
3449 let count = coll.update_one(query, update, false)?;
3450 Ok(format!("OK: Updated {} documents", count))
3451 }
3452 "DELETE" => {
3453 if parts.len() < 3 {
3454 return Ok("ERROR: Usage: DELETE <database> <collection> [query]".to_string());
3455 }
3456 let db_name = parts[1];
3457 let coll_name = parts[2];
3458 let query = if parts.len() > 3 {
3459 let query_json = parts[3..].join(" ");
3460 let query_value: Value = serde_json::from_str(&query_json)?;
3461 Query::from_value(query_value)?
3462 } else {
3463 Query::new()
3464 };
3465 let db = client.db(db_name);
3466 let coll = db.collection(coll_name);
3467 let count = coll.delete_one(query)?;
3468 Ok(format!("OK: Deleted {} documents", count))
3469 }
3470 "SAVE" => {
3471 client.save()?;
3472 Ok("OK: Data saved to disk".to_string())
3473 }
3474 "LOAD" => {
3475 client.load()?;
3476 Ok("OK: Data loaded from disk".to_string())
3477 }
3478 "LIST_DB" => {
3479 let dbs = client.list_database_names();
3480 Ok(format!("OK: {}", serde_json::to_string(&dbs)?))
3481 }
3482 "LIST_COLL" => {
3483 if parts.len() < 2 {
3484 return Ok("ERROR: Usage: LIST_COLL <database>".to_string());
3485 }
3486 let db_name = parts[1];
3487 let db = client.db(db_name);
3488 let colls = db.list_collection_names();
3489 Ok(format!("OK: {}", serde_json::to_string(&colls)?))
3490 }
3491 "STATS" => {
3492 if parts.len() < 2 {
3493 return Ok("ERROR: Usage: STATS <database>".to_string());
3494 }
3495 let db_name = parts[1];
3496 let db = client.db(db_name);
3497 let stats = db.stats()?;
3498 Ok(format!("OK: {}", serde_json::to_string(&stats)?))
3499 }
3500 "EXIT" => {
3501 if client.storage_path.is_some() {
3503 if let Err(e) = client.save() {
3504 return Ok(format!("ERROR: Failed to save data: {}", e));
3505 }
3506 }
3507 Ok("OK: Connection closing".to_string())
3508 }
3509 _ => Ok("ERROR: Unknown command".to_string()),
3510 }
3511 }
3512}