teo_mongodb_connector/connector/
connection.rs1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3use async_trait::async_trait;
4use bson::{doc, Document};
5use mongodb::{Client, Collection, Database};
6use mongodb::options::ClientOptions;
7use teo_runtime::connection::connection::Connection;
8use teo_runtime::connection::transaction::Transaction;
9use crate::connector::OwnedSession;
10use crate::connector::transaction::MongoDBTransaction;
11
12#[derive(Debug)]
13pub struct MongoDBConnection {
14 client: Client,
15 database: Database,
16 supports_transaction: bool,
17}
18
19impl MongoDBConnection {
20
21 pub async fn new<P>(url: &str, print: P) -> Self where P: Fn(&str) {
22 let options = match ClientOptions::parse(url).await {
23 Ok(options) => options,
24 Err(_) => panic!("MongoDB url is invalid.")
25 };
26 let database_name = match &options.default_database {
27 Some(database_name) => database_name,
28 None => panic!("No database name found in MongoDB url.")
29 };
30 let client = match Client::with_options(options.clone()) {
31 Ok(client) => client,
32 Err(_) => panic!("MongoDB client creating error.")
33 };
34 match client.database("xxxxxpingpingpingxxxxx").run_command(doc! {"ping": 1}, None).await {
35 Ok(_) => (),
36 Err(_) => panic!("Cannot connect to MongoDB database."),
37 }
38
39 let database = client.database(&database_name);
40 let supports_transaction = Self::test_transaction_support(&client, &database).await;
41 if !supports_transaction {
42 print("warning: MongoDB transaction is not supported in this setup.");
43 }
44 Self {
45 client,
46 database,
47 supports_transaction,
48 }
49 }
50
51 async fn test_transaction_support(client: &Client, database: &Database) -> bool {
52 let Ok(mut session) = client.start_session(None).await else {
53 return false;
54 };
55 let Ok(_) = session.start_transaction(None).await else {
56 return false;
57 };
58 let collection: Collection<Document> = database.collection("__teo__transaction_test__");
59 let result = collection.insert_one_with_session(doc! {"supports": true}, None, &mut session).await.is_ok();
64 let Ok(_) = session.commit_transaction().await else {
65 return false;
66 };
67 result
68 }
69}
70
71#[async_trait]
72impl Connection for MongoDBConnection {
73
74 async fn transaction(&self) -> teo_result::Result<Arc<dyn Transaction>> {
75 if !self.supports_transaction {
76 return self.no_transaction().await;
77 }
78 let session = OwnedSession::new(self.client.start_session(None).await.unwrap());
79 session.start_transaction().await?;
80 Ok(Arc::new(MongoDBTransaction {
81 owned_session: Some(session),
82 database: self.database.clone(),
83 committed: Arc::new(AtomicBool::new(false)),
84 }))
85 }
86
87 async fn no_transaction(&self) -> teo_result::Result<Arc<dyn Transaction>> {
88 Ok(Arc::new(MongoDBTransaction {
89 owned_session: None,
90 database: self.database.clone(),
91 committed: Arc::new(AtomicBool::new(false)),
92 }))
93 }
94}