1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use std::fmt::{Debug, Formatter};
use std::sync::{Arc};
use std::sync::atomic::AtomicBool;
use tokio::sync::Mutex;
use async_trait::async_trait;
use once_cell::sync::Lazy;
use quaint_forked::{pooled::Quaint};
use quaint_forked::connector::start_owned_transaction;
use crate::connector::transaction::SQLTransaction;
use crate::migration::migrate::SQLMigration;
use crate::schema::dialect::SQLDialect;
use crate::url::url_utils;
use teo_runtime::connection::connection::Connection;
use teo_result::{Error, Result};
use teo_runtime::connection::transaction::Transaction;

pub struct SQLConnection {
    dialect: SQLDialect,
    pool: Quaint,
    memory_mode: bool,
}

impl Debug for SQLConnection {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        todo!()
    }
}

impl SQLConnection {

    pub async fn new(dialect: SQLDialect, url: &str, reset: bool) -> Self {
        SQLMigration::create_database_if_needed(dialect, url, reset).await;
        let url = url_utils::normalized_url(dialect, url);
        let pool = Quaint::builder(url.as_str()).unwrap().build();
        Self { dialect, pool, memory_mode: url.to_string().contains(":memory:") }
    }

    async fn sqlite_memory_transaction(&self) -> Result<Arc<dyn Transaction>> {
        let mut connection = UNIQUE_TRANSACTION.lock().await;
        if connection.is_none() {
            let result = {
                let pooled_connection = self.pool.check_out().await;
                if pooled_connection.is_err() {
                    Err(Error::new(format!("cannot create pooled connection: {}", pooled_connection.err().unwrap().to_string())))
                } else {
                    Ok(Arc::new(SQLTransaction::new(self.dialect, Arc::new(pooled_connection.unwrap()), None)))
                }
            }?;
            *connection = Some(result.clone());
            Ok(result)
        } else {
            Ok(connection.clone().unwrap())
        }
    }
}

static UNIQUE_TRANSACTION: Lazy<Mutex<Option<Arc<dyn Transaction>>>> = Lazy::new(|| {
    Mutex::new(None)
});

#[async_trait]
impl Connection for SQLConnection {

    async fn transaction(&self) -> Result<Arc<dyn Transaction>> {
        if self.memory_mode && self.dialect.is_sqlite() {
            return self.sqlite_memory_transaction().await;
        }
        match self.pool.check_out().await {
            Ok(pooled_connection) => {
                let pooled_connection = Arc::new(pooled_connection);
                let transaction = start_owned_transaction(pooled_connection.clone(), None).await.unwrap();
                Ok(Arc::new(SQLTransaction {
                    dialect: self.dialect,
                    conn: pooled_connection,
                    tran: Some(Arc::new(transaction)),
                    committed: Arc::new(AtomicBool::new(false)),
                }))
            }
            Err(err) => {
                Err(Error::new(format!("cannot create pooled connection: {}", err.to_string())))
            }
        }
    }

    async fn no_transaction(&self) -> Result<Arc<dyn Transaction>> {
        if self.memory_mode && self.dialect.is_sqlite() {
            return self.sqlite_memory_transaction().await;
        }
        let pooled_connection = self.pool.check_out().await;
        if pooled_connection.is_err() {
            Err(Error::new(format!("cannot create pooled connection: {}", pooled_connection.err().unwrap().to_string())))
        } else {
            Ok(Arc::new(SQLTransaction::new(self.dialect, Arc::new(pooled_connection.unwrap()), None)))
        }
    }
}