1use std::fmt::{Debug};
2use std::ops::Neg;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use async_trait::async_trait;
6use bson::{Bson, doc, Document};
7use futures_util::StreamExt;
8use key_path::{KeyPath, path};
9use mongodb::{Database, Collection, IndexModel, ClientSession};
10use mongodb::error::{ErrorKind, WriteFailure, Error as MongoDBError};
11use mongodb::options::{FindOneAndUpdateOptions, IndexOptions, ReturnDocument};
12use regex::Regex;
13use crate::aggregation::Aggregation;
14use crate::bson_ext::coder::BsonCoder;
15use teo_runtime::action::action::*;
16use teo_runtime::model::object::Object;
17use teo_runtime::model::index::Type;
18use teo_runtime::model::{Index, Model};
19use teo_runtime::value::Value;
20use teo_result::{Error, Result};
21use teo_runtime::connection::transaction::{Ctx, Transaction};
22use teo_runtime::model::field::column_named::ColumnNamed;
23use teo_runtime::model::field::is_optional::IsOptional;
24use teo_runtime::traits::named::Named;
25use teo_runtime::model::field::typed::Typed;
26use teo_runtime::sort::Sort;
27use teo_runtime::model::object::input::Input;
28use teo_runtime::namespace::Namespace;
29use teo_runtime::error_ext;
30use teo_runtime::request::Request;
31use teo_runtime::utils::ContainsStr;
32use teo_runtime::teon;
33use crate::bson_ext::teon_value_to_bson;
34use crate::connector::OwnedSession;
35use crate::migration::index_model::FromIndexModel;
36
/// A transaction handle for a MongoDB database.
///
/// Bundles the target [`Database`] with an optional session. When a session is
/// present and the `committed` flag is not set, database calls made through
/// this type use the `*_with_session` driver methods (see `session`).
#[derive(Debug, Clone)]
pub struct MongoDBTransaction {
    /// Database from which collection handles are obtained.
    pub(super) database: Database,
    /// Session handed out by `session`; `None` means operations run without a session.
    pub(super) owned_session: Option<OwnedSession>,
    /// Flag read by `session` and `is_committed`; once `true`, `session` returns `None`.
    pub committed: Arc<AtomicBool>,
}
43
44impl MongoDBTransaction {
45
46 pub(crate) fn session(&self) -> Option<&mut ClientSession> {
47 if self.committed.load(Ordering::SeqCst) {
48 None
49 } else {
50 match &self.owned_session {
51 None => None,
52 Some(s) => Some(s.client_session()),
53 }
54 }
55 }
56
57 pub(crate) fn get_collection(&self, model: &Model) -> Collection<Document> {
58 self.database.collection(model.table_name())
59 }
60
61 fn document_to_object(&self, transaction_ctx: Ctx, document: &Document, object: &Object, select: Option<&Value>, include: Option<&Value>) -> Result<()> {
62 for key in document.keys() {
63 let object_field = object.model().fields().values().find(|f| f.column_name() == key);
64 if object_field.is_some() {
65 let object_field = object_field.unwrap();
67 let object_key = object_field.name();
68 let r#type = object_field.r#type();
69 let bson_value = document.get(key).unwrap();
70 let value_result = BsonCoder::decode(transaction_ctx.namespace(), object.model(), r#type, object_field.is_optional(), bson_value, path![]);
71 match value_result {
72 Ok(value) => {
73 object.set_value(object_key, value).unwrap();
74 }
75 Err(err) => {
76 return Err(err);
77 }
78 }
79 } else {
80 let relation = object.model().relation(key);
82 if relation.is_none() {
83 continue;
84 }
85 let inner_finder = if let Some(include) = include {
86 include.get(key)
87 } else {
88 None
89 };
90 let inner_select = if let Some(inner_finder) = inner_finder {
91 inner_finder.get("select")
92 } else {
93 None
94 };
95 let inner_include = if let Some(inner_finder) = inner_finder {
96 inner_finder.get("include")
97 } else {
98 None
99 };
100 let relation = relation.unwrap();
101 let relation_model = transaction_ctx.namespace().model_at_path(&relation.model_path()).unwrap();
102 let object_bsons = document.get(key).unwrap().as_array().unwrap();
103 let mut related: Vec<Object> = vec![];
104 for related_object_bson in object_bsons {
105 let action = NESTED | FIND | (if relation.is_vec() { MANY } else { SINGLE });
106 let related_object = transaction_ctx.new_object(relation_model, action, object.request())?;
107 self.clone().document_to_object(transaction_ctx.clone(), related_object_bson.as_document().unwrap(), &related_object, inner_select, inner_include)?;
108 related.push(related_object);
109 }
110 object.inner.relation_query_map.lock().unwrap().insert(key.to_string(), related);
111 }
112 }
113 object.inner.is_initialized.store(true, Ordering::SeqCst);
114 object.inner.is_new.store(false, Ordering::SeqCst);
115 object.set_select(select).unwrap();
116 Ok(())
117 }
118
119 fn _handle_write_error(&self, error_kind: &ErrorKind, object: &Object, path: KeyPath) -> Error {
120 return match error_kind {
121 ErrorKind::Write(write) => {
122 match write {
123 WriteFailure::WriteError(write_error) => {
124 match write_error.code {
125 11000 => {
126 let full_regex = Regex::new(r"dup key: (.+)").unwrap();
127 let regex = Regex::new(r"dup key: \{ (.+?):").unwrap();
128 let full_message = full_regex.captures(write_error.message.as_str()).unwrap().get(1).unwrap().as_str();
129 let field_column_name = regex.captures(write_error.message.as_str()).unwrap().get(1).unwrap().as_str();
130 if let Some(field_column) = object.model().field_with_column_name(field_column_name) {
131 error_ext::unique_value_duplicated(path + field_column.name(), full_message)
132 } else {
133 error_ext::unique_value_duplicated(path, full_message)
134 }
135 }
136 _ => {
137 error_ext::unknown_database_write_error(path, write_error.message.as_str())
138 }
139 }
140 }
141 WriteFailure::WriteConcernError(write_concern) => {
142 error_ext::unknown_database_write_error(path, write_concern.message.as_str())
143 }
144 _ => {
145 error_ext::unknown_database_write_error(path, "unknown write failure")
146 }
147 }
148 }
149 ErrorKind::Transaction { message, .. } => {
150 error_ext::unknown_database_write_error(path, message.as_str())
151 }
152 ErrorKind::SessionsNotSupported => {
153 error_ext::unknown_database_write_error(path, "session is not supported")
154 }
155 _ => {
156 error_ext::unknown_database_write_error(path, format!("unknown write: {:?}", error_kind))
157 }
158 }
159 }
160
161 async fn aggregate_to_documents(&self, aggregate_input: Vec<Document>, col: Collection<Document>, path: KeyPath) -> Result<Vec<std::result::Result<Document, MongoDBError>>> {
162 match self.session() {
163 Some(session) => {
164 let cur = col.aggregate_with_session(aggregate_input, None, session).await;
165 if cur.is_err() {
166 return Err(error_ext::unknown_database_find_error(path, format!("{:?}", cur)));
167 }
168 let mut cur = cur.unwrap();
169 let mut results: Vec<std::result::Result<Document, MongoDBError>> = vec![];
170 loop {
171 if let Some(item) = cur.next(session).await {
172 results.push(item);
173 } else {
174 break;
175 }
176 }
177 Ok(results)
178 },
179 None => {
180 let cur = col.aggregate(aggregate_input, None).await;
181 if cur.is_err() {
182 return Err(error_ext::unknown_database_find_error(path, format!("{:?}", cur)));
183 }
184 let cur = cur.unwrap();
185 let results: Vec<std::result::Result<Document, MongoDBError>> = cur.collect().await;
186 Ok(results)
187 },
188 }
189 }
190
191 async fn aggregate_or_group_by(&self, namespace: &Namespace, model: &Model, finder: &Value, path: KeyPath) -> Result<Vec<Value>> {
192 let aggregate_input = Aggregation::build_for_aggregate(namespace, model, finder)?;
193 let col = self.get_collection(model);
194 let results = self.aggregate_to_documents(aggregate_input, col, path).await?;
195 let mut final_retval: Vec<Value> = vec![];
196 for result in results.iter() {
197 let data = result.as_ref().unwrap();
199 let mut retval = teon!({});
200 for (g, o) in data {
201 if g.as_str() == "_id" {
202 continue;
203 }
204 if g.starts_with("_") {
206 retval.as_dictionary_mut().unwrap().insert(g.clone(), teon!({}));
207 for (dbk, v) in o.as_document().unwrap() {
208 let k = dbk;
209 if let Some(f) = v.as_f64() {
210 retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(f));
211 } else if let Some(i) = v.as_i64() {
212 retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(i));
213 } else if let Some(i) = v.as_i32() {
214 retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(i));
215 } else if v.as_null().is_some() {
216 retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(null));
217 }
218 }
219 } else {
220 let field = model.field(g).unwrap();
222 let val = if o.as_null().is_some() { Value::Null } else {
223 BsonCoder::decode(namespace, model, field.r#type(), true, o, path![])?
224 };
225 let json_val = val;
226 retval.as_dictionary_mut().unwrap().insert(g.to_string(), json_val);
227 }
228 }
229 final_retval.push(retval);
230 }
231 Ok(final_retval)
232 }
233
234 async fn create_object(&self, object: &Object, path: KeyPath) -> Result<()> {
235 let namespace = object.namespace();
236 let model = object.model();
237 let keys = object.keys_for_save();
238 let col = self.get_collection(model);
239 let auto_keys = &model.cache().auto_keys;
240 let mut doc = doc!{};
242 for key in keys {
243 if let Some(field) = model.field(key) {
244 let column_name = field.column_name();
245 let val: Bson = BsonCoder::encode(field.r#type(), object.get_value(&key).unwrap())?;
246 if val != Bson::Null {
247 doc.insert(column_name, val);
248 }
249 } else if let Some(property) = model.property(key) {
250 let val: Bson = BsonCoder::encode(property.r#type(), object.get_property_value(&key).await?)?;
251 if val != Bson::Null {
252 doc.insert(key, val);
253 }
254 }
255 }
256 let result = match self.session() {
257 Some(session) => {
258 col.insert_one_with_session(doc, None, session).await
259 }
260 None => {
261 col.insert_one(doc, None).await
262 }
263 };
264 match result {
265 Ok(insert_one_result) => {
266 let id = insert_one_result.inserted_id;
267 for key in auto_keys {
268 let field = model.field(key).unwrap();
269 if field.column_name() == "_id" {
270 let new_value = BsonCoder::decode(namespace, model, field.r#type(), field.is_optional(), &id, path![]).unwrap();
271 object.set_value(field.name(), new_value)?;
272 }
273 }
274 }
275 Err(error) => {
276 return Err(self._handle_write_error(&error.kind, object, path));
277 }
278 }
279 Ok(())
280 }
281
282 async fn update_object(&self, object: &Object, path: KeyPath) -> Result<()> {
283 let namespace = object.namespace();
284 let model = object.model();
285 let keys = object.keys_for_save();
286 let col = self.get_collection(model);
287 let identifier: Bson = teon_value_to_bson(&object.db_identifier());
288 let identifier = identifier.as_document().unwrap();
289 let mut set = doc!{};
290 let mut unset = doc!{};
291 let mut inc = doc!{};
292 let mut mul = doc!{};
293 let mut push = doc!{};
294 for key in keys {
295 if let Some(field) = model.field(key) {
296 let column_name = field.column_name();
297 if let Some(updator) = object.get_atomic_updator(key) {
298 let (key, val) = Input::key_value(updator.as_dictionary().unwrap());
299 match key {
300 "increment" => inc.insert(column_name, teon_value_to_bson(val)),
301 "decrement" => inc.insert(column_name, teon_value_to_bson(&val.neg().unwrap())),
302 "multiply" => mul.insert(column_name, teon_value_to_bson(val)),
303 "divide" => mul.insert(column_name, Bson::Double(val.recip().unwrap().to_float().unwrap().abs())),
304 "push" => push.insert(column_name, teon_value_to_bson(val)),
305 _ => panic!("Unhandled key."),
306 };
307 } else {
308 let bson_val: Bson = BsonCoder::encode(field.r#type(), object.get_value(&key).unwrap())?;
309 if bson_val == Bson::Null {
310 unset.insert(key, bson_val);
311 } else {
312 set.insert(key, bson_val);
313 }
314 }
315 } else if let Some(property) = model.property(key) {
316 let bson_val: Bson = BsonCoder::encode(property.r#type(), object.get_property_value(&key).await?)?;
317 if bson_val != Bson::Null {
318 set.insert(key, bson_val);
319 } else {
320 unset.insert(key, bson_val);
321 }
322 }
323 }
324 let mut update_doc = doc!{};
325 let mut return_new = false;
326 if !set.is_empty() {
327 update_doc.insert("$set", set);
328 }
329 if !unset.is_empty() {
330 update_doc.insert("$unset", unset);
331 }
332 if !inc.is_empty() {
333 update_doc.insert("$inc", inc);
334 return_new = true;
335 }
336 if !mul.is_empty() {
337 update_doc.insert("$mul", mul);
338 return_new = true;
339 }
340 if !push.is_empty() {
341 update_doc.insert("$push", push);
342 return_new = true;
343 }
344 if update_doc.is_empty() {
345 return Ok(());
346 }
347 if !return_new {
348 let result = match self.session() {
349 None => col.update_one(identifier.clone(), update_doc, None).await,
350 Some(session) => col.update_one_with_session(identifier.clone(), update_doc, None, session).await,
351 };
352 return match result {
353 Ok(_) => Ok(()),
354 Err(error) => {
355 Err(self._handle_write_error(&error.kind, object, path))
356 }
357 }
358 } else {
359 let options = FindOneAndUpdateOptions::builder().return_document(ReturnDocument::After).build();
360 let result = match self.session() {
361 None => col.find_one_and_update(identifier.clone(), update_doc, options).await,
362 Some(session) => col.find_one_and_update_with_session(identifier.clone(), update_doc, options, session).await,
363 };
364 match result {
365 Ok(updated_document) => {
366 for (key, value) in object.inner.atomic_updater_map.lock().unwrap().iter() {
367 let bson_new_val = updated_document.as_ref().unwrap().get(key).unwrap();
368 let field = object.model().field(key).unwrap();
369 let field_value = BsonCoder::decode(namespace, model, field.r#type(), field.is_optional(), bson_new_val, path![])?;
370 object.set_value(key, field_value).unwrap();
371 }
372 }
373 Err(error) => {
374 return Err(self._handle_write_error(&error.kind, object, path));
375 }
376 }
377 }
378 Ok(())
379 }
380
381}
382
383#[async_trait]
384impl Transaction for MongoDBTransaction {
385
386 async fn migrate(&self, models: Vec<&Model>, dry_run: bool, reset_database: bool, silent: bool) -> Result<()> {
387 if reset_database {
388 let _ = self.database.drop(None).await;
389 }
390 for model in models {
391 let _name = model.name();
392 let collection = self.get_collection(model);
393 let mut reviewed_names: Vec<String> = Vec::new();
394 let cursor_result = collection.list_indexes(None).await;
395 if cursor_result.is_ok() {
396 let mut cursor = cursor_result.unwrap();
397 while let Some(Ok(index)) = cursor.next().await {
398 if index.keys == doc!{"_id": 1} {
399 continue
400 }
401 let name = (&index).options.as_ref().unwrap().name.as_ref().unwrap();
402 let result = model.indexes().values().find(|i| name == i.name());
403 if result.is_none() {
404 let _ = collection.drop_index(name, None).await.unwrap();
407 } else {
408 let result = result.unwrap();
409 let our_format_index: Index = Index::from_index_model(&index);
410 if result != &our_format_index {
411 let _ = collection.drop_index(name, None).await.unwrap();
414 let index_options = IndexOptions::builder()
416 .name(result.name().to_string())
417 .unique(result.r#type() == Type::Unique || result.r#type() == Type::Primary)
418 .sparse(true)
419 .build();
420 let mut keys = doc!{};
421 for item in result.items() {
422 let field = model.field(&item.field).unwrap();
423 let column_name = field.column_name();
424 keys.insert(column_name, if item.sort == Sort::Asc { 1 } else { -1 });
425 }
426 let index_model = IndexModel::builder().keys(keys).options(index_options).build();
427 let _result = collection.create_index(index_model, None).await;
428 }
429 }
430 reviewed_names.push(name.clone());
431 }
432 }
433 for (_, index) in model.indexes() {
434 if !reviewed_names.contains_str(index.name()) {
435 if index.keys().len() == 1 {
437 let field = model.field(index.keys().get(0).unwrap()).unwrap();
438 if field.column_name() == "_id" {
439 continue
440 }
441 }
442 let index_options = IndexOptions::builder()
444 .name(index.name().to_string())
445 .unique(index.r#type() == Type::Unique || index.r#type() == Type::Primary)
446 .sparse(true)
447 .build();
448 let mut keys = doc!{};
449 for item in index.items() {
450 let field = model.field(&item.field).unwrap();
451 let column_name = field.column_name();
452 keys.insert(column_name, if item.sort == Sort::Asc { 1 } else { -1 });
453 }
454 let index_model = IndexModel::builder().keys(keys).options(index_options).build();
455 let result = collection.create_index(index_model, None).await;
456 if result.is_err() {
457 println!("index create error: {:?}", result.err().unwrap());
458 }
459 }
460 }
461 }
462 Ok(())
463 }
464
465 async fn purge(&self, models: Vec<&Model>) -> Result<()> {
466 for model in models {
467 let col = self.get_collection(model);
468 col.drop(None).await.unwrap();
469 }
470 Ok(())
471 }
472
473 async fn query_raw(&self, value: &Value) -> Result<Value> {
474 unreachable!()
475 }
476
477 async fn save_object(&self, object: &Object, path: KeyPath) -> Result<()> {
478 if object.is_new() {
479 self.create_object(object, path).await
480 } else {
481 self.update_object(object, path).await
482 }
483 }
484
485 async fn delete_object(&self, object: &Object, path: KeyPath) -> Result<()> {
486 if object.is_new() {
487 return Err(error_ext::object_is_not_saved_thus_cant_be_deleted(path));
488 }
489 let model = object.model();
490 let col = self.get_collection(model);
491 let bson_identifier: Bson = teon_value_to_bson(&object.db_identifier());
492 let document_identifier = bson_identifier.as_document().unwrap();
493 let result = match self.session() {
494 None => col.delete_one(document_identifier.clone(), None).await,
495 Some(session) => col.delete_one_with_session(document_identifier.clone(), None, session).await,
496 };
497 return match result {
498 Ok(_result) => Ok(()),
499 Err(err) => {
500 Err(error_ext::unknown_database_delete_error(path, format!("{}", err)))
501 }
502 }
503 }
504
505 async fn find_unique(&self, model: &Model, finder: &Value, ignore_select_and_include: bool, action: Action, transaction_ctx: Ctx, request: Option<Request>, path: KeyPath) -> Result<Option<Object>> {
506 let select = finder.get("select");
507 let include = finder.get("include");
508 let aggregate_input = Aggregation::build(transaction_ctx.namespace(), model, finder)?;
509 let col = self.get_collection(model);
510 let results = self.aggregate_to_documents(aggregate_input, col, path).await?;
511 if results.is_empty() {
512 Ok(None)
513 } else {
514 for doc in results {
515 let obj = transaction_ctx.new_object(model, action, request)?;
516 self.clone().document_to_object(transaction_ctx, &doc.unwrap(), &obj, select, include)?;
517 return Ok(Some(obj));
518 }
519 Ok(None)
520 }
521 }
522
523 async fn find_many(&self, model: &Model, finder: &Value, ignore_select_and_include: bool, action: Action, transaction_ctx: Ctx, request: Option<Request>, path: KeyPath) -> Result<Vec<Object>> {
524 let select = finder.get("select");
525 let include = finder.get("include");
526 let aggregate_input = Aggregation::build(transaction_ctx.namespace(), model, finder)?;
527 let reverse = Input::has_negative_take(finder);
528 let col = self.get_collection(model);
529 let mut result = vec![];
531 let results: Vec<std::result::Result<Document, MongoDBError>> = self.aggregate_to_documents(aggregate_input, col, path.clone()).await?;
532 for doc in results {
533 let obj = transaction_ctx.new_object(model, action, request.clone())?;
534 match self.clone().document_to_object(transaction_ctx.clone(), &doc.unwrap(), &obj, select, include) {
535 Ok(_) => {
536 if reverse {
537 result.insert(0, obj);
538 } else {
539 result.push(obj);
540 }
541 }
542 Err(err) => {
543 return Err(error_ext::unknown_database_find_error(path, format!("{}", err)));
544 }
545 }
546 }
547 Ok(result)
548 }
549
550 async fn count(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Value> {
551 if finder.get("select").is_some() {
552 self.count_fields(model, finder, transaction_ctx, path).await
553 } else {
554 let counts = self.count_objects(model, finder, transaction_ctx, path).await?;
555 Ok(Value::Int64(counts as i64))
556 }
557 }
558
559 async fn count_objects(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<usize> {
560 let input = Aggregation::build_for_count(transaction_ctx.namespace(), model, finder)?;
561 let col = self.get_collection(model);
562 let results = self.aggregate_to_documents(input, col, path).await?;
563 if results.is_empty() {
564 Ok(0)
565 } else {
566 let v = results.get(0).unwrap().as_ref().unwrap();
567 let bson_count = v.get("count").unwrap();
568 match bson_count {
569 Bson::Int32(i) => Ok(*i as usize),
570 Bson::Int64(i) => Ok(*i as usize),
571 _ => panic!("Unhandled count number type.")
572 }
573 }
574 }
575
576 async fn count_fields(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Value> {
577 let new_finder = Value::Dictionary(finder.as_dictionary().unwrap().iter().map(|(k, v)| {
578 if k.as_str() == "select" {
579 ("_count".to_owned(), v.clone())
580 } else {
581 (k.to_owned(), v.clone())
582 }
583 }).collect());
584 let aggregate_value = self.aggregate(model, &new_finder, transaction_ctx, path).await?;
585 Ok(aggregate_value.get("_count").unwrap().clone())
586 }
587
588 async fn aggregate(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Value> {
589 let results = self.aggregate_or_group_by(transaction_ctx.namespace(), model, finder, path).await?;
590 if results.is_empty() {
591 let mut retval = teon!({});
593 for (g, o) in finder.as_dictionary().unwrap() {
594 retval.as_dictionary_mut().unwrap().insert(g.clone(), teon!({}));
595 for (k, _v) in o.as_dictionary().unwrap() {
596 let value = if g == "_count" { teon!(0) } else { teon!(null) };
597 retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), value);
598 }
599 }
600 Ok(retval)
601 } else {
602 Ok(results.get(0).unwrap().clone())
603 }
604 }
605
606 async fn group_by(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Vec<Value>> {
607 Ok(self.aggregate_or_group_by(transaction_ctx.namespace(), model, finder, path).await?)
608 }
609
610 async fn sql(&self, model: &Model, sql: &str, transaction_ctx: Ctx) -> Result<Vec<Value>> {
611 Err(Error::new("do not run raw sql on MongoDB database"))
612 }
613
614 fn is_committed(&self) -> bool {
615 self.committed.load(Ordering::SeqCst)
616 }
617
618 fn is_transaction(&self) -> bool {
619 self.owned_session.is_some()
620 }
621
622 async fn commit(&self) -> Result<()> {
623 if let Some(session) = &self.owned_session {
624 session.commit_transaction().await
625 } else {
626 Ok(())
627 }
628 }
629
630 async fn abort(&self) -> Result<()> {
631 if let Some(session) = &self.owned_session {
632 session.abort_transaction().await
633 } else {
634 Ok(())
635 }
636 }
637
638 async fn spawn(&self) -> Result<Arc<dyn Transaction>> {
639 Ok(Arc::new(self.clone()))
640 }
641}
642
// SAFETY(review): these impls assert thread-safety by hand instead of relying on
// the auto traits. Nothing in this file establishes why that is sound; presumably
// it depends on how `OwnedSession` guards access to the `ClientSession` it hands
// out through `client_session()` — TODO confirm against `OwnedSession`.
unsafe impl Sync for MongoDBTransaction {}
unsafe impl Send for MongoDBTransaction {}