geekorm_core/backends/connect/
manager.rs

1//! # Connection Manager
2#![allow(unused_imports, unused_variables)]
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::atomic::AtomicUsize;
6use std::sync::{Arc, Mutex};
7use url::Url;
8
9use super::{Backend, Connection};
10use crate::backends::connect::ConnectionType;
11
12/// Connection Manager
13///
14/// The connection manager is used to manage the connections to the database.
15pub struct ConnectionManager {
16    /// The backend connections which are thread safe and can be shared between
17    /// different parts of the code.
18    ///
19    /// The pool is a deque of connections, where the front of the deque is the
20    /// next connection to be acquired and the back of the deque is the next
21    /// connection to be released.
22    backend: Arc<Mutex<VecDeque<Backend>>>,
23    /// The type of database that the connection is connected to
24    /// (e.g. in-memory, file-based, etc.)
25    dbtype: super::ConnectionType,
26
27    /// Notifier is used to notify the pool that a connection has been released
28    /// and is ready to be acquired.
29    ///
30    /// The Arc is to allow the pool to be cloned and passed around to different
31    /// parts of the code without having to worry about lifetimes.
32    notifier: Arc<tokio::sync::Notify>,
33}
34
35impl Clone for ConnectionManager {
36    fn clone(&self) -> Self {
37        Self {
38            backend: Arc::clone(&self.backend),
39            dbtype: self.dbtype.clone(),
40            notifier: Arc::clone(&self.notifier),
41        }
42    }
43}
44
45impl Default for ConnectionManager {
46    fn default() -> Self {
47        Self {
48            backend: Arc::new(Mutex::new(VecDeque::new())),
49            dbtype: ConnectionType::InMemory,
50            notifier: Arc::new(tokio::sync::Notify::new()),
51        }
52    }
53}
54
55impl ConnectionManager {
56    /// Get the type of database
57    pub fn get_database_type(&self) -> super::ConnectionType {
58        self.dbtype.clone()
59    }
60
61    /// Connect to a database
62    ///
63    /// *Examples:*
64    /// - `:memory:`, `file://:memory:` (in-memory SQLite database)
65    /// - `sqlite://./test.db`(SQLite database at `./test.db`)
66    /// - `libsql:///path/to/db` (libsql database at `/path/to/db`)
67    /// - `postgres://user:pass@localhost:5432/dbname` (Postgres database)
68    ///
69    pub async fn connect(connection: impl Into<String>) -> Result<Self, crate::Error> {
70        let connection = connection.into();
71        let connect = connection.as_str();
72
73        if connect == ":memory:" {
74            Self::in_memory().await
75        } else if connect.starts_with("file:")
76            || connect.starts_with("./")
77            || connect.starts_with("/")
78        {
79            Self::path(PathBuf::from(connect)).await
80        } else {
81            let Ok(url) = Url::parse(&connection) else {
82                return Err(crate::Error::ConnectionError(
83                    "Error parsing the URL/URI".to_string(),
84                ));
85            };
86            Self::url(url).await
87        }
88    }
89
90    /// Connect to an in-memory database
91    ///
92    /// This is only supported for sqlite based databases.
93    pub async fn in_memory() -> Result<Self, crate::Error> {
94        let manager = Self {
95            dbtype: ConnectionType::InMemory,
96            ..Default::default()
97        };
98
99        #[cfg(feature = "libsql")]
100        {
101            let db = ::libsql::Builder::new_local(":memory:").build().await?;
102            let conn = db.connect().unwrap();
103
104            manager.insert_backend(Backend::Libsql { conn });
105        }
106        #[cfg(feature = "rusqlite")]
107        {
108            let conn = ::rusqlite::Connection::open_in_memory()?;
109
110            manager.insert_backend(Backend::Rusqlite {
111                conn: std::sync::Arc::new(conn),
112            });
113        }
114        Ok(manager)
115    }
116
117    /// Connect to a database at a given path
118    pub async fn path(path: impl Into<PathBuf>) -> Result<Self, crate::Error> {
119        let path: PathBuf = path.into();
120        #[cfg(feature = "log")]
121        log::debug!("Connection to database path: {}", path.display());
122
123        let Some(filename) = path.file_name() else {
124            return Err(crate::Error::ConnectionError(
125                "Database path requires to have a file name".to_string(),
126            ));
127        };
128
129        if filename == ":memory:" || filename == "file::memory:" {
130            return Self::in_memory().await;
131        }
132
133        // Create the parent directory if it doesn't exist (recursively)
134        if let Some(parent) = path.parent() {
135            if !parent.exists() {
136                #[cfg(feature = "log")]
137                log::debug!("Creating parent directory: {}", parent.display());
138                tokio::fs::create_dir_all(parent).await?;
139            }
140        };
141
142        let manager = Self {
143            dbtype: ConnectionType::Path { file: path.clone() },
144            ..Default::default()
145        };
146
147        #[cfg(feature = "libsql")]
148        {
149            let db = ::libsql::Builder::new_local(path).build().await?;
150            let conn = db.connect().unwrap();
151
152            manager.insert_backend(Backend::Libsql { conn });
153        }
154        Ok(manager)
155    }
156
157    /// Connect to a database using a URL
158    ///
159    #[allow(unreachable_patterns)]
160    pub async fn url(url: Url) -> Result<Self, crate::Error> {
161        match url.scheme() {
162            "file" => {
163                let path = url.path();
164                Self::path(path).await
165            }
166            #[cfg(feature = "libsql")]
167            "libsql" | "sqlite" => {
168                if let Some(host) = url.host_str() {
169                    if host == "memory" || host == ":memory:" {
170                        return Self::in_memory().await;
171                    }
172                    Err(crate::Error::ConnectionError(
173                        "Remote connection handling is not yet supported".to_string(),
174                    ))
175                } else {
176                    Self::path(url.path()).await
177                }
178            }
179            #[cfg(feature = "rusqlite")]
180            "rusqlite" | "sqlite" => {
181                if let Some(host) = url.host_str() {
182                    if host == "memory" || host == ":memory:" {
183                        return Self::in_memory().await;
184                    }
185                    Err(crate::Error::ConnectionError(
186                        "Remote connection handling is not yet supported".to_string(),
187                    ))
188                } else {
189                    Self::path(url.path()).await
190                }
191            }
192            _ => Err(crate::Error::ConnectionError(format!(
193                "Unknown database URL scheme: {}",
194                url.scheme()
195            ))),
196        }
197    }
198
199    /// Acquire a connection from the pool
200    pub async fn acquire(&self) -> Connection<'_> {
201        self.notifier.notified().await;
202        let mut conns = self.backend.lock().unwrap();
203        let conn = conns.pop_front().unwrap();
204
205        Connection {
206            pool: self,
207            query_count: AtomicUsize::new(0),
208            backend: conn,
209        }
210    }
211
212    /// Insert a connection back into the pool
213    pub fn insert_backend(&self, backend: Backend) {
214        let mut conns = self.backend.lock().unwrap();
215        conns.push_back(backend);
216
217        self.notifier.notify_one();
218    }
219}
220
#[cfg(feature = "libsql")]
impl From<libsql::Connection> for ConnectionManager {
    /// Wraps an already opened libsql connection in a new manager.
    ///
    /// The manager starts from `Default`, so its database type is reported
    /// as `ConnectionType::InMemory` regardless of where the connection
    /// actually points.
    fn from(value: libsql::Connection) -> Self {
        let manager = Self::default();
        manager.insert_backend(Backend::Libsql { conn: value });
        manager
    }
}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::connect::ConnectionType;

    /// Every spelling of the in-memory target must report `InMemory`.
    #[tokio::test]
    async fn test_connect_memory() {
        let direct = ConnectionManager::in_memory()
            .await
            .expect("Failed to connect to in-memory database");
        assert_eq!(direct.get_database_type(), ConnectionType::InMemory);

        for target in [":memory:", "file::memory:"] {
            let cm = ConnectionManager::connect(target)
                .await
                .expect("Failed to connect to in-memory database");
            assert_eq!(cm.get_database_type(), ConnectionType::InMemory);
        }
    }

    /// Relative and absolute file paths are recorded unchanged.
    #[tokio::test]
    async fn test_connect_path() {
        for location in ["./test.db", "/tmp/test.db"] {
            let path = PathBuf::from(location);
            let cm = ConnectionManager::path(path.clone())
                .await
                .expect("Failed to connect to database");
            assert_eq!(cm.get_database_type(), ConnectionType::Path { file: path });
        }
    }

    /// A host-less `sqlite` URL resolves to the path component of the URL.
    #[tokio::test]
    async fn test_connect_url() {
        let url = Url::parse("sqlite:///tmp/test.db").unwrap();
        let expected = ConnectionType::Path {
            file: PathBuf::from("/tmp/test.db"),
        };

        let cm = ConnectionManager::url(url)
            .await
            .expect("Failed to connect to database");
        assert_eq!(cm.get_database_type(), expected);
    }
}
283}