geekorm_core/backends/libsql/
mutex.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;

#[cfg(not(feature = "backends-tokio"))]
use std::sync::Mutex;
#[cfg(feature = "backends-tokio")]
use tokio::sync::Mutex;

use crate::{GeekConnection, QueryBuilderTrait, TableBuilder};

const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

impl<C> GeekConnection for Arc<Mutex<C>>
where
    // Self: Sized + Sync + Send + 'static,
    C: GeekConnection<Connection = libsql::Connection>,
{
    type Connection = Arc<Mutex<libsql::Connection>>;

    async fn create_table<T>(connection: &Self::Connection) -> Result<(), crate::Error>
    where
        T: TableBuilder + QueryBuilderTrait + Sized + Serialize + DeserializeOwned,
    {
        let start = std::time::Instant::now();
        while start.elapsed() < TIMEOUT {
            match connection.try_lock() {
                Ok(conn) => return C::create_table::<T>(&conn).await,
                Err(_) => {
                    std::thread::sleep(std::time::Duration::from_millis(10));
                }
            }
        }
        Err(crate::Error::LibSQLError {
            error: "Error getting write lock on connection".to_string(),
            query: "N/A".to_string(),
        })
    }

    async fn row_count(
        connection: &Self::Connection,
        query: crate::Query,
    ) -> Result<i64, crate::Error> {
        let start = std::time::Instant::now();
        while start.elapsed() < TIMEOUT {
            match connection.try_lock() {
                Ok(conn) => return C::row_count(&conn, query).await,
                Err(_) => {
                    std::thread::sleep(std::time::Duration::from_millis(10));
                }
            }
        }
        Err(crate::Error::LibSQLError {
            error: "Error getting write lock on connection in row_count".to_string(),
            query: "N/A".to_string(),
        })
    }

    async fn query<T>(
        connection: &Self::Connection,
        query: crate::Query,
    ) -> Result<Vec<T>, crate::Error>
    where
        T: serde::de::DeserializeOwned,
    {
        let start = std::time::Instant::now();
        while start.elapsed() < TIMEOUT {
            match connection.try_lock() {
                Ok(conn) => return C::query::<T>(&conn, query).await,
                Err(_) => {
                    std::thread::sleep(std::time::Duration::from_millis(10));
                }
            }
        }

        Err(crate::Error::LibSQLError {
            error: "Error getting write lock on connection".to_string(),
            query: "N/A".to_string(),
        })
    }

    async fn query_first<T>(
        connection: &Self::Connection,
        query: crate::Query,
    ) -> Result<T, crate::Error>
    where
        T: serde::de::DeserializeOwned,
    {
        let start = std::time::Instant::now();
        while start.elapsed() < TIMEOUT {
            match connection.try_lock() {
                Ok(conn) => return C::query_first::<T>(&conn, query).await,
                Err(_) => {
                    std::thread::sleep(std::time::Duration::from_millis(10));
                }
            }
        }
        Err(crate::Error::LibSQLError {
            error: "Error getting write lock on connection".to_string(),
            query: "N/A".to_string(),
        })
    }

    async fn execute(
        connection: &Self::Connection,
        query: crate::Query,
    ) -> Result<(), crate::Error> {
        let start = std::time::Instant::now();
        while start.elapsed() < TIMEOUT {
            match connection.try_lock() {
                Ok(conn) => return C::execute(&conn, query).await,
                Err(_) => {
                    std::thread::sleep(std::time::Duration::from_millis(10));
                }
            }
        }
        Err(crate::Error::LibSQLError {
            error: "Error getting write lock on connection in execute".to_string(),
            query: "N/A".to_string(),
        })
    }
}