#![allow(clippy::expect_used)]
#![allow(unsafe_code)]
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};
use mssql_client::{Client, Config};
struct Counting;
static LIVE: AtomicUsize = AtomicUsize::new(0);
static PEAK: AtomicUsize = AtomicUsize::new(0);
unsafe impl GlobalAlloc for Counting {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ptr = unsafe { System.alloc(layout) };
if !ptr.is_null() {
let live = LIVE.fetch_add(layout.size(), Ordering::Relaxed) + layout.size();
PEAK.fetch_max(live, Ordering::Relaxed);
}
ptr
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
LIVE.fetch_sub(layout.size(), Ordering::Relaxed);
unsafe { System.dealloc(ptr, layout) };
}
}
#[global_allocator]
static GLOBAL: Counting = Counting;
fn config() -> Option<Config> {
let host = std::env::var("MSSQL_HOST").ok()?;
let port = std::env::var("MSSQL_PORT").unwrap_or_else(|_| "1433".into());
let user = std::env::var("MSSQL_USER").unwrap_or_else(|_| "sa".into());
let password = std::env::var("MSSQL_PASSWORD").unwrap_or_else(|_| "YourStrong@Passw0rd".into());
let conn_str = format!(
"Server={host},{port};Database=master;User Id={user};Password={password};\
TrustServerCertificate=true;Encrypt=true"
);
Config::from_connection_string(&conn_str).ok()
}
const LARGE_QUERY: &str = "\
WITH n AS (SELECT 1 AS i UNION ALL SELECT i + 1 FROM n WHERE i < 100000) \
SELECT i, CAST(REPLICATE('x', 100) AS CHAR(100)) AS pad FROM n OPTION (MAXRECURSION 0)";
#[tokio::test]
#[ignore = "Requires a live SQL Server"]
async fn streaming_query_bounds_peak_memory() {
let Some(cfg) = config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
let baseline = LIVE.load(Ordering::Relaxed);
PEAK.store(baseline, Ordering::Relaxed);
let mut stream = client.query_stream(LARGE_QUERY, &[]).await.expect("query");
let mut count = 0usize;
while let Some(row) = stream.try_next().await.expect("row") {
let _ = row;
count += 1;
}
let peak_delta = PEAK.load(Ordering::Relaxed).saturating_sub(baseline);
eprintln!("rows={count} peak_delta={peak_delta} bytes");
assert_eq!(count, 100_000, "expected 100k rows");
assert!(
peak_delta < 2_000_000,
"peak heap delta was {peak_delta} bytes — streaming should bound this to \
roughly one row, not the whole ~10 MB response"
);
}
#[tokio::test]
#[ignore = "Requires a live SQL Server"]
async fn blob_streaming_bounds_peak_memory() {
let Some(cfg) = config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
const BIG_BLOB: &str = "SELECT 1 AS id, \
CAST(REPLICATE(CAST('A' AS VARCHAR(MAX)), 30000000) AS VARBINARY(MAX)) AS doc";
let baseline = LIVE.load(Ordering::Relaxed);
PEAK.store(baseline, Ordering::Relaxed);
let mut stream = client
.query_stream_blob(BIG_BLOB, &[])
.await
.expect("stream");
let _ = stream.next().await.expect("next").expect("one row");
let mut sink = tokio::io::sink();
let n = stream.copy_blob_to(&mut sink).await.expect("copy blob");
let peak_delta = PEAK.load(Ordering::Relaxed).saturating_sub(baseline);
eprintln!("blob_bytes={n} peak_delta={peak_delta} bytes");
assert_eq!(n, 30_000_000, "expected a 30 MB blob");
assert!(
peak_delta < 2_000_000,
"peak heap delta was {peak_delta} bytes — blob streaming should bound \
this to ~one chunk, not the whole 30 MB cell"
);
}
#[tokio::test]
#[ignore = "Requires a live SQL Server"]
async fn multi_blob_streaming_bounds_peak_memory() {
let Some(cfg) = config() else {
return;
};
let mut client = Client::connect(cfg).await.expect("connect");
const TWO_BIG_BLOBS: &str = "SELECT 1 AS id, \
CAST(REPLICATE(CAST('A' AS VARCHAR(MAX)), 15000000) AS VARBINARY(MAX)) AS doc1, \
CAST(REPLICATE(CAST('B' AS VARCHAR(MAX)), 15000000) AS VARBINARY(MAX)) AS doc2";
let baseline = LIVE.load(Ordering::Relaxed);
PEAK.store(baseline, Ordering::Relaxed);
let mut stream = client
.query_stream_rows(TWO_BIG_BLOBS, &[])
.await
.expect("stream");
let _ = stream.next().await.expect("next").expect("one row");
let mut sink = tokio::io::sink();
let mut total = 0u64;
while stream.next_blob().await.expect("next_blob") {
total += stream.copy_blob_to(&mut sink).await.expect("copy blob");
}
let peak_delta = PEAK.load(Ordering::Relaxed).saturating_sub(baseline);
eprintln!("total_blob_bytes={total} peak_delta={peak_delta} bytes");
assert_eq!(total, 30_000_000, "expected two 15 MB blobs");
assert!(
peak_delta < 2_000_000,
"peak heap delta was {peak_delta} bytes — multi-blob streaming should \
bound this to ~one chunk, not a whole 15 MB cell"
);
}