mod common;
use common::pgwire_harness::TestServer;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn kv_insert_duplicate_key_raises_unique_violation() {
let server = TestServer::start().await;
server
.exec("CREATE COLLECTION c (key TEXT PRIMARY KEY, n INT) WITH (engine='kv')")
.await
.unwrap();
server
.exec("INSERT INTO c (key, n) VALUES ('k', 1)")
.await
.unwrap();
match server
.client
.simple_query("INSERT INTO c (key, n) VALUES ('k', 2)")
.await
{
Ok(_) => panic!("expected unique_violation, got success"),
Err(e) => {
let db_err = e.as_db_error().expect("expected DbError");
assert_eq!(
db_err.code().code(),
"23505",
"expected SQLSTATE 23505, got {}: {}",
db_err.code().code(),
db_err.message()
);
assert!(
db_err.message().to_lowercase().contains("k"),
"error must name the conflicting key, got: {}",
db_err.message()
);
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn kv_insert_duplicate_key_preserves_original_row() {
let server = TestServer::start().await;
server
.exec("CREATE COLLECTION c (key TEXT PRIMARY KEY, n INT) WITH (engine='kv')")
.await
.unwrap();
server
.exec("INSERT INTO c (key, n) VALUES ('k', 1)")
.await
.unwrap();
let _ = server
.client
.simple_query("INSERT INTO c (key, n) VALUES ('k', 2)")
.await;
let rows = server
.query_text("SELECT n FROM c WHERE key = 'k'")
.await
.unwrap();
assert_eq!(rows.len(), 1, "expected exactly one row, got {rows:?}");
assert_eq!(
rows[0], "1",
"duplicate-key INSERT must not overwrite the original row, got: {}",
rows[0]
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn kv_insert_duplicate_key_resp_value_form_raises_unique_violation() {
let server = TestServer::start().await;
server
.exec("CREATE COLLECTION c (key STRING PRIMARY KEY, value STRING) WITH (engine='kv')")
.await
.unwrap();
server
.exec("INSERT INTO c (key, value) VALUES ('k', 'first')")
.await
.unwrap();
match server
.client
.simple_query("INSERT INTO c (key, value) VALUES ('k', 'second')")
.await
{
Ok(_) => panic!("expected unique_violation on RESP (key,value) form, got success"),
Err(e) => {
let db_err = e.as_db_error().expect("expected DbError");
assert_eq!(
db_err.code().code(),
"23505",
"expected SQLSTATE 23505, got {}: {}",
db_err.code().code(),
db_err.message()
);
}
}
let rows = server
.query_text("SELECT value FROM c WHERE key = 'k'")
.await
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0], "first",
"RESP-shape duplicate INSERT must not overwrite, got: {}",
rows[0]
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn kv_insert_on_conflict_do_nothing_is_noop() {
let server = TestServer::start().await;
server
.exec("CREATE COLLECTION c (key TEXT PRIMARY KEY, n INT) WITH (engine='kv')")
.await
.unwrap();
server
.exec("INSERT INTO c (key, n) VALUES ('k', 1)")
.await
.unwrap();
server
.exec("INSERT INTO c (key, n) VALUES ('k', 2) ON CONFLICT DO NOTHING")
.await
.unwrap();
let rows = server
.query_text("SELECT n FROM c WHERE key = 'k'")
.await
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(
rows[0], "1",
"ON CONFLICT DO NOTHING must leave the original row intact, got: {}",
rows[0]
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn kv_insert_on_conflict_do_update_overwrites() {
let server = TestServer::start().await;
server
.exec("CREATE COLLECTION c (key TEXT PRIMARY KEY, n INT) WITH (engine='kv')")
.await
.unwrap();
server
.exec("INSERT INTO c (key, n) VALUES ('k', 1)")
.await
.unwrap();
server
.exec(
"INSERT INTO c (key, n) VALUES ('k', 2) \
ON CONFLICT (key) DO UPDATE SET n = EXCLUDED.n",
)
.await
.unwrap();
let rows = server
.query_text("SELECT n FROM c WHERE key = 'k'")
.await
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0], "2", "got: {}", rows[0]);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn kv_upsert_keyword_overwrites() {
let server = TestServer::start().await;
server
.exec("CREATE COLLECTION c (key TEXT PRIMARY KEY, n INT) WITH (engine='kv')")
.await
.unwrap();
server
.exec("INSERT INTO c (key, n) VALUES ('k', 1)")
.await
.unwrap();
server
.exec("UPSERT INTO c (key, n) VALUES ('k', 2)")
.await
.unwrap();
let rows = server
.query_text("SELECT n FROM c WHERE key = 'k'")
.await
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0], "2", "got: {}", rows[0]);
}