use faucet_core::Sink;
use faucet_sink_sqlite::{SqliteColumnMapping, SqliteSink, SqliteSinkConfig};
use serde_json::{Value, json};
use sqlx::Row;
use sqlx::sqlite::SqlitePoolOptions;
use tempfile::TempDir;
/// Creates a brand-new SQLite database file and runs `create_sql` against it.
///
/// The database lives at `test.db` inside a freshly created temporary
/// directory. A short-lived single-connection pool is opened with
/// `?mode=rwc`, used to execute `create_sql`, and closed again before
/// returning.
///
/// # Returns
/// The [`TempDir`] guard together with the connection URL
/// (`sqlite://<path>?mode=rwc`). Keep the guard bound for as long as the
/// database is needed.
///
/// # Panics
/// Panics if the temporary directory cannot be created, the connection
/// fails, or `create_sql` fails to execute.
async fn fresh_db(create_sql: &str) -> (TempDir, String) {
    let tmp = TempDir::new().expect("tempdir");
    let db_file = tmp.path().join("test.db");
    let db_url = format!("sqlite://{}?mode=rwc", db_file.display());

    // One-off pool used only to apply the schema.
    let setup_pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect(&db_url)
        .await
        .expect("connect");
    sqlx::query(create_sql)
        .execute(&setup_pool)
        .await
        .expect("create table");
    setup_pool.close().await;

    (tmp, db_url)
}
/// Returns `SELECT COUNT(*)` for `table` in the database at `url`.
///
/// A dedicated single-connection pool is opened for the query and closed
/// before returning. `table` is interpolated directly into the SQL text, so
/// only pass trusted, literal table names.
///
/// # Panics
/// Panics if the connection or the count query fails.
async fn count_rows(url: &str, table: &str) -> i64 {
    let count_sql = format!("SELECT COUNT(*) AS n FROM {table}");

    let pool = SqlitePoolOptions::new()
        .max_connections(1)
        .connect(url)
        .await
        .expect("connect");
    let counted = sqlx::query(&count_sql)
        .fetch_one(&pool)
        .await
        .expect("count");
    let total: i64 = counted.get("n");
    pool.close().await;

    total
}
/// Json mode, batch size 500: a 1,500-record page must be written in full.
///
/// The page is three times the configured batch size, so the sink
/// presumably splits it into several inserts (per the test name). What is
/// actually asserted is the count returned by `write_batch` and the number
/// of rows found in `events` afterwards.
#[tokio::test]
async fn json_mode_rechunks_large_page_into_multiple_inserts() {
    let (_tmp, url) = fresh_db("CREATE TABLE events (data TEXT NOT NULL)").await;

    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(mapping)
            .with_batch_size(500),
    )
    .await
    .unwrap();

    let page = (0..1_500).map(|i| json!({"i": i})).collect::<Vec<Value>>();
    let written = sink.write_batch(&page).await.unwrap();

    assert_eq!(written, 1_500);
    assert_eq!(count_rows(&url, "events").await, 1_500);
}
/// Json mode, batch size 0: a 2,500-record page must be written in full.
///
/// NOTE(review): the test name implies that a batch size of 0 disables
/// chunking and uses one transaction; this test only verifies the returned
/// count and the final row count, not how many transactions were used.
#[tokio::test]
async fn json_mode_batch_size_zero_writes_single_transaction() {
    let (_tmp, url) = fresh_db("CREATE TABLE events (data TEXT NOT NULL)").await;

    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(mapping)
            .with_batch_size(0),
    )
    .await
    .unwrap();

    let page = (0..2_500).map(|i| json!({"i": i})).collect::<Vec<Value>>();
    let written = sink.write_batch(&page).await.unwrap();

    assert_eq!(written, 2_500);
    assert_eq!(count_rows(&url, "events").await, 2_500);
}
/// Json mode, batch size 500: a page smaller than the batch size (100
/// records) must still be written in full.
///
/// Asserts the count returned by `write_batch` and the resulting row count.
#[tokio::test]
async fn json_mode_undersized_page_writes_in_single_chunk() {
    let (_tmp, url) = fresh_db("CREATE TABLE events (data TEXT NOT NULL)").await;

    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(mapping)
            .with_batch_size(500),
    )
    .await
    .unwrap();

    let page = (0..100).map(|i| json!({"i": i})).collect::<Vec<Value>>();
    let written = sink.write_batch(&page).await.unwrap();

    assert_eq!(written, 100);
    assert_eq!(count_rows(&url, "events").await, 100);
}
/// Json mode: writing an empty slice must succeed, report zero records and
/// leave the `events` table empty.
#[tokio::test]
async fn json_mode_empty_input_is_noop() {
    let (_tmp, url) = fresh_db("CREATE TABLE events (data TEXT NOT NULL)").await;

    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(mapping)
            .with_batch_size(500),
    )
    .await
    .unwrap();

    let written = sink.write_batch(&[]).await.unwrap();

    assert_eq!(written, 0);
    assert_eq!(count_rows(&url, "events").await, 0);
}
/// Json mode, batch size 500: a page that is an exact multiple of the batch
/// size (1,000 records) must be written in full — no rows lost or
/// duplicated at the chunk boundary.
#[tokio::test]
async fn json_mode_exact_multiple_writes_all_rows() {
    let (_tmp, url) = fresh_db("CREATE TABLE events (data TEXT NOT NULL)").await;

    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(mapping)
            .with_batch_size(500),
    )
    .await
    .unwrap();

    let page = (0..1_000).map(|i| json!({"i": i})).collect::<Vec<Value>>();
    let written = sink.write_batch(&page).await.unwrap();

    assert_eq!(written, 1_000);
    assert_eq!(count_rows(&url, "events").await, 1_000);
}
/// AutoMap mode, batch size 500: a 1,500-record page with `user_id` and
/// `event` keys must be written in full to a table with matching columns.
///
/// Asserts the count returned by `write_batch` and the resulting row count.
#[tokio::test]
async fn auto_map_mode_rechunks_large_page() {
    let (_tmp, url) =
        fresh_db("CREATE TABLE events (user_id TEXT NOT NULL, event TEXT NOT NULL)").await;

    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(SqliteColumnMapping::AutoMap)
            .with_batch_size(500),
    )
    .await
    .unwrap();

    let page = (0..1_500)
        .map(|i| json!({"user_id": format!("u{i}"), "event": "signup"}))
        .collect::<Vec<Value>>();
    let written = sink.write_batch(&page).await.unwrap();

    assert_eq!(written, 1_500);
    assert_eq!(count_rows(&url, "events").await, 1_500);
}
/// AutoMap must bind JSON scalars as native SQLite values, not JSON text.
///
/// Two records are written to `people`; the second ("Sue") has no `note`
/// key. The test then reads both rows back through a separate pool:
///
/// * "Bob" is looked up by `name = 'Bob'`, which only matches when the
///   string was stored without JSON quoting. `typeof()` must report `text`
///   for `name` and `integer` for `active`; `true` reads back as `1`, the
///   float as `1.5`, and `note` as `"hi"`.
/// * For "Sue", `false` must read back as `0` and the absent `note` must be
///   a real SQL NULL (`typeof(note) = 'null'`).
#[tokio::test]
async fn auto_map_binds_native_types_not_json_strings() {
    let (_tmp, url) =
        fresh_db("CREATE TABLE people (name TEXT, active INTEGER, score REAL, note TEXT)").await;

    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "people").column_mapping(SqliteColumnMapping::AutoMap),
    )
    .await
    .unwrap();

    let people = vec![
        json!({"name": "Bob", "active": true, "score": 1.5, "note": "hi"}),
        json!({"name": "Sue", "active": false, "score": 2.5}),
    ];
    sink.write_batch(&people).await.unwrap();

    // Independent connection used purely for verification.
    let verify = SqlitePoolOptions::new()
        .max_connections(1)
        .connect(&url)
        .await
        .expect("connect");

    let bob_row = sqlx::query(
        "SELECT name, active, score, note, typeof(name) AS tn, typeof(active) AS ta \
         FROM people WHERE name = 'Bob'",
    )
    .fetch_one(&verify)
    .await
    .expect("bob row (name must be stored unquoted)");
    assert_eq!(bob_row.get::<String, _>("tn"), "text");
    assert_eq!(bob_row.get::<String, _>("name"), "Bob");
    assert_eq!(bob_row.get::<String, _>("ta"), "integer");
    assert_eq!(bob_row.get::<i64, _>("active"), 1);
    assert_eq!(bob_row.get::<f64, _>("score"), 1.5);
    assert_eq!(bob_row.get::<String, _>("note"), "hi");

    let sue_row =
        sqlx::query("SELECT active, note, typeof(note) AS tnote FROM people WHERE name = 'Sue'")
            .fetch_one(&verify)
            .await
            .expect("sue row");
    assert_eq!(
        sue_row.get::<i64, _>("active"),
        0,
        "false must bind integer 0"
    );
    assert_eq!(
        sue_row.get::<String, _>("tnote"),
        "null",
        "missing column must bind SQL NULL, not the text 'null'"
    );
    assert_eq!(sue_row.get::<Option<String>, _>("note"), None);

    verify.close().await;
}
/// AutoMap mode, batch size 0, very wide rows: 1,000 records with 100
/// integer columns each must all be written.
///
/// Written as one statement, this page would need 100 × 1,000 = 100,000
/// bound parameters. NOTE(review): per the test name the sink is expected to
/// split the page so that SQLite's bind-variable limit is not exceeded;
/// the test itself only checks that the write succeeds and that all 1,000
/// rows are present.
#[tokio::test]
async fn auto_map_chunks_to_respect_sqlite_var_limit() {
    // Column names c0..c99, each declared as INTEGER.
    let cols: Vec<String> = (0..100).map(|i| format!("c{i}")).collect();
    let column_defs = cols
        .iter()
        .map(|c| format!("{c} INTEGER"))
        .collect::<Vec<_>>()
        .join(", ");
    let create = format!("CREATE TABLE wide ({column_defs})");
    let (_tmp, url) = fresh_db(&create).await;

    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "wide")
            .column_mapping(SqliteColumnMapping::AutoMap)
            .with_batch_size(0),
    )
    .await
    .unwrap();

    // Row `r` holds the value `r * 100 + i` in column `c{i}`.
    let records: Vec<Value> = (0..1_000_i64)
        .map(|r| {
            let fields: serde_json::Map<String, Value> = cols
                .iter()
                .enumerate()
                .map(|(i, c)| (c.clone(), json!(r * 100 + i as i64)))
                .collect();
            Value::Object(fields)
        })
        .collect();

    let written = sink.write_batch(&records).await.unwrap();

    assert_eq!(written, 1_000);
    assert_eq!(count_rows(&url, "wide").await, 1_000);
}
/// AutoMap mode, batch size 0: a 2,500-record page with `user_id` and
/// `event` keys must be written in full.
///
/// Asserts the count returned by `write_batch` and the resulting row count.
#[tokio::test]
async fn auto_map_mode_batch_size_zero_passes_page_through() {
    let (_tmp, url) =
        fresh_db("CREATE TABLE events (user_id TEXT NOT NULL, event TEXT NOT NULL)").await;

    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(SqliteColumnMapping::AutoMap)
            .with_batch_size(0),
    )
    .await
    .unwrap();

    let page = (0..2_500)
        .map(|i| json!({"user_id": format!("u{i}"), "event": "signup"}))
        .collect::<Vec<Value>>();
    let written = sink.write_batch(&page).await.unwrap();

    assert_eq!(written, 2_500);
    assert_eq!(count_rows(&url, "events").await, 2_500);
}
/// AutoMap must derive the insert columns from every record in the batch,
/// not just the first one.
///
/// The first record only carries `id`; the second additionally carries
/// `name` and `email`. After the write, row 2 must contain both extra
/// values and row 1 must have SQL NULL in `email`.
#[tokio::test]
async fn auto_map_unions_columns_across_heterogeneous_batch() {
    let (_tmp, url) = fresh_db("CREATE TABLE events (id INTEGER, name TEXT, email TEXT)").await;

    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events").column_mapping(SqliteColumnMapping::AutoMap),
    )
    .await
    .unwrap();

    let mixed = vec![
        json!({ "id": 1 }),
        json!({ "id": 2, "name": "b", "email": "x@y" }),
    ];
    let written = sink.write_batch(&mixed).await.unwrap();
    assert_eq!(written, 2);

    // Read everything back first, close the pool, then assert.
    let verify = SqlitePoolOptions::new()
        .max_connections(1)
        .connect(&url)
        .await
        .expect("connect");
    let second_row = sqlx::query("SELECT name, email FROM events WHERE id = 2")
        .fetch_one(&verify)
        .await
        .expect("read row 2");
    let first_row = sqlx::query("SELECT email FROM events WHERE id = 1")
        .fetch_one(&verify)
        .await
        .expect("read row 1");
    let second_name: Option<String> = second_row.get("name");
    let second_email: Option<String> = second_row.get("email");
    let first_email: Option<String> = first_row.get("email");
    verify.close().await;

    assert_eq!(second_name.as_deref(), Some("b"));
    assert_eq!(
        second_email.as_deref(),
        Some("x@y"),
        "later-record-only column must be inserted, not dropped (H1)"
    );
    assert_eq!(
        first_email, None,
        "row missing the unioned column binds SQL NULL"
    );
}
/// Json mode, batch size 300: two consecutive `write_batch` calls (700 then
/// 400 records) must accumulate to 700 and then 1,100 rows.
///
/// Neither page is a multiple of the batch size. NOTE(review): the test name
/// refers to per-chunk atomicity, but only the cumulative row counts after
/// each successful call are verified here.
#[tokio::test]
async fn batch_size_atomicity_per_chunk_preserved() {
    let (_tmp, url) = fresh_db("CREATE TABLE events (data TEXT NOT NULL)").await;

    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(mapping)
            .with_batch_size(300),
    )
    .await
    .unwrap();

    let first_page = (0..700).map(|i| json!({"i": i})).collect::<Vec<Value>>();
    let second_page = (0..400)
        .map(|i| json!({"i": i + 1000}))
        .collect::<Vec<Value>>();

    sink.write_batch(&first_page).await.unwrap();
    assert_eq!(count_rows(&url, "events").await, 700);

    sink.write_batch(&second_page).await.unwrap();
    assert_eq!(count_rows(&url, "events").await, 1_100);
}
/// A separate reader pool, open alongside the sink, must see each committed
/// batch, and the database file must end up in WAL journal mode.
///
/// Steps:
/// 1. Create `wal.db` and the `events` table through a throwaway pool
///    (`?mode=rwc`), then close it.
/// 2. Open the sink on the plain URL (no `mode` parameter) in Json mode with
///    batch size 500, and open an independent single-connection reader pool.
/// 3. Write 900 records; the reader must count 900 rows.
/// 4. Write 600 more; the reader must count 1,500 rows.
/// 5. `PRAGMA journal_mode` on the reader must report `wal`
///    (case-insensitive).
///
/// Nothing in this test sets the journal mode, so the sink is presumably
/// what switches the file to WAL. Reads and writes are issued sequentially;
/// "concurrent" refers to the reader connection being open at the same time
/// as the sink.
#[tokio::test]
async fn wal_pool_writes_correctly_with_concurrent_reader() {
    let tmp = TempDir::new().expect("tempdir");
    let db_file = tmp.path().join("wal.db");
    let url = format!("sqlite://{}", db_file.display());

    {
        // Schema setup on its own pool, which creates the file if missing.
        let create_url = format!("{url}?mode=rwc");
        let setup = SqlitePoolOptions::new()
            .max_connections(1)
            .connect(&create_url)
            .await
            .expect("setup connect");
        sqlx::query("CREATE TABLE events (data TEXT NOT NULL)")
            .execute(&setup)
            .await
            .expect("create table");
        setup.close().await;
    }

    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink = SqliteSink::new(
        SqliteSinkConfig::new(&url, "events")
            .column_mapping(mapping)
            .with_batch_size(500),
    )
    .await
    .unwrap();

    let reader = SqlitePoolOptions::new()
        .max_connections(1)
        .connect(&url)
        .await
        .expect("reader connect");

    // First batch: 900 rows.
    let first_page = (0..900).map(|i| json!({"i": i})).collect::<Vec<Value>>();
    let first_written = sink.write_batch(&first_page).await.unwrap();
    assert_eq!(first_written, 900);

    let after_first = sqlx::query("SELECT COUNT(*) AS n FROM events")
        .fetch_one(&reader)
        .await
        .expect("reader sees first batch");
    let seen: i64 = after_first.get("n");
    assert_eq!(
        seen, 900,
        "concurrent reader must observe the committed batch"
    );

    // Second batch: 600 more rows.
    let second_page = (0..600)
        .map(|i| json!({"i": i + 10_000}))
        .collect::<Vec<Value>>();
    let second_written = sink.write_batch(&second_page).await.unwrap();
    assert_eq!(second_written, 600);

    let after_second = sqlx::query("SELECT COUNT(*) AS n FROM events")
        .fetch_one(&reader)
        .await
        .expect("reader sees both batches");
    let seen: i64 = after_second.get("n");
    assert_eq!(seen, 1_500);

    let pragma_row = sqlx::query("PRAGMA journal_mode")
        .fetch_one(&reader)
        .await
        .expect("journal_mode");
    let mode: String = pragma_row.get(0);
    assert_eq!(
        mode.to_lowercase(),
        "wal",
        "the on-disk database must be in WAL mode"
    );

    reader.close().await;
}
/// A sink built on the `sqlite::memory:` URL must construct successfully and
/// accept an empty write, reporting zero records.
///
/// No table is created in this test; only sink construction and the
/// empty-input path are exercised.
#[tokio::test]
async fn memory_url_still_writes_correctly() {
    let mapping = SqliteColumnMapping::Json {
        column: "data".into(),
    };
    let sink =
        SqliteSink::new(SqliteSinkConfig::new("sqlite::memory:", "events").column_mapping(mapping))
            .await
            .unwrap();

    let written = sink.write_batch(&[]).await.unwrap();

    assert_eq!(written, 0, "empty write is a no-op even on a memory DB");
}