Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27pub(crate) mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// How long to wait for the server to acknowledge an Attention packet after
49/// a command timeout. SqlClient waits 5 seconds before dooming the
50/// connection; we match it.
51const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68    fut: F,
69    deadline: Option<std::time::Duration>,
70    canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73    F: std::future::Future<Output = Result<T>>,
74{
75    let Some(d) = deadline else {
76        return fut.await;
77    };
78    tokio::pin!(fut);
79    tokio::select! {
80        biased;
81        res = &mut fut => res,
82        () = tokio::time::sleep(d) => {
83            // Signal cancellation, then let the in-flight read consume the
84            // server's attention acknowledgement so the connection stays usable.
85            let drain = async {
86                let _ = canceller.cancel().await;
87                let _ = (&mut fut).await;
88            };
89            if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90                tracing::warn!(
91                    timeout = ?ATTENTION_ACK_TIMEOUT,
92                    "server did not acknowledge attention; abandoning the connection as dirty"
93                );
94            }
95            Err(Error::CommandTimeout)
96        }
97    }
98}
99
100/// SQL Server client with type-state connection management.
101///
102/// The generic parameter `S` represents the current connection state,
103/// ensuring at compile time that certain operations are only available
104/// in appropriate states.
105pub struct Client<S: ConnectionState> {
106    config: Config,
107    _state: PhantomData<S>,
108    /// The underlying connection (present only when connected)
109    connection: Option<ConnectionHandle>,
110    /// Server version from LoginAck (raw u32 TDS version)
111    server_version: Option<u32>,
112    /// Current database from EnvChange
113    current_database: Option<String>,
114    /// Server's default collation from SqlCollation EnvChange during login.
115    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
116    /// parameters with the correct character encoding and collation bytes.
117    server_collation: Option<tds_protocol::token::Collation>,
118    /// Prepared statement cache for query optimization
119    statement_cache: StatementCache,
120    /// Transaction descriptor from BeginTransaction EnvChange.
121    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
122    /// requests within an explicit transaction. 0 indicates auto-commit mode.
123    transaction_descriptor: u64,
124    /// Whether a request has been sent and the response has not yet been fully read.
125    /// Used by the connection pool to detect dirty connections after cancel/timeout.
126    in_flight: bool,
127    /// Whether this connection needs a reset on next use.
128    /// Set by connection pool on checkin, cleared after first query/execute.
129    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
130    needs_reset: bool,
131    /// OpenTelemetry instrumentation context (when otel feature is enabled)
132    #[cfg(feature = "otel")]
133    instrumentation: InstrumentationContext,
134    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
135    #[cfg(feature = "always-encrypted")]
136    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
137}
138
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
/// - Plain TCP (for internal networks or when `tls` feature is disabled)
///
/// Each variant wraps a `Connection` over a different stream type, so code
/// that sends or cancels must match on the variant and repeat the call per arm.
// NOTE(review): the original justification for this allow ("Connection will be
// used once query execution is implemented") is outdated — every variant is
// matched by the send and cancel paths in this file. Confirm whether
// `dead_code` still fires under some feature combination before removing it.
#[allow(dead_code)]
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    #[cfg(feature = "tls")]
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style)
    #[cfg(feature = "tls")]
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
    Plain(Connection<TcpStream>),
}
156
/// Look up the `TypeInfo` used to declare a typed NULL ([`crate::null`]),
/// keyed by the name its [`crate::ToSql::sql_type`] reports.
///
/// Only the SQL type names listed below are recognised. Anything else —
/// including the `"NULL"` name reported by an untyped NULL (`Option::None`) —
/// yields `None`, in which case the caller falls back to the default
/// parameter type.
#[cfg(feature = "always-encrypted")]
fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
    use tds_protocol::rpc::TypeInfo;

    let declared = match sql_type {
        // Fixed-width integers and bit.
        "BIT" => TypeInfo::bit(),
        "TINYINT" => TypeInfo::tinyint(),
        "SMALLINT" => TypeInfo::smallint(),
        "INT" => TypeInfo::int(),
        "BIGINT" => TypeInfo::bigint(),
        // Floating point.
        "REAL" => TypeInfo::real(),
        "FLOAT" => TypeInfo::float(),
        // Variable-length types are declared with length 1.
        "NVARCHAR" => TypeInfo::nvarchar(1),
        "VARBINARY" => TypeInfo::varbinary(1),
        "UNIQUEIDENTIFIER" => TypeInfo::uuid(),
        "DATE" => TypeInfo::date(),
        _ => return None,
    };
    Some(declared)
}
178
/// Translate the [`EncryptedParamType`] carried by a typed-parameter wrapper
/// into the `TypeInfo` the driver declares the parameter as (used for
/// `sp_describe_parameter_encryption` and as the `CryptoMetadata` base type).
///
/// # Errors
///
/// Returns [`Error::Encryption`] for any variant not handled here, so that an
/// unknown future variant fails loudly instead of being declared with the
/// wrong type.
#[cfg(feature = "always-encrypted")]
fn encrypted_param_type_info(
    ty: mssql_types::EncryptedParamType,
) -> Result<tds_protocol::rpc::TypeInfo> {
    use mssql_types::EncryptedParamType as E;
    use tds_protocol::rpc::TypeInfo;

    let declared = match ty {
        // Exact numerics and temporal types carry precision and/or scale.
        E::Decimal { precision, scale } => TypeInfo::decimal(precision, scale),
        E::Time { scale } => TypeInfo::time(scale),
        E::DateTime2 { scale } => TypeInfo::datetime2(scale),
        E::DateTimeOffset { scale } => TypeInfo::datetimeoffset(scale),
        E::DateTime => TypeInfo::datetime(),
        // Fixed-length character and binary types carry a length.
        E::Char { length } => TypeInfo::char(length),
        E::NChar { length } => TypeInfo::nchar(length),
        E::Binary { length } => TypeInfo::binary(length),
        _ => {
            let message = String::from("unsupported Always Encrypted parameter type");
            return Err(Error::Encryption(message));
        }
    };
    Ok(declared)
}
205
206// Private helper methods available to all connection states
207impl<S: ConnectionState> Client<S> {
208    /// The default per-command deadline from `command_timeout`.
209    ///
210    /// Returns `None` when `command_timeout` is zero, which means "no limit"
211    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
212    pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
213        let t = self.config.command_timeout;
214        if t.is_zero() { None } else { Some(t) }
215    }
216
217    /// Build a cancel handle for the current connection, regardless of
218    /// connection state. The public, documented surface is
219    /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
220    /// delegate here.
221    pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
222        let connection = self
223            .connection
224            .as_ref()
225            .expect("connection should be present");
226        match connection {
227            #[cfg(feature = "tls")]
228            ConnectionHandle::Tls(conn) => {
229                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
230            }
231            #[cfg(feature = "tls")]
232            ConnectionHandle::TlsPrelogin(conn) => {
233                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
234            }
235            ConnectionHandle::Plain(conn) => {
236                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
237            }
238        }
239    }
240
241    /// Cancel an in-flight response that was abandoned without being drained —
242    /// e.g. a [`RowStream`](crate::RowStream) dropped or cancelled mid-result.
243    ///
244    /// Sends an Attention and drains to the server's DONE_ATTN acknowledgement so
245    /// the socket is clean and the connection reusable. A no-op when nothing is
246    /// in flight. Bounded by [`ATTENTION_ACK_TIMEOUT`]: if the acknowledgement
247    /// never arrives the connection is left marked in-flight (so the pool
248    /// discards it on return) and an error is returned.
249    pub(crate) async fn cancel_in_flight_response(&mut self) -> Result<()> {
250        if !self.in_flight {
251            return Ok(());
252        }
253        let canceller = self.connection_cancel_handle();
254        let drain = async {
255            canceller.cancel().await?;
256            // With the cancelling flag set, `read_response_message` routes through
257            // the codec's drain-after-cancel path and returns `Err(Cancelled)`
258            // once the DONE_ATTN acknowledgement is consumed (clearing
259            // `in_flight`). Any full messages that arrive before the ack are
260            // discarded.
261            loop {
262                match self.read_response_message().await {
263                    Err(Error::Cancelled) => return Ok(()),
264                    Ok(_) => continue,
265                    Err(e) => return Err(e),
266                }
267            }
268        };
269        match tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await {
270            Ok(result) => result,
271            Err(_) => {
272                tracing::warn!(
273                    timeout = ?ATTENTION_ACK_TIMEOUT,
274                    "attention acknowledgement not received while cancelling an \
275                     abandoned response; connection left dirty"
276                );
277                Err(Error::Cancelled)
278            }
279        }
280    }
281
282    /// Process transaction-related EnvChange tokens.
283    ///
284    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
285    /// EnvChange tokens, updating the transaction descriptor accordingly.
286    ///
287    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
288    /// while still having the transaction descriptor tracked correctly.
289    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
290        use tds_protocol::token::EnvChangeValue;
291
292        match env.env_type {
293            EnvChangeType::BeginTransaction => {
294                if let EnvChangeValue::Binary(ref data) = env.new_value {
295                    if data.len() >= 8 {
296                        let descriptor = u64::from_le_bytes([
297                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
298                        ]);
299                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
300                        *transaction_descriptor = descriptor;
301                    }
302                }
303            }
304            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
305                tracing::debug!(
306                    env_type = ?env.env_type,
307                    "transaction ended via raw SQL"
308                );
309                *transaction_descriptor = 0;
310            }
311            _ => {}
312        }
313    }
314
315    /// Apply a transaction-related `ENVCHANGE` to this client's descriptor.
316    ///
317    /// Lets the streaming readers (which live in sibling modules) keep the
318    /// transaction descriptor in sync with raw `BEGIN`/`COMMIT`/`ROLLBACK`
319    /// batches seen mid-stream, exactly as the buffered readers do.
320    pub(crate) fn apply_transaction_env_change(&mut self, env: &EnvChange) {
321        Self::process_transaction_env_change(env, &mut self.transaction_descriptor);
322    }
323
324    /// Send a SQL batch to the server.
325    ///
326    /// Uses the client's current transaction descriptor in ALL_HEADERS.
327    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
328    /// returned by BeginTransaction must be included.
329    ///
330    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
331    /// is included in the first packet to reset connection state.
332    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
333        // If a previous streamed response was abandoned (a RowStream dropped
334        // mid-result), drain it before issuing a new request so the next read
335        // does not pick up the old response's bytes.
336        self.cancel_in_flight_response().await?;
337
338        let payload = tds_protocol::__private::encode_sql_batch_with_transaction(
339            sql,
340            self.transaction_descriptor,
341        );
342        let max_packet = self.config.packet_size as usize;
343
344        // Check if we need to reset the connection on this request
345        let reset = self.needs_reset;
346        if reset {
347            self.needs_reset = false; // Clear flag before sending
348            // RESETCONNECTION invalidates all server-side prepared handles, so
349            // drop the cache (no sp_unprepare needed — the server released them).
350            let _ = self.statement_cache.clear();
351            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
352        }
353
354        self.in_flight = true;
355        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
356
357        match connection {
358            #[cfg(feature = "tls")]
359            ConnectionHandle::Tls(conn) => {
360                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
361                    .await?;
362            }
363            #[cfg(feature = "tls")]
364            ConnectionHandle::TlsPrelogin(conn) => {
365                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
366                    .await?;
367            }
368            ConnectionHandle::Plain(conn) => {
369                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
370                    .await?;
371            }
372        }
373
374        Ok(())
375    }
376
377    /// Send an RPC request to the server.
378    ///
379    /// Uses the client's current transaction descriptor in ALL_HEADERS.
380    ///
381    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
382    /// is included in the first packet to reset connection state.
383    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
384        // Drain an abandoned streamed response (see `send_sql_batch`) before
385        // issuing this request.
386        self.cancel_in_flight_response().await?;
387
388        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
389        let max_packet = self.config.packet_size as usize;
390
391        // Check if we need to reset the connection on this request
392        let reset = self.needs_reset;
393        if reset {
394            self.needs_reset = false; // Clear flag before sending
395            // RESETCONNECTION invalidates all server-side prepared handles, so
396            // drop the cache (no sp_unprepare needed — the server released them).
397            let _ = self.statement_cache.clear();
398            tracing::debug!("sending RPC with RESETCONNECTION flag");
399        }
400
401        self.in_flight = true;
402        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
403
404        match connection {
405            #[cfg(feature = "tls")]
406            ConnectionHandle::Tls(conn) => {
407                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
408                    .await?;
409            }
410            #[cfg(feature = "tls")]
411            ConnectionHandle::TlsPrelogin(conn) => {
412                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
413                    .await?;
414            }
415            ConnectionHandle::Plain(conn) => {
416                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
417                    .await?;
418            }
419        }
420
421        Ok(())
422    }
423
424    /// Start building a stored procedure call with full control over parameters.
425    ///
426    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
427    /// parameters before executing the call.
428    ///
429    /// The procedure name is validated to prevent SQL injection. It may be
430    /// schema-qualified (e.g., `"dbo.MyProc"`).
431    ///
432    /// # Example
433    ///
434    /// ```rust,no_run
435    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
436    /// let result = client.procedure("dbo.CalculateSum")?
437    ///     .input("@a", &10i32)
438    ///     .input("@b", &20i32)
439    ///     .output_int("@result")
440    ///     .execute().await?;
441    ///
442    /// let sum = result.get_output("@result").unwrap();
443    /// # let _ = sum;
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub fn procedure(
448        &mut self,
449        proc_name: &str,
450    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
451        crate::validation::validate_qualified_identifier(proc_name)?;
452        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
453    }
454
455    /// Execute a stored procedure with positional input parameters.
456    ///
457    /// This is a convenience method for the common case of calling a procedure
458    /// with input-only parameters. For output parameters or named parameters,
459    /// use [`procedure()`](Client::procedure) instead.
460    ///
461    /// # Example
462    ///
463    /// ```rust,no_run
464    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
465    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
466    /// assert_eq!(result.return_value, 0);
467    ///
468    /// if let Some(rs) = result.first_result_set() {
469    ///     println!("columns: {:?}", rs.columns());
470    /// }
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub async fn call_procedure(
475        &mut self,
476        proc_name: &str,
477        params: &[&(dyn crate::ToSql + Sync)],
478    ) -> Result<crate::stream::ProcedureResult> {
479        crate::validation::validate_qualified_identifier(proc_name)?;
480
481        tracing::debug!(
482            proc_name = proc_name,
483            params_count = params.len(),
484            "executing stored procedure"
485        );
486
487        let rpc_params =
488            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
489        let mut rpc = RpcRequest::named(proc_name);
490        for param in rpc_params {
491            rpc = rpc.param(param);
492        }
493
494        #[cfg(feature = "otel")]
495        let instrumentation = self.instrumentation.clone();
496        #[cfg(feature = "otel")]
497        let mut span = instrumentation.procedure_span(proc_name);
498        #[cfg(feature = "otel")]
499        let timer = crate::instrumentation::OperationTimer::start("EXECUTE");
500
501        let deadline = self.command_deadline();
502        let canceller = self.connection_cancel_handle();
503        let result = run_with_deadline(
504            async {
505                self.send_rpc(&rpc).await?;
506                self.read_procedure_result().await
507            },
508            deadline,
509            canceller,
510        )
511        .await;
512
513        #[cfg(feature = "otel")]
514        match &result {
515            Ok(r) => InstrumentationContext::record_success(&mut span, Some(r.rows_affected)),
516            Err(e) => InstrumentationContext::record_error(&mut span, e),
517        }
518        #[cfg(feature = "otel")]
519        timer.finish(instrumentation.metrics(), result.is_ok());
520        #[cfg(feature = "otel")]
521        drop(span);
522
523        result
524    }
525
526    /// Ask the server how each parameter of a statement must be encrypted.
527    ///
528    /// Issues the `sp_describe_parameter_encryption` system RPC for the
529    /// parameterized statement `tsql` with the parameter declaration `params`
530    /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
531    /// into a [`ParameterEncryptionInfo`](crate::encryption::ParameterEncryptionInfo): the
532    /// CEK table, plus — for each parameter the server reports as encrypted —
533    /// which CEK and whether deterministic or randomized. Parameters the server
534    /// reports as plaintext are omitted.
535    ///
536    /// This is the first step of Always Encrypted parameter encryption; the
537    /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
538    #[cfg(feature = "always-encrypted")]
539    pub(crate) async fn describe_parameter_encryption(
540        &mut self,
541        tsql: &str,
542        params: &str,
543    ) -> Result<crate::encryption::ParameterEncryptionInfo> {
544        let tsql_arg = tsql.to_string();
545        let params_arg = params.to_string();
546        let mut result = self
547            .call_procedure(
548                "sp_describe_parameter_encryption",
549                &[&tsql_arg, &params_arg],
550            )
551            .await?;
552        crate::encryption::ParameterEncryptionInfo::from_describe_result_sets(
553            &mut result.result_sets,
554        )
555    }
556
557    /// Build the `sp_executesql` request for a parameterized statement.
558    ///
559    /// When the connection has Always Encrypted enabled, parameters the server
560    /// reports as encrypted are encrypted client-side first (an extra
561    /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
562    /// plain parameter conversion.
563    pub(crate) async fn build_parameterized_rpc(
564        &mut self,
565        sql: &str,
566        params: &[&(dyn crate::ToSql + Sync)],
567    ) -> Result<RpcRequest> {
568        #[cfg(feature = "always-encrypted")]
569        if self.encryption_context.is_some() {
570            return self.build_encrypted_sql_rpc(sql, params).await;
571        }
572        let rpc_params =
573            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
574        Ok(RpcRequest::execute_sql(sql, rpc_params))
575    }
576
577    /// Send a parameterized `query` request, consulting the prepared-statement
578    /// cache when [`Config::statement_cache`](crate::Config::statement_cache)
579    /// is enabled. Leaves the execution response ready for the caller's
580    /// `read_query_response`.
581    ///
582    /// Falls back to the default path (SQL batch for no params, `sp_executesql`
583    /// otherwise) when the cache is disabled, the query has no parameters, or
584    /// Always Encrypted is active (prepared + AE parameter encryption is out of
585    /// scope for this first increment).
586    async fn send_query_request(
587        &mut self,
588        sql: &str,
589        params: &[&(dyn crate::ToSql + Sync)],
590    ) -> Result<()> {
591        #[cfg(feature = "always-encrypted")]
592        let ae_active = self.encryption_context.is_some();
593        #[cfg(not(feature = "always-encrypted"))]
594        let ae_active = false;
595
596        if !self.config.statement_cache || params.is_empty() || ae_active {
597            if params.is_empty() {
598                self.send_sql_batch(sql).await?;
599            } else {
600                let rpc = self.build_parameterized_rpc(sql, params).await?;
601                self.send_rpc(&rpc).await?;
602            }
603            return Ok(());
604        }
605
606        // If a connection reset is pending, the next packet carries
607        // RESETCONNECTION (set in `send_rpc`), which invalidates every
608        // server-side prepared handle. Drop the cache BEFORE the lookup so this
609        // request re-prepares instead of `sp_execute`-ing a handle the reset is
610        // about to invalidate (otherwise the server rejects it with "Could not
611        // find prepared statement with handle N").
612        if self.needs_reset {
613            let _ = self.statement_cache.clear();
614        }
615
616        let rpc_params =
617            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
618        // Key on the parameter declaration + SQL: a cached handle is only valid
619        // for the exact prepared parameter types, so two calls with the same
620        // SQL but different param types must not share a handle.
621        let key = format!(
622            "{}\u{1}{sql}",
623            RpcRequest::build_param_declarations(&rpc_params)
624        );
625
626        if let Some(handle) = self.statement_cache.get(&key) {
627            // Hit: sp_execute the cached handle. Clear any stale pending key
628            // (e.g. from a prior request whose read aborted) so the read path
629            // does not try to capture a handle from this response.
630            self.statement_cache.set_pending(None);
631            let rpc = RpcRequest::execute(handle, rpc_params);
632            self.send_rpc(&rpc).await?;
633        } else {
634            // Miss: sp_prepexec prepares and executes in ONE round-trip. The
635            // caller's read_query_response reads the row response and captures
636            // the `@handle` RETURNVALUE, then stores it under `key` (see
637            // `store_pending_prepared_handle`).
638            self.statement_cache.set_pending(Some(key));
639            let rpc = RpcRequest::prepexec(sql, rpc_params);
640            self.send_rpc(&rpc).await?;
641        }
642        Ok(())
643    }
644
645    /// Store the handle captured from an `sp_prepexec` execution response under
646    /// the pending cache key (set by [`send_query_request`](Self::send_query_request)
647    /// on a cold miss), releasing any LRU-evicted server-side handle.
648    ///
649    /// Called by `read_query_response` after it has read the row response and
650    /// the trailing `@handle` RETURNVALUE. A no-op when no prepexec is pending.
651    /// Eviction `sp_unprepare` is best-effort: a failure leaks one handle until
652    /// connection reset, never corrupts data.
653    pub(super) async fn store_pending_prepared_handle(
654        &mut self,
655        handle: Option<i32>,
656    ) -> Result<()> {
657        let Some(key) = self.statement_cache.take_pending() else {
658            return Ok(());
659        };
660        let Some(handle) = handle else {
661            // No @handle came back (unexpected for sp_prepexec): leave the
662            // statement uncached so the next call simply re-prepares.
663            return Ok(());
664        };
665        if let Some(evicted) = self
666            .statement_cache
667            .insert(crate::statement_cache::PreparedStatement::new(handle, key))
668        {
669            let unprepare = RpcRequest::unprepare(evicted.handle());
670            self.send_rpc(&unprepare).await?;
671            let _ = self.read_procedure_result().await?;
672        }
673        Ok(())
674    }
675
    /// Encrypt the Always Encrypted parameters of a statement, then build its
    /// `sp_executesql` request.
    ///
    /// Asks the server which parameters are encrypted
    /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
    /// then for each one normalizes the value, resolves its column encryption
    /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
    /// server reports as plaintext are sent unchanged.
    ///
    /// Positional parameters are named `@p1`, `@p2`, … in slice order.
    ///
    /// The describe round-trip is skipped, and a plaintext request returned,
    /// when the client has no encryption context or when `params` is empty. A
    /// plaintext request is also returned when the describe result lists no
    /// parameters.
    ///
    /// # Errors
    ///
    /// Propagates failures from parameter conversion (`to_sql`,
    /// `sql_value_to_rpc_param`, `encrypted_param_type_info`), from the describe
    /// call, and from normalization or encryption. Returns [`Error::Protocol`]
    /// when the describe result references a CEK ordinal that is absent from its
    /// CEK table.
    #[cfg(feature = "always-encrypted")]
    async fn build_encrypted_sql_rpc(
        &mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<RpcRequest> {
        use tds_protocol::rpc::RpcParam;

        // Without an encryption context there is nothing to encrypt: convert the
        // parameters the ordinary way and return.
        let Some(ctx) = self.encryption_context.clone() else {
            let rpc_params =
                Self::convert_params(params, self.send_unicode(), self.server_collation())?;
            return Ok(RpcRequest::execute_sql(sql, rpc_params));
        };

        // Resolve each parameter's value once (AE normalization needs the typed
        // value, not the wire encoding) and build the plaintext RPC params.
        let send_unicode = self.send_unicode();
        let collation = self.server_collation().cloned();
        // Three parallel vectors, one entry per input parameter; they are zipped
        // back together in the encryption pass below.
        let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
        let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
        let mut hints: Vec<Option<mssql_types::EncryptedParamType>> =
            Vec::with_capacity(params.len());
        for (i, p) in params.iter().enumerate() {
            // Parameter names are 1-based: @p1, @p2, ...
            let name = format!("@p{}", i + 1);
            let value = p.to_sql()?;
            let hint = p.encrypted_param_type();
            // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
            // describe accepts it against the target encrypted column; an untyped
            // NULL falls back to the default in `sql_value_to_rpc_param`.
            let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
                (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
                _ => {
                    let mut param = Self::sql_value_to_rpc_param(
                        &name,
                        &value,
                        send_unicode,
                        collation.as_ref(),
                    )?;
                    // A typed-parameter wrapper (e.g. `numeric(v, p, s)`,
                    // `datetime2(v, scale)`) declares an explicit SQL type so
                    // describe matches the encrypted column exactly — the value
                    // alone cannot convey precision/scale or the legacy-`datetime`
                    // vs `datetime2` distinction.
                    if let Some(ty) = hint {
                        param.type_info = encrypted_param_type_info(ty)?;
                    }
                    param
                }
            };
            plaintext.push(rpc_param);
            values.push(value);
            hints.push(hint);
        }

        // No parameters at all: skip the describe round-trip.
        if plaintext.is_empty() {
            return Ok(RpcRequest::execute_sql(sql, plaintext));
        }

        // Ask the server which parameters need encryption.
        let declarations = RpcRequest::build_param_declarations(&plaintext);
        let info = self
            .describe_parameter_encryption(sql, &declarations)
            .await?;
        // The describe result lists no parameters: send everything as plaintext.
        if info.parameters.is_empty() {
            return Ok(RpcRequest::execute_sql(sql, plaintext));
        }

        // Encrypt the flagged parameters; pass the rest through untouched.
        let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
        for ((value, param), hint) in values.into_iter().zip(plaintext).zip(hints) {
            // A parameter with no entry in the describe result is sent as-is.
            let Some(crypto) = info.get_parameter(&param.name) else {
                final_params.push(param);
                continue;
            };
            // Look up the column encryption key entry this parameter refers to.
            let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
                Error::Protocol(format!(
                    "encrypted parameter {} references missing CEK ordinal {}",
                    param.name, crypto.cek_ordinal
                ))
            })?;
            // The plaintext parameter's type becomes the base type of the
            // encrypted parameter; the remaining fields are copied from the
            // describe result and the CEK entry.
            let metadata = tds_protocol::rpc::EncryptedParamMetadata {
                base_type_info: param.type_info.clone(),
                algorithm_id: crypto.algorithm_id,
                encryption_type: crypto.encryption_type,
                database_id: entry.database_id,
                cek_id: entry.cek_id,
                cek_version: entry.cek_version,
                cek_md_version: entry.cek_md_version,
                normalization_rule_version: crypto.normalization_rule_version,
            };
            // A NULL value bound to an encrypted column is sent as an encrypted
            // NULL (the server rejects a plaintext parameter for an encrypted
            // column); there is nothing to encrypt.
            if matches!(value, mssql_types::SqlValue::Null) {
                final_params.push(RpcParam::encrypted_null(param.name, metadata));
                continue;
            }
            // Normalize first (using the caller's type hint, if any), then
            // encrypt the normalized value with the resolved CEK entry.
            let normalized = crate::encryption::normalize_for_encryption(&value, hint)?;
            let ciphertext = ctx
                .encrypt_value(&normalized, entry, crypto.encryption_type)
                .await?;
            final_params.push(RpcParam::encrypted(
                param.name,
                bytes::Bytes::from(ciphertext),
                metadata,
            ));
        }

        Ok(RpcRequest::execute_sql(sql, final_params))
    }
794
795    /// Start a bulk insert operation for the specified table.
796    ///
797    /// Sends the `INSERT BULK` statement to the server and returns a
798    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
799    /// a mutable borrow on the client, preventing other operations while
800    /// the bulk insert is in progress.
801    ///
802    /// # Example
803    ///
804    /// ```rust,no_run
805    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
806    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
807    ///
808    /// let builder = BulkInsertBuilder::new("dbo.Users")
809    ///     .with_typed_columns(vec![
810    ///         BulkColumn::new("id", "INT", 0)?,
811    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
812    ///     ]);
813    ///
814    /// let mut writer = client.bulk_insert(&builder).await?;
815    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
816    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
817    /// let result = writer.finish().await?;
818    /// println!("Inserted {} rows", result.rows_affected);
819    /// # Ok(())
820    /// # }
821    /// ```
822    pub async fn bulk_insert(
823        &mut self,
824        builder: &crate::bulk::BulkInsertBuilder,
825    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
826        use tds_protocol::token::{ColMetaData, Token};
827
828        tracing::debug!(
829            table = builder.table_name(),
830            columns = builder.columns().len(),
831            "starting bulk insert"
832        );
833
834        // Step 1: Query the server for column metadata.
835        // This gives us the exact type encoding the server expects for BulkLoad,
836        // following the pattern established by Tiberius.
837        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
838        let deadline = self.command_deadline();
839        let canceller = self.connection_cancel_handle();
840        let message = run_with_deadline(
841            async {
842                self.send_sql_batch(&meta_query).await?;
843                self.read_response_message().await
844            },
845            deadline,
846            canceller,
847        )
848        .await?;
849        self.in_flight = false;
850
851        // Capture both the raw COLMETADATA bytes and parsed column info
852        let raw_payload = message.payload.clone();
853        let mut parser = self.create_parser(message.payload);
854        let mut server_metadata: Option<ColMetaData> = None;
855        let mut meta_start: usize = 0;
856        let mut meta_end: usize = 0;
857
858        loop {
859            let pos_before = raw_payload.len() - parser.remaining();
860            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
861            let pos_after = raw_payload.len() - parser.remaining();
862            let Some(token) = token else { break };
863
864            match token {
865                Token::ColMetaData(meta) => {
866                    meta_start = pos_before;
867                    meta_end = pos_after;
868                    server_metadata = Some(meta);
869                }
870                Token::Done(_) => break,
871                _ => {}
872            }
873        }
874
875        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
876        // These types require a legacy TEXTPTR wire format that this driver
877        // does not support — users should migrate the column to VARCHAR(MAX) /
878        // NVARCHAR(MAX) / VARBINARY(MAX).
879        if let Some(ref meta) = server_metadata {
880            use tds_protocol::types::TypeId;
881            for col in meta.columns.iter() {
882                let (rejected, replacement) = match col.type_id {
883                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
884                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
885                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
886                    _ => (None, ""),
887                };
888                if let Some(sql_type) = rejected {
889                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
890                        sql_type: sql_type.to_string(),
891                        reason: format!(
892                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
893                             are not supported. Alter the column to {} instead \
894                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
895                             Server 2005).",
896                            col.name,
897                            builder.table_name(),
898                            sql_type,
899                            replacement,
900                        ),
901                    }));
902                }
903            }
904        }
905
906        // Step 2: Send INSERT BULK statement to put server in bulk load mode
907        let stmt = builder.build_insert_bulk_statement()?;
908        let deadline = self.command_deadline();
909        let canceller = self.connection_cancel_handle();
910        run_with_deadline(
911            async {
912                self.send_sql_batch(&stmt).await?;
913                self.read_execute_result().await
914            },
915            deadline,
916            canceller,
917        )
918        .await?;
919
920        // Step 3: Create bulk writer with server's metadata
921        let raw_meta = if meta_end > meta_start {
922            Some(raw_payload.slice(meta_start..meta_end))
923        } else {
924            None
925        };
926
927        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
928        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
929            builder.columns().to_vec(),
930            builder.options().batch_size,
931            raw_meta,
932            server_cols,
933        );
934
935        Ok(crate::bulk::BulkWriter::new(self, bulk))
936    }
937
938    /// Start a bulk insert without querying the server for column metadata.
939    ///
940    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
941    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
942    /// column metadata is constructed from the `BulkColumn` types provided
943    /// on the builder. This saves a round-trip when the schema is known.
944    ///
945    /// # Caveats
946    ///
947    /// The caller must ensure `BulkColumn` entries match the target table's
948    /// column definitions exactly. Mismatched types, lengths, precision/scale,
949    /// or column ordering will cause the server to reject the BulkLoad packet.
950    ///
951    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
952    /// extra round-trip is usually negligible and the server-supplied metadata
953    /// is guaranteed correct.
954    pub async fn bulk_insert_without_schema_discovery(
955        &mut self,
956        builder: &crate::bulk::BulkInsertBuilder,
957    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
958        tracing::debug!(
959            table = builder.table_name(),
960            columns = builder.columns().len(),
961            "starting bulk insert (no schema discovery)"
962        );
963
964        // Send INSERT BULK statement to put server in bulk load mode
965        let stmt = builder.build_insert_bulk_statement()?;
966        let deadline = self.command_deadline();
967        let canceller = self.connection_cancel_handle();
968        run_with_deadline(
969            async {
970                self.send_sql_batch(&stmt).await?;
971                self.read_execute_result().await
972            },
973            deadline,
974            canceller,
975        )
976        .await?;
977
978        // Create bulk writer with hand-crafted metadata
979        let bulk =
980            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
981
982        Ok(crate::bulk::BulkWriter::new(self, bulk))
983    }
984
985    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
986    ///
987    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
988    /// row data after the `INSERT BULK` statement has been acknowledged.
989    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
990        let max_packet = self.config.packet_size as usize;
991
992        self.in_flight = true;
993        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
994
995        match connection {
996            #[cfg(feature = "tls")]
997            ConnectionHandle::Tls(conn) => {
998                conn.send_message(PacketType::BulkLoad, payload, max_packet)
999                    .await?;
1000            }
1001            #[cfg(feature = "tls")]
1002            ConnectionHandle::TlsPrelogin(conn) => {
1003                conn.send_message(PacketType::BulkLoad, payload, max_packet)
1004                    .await?;
1005            }
1006            ConnectionHandle::Plain(conn) => {
1007                conn.send_message(PacketType::BulkLoad, payload, max_packet)
1008                    .await?;
1009            }
1010        }
1011
1012        // Read the server's Done response with row count
1013        self.read_execute_result().await
1014    }
1015
1016    /// Execute a query with named parameters and return a streaming result set.
1017    ///
1018    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
1019    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
1020    /// and the `#[derive(ToParams)]` macro.
1021    ///
1022    /// # Example
1023    ///
1024    /// ```rust,no_run
1025    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1026    /// use mssql_client::{NamedParam, ToParams};
1027    ///
1028    /// // With derive macro:
1029    /// #[derive(mssql_derive::ToParams)]
1030    /// struct UserQuery { name: String }
1031    ///
1032    /// let q = UserQuery { name: "Alice".into() };
1033    /// let rows = client.query_named(
1034    ///     "SELECT * FROM users WHERE name = @name",
1035    ///     &q.to_params()?,
1036    /// ).await?;
1037    ///
1038    /// // Or manually:
1039    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
1040    /// let rows = client.query_named(
1041    ///     "SELECT * FROM users WHERE name = @name",
1042    ///     &params,
1043    /// ).await?;
1044    /// # let _ = rows;
1045    /// # Ok(())
1046    /// # }
1047    /// ```
1048    pub async fn query_named<'a>(
1049        &'a mut self,
1050        sql: &str,
1051        params: &[crate::to_params::NamedParam],
1052    ) -> Result<QueryStream<'a>> {
1053        tracing::debug!(
1054            sql = sql,
1055            params_count = params.len(),
1056            "executing query with named parameters"
1057        );
1058
1059        #[cfg(feature = "otel")]
1060        let instrumentation = self.instrumentation.clone();
1061        #[cfg(feature = "otel")]
1062        let mut span = instrumentation.query_span(sql);
1063        #[cfg(feature = "otel")]
1064        let timer = crate::instrumentation::OperationTimer::start(
1065            crate::instrumentation::extract_operation(sql),
1066        );
1067
1068        let result = async {
1069            if params.is_empty() {
1070                self.send_sql_batch(sql).await?;
1071            } else {
1072                let rpc_params = Self::convert_named_params(
1073                    params,
1074                    self.send_unicode(),
1075                    self.server_collation(),
1076                )?;
1077                let rpc = RpcRequest::execute_sql(sql, rpc_params);
1078                self.send_rpc(&rpc).await?;
1079            }
1080
1081            self.read_query_response().await
1082        }
1083        .await;
1084
1085        #[cfg(feature = "otel")]
1086        match &result {
1087            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1088            Err(e) => InstrumentationContext::record_error(&mut span, e),
1089        }
1090        #[cfg(feature = "otel")]
1091        timer.finish(instrumentation.metrics(), result.is_ok());
1092        #[cfg(feature = "otel")]
1093        drop(span);
1094
1095        let resp = result?;
1096        #[cfg(feature = "always-encrypted")]
1097        {
1098            Ok(QueryStream::from_raw(
1099                resp.columns,
1100                resp.pending_rows,
1101                resp.meta,
1102                resp.decryptor,
1103            ))
1104        }
1105        #[cfg(not(feature = "always-encrypted"))]
1106        {
1107            Ok(QueryStream::from_raw(
1108                resp.columns,
1109                resp.pending_rows,
1110                resp.meta,
1111            ))
1112        }
1113    }
1114
1115    /// Execute a statement with named parameters.
1116    ///
1117    /// Returns the number of affected rows. This is the named-parameter
1118    /// counterpart of [`execute()`](Client::execute), compatible with the
1119    /// [`ToParams`](crate::to_params::ToParams) trait.
1120    ///
1121    /// # Example
1122    ///
1123    /// ```rust,no_run
1124    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1125    /// use mssql_client::NamedParam;
1126    ///
1127    /// let params = vec![
1128    ///     NamedParam::from_value("name", &"Alice")?,
1129    ///     NamedParam::from_value("email", &"alice@example.com")?,
1130    /// ];
1131    /// let rows_affected = client.execute_named(
1132    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
1133    ///     &params,
1134    /// ).await?;
1135    /// # let _ = rows_affected;
1136    /// # Ok(())
1137    /// # }
1138    /// ```
1139    pub async fn execute_named(
1140        &mut self,
1141        sql: &str,
1142        params: &[crate::to_params::NamedParam],
1143    ) -> Result<u64> {
1144        tracing::debug!(
1145            sql = sql,
1146            params_count = params.len(),
1147            "executing statement with named parameters"
1148        );
1149
1150        #[cfg(feature = "otel")]
1151        let instrumentation = self.instrumentation.clone();
1152        #[cfg(feature = "otel")]
1153        let mut span = instrumentation.query_span(sql);
1154        #[cfg(feature = "otel")]
1155        let timer = crate::instrumentation::OperationTimer::start(
1156            crate::instrumentation::extract_operation(sql),
1157        );
1158
1159        let deadline = self.command_deadline();
1160        let canceller = self.connection_cancel_handle();
1161        let result = run_with_deadline(
1162            async {
1163                if params.is_empty() {
1164                    self.send_sql_batch(sql).await?;
1165                } else {
1166                    let rpc_params = Self::convert_named_params(
1167                        params,
1168                        self.send_unicode(),
1169                        self.server_collation(),
1170                    )?;
1171                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1172                    self.send_rpc(&rpc).await?;
1173                }
1174
1175                self.read_execute_result().await
1176            },
1177            deadline,
1178            canceller,
1179        )
1180        .await;
1181
1182        #[cfg(feature = "otel")]
1183        match &result {
1184            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1185            Err(e) => InstrumentationContext::record_error(&mut span, e),
1186        }
1187        #[cfg(feature = "otel")]
1188        timer.finish(instrumentation.metrics(), result.is_ok());
1189        #[cfg(feature = "otel")]
1190        drop(span);
1191
1192        result
1193    }
1194
    /// The connection's OpenTelemetry instrumentation context.
    ///
    /// Returns a shared borrow of the client's `instrumentation` field. Compiled
    /// only when the `otel` feature is enabled.
    #[cfg(feature = "otel")]
    pub(crate) fn instrumentation(&self) -> &InstrumentationContext {
        &self.instrumentation
    }
1200
    /// Snapshot this connection's prepared-statement cache statistics.
    ///
    /// Reflects activity since the connection was established (or its last
    /// reset). Meaningful only when
    /// [`Config::statement_cache`](crate::Config::statement_cache) is enabled;
    /// otherwise the cache is never consulted and all counts stay zero.
    ///
    /// The value is whatever the client's statement cache returns from its
    /// `stats()` method; the result is marked `#[must_use]`.
    #[must_use]
    pub fn statement_cache_stats(&self) -> crate::StatementCacheStats {
        self.statement_cache.stats()
    }
1211
    /// Whether string parameters are sent as NVARCHAR (Unicode).
    ///
    /// Reads the `send_string_parameters_as_unicode` setting from the client's
    /// configuration.
    pub(crate) fn send_unicode(&self) -> bool {
        self.config.send_string_parameters_as_unicode
    }
1216
    /// Server's default collation, captured from ENVCHANGE during login.
    ///
    /// Returns `None` when the client's `server_collation` field is unset;
    /// otherwise a borrow of the stored collation.
    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
        self.server_collation.as_ref()
    }
1221
1222    /// Shared implementation behind `query_stream` for both `Ready` and
1223    /// `InTransaction`. Sends the request, then pulls packets until the first
1224    /// result set's `ColMetaData` (resolving columns and any Always Encrypted
1225    /// decryptor up front) before handing back a [`RowStream`].
1226    pub(crate) async fn query_stream_inner<'a>(
1227        &'a mut self,
1228        sql: &str,
1229        params: &[&(dyn crate::ToSql + Sync)],
1230    ) -> Result<crate::row_stream::RowStream<'a, S>> {
1231        use crate::client::response::server_token_to_error;
1232        use crate::row_source::{Pull, RowSource};
1233        use tds_protocol::token::Token;
1234
1235        tracing::debug!(sql = sql, params_count = params.len(), "streaming query");
1236
1237        // Send the request (same wire format as the buffered path).
1238        if params.is_empty() {
1239            self.send_sql_batch(sql).await?;
1240        } else {
1241            let rpc = self.build_parameterized_rpc(sql, params).await?;
1242            self.send_rpc(&rpc).await?;
1243        }
1244        self.in_flight = true;
1245
1246        #[cfg(feature = "always-encrypted")]
1247        let encryption_enabled = self.encryption_context.is_some();
1248        #[cfg(not(feature = "always-encrypted"))]
1249        let encryption_enabled = false;
1250
1251        let mut source = RowSource::new(encryption_enabled);
1252
1253        // Prelude: pull packets until the first result set's ColMetaData (so the
1254        // columns and any Always Encrypted decryptor are resolved up front), or
1255        // until a terminal Done/Error if there is no result set.
1256        loop {
1257            match source.pull()? {
1258                Pull::Token(Token::ColMetaData(meta)) => {
1259                    let columns = Self::build_columns(&meta);
1260                    #[cfg(feature = "always-encrypted")]
1261                    let decryptor = self
1262                        .resolve_decryptor(&meta)
1263                        .await?
1264                        .map(std::sync::Arc::new);
1265                    return Ok(crate::row_stream::RowStream::new(
1266                        self,
1267                        source,
1268                        columns,
1269                        meta,
1270                        #[cfg(feature = "always-encrypted")]
1271                        decryptor,
1272                    ));
1273                }
1274                Pull::Token(Token::Error(err)) => {
1275                    self.in_flight = false;
1276                    return Err(server_token_to_error(&err));
1277                }
1278                Pull::Token(Token::Done(done)) => {
1279                    if done.status.error {
1280                        self.in_flight = false;
1281                        return Err(Error::Query(
1282                            "query failed (server set error flag in DONE token)".to_string(),
1283                        ));
1284                    }
1285                    if !done.status.more {
1286                        // No result set (e.g. an INSERT) — an empty stream.
1287                        self.in_flight = false;
1288                        return Ok(crate::row_stream::RowStream::empty(self));
1289                    }
1290                    // More results may follow; keep looking for ColMetaData.
1291                }
1292                Pull::Token(Token::EnvChange(env)) => {
1293                    Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
1294                }
1295                Pull::Token(_) => {
1296                    // Info / Order / DoneProc / DoneInProc, etc. — keep pulling.
1297                }
1298                Pull::NeedMore => match self.read_response_packet().await? {
1299                    Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1300                    None => {
1301                        self.in_flight = false;
1302                        return Err(Error::ConnectionClosed);
1303                    }
1304                },
1305                Pull::End => {
1306                    self.in_flight = false;
1307                    return Ok(crate::row_stream::RowStream::empty(self));
1308                }
1309            }
1310        }
1311    }
1312
1313    /// Shared implementation behind `query_stream_blob` for both `Ready` and
1314    /// `InTransaction`.
1315    pub(crate) async fn query_stream_blob_inner<'a>(
1316        &'a mut self,
1317        sql: &str,
1318        params: &[&(dyn crate::ToSql + Sync)],
1319    ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1320        let (meta, buf, eom, encryption_enabled) = self.open_blob_stream(sql, params).await?;
1321        let first_blob = Self::validate_blob_result_set(&meta)?;
1322        Ok(crate::blob_stream::BlobStream::new(
1323            self,
1324            buf,
1325            eom,
1326            encryption_enabled,
1327            meta,
1328            first_blob,
1329            // Single trailing MAX column; auto-position it so the existing
1330            // `next` → `copy_blob_to` flow works without an explicit `next_blob`.
1331            1,
1332            true,
1333        ))
1334    }
1335
1336    /// Shared implementation behind `query_stream_rows` for both `Ready` and
1337    /// `InTransaction`.
1338    pub(crate) async fn query_stream_rows_inner<'a>(
1339        &'a mut self,
1340        sql: &str,
1341        params: &[&(dyn crate::ToSql + Sync)],
1342    ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1343        let (meta, buf, eom, encryption_enabled) = self.open_blob_stream(sql, params).await?;
1344        let (first_blob, blob_count) = Self::validate_blob_rows_result_set(&meta)?;
1345        Ok(crate::blob_stream::BlobStream::new(
1346            self,
1347            buf,
1348            eom,
1349            encryption_enabled,
1350            meta,
1351            first_blob,
1352            blob_count,
1353            // Caller drives blobs explicitly via `next_blob`.
1354            false,
1355        ))
1356    }
1357
1358    /// Send the query and pull tokens until the first `ColMetaData`, returning
1359    /// the result-set metadata plus the unconsumed post-metadata wire bytes.
1360    /// Shared by the single-blob and multi-blob streaming paths.
1361    async fn open_blob_stream(
1362        &mut self,
1363        sql: &str,
1364        params: &[&(dyn crate::ToSql + Sync)],
1365    ) -> Result<(tds_protocol::token::ColMetaData, bytes::Bytes, bool, bool)> {
1366        use crate::client::response::server_token_to_error;
1367        use crate::row_source::{Pull, RowSource};
1368        use tds_protocol::token::Token;
1369
1370        if params.is_empty() {
1371            self.send_sql_batch(sql).await?;
1372        } else {
1373            let rpc = self.build_parameterized_rpc(sql, params).await?;
1374            self.send_rpc(&rpc).await?;
1375        }
1376        self.in_flight = true;
1377
1378        #[cfg(feature = "always-encrypted")]
1379        let encryption_enabled = self.encryption_context.is_some();
1380        #[cfg(not(feature = "always-encrypted"))]
1381        let encryption_enabled = false;
1382
1383        let mut source = RowSource::new(encryption_enabled);
1384
1385        loop {
1386            match source.pull()? {
1387                Pull::Token(Token::ColMetaData(meta)) => {
1388                    let (buf, eom) = source.into_parts();
1389                    return Ok((meta, buf, eom, encryption_enabled));
1390                }
1391                Pull::Token(Token::Error(err)) => {
1392                    self.in_flight = false;
1393                    return Err(server_token_to_error(&err));
1394                }
1395                Pull::Token(Token::Done(_)) => {
1396                    self.in_flight = false;
1397                    return Err(Error::Protocol(
1398                        "blob streaming: query produced no result set".to_string(),
1399                    ));
1400                }
1401                Pull::Token(_) => {}
1402                Pull::NeedMore => match self.read_response_packet().await? {
1403                    Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1404                    None => {
1405                        self.in_flight = false;
1406                        return Err(Error::ConnectionClosed);
1407                    }
1408                },
1409                Pull::End => {
1410                    self.in_flight = false;
1411                    return Err(Error::Protocol(
1412                        "blob streaming: query produced no result set".to_string(),
1413                    ));
1414                }
1415            }
1416        }
1417    }
1418
1419    /// Validate that a result set is shaped for [`query_stream_blob`] and return
1420    /// the index of its single trailing MAX column.
1421    fn validate_blob_result_set(meta: &tds_protocol::token::ColMetaData) -> Result<usize> {
1422        if meta.cek_table.is_some() {
1423            return Err(Error::Protocol(
1424                "query_stream_blob does not support Always Encrypted result sets".to_string(),
1425            ));
1426        }
1427        let max_cols: Vec<usize> = meta
1428            .columns
1429            .iter()
1430            .enumerate()
1431            .filter(|(_, c)| crate::blob_stream::is_plp_max(c))
1432            .map(|(i, _)| i)
1433            .collect();
1434        match max_cols.as_slice() {
1435            [] => Err(Error::Protocol(
1436                "query_stream_blob: result set has no MAX column — use query_stream".to_string(),
1437            )),
1438            [idx] if *idx == meta.columns.len() - 1 => Ok(*idx),
1439            [_] => Err(Error::Protocol(
1440                "query_stream_blob: the MAX column must be the last column".to_string(),
1441            )),
1442            _ => Err(Error::Protocol(
1443                "query_stream_blob: result set has more than one MAX column".to_string(),
1444            )),
1445        }
1446    }
1447
1448    /// Validate that a result set is shaped for [`query_stream_rows`] and return
1449    /// `(first_blob_index, blob_count)` — the start and length of the trailing
1450    /// run of MAX columns.
1451    ///
1452    /// Requires at least one MAX column and that every MAX column be trailing
1453    /// (no scalar column may follow a MAX column). The interleaved case (a
1454    /// scalar column after a MAX column) is rejected — supporting it needs a
1455    /// resumable per-column decoder (tracked in #258).
1456    fn validate_blob_rows_result_set(
1457        meta: &tds_protocol::token::ColMetaData,
1458    ) -> Result<(usize, usize)> {
1459        if meta.cek_table.is_some() {
1460            return Err(Error::Protocol(
1461                "query_stream_rows does not support Always Encrypted result sets".to_string(),
1462            ));
1463        }
1464        let first_blob = meta
1465            .columns
1466            .iter()
1467            .position(crate::blob_stream::is_plp_max)
1468            .ok_or_else(|| {
1469                Error::Protocol(
1470                    "query_stream_rows: result set has no MAX column — use query_stream"
1471                        .to_string(),
1472                )
1473            })?;
1474        // Every column from the first MAX column onward must itself be a MAX
1475        // column; a scalar column after a blob cannot be decoded until the blob
1476        // is consumed.
1477        if !meta.columns[first_blob..]
1478            .iter()
1479            .all(crate::blob_stream::is_plp_max)
1480        {
1481            return Err(Error::Protocol(
1482                "query_stream_rows: a non-MAX column follows a MAX column; interleaved MAX \
1483                 columns are not supported (the MAX columns must be trailing)"
1484                    .to_string(),
1485            ));
1486        }
1487        Ok((first_blob, meta.columns.len() - first_blob))
1488    }
1489}
1490
1491impl Client<Ready> {
    /// Mark this connection as needing a reset on next use.
    ///
    /// Called by the connection pool when a connection is returned.
    /// The next SQL batch or RPC will include the RESETCONNECTION flag
    /// in the TDS packet header, causing SQL Server to reset connection
    /// state (temp tables, SET options, transaction isolation level, etc.)
    /// before executing the command.
    ///
    /// This is more efficient than calling `sp_reset_connection` as a
    /// separate command because it's handled at the TDS protocol level.
    ///
    /// This method only sets the flag; it performs no I/O itself.
    pub fn mark_needs_reset(&mut self) {
        self.needs_reset = true;
    }
1505
    /// Check if this connection needs a reset.
    ///
    /// Returns the current value of the flag set by
    /// [`mark_needs_reset`](Self::mark_needs_reset): `true` if it was called
    /// and the reset hasn't been performed yet.
    #[must_use]
    pub fn needs_reset(&self) -> bool {
        self.needs_reset
    }
1514
1515    /// Execute a query and return a result set with lazy per-row decoding.
1516    ///
1517    /// Per ADR-007 the full response is buffered in memory and each row is
1518    /// *decoded* on demand as you iterate — this is not incremental network
1519    /// streaming, so peak memory tracks the response size. Use
1520    /// `.collect_all()` if you want all rows materialized into a `Vec` up
1521    /// front.
1522    ///
1523    /// # Example
1524    ///
1525    /// ```rust,no_run
1526    /// # use mssql_client::Row;
1527    /// # fn process(_: &Row) {}
1528    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1529    /// // Streaming (synchronous iteration over the result set)
1530    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
1531    /// for row in stream {
1532    ///     let row = row?;
1533    ///     process(&row);
1534    /// }
1535    ///
1536    /// // Buffered (loads all into memory)
1537    /// let rows: Vec<Row> = client
1538    ///     .query("SELECT * FROM small_table", &[])
1539    ///     .await?
1540    ///     .collect_all()
1541    ///     .await?;
1542    /// # let _ = rows;
1543    /// # Ok(())
1544    /// # }
1545    /// ```
1546    pub async fn query<'a>(
1547        &'a mut self,
1548        sql: &str,
1549        params: &[&(dyn crate::ToSql + Sync)],
1550    ) -> Result<QueryStream<'a>> {
1551        let deadline = self.command_deadline();
1552        self.query_inner(sql, params, deadline).await
1553    }
1554
    /// Shared query implementation with an explicit command deadline.
    ///
    /// Sends the request, reads the response under `deadline` via
    /// `run_with_deadline`, and wraps the outcome in a [`QueryStream`]. Backs
    /// both [`query`](Self::query) and
    /// [`query_with_timeout`](Self::query_with_timeout), which differ only in
    /// the deadline they pass in.
    ///
    /// With the `otel` feature enabled the call is additionally recorded in a
    /// query span and an operation timer.
    async fn query_inner<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
        deadline: Option<std::time::Duration>,
    ) -> Result<QueryStream<'a>> {
        tracing::debug!(sql = sql, params_count = params.len(), "executing query");

        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.query_span(sql);
        #[cfg(feature = "otel")]
        let timer = crate::instrumentation::OperationTimer::start(
            crate::instrumentation::extract_operation(sql),
        );

        // Obtain the cancel handle before the async block below borrows
        // `self` mutably; it is handed to `run_with_deadline` together with
        // the deadline.
        let canceller = self.cancel_handle();
        let result = run_with_deadline(
            async {
                // Sends via the prepared-statement cache when enabled, else the
                // SQL batch / sp_executesql default.
                self.send_query_request(sql, params).await?;

                // Read complete response including columns and rows
                self.read_query_response().await
            },
            deadline,
            canceller,
        )
        .await;

        // Record the outcome on the span and timer before the error (if any)
        // is propagated by `result?` below.
        #[cfg(feature = "otel")]
        match &result {
            Ok(_) => InstrumentationContext::record_success(&mut span, None),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }
        #[cfg(feature = "otel")]
        timer.finish(instrumentation.metrics(), result.is_ok());

        // Drop the span before returning
        #[cfg(feature = "otel")]
        drop(span);

        let resp = result?;
        // `QueryStream::from_raw` takes an extra decryptor argument when the
        // `always-encrypted` feature is enabled; exactly one of the two blocks
        // below is compiled.
        #[cfg(feature = "always-encrypted")]
        {
            Ok(QueryStream::from_raw(
                resp.columns,
                resp.pending_rows,
                resp.meta,
                resp.decryptor,
            ))
        }
        #[cfg(not(feature = "always-encrypted"))]
        {
            Ok(QueryStream::from_raw(
                resp.columns,
                resp.pending_rows,
                resp.meta,
            ))
        }
    }
1619
1620    /// Execute a query and stream rows incrementally from the network.
1621    ///
1622    /// Unlike [`query`](Self::query) — which buffers the whole response in
1623    /// memory before returning — this reads TDS packets on demand as rows are
1624    /// pulled, so peak memory is roughly one packet plus one row regardless of
1625    /// result-set size. Use it for large result sets; use [`query`](Self::query)
1626    /// for the common small-result case where the buffered, synchronously
1627    /// iterable [`QueryStream`] is more convenient.
1628    ///
1629    /// The returned [`RowStream`](crate::RowStream) borrows the client for its
1630    /// lifetime, so no other request can run on this connection until the stream
1631    /// is consumed or dropped. Also available on `Client<InTransaction>` to
1632    /// stream within a transaction.
1633    ///
1634    /// # Example
1635    ///
1636    /// ```rust,no_run
1637    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1638    /// let mut stream = client.query_stream("SELECT id FROM big_table", &[]).await?;
1639    /// while let Some(row) = stream.try_next().await? {
1640    ///     let id: i32 = row.get_by_name("id")?;
1641    ///     let _ = id;
1642    /// }
1643    /// # Ok(())
1644    /// # }
1645    /// ```
1646    pub async fn query_stream<'a>(
1647        &'a mut self,
1648        sql: &str,
1649        params: &[&(dyn crate::ToSql + Sync)],
1650    ) -> Result<crate::row_stream::RowStream<'a, Ready>> {
1651        self.query_stream_inner(sql, params).await
1652    }
1653
1654    /// Execute a query and stream a row's trailing MAX column from the network.
1655    ///
1656    /// For result sets whose last column is a single MAX type
1657    /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this reads
1658    /// that column's bytes incrementally from the socket instead of
1659    /// materializing the cell — so a multi-GB BLOB can be streamed to a sink in
1660    /// bounded memory. The leading (scalar) columns are decoded eagerly into the
1661    /// per-row [`Row`](crate::Row).
1662    ///
1663    /// The MAX column must be the **last** column. The returned
1664    /// [`BlobStream`](crate::BlobStream) yields scalar [`Row`](crate::Row)s via
1665    /// [`next`](crate::BlobStream::next); read each row's blob with
1666    /// [`read_chunk`](crate::BlobStream::read_chunk) /
1667    /// [`copy_blob_to`](crate::BlobStream::copy_blob_to) before advancing. Also
1668    /// available on `Client<InTransaction>`.
1669    ///
1670    /// # Errors
1671    ///
1672    /// Returns an error if the result set has no trailing MAX column, has more
1673    /// than one MAX column, the MAX column is not last, or the result set uses
1674    /// Always Encrypted (not yet supported on this path).
1675    pub async fn query_stream_blob<'a>(
1676        &'a mut self,
1677        sql: &str,
1678        params: &[&(dyn crate::ToSql + Sync)],
1679    ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1680        self.query_stream_blob_inner(sql, params).await
1681    }
1682
1683    /// Execute a query and stream a row's **trailing MAX columns** from the
1684    /// network — the multi-column generalization of
1685    /// [`query_stream_blob`](Self::query_stream_blob).
1686    ///
1687    /// For result sets whose trailing columns are one or more MAX types
1688    /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this decodes
1689    /// the leading scalar columns eagerly into the per-row [`Row`](crate::Row)
1690    /// and streams each trailing MAX column's bytes incrementally from the
1691    /// socket, in bounded memory. The returned
1692    /// [`BlobStream`](crate::BlobStream) yields scalar rows via
1693    /// [`next`](crate::BlobStream::next); within each row, iterate the trailing
1694    /// MAX columns with [`next_blob`](crate::BlobStream::next_blob), reading each
1695    /// with [`copy_blob_to`](crate::BlobStream::copy_blob_to) /
1696    /// [`read_chunk`](crate::BlobStream::read_chunk). Also available on
1697    /// `Client<InTransaction>`.
1698    ///
1699    /// # Errors
1700    ///
1701    /// Returns an error if the result set has no trailing MAX column, has a
1702    /// non-MAX column after a MAX column (interleaved MAX columns are not
1703    /// supported — the MAX columns must be trailing), or uses Always Encrypted
1704    /// (not yet supported on this path).
1705    pub async fn query_stream_rows<'a>(
1706        &'a mut self,
1707        sql: &str,
1708        params: &[&(dyn crate::ToSql + Sync)],
1709    ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1710        self.query_stream_rows_inner(sql, params).await
1711    }
1712
1713    /// Execute a query with a specific timeout.
1714    ///
1715    /// This overrides the default `command_timeout` from the connection configuration
1716    /// for this specific query. If the query does not complete within the specified
1717    /// duration, the driver sends an Attention packet to cancel it server-side,
1718    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1719    /// connection left usable for the next request.
1720    ///
1721    /// # Arguments
1722    ///
1723    /// * `sql` - The SQL query to execute
1724    /// * `params` - Query parameters
1725    /// * `timeout_duration` - Maximum time to wait for the query to complete
1726    ///
1727    /// # Example
1728    ///
1729    /// ```rust,no_run
1730    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1731    /// use std::time::Duration;
1732    ///
1733    /// // Execute with a 5-second timeout
1734    /// let rows = client
1735    ///     .query_with_timeout(
1736    ///         "SELECT * FROM large_table",
1737    ///         &[],
1738    ///         Duration::from_secs(5),
1739    ///     )
1740    ///     .await?;
1741    /// # let _ = rows;
1742    /// # Ok(())
1743    /// # }
1744    /// ```
1745    pub async fn query_with_timeout<'a>(
1746        &'a mut self,
1747        sql: &str,
1748        params: &[&(dyn crate::ToSql + Sync)],
1749        timeout_duration: std::time::Duration,
1750    ) -> Result<QueryStream<'a>> {
1751        self.query_inner(sql, params, Some(timeout_duration)).await
1752    }
1753
1754    /// Execute a batch that may return multiple result sets.
1755    ///
1756    /// This is useful for stored procedures or SQL batches that contain
1757    /// multiple SELECT statements.
1758    ///
1759    /// # Example
1760    ///
1761    /// ```rust,no_run
1762    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1763    /// // Execute a batch with multiple SELECT statements
1764    /// let mut results = client.query_multiple(
1765    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1766    ///     &[]
1767    /// ).await?;
1768    ///
1769    /// // Process first result set
1770    /// while let Some(row) = results.next_row().await? {
1771    ///     println!("Result 1: {:?}", row);
1772    /// }
1773    ///
1774    /// // Move to second result set
1775    /// if results.next_result().await? {
1776    ///     while let Some(row) = results.next_row().await? {
1777    ///         println!("Result 2: {:?}", row);
1778    ///     }
1779    /// }
1780    /// # Ok(())
1781    /// # }
1782    /// ```
1783    pub async fn query_multiple<'a>(
1784        &'a mut self,
1785        sql: &str,
1786        params: &[&(dyn crate::ToSql + Sync)],
1787    ) -> Result<MultiResultStream<'a>> {
1788        tracing::debug!(
1789            sql = sql,
1790            params_count = params.len(),
1791            "executing multi-result query"
1792        );
1793
1794        #[cfg(feature = "otel")]
1795        let instrumentation = self.instrumentation.clone();
1796        #[cfg(feature = "otel")]
1797        let mut span = instrumentation.query_span(sql);
1798        #[cfg(feature = "otel")]
1799        let timer = crate::instrumentation::OperationTimer::start(
1800            crate::instrumentation::extract_operation(sql),
1801        );
1802
1803        let deadline = self.command_deadline();
1804        let canceller = self.connection_cancel_handle();
1805        let result = run_with_deadline(
1806            async {
1807                if params.is_empty() {
1808                    // Simple batch without parameters - use SQL batch
1809                    self.send_sql_batch(sql).await?;
1810                } else {
1811                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1812                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1813                    self.send_rpc(&rpc).await?;
1814                }
1815
1816                // Read all result sets
1817                self.read_multi_result_response().await
1818            },
1819            deadline,
1820            canceller,
1821        )
1822        .await;
1823
1824        #[cfg(feature = "otel")]
1825        match &result {
1826            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1827            Err(e) => InstrumentationContext::record_error(&mut span, e),
1828        }
1829        #[cfg(feature = "otel")]
1830        timer.finish(instrumentation.metrics(), result.is_ok());
1831        #[cfg(feature = "otel")]
1832        drop(span);
1833
1834        let result_sets = result?;
1835        Ok(MultiResultStream::new(result_sets))
1836    }
1837
1838    /// Execute a query that doesn't return rows.
1839    ///
1840    /// Returns the number of affected rows.
1841    pub async fn execute(
1842        &mut self,
1843        sql: &str,
1844        params: &[&(dyn crate::ToSql + Sync)],
1845    ) -> Result<u64> {
1846        let deadline = self.command_deadline();
1847        self.execute_inner(sql, params, deadline).await
1848    }
1849
1850    /// Shared execute implementation with an explicit command deadline.
1851    async fn execute_inner(
1852        &mut self,
1853        sql: &str,
1854        params: &[&(dyn crate::ToSql + Sync)],
1855        deadline: Option<std::time::Duration>,
1856    ) -> Result<u64> {
1857        tracing::debug!(
1858            sql = sql,
1859            params_count = params.len(),
1860            "executing statement"
1861        );
1862
1863        #[cfg(feature = "otel")]
1864        let instrumentation = self.instrumentation.clone();
1865        #[cfg(feature = "otel")]
1866        let mut span = instrumentation.query_span(sql);
1867        #[cfg(feature = "otel")]
1868        let timer = crate::instrumentation::OperationTimer::start(
1869            crate::instrumentation::extract_operation(sql),
1870        );
1871
1872        let canceller = self.cancel_handle();
1873        let result = run_with_deadline(
1874            async {
1875                if params.is_empty() {
1876                    // Simple statement without parameters - use SQL batch
1877                    self.send_sql_batch(sql).await?;
1878                } else {
1879                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1880                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1881                    self.send_rpc(&rpc).await?;
1882                }
1883
1884                // Read response and get row count
1885                self.read_execute_result().await
1886            },
1887            deadline,
1888            canceller,
1889        )
1890        .await;
1891
1892        #[cfg(feature = "otel")]
1893        match &result {
1894            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1895            Err(e) => InstrumentationContext::record_error(&mut span, e),
1896        }
1897        #[cfg(feature = "otel")]
1898        timer.finish(instrumentation.metrics(), result.is_ok());
1899
1900        // Drop the span before returning
1901        #[cfg(feature = "otel")]
1902        drop(span);
1903
1904        result
1905    }
1906
1907    /// Execute a statement with a specific timeout.
1908    ///
1909    /// This overrides the default `command_timeout` from the connection configuration
1910    /// for this specific statement. If the statement does not complete within the
1911    /// specified duration, the driver sends an Attention packet to cancel it
1912    /// server-side, drains the acknowledgement, and returns
1913    /// [`Error::CommandTimeout`] with the connection left usable.
1914    ///
1915    /// # Arguments
1916    ///
1917    /// * `sql` - The SQL statement to execute
1918    /// * `params` - Statement parameters
1919    /// * `timeout_duration` - Maximum time to wait for the statement to complete
1920    ///
1921    /// # Example
1922    ///
1923    /// ```rust,no_run
1924    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1925    /// use std::time::Duration;
1926    ///
1927    /// // Execute with a 10-second timeout
1928    /// let rows_affected = client
1929    ///     .execute_with_timeout(
1930    ///         "UPDATE large_table SET status = @p1",
1931    ///         &[&"processed"],
1932    ///         Duration::from_secs(10),
1933    ///     )
1934    ///     .await?;
1935    /// # let _ = rows_affected;
1936    /// # Ok(())
1937    /// # }
1938    /// ```
1939    pub async fn execute_with_timeout(
1940        &mut self,
1941        sql: &str,
1942        params: &[&(dyn crate::ToSql + Sync)],
1943        timeout_duration: std::time::Duration,
1944    ) -> Result<u64> {
1945        self.execute_inner(sql, params, Some(timeout_duration))
1946            .await
1947    }
1948
    /// Begin a transaction.
    ///
    /// This transitions the client from `Ready` to `InTransaction` state.
    /// Per MS-TDS spec, the server returns a transaction descriptor in the
    /// BeginTransaction EnvChange token that must be included in subsequent
    /// ALL_HEADERS sections.
    ///
    /// # Errors
    ///
    /// Returns an error if sending `BEGIN TRANSACTION` or reading its response
    /// fails. Because this method takes `self` by value, the `Ready` client is
    /// consumed on failure as well and is not handed back to the caller.
    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
        tracing::debug!("beginning transaction");

        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.transaction_span("BEGIN");

        // Execute BEGIN TRANSACTION and extract the transaction descriptor
        // NOTE(review): unlike `execute`, this round trip is not wrapped in
        // `run_with_deadline` — confirm that is intentional.
        let result = async {
            self.send_sql_batch("BEGIN TRANSACTION").await?;
            self.read_transaction_begin_result().await
        }
        .await;

        #[cfg(feature = "otel")]
        match &result {
            Ok(_) => InstrumentationContext::record_success(&mut span, None),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }

        // Drop the span before moving instrumentation
        #[cfg(feature = "otel")]
        drop(span);

        let transaction_descriptor = result?;

        // Rebuild the client in the `InTransaction` type state: every field is
        // moved over unchanged except the transaction descriptor.
        Ok(Client {
            config: self.config,
            _state: PhantomData,
            connection: self.connection,
            server_version: self.server_version,
            current_database: self.current_database,
            server_collation: self.server_collation,
            statement_cache: self.statement_cache,
            transaction_descriptor, // Store the descriptor from server
            needs_reset: self.needs_reset,
            in_flight: self.in_flight,
            #[cfg(feature = "otel")]
            instrumentation: self.instrumentation,
            #[cfg(feature = "always-encrypted")]
            encryption_context: self.encryption_context,
        })
    }
1999
    /// Begin a transaction with a specific isolation level.
    ///
    /// This transitions the client from `Ready` to `InTransaction` state
    /// with the specified isolation level.
    ///
    /// Two batches are sent in sequence: first the statement produced by
    /// `isolation_level.as_sql()`, then `BEGIN TRANSACTION`.
    ///
    /// # Errors
    ///
    /// Returns an error if either batch fails. Because this method takes
    /// `self` by value, the `Ready` client is consumed on failure as well and
    /// is not handed back to the caller.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// use mssql_client::IsolationLevel;
    ///
    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
    /// // All operations in this transaction use SERIALIZABLE isolation
    /// tx.commit().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn begin_transaction_with_isolation(
        mut self,
        isolation_level: crate::transaction::IsolationLevel,
    ) -> Result<Client<InTransaction>> {
        tracing::debug!(
            isolation_level = %isolation_level.name(),
            "beginning transaction with isolation level"
        );

        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.transaction_span("BEGIN");

        // First set the isolation level
        let result = async {
            self.send_sql_batch(isolation_level.as_sql()).await?;
            // The row count of the isolation-level batch is not needed; only
            // failure matters here.
            self.read_execute_result().await?;

            // Then begin the transaction
            self.send_sql_batch("BEGIN TRANSACTION").await?;
            self.read_transaction_begin_result().await
        }
        .await;

        #[cfg(feature = "otel")]
        match &result {
            Ok(_) => InstrumentationContext::record_success(&mut span, None),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }

        // Drop the span before `self.instrumentation` is moved below.
        #[cfg(feature = "otel")]
        drop(span);

        let transaction_descriptor = result?;

        // Rebuild the client in the `InTransaction` type state: every field is
        // moved over unchanged except the transaction descriptor.
        Ok(Client {
            config: self.config,
            _state: PhantomData,
            connection: self.connection,
            server_version: self.server_version,
            current_database: self.current_database,
            server_collation: self.server_collation,
            statement_cache: self.statement_cache,
            transaction_descriptor,
            needs_reset: self.needs_reset,
            in_flight: self.in_flight,
            #[cfg(feature = "otel")]
            instrumentation: self.instrumentation,
            #[cfg(feature = "always-encrypted")]
            encryption_context: self.encryption_context,
        })
    }
2070
2071    /// Execute a simple query without parameters.
2072    ///
2073    /// This is useful for DDL statements and simple queries where you
2074    /// don't need to retrieve the affected row count.
2075    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
2076        tracing::debug!(sql = sql, "executing simple query");
2077
2078        // Send SQL batch
2079        self.send_sql_batch(sql).await?;
2080
2081        // Read and discard response
2082        let _ = self.read_execute_result().await?;
2083
2084        Ok(())
2085    }
2086
    /// Close the connection gracefully.
    ///
    /// Consumes the client. The body only emits a debug log and returns
    /// `Ok(())`; no explicit shutdown I/O is performed here, so the connection
    /// is released when the consumed client is dropped at the end of the call.
    pub async fn close(self) -> Result<()> {
        tracing::debug!("closing connection");
        Ok(())
    }
2092
    /// Get the database name from the connection configuration.
    ///
    /// Returns `None` when no database was configured.
    ///
    /// NOTE(review): this reads `config.database`, not the client's
    /// `current_database` field, so it presumably does not reflect a database
    /// change made after connecting — confirm against callers.
    #[must_use]
    pub fn database(&self) -> Option<&str> {
        self.config.database.as_deref()
    }
2098
    /// Get the server host, as stored in the connection configuration.
    #[must_use]
    pub fn host(&self) -> &str {
        &self.config.host
    }
2104
    /// Get the server port, as stored in the connection configuration.
    #[must_use]
    pub fn port(&self) -> u16 {
        self.config.port
    }
2110
    /// Check if the connection is currently in a transaction.
    ///
    /// This returns `true` if a transaction was started via raw SQL
    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
    /// Concretely, it reports whether the stored transaction descriptor is
    /// non-zero.
    ///
    /// Note: This only tracks transactions started via raw SQL. Transactions
    /// started via the type-state API (`begin_transaction()`) result in a
    /// `Client<InTransaction>` which is a different type.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// client.execute("BEGIN TRANSACTION", &[]).await?;
    /// assert!(client.is_in_transaction());
    ///
    /// client.execute("COMMIT", &[]).await?;
    /// assert!(!client.is_in_transaction());
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn is_in_transaction(&self) -> bool {
        self.transaction_descriptor != 0
    }
2136
    /// Check if a request is in-flight (sent but response not fully read).
    ///
    /// Used by the connection pool to detect dirty connections that were
    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
    /// A connection with an in-flight request has unread data in the TCP
    /// buffer and must be discarded rather than returned to the pool.
    ///
    /// Returns the current value of the client's `in_flight` flag.
    #[must_use]
    pub fn is_in_flight(&self) -> bool {
        self.in_flight
    }
2147
2148    /// Report whether an Always Encrypted key-store provider with the given
2149    /// name is currently reachable through this client's encryption context.
2150    ///
2151    /// Returns `false` when the `always-encrypted` feature isn't enabled, when
2152    /// the connection was opened without `column_encryption` configured, or
2153    /// when no matching provider was registered.
2154    #[cfg(feature = "always-encrypted")]
2155    #[must_use]
2156    pub fn has_encryption_provider(&self, name: &str) -> bool {
2157        self.encryption_context
2158            .as_ref()
2159            .is_some_and(|ctx| ctx.has_provider(name))
2160    }
2161
    /// Get a handle for cancelling the current query.
    ///
    /// The cancel handle can be cloned and sent to other tasks, enabling
    /// cancellation of long-running queries from a separate async context.
    ///
    /// This is a public wrapper that delegates to
    /// `connection_cancel_handle()`.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// use std::time::Duration;
    ///
    /// let cancel_handle = client.cancel_handle();
    ///
    /// // Spawn a task to cancel after 10 seconds
    /// let handle = tokio::spawn(async move {
    ///     tokio::time::sleep(Duration::from_secs(10)).await;
    ///     let _ = cancel_handle.cancel().await;
    /// });
    ///
    /// // This query will be cancelled if it runs longer than 10 seconds
    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
    /// # let _ = (handle, result);
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
        self.connection_cancel_handle()
    }
2191}
2192
2193/// # Drop Behavior
2194///
2195/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
2196/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
2197/// the transaction remains open on the server until the TCP connection closes
2198/// (at which point SQL Server automatically rolls back).
2199///
2200/// This is because `Drop` is synchronous and cannot perform the async I/O needed
2201/// to send a `ROLLBACK TRANSACTION` command.
2202///
2203/// ## Consequences of dropping without commit/rollback
2204///
2205/// - **Direct connections:** The transaction leaks until the OS TCP timeout
2206///   (potentially 30+ minutes), holding locks on any modified rows.
2207/// - **Pooled connections:** The pool detects the active transaction descriptor
2208///   and discards the connection rather than returning it to the idle pool
2209///   (see `PooledConnection::drop` in `mssql-driver-pool`).
2210///
2211/// ## Best practice
2212///
2213/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
2214/// for error paths:
2215///
2216/// ```rust,no_run
2217/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
2218/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2219/// let tx = client.begin_transaction().await?;
2220/// match do_work(&tx).await {
2221///     Ok(_) => { tx.commit().await?; }
2222///     Err(e) => { tx.rollback().await?; return Err(e); }
2223/// }
2224/// # Ok(())
2225/// # }
2226/// ```
2227impl Client<InTransaction> {
2228    /// Execute a query within the transaction and return a streaming result set.
2229    ///
2230    /// See [`Client<Ready>::query`] for usage examples.
2231    pub async fn query<'a>(
2232        &'a mut self,
2233        sql: &str,
2234        params: &[&(dyn crate::ToSql + Sync)],
2235    ) -> Result<QueryStream<'a>> {
2236        let deadline = self.command_deadline();
2237        self.query_inner(sql, params, deadline).await
2238    }
2239
2240    /// Shared query implementation with an explicit command deadline.
2241    async fn query_inner<'a>(
2242        &'a mut self,
2243        sql: &str,
2244        params: &[&(dyn crate::ToSql + Sync)],
2245        deadline: Option<std::time::Duration>,
2246    ) -> Result<QueryStream<'a>> {
2247        tracing::debug!(
2248            sql = sql,
2249            params_count = params.len(),
2250            "executing query in transaction"
2251        );
2252
2253        #[cfg(feature = "otel")]
2254        let instrumentation = self.instrumentation.clone();
2255        #[cfg(feature = "otel")]
2256        let mut span = instrumentation.query_span(sql);
2257        #[cfg(feature = "otel")]
2258        let timer = crate::instrumentation::OperationTimer::start(
2259            crate::instrumentation::extract_operation(sql),
2260        );
2261
2262        let canceller = self.cancel_handle();
2263        let result = run_with_deadline(
2264            async {
2265                // Sends via the prepared-statement cache when enabled, else the
2266                // SQL batch / sp_executesql default.
2267                self.send_query_request(sql, params).await?;
2268
2269                // Read complete response including columns and rows
2270                self.read_query_response().await
2271            },
2272            deadline,
2273            canceller,
2274        )
2275        .await;
2276
2277        #[cfg(feature = "otel")]
2278        match &result {
2279            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2280            Err(e) => InstrumentationContext::record_error(&mut span, e),
2281        }
2282        #[cfg(feature = "otel")]
2283        timer.finish(instrumentation.metrics(), result.is_ok());
2284
2285        // Drop the span before returning
2286        #[cfg(feature = "otel")]
2287        drop(span);
2288
2289        let resp = result?;
2290        #[cfg(feature = "always-encrypted")]
2291        {
2292            Ok(QueryStream::from_raw(
2293                resp.columns,
2294                resp.pending_rows,
2295                resp.meta,
2296                resp.decryptor,
2297            ))
2298        }
2299        #[cfg(not(feature = "always-encrypted"))]
2300        {
2301            Ok(QueryStream::from_raw(
2302                resp.columns,
2303                resp.pending_rows,
2304                resp.meta,
2305            ))
2306        }
2307    }
2308
2309    /// Stream rows incrementally from the network within the transaction.
2310    ///
2311    /// Identical to [`Client<Ready>::query_stream`] except the query runs inside
2312    /// the open transaction. The returned [`RowStream`](crate::RowStream)
2313    /// borrows the transaction client for its lifetime, so the stream must be
2314    /// consumed or dropped before the transaction can be committed or rolled
2315    /// back.
2316    pub async fn query_stream<'a>(
2317        &'a mut self,
2318        sql: &str,
2319        params: &[&(dyn crate::ToSql + Sync)],
2320    ) -> Result<crate::row_stream::RowStream<'a, InTransaction>> {
2321        self.query_stream_inner(sql, params).await
2322    }
2323
2324    /// Stream a row's trailing MAX column from the network within the
2325    /// transaction.
2326    ///
2327    /// See [`Client<Ready>::query_stream_blob`] for semantics and constraints;
2328    /// the only difference is that the query runs inside the open transaction.
2329    pub async fn query_stream_blob<'a>(
2330        &'a mut self,
2331        sql: &str,
2332        params: &[&(dyn crate::ToSql + Sync)],
2333    ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
2334        self.query_stream_blob_inner(sql, params).await
2335    }
2336
2337    /// Stream a row's trailing MAX columns from the network within the
2338    /// transaction.
2339    ///
2340    /// See [`Client<Ready>::query_stream_rows`] for semantics and constraints;
2341    /// the only difference is that the query runs inside the open transaction.
2342    pub async fn query_stream_rows<'a>(
2343        &'a mut self,
2344        sql: &str,
2345        params: &[&(dyn crate::ToSql + Sync)],
2346    ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
2347        self.query_stream_rows_inner(sql, params).await
2348    }
2349
2350    /// Execute a statement within the transaction.
2351    ///
2352    /// Returns the number of affected rows.
2353    pub async fn execute(
2354        &mut self,
2355        sql: &str,
2356        params: &[&(dyn crate::ToSql + Sync)],
2357    ) -> Result<u64> {
2358        let deadline = self.command_deadline();
2359        self.execute_inner(sql, params, deadline).await
2360    }
2361
2362    /// Shared execute implementation with an explicit command deadline.
2363    async fn execute_inner(
2364        &mut self,
2365        sql: &str,
2366        params: &[&(dyn crate::ToSql + Sync)],
2367        deadline: Option<std::time::Duration>,
2368    ) -> Result<u64> {
2369        tracing::debug!(
2370            sql = sql,
2371            params_count = params.len(),
2372            "executing statement in transaction"
2373        );
2374
2375        #[cfg(feature = "otel")]
2376        let instrumentation = self.instrumentation.clone();
2377        #[cfg(feature = "otel")]
2378        let mut span = instrumentation.query_span(sql);
2379        #[cfg(feature = "otel")]
2380        let timer = crate::instrumentation::OperationTimer::start(
2381            crate::instrumentation::extract_operation(sql),
2382        );
2383
2384        let canceller = self.cancel_handle();
2385        let result = run_with_deadline(
2386            async {
2387                if params.is_empty() {
2388                    // Simple statement without parameters - use SQL batch
2389                    self.send_sql_batch(sql).await?;
2390                } else {
2391                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
2392                    let rpc = self.build_parameterized_rpc(sql, params).await?;
2393                    self.send_rpc(&rpc).await?;
2394                }
2395
2396                // Read response and get row count
2397                self.read_execute_result().await
2398            },
2399            deadline,
2400            canceller,
2401        )
2402        .await;
2403
2404        #[cfg(feature = "otel")]
2405        match &result {
2406            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
2407            Err(e) => InstrumentationContext::record_error(&mut span, e),
2408        }
2409        #[cfg(feature = "otel")]
2410        timer.finish(instrumentation.metrics(), result.is_ok());
2411
2412        // Drop the span before returning
2413        #[cfg(feature = "otel")]
2414        drop(span);
2415
2416        result
2417    }
2418
2419    /// Execute a query within the transaction with a specific timeout.
2420    ///
2421    /// See [`Client<Ready>::query_with_timeout`] for details.
2422    pub async fn query_with_timeout<'a>(
2423        &'a mut self,
2424        sql: &str,
2425        params: &[&(dyn crate::ToSql + Sync)],
2426        timeout_duration: std::time::Duration,
2427    ) -> Result<QueryStream<'a>> {
2428        self.query_inner(sql, params, Some(timeout_duration)).await
2429    }
2430
2431    /// Execute a statement within the transaction with a specific timeout.
2432    ///
2433    /// See [`Client<Ready>::execute_with_timeout`] for details.
2434    pub async fn execute_with_timeout(
2435        &mut self,
2436        sql: &str,
2437        params: &[&(dyn crate::ToSql + Sync)],
2438        timeout_duration: std::time::Duration,
2439    ) -> Result<u64> {
2440        self.execute_inner(sql, params, Some(timeout_duration))
2441            .await
2442    }
2443
2444    /// Open a FILESTREAM BLOB for async reading and/or writing.
2445    ///
2446    /// This method queries the server for the transaction context, then opens
2447    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
2448    ///
2449    /// # Arguments
2450    ///
2451    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
2452    ///   Query this yourself before calling `open_filestream`:
2453    ///   ```sql
2454    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
2455    ///   ```
2456    /// * `access` — Read, write, or read/write access mode.
2457    ///
2458    /// # Requirements
2459    ///
2460    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
2461    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
2462    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
2463    ///
2464    /// # Example
2465    ///
2466    /// ```text
2467    /// use mssql_client::FileStreamAccess;
2468    /// use tokio::io::AsyncReadExt;
2469    ///
2470    /// let mut tx = client.begin_transaction().await?;
2471    ///
2472    /// // Get the FILESTREAM path
2473    /// let rows = tx.query(
2474    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
2475    ///     &[&doc_id],
2476    /// ).await?;
2477    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
2478    ///
2479    /// // Open and read the BLOB
2480    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
2481    /// let mut data = Vec::new();
2482    /// stream.read_to_end(&mut data).await?;
2483    /// drop(stream);
2484    ///
2485    /// tx.commit().await?;
2486    /// ```
2487    #[cfg(all(windows, feature = "filestream"))]
2488    pub async fn open_filestream(
2489        &mut self,
2490        path: &str,
2491        access: crate::filestream::FileStreamAccess,
2492    ) -> Result<crate::filestream::FileStream> {
2493        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
2494
2495        // Get the transaction context from SQL Server.
2496        // This binds the file access to the current SQL transaction.
2497        let txn_context: Vec<u8> = {
2498            let rows = self
2499                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
2500                .await?;
2501            let mut ctx = None;
2502            for result in rows {
2503                let row = result?;
2504                ctx = Some(row.get::<Vec<u8>>(0)?);
2505            }
2506            ctx.ok_or_else(|| {
2507                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
2508            })?
2509        };
2510
2511        crate::filestream::FileStream::open(path, access, &txn_context)
2512    }
2513
2514    /// Commit the transaction.
2515    ///
2516    /// This transitions the client back to `Ready` state.
2517    pub async fn commit(mut self) -> Result<Client<Ready>> {
2518        tracing::debug!("committing transaction");
2519
2520        #[cfg(feature = "otel")]
2521        let instrumentation = self.instrumentation.clone();
2522        #[cfg(feature = "otel")]
2523        let mut span = instrumentation.transaction_span("COMMIT");
2524
2525        // Execute COMMIT TRANSACTION
2526        let result = async {
2527            self.send_sql_batch("COMMIT TRANSACTION").await?;
2528            self.read_execute_result().await
2529        }
2530        .await;
2531
2532        #[cfg(feature = "otel")]
2533        match &result {
2534            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2535            Err(e) => InstrumentationContext::record_error(&mut span, e),
2536        }
2537
2538        // Drop the span before moving instrumentation
2539        #[cfg(feature = "otel")]
2540        drop(span);
2541
2542        result?;
2543
2544        Ok(Client {
2545            config: self.config,
2546            _state: PhantomData,
2547            connection: self.connection,
2548            server_version: self.server_version,
2549            current_database: self.current_database,
2550            server_collation: self.server_collation,
2551            statement_cache: self.statement_cache,
2552            transaction_descriptor: 0, // Reset to auto-commit mode
2553            needs_reset: self.needs_reset,
2554            in_flight: self.in_flight,
2555            #[cfg(feature = "otel")]
2556            instrumentation: self.instrumentation,
2557            #[cfg(feature = "always-encrypted")]
2558            encryption_context: self.encryption_context,
2559        })
2560    }
2561
2562    /// Rollback the transaction.
2563    ///
2564    /// This transitions the client back to `Ready` state.
2565    pub async fn rollback(mut self) -> Result<Client<Ready>> {
2566        tracing::debug!("rolling back transaction");
2567
2568        #[cfg(feature = "otel")]
2569        let instrumentation = self.instrumentation.clone();
2570        #[cfg(feature = "otel")]
2571        let mut span = instrumentation.transaction_span("ROLLBACK");
2572
2573        // Execute ROLLBACK TRANSACTION
2574        let result = async {
2575            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
2576            self.read_execute_result().await
2577        }
2578        .await;
2579
2580        #[cfg(feature = "otel")]
2581        match &result {
2582            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2583            Err(e) => InstrumentationContext::record_error(&mut span, e),
2584        }
2585
2586        // Drop the span before moving instrumentation
2587        #[cfg(feature = "otel")]
2588        drop(span);
2589
2590        result?;
2591
2592        Ok(Client {
2593            config: self.config,
2594            _state: PhantomData,
2595            connection: self.connection,
2596            server_version: self.server_version,
2597            current_database: self.current_database,
2598            server_collation: self.server_collation,
2599            statement_cache: self.statement_cache,
2600            transaction_descriptor: 0, // Reset to auto-commit mode
2601            needs_reset: self.needs_reset,
2602            in_flight: self.in_flight,
2603            #[cfg(feature = "otel")]
2604            instrumentation: self.instrumentation,
2605            #[cfg(feature = "always-encrypted")]
2606            encryption_context: self.encryption_context,
2607        })
2608    }
2609
2610    /// Create a savepoint and return a handle for later rollback.
2611    ///
2612    /// The returned `SavePoint` handle contains the validated savepoint name.
2613    /// Use it with `rollback_to()` to partially undo transaction work.
2614    ///
2615    /// # Example
2616    ///
2617    /// ```rust,no_run
2618    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2619    /// let mut tx = client.begin_transaction().await?;
2620    /// tx.execute("INSERT INTO orders ...", &[]).await?;
2621    /// let sp = tx.save_point("before_items").await?;
2622    /// tx.execute("INSERT INTO items ...", &[]).await?;
2623    /// // Oops, rollback just the items
2624    /// tx.rollback_to(&sp).await?;
2625    /// tx.commit().await?;
2626    /// # Ok(())
2627    /// # }
2628    /// ```
2629    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
2630        crate::validation::validate_identifier(name)?;
2631        tracing::debug!(name = name, "creating savepoint");
2632
2633        // Execute SAVE TRANSACTION <name>
2634        // Note: name is validated by validate_identifier() to prevent SQL injection
2635        let sql = format!("SAVE TRANSACTION {name}");
2636        self.send_sql_batch(&sql).await?;
2637        self.read_execute_result().await?;
2638
2639        Ok(SavePoint::new(name.to_string()))
2640    }
2641
2642    /// Rollback to a savepoint.
2643    ///
2644    /// This rolls back all changes made after the savepoint was created,
2645    /// but keeps the transaction active. The savepoint remains valid and
2646    /// can be rolled back to again.
2647    ///
2648    /// # Example
2649    ///
2650    /// ```rust,no_run
2651    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
2652    /// let sp = tx.save_point("checkpoint").await?;
2653    /// // ... do some work ...
2654    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
2655    /// // Transaction is still active, savepoint is still valid
2656    /// # Ok(())
2657    /// # }
2658    /// ```
2659    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
2660        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
2661
2662        // Execute ROLLBACK TRANSACTION <name>
2663        // Note: savepoint name was validated during creation
2664        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
2665        self.send_sql_batch(&sql).await?;
2666        self.read_execute_result().await?;
2667
2668        Ok(())
2669    }
2670
2671    /// Release a savepoint (optional cleanup).
2672    ///
2673    /// Note: SQL Server doesn't have explicit savepoint release, but this
2674    /// method is provided for API completeness. The savepoint is automatically
2675    /// released when the transaction commits or rolls back.
2676    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
2677        tracing::debug!(name = savepoint.name(), "releasing savepoint");
2678
2679        // SQL Server doesn't require explicit savepoint release
2680        // The savepoint is implicitly released on commit/rollback
2681        // This method exists for API completeness
2682        drop(savepoint);
2683        Ok(())
2684    }
2685
2686    /// Get a handle for cancelling the current query within the transaction.
2687    ///
2688    /// See [`Client<Ready>::cancel_handle`] for usage examples.
2689    #[must_use]
2690    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2691        self.connection_cancel_handle()
2692    }
2693}
2694
2695impl<S: ConnectionState> std::fmt::Debug for Client<S> {
2696    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2697        f.debug_struct("Client")
2698            .field("host", &self.config.host)
2699            .field("port", &self.config.port)
2700            .field("database", &self.config.database)
2701            .finish()
2702    }
2703}
2704
#[cfg(test)]
mod blob_result_set_validation_tests {
    //! Tests for `Client::validate_blob_rows_result_set`: which column layouts
    //! are accepted, the pair of counts returned for them, and which layouts
    //! are rejected with a protocol error.

    use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
    use tds_protocol::types::TypeId;

    use super::{Client, Ready};
    use crate::error::Error;

    /// `max_length` value that marks the MAX (PLP) variant of a column.
    const MAX_MARKER: u32 = 0xFFFF;

    /// Column metadata with the given type and optional max length; every
    /// other field is zero, `None`, or defaulted.
    fn col(name: &str, type_id: TypeId, max_length: Option<u32>) -> ColumnData {
        let type_info = TypeInfo {
            max_length,
            ..Default::default()
        };
        ColumnData {
            name: name.to_owned(),
            type_id,
            col_type: 0,
            flags: 0,
            user_type: 0,
            type_info,
            crypto_metadata: None,
        }
    }

    /// A scalar (non-MAX) column.
    fn scalar(name: &str) -> ColumnData {
        col(name, TypeId::Int4, None)
    }

    /// A MAX (PLP) column.
    fn blob(name: &str) -> ColumnData {
        col(name, TypeId::BigVarBinary, Some(MAX_MARKER))
    }

    /// Run the validator over `columns` wrapped in result-set metadata.
    fn validate(columns: Vec<ColumnData>) -> Result<(usize, usize), Error> {
        let metadata = ColMetaData {
            columns,
            cek_table: None,
        };
        Client::<Ready>::validate_blob_rows_result_set(&metadata)
    }

    /// True when the validator rejects `columns` with a protocol error.
    fn rejected(columns: Vec<ColumnData>) -> bool {
        matches!(validate(columns), Err(Error::Protocol(_)))
    }

    #[test]
    fn single_trailing_blob() {
        let counts = validate(vec![scalar("id"), blob("doc")]).unwrap();
        assert_eq!(counts, (1, 1));
    }

    #[test]
    fn multiple_trailing_blobs() {
        let counts = validate(vec![scalar("id"), blob("doc1"), blob("doc2")]).unwrap();
        assert_eq!(counts, (1, 2));
    }

    #[test]
    fn all_columns_blobs() {
        let counts = validate(vec![blob("a"), blob("b")]).unwrap();
        assert_eq!(counts, (0, 2));
    }

    #[test]
    fn no_max_column_is_rejected() {
        assert!(rejected(vec![scalar("id"), scalar("j")]));
    }

    #[test]
    fn scalar_after_blob_is_rejected() {
        // Interleaved MAX columns are out of scope: a scalar after a blob.
        assert!(rejected(vec![
            scalar("id"),
            blob("doc"),
            scalar("trailing")
        ]));
        // ...even when more blobs follow the interloping scalar.
        assert!(rejected(vec![blob("doc1"), scalar("mid"), blob("doc2")]));
    }
}