teo_mongodb_connector/connector/
connection.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3use async_trait::async_trait;
4use bson::{doc, Document};
5use mongodb::{Client, Collection, Database};
6use mongodb::options::ClientOptions;
7use teo_runtime::connection::connection::Connection;
8use teo_runtime::connection::transaction::Transaction;
9use crate::connector::OwnedSession;
10use crate::connector::transaction::MongoDBTransaction;
11
12#[derive(Debug)]
13pub struct MongoDBConnection {
14    client: Client,
15    database: Database,
16    supports_transaction: bool,
17}
18
19impl MongoDBConnection {
20
21    pub async fn new<P>(url: &str, print: P) -> Self where P: Fn(&str) {
22        let options = match ClientOptions::parse(url).await {
23            Ok(options) => options,
24            Err(_) => panic!("MongoDB url is invalid.")
25        };
26        let database_name = match &options.default_database {
27            Some(database_name) => database_name,
28            None => panic!("No database name found in MongoDB url.")
29        };
30        let client = match Client::with_options(options.clone()) {
31            Ok(client) => client,
32            Err(_) => panic!("MongoDB client creating error.")
33        };
34        match client.database("xxxxxpingpingpingxxxxx").run_command(doc! {"ping": 1}, None).await {
35            Ok(_) => (),
36            Err(_) => panic!("Cannot connect to MongoDB database."),
37        }
38
39        let database = client.database(&database_name);
40        let supports_transaction = Self::test_transaction_support(&client, &database).await;
41        if !supports_transaction {
42            print("warning: MongoDB transaction is not supported in this setup.");
43        }
44        Self {
45            client,
46            database,
47            supports_transaction,
48        }
49    }
50
51    async fn test_transaction_support(client: &Client, database: &Database) -> bool {
52        let Ok(mut session) = client.start_session(None).await else {
53            return false;
54        };
55        let Ok(_) = session.start_transaction(None).await else {
56            return false;
57        };
58        let collection: Collection<Document> = database.collection("__teo__transaction_test__");
59        // match collection.insert_one_with_session(doc! {"supports": true}, None, &mut session).await {
60        //     Ok(_) => (),
61        //     Err(e) => println!("see this error: {:?}", e),
62        // };
63        let result = collection.insert_one_with_session(doc! {"supports": true}, None, &mut session).await.is_ok();
64        let Ok(_) = session.commit_transaction().await else {
65            return false;
66        };
67        result
68    }
69}
70
71#[async_trait]
72impl Connection for MongoDBConnection {
73
74    async fn transaction(&self) -> teo_result::Result<Arc<dyn Transaction>> {
75        if !self.supports_transaction {
76            return self.no_transaction().await;
77        }
78        let session = OwnedSession::new(self.client.start_session(None).await.unwrap());
79        session.start_transaction().await?;
80        Ok(Arc::new(MongoDBTransaction {
81            owned_session: Some(session),
82            database: self.database.clone(),
83            committed: Arc::new(AtomicBool::new(false)),
84        }))
85    }
86
87    async fn no_transaction(&self) -> teo_result::Result<Arc<dyn Transaction>> {
88        Ok(Arc::new(MongoDBTransaction {
89            owned_session: None,
90            database: self.database.clone(),
91            committed: Arc::new(AtomicBool::new(false)),
92        }))
93    }
94}