azoth_sqlite/
read_pool.rs1use azoth_core::{
7 error::{AzothError, Result},
8 ReadPoolConfig,
9};
10use parking_lot::Mutex;
11use rusqlite::{Connection, OpenFlags};
12use std::path::{Path, PathBuf};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::time::{Duration, Instant};
15use tokio::sync::{Semaphore, SemaphorePermit};
16
17pub struct PooledSqliteConnection<'a> {
22 conn: parking_lot::MutexGuard<'a, Connection>,
23 _permit: SemaphorePermit<'a>,
24}
25
26impl<'a> PooledSqliteConnection<'a> {
27 pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
35 where
36 P: rusqlite::Params,
37 F: FnOnce(&rusqlite::Row<'_>) -> rusqlite::Result<T>,
38 {
39 self.conn
40 .query_row(sql, params, f)
41 .map_err(|e| AzothError::Projection(e.to_string()))
42 }
43
44 pub fn prepare(&self, sql: &str) -> Result<rusqlite::Statement<'_>> {
46 self.conn
47 .prepare(sql)
48 .map_err(|e| AzothError::Projection(e.to_string()))
49 }
50
51 pub fn connection(&self) -> &Connection {
55 &self.conn
56 }
57}
58
59pub struct SqliteReadPool {
75 connections: Vec<Mutex<Connection>>,
76 semaphore: Semaphore,
77 acquire_timeout: Duration,
78 enabled: bool,
79 db_path: PathBuf,
80 next_idx: AtomicUsize,
82}
83
84impl SqliteReadPool {
85 pub fn new(db_path: &Path, config: ReadPoolConfig) -> Result<Self> {
89 let pool_size = if config.enabled { config.pool_size } else { 1 };
90 let mut connections = Vec::with_capacity(pool_size);
91
92 for _ in 0..pool_size {
93 let conn = Connection::open_with_flags(
94 db_path,
95 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
96 )
97 .map_err(|e| AzothError::Projection(e.to_string()))?;
98
99 connections.push(Mutex::new(conn));
100 }
101
102 Ok(Self {
103 connections,
104 semaphore: Semaphore::new(pool_size),
105 acquire_timeout: Duration::from_millis(config.acquire_timeout_ms),
106 enabled: config.enabled,
107 db_path: db_path.to_path_buf(),
108 next_idx: AtomicUsize::new(0),
109 })
110 }
111
112 pub async fn acquire(&self) -> Result<PooledSqliteConnection<'_>> {
117 let permit = tokio::time::timeout(self.acquire_timeout, self.semaphore.acquire())
118 .await
119 .map_err(|_| {
120 AzothError::Timeout(format!(
121 "Read pool acquire timeout after {:?}",
122 self.acquire_timeout
123 ))
124 })?
125 .map_err(|e| AzothError::Internal(format!("Semaphore closed: {}", e)))?;
126
127 let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
129 for i in 0..self.connections.len() {
130 let idx = (start + i) % self.connections.len();
131 if let Some(guard) = self.connections[idx].try_lock() {
132 return Ok(PooledSqliteConnection {
133 conn: guard,
134 _permit: permit,
135 });
136 }
137 }
138
139 Err(AzothError::Internal(
141 "No available connection despite having permit".into(),
142 ))
143 }
144
145 pub fn try_acquire(&self) -> Result<Option<PooledSqliteConnection<'_>>> {
149 match self.semaphore.try_acquire() {
150 Ok(permit) => {
151 let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
153 for i in 0..self.connections.len() {
154 let idx = (start + i) % self.connections.len();
155 if let Some(guard) = self.connections[idx].try_lock() {
156 return Ok(Some(PooledSqliteConnection {
157 conn: guard,
158 _permit: permit,
159 }));
160 }
161 }
162 Ok(None)
164 }
165 Err(_) => Ok(None),
166 }
167 }
168
169 pub fn available_permits(&self) -> usize {
171 self.semaphore.available_permits()
172 }
173
174 pub fn is_enabled(&self) -> bool {
176 self.enabled
177 }
178
179 pub fn db_path(&self) -> &Path {
181 &self.db_path
182 }
183
184 pub fn pool_size(&self) -> usize {
186 self.connections.len()
187 }
188
189 pub fn acquire_blocking(&self) -> Result<PooledSqliteConnection<'_>> {
193 let deadline = Instant::now() + self.acquire_timeout;
194
195 loop {
196 if let Ok(Some(conn)) = self.try_acquire() {
197 return Ok(conn);
198 }
199
200 if Instant::now() >= deadline {
201 return Err(AzothError::Timeout(format!(
202 "Read pool acquire timeout after {:?}",
203 self.acquire_timeout
204 )));
205 }
206
207 std::thread::sleep(Duration::from_millis(1));
208 }
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a throwaway database with a `test` table holding two rows.
    ///
    /// The `TempDir` is returned alongside the path so the caller keeps the
    /// directory alive for the duration of the test.
    fn create_test_db() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.db");

        {
            let writer = Connection::open(&path).unwrap();
            for statement in [
                "CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)",
                "INSERT INTO test (id, value) VALUES (1, 'hello')",
                "INSERT INTO test (id, value) VALUES (2, 'world')",
            ] {
                writer.execute(statement, []).unwrap();
            }
            // `writer` is closed at the end of this scope, before the pool
            // opens its own connections.
        }

        (dir, path)
    }

    /// Permits go down as connections are checked out and come back when the
    /// checkouts are dropped.
    #[tokio::test]
    async fn test_pool_acquire_release() {
        let (_temp_dir, db_path) = create_test_db();
        let pool = SqliteReadPool::new(&db_path, ReadPoolConfig::enabled(2)).unwrap();
        assert_eq!(pool.available_permits(), 2);

        let first = pool.acquire().await.unwrap();
        assert_eq!(pool.available_permits(), 1);

        let second = pool.acquire().await.unwrap();
        assert_eq!(pool.available_permits(), 0);

        // Pool exhausted: a non-blocking attempt must come back empty.
        assert!(pool.try_acquire().unwrap().is_none());

        drop(first);
        assert_eq!(pool.available_permits(), 1);

        drop(second);
        assert_eq!(pool.available_permits(), 2);
    }

    /// Queries issued through a pooled connection see the fixture rows.
    #[tokio::test]
    async fn test_pool_query() {
        let (_temp_dir, db_path) = create_test_db();
        let pool = SqliteReadPool::new(&db_path, ReadPoolConfig::enabled(2)).unwrap();
        let conn = pool.acquire().await.unwrap();

        let value: String = conn
            .query_row("SELECT value FROM test WHERE id = ?1", [1], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(value, "hello");

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM test", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 2);
    }

    /// `try_acquire` succeeds while a slot is free, yields `None` while the
    /// only slot is taken, and succeeds again after release.
    #[test]
    fn test_try_acquire() {
        let (_temp_dir, db_path) = create_test_db();
        let pool = SqliteReadPool::new(&db_path, ReadPoolConfig::enabled(1)).unwrap();

        let held = pool.try_acquire().unwrap();
        assert!(held.is_some());

        assert!(pool.try_acquire().unwrap().is_none());

        drop(held);
        assert!(pool.try_acquire().unwrap().is_some());
    }
}