use tokio::fs;
use turso::{Builder, EncryptionOpts, Error, Value};
#[tokio::test]
async fn test_rows_next() {
let builder = Builder::new_local(":memory:");
let db = builder.build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE test (x INTEGER)", ())
.await
.unwrap();
conn.execute("INSERT INTO test (x) VALUES (1)", ())
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 1);
conn.execute("INSERT INTO test (x) VALUES (2)", ())
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 2);
conn.execute(
"INSERT INTO test (x) VALUES (:x)",
vec![(":x".to_string(), Value::Integer(3))],
)
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 3);
conn.execute(
"INSERT INTO test (x) VALUES (@x)",
vec![("@x".to_string(), Value::Integer(4))],
)
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 4);
conn.execute(
"INSERT INTO test (x) VALUES ($x)",
vec![("$x".to_string(), Value::Integer(5))],
)
.await
.unwrap();
assert_eq!(conn.last_insert_rowid(), 5);
let mut res = conn.query("SELECT * FROM test", ()).await.unwrap();
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
1.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
2.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
3.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
4.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
5.into()
);
assert!(res.next().await.unwrap().is_none());
}
#[tokio::test]
async fn test_cacheflush() {
let builder = Builder::new_local("test.db");
let db = builder.build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE IF NOT EXISTS asdf (x INTEGER)", ())
.await
.unwrap();
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (1)", ())
.await
.unwrap();
conn.cacheflush().unwrap();
conn.execute("ROLLBACK", ()).await.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (2)", ())
.await
.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (3)", ())
.await
.unwrap();
let mut res = conn.query("SELECT * FROM asdf", ()).await.unwrap();
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
2.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
3.into()
);
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (1)", ())
.await
.unwrap();
conn.cacheflush().unwrap();
conn.execute("COMMIT", ()).await.unwrap();
let mut res = conn
.query("SELECT * FROM asdf WHERE x = 1", ())
.await
.unwrap();
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
1.into()
);
fs::remove_file("test.db").await.unwrap();
fs::remove_file("test.db-wal").await.unwrap();
}
#[tokio::test]
async fn test_rows_returned() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)", ())
.await
.unwrap();
let changed = conn
.execute("INSERT INTO t VALUES (1,'hello')", ())
.await
.unwrap();
let changed1 = conn
.execute("INSERT INTO t VALUES (2,'hi')", ())
.await
.unwrap();
let changed2 = conn
.execute("UPDATE t SET val='hi' WHERE id=1", ())
.await
.unwrap();
let changed3 = conn
.execute("DELETE FROM t WHERE val='hi'", ())
.await
.unwrap();
assert_eq!(changed, 1);
assert_eq!(changed1, 1);
assert_eq!(changed2, 1);
assert_eq!(changed3, 2);
conn.execute(
"CREATE TABLE authors ( id INTEGER PRIMARY KEY, name TEXT NOT NULL);
",
(),
)
.await
.unwrap();
conn.execute(
"CREATE TABLE books ( id INTEGER PRIMARY KEY, author_id INTEGER NOT NULL REFERENCES authors(id), title TEXT NOT NULL); "
,()
).await.unwrap();
conn.execute(
"CREATE TABLE prize_winners ( book_id INTEGER PRIMARY KEY, author_name TEXT NOT NULL);",
(),
)
.await
.unwrap();
conn.execute(
"INSERT INTO authors (id, name) VALUES (1, 'Alice'), (2, 'Bob');",
(),
)
.await
.unwrap();
conn.execute(
"INSERT INTO books (id, author_id, title) VALUES (1, 1, 'Rust in Action'), (2, 1, 'Async Adventures'), (3, 1, 'Fearless Concurrency'), (4, 1, 'Unsafe Tales'), (5, 1, 'Zero-Cost Futures'), (6, 2, 'Learning SQL');",
()
).await.unwrap();
let rows_changed = conn
.execute(
"
INSERT INTO prize_winners (book_id, author_name)
SELECT b.id, a.name
FROM books b
JOIN authors a ON a.id = b.author_id
WHERE a.id = 1; -- Alice's five books
",
(),
)
.await
.unwrap();
assert_eq!(rows_changed, 5);
}
#[tokio::test]
pub async fn test_execute_batch() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute_batch("CREATE TABLE authors ( id INTEGER PRIMARY KEY, name TEXT NOT NULL);CREATE TABLE books ( id INTEGER PRIMARY KEY, author_id INTEGER NOT NULL REFERENCES authors(id), title TEXT NOT NULL); INSERT INTO authors (id, name) VALUES (1, 'Alice'), (2, 'Bob');")
.await
.unwrap();
let mut rows = conn
.query("SELECT COUNT(*) FROM authors;", ())
.await
.unwrap();
if let Some(row) = rows.next().await.unwrap() {
assert_eq!(row.get_value(0).unwrap(), Value::Integer(2));
}
}
#[tokio::test]
async fn test_query_row_returns_first_row() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)", ())
.await
.unwrap();
conn.execute("INSERT INTO users VALUES (1, 'Frodo')", ())
.await
.unwrap();
let row = conn
.prepare("SELECT id FROM users WHERE name = ?")
.await
.unwrap()
.query_row(&["Frodo"])
.await
.unwrap();
let id: i64 = row.get(0).unwrap();
assert_eq!(id, 1);
}
#[tokio::test]
async fn test_query_row_returns_no_rows_error() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)", ())
.await
.unwrap();
let result = conn
.prepare("SELECT id FROM users WHERE name = ?")
.await
.unwrap()
.query_row(&["Ghost"])
.await;
assert!(matches!(result, Err(Error::QueryReturnedNoRows)));
}
#[tokio::test]
async fn test_row_get_column_typed() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE v (n INTEGER, label TEXT)", ())
.await
.unwrap();
conn.execute("INSERT INTO v VALUES (42, 'answer')", ())
.await
.unwrap();
let mut rows = conn.query("SELECT * FROM v", ()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
let n: i64 = row.get(0).unwrap();
let label: String = row.get(1).unwrap();
assert_eq!(n, 42);
assert_eq!(label, "answer");
}
#[tokio::test]
async fn test_row_get_conversion_error() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (x TEXT)", ()).await.unwrap();
conn.execute("INSERT INTO t VALUES (NULL)", ())
.await
.unwrap();
let mut rows = conn.query("SELECT x FROM t", ()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
let result: Result<u32, _> = row.get(0);
assert!(matches!(result, Err(Error::ConversionFailure(_))));
}
#[tokio::test]
async fn test_index() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE users (name TEXT PRIMARY KEY, email TEXT)", ())
.await
.unwrap();
conn.execute("CREATE INDEX email_idx ON users(email)", ())
.await
.unwrap();
conn.execute(
"INSERT INTO users VALUES ('alice', 'a@b.c'), ('bob', 'b@d.e')",
(),
)
.await
.unwrap();
let mut rows = conn
.query("SELECT * FROM users WHERE email = 'a@b.c'", ())
.await
.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert!(row.get::<String>(0).unwrap() == "alice");
assert!(row.get::<String>(1).unwrap() == "a@b.c");
assert!(rows.next().await.unwrap().is_none());
let mut rows = conn
.query("SELECT * FROM users WHERE email = 'b@d.e'", ())
.await
.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert!(row.get::<String>(0).unwrap() == "bob");
assert!(row.get::<String>(1).unwrap() == "b@d.e");
assert!(rows.next().await.unwrap().is_none());
}
#[tokio::test]
async fn test_concurrent_unique_constraint_regression() {
use std::sync::Arc;
use tokio::sync::Barrier;
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE,
created_at DATETIME
)",
(),
)
.await
.unwrap();
conn.execute(
"INSERT INTO users (email, created_at) VALUES (:email, :created_at)",
vec![
(":email".to_string(), Value::Text("seed@example.com".into())),
(":created_at".to_string(), Value::Text("whatever".into())),
],
)
.await
.unwrap();
let barrier = Arc::new(Barrier::new(8));
let mut handles = Vec::new();
for _ in 0..8 {
let conn = db.connect().unwrap();
let barrier = barrier.clone();
handles.push(tokio::spawn(async move {
barrier.wait().await;
let mut prepared_stmt = conn
.prepare("INSERT INTO users (email, created_at) VALUES (:email, :created_at)")
.await
.unwrap();
for i in 0..1000 {
let email = match i % 3 {
0 => "seed@example.com",
1 => "dup@example.com",
2 => "dapper@example.com",
_ => panic!("Invalid email index: {i}"),
};
let result = prepared_stmt
.execute(vec![
(":email".to_string(), Value::Text(email.into())),
(":created_at".to_string(), Value::Text("whatever".into())),
])
.await;
match result {
Ok(_) => (),
Err(Error::Constraint(e)) if e.contains("UNIQUE constraint failed") => {}
Err(Error::Busy(e)) if e.contains("database is locked") => {}
Err(e) => {
panic!("Error executing statement: {e:?}");
}
}
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_statement_query_resets_before_execution() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)", ())
.await
.unwrap();
for i in 0..5 {
conn.execute(&format!("INSERT INTO t VALUES ({i}, 'value_{i}')"), ())
.await
.unwrap();
}
let mut stmt = conn
.prepare("SELECT id, value FROM t ORDER BY id")
.await
.unwrap();
let mut rows = stmt.query(()).await.unwrap();
let mut count = 0;
while let Some(row) = rows.next().await.unwrap() {
let id: i64 = row.get(0).unwrap();
assert_eq!(id, count);
count += 1;
}
assert_eq!(count, 5);
let mut rows = stmt.query(()).await.unwrap();
let mut count = 0;
while let Some(row) = rows.next().await.unwrap() {
let id: i64 = row.get(0).unwrap();
assert_eq!(id, count);
count += 1;
}
assert_eq!(count, 5, "Second query() should return all rows again");
}
#[tokio::test]
async fn test_encryption() {
let temp_dir = tempfile::tempdir().unwrap();
let db_file = temp_dir.path().join("test-encrypted.db");
let db_file = db_file.to_str().unwrap();
let hexkey = "b1bbfda4f589dc9daaf004fe21111e00dc00c98237102f5c7002a5669fc76327";
let wrong_key = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff";
let encryption_opts = EncryptionOpts {
hexkey: hexkey.to_string(),
cipher: "aegis256".to_string(),
};
{
let builder = Builder::new_local(db_file)
.experimental_encryption(true)
.with_encryption(encryption_opts.clone());
let db = builder.build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT);",
(),
)
.await
.unwrap();
conn.execute("INSERT INTO test (value) VALUES ('secret_data')", ())
.await
.unwrap();
let mut row_count = 0;
let mut rows = conn.query("SELECT * FROM test", ()).await.unwrap();
while let Some(row) = rows.next().await.unwrap() {
assert_eq!(row.get::<i64>(0).unwrap(), 1);
assert_eq!(row.get::<String>(1).unwrap(), "secret_data");
row_count += 1;
}
assert_eq!(row_count, 1);
let mut rows = conn
.query("PRAGMA wal_checkpoint(TRUNCATE)", ())
.await
.unwrap();
while rows.next().await.unwrap().is_some() {}
}
let content = std::fs::read(db_file).unwrap();
assert!(content.len() > 1024);
assert!(
!content.windows(11).any(|w| w == b"secret_data"),
"Plaintext should not appear in encrypted database file"
);
{
let builder = Builder::new_local(db_file)
.experimental_encryption(true)
.with_encryption(encryption_opts.clone());
let db = builder.build().await.unwrap();
let conn = db.connect().unwrap();
let mut row_count = 0;
let mut rows = conn.query("SELECT * FROM test", ()).await.unwrap();
while let Some(row) = rows.next().await.unwrap() {
assert_eq!(row.get::<i64>(0).unwrap(), 1);
assert_eq!(row.get::<String>(1).unwrap(), "secret_data");
row_count += 1;
}
assert_eq!(row_count, 1);
}
{
let wrong_opts = EncryptionOpts {
hexkey: wrong_key.to_string(),
cipher: "aegis256".to_string(),
};
let builder = Builder::new_local(db_file)
.experimental_encryption(true)
.with_encryption(wrong_opts);
let result = builder.build().await;
assert!(result.is_err(), "Opening with wrong key should fail");
}
{
let builder = Builder::new_local(db_file).experimental_encryption(true);
let result = builder.build().await;
assert!(
result.is_err(),
"Opening encrypted database without key should fail"
);
}
}
#[tokio::test]
async fn test_query_without_reset_does_not_panic() {
let tempfile = tempfile::NamedTempFile::new().unwrap();
let db = Builder::new_local(tempfile.path().to_str().unwrap())
.build()
.await
.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)", ())
.await
.unwrap();
for i in 0..10 {
conn.execute(&format!("INSERT INTO t VALUES ({i}, 'val')"), ())
.await
.unwrap();
}
let mut stmts: Vec<Option<turso::Statement>> = Vec::new();
for round in 0..4 {
for i in 0..30 {
let id = 100 + round * 100 + i;
let sql = match i % 4 {
0 => format!("INSERT INTO t VALUES ({id}, 'new')"),
1 => "SELECT * FROM t".to_string(),
2 => format!("UPDATE t SET value = 'upd' WHERE id = {}", i % 10),
_ => format!("DELETE FROM t WHERE id = {}", 1000 + i),
};
if let Ok(s) = conn.prepare(&sql).await {
stmts.push(Some(s));
}
}
for i in (0..stmts.len()).step_by(7) {
if let Some(Some(stmt)) = stmts.get_mut(i) {
if let Ok(mut rows) = stmt.query(()).await {
let _ = rows.next().await;
}
}
}
for i in 0..3 {
let _ = conn
.execute(
&format!("INSERT INTO t VALUES ({}, 'x')", 2000 + round * 10 + i),
(),
)
.await;
}
for i in (0..stmts.len()).step_by(13) {
stmts[i] = None;
}
for i in (0..stmts.len()).step_by(5) {
if let Some(Some(stmt)) = stmts.get_mut(i) {
if let Ok(mut rows) = stmt.query(()).await {
let _ = rows.next().await;
}
}
}
}
}
#[tokio::test]
async fn test_transaction_prepared_statement() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let mut conn = db.connect().unwrap();
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)", ())
.await
.unwrap();
let tx = conn.transaction().await.unwrap();
let mut stmt = tx
.prepare("INSERT INTO users VALUES (?1, ?2)")
.await
.unwrap();
stmt.execute(["1", "Frodo"]).await.unwrap();
tx.commit().await.unwrap();
let row = conn
.prepare("SELECT id FROM users WHERE name = ?")
.await
.unwrap()
.query_row(&["Frodo"])
.await
.unwrap();
let id: i64 = row.get(0).unwrap();
assert_eq!(id, 1);
}
#[tokio::test]
async fn test_row_get_value_out_of_bounds() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (x INTEGER)", ())
.await
.unwrap();
conn.execute("INSERT INTO t VALUES (1)", ()).await.unwrap();
let mut rows = conn.query("SELECT x FROM t", ()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert!(row.get_value(0).is_ok());
let result = row.get_value(999);
assert!(matches!(result, Err(Error::Misuse(_))));
let result: Result<i64, _> = row.get(999);
assert!(matches!(result, Err(Error::Misuse(_))));
}
#[tokio::test]
async fn test_connection_clone() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let mut conn = db.connect().unwrap();
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)", ())
.await
.unwrap();
let tx = conn.transaction().await.unwrap();
let mut stmt = tx
.prepare("INSERT INTO users VALUES (?1, ?2)")
.await
.unwrap();
stmt.execute(["1", "Frodo"]).await.unwrap();
tx.commit().await.unwrap();
let conn2 = conn.clone();
let row = conn2
.prepare("SELECT id FROM users WHERE name = ?")
.await
.unwrap()
.query_row(&["Frodo"])
.await
.unwrap();
let id: i64 = row.get(0).unwrap();
assert_eq!(id, 1);
}
#[tokio::test]
async fn test_insert_returning_partial_consume() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (x INTEGER)", ())
.await
.unwrap();
let mut stmt = conn
.prepare("INSERT INTO t (x) VALUES (1), (2), (3) RETURNING x")
.await
.unwrap();
let mut rows = stmt.query(()).await.unwrap();
let first_row = rows.next().await.unwrap().unwrap();
assert_eq!(first_row.get::<i64>(0).unwrap(), 1);
drop(rows);
drop(stmt);
let mut count_rows = conn.query("SELECT COUNT(*) FROM t", ()).await.unwrap();
let count: i64 = count_rows.next().await.unwrap().unwrap().get(0).unwrap();
assert_eq!(
count, 3,
"All 3 rows should be inserted even if RETURNING was partially consumed"
);
}
#[tokio::test]
async fn test_transaction_commit_without_mvcc() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)", ())
.await
.unwrap();
conn.execute("BEGIN IMMEDIATE TRANSACTION", ())
.await
.unwrap();
conn.execute("INSERT INTO test (id, value) VALUES (1, 'hello')", ())
.await
.unwrap();
conn.execute("COMMIT", ())
.await
.expect("COMMIT should succeed for non-MVCC transactions");
let mut rows = conn
.query("SELECT value FROM test WHERE id = 1", ())
.await
.unwrap();
let row = rows.next().await.unwrap().unwrap();
let value: String = row.get(0).unwrap();
assert_eq!(value, "hello", "Data should be committed");
}
#[tokio::test]
async fn test_transaction_with_insert_returning_then_commit() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (x INTEGER)", ())
.await
.unwrap();
conn.execute("BEGIN IMMEDIATE TRANSACTION", ())
.await
.unwrap();
let mut stmt = conn
.prepare("INSERT INTO t (x) VALUES (1), (2), (3) RETURNING x")
.await
.unwrap();
let mut rows = stmt.query(()).await.unwrap();
let first = rows.next().await.unwrap().unwrap();
assert_eq!(first.get::<i64>(0).unwrap(), 1);
drop(rows);
drop(stmt);
conn.execute("COMMIT", ())
.await
.expect("COMMIT should succeed after INSERT...RETURNING");
let mut count_rows = conn.query("SELECT COUNT(*) FROM t", ()).await.unwrap();
let count: i64 = count_rows.next().await.unwrap().unwrap().get(0).unwrap();
assert_eq!(count, 3, "All rows should be committed");
}
#[tokio::test]
async fn test_prepare_cached_basic() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)", ())
.await
.unwrap();
let mut stmt1 = conn
.prepare_cached("SELECT * FROM users WHERE id = ?")
.await
.unwrap();
conn.execute("INSERT INTO users VALUES (1, 'Alice')", ())
.await
.unwrap();
let mut rows = stmt1.query(vec![Value::Integer(1)]).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i64>(0).unwrap(), 1);
assert_eq!(row.get::<String>(1).unwrap(), "Alice");
drop(rows);
drop(stmt1);
let mut stmt2 = conn
.prepare_cached("SELECT * FROM users WHERE id = ?")
.await
.unwrap();
let mut rows = stmt2.query(vec![Value::Integer(1)]).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i64>(0).unwrap(), 1);
assert_eq!(row.get::<String>(1).unwrap(), "Alice");
}
#[tokio::test]
async fn test_prepare_cached_reprepare_on_query_only_change() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (id INTEGER)", ())
.await
.unwrap();
let mut stmt = conn
.prepare_cached("INSERT INTO t VALUES (?)")
.await
.unwrap();
conn.execute("PRAGMA query_only=1", ()).await.unwrap();
let err = stmt.execute(vec![Value::Integer(1)]).await.unwrap_err();
assert!(err.to_string().to_ascii_lowercase().contains("query_only"));
let mut rows = conn.query("SELECT COUNT(*) FROM t", ()).await.unwrap();
let count: i64 = rows.next().await.unwrap().unwrap().get(0).unwrap();
assert_eq!(count, 0);
}
#[tokio::test]
async fn test_prepare_cached_batch_insert_delete_pattern() {
#[derive(Clone)]
struct Host {
name: String,
app: String,
address: String,
namespace: String,
cloud_cluster_name: String,
allowed_ips: Vec<String>,
updated_at: std::time::SystemTime,
deleted: bool,
}
fn serialize_allowed_ips(allowed_ips: &[String]) -> String {
allowed_ips.join(",")
}
fn system_time_to_unix_seconds(ts: std::time::SystemTime) -> i64 {
let duration = ts
.duration_since(std::time::UNIX_EPOCH)
.expect("system time should be after unix epoch");
duration.as_secs() as i64
}
async fn insert_hosts(conn: &turso::Connection, hosts: &[Host]) -> Result<(), Error> {
if hosts.is_empty() {
return Ok(());
}
conn.execute("BEGIN", ()).await?;
let mut insert_stmt = conn
.prepare_cached(
"INSERT INTO hosts (name, app, address, namespace, cloud_cluster_name, allowed_ips, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
ON CONFLICT(name) DO UPDATE SET
app = excluded.app,
address = excluded.address,
namespace = excluded.namespace,
cloud_cluster_name = excluded.cloud_cluster_name,
allowed_ips = excluded.allowed_ips,
updated_at = excluded.updated_at",
)
.await?;
let mut delete_stmt = conn
.prepare_cached("DELETE FROM hosts WHERE name = ?1")
.await?;
let result = async {
for host in hosts {
if host.deleted {
delete_stmt.execute([host.name.as_str()]).await?;
continue;
}
let allowed_ips = serialize_allowed_ips(&host.allowed_ips);
let updated_at = system_time_to_unix_seconds(host.updated_at);
insert_stmt
.execute((
host.name.as_str(),
host.app.as_str(),
host.address.as_str(),
host.namespace.as_str(),
host.cloud_cluster_name.as_str(),
allowed_ips,
updated_at,
))
.await?;
}
Ok(())
}
.await;
match result {
Ok(()) => {
conn.execute("COMMIT", ()).await?;
Ok(())
}
Err(err) => {
let _ = conn.execute("ROLLBACK", ()).await;
Err(err)
}
}
}
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE hosts (
name TEXT PRIMARY KEY,
app TEXT,
address TEXT,
namespace TEXT,
cloud_cluster_name TEXT,
allowed_ips TEXT,
updated_at INTEGER
)",
(),
)
.await
.unwrap();
let base_time = std::time::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
let hosts = vec![
Host {
name: "a".to_string(),
app: "app_a".to_string(),
address: "10.0.0.1".to_string(),
namespace: "ns".to_string(),
cloud_cluster_name: "cluster".to_string(),
allowed_ips: vec!["10.0.0.0/24".to_string()],
updated_at: base_time,
deleted: false,
},
Host {
name: "b".to_string(),
app: "app_b".to_string(),
address: "10.0.0.2".to_string(),
namespace: "ns".to_string(),
cloud_cluster_name: "cluster".to_string(),
allowed_ips: vec!["10.0.1.0/24".to_string()],
updated_at: base_time,
deleted: false,
},
Host {
name: "a".to_string(),
app: "app_a".to_string(),
address: "10.0.0.1".to_string(),
namespace: "ns".to_string(),
cloud_cluster_name: "cluster".to_string(),
allowed_ips: vec!["10.0.0.0/24".to_string()],
updated_at: base_time,
deleted: true,
},
];
insert_hosts(&conn, &hosts).await.unwrap();
let mut rows = conn
.query("SELECT name FROM hosts ORDER BY name", ())
.await
.unwrap();
let first = rows
.next()
.await
.unwrap()
.unwrap()
.get::<String>(0)
.unwrap();
assert_eq!(first, "b");
assert!(rows.next().await.unwrap().is_none());
}
#[tokio::test]
async fn test_prepare_cached_multiple_statements() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (id INTEGER, value TEXT)", ())
.await
.unwrap();
let queries = vec![
"SELECT * FROM t WHERE id = ?",
"SELECT * FROM t WHERE value = ?",
"INSERT INTO t VALUES (?, ?)",
];
for query in &queries {
let _ = conn.prepare_cached(*query).await.unwrap();
}
let mut stmt1 = conn.prepare_cached(queries[0]).await.unwrap();
let mut stmt2 = conn.prepare_cached(queries[1]).await.unwrap();
let mut stmt3 = conn.prepare_cached(queries[2]).await.unwrap();
stmt3
.execute(vec![Value::Integer(1), Value::Text("test".into())])
.await
.unwrap();
let mut rows = stmt1.query(vec![Value::Integer(1)]).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i64>(0).unwrap(), 1);
drop(rows);
let mut rows = stmt2.query(vec![Value::Text("test".into())]).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<String>(1).unwrap(), "test");
}
#[tokio::test]
async fn test_prepare_cached_independent_state() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (id INTEGER)", ())
.await
.unwrap();
for i in 1..=5 {
conn.execute(&format!("INSERT INTO t VALUES ({i})"), ())
.await
.unwrap();
}
let query = "SELECT * FROM t ORDER BY id";
let mut stmt1 = conn.prepare_cached(query).await.unwrap();
let mut stmt2 = conn.prepare_cached(query).await.unwrap();
let mut rows1 = stmt1.query(()).await.unwrap();
let row1 = rows1.next().await.unwrap().unwrap();
assert_eq!(row1.get::<i64>(0).unwrap(), 1);
let mut rows2 = stmt2.query(()).await.unwrap();
let row2 = rows2.next().await.unwrap().unwrap();
assert_eq!(row2.get::<i64>(0).unwrap(), 1);
let row1 = rows1.next().await.unwrap().unwrap();
assert_eq!(row1.get::<i64>(0).unwrap(), 2);
let row2 = rows2.next().await.unwrap().unwrap();
assert_eq!(row2.get::<i64>(0).unwrap(), 2);
}
#[tokio::test]
async fn test_prepare_cached_with_parameters() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)",
(),
)
.await
.unwrap();
conn.execute("INSERT INTO users VALUES (1, 'Alice', 30)", ())
.await
.unwrap();
conn.execute("INSERT INTO users VALUES (2, 'Bob', 25)", ())
.await
.unwrap();
conn.execute("INSERT INTO users VALUES (3, 'Charlie', 35)", ())
.await
.unwrap();
let query = "SELECT name FROM users WHERE age > ?";
let mut stmt = conn.prepare_cached(query).await.unwrap();
let mut rows = stmt.query(vec![Value::Integer(25)]).await.unwrap();
let mut names = Vec::new();
while let Some(row) = rows.next().await.unwrap() {
names.push(row.get::<String>(0).unwrap());
}
assert_eq!(names.len(), 2);
assert!(names.contains(&"Alice".to_string()));
assert!(names.contains(&"Charlie".to_string()));
drop(rows);
let mut rows = stmt.query(vec![Value::Integer(30)]).await.unwrap();
let mut names = Vec::new();
while let Some(row) = rows.next().await.unwrap() {
names.push(row.get::<String>(0).unwrap());
}
assert_eq!(names.len(), 1);
assert_eq!(names[0], "Charlie");
}
#[tokio::test]
async fn test_prepare_cached_stress() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)", ())
.await
.unwrap();
let insert_query = "INSERT INTO t (id, value) VALUES (?, ?)";
let select_query = "SELECT value FROM t WHERE id = ?";
for i in 0..100 {
let mut stmt = conn.prepare_cached(insert_query).await.unwrap();
stmt.execute(vec![Value::Integer(i), Value::Text(format!("value_{i}"))])
.await
.unwrap();
}
for i in 0..100 {
let mut stmt = conn.prepare_cached(select_query).await.unwrap();
let mut rows = stmt.query(vec![Value::Integer(i)]).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<String>(0).unwrap(), format!("value_{i}"));
}
}
#[tokio::test]
async fn test_prepare_vs_prepare_cached_equivalence() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE t (x INTEGER, y TEXT)", ())
.await
.unwrap();
conn.execute("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')", ())
.await
.unwrap();
let query = "SELECT * FROM t ORDER BY x";
let mut stmt1 = conn.prepare(query).await.unwrap();
let mut rows1 = stmt1.query(()).await.unwrap();
let mut results1 = Vec::new();
while let Some(row) = rows1.next().await.unwrap() {
results1.push((row.get::<i64>(0).unwrap(), row.get::<String>(1).unwrap()));
}
let mut stmt2 = conn.prepare_cached(query).await.unwrap();
let mut rows2 = stmt2.query(()).await.unwrap();
let mut results2 = Vec::new();
while let Some(row) = rows2.next().await.unwrap() {
results2.push((row.get::<i64>(0).unwrap(), row.get::<String>(1).unwrap()));
}
assert_eq!(results1, results2);
assert_eq!(
results1,
vec![
(1, "a".to_string()),
(2, "b".to_string()),
(3, "c".to_string()),
]
);
}
#[tokio::test]
async fn test_once_not_cleared_on_reset_with_coroutine() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
let mut stmt = conn
.prepare("SELECT * FROM (SELECT (SELECT 1))")
.await
.unwrap();
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
let value: i64 = row.get(0).unwrap();
assert_eq!(value, 1);
assert!(rows.next().await.unwrap().is_none());
drop(rows);
stmt.reset().unwrap();
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(
row.get_value(0).unwrap(),
Value::Integer(1),
"Second execution should return 1, not Null. Bug: state.once not cleared in reset()"
);
}
#[tokio::test]
async fn test_strict_tables() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT) STRICT",
(),
)
.await
.unwrap();
conn.execute("INSERT INTO users VALUES (1, 'Alice')", ())
.await
.unwrap();
let mut rows = conn.query("SELECT id, name FROM users", ()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i64>(0).unwrap(), 1);
assert_eq!(row.get::<String>(1).unwrap(), "Alice");
}
async fn collect_ids(conn: &turso::Connection, sql: &str) -> Vec<i64> {
let mut rows = conn.query(sql, ()).await.unwrap();
let mut ids = Vec::new();
while let Some(row) = rows.next().await.unwrap() {
let id: i64 = row.get(0).unwrap();
ids.push(id);
}
ids
}
#[tokio::test]
async fn test_check_on_conflict_fail() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE t(id INTEGER PRIMARY KEY, value INTEGER CHECK(value > 0))",
(),
)
.await
.unwrap();
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO t VALUES(1, 10)", ())
.await
.unwrap();
let err = conn
.execute("INSERT OR FAIL INTO t VALUES(2, -5)", ())
.await;
assert!(
err.is_err(),
"INSERT OR FAIL should error on CHECK violation"
);
conn.execute("COMMIT", ()).await.unwrap();
let ids = collect_ids(&conn, "SELECT id FROM t ORDER BY id").await;
assert_eq!(ids, vec![1]);
}
#[tokio::test]
async fn test_check_on_conflict_abort() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE t(id INTEGER PRIMARY KEY, value INTEGER CHECK(value > 0))",
(),
)
.await
.unwrap();
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO t VALUES(1, 10)", ())
.await
.unwrap();
let err = conn
.execute("INSERT OR ABORT INTO t VALUES(2, -5)", ())
.await;
assert!(
err.is_err(),
"INSERT OR ABORT should error on CHECK violation"
);
conn.execute("COMMIT", ()).await.unwrap();
let ids = collect_ids(&conn, "SELECT id FROM t ORDER BY id").await;
assert_eq!(ids, vec![1]);
}
#[tokio::test]
async fn test_check_on_conflict_rollback() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE t(id INTEGER PRIMARY KEY, value INTEGER CHECK(value > 0))",
(),
)
.await
.unwrap();
conn.execute("INSERT INTO t VALUES(1, 10)", ())
.await
.unwrap();
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO t VALUES(2, 20)", ())
.await
.unwrap();
let err = conn
.execute("INSERT OR ROLLBACK INTO t VALUES(3, -5)", ())
.await;
assert!(
err.is_err(),
"INSERT OR ROLLBACK should error on CHECK violation"
);
let ids = collect_ids(&conn, "SELECT id FROM t ORDER BY id").await;
assert_eq!(ids, vec![1]);
}
#[tokio::test]
async fn test_check_on_conflict_replace() {
let db = Builder::new_local(":memory:").build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute(
"CREATE TABLE t(id INTEGER PRIMARY KEY, value INTEGER CHECK(value > 0))",
(),
)
.await
.unwrap();
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO t VALUES(1, 10)", ())
.await
.unwrap();
let err = conn
.execute("INSERT OR REPLACE INTO t VALUES(1, -5)", ())
.await;
assert!(
err.is_err(),
"INSERT OR REPLACE should error on CHECK violation"
);
conn.execute("COMMIT", ()).await.unwrap();
let ids = collect_ids(&conn, "SELECT id FROM t ORDER BY id").await;
assert_eq!(ids, vec![1]);
}
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tempfile::tempdir;
use tokio::sync::Barrier;
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_lost_updates() {
let (db, _dir) = setup_mvcc_db(
"CREATE TABLE counter(id INTEGER PRIMARY KEY, val INTEGER);
INSERT INTO counter VALUES(1, 0);",
)
.await;
let num_workers: usize = 16;
let rounds: i64 = 100;
let total_committed = Arc::new(AtomicI64::new(0));
for _round in 0..rounds {
let barrier = Arc::new(Barrier::new(num_workers));
let mut handles = Vec::new();
for _ in 0..num_workers {
let conn = db.connect().unwrap();
let barrier = barrier.clone();
let total_committed = total_committed.clone();
handles.push(tokio::spawn(async move {
barrier.wait().await;
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
return;
}
if conn
.execute("UPDATE counter SET val = val + 1 WHERE id = 1", ())
.await
.is_err()
{
let _ = conn.execute("ROLLBACK", ()).await;
return;
}
match conn.execute("COMMIT", ()).await {
Ok(_) => {
total_committed.fetch_add(1, Ordering::Relaxed);
}
Err(_) => {
let _ = conn.execute("ROLLBACK", ()).await;
}
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}
let conn = db.connect().unwrap();
let val = query_i64(&conn, "SELECT val FROM counter WHERE id = 1").await;
let committed = total_committed.load(Ordering::Relaxed);
assert_eq!(
val, committed,
"Lost updates! counter={val} but {committed} transactions committed successfully"
);
}
async fn setup_mvcc_db(schema: &str) -> (turso::Database, tempfile::TempDir) {
setup_mvcc_db_with_options(schema, false).await
}
async fn setup_mvcc_db_with_options(
schema: &str,
triggers: bool,
) -> (turso::Database, tempfile::TempDir) {
let dir = tempdir().unwrap();
let db_path = dir.path().join("test.db");
let mut builder = Builder::new_local(db_path.to_str().unwrap());
if triggers {
builder = builder.experimental_triggers(true);
}
let db = builder.build().await.unwrap();
let conn = db.connect().unwrap();
let mut rows = conn
.query("PRAGMA journal_mode = 'mvcc'", ())
.await
.unwrap();
while let Ok(Some(_)) = rows.next().await {}
drop(rows);
if !schema.is_empty() {
conn.execute_batch(schema).await.unwrap();
}
(db, dir)
}
async fn query_i64(conn: &turso::Connection, sql: &str) -> i64 {
let mut rows = conn.query(sql, ()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
row.get::<i64>(0).unwrap()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
#[ignore = "FIXME: This test hangs on main"]
async fn test_deadlock_join_during_writes() {
let (db, _dir) = setup_mvcc_db(
"CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER);
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO customers VALUES(1, 'alice');
INSERT INTO customers VALUES(2, 'bob');
INSERT INTO customers VALUES(3, 'charlie');",
)
.await;
let done = Arc::new(AtomicBool::new(false));
let mut handles = vec![];
for w in 0..4 {
let db = db.clone();
let done = done.clone();
handles.push(tokio::spawn(async move {
let conn = db.connect().unwrap();
let mut i = 0u64;
while !done.load(Ordering::Relaxed) {
let id = (w as u64) * 100000 + i;
let cust = (i % 3) + 1;
let _ = conn.execute("BEGIN CONCURRENT", ()).await;
let _ = conn
.execute(
&format!("INSERT INTO orders VALUES({}, {}, {})", id, cust, 10),
(),
)
.await;
let _ = conn.execute("COMMIT", ()).await;
i += 1;
}
}));
}
for _ in 0..4 {
let db = db.clone();
let done = done.clone();
handles.push(tokio::spawn(async move {
let conn = db.connect().unwrap();
while !done.load(Ordering::Relaxed) {
let _ = conn.execute("BEGIN CONCURRENT", ()).await;
let _orphans = match conn
.query(
"SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
(),
)
.await
{
Ok(mut rows) => match rows.next().await {
Ok(Some(row)) => row.get::<i64>(0).unwrap_or(0),
_ => 0,
},
Err(_) => 0,
};
let _ = conn.execute("COMMIT", ()).await;
}
}));
}
tokio::time::sleep(Duration::from_secs(3)).await;
done.store(true, Ordering::Relaxed);
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_snapshot_isolation_violation() {
let (db, _dir) = setup_mvcc_db("CREATE TABLE t(id INTEGER PRIMARY KEY, val INTEGER)").await;
let done = Arc::new(AtomicBool::new(false));
let violation_found = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
for w in 0..4i64 {
let conn = db.connect().unwrap();
let done = done.clone();
handles.push(tokio::spawn(async move {
let mut i = 0i64;
while !done.load(Ordering::Relaxed) {
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
continue;
}
let mut ok = true;
for j in 0..5i64 {
let id = w * 100_000 + i * 5 + j;
if conn
.execute(&format!("INSERT INTO t VALUES({id}, {id})"), ())
.await
.is_err()
{
ok = false;
break;
}
}
if ok {
if conn.execute("COMMIT", ()).await.is_err() {
let _ = conn.execute("ROLLBACK", ()).await;
}
} else {
let _ = conn.execute("ROLLBACK", ()).await;
}
i += 1;
}
}));
}
for _ in 0..4 {
let conn = db.connect().unwrap();
let done = done.clone();
let violation_found = violation_found.clone();
handles.push(tokio::spawn(async move {
while !done.load(Ordering::Relaxed) {
if conn.execute("BEGIN CONCURRENT", ()).await.is_err() {
continue;
}
let count1 = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
tokio::task::yield_now().await; let count2 = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
let _ = conn.execute("COMMIT", ()).await;
if count1 != count2 {
violation_found.store(true, Ordering::Relaxed);
eprintln!(
"VIOLATION: COUNT changed {} -> {} within same txn (delta={})",
count1,
count2,
count2 - count1
);
}
}
}));
}
tokio::time::sleep(Duration::from_secs(3)).await;
done.store(true, Ordering::Relaxed);
for handle in handles {
let _ = handle.await;
}
assert!(
!violation_found.load(Ordering::Relaxed),
"Snapshot isolation violated: COUNT(*) changed within a single BEGIN CONCURRENT txn"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_ghost_commits() {
for iteration in 0..500 {
if iteration % 100 == 0 {
eprintln!("test_ghost_commits: Iteration {iteration}");
}
let (db, _dir) = setup_mvcc_db("CREATE TABLE t(id INTEGER PRIMARY KEY, val INTEGER)").await;
let num_workers: usize = 8;
let ops_per_worker: i64 = 100;
let barrier = Arc::new(Barrier::new(num_workers));
let mut handles = Vec::new();
for worker_id in 0..num_workers as i64 {
let conn = db.connect().unwrap();
let barrier = barrier.clone();
handles.push(tokio::spawn(async move {
barrier.wait().await;
let mut successes = 0i64;
let mut errors = 0i64;
for i in 0..ops_per_worker {
let id = worker_id * 10_000 + i;
match conn
.execute(&format!("INSERT INTO t VALUES({id}, {i})"), ())
.await
{
Ok(_) => successes += 1,
Err(turso::Error::Busy(_) | turso::Error::BusySnapshot(_)) => errors += 1, Err(e) => panic!("unexpected error: {e:?}"),
}
}
(successes, errors)
}));
}
let mut total_successes = 0i64;
let mut total_errors = 0i64;
for handle in handles {
let (s, e) = handle.await.unwrap();
total_successes += s;
total_errors += e;
}
let conn = db.connect().unwrap();
let actual_rows = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
if iteration % 100 == 0 {
eprintln!("test_ghost_commits: Iteration {iteration}, actual_rows={actual_rows}, total_successes={total_successes}, total_errors={total_errors}");
}
assert_eq!(
actual_rows,
total_successes,
"Ghost commits! {actual_rows} rows in DB but only {total_successes} reported as Ok ({total_errors} errors). \
{} inserts committed despite returning Busy.",
total_successes - actual_rows,
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_autoincrement_blocked_in_mvcc() {
let (db, _dir) = setup_mvcc_db("").await;
let conn = db.connect().unwrap();
let result = conn
.execute(
"CREATE TABLE t(a INTEGER PRIMARY KEY AUTOINCREMENT, b TEXT)",
(),
)
.await;
assert!(
result.is_err(),
"CREATE TABLE with AUTOINCREMENT should fail in MVCC mode"
);
let err = result.unwrap_err().to_string();
assert!(
err.contains("AUTOINCREMENT is not supported in MVCC mode"),
"unexpected error: {err}"
);
conn.execute("CREATE TABLE t(a INTEGER PRIMARY KEY, b TEXT)", ())
.await
.unwrap();
conn.execute("INSERT INTO t VALUES (1, 'hello')", ())
.await
.unwrap();
let count = query_i64(&conn, "SELECT COUNT(*) FROM t").await;
assert_eq!(count, 1);
}