teo_mongodb_connector/connector/
transaction.rs

1use std::fmt::{Debug};
2use std::ops::Neg;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use async_trait::async_trait;
6use bson::{Bson, doc, Document};
7use futures_util::StreamExt;
8use key_path::{KeyPath, path};
9use mongodb::{Database, Collection, IndexModel, ClientSession};
10use mongodb::error::{ErrorKind, WriteFailure, Error as MongoDBError};
11use mongodb::options::{FindOneAndUpdateOptions, IndexOptions, ReturnDocument};
12use regex::Regex;
13use crate::aggregation::Aggregation;
14use crate::bson_ext::coder::BsonCoder;
15use teo_runtime::action::action::*;
16use teo_runtime::model::object::Object;
17use teo_runtime::model::index::Type;
18use teo_runtime::model::{Index, Model};
19use teo_runtime::value::Value;
20use teo_result::{Error, Result};
21use teo_runtime::connection::transaction::{Ctx, Transaction};
22use teo_runtime::model::field::column_named::ColumnNamed;
23use teo_runtime::model::field::is_optional::IsOptional;
24use teo_runtime::traits::named::Named;
25use teo_runtime::model::field::typed::Typed;
26use teo_runtime::sort::Sort;
27use teo_runtime::model::object::input::Input;
28use teo_runtime::namespace::Namespace;
29use teo_runtime::error_ext;
30use teo_runtime::request::Request;
31use teo_runtime::utils::ContainsStr;
32use teo_runtime::teon;
33use crate::bson_ext::teon_value_to_bson;
34use crate::connector::OwnedSession;
35use crate::migration::index_model::FromIndexModel;
36
37#[derive(Debug, Clone)]
38pub struct MongoDBTransaction {
39    pub(super) database: Database,
40    pub(super) owned_session: Option<OwnedSession>,
41    pub committed: Arc<AtomicBool>,
42}
43
44impl MongoDBTransaction {
45
46    pub(crate) fn session(&self) -> Option<&mut ClientSession> {
47        if self.committed.load(Ordering::SeqCst) {
48            None
49        } else {
50            match &self.owned_session {
51                None => None,
52                Some(s) => Some(s.client_session()),
53            }
54        }
55    }
56
57    pub(crate) fn get_collection(&self, model: &Model) -> Collection<Document> {
58        self.database.collection(model.table_name())
59    }
60
61    fn document_to_object(&self, transaction_ctx: Ctx, document: &Document, object: &Object, select: Option<&Value>, include: Option<&Value>) -> Result<()> {
62        for key in document.keys() {
63            let object_field = object.model().fields().values().find(|f| f.column_name() == key);
64            if object_field.is_some() {
65                // field
66                let object_field = object_field.unwrap();
67                let object_key = object_field.name();
68                let r#type = object_field.r#type();
69                let bson_value = document.get(key).unwrap();
70                let value_result = BsonCoder::decode(transaction_ctx.namespace(), object.model(), r#type, object_field.is_optional(), bson_value, path![]);
71                match value_result {
72                    Ok(value) => {
73                        object.set_value(object_key, value).unwrap();
74                    }
75                    Err(err) => {
76                        return Err(err);
77                    }
78                }
79            } else {
80                // relation
81                let relation = object.model().relation(key);
82                if relation.is_none() {
83                    continue;
84                }
85                let inner_finder = if let Some(include) = include {
86                    include.get(key)
87                } else {
88                    None
89                };
90                let inner_select = if let Some(inner_finder) = inner_finder {
91                    inner_finder.get("select")
92                } else {
93                    None
94                };
95                let inner_include = if let Some(inner_finder) = inner_finder {
96                    inner_finder.get("include")
97                } else {
98                    None
99                };
100                let relation = relation.unwrap();
101                let relation_model = transaction_ctx.namespace().model_at_path(&relation.model_path()).unwrap();
102                let object_bsons = document.get(key).unwrap().as_array().unwrap();
103                let mut related: Vec<Object> = vec![];
104                for related_object_bson in object_bsons {
105                    let action = NESTED | FIND | (if relation.is_vec() { MANY } else { SINGLE });
106                    let related_object = transaction_ctx.new_object(relation_model, action, object.request())?;
107                    self.clone().document_to_object(transaction_ctx.clone(), related_object_bson.as_document().unwrap(), &related_object, inner_select, inner_include)?;
108                    related.push(related_object);
109                }
110                object.inner.relation_query_map.lock().unwrap().insert(key.to_string(), related);
111            }
112        }
113        object.inner.is_initialized.store(true, Ordering::SeqCst);
114        object.inner.is_new.store(false, Ordering::SeqCst);
115        object.set_select(select).unwrap();
116        Ok(())
117    }
118
119    fn _handle_write_error(&self, error_kind: &ErrorKind, object: &Object, path: KeyPath) -> Error {
120        return match error_kind {
121            ErrorKind::Write(write) => {
122                match write {
123                    WriteFailure::WriteError(write_error) => {
124                        match write_error.code {
125                            11000 => {
126                                let full_regex = Regex::new(r"dup key: (.+)").unwrap();
127                                let regex = Regex::new(r"dup key: \{ (.+?):").unwrap();
128                                let full_message = full_regex.captures(write_error.message.as_str()).unwrap().get(1).unwrap().as_str();
129                                let field_column_name = regex.captures(write_error.message.as_str()).unwrap().get(1).unwrap().as_str();
130                                if let Some(field_column) = object.model().field_with_column_name(field_column_name) {
131                                    error_ext::unique_value_duplicated(path + field_column.name(), full_message)
132                                } else {
133                                    error_ext::unique_value_duplicated(path, full_message)
134                                }
135                            }
136                            _ => {
137                                error_ext::unknown_database_write_error(path, write_error.message.as_str())
138                            }
139                        }
140                    }
141                    WriteFailure::WriteConcernError(write_concern) => {
142                        error_ext::unknown_database_write_error(path, write_concern.message.as_str())
143                    }
144                    _ => {
145                        error_ext::unknown_database_write_error(path, "unknown write failure")
146                    }
147                }
148            }
149            ErrorKind::Transaction { message, .. } => {
150                error_ext::unknown_database_write_error(path, message.as_str())
151            }
152            ErrorKind::SessionsNotSupported => {
153                error_ext::unknown_database_write_error(path, "session is not supported")
154            }
155            _ => {
156                error_ext::unknown_database_write_error(path, format!("unknown write: {:?}", error_kind))
157            }
158        }
159    }
160
161    async fn aggregate_to_documents(&self, aggregate_input: Vec<Document>, col: Collection<Document>, path: KeyPath) -> Result<Vec<std::result::Result<Document, MongoDBError>>> {
162        match self.session() {
163            Some(session) => {
164                let cur = col.aggregate_with_session(aggregate_input, None, session).await;
165                if cur.is_err() {
166                    return Err(error_ext::unknown_database_find_error(path, format!("{:?}", cur)));
167                }
168                let mut cur = cur.unwrap();
169                let mut results: Vec<std::result::Result<Document, MongoDBError>> = vec![];
170                loop {
171                    if let Some(item) = cur.next(session).await {
172                        results.push(item);
173                    } else {
174                        break;
175                    }
176                }
177                Ok(results)
178            },
179            None => {
180                let cur = col.aggregate(aggregate_input, None).await;
181                if cur.is_err() {
182                    return Err(error_ext::unknown_database_find_error(path, format!("{:?}", cur)));
183                }
184                let cur = cur.unwrap();
185                let results: Vec<std::result::Result<Document, MongoDBError>> = cur.collect().await;
186                Ok(results)
187            },
188        }
189    }
190
191    async fn aggregate_or_group_by(&self, namespace: &Namespace, model: &Model, finder: &Value, path: KeyPath) -> Result<Vec<Value>> {
192        let aggregate_input = Aggregation::build_for_aggregate(namespace, model, finder)?;
193        let col = self.get_collection(model);
194        let results = self.aggregate_to_documents(aggregate_input, col, path).await?;
195        let mut final_retval: Vec<Value> = vec![];
196        for result in results.iter() {
197            // there are records
198            let data = result.as_ref().unwrap();
199            let mut retval = teon!({});
200            for (g, o) in data {
201                if g.as_str() == "_id" {
202                    continue;
203                }
204                // aggregate
205                if g.starts_with("_") {
206                    retval.as_dictionary_mut().unwrap().insert(g.clone(), teon!({}));
207                    for (dbk, v) in o.as_document().unwrap() {
208                        let k = dbk;
209                        if let Some(f) = v.as_f64() {
210                            retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(f));
211                        } else if let Some(i) = v.as_i64() {
212                            retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(i));
213                        } else if let Some(i) = v.as_i32() {
214                            retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(i));
215                        } else if v.as_null().is_some() {
216                            retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), teon!(null));
217                        }
218                    }
219                } else {
220                    // group by field
221                    let field = model.field(g).unwrap();
222                    let val = if o.as_null().is_some() { Value::Null } else {
223                        BsonCoder::decode(namespace, model, field.r#type(), true, o, path![])?
224                    };
225                    let json_val = val;
226                    retval.as_dictionary_mut().unwrap().insert(g.to_string(), json_val);
227                }
228            }
229            final_retval.push(retval);
230        }
231        Ok(final_retval)
232    }
233
234    async fn create_object(&self, object: &Object, path: KeyPath) -> Result<()> {
235        let namespace = object.namespace();
236        let model = object.model();
237        let keys = object.keys_for_save();
238        let col = self.get_collection(model);
239        let auto_keys = &model.cache().auto_keys;
240        // create
241        let mut doc = doc!{};
242        for key in keys {
243            if let Some(field) = model.field(key) {
244                let column_name = field.column_name();
245                let val: Bson = BsonCoder::encode(field.r#type(), object.get_value(&key).unwrap())?;
246                if val != Bson::Null {
247                    doc.insert(column_name, val);
248                }
249            } else if let Some(property) = model.property(key) {
250                let val: Bson = BsonCoder::encode(property.r#type(), object.get_property_value(&key).await?)?;
251                if val != Bson::Null {
252                    doc.insert(key, val);
253                }
254            }
255        }
256        let result = match self.session() {
257            Some(session) => {
258                col.insert_one_with_session(doc, None, session).await
259            }
260            None => {
261                col.insert_one(doc, None).await
262            }
263        };
264        match result {
265            Ok(insert_one_result) => {
266                let id = insert_one_result.inserted_id;
267                for key in auto_keys {
268                    let field = model.field(key).unwrap();
269                    if field.column_name() == "_id" {
270                        let new_value = BsonCoder::decode(namespace, model, field.r#type(), field.is_optional(), &id, path![]).unwrap();
271                        object.set_value(field.name(), new_value)?;
272                    }
273                }
274            }
275            Err(error) => {
276                return Err(self._handle_write_error(&error.kind, object, path));
277            }
278        }
279        Ok(())
280    }
281
282    async fn update_object(&self, object: &Object, path: KeyPath) -> Result<()> {
283        let namespace = object.namespace();
284        let model = object.model();
285        let keys = object.keys_for_save();
286        let col = self.get_collection(model);
287        let identifier: Bson = teon_value_to_bson(&object.db_identifier());
288        let identifier = identifier.as_document().unwrap();
289        let mut set = doc!{};
290        let mut unset = doc!{};
291        let mut inc = doc!{};
292        let mut mul = doc!{};
293        let mut push = doc!{};
294        for key in keys {
295            if let Some(field) = model.field(key) {
296                let column_name = field.column_name();
297                if let Some(updator) = object.get_atomic_updator(key) {
298                    let (key, val) = Input::key_value(updator.as_dictionary().unwrap());
299                    match key {
300                        "increment" => inc.insert(column_name, teon_value_to_bson(val)),
301                        "decrement" => inc.insert(column_name, teon_value_to_bson(&val.neg().unwrap())),
302                        "multiply" => mul.insert(column_name, teon_value_to_bson(val)),
303                        "divide" => mul.insert(column_name, Bson::Double(val.recip().unwrap().to_float().unwrap().abs())),
304                        "push" => push.insert(column_name, teon_value_to_bson(val)),
305                        _ => panic!("Unhandled key."),
306                    };
307                } else {
308                    let bson_val: Bson = BsonCoder::encode(field.r#type(), object.get_value(&key).unwrap())?;
309                    if bson_val == Bson::Null {
310                        unset.insert(key, bson_val);
311                    } else {
312                        set.insert(key, bson_val);
313                    }
314                }
315            } else if let Some(property) = model.property(key) {
316                let bson_val: Bson = BsonCoder::encode(property.r#type(), object.get_property_value(&key).await?)?;
317                if bson_val != Bson::Null {
318                    set.insert(key, bson_val);
319                } else {
320                    unset.insert(key, bson_val);
321                }
322            }
323        }
324        let mut update_doc = doc!{};
325        let mut return_new = false;
326        if !set.is_empty() {
327            update_doc.insert("$set", set);
328        }
329        if !unset.is_empty() {
330            update_doc.insert("$unset", unset);
331        }
332        if !inc.is_empty() {
333            update_doc.insert("$inc", inc);
334            return_new = true;
335        }
336        if !mul.is_empty() {
337            update_doc.insert("$mul", mul);
338            return_new = true;
339        }
340        if !push.is_empty() {
341            update_doc.insert("$push", push);
342            return_new = true;
343        }
344        if update_doc.is_empty() {
345            return Ok(());
346        }
347        if !return_new {
348            let result = match self.session() {
349                None => col.update_one(identifier.clone(), update_doc, None).await,
350                Some(session) => col.update_one_with_session(identifier.clone(), update_doc, None, session).await,
351            };
352            return match result {
353                Ok(_) => Ok(()),
354                Err(error) => {
355                    Err(self._handle_write_error(&error.kind, object, path))
356                }
357            }
358        } else {
359            let options = FindOneAndUpdateOptions::builder().return_document(ReturnDocument::After).build();
360            let result = match self.session() {
361                None => col.find_one_and_update(identifier.clone(), update_doc, options).await,
362                Some(session) => col.find_one_and_update_with_session(identifier.clone(), update_doc, options, session).await,
363            };
364            match result {
365                Ok(updated_document) => {
366                    for (key, value) in object.inner.atomic_updater_map.lock().unwrap().iter() {
367                        let bson_new_val = updated_document.as_ref().unwrap().get(key).unwrap();
368                        let field = object.model().field(key).unwrap();
369                        let field_value = BsonCoder::decode(namespace, model, field.r#type(), field.is_optional(), bson_new_val, path![])?;
370                        object.set_value(key, field_value).unwrap();
371                    }
372                }
373                Err(error) => {
374                    return Err(self._handle_write_error(&error.kind, object, path));
375                }
376            }
377        }
378        Ok(())
379    }
380
381}
382
383#[async_trait]
384impl Transaction for MongoDBTransaction {
385
386    async fn migrate(&self, models: Vec<&Model>, dry_run: bool, reset_database: bool, silent: bool) -> Result<()> {
387        if reset_database {
388            let _ = self.database.drop(None).await;
389        }
390        for model in models {
391            let _name = model.name();
392            let collection = self.get_collection(model);
393            let mut reviewed_names: Vec<String> = Vec::new();
394            let cursor_result = collection.list_indexes(None).await;
395            if cursor_result.is_ok() {
396                let mut cursor = cursor_result.unwrap();
397                while let Some(Ok(index)) = cursor.next().await {
398                    if index.keys == doc!{"_id": 1} {
399                        continue
400                    }
401                    let name = (&index).options.as_ref().unwrap().name.as_ref().unwrap();
402                    let result = model.indexes().values().find(|i| name == i.name());
403                    if result.is_none() {
404                        // not in our model definition, but in the database
405                        // drop this index
406                        let _ = collection.drop_index(name, None).await.unwrap();
407                    } else {
408                        let result = result.unwrap();
409                        let our_format_index: Index = Index::from_index_model(&index);
410                        if result != &our_format_index {
411                            // alter this index
412                            // drop first
413                            let _ = collection.drop_index(name, None).await.unwrap();
414                            // create index
415                            let index_options = IndexOptions::builder()
416                                .name(result.name().to_string())
417                                .unique(result.r#type() == Type::Unique || result.r#type() == Type::Primary)
418                                .sparse(true)
419                                .build();
420                            let mut keys = doc!{};
421                            for item in result.items() {
422                                let field = model.field(&item.field).unwrap();
423                                let column_name = field.column_name();
424                                keys.insert(column_name, if item.sort == Sort::Asc { 1 } else { -1 });
425                            }
426                            let index_model = IndexModel::builder().keys(keys).options(index_options).build();
427                            let _result = collection.create_index(index_model, None).await;
428                        }
429                    }
430                    reviewed_names.push(name.clone());
431                }
432            }
433            for (_, index) in model.indexes() {
434                if !reviewed_names.contains_str(index.name()) {
435                    // ignore primary
436                    if index.keys().len() == 1 {
437                        let field = model.field(index.keys().get(0).unwrap()).unwrap();
438                        if field.column_name() == "_id" {
439                            continue
440                        }
441                    }
442                    // create this index
443                    let index_options = IndexOptions::builder()
444                        .name(index.name().to_string())
445                        .unique(index.r#type() == Type::Unique || index.r#type() == Type::Primary)
446                        .sparse(true)
447                        .build();
448                    let mut keys = doc!{};
449                    for item in index.items() {
450                        let field = model.field(&item.field).unwrap();
451                        let column_name = field.column_name();
452                        keys.insert(column_name, if item.sort == Sort::Asc { 1 } else { -1 });
453                    }
454                    let index_model = IndexModel::builder().keys(keys).options(index_options).build();
455                    let result = collection.create_index(index_model, None).await;
456                    if result.is_err() {
457                        println!("index create error: {:?}", result.err().unwrap());
458                    }
459                }
460            }
461        }
462        Ok(())
463    }
464
465    async fn purge(&self, models: Vec<&Model>) -> Result<()> {
466        for model in models {
467            let col = self.get_collection(model);
468            col.drop(None).await.unwrap();
469        }
470        Ok(())
471    }
472
473    async fn query_raw(&self, value: &Value) -> Result<Value> {
474        unreachable!()
475    }
476
477    async fn save_object(&self, object: &Object, path: KeyPath) -> Result<()> {
478        if object.is_new() {
479            self.create_object(object, path).await
480        } else {
481            self.update_object(object, path).await
482        }
483    }
484
485    async fn delete_object(&self, object: &Object, path: KeyPath) -> Result<()> {
486        if object.is_new() {
487            return Err(error_ext::object_is_not_saved_thus_cant_be_deleted(path));
488        }
489        let model = object.model();
490        let col = self.get_collection(model);
491        let bson_identifier: Bson = teon_value_to_bson(&object.db_identifier());
492        let document_identifier = bson_identifier.as_document().unwrap();
493        let result = match self.session() {
494            None => col.delete_one(document_identifier.clone(), None).await,
495            Some(session) => col.delete_one_with_session(document_identifier.clone(), None, session).await,
496        };
497        return match result {
498            Ok(_result) => Ok(()),
499            Err(err) => {
500                Err(error_ext::unknown_database_delete_error(path, format!("{}", err)))
501            }
502        }
503    }
504
505    async fn find_unique(&self, model: &Model, finder: &Value, ignore_select_and_include: bool, action: Action, transaction_ctx: Ctx, request: Option<Request>, path: KeyPath) -> Result<Option<Object>> {
506        let select = finder.get("select");
507        let include = finder.get("include");
508        let aggregate_input = Aggregation::build(transaction_ctx.namespace(), model, finder)?;
509        let col = self.get_collection(model);
510        let results = self.aggregate_to_documents(aggregate_input, col, path).await?;
511        if results.is_empty() {
512            Ok(None)
513        } else {
514            for doc in results {
515                let obj = transaction_ctx.new_object(model, action, request)?;
516                self.clone().document_to_object(transaction_ctx, &doc.unwrap(), &obj, select, include)?;
517                return Ok(Some(obj));
518            }
519            Ok(None)
520        }
521    }
522
523    async fn find_many(&self, model: &Model, finder: &Value, ignore_select_and_include: bool, action: Action, transaction_ctx: Ctx, request: Option<Request>, path: KeyPath) -> Result<Vec<Object>> {
524        let select = finder.get("select");
525        let include = finder.get("include");
526        let aggregate_input = Aggregation::build(transaction_ctx.namespace(), model, finder)?;
527        let reverse = Input::has_negative_take(finder);
528        let col = self.get_collection(model);
529        // println!("see aggregate input: {:?}", aggregate_input);
530        let mut result = vec![];
531        let results: Vec<std::result::Result<Document, MongoDBError>> = self.aggregate_to_documents(aggregate_input, col, path.clone()).await?;
532        for doc in results {
533            let obj = transaction_ctx.new_object(model, action, request.clone())?;
534            match self.clone().document_to_object(transaction_ctx.clone(), &doc.unwrap(), &obj, select, include) {
535                Ok(_) => {
536                    if reverse {
537                        result.insert(0, obj);
538                    } else {
539                        result.push(obj);
540                    }
541                }
542                Err(err) => {
543                    return Err(error_ext::unknown_database_find_error(path, format!("{}", err)));
544                }
545            }
546        }
547        Ok(result)
548    }
549
550    async fn count(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Value> {
551        if finder.get("select").is_some() {
552            self.count_fields(model, finder, transaction_ctx, path).await
553        } else {
554            let counts = self.count_objects(model, finder, transaction_ctx, path).await?;
555            Ok(Value::Int64(counts as i64))
556        }
557    }
558
559    async fn count_objects(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<usize> {
560        let input = Aggregation::build_for_count(transaction_ctx.namespace(), model, finder)?;
561        let col = self.get_collection(model);
562        let results = self.aggregate_to_documents(input, col, path).await?;
563        if results.is_empty() {
564            Ok(0)
565        } else {
566            let v = results.get(0).unwrap().as_ref().unwrap();
567            let bson_count = v.get("count").unwrap();
568            match bson_count {
569                Bson::Int32(i) => Ok(*i as usize),
570                Bson::Int64(i) => Ok(*i as usize),
571                _ => panic!("Unhandled count number type.")
572            }
573        }
574    }
575
576    async fn count_fields(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Value> {
577        let new_finder = Value::Dictionary(finder.as_dictionary().unwrap().iter().map(|(k, v)| {
578            if k.as_str() == "select" {
579                ("_count".to_owned(), v.clone())
580            } else {
581                (k.to_owned(), v.clone())
582            }
583        }).collect());
584        let aggregate_value = self.aggregate(model, &new_finder, transaction_ctx, path).await?;
585        Ok(aggregate_value.get("_count").unwrap().clone())
586    }
587
588    async fn aggregate(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Value> {
589        let results = self.aggregate_or_group_by(transaction_ctx.namespace(), model, finder, path).await?;
590        if results.is_empty() {
591            // there is no record
592            let mut retval = teon!({});
593            for (g, o) in finder.as_dictionary().unwrap() {
594                retval.as_dictionary_mut().unwrap().insert(g.clone(), teon!({}));
595                for (k, _v) in o.as_dictionary().unwrap() {
596                    let value = if g == "_count" { teon!(0) } else { teon!(null) };
597                    retval.as_dictionary_mut().unwrap().get_mut(g.as_str()).unwrap().as_dictionary_mut().unwrap().insert(k.to_string(), value);
598                }
599            }
600            Ok(retval)
601        } else {
602            Ok(results.get(0).unwrap().clone())
603        }
604    }
605
606    async fn group_by(&self, model: &Model, finder: &Value, transaction_ctx: Ctx, path: KeyPath) -> Result<Vec<Value>> {
607        Ok(self.aggregate_or_group_by(transaction_ctx.namespace(), model, finder, path).await?)
608    }
609
610    async fn sql(&self, model: &Model, sql: &str, transaction_ctx: Ctx) -> Result<Vec<Value>> {
611        Err(Error::new("do not run raw sql on MongoDB database"))
612    }
613
614    fn is_committed(&self) -> bool {
615        self.committed.load(Ordering::SeqCst)
616    }
617
618    fn is_transaction(&self) -> bool {
619        self.owned_session.is_some()
620    }
621
622    async fn commit(&self) -> Result<()> {
623        if let Some(session) = &self.owned_session {
624            session.commit_transaction().await
625        } else {
626            Ok(())
627        }
628    }
629
630    async fn abort(&self) -> Result<()> {
631        if let Some(session) = &self.owned_session {
632            session.abort_transaction().await
633        } else {
634            Ok(())
635        }
636    }
637
638    async fn spawn(&self) -> Result<Arc<dyn Transaction>> {
639        Ok(Arc::new(self.clone()))
640    }
641}
642
643unsafe impl Sync for MongoDBTransaction {}
644unsafe impl Send for MongoDBTransaction {}