// mssql-client 0.20.0
//
// High-level async SQL Server client with type-state connection management
// Documentation
//! The streaming-redesign memory proof and regression guard.
//!
//! This is the *definition of done* for true result streaming. It runs a query
//! whose response is ~10 MB and asserts that peak heap allocation during
//! consumption stays bounded to roughly one row — not the whole response. A
//! custom counting global allocator (this binary only) makes the measurement
//! deterministic, unlike RSS.
//!
//! It consumes via [`Client::query_stream`] (the incremental path added in the
//! streaming redesign), which reads TDS packets on demand. Against the buffered
//! [`Client::query`] this assertion would FAIL (peak ≈ the full response,
//! measured at ~40 MB for this query); against `query_stream` peak stays at
//! ~one packet plus one row, so it passes.
//!
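//! Run it against a live server (it is `#[ignore]`d so it never runs without
//! one). Use `cargo nextest`, which runs each test in its own process: the
//! allocation counters are process-wide, so tests sharing one process (the
//! default for `cargo test`) would count each other's allocations: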
//! ```text
//! MSSQL_HOST=localhost MSSQL_PORT=1433 MSSQL_USER=sa MSSQL_PASSWORD='YourStrong@Passw0rd' \
//!   cargo nextest run -p mssql-client --test streaming_memory --run-ignored ignored-only
//! ```

#![allow(clippy::expect_used)]
#![allow(unsafe_code)]

use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

use mssql_client::{Client, Config};

/// A pass-through allocator that tracks live and peak allocated bytes.
///
/// Every request — `alloc`, `alloc_zeroed`, `realloc` and `dealloc` — is
/// forwarded to [`System`]; the only added work is updating the `LIVE` and
/// `PEAK` counters. Forwarding `realloc` (instead of inheriting the trait's
/// default alloc + copy + dealloc) keeps the measured behavior the same as the
/// system allocator's and counts a resize as its net size change.
struct Counting;

/// Bytes currently allocated through `Counting` and not yet freed.
static LIVE: AtomicUsize = AtomicUsize::new(0);
/// High-water mark of `LIVE`. A test stores the current `LIVE` value here to
/// open a fresh measurement window.
static PEAK: AtomicUsize = AtomicUsize::new(0);

impl Counting {
    /// Adds `bytes` to the live total and raises the peak if the new total
    /// exceeds it. Touches only atomics, so it is safe to call from inside
    /// the allocator.
    fn record_growth(bytes: usize) {
        let live = LIVE.fetch_add(bytes, Ordering::Relaxed) + bytes;
        PEAK.fetch_max(live, Ordering::Relaxed);
    }

    /// Removes `bytes` from the live total; the peak is left untouched.
    fn record_shrink(bytes: usize) {
        LIVE.fetch_sub(bytes, Ordering::Relaxed);
    }
}

// SAFETY: delegates every allocation to the system allocator unchanged; the
// atomics only observe sizes and never affect the returned pointers.
unsafe impl GlobalAlloc for Counting {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract, which is
        // forwarded as-is.
        let ptr = unsafe { System.alloc(layout) };
        // A failed allocation holds no memory, so it is not counted.
        if !ptr.is_null() {
            Self::record_growth(layout.size());
        }
        ptr
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc_zeroed`'s contract,
        // which is forwarded as-is.
        let ptr = unsafe { System.alloc_zeroed(layout) };
        if !ptr.is_null() {
            Self::record_growth(layout.size());
        }
        ptr
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::realloc`'s contract, which
        // is forwarded as-is.
        let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
        // On failure the original block is still live at its old size, so the
        // counters must not change.
        if !new_ptr.is_null() {
            let old_size = layout.size();
            if new_size >= old_size {
                Self::record_growth(new_size - old_size);
            } else {
                Self::record_shrink(old_size - new_size);
            }
        }
        new_ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        Self::record_shrink(layout.size());
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator
        // (and therefore by `System`) with this `layout`.
        unsafe { System.dealloc(ptr, layout) };
    }
}

// Install `Counting` as this test binary's global allocator so heap
// allocations made anywhere in the process update `LIVE` and `PEAK`.
#[global_allocator]
static GLOBAL: Counting = Counting;

/// Builds the connection [`Config`] from the `MSSQL_*` environment variables.
///
/// `MSSQL_HOST` is required; `MSSQL_PORT`, `MSSQL_USER` and `MSSQL_PASSWORD`
/// fall back to `1433`, `sa` and the sample password used in the run
/// instructions. Returns `None` when `MSSQL_HOST` cannot be read or when
/// [`Config::from_connection_string`] rejects the assembled string.
fn config() -> Option<Config> {
    // Reads an optional variable, substituting `fallback` when it is unusable.
    let env_or = |key: &str, fallback: &str| match std::env::var(key) {
        Ok(value) => value,
        Err(_) => fallback.into(),
    };

    // The host has no default: without it there is no server to talk to.
    let host = match std::env::var("MSSQL_HOST") {
        Ok(value) => value,
        Err(_) => return None,
    };
    let port = env_or("MSSQL_PORT", "1433");
    let user = env_or("MSSQL_USER", "sa");
    let password = env_or("MSSQL_PASSWORD", "YourStrong@Passw0rd");

    let conn_str = format!(
        "Server={},{};Database=master;User Id={};Password={};\
         TrustServerCertificate=true;Encrypt=true",
        host, port, user, password
    );
    match Config::from_connection_string(&conn_str) {
        Ok(cfg) => Some(cfg),
        Err(_) => None,
    }
}

/// Query returning ~100k rows of `(int, char(100))` ≈ a 10 MB response. Peak
/// heap while reading it must stay far below that if rows stream rather than
/// buffer.
///
/// The rows come from a recursive CTE that counts `i` from 1 to 100000, each
/// paired with a 100-character `pad` column. `OPTION (MAXRECURSION 0)` lifts
/// SQL Server's default recursion limit (100 levels), which this CTE would
/// otherwise exceed.
const LARGE_QUERY: &str = "\
    WITH n AS (SELECT 1 AS i UNION ALL SELECT i + 1 FROM n WHERE i < 100000) \
    SELECT i, CAST(REPLICATE('x', 100) AS CHAR(100)) AS pad FROM n OPTION (MAXRECURSION 0)";

/// Consumes `LARGE_QUERY` through [`Client::query_stream`] and checks that the
/// heap high-water mark during consumption stays under 2 MB above the
/// baseline taken just before the query is issued.
#[tokio::test]
#[ignore = "Requires a live SQL Server"]
async fn streaming_query_bounds_peak_memory() {
    // ~one row is ~100 bytes; one packet is ~8 KB. A 2 MB bound is far below the
    // ~10 MB response (the buffered path peaks at ~40 MB), so it can only pass
    // if rows stream rather than buffer.
    const PEAK_BUDGET: usize = 2_000_000;

    // No usable server configuration: nothing to measure.
    let cfg = match config() {
        Some(cfg) => cfg,
        None => return,
    };
    let mut client = Client::connect(cfg).await.expect("connect");

    // Open the measurement window: pin the peak to what is live right now so
    // that only allocations made from here on can raise it.
    let baseline = LIVE.load(Ordering::Relaxed);
    PEAK.store(baseline, Ordering::Relaxed);

    let mut stream = client.query_stream(LARGE_QUERY, &[]).await.expect("query");
    let mut count = 0usize;
    while let Some(row) = stream.try_next().await.expect("row") {
        // Each row is released before the next one is requested.
        drop(row);
        count += 1;
    }

    // Close the window before anything else (such as the log line) allocates.
    let peak_delta = PEAK.load(Ordering::Relaxed).saturating_sub(baseline);
    eprintln!("rows={count} peak_delta={peak_delta} bytes");

    assert_eq!(count, 100_000, "expected 100k rows");
    assert!(
        peak_delta < PEAK_BUDGET,
        "peak heap delta was {peak_delta} bytes — streaming should bound this to \
         roughly one row, not the whole ~10 MB response"
    );
}

/// Streams one ~30 MB `VARBINARY(MAX)` cell into a discarding sink and checks
/// that the heap high-water mark stays under 2 MB above the baseline — far
/// below the cell size, which is only possible if the BLOB is read from the
/// socket in chunks rather than materialized. The buffered path would hold the
/// whole 30 MB.
#[tokio::test]
#[ignore = "Requires a live SQL Server"]
async fn blob_streaming_bounds_peak_memory() {
    const BIG_BLOB: &str = "SELECT 1 AS id, \
        CAST(REPLICATE(CAST('A' AS VARCHAR(MAX)), 30000000) AS VARBINARY(MAX)) AS doc";
    // Allowed heap growth while the blob is copied.
    const PEAK_BUDGET: usize = 2_000_000;

    // No usable server configuration: nothing to measure.
    let cfg = match config() {
        Some(cfg) => cfg,
        None => return,
    };
    let mut client = Client::connect(cfg).await.expect("connect");

    // Open the measurement window at the current live byte count.
    let baseline = LIVE.load(Ordering::Relaxed);
    PEAK.store(baseline, Ordering::Relaxed);

    let mut stream = client
        .query_stream_blob(BIG_BLOB, &[])
        .await
        .expect("stream");

    // Advance to the single result row; its value is not needed here.
    let first = stream.next().await.expect("next");
    let _ = first.expect("one row");

    // Drain the blob into a writer that throws the bytes away.
    let mut sink = tokio::io::sink();
    let n = stream.copy_blob_to(&mut sink).await.expect("copy blob");

    // Close the window before the log line below allocates.
    let peak_delta = PEAK.load(Ordering::Relaxed).saturating_sub(baseline);
    eprintln!("blob_bytes={n} peak_delta={peak_delta} bytes");

    assert_eq!(n, 30_000_000, "expected a 30 MB blob");
    assert!(
        peak_delta < PEAK_BUDGET,
        "peak heap delta was {peak_delta} bytes — blob streaming should bound \
         this to ~one chunk, not the whole 30 MB cell"
    );
}

/// Streams a row carrying *two* ~15 MB `VARBINARY(MAX)` columns, one after the
/// other, via [`Client::query_stream_rows`], and checks that the heap
/// high-water mark stays under 2 MB above the baseline — far below either
/// cell. That proves the multi-blob path streams each trailing column from the
/// socket rather than materializing it. The buffered path would hold ~30 MB.
#[tokio::test]
#[ignore = "Requires a live SQL Server"]
async fn multi_blob_streaming_bounds_peak_memory() {
    const TWO_BIG_BLOBS: &str = "SELECT 1 AS id, \
        CAST(REPLICATE(CAST('A' AS VARCHAR(MAX)), 15000000) AS VARBINARY(MAX)) AS doc1, \
        CAST(REPLICATE(CAST('B' AS VARCHAR(MAX)), 15000000) AS VARBINARY(MAX)) AS doc2";
    // Allowed heap growth while both blobs are copied.
    const PEAK_BUDGET: usize = 2_000_000;

    // No usable server configuration: nothing to measure.
    let cfg = match config() {
        Some(cfg) => cfg,
        None => return,
    };
    let mut client = Client::connect(cfg).await.expect("connect");

    // Open the measurement window at the current live byte count.
    let baseline = LIVE.load(Ordering::Relaxed);
    PEAK.store(baseline, Ordering::Relaxed);

    let mut stream = client
        .query_stream_rows(TWO_BIG_BLOBS, &[])
        .await
        .expect("stream");

    // Advance to the single result row; its value is not needed here.
    let first = stream.next().await.expect("next");
    let _ = first.expect("one row");

    // Copy every blob column of the row, in order, into a discarding writer.
    let mut sink = tokio::io::sink();
    let mut total = 0u64;
    loop {
        let has_blob = stream.next_blob().await.expect("next_blob");
        if !has_blob {
            break;
        }
        total += stream.copy_blob_to(&mut sink).await.expect("copy blob");
    }

    // Close the window before the log line below allocates.
    let peak_delta = PEAK.load(Ordering::Relaxed).saturating_sub(baseline);
    eprintln!("total_blob_bytes={total} peak_delta={peak_delta} bytes");

    assert_eq!(total, 30_000_000, "expected two 15 MB blobs");
    assert!(
        peak_delta < PEAK_BUDGET,
        "peak heap delta was {peak_delta} bytes — multi-blob streaming should \
         bound this to ~one chunk, not a whole 15 MB cell"
    );
}