#![allow(clippy::expect_used, clippy::unwrap_used)]
use mssql_client::{Client, Config, Error};
fn get_test_config() -> Option<Config> {
let host = std::env::var("MSSQL_HOST").ok()?;
let port = std::env::var("MSSQL_PORT").unwrap_or_else(|_| "1433".into());
let user = std::env::var("MSSQL_USER").unwrap_or_else(|_| "sa".into());
let password = std::env::var("MSSQL_PASSWORD").unwrap_or_else(|_| "YourStrong@Passw0rd".into());
let conn_str = format!(
"Server={host},{port};Database=master;User Id={user};Password={password};\
TrustServerCertificate=true"
);
Config::from_connection_string(&conn_str).ok()
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn query_stream_matches_buffered_query() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
const SQL: &str = "\
WITH n AS (SELECT 1 AS i UNION ALL SELECT i + 1 FROM n WHERE i < 5000) \
SELECT i, CAST(CONCAT('row-', i) AS VARCHAR(32)) AS label FROM n \
OPTION (MAXRECURSION 0)";
let buffered: Vec<(i32, String)> = client
.query(SQL, &[])
.await
.expect("buffered query")
.collect_all()
.await
.expect("collect")
.into_iter()
.map(|row| {
(
row.get_by_name::<i32>("i").unwrap(),
row.get_by_name::<String>("label").unwrap(),
)
})
.collect();
let mut stream = client.query_stream(SQL, &[]).await.expect("stream query");
let mut streamed: Vec<(i32, String)> = Vec::new();
while let Some(row) = stream.try_next().await.expect("row") {
streamed.push((
row.get_by_name::<i32>("i").unwrap(),
row.get_by_name::<String>("label").unwrap(),
));
}
assert_eq!(streamed.len(), 5000, "expected 5000 streamed rows");
assert_eq!(streamed, buffered, "streamed rows must match buffered rows");
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn connection_reusable_after_stream_drain() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
{
let mut stream = client
.query_stream("SELECT TOP 3 object_id FROM sys.objects", &[])
.await
.expect("stream");
let mut n = 0;
while stream.try_next().await.expect("row").is_some() {
n += 1;
}
assert_eq!(n, 3);
}
let row = client
.query("SELECT 42 AS answer", &[])
.await
.expect("second query")
.collect_all()
.await
.expect("collect");
assert_eq!(row[0].get_by_name::<i32>("answer").unwrap(), 42);
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn query_stream_empty_result() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
let mut stream = client
.query_stream("SELECT 1 WHERE 1 = 0", &[])
.await
.expect("stream");
assert!(stream.try_next().await.expect("row").is_none());
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn query_stream_surfaces_server_error() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
let result = client
.query_stream("SELECT * FROM no_such_table_xyz", &[])
.await;
let err = match result {
Err(e) => e,
Ok(mut stream) => stream
.try_next()
.await
.expect_err("expected a server error"),
};
assert!(
matches!(err, Error::Server { .. }),
"expected Error::Server, got {err:?}"
);
}
const BIG_QUERY: &str = "\
WITH n AS (SELECT 1 AS i UNION ALL SELECT i + 1 FROM n WHERE i < 100000) \
SELECT i FROM n OPTION (MAXRECURSION 0)";
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn cancel_mid_stream_then_reuse() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
{
let mut stream = client.query_stream(BIG_QUERY, &[]).await.expect("stream");
for _ in 0..5 {
stream.try_next().await.expect("row").expect("a row");
}
stream.cancel().await.expect("cancel must succeed");
}
let rows = client
.query("SELECT 7 AS v", &[])
.await
.expect("reuse after cancel")
.collect_all()
.await
.expect("collect");
assert_eq!(rows[0].get_by_name::<i32>("v").unwrap(), 7);
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn drop_mid_stream_then_reuse() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
{
let mut stream = client.query_stream(BIG_QUERY, &[]).await.expect("stream");
for _ in 0..5 {
stream.try_next().await.expect("row").expect("a row");
}
}
let rows = client
.query("SELECT 11 AS v", &[])
.await
.expect("reuse after drop")
.collect_all()
.await
.expect("collect");
assert_eq!(rows[0].get_by_name::<i32>("v").unwrap(), 11);
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn cancel_after_full_drain_is_noop() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
let mut stream = client
.query_stream("SELECT TOP 2 object_id FROM sys.objects", &[])
.await
.expect("stream");
while stream.try_next().await.expect("row").is_some() {}
stream
.cancel()
.await
.expect("cancel after drain is a no-op");
let rows = client
.query("SELECT 1 AS v", &[])
.await
.expect("reuse")
.collect_all()
.await
.expect("collect");
assert_eq!(rows[0].get_by_name::<i32>("v").unwrap(), 1);
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn error_mid_stream_then_reuse() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
const SQL: &str = "SELECT 1 AS n; RAISERROR('boom', 16, 1);";
{
let mut stream = client.query_stream(SQL, &[]).await.expect("stream");
let row = stream.try_next().await.expect("first row").expect("a row");
assert_eq!(row.get_by_name::<i32>("n").unwrap(), 1);
let mut found: Option<Error> = None;
loop {
match stream.try_next().await {
Ok(Some(_)) => continue,
Ok(None) => break,
Err(e) => {
found = Some(e);
break;
}
}
}
let err = found.expect("expected a server error, got end of stream");
assert!(
matches!(err, Error::Server { .. }),
"expected Error::Server, got {err:?}"
);
}
let rows = client
.query("SELECT 13 AS v", &[])
.await
.expect("reuse after mid-stream error")
.collect_all()
.await
.expect("collect");
assert_eq!(rows[0].get_by_name::<i32>("v").unwrap(), 13);
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn query_stream_within_transaction() {
let Some(cfg) = get_test_config() else {
return;
};
let client = Client::connect(cfg).await.expect("connect");
let mut tx = client.begin_transaction().await.expect("begin");
tx.execute("CREATE TABLE #stream_tx (n INT)", &[])
.await
.expect("create temp table");
tx.execute("INSERT INTO #stream_tx VALUES (1), (2), (3)", &[])
.await
.expect("insert");
let mut got: Vec<i32> = Vec::new();
{
let mut stream = tx
.query_stream("SELECT n FROM #stream_tx ORDER BY n", &[])
.await
.expect("stream in transaction");
while let Some(row) = stream.try_next().await.expect("row") {
got.push(row.get_by_name::<i32>("n").unwrap());
}
}
assert_eq!(got, vec![1, 2, 3], "streamed the in-transaction rows");
let mut client = tx.rollback().await.expect("rollback");
let rows = client
.query("SELECT 17 AS v", &[])
.await
.expect("reuse after rollback")
.collect_all()
.await
.expect("collect");
assert_eq!(rows[0].get_by_name::<i32>("v").unwrap(), 17);
}
#[tokio::test]
#[ignore = "Requires SQL Server"]
async fn query_stream_with_parameters() {
let Some(cfg) = get_test_config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
let mut stream = client
.query_stream("SELECT @p1 + @p2 AS total", &[&10i32, &32i32])
.await
.expect("stream");
let row = stream.try_next().await.expect("row").expect("one row");
assert_eq!(row.get_by_name::<i32>("total").unwrap(), 42);
assert!(stream.try_next().await.expect("row").is_none());
}