geekorm_core/backends/connect/
manager.rs

1//! # Connection Manager
2#![allow(unused_imports, unused_variables)]
3use std::collections::VecDeque;
4use std::path::PathBuf;
5use std::sync::Mutex;
6use std::sync::atomic::AtomicUsize;
7
8use url::Url;
9
10use crate::backends::connect::ConnectionType;
11
12use super::{Backend, Connection};
13
14/// Connection
15#[derive(Default)]
16pub struct ConnectionManager {
17    backend: Mutex<VecDeque<Backend>>,
18    /// The type of database
19    dbtype: super::ConnectionType,
20
21    notifier: tokio::sync::Notify,
22}
23
24impl Clone for ConnectionManager {
25    fn clone(&self) -> Self {
26        Self {
27            backend: Mutex::new(self.backend.lock().unwrap().clone()),
28            ..Default::default()
29        }
30    }
31}
32
33impl ConnectionManager {
34    /// Get the type of database
35    pub fn get_database_type(&self) -> super::ConnectionType {
36        self.dbtype.clone()
37    }
38
39    /// Connect to a database
40    ///
41    /// *Examples:*
42    /// - `:memory:`, `file://:memory:` (in-memory SQLite database)
43    /// - `sqlite://./test.db`(SQLite database at `./test.db`)
44    /// - `libsql:///path/to/db` (libsql database at `/path/to/db`)
45    /// - `postgres://user:pass@localhost:5432/dbname` (Postgres database)
46    ///
47    pub async fn connect(connection: impl Into<String>) -> Result<Self, crate::Error> {
48        let connection = connection.into();
49        let connect = connection.as_str();
50
51        if connect == ":memory:" {
52            Self::in_memory().await
53        } else if connect.starts_with("file:")
54            || connect.starts_with("./")
55            || connect.starts_with("/")
56        {
57            Self::path(PathBuf::from(connect)).await
58        } else {
59            let Ok(url) = Url::parse(&connection) else {
60                return Err(crate::Error::ConnectionError(
61                    "Error parsing the URL/URI".to_string(),
62                ));
63            };
64            Self::url(url).await
65        }
66    }
67
68    /// Connect to an in-memory database
69    ///
70    /// This is only supported for sqlite based databases.
71    pub async fn in_memory() -> Result<Self, crate::Error> {
72        let manager = Self {
73            backend: Mutex::new(VecDeque::new()),
74            dbtype: ConnectionType::InMemory,
75            ..Default::default()
76        };
77
78        #[cfg(feature = "libsql")]
79        {
80            let db = ::libsql::Builder::new_local(":memory:").build().await?;
81            let conn = db.connect().unwrap();
82
83            manager.insert_backend(Backend::Libsql { conn });
84        }
85        #[cfg(feature = "rusqlite")]
86        {
87            let conn = ::rusqlite::Connection::open_in_memory()?;
88
89            manager.insert_backend(Backend::Rusqlite {
90                conn: std::sync::Arc::new(conn),
91            });
92        }
93        Ok(manager)
94    }
95
96    /// Connect to a database at a given path
97    pub async fn path(path: impl Into<PathBuf>) -> Result<Self, crate::Error> {
98        let path: PathBuf = path.into();
99        #[cfg(feature = "log")]
100        log::debug!("Connection to database path: {}", path.display());
101
102        let Some(filename) = path.file_name() else {
103            return Err(crate::Error::ConnectionError(
104                "Database path requires to have a file name".to_string(),
105            ));
106        };
107
108        if filename == ":memory:" || filename == "file::memory:" {
109            return Self::in_memory().await;
110        }
111
112        // Create the parent directory if it doesn't exist (recursively)
113        if let Some(parent) = path.parent() {
114            if !parent.exists() {
115                #[cfg(feature = "log")]
116                log::debug!("Creating parent directory: {}", parent.display());
117                tokio::fs::create_dir_all(parent).await?;
118            }
119        };
120
121        let manager = Self {
122            backend: Mutex::new(VecDeque::new()),
123            dbtype: ConnectionType::Path { file: path.clone() },
124            ..Default::default()
125        };
126
127        #[cfg(feature = "libsql")]
128        {
129            let db = ::libsql::Builder::new_local(path).build().await?;
130            let conn = db.connect().unwrap();
131
132            manager.insert_backend(Backend::Libsql { conn });
133        }
134        Ok(manager)
135    }
136
137    /// Connect to a database using a URL
138    ///
139    #[allow(unreachable_patterns)]
140    pub async fn url(url: Url) -> Result<Self, crate::Error> {
141        match url.scheme() {
142            "file" => {
143                let path = url.path();
144                Self::path(path).await
145            }
146            #[cfg(feature = "libsql")]
147            "libsql" | "sqlite" => {
148                if let Some(host) = url.host_str() {
149                    if host == "memory" || host == ":memory:" {
150                        return Self::in_memory().await;
151                    }
152                    Err(crate::Error::ConnectionError(
153                        "Remote connection handling is not yet supported".to_string(),
154                    ))
155                } else {
156                    Self::path(url.path()).await
157                }
158            }
159            #[cfg(feature = "rusqlite")]
160            "rusqlite" | "sqlite" => {
161                if let Some(host) = url.host_str() {
162                    if host == "memory" || host == ":memory:" {
163                        return Self::in_memory().await;
164                    }
165                    Err(crate::Error::ConnectionError(
166                        "Remote connection handling is not yet supported".to_string(),
167                    ))
168                } else {
169                    Self::path(url.path()).await
170                }
171            }
172            _ => Err(crate::Error::ConnectionError(format!(
173                "Unknown database URL scheme: {}",
174                url.scheme()
175            ))),
176        }
177    }
178
179    /// Acquire a connection from the pool
180    pub async fn acquire(&self) -> Connection<'_> {
181        self.notifier.notified().await;
182        let mut conns = self.backend.lock().unwrap();
183        let conn = conns.pop_front().unwrap();
184
185        Connection {
186            pool: self,
187            query_count: AtomicUsize::new(0),
188            backend: conn,
189        }
190    }
191
192    /// Insert a connection back into the pool
193    pub fn insert_backend(&self, backend: Backend) {
194        let mut conns = self.backend.lock().unwrap();
195        conns.push_back(backend);
196
197        self.notifier.notify_one();
198    }
199}
200
201#[cfg(feature = "libsql")]
202impl From<libsql::Connection> for ConnectionManager {
203    fn from(value: libsql::Connection) -> Self {
204        let backend = Backend::Libsql { conn: value };
205        let cm = Self {
206            backend: Mutex::new(VecDeque::new()),
207            ..Default::default()
208        };
209        cm.insert_backend(backend);
210
211        cm
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218    use crate::backends::connect::ConnectionType;
219
220    #[tokio::test]
221    async fn test_connect_memory() {
222        let cm = ConnectionManager::in_memory()
223            .await
224            .expect("Failed to connect to in-memory database");
225        assert_eq!(cm.get_database_type(), ConnectionType::InMemory);
226
227        let cm = ConnectionManager::connect(":memory:")
228            .await
229            .expect("Failed to connect to in-memory database");
230        assert_eq!(cm.get_database_type(), ConnectionType::InMemory);
231
232        let cm = ConnectionManager::connect("file::memory:")
233            .await
234            .expect("Failed to connect to in-memory database");
235        assert_eq!(cm.get_database_type(), ConnectionType::InMemory);
236    }
237
238    #[tokio::test]
239    async fn test_connect_path() {
240        let path = PathBuf::from("./test.db");
241        let cm = ConnectionManager::path(path.clone())
242            .await
243            .expect("Failed to connect to database");
244        assert_eq!(cm.get_database_type(), ConnectionType::Path { file: path });
245
246        let path = PathBuf::from("/tmp/test.db");
247        let cm = ConnectionManager::path(path.clone())
248            .await
249            .expect("Failed to connect to database");
250        assert_eq!(cm.get_database_type(), ConnectionType::Path { file: path });
251    }
252
253    #[tokio::test]
254    async fn test_connect_url() {
255        let url = Url::parse("sqlite:///tmp/test.db").unwrap();
256        let cm = ConnectionManager::url(url)
257            .await
258            .expect("Failed to connect to database");
259        assert_eq!(
260            cm.get_database_type(),
261            ConnectionType::Path {
262                file: PathBuf::from("/tmp/test.db")
263            }
264        );
265    }
266}