tork-orm-core 0.1.0

Core runtime for the Tork ORM: dialect-agnostic query model, typed columns, and database drivers.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
//! The SQLite driver.
//!
//! Connections are pooled and reused: each call checks an idle connection out of
//! the pool, runs the blocking SQLite work on the runtime's blocking thread pool
//! via [`tokio::task::spawn_blocking`], then returns the connection. A semaphore
//! bounds how many connections run concurrently, so the blocking pool is never
//! flooded. Reusing connections preserves SQLite's per-connection prepared
//! statement cache, and a connection survives an ordinary failed query: it is
//! discarded only when the work panics, when opening it fails, or when it fails
//! the health probe that runs after an error. The pool therefore neither leaks
//! nor thrashes.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use rusqlite::types::{ToSqlOutput, Value as SqliteValue, ValueRef};
use rusqlite::{Connection, ToSql};
use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore};

use crate::driver::ExecuteResult;
use crate::error::OrmError;
use crate::row::Row;
use crate::value::Value;

/// The busy timeout applied to every connection, in milliseconds.
///
/// With write-ahead logging a brief wait lets concurrent writers serialize
/// instead of failing immediately with `SQLITE_BUSY`. `Inner::open` installs
/// this value on each connection right after opening it.
const BUSY_TIMEOUT_MS: u32 = 5_000;

/// How long a connection checkout waits for a free pool slot before failing.
///
/// Without a bound, a connection leak or a stuck query would make every later
/// checkout hang forever; a timeout turns that into a recoverable error so the
/// request fails fast instead of wedging the whole server.
///
/// This is the value [`SqlitePool::new`] starts with; it can be replaced through
/// [`SqlitePool::with_acquire_timeout`], which also falls back to it when given
/// a zero duration.
const DEFAULT_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(30);

/// How a connection should be opened.
///
/// Produced by `parse_url` from the pool's database URL and consulted by
/// `Inner::open` every time a fresh connection is needed.
#[derive(Debug, Clone)]
enum Source {
    /// A file-backed database at the given path (the URL with any `sqlite:` /
    /// `sqlite://` scheme removed).
    File(String),
    /// A private in-memory database. The pool is limited to a single permit for
    /// this source (see [`SqlitePool::new`]).
    Memory,
}

/// Shared pool state behind an [`Arc`].
///
/// Every clone of a [`SqlitePool`], and every pinned handle checked out of it,
/// points at the same `Inner`.
struct Inner {
    /// Where new connections are opened from.
    source: Source,
    /// Connections that are open but not currently checked out. Access goes
    /// through the `lock` helper, which tolerates a poisoned mutex.
    idle: Mutex<Vec<Connection>>,
    /// One permit per connection that may be in use at the same time.
    semaphore: Arc<Semaphore>,
    /// Running count of operations dispatched through the pool and its pinned
    /// handles; read back by [`SqlitePool::statement_count`].
    statements: AtomicU64,
    /// Upper bound on how long a checkout waits for a permit.
    acquire_timeout: Duration,
}

impl Inner {
    /// Waits for a pool slot and returns the permit that reserves it.
    ///
    /// The wait is bounded by `acquire_timeout`, so a leaked connection or a
    /// stuck query surfaces as an error instead of an unbounded hang.
    ///
    /// # Errors
    ///
    /// Returns a connection error if no permit becomes available within the
    /// timeout, or if the semaphore has been closed.
    async fn acquire_permit(&self) -> crate::Result<OwnedSemaphorePermit> {
        // `acquire_owned` consumes an `Arc<Semaphore>`, so hand it a clone.
        let acquire = Arc::clone(&self.semaphore).acquire_owned();
        match tokio::time::timeout(self.acquire_timeout, acquire).await {
            Ok(Ok(permit)) => Ok(permit),
            Ok(Err(_)) => Err(OrmError::connection("connection pool is closed")),
            // Print the duration itself rather than whole seconds: `as_secs`
            // truncates, so a 50 ms timeout used to be reported as "0s". The
            // `Debug` form of `Duration` still renders the 30-second default as
            // "30s".
            Err(_) => Err(OrmError::connection(format!(
                "timed out after {:?} waiting for a database connection",
                self.acquire_timeout
            ))),
        }
    }
}

impl Inner {
    /// Opens and configures a fresh connection.
    fn open(&self) -> crate::Result<Connection> {
        let conn = match &self.source {
            Source::File(path) => Connection::open(path)
                .map_err(|e| OrmError::connection(format!("cannot open `{path}`")).with_source(e))?,
            Source::Memory => Connection::open_in_memory()
                .map_err(|e| OrmError::connection("cannot open in-memory database").with_source(e))?,
        };

        conn.busy_timeout(std::time::Duration::from_millis(u64::from(BUSY_TIMEOUT_MS)))
            .map_err(|e| OrmError::connection("cannot set busy timeout").with_source(e))?;
        conn.pragma_update(None, "foreign_keys", "ON")
            .map_err(|e| OrmError::connection("cannot enable foreign keys").with_source(e))?;
        if matches!(self.source, Source::File(_)) {
            // Write-ahead logging improves read/write concurrency for file databases.
            conn.pragma_update(None, "journal_mode", "WAL")
                .map_err(|e| OrmError::connection("cannot enable WAL").with_source(e))?;
        }
        // Cap the per-connection prepared-statement cache so dynamically
        // generated SQL strings (for example from raw query builders)
        // cannot bloat memory indefinitely. 200 entries is generous enough
        // for every statement a typical ORM workload generates, while
        // bounding the worst-case memory footprint.
        conn.set_prepared_statement_cache_capacity(200);
        Ok(conn)
    }
}

/// A pool of reusable SQLite connections.
///
/// Cloning a pool is cheap: clones share the same underlying connections and
/// concurrency limit. Connections are opened on demand, the first time a
/// checkout finds no idle connection to reuse.
#[derive(Clone)]
pub struct SqlitePool {
    /// Pool state shared by every clone and every pinned handle.
    inner: Arc<Inner>,
}

impl SqlitePool {
    /// Creates a pool for the database named by `url`, allowing at most
    /// `max_connections` connections in use at once.
    ///
    /// Accepted URL forms: `sqlite://path/to.db`, `sqlite:path/to.db`, a bare
    /// path, `:memory:`, or `sqlite::memory:`. An in-memory database is private
    /// to the connection that opened it, so such a pool is limited to a single
    /// connection regardless of `max_connections`.
    ///
    /// No connection is opened here; connections are created lazily on first use.
    ///
    /// # Errors
    ///
    /// Returns a configuration error if `max_connections` is zero, if the URL is
    /// empty, or if the database path contains a `..` component.
    pub fn new(url: &str, max_connections: u32) -> crate::Result<Self> {
        if max_connections == 0 {
            return Err(OrmError::configuration("max_connections must be at least 1"));
        }

        let source = parse_url(url)?;
        let permits = if matches!(source, Source::Memory) {
            1
        } else {
            max_connections as usize
        };

        let inner = Inner {
            source,
            idle: Mutex::new(Vec::new()),
            semaphore: Arc::new(Semaphore::new(permits)),
            statements: AtomicU64::new(0),
            acquire_timeout: DEFAULT_ACQUIRE_TIMEOUT,
        };
        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    /// Sets how long a connection checkout waits for a free slot before failing
    /// with a timeout error (default 30 seconds).
    ///
    /// A zero `timeout` selects the default. The setting is written into the
    /// shared pool state in place, which is only possible while this handle is
    /// the sole owner of that state: call it before the pool is cloned or
    /// shared. Once other handles exist, the call leaves the timeout unchanged.
    pub fn with_acquire_timeout(mut self, timeout: Duration) -> Self {
        let effective = if timeout.is_zero() {
            DEFAULT_ACQUIRE_TIMEOUT
        } else {
            timeout
        };
        // `Arc::get_mut` only succeeds while no other handle shares the state.
        if let Some(state) = Arc::get_mut(&mut self.inner) {
            state.acquire_timeout = effective;
        }
        self
    }

    /// Runs a row-returning query on a pooled connection and collects every row.
    ///
    /// `params` are bound to the statement's placeholders in order.
    pub async fn fetch_all(&self, sql: String, params: Vec<Value>) -> crate::Result<Vec<Row>> {
        let pending = self.with_connection(move |conn| fetch_all(conn, &sql, &params));
        pending.await
    }

    /// Runs a statement that returns no rows on a pooled connection, reporting
    /// the affected-row count and the last inserted row id.
    ///
    /// `params` are bound to the statement's placeholders in order.
    pub async fn execute(&self, sql: String, params: Vec<Value>) -> crate::Result<ExecuteResult> {
        let pending = self.with_connection(move |conn| execute(conn, &sql, &params));
        pending.await
    }

    /// Runs a batch of semicolon-separated statements, none of which takes bound
    /// parameters, on a pooled connection.
    ///
    /// Used to apply a migration's SQL, which may hold several statements.
    pub async fn execute_batch(&self, sql: String) -> crate::Result<()> {
        let pending = self.with_connection(move |conn| execute_batch(conn, &sql));
        pending.await
    }

    /// Reports how many operations have been dispatched through this pool and
    /// its pinned handles so far.
    ///
    /// Useful in tests to confirm a query strategy (for example, that preloading
    /// adds one query per relation rather than one per row).
    pub fn statement_count(&self) -> u64 {
        let counter = &self.inner.statements;
        counter.load(Ordering::Relaxed)
    }

    /// Reserves one connection for the caller's exclusive use.
    ///
    /// Every statement run through the returned handle uses the same connection,
    /// so a sequence such as `BEGIN`/.../`COMMIT` is sound regardless of the pool
    /// size. The handle keeps its pool permit for as long as it lives and gives
    /// the connection back to the pool when dropped. Used by the migration runner
    /// and the transaction API to pin a connection.
    ///
    /// # Errors
    ///
    /// Returns a connection error if no pool slot frees up within the checkout
    /// timeout, if a fresh connection cannot be opened, or if the blocking task
    /// that opens it fails.
    pub(crate) async fn acquire_pinned(&self) -> crate::Result<PinnedSqlite> {
        let permit = self.inner.acquire_permit().await?;

        // Prefer an idle connection; open a new one off the async runtime only
        // when none is available.
        let reused = lock(&self.inner.idle).pop();
        let conn = if let Some(conn) = reused {
            conn
        } else {
            let opener = Arc::clone(&self.inner);
            let opened = tokio::task::spawn_blocking(move || opener.open()).await;
            opened.map_err(|e| OrmError::connection(format!("database worker failed: {e}")))??
        };

        let inner = Arc::clone(&self.inner);
        Ok(PinnedSqlite {
            inner,
            conn: Arc::new(Mutex::new(Some(conn))),
            gate: tokio::sync::Mutex::new(()),
            _permit: permit,
        })
    }

    /// Drops all idle connections. In-flight calls keep their connection until done.
    pub async fn close(&self) {
        let drained = {
            let mut idle = lock(&self.inner.idle);
            std::mem::take(&mut *idle)
        };
        drop(drained);
    }

    /// Checks out a connection, runs the blocking work off-runtime, and returns it.
    ///
    /// `work` receives exclusive access to one connection and runs on the
    /// runtime's blocking thread pool. Its result is sent back to the caller over
    /// a oneshot channel.
    ///
    /// The statement counter is incremented up front, before the checkout, so it
    /// also counts calls whose checkout subsequently fails.
    ///
    /// # Errors
    ///
    /// Returns the checkout error if no pool slot frees up in time, the open
    /// error if a fresh connection is needed and cannot be opened, the error
    /// produced by `work` itself, or a query error if the blocking task ends
    /// without reporting a result (for example because `work` panicked).
    async fn with_connection<F, T>(&self, work: F) -> crate::Result<T>
    where
        F: FnOnce(&mut Connection) -> crate::Result<T> + Send + 'static,
        T: Send + 'static,
    {
        self.inner.statements.fetch_add(1, Ordering::Relaxed);

        // Bound concurrency before touching the blocking pool, failing fast if no
        // slot frees up within the checkout timeout instead of hanging forever.
        let permit = self.inner.acquire_permit().await?;

        // Reuse an idle connection when there is one; otherwise the blocking task
        // below opens a fresh one. There is no await point between acquiring the
        // permit and spawning the task.
        let checked_out = lock(&self.inner.idle).pop();
        let inner = Arc::clone(&self.inner);

        // The blocking task returns the connection to the pool itself and reports
        // the result over a channel. This way, if the caller's future is cancelled
        // (for example by `tokio::time::timeout`) while the query runs, the
        // connection is still returned rather than dropped, so the pool never
        // leaks connections under request cancellation. The permit is held inside
        // the task so the concurrency bound stays correct until the work finishes.
        let (tx, rx) = oneshot::channel();
        tokio::task::spawn_blocking(move || {
            let _permit = permit;
            let mut conn = match checked_out {
                Some(conn) => conn,
                None => match inner.open() {
                    Ok(conn) => conn,
                    Err(error) => {
                        // The send fails only if the caller has gone away, in
                        // which case nobody is left to receive the error.
                        let _ = tx.send(Err(error));
                        return;
                    }
                },
            };
            // A normal query error (a constraint violation, say) leaves the
            // connection healthy, but a terminal error (disk full, corruption, an
            // IO failure) can poison it. On error, probe the connection and discard
            // it when the probe fails, so a broken connection is never handed back
            // to the pool to fail every later query routed to it. A fresh one is
            // opened on demand next time.
            let result = work(&mut conn);
            let healthy = result.is_ok() || conn.execute_batch("SELECT 1;").is_ok();
            if healthy {
                // Pushed back before `_permit` is released at the end of the task.
                lock(&inner.idle).push(conn);
            }
            // Ignored on purpose: a cancelled caller has dropped the receiver.
            let _ = tx.send(result);
        });

        rx.await
            .map_err(|_| OrmError::query("database worker was dropped before completing"))?
    }
}

/// Acquires the idle-list lock, treating a poisoned mutex as usable: the guarded
/// data is a plain `Vec` of connections, so a panic in an earlier holder leaves
/// it in a consistent state.
fn lock(mutex: &Mutex<Vec<Connection>>) -> std::sync::MutexGuard<'_, Vec<Connection>> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}

/// Turns a database URL into the [`Source`] a connection is opened from.
///
/// Surrounding whitespace is ignored and a leading `sqlite://` or `sqlite:`
/// scheme is optional. What remains is either `:memory:` (or nothing at all),
/// selecting an in-memory database, or a file path.
///
/// # Errors
///
/// Returns a configuration error if the URL is empty or if the path contains a
/// `..` component.
fn parse_url(url: &str) -> crate::Result<Source> {
    let url = url.trim();
    if url.is_empty() {
        return Err(OrmError::configuration("database url is empty"));
    }

    // Drop the scheme when present, trying the longer `sqlite://` form first.
    let location = match url.strip_prefix("sqlite://") {
        Some(rest) => rest,
        None => url.strip_prefix("sqlite:").unwrap_or(url),
    };

    if location.is_empty() || location == ":memory:" {
        return Ok(Source::Memory);
    }

    // Refuse parent-directory traversal so a database path derived from
    // untrusted input (a multi-tenant database name, say) cannot escape to an
    // arbitrary file such as `sqlite://../../etc/passwd`. Absolute paths chosen
    // by the application stay allowed; only `..` components are blocked.
    // NOTE(review): this inspects the literal path only. If the driver treats
    // `file:` URIs specially, percent-encoded dots would not be seen here;
    // confirm against how connections are opened.
    for component in std::path::Path::new(location).components() {
        if let std::path::Component::ParentDir = component {
            return Err(OrmError::configuration(
                "database path must not contain `..` components",
            ));
        }
    }

    Ok(Source::File(location.to_string()))
}

/// Implements binding so a [`Value`] can be passed as a SQLite parameter.
impl ToSql for Value {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        let value = match self {
            Value::Null => SqliteValue::Null,
            Value::Bool(b) => SqliteValue::Integer(i64::from(*b)),
            Value::Int(i) => SqliteValue::Integer(*i),
            Value::Real(r) => SqliteValue::Real(*r),
            Value::Text(s) => SqliteValue::Text(s.clone()),
            Value::Blob(b) => SqliteValue::Blob(b.clone()),
            Value::Timestamp(ts) => SqliteValue::Text(format_timestamp(ts)?),
            // PostgreSQL-specific values: SQLite has no native types for these, and
            // a `sqlite`-declared project is rejected at compile time before reaching
            // here. Store a defensive text form for any value built directly.
            Value::Json(j) => SqliteValue::Text(j.to_string()),
            Value::Uuid(u) => SqliteValue::Text(u.to_string()),
            Value::Array(items) => SqliteValue::Text(format!("{items:?}")),
        };
        Ok(ToSqlOutput::Owned(value))
    }
}

/// Formats a timestamp as RFC 3339 text for storage.
fn format_timestamp(ts: &time::OffsetDateTime) -> rusqlite::Result<String> {
    ts.format(&time::format_description::well_known::Rfc3339)
        .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))
}

/// Converts a column value borrowed from the driver into an owned,
/// backend-neutral [`Value`].
///
/// # Errors
///
/// Returns a conversion error if a text column does not hold valid UTF-8.
fn read_value(raw: ValueRef<'_>) -> crate::Result<Value> {
    let value = match raw {
        ValueRef::Null => Value::Null,
        ValueRef::Integer(number) => Value::Int(number),
        ValueRef::Real(number) => Value::Real(number),
        ValueRef::Blob(bytes) => Value::Blob(bytes.to_vec()),
        ValueRef::Text(bytes) => {
            // The driver hands text over as raw bytes; validate before copying.
            let decoded = std::str::from_utf8(bytes)
                .map_err(|_| OrmError::conversion("column text is not valid UTF-8"))?;
            Value::Text(decoded.to_owned())
        }
    };
    Ok(value)
}

/// Executes a row-returning query on `conn` and materializes every row.
///
/// The statement goes through the connection's prepared-statement cache. All
/// returned rows share one reference-counted list of column names.
///
/// # Errors
///
/// Returns a query error if the statement cannot be prepared or executed, or if
/// a row or column cannot be read, and a conversion error if a text column is
/// not valid UTF-8.
fn fetch_all(conn: &mut Connection, sql: &str, params: &[Value]) -> crate::Result<Vec<Row>> {
    let mut stmt = conn
        .prepare_cached(sql)
        .map_err(|e| OrmError::query("failed to prepare statement").with_source(e))?;

    // Copy the column names out first so the statement is free to be executed.
    let names: Vec<String> = stmt
        .column_names()
        .into_iter()
        .map(String::from)
        .collect();
    let width = names.len();
    let names: Arc<[String]> = Arc::from(names);

    let mut cursor = stmt
        .query(rusqlite::params_from_iter(params.iter()))
        .map_err(|e| OrmError::query("query execution failed").with_source(e))?;

    let mut collected = Vec::new();
    loop {
        let fetched = cursor
            .next()
            .map_err(|e| OrmError::query("reading a row failed").with_source(e))?;
        let row = match fetched {
            Some(row) => row,
            None => break,
        };

        let mut cells = Vec::with_capacity(width);
        for position in 0..width {
            let raw = row
                .get_ref(position)
                .map_err(|e| OrmError::query("reading a column failed").with_source(e))?;
            cells.push(read_value(raw)?);
        }
        collected.push(Row::with_columns(Arc::clone(&names), cells));
    }
    Ok(collected)
}

/// Executes a batch of parameterless statements on `conn`.
///
/// # Errors
///
/// Returns a query error, carrying the driver error as its source, if the batch
/// fails.
fn execute_batch(conn: &mut Connection, sql: &str) -> crate::Result<()> {
    match conn.execute_batch(sql) {
        Ok(()) => Ok(()),
        Err(cause) => Err(OrmError::query("statement batch failed").with_source(cause)),
    }
}

/// Executes a statement that returns no rows on `conn`.
///
/// The statement goes through the connection's prepared-statement cache. The
/// result carries the number of affected rows together with the connection's
/// last inserted row id, read after the statement has finished.
///
/// # Errors
///
/// Returns a query error if the statement cannot be prepared or executed.
fn execute(conn: &mut Connection, sql: &str, params: &[Value]) -> crate::Result<ExecuteResult> {
    let affected = {
        let mut stmt = conn
            .prepare_cached(sql)
            .map_err(|e| OrmError::query("failed to prepare statement").with_source(e))?;
        stmt.execute(rusqlite::params_from_iter(params.iter()))
            .map_err(|e| OrmError::query("statement execution failed").with_source(e))?
    };

    let last_insert_rowid = conn.last_insert_rowid();
    Ok(ExecuteResult {
        rows_affected: affected as u64,
        last_insert_rowid,
    })
}

/// A single connection pinned out of the pool for exclusive, sequential use.
///
/// Returns the connection to the pool when dropped.
pub(crate) struct PinnedSqlite {
    /// The pool this handle was checked out of; the connection goes back to its
    /// idle list on drop.
    inner: Arc<Inner>,
    /// The pinned connection. The slot is `None` while a blocking operation has
    /// the connection taken out, and is shared with that operation so it can put
    /// the connection back itself.
    conn: Arc<Mutex<Option<Connection>>>,
    /// Serializes operations on this connection: a transaction runs its
    /// statements sequentially, so two concurrent calls (for example via
    /// `tokio::join!`) queue instead of racing for the single connection.
    gate: tokio::sync::Mutex<()>,
    /// The pool permit reserved for this handle; held only so that it is
    /// released when the handle is dropped.
    _permit: OwnedSemaphorePermit,
}

impl PinnedSqlite {
    /// Removes the connection from its slot so one blocking operation can own it.
    ///
    /// The slot is left empty; whoever takes the connection is responsible for
    /// putting it back.
    ///
    /// # Errors
    ///
    /// Returns a query error if the slot is already empty, meaning another
    /// operation currently has the connection.
    fn take_conn(&self) -> crate::Result<Connection> {
        let mut slot = self
            .conn
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match slot.take() {
            Some(conn) => Ok(conn),
            None => Err(OrmError::query("pinned connection is already in use")),
        }
    }

    /// Runs one blocking operation on the pinned connection.
    ///
    /// The connection is restored to the shared slot inside the blocking task, so
    /// if the caller's future is cancelled mid-statement the connection is still
    /// returned to the handle (and, on drop, to the pool) rather than being lost.
    ///
    /// NOTE(review): the gate guard is a local of this future, so cancelling the
    /// future releases the gate while the blocking task may still hold the
    /// connection. A call that starts in that window would find the slot empty
    /// and fail with "pinned connection is already in use". Likewise, if the
    /// handle is dropped in that window the slot is empty at drop time, so the
    /// connection would not reach the idle list. Confirm whether callers can
    /// trigger either case.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `work`, a query error if the connection is
    /// not in its slot, or a query error if the blocking task ends without
    /// reporting a result.
    async fn run<F, T>(&self, work: F) -> crate::Result<T>
    where
        F: FnOnce(&mut Connection) -> crate::Result<T> + Send + 'static,
        T: Send + 'static,
    {
        // Hold the gate for the whole operation so concurrent statements on the
        // same transaction serialize rather than failing with "already in use".
        let _gate = self.gate.lock().await;
        self.inner.statements.fetch_add(1, Ordering::Relaxed);
        let mut conn = self.take_conn()?;
        // The task gets its own handle to the slot so it can refill it without
        // borrowing from `self`.
        let slot = Arc::clone(&self.conn);
        let (tx, rx) = oneshot::channel();
        tokio::task::spawn_blocking(move || {
            let result = work(&mut conn);
            // Refill the slot before reporting, so the connection is back in
            // place by the time the caller sees the result.
            *slot.lock().unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(conn);
            // Ignored on purpose: a cancelled caller has dropped the receiver.
            let _ = tx.send(result);
        });
        rx.await
            .map_err(|_| OrmError::query("database worker was dropped before completing"))?
    }

    /// Runs a row-returning query on the pinned connection and collects every
    /// row. `params` are bound to the statement's placeholders in order.
    pub(crate) async fn fetch_all(
        &self,
        sql: String,
        params: Vec<Value>,
    ) -> crate::Result<Vec<Row>> {
        let pending = self.run(move |conn| fetch_all(conn, &sql, &params));
        pending.await
    }

    /// Runs a statement that returns no rows on the pinned connection. `params`
    /// are bound to the statement's placeholders in order.
    pub(crate) async fn execute(
        &self,
        sql: String,
        params: Vec<Value>,
    ) -> crate::Result<ExecuteResult> {
        let pending = self.run(move |conn| execute(conn, &sql, &params));
        pending.await
    }

    /// Runs a batch of parameterless statements on the pinned connection.
    pub(crate) async fn execute_batch(&self, sql: String) -> crate::Result<()> {
        let pending = self.run(move |conn| execute_batch(conn, &sql));
        pending.await
    }

    /// Issues `ROLLBACK` on the calling thread, without `spawn_blocking`.
    ///
    /// Intended for `Drop` impls, where no async context is available. When the
    /// connection is not in its slot (a blocking operation currently has it
    /// taken out), nothing is done. A failed rollback is reported on standard
    /// error and the connection is put back in its slot either way.
    pub(crate) fn rollback_now(&self) {
        let conn = match self.take_conn() {
            Ok(conn) => conn,
            // The connection is busy elsewhere; there is nothing to roll back
            // from here.
            Err(_) => return,
        };

        if let Err(error) = conn.execute_batch("ROLLBACK") {
            eprintln!("tork-orm: failed to rollback transaction on drop: {error}");
        }

        let mut slot = self
            .conn
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *slot = Some(conn);
    }
}

/// Hands the pinned connection back to the pool's idle list.
///
/// A connection that still has a transaction open is rolled back first, so a
/// handle dropped between `BEGIN` and `COMMIT` cannot leak that transaction
/// into whichever unrelated query reuses the connection next. If the rollback
/// fails, the connection is closed instead of being pooled.
impl Drop for PinnedSqlite {
    fn drop(&mut self) {
        let taken = self
            .conn
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();

        // An empty slot means a blocking operation still has the connection;
        // there is nothing to return from here.
        if let Some(conn) = taken {
            // Autocommit is off exactly while a transaction is open.
            let transaction_open = !conn.is_autocommit();
            if transaction_open && conn.execute_batch("ROLLBACK").is_err() {
                // Dropping `conn` closes it rather than pooling a connection in
                // an unknown state.
                return;
            }
            lock(&self.inner.idle).push(conn);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    impl SqlitePool {
        /// Test-only probe: how many connections are sitting idle in the pool.
        fn idle_len(&self) -> usize {
            let idle = lock(&self.inner.idle);
            idle.len()
        }
    }

    /// A row-returning query slow enough to be cancelled by the one-millisecond
    /// timeout used in the cancellation test: a recursive CTE counting to four
    /// million.
    const SLOW_QUERY: &str = "WITH RECURSIVE c(n) AS (SELECT 1 UNION ALL \
         SELECT n + 1 FROM c WHERE n < 4000000) SELECT count(*) FROM c";

    /// A checkout that cannot get a slot fails with a timeout error instead of
    /// waiting indefinitely, and the pool recovers once the slot is released.
    #[tokio::test]
    async fn checkout_times_out_instead_of_hanging_forever() {
        // One connection, a short checkout timeout. Pinning the only connection
        // leaves a pooled query with no slot, so it must fail fast rather than
        // wait forever.
        let pool = SqlitePool::new(":memory:", 1)
            .unwrap()
            .with_acquire_timeout(Duration::from_millis(50));

        // Holds the pool's single permit until it is dropped below.
        let pinned = pool.acquire_pinned().await.unwrap();

        let start = std::time::Instant::now();
        let result = pool.fetch_all("SELECT 1".into(), vec![]).await;
        let waited = start.elapsed();

        let error = result.expect_err("checkout should time out");
        assert!(
            error.to_string().contains("timed out"),
            "expected a timeout error, got: {error}"
        );
        // Generous upper bound: the configured timeout is 50 ms.
        assert!(waited < Duration::from_secs(5), "must fail fast, not hang");

        // Releasing the pinned connection lets the next query proceed.
        drop(pinned);
        let rows = pool.fetch_all("SELECT 1".into(), vec![]).await.unwrap();
        assert_eq!(rows.len(), 1);
    }

    /// Cancelling a query's future must not cost the pool its connection.
    #[tokio::test]
    async fn cancelled_query_returns_its_connection_to_the_pool() {
        // A query whose future is cancelled (here by a timeout firing before it
        // finishes) must not lose its connection: the blocking task returns it to
        // the pool so later checkouts reuse it instead of the pool thrashing.
        let pool = SqlitePool::new(":memory:", 1).unwrap();

        let cancelled = tokio::time::timeout(
            Duration::from_millis(1),
            pool.fetch_all(SLOW_QUERY.into(), vec![]),
        )
        .await;
        assert!(
            cancelled.is_err(),
            "the slow query should be cancelled by the timeout"
        );

        // The blocking task is still finishing the query; once it does, it returns
        // the connection to the idle pool. Wait for that to happen, polling every
        // 25 ms for at most 200 rounds (about five seconds).
        let mut returned = false;
        for _ in 0..200 {
            if pool.idle_len() == 1 {
                returned = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
        assert!(
            returned,
            "the cancelled query's connection was never returned to the pool"
        );

        // And the pool still serves queries on that single recovered connection.
        let rows = pool.fetch_all("SELECT 1".into(), vec![]).await.unwrap();
        assert_eq!(rows.len(), 1);
    }

    /// URLs whose path contains a `..` component are refused; other paths pass.
    #[test]
    fn rejects_parent_directory_traversal_in_the_path() {
        // Traversal is rejected with or without a scheme.
        for url in ["sqlite://../../etc/passwd", "../secret.db"] {
            assert!(SqlitePool::new(url, 1).is_err());
        }
        // Ordinary relative, absolute, and in-memory paths are accepted.
        for url in ["app.db", "/tmp/tork-test.db", ":memory:"] {
            assert!(SqlitePool::new(url, 1).is_ok());
        }
    }

    /// An ordinary query error leaves the connection in the pool for reuse.
    #[tokio::test]
    async fn a_query_error_keeps_a_healthy_connection() {
        // A statement against a missing table fails as a plain query error; the
        // connection itself passes the health probe, so it goes back to the pool
        // rather than being discarded and reopened.
        let pool = SqlitePool::new(":memory:", 1).unwrap();

        let failing_sql = String::from("SELECT * FROM does_not_exist");
        let outcome = pool.fetch_all(failing_sql, vec![]).await;
        assert!(outcome.is_err(), "the query should fail");
        assert_eq!(pool.idle_len(), 1, "a healthy connection stays in the pool");

        // A follow-up query is served by that same pooled connection.
        let rows = pool.fetch_all("SELECT 1".into(), vec![]).await.unwrap();
        assert_eq!(rows.len(), 1);
    }

    /// Error text must not echo the SQL that failed.
    #[tokio::test]
    async fn query_errors_do_not_embed_the_raw_sql() {
        // The verbatim SQL (table/column names, query shape) must stay out of
        // the error text, where it could surface schema details.
        let pool = SqlitePool::new(":memory:", 1).unwrap();

        let outcome = pool
            .fetch_all("SELECT secret_column FROM secret_table".into(), vec![])
            .await;
        let error = outcome.expect_err("a missing table should error");

        for identifier in ["secret_table", "secret_column"] {
            assert!(
                !error.message().contains(identifier),
                "the error message leaked SQL: {}",
                error.message()
            );
        }

        let displayed = error.to_string();
        assert!(
            !displayed.contains("secret_table"),
            "the error display leaked SQL: {error}"
        );
    }
}