Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27pub(crate) mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// How long to wait for the server to acknowledge an Attention packet after
49/// a command timeout. SqlClient waits 5 seconds before dooming the
50/// connection; we match it.
51const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68    fut: F,
69    deadline: Option<std::time::Duration>,
70    canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73    F: std::future::Future<Output = Result<T>>,
74{
75    let Some(d) = deadline else {
76        return fut.await;
77    };
78    tokio::pin!(fut);
79    tokio::select! {
80        biased;
81        res = &mut fut => res,
82        () = tokio::time::sleep(d) => {
83            // Signal cancellation, then let the in-flight read consume the
84            // server's attention acknowledgement so the connection stays usable.
85            let drain = async {
86                let _ = canceller.cancel().await;
87                let _ = (&mut fut).await;
88            };
89            if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90                tracing::warn!(
91                    timeout = ?ATTENTION_ACK_TIMEOUT,
92                    "server did not acknowledge attention; abandoning the connection as dirty"
93                );
94            }
95            Err(Error::CommandTimeout)
96        }
97    }
98}
99
100/// SQL Server client with type-state connection management.
101///
102/// The generic parameter `S` represents the current connection state,
103/// ensuring at compile time that certain operations are only available
104/// in appropriate states.
105pub struct Client<S: ConnectionState> {
106    config: Config,
107    _state: PhantomData<S>,
108    /// The underlying connection (present only when connected)
109    connection: Option<ConnectionHandle>,
110    /// Server version from LoginAck (raw u32 TDS version)
111    server_version: Option<u32>,
112    /// Current database from EnvChange
113    current_database: Option<String>,
114    /// Server's default collation from SqlCollation EnvChange during login.
115    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
116    /// parameters with the correct character encoding and collation bytes.
117    server_collation: Option<tds_protocol::token::Collation>,
118    /// Prepared statement cache for query optimization
119    statement_cache: StatementCache,
120    /// Transaction descriptor from BeginTransaction EnvChange.
121    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
122    /// requests within an explicit transaction. 0 indicates auto-commit mode.
123    transaction_descriptor: u64,
124    /// Whether a request has been sent and the response has not yet been fully read.
125    /// Used by the connection pool to detect dirty connections after cancel/timeout.
126    in_flight: bool,
127    /// Whether this connection needs a reset on next use.
128    /// Set by connection pool on checkin, cleared after first query/execute.
129    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
130    needs_reset: bool,
131    /// OpenTelemetry instrumentation context (when otel feature is enabled)
132    #[cfg(feature = "otel")]
133    instrumentation: InstrumentationContext,
134    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
135    #[cfg(feature = "always-encrypted")]
136    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
137}
138
139/// Internal connection handle wrapping the actual connection.
140///
141/// This is an enum to support different connection types:
142/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
143/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
144/// - Plain TCP (for internal networks or when `tls` feature is disabled)
145#[allow(dead_code)] // Connection will be used once query execution is implemented
146enum ConnectionHandle {
147    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
148    #[cfg(feature = "tls")]
149    Tls(Connection<TlsStream<TcpStream>>),
150    /// TLS connection with PreLogin wrapping (TDS 7.x style)
151    #[cfg(feature = "tls")]
152    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
153    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
154    Plain(Connection<TcpStream>),
155}
156
157/// The parameter `TypeInfo` to declare a typed NULL ([`crate::null`]) with, from
158/// its [`crate::ToSql::sql_type`] name. Returns `None` for an untyped NULL
159/// (`Option::None`, type `"NULL"`), which falls back to the default param type.
160#[cfg(feature = "always-encrypted")]
161fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
162    use tds_protocol::rpc::TypeInfo;
163    Some(match sql_type {
164        "BIT" => TypeInfo::bit(),
165        "TINYINT" => TypeInfo::tinyint(),
166        "SMALLINT" => TypeInfo::smallint(),
167        "INT" => TypeInfo::int(),
168        "BIGINT" => TypeInfo::bigint(),
169        "REAL" => TypeInfo::real(),
170        "FLOAT" => TypeInfo::float(),
171        "NVARCHAR" => TypeInfo::nvarchar(1),
172        "VARBINARY" => TypeInfo::varbinary(1),
173        "UNIQUEIDENTIFIER" => TypeInfo::uuid(),
174        "DATE" => TypeInfo::date(),
175        _ => return None,
176    })
177}
178
179/// Map a typed-parameter wrapper's [`EncryptedParamType`] to the `TypeInfo` the
180/// driver declares it as (for `sp_describe_parameter_encryption` and the
181/// `CryptoMetadata` base type). Unknown future variants error rather than
182/// silently declaring the wrong type.
183#[cfg(feature = "always-encrypted")]
184fn encrypted_param_type_info(
185    ty: mssql_types::EncryptedParamType,
186) -> Result<tds_protocol::rpc::TypeInfo> {
187    use mssql_types::EncryptedParamType as E;
188    use tds_protocol::rpc::TypeInfo;
189    Ok(match ty {
190        E::Decimal { precision, scale } => TypeInfo::decimal(precision, scale),
191        E::Time { scale } => TypeInfo::time(scale),
192        E::DateTime2 { scale } => TypeInfo::datetime2(scale),
193        E::DateTimeOffset { scale } => TypeInfo::datetimeoffset(scale),
194        E::DateTime => TypeInfo::datetime(),
195        E::Char { length } => TypeInfo::char(length),
196        E::NChar { length } => TypeInfo::nchar(length),
197        E::Binary { length } => TypeInfo::binary(length),
198        _ => {
199            return Err(Error::Encryption(
200                "unsupported Always Encrypted parameter type".to_string(),
201            ));
202        }
203    })
204}
205
206// Private helper methods available to all connection states
207impl<S: ConnectionState> Client<S> {
208    /// The default per-command deadline from `command_timeout`.
209    ///
210    /// Returns `None` when `command_timeout` is zero, which means "no limit"
211    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
212    pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
213        let t = self.config.command_timeout;
214        if t.is_zero() { None } else { Some(t) }
215    }
216
217    /// Build a cancel handle for the current connection, regardless of
218    /// connection state. The public, documented surface is
219    /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
220    /// delegate here.
221    pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
222        let connection = self
223            .connection
224            .as_ref()
225            .expect("connection should be present");
226        match connection {
227            #[cfg(feature = "tls")]
228            ConnectionHandle::Tls(conn) => {
229                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
230            }
231            #[cfg(feature = "tls")]
232            ConnectionHandle::TlsPrelogin(conn) => {
233                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
234            }
235            ConnectionHandle::Plain(conn) => {
236                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
237            }
238        }
239    }
240
241    /// Cancel an in-flight response that was abandoned without being drained —
242    /// e.g. a [`RowStream`](crate::RowStream) dropped or cancelled mid-result.
243    ///
244    /// Sends an Attention and drains to the server's DONE_ATTN acknowledgement so
245    /// the socket is clean and the connection reusable. A no-op when nothing is
246    /// in flight. Bounded by [`ATTENTION_ACK_TIMEOUT`]: if the acknowledgement
247    /// never arrives the connection is left marked in-flight (so the pool
248    /// discards it on return) and an error is returned.
249    pub(crate) async fn cancel_in_flight_response(&mut self) -> Result<()> {
250        if !self.in_flight {
251            return Ok(());
252        }
253        let canceller = self.connection_cancel_handle();
254        let drain = async {
255            canceller.cancel().await?;
256            // With the cancelling flag set, `read_response_message` routes through
257            // the codec's drain-after-cancel path and returns `Err(Cancelled)`
258            // once the DONE_ATTN acknowledgement is consumed (clearing
259            // `in_flight`). Any full messages that arrive before the ack are
260            // discarded.
261            loop {
262                match self.read_response_message().await {
263                    Err(Error::Cancelled) => return Ok(()),
264                    Ok(_) => continue,
265                    Err(e) => return Err(e),
266                }
267            }
268        };
269        match tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await {
270            Ok(result) => result,
271            Err(_) => {
272                tracing::warn!(
273                    timeout = ?ATTENTION_ACK_TIMEOUT,
274                    "attention acknowledgement not received while cancelling an \
275                     abandoned response; connection left dirty"
276                );
277                Err(Error::Cancelled)
278            }
279        }
280    }
281
282    /// Process transaction-related EnvChange tokens.
283    ///
284    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
285    /// EnvChange tokens, updating the transaction descriptor accordingly.
286    ///
287    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
288    /// while still having the transaction descriptor tracked correctly.
289    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
290        use tds_protocol::token::EnvChangeValue;
291
292        match env.env_type {
293            EnvChangeType::BeginTransaction => {
294                if let EnvChangeValue::Binary(ref data) = env.new_value {
295                    if data.len() >= 8 {
296                        let descriptor = u64::from_le_bytes([
297                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
298                        ]);
299                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
300                        *transaction_descriptor = descriptor;
301                    }
302                }
303            }
304            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
305                tracing::debug!(
306                    env_type = ?env.env_type,
307                    "transaction ended via raw SQL"
308                );
309                *transaction_descriptor = 0;
310            }
311            _ => {}
312        }
313    }
314
315    /// Apply a transaction-related `ENVCHANGE` to this client's descriptor.
316    ///
317    /// Lets the streaming readers (which live in sibling modules) keep the
318    /// transaction descriptor in sync with raw `BEGIN`/`COMMIT`/`ROLLBACK`
319    /// batches seen mid-stream, exactly as the buffered readers do.
320    pub(crate) fn apply_transaction_env_change(&mut self, env: &EnvChange) {
321        Self::process_transaction_env_change(env, &mut self.transaction_descriptor);
322    }
323
324    /// Send a SQL batch to the server.
325    ///
326    /// Uses the client's current transaction descriptor in ALL_HEADERS.
327    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
328    /// returned by BeginTransaction must be included.
329    ///
330    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
331    /// is included in the first packet to reset connection state.
332    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
333        // If a previous streamed response was abandoned (a RowStream dropped
334        // mid-result), drain it before issuing a new request so the next read
335        // does not pick up the old response's bytes.
336        self.cancel_in_flight_response().await?;
337
338        let payload = tds_protocol::__private::encode_sql_batch_with_transaction(
339            sql,
340            self.transaction_descriptor,
341        );
342        let max_packet = self.config.packet_size as usize;
343
344        // Check if we need to reset the connection on this request
345        let reset = self.needs_reset;
346        if reset {
347            self.needs_reset = false; // Clear flag before sending
348            // RESETCONNECTION invalidates all server-side prepared handles, so
349            // drop the cache (no sp_unprepare needed — the server released them).
350            let _ = self.statement_cache.clear();
351            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
352        }
353
354        self.in_flight = true;
355        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
356
357        match connection {
358            #[cfg(feature = "tls")]
359            ConnectionHandle::Tls(conn) => {
360                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
361                    .await?;
362            }
363            #[cfg(feature = "tls")]
364            ConnectionHandle::TlsPrelogin(conn) => {
365                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
366                    .await?;
367            }
368            ConnectionHandle::Plain(conn) => {
369                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
370                    .await?;
371            }
372        }
373
374        Ok(())
375    }
376
377    /// Send an RPC request to the server.
378    ///
379    /// Uses the client's current transaction descriptor in ALL_HEADERS.
380    ///
381    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
382    /// is included in the first packet to reset connection state.
383    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
384        // Drain an abandoned streamed response (see `send_sql_batch`) before
385        // issuing this request.
386        self.cancel_in_flight_response().await?;
387
388        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
389        let max_packet = self.config.packet_size as usize;
390
391        // Check if we need to reset the connection on this request
392        let reset = self.needs_reset;
393        if reset {
394            self.needs_reset = false; // Clear flag before sending
395            // RESETCONNECTION invalidates all server-side prepared handles, so
396            // drop the cache (no sp_unprepare needed — the server released them).
397            let _ = self.statement_cache.clear();
398            tracing::debug!("sending RPC with RESETCONNECTION flag");
399        }
400
401        self.in_flight = true;
402        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
403
404        match connection {
405            #[cfg(feature = "tls")]
406            ConnectionHandle::Tls(conn) => {
407                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
408                    .await?;
409            }
410            #[cfg(feature = "tls")]
411            ConnectionHandle::TlsPrelogin(conn) => {
412                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
413                    .await?;
414            }
415            ConnectionHandle::Plain(conn) => {
416                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
417                    .await?;
418            }
419        }
420
421        Ok(())
422    }
423
424    /// Start building a stored procedure call with full control over parameters.
425    ///
426    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
427    /// parameters before executing the call.
428    ///
429    /// The procedure name is validated to prevent SQL injection. It may be
430    /// schema-qualified (e.g., `"dbo.MyProc"`).
431    ///
432    /// # Example
433    ///
434    /// ```rust,no_run
435    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
436    /// let result = client.procedure("dbo.CalculateSum")?
437    ///     .input("@a", &10i32)
438    ///     .input("@b", &20i32)
439    ///     .output_int("@result")
440    ///     .execute().await?;
441    ///
442    /// let sum = result.get_output("@result").unwrap();
443    /// # let _ = sum;
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub fn procedure(
448        &mut self,
449        proc_name: &str,
450    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
451        crate::validation::validate_qualified_identifier(proc_name)?;
452        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
453    }
454
455    /// Execute a stored procedure with positional input parameters.
456    ///
457    /// This is a convenience method for the common case of calling a procedure
458    /// with input-only parameters. For output parameters or named parameters,
459    /// use [`procedure()`](Client::procedure) instead.
460    ///
461    /// # Example
462    ///
463    /// ```rust,no_run
464    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
465    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
466    /// assert_eq!(result.return_value, 0);
467    ///
468    /// if let Some(rs) = result.first_result_set() {
469    ///     println!("columns: {:?}", rs.columns());
470    /// }
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub async fn call_procedure(
475        &mut self,
476        proc_name: &str,
477        params: &[&(dyn crate::ToSql + Sync)],
478    ) -> Result<crate::stream::ProcedureResult> {
479        crate::validation::validate_qualified_identifier(proc_name)?;
480
481        tracing::debug!(
482            proc_name = proc_name,
483            params_count = params.len(),
484            "executing stored procedure"
485        );
486
487        let rpc_params =
488            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
489        let mut rpc = RpcRequest::named(proc_name);
490        for param in rpc_params {
491            rpc = rpc.param(param);
492        }
493
494        #[cfg(feature = "otel")]
495        let instrumentation = self.instrumentation.clone();
496        #[cfg(feature = "otel")]
497        let mut span = instrumentation.procedure_span(proc_name);
498        #[cfg(feature = "otel")]
499        let timer = crate::instrumentation::OperationTimer::start("EXECUTE");
500
501        let deadline = self.command_deadline();
502        let canceller = self.connection_cancel_handle();
503        let result = run_with_deadline(
504            async {
505                self.send_rpc(&rpc).await?;
506                self.read_procedure_result().await
507            },
508            deadline,
509            canceller,
510        )
511        .await;
512
513        #[cfg(feature = "otel")]
514        match &result {
515            Ok(r) => InstrumentationContext::record_success(&mut span, Some(r.rows_affected)),
516            Err(e) => InstrumentationContext::record_error(&mut span, e),
517        }
518        #[cfg(feature = "otel")]
519        timer.finish(instrumentation.metrics(), result.is_ok());
520        #[cfg(feature = "otel")]
521        drop(span);
522
523        result
524    }
525
526    /// Ask the server how each parameter of a statement must be encrypted.
527    ///
528    /// Issues the `sp_describe_parameter_encryption` system RPC for the
529    /// parameterized statement `tsql` with the parameter declaration `params`
530    /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
531    /// into a [`ParameterEncryptionInfo`](crate::encryption::ParameterEncryptionInfo): the
532    /// CEK table, plus — for each parameter the server reports as encrypted —
533    /// which CEK and whether deterministic or randomized. Parameters the server
534    /// reports as plaintext are omitted.
535    ///
536    /// This is the first step of Always Encrypted parameter encryption; the
537    /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
538    #[cfg(feature = "always-encrypted")]
539    pub(crate) async fn describe_parameter_encryption(
540        &mut self,
541        tsql: &str,
542        params: &str,
543    ) -> Result<crate::encryption::ParameterEncryptionInfo> {
544        let tsql_arg = tsql.to_string();
545        let params_arg = params.to_string();
546        let mut result = self
547            .call_procedure(
548                "sp_describe_parameter_encryption",
549                &[&tsql_arg, &params_arg],
550            )
551            .await?;
552        crate::encryption::ParameterEncryptionInfo::from_describe_result_sets(
553            &mut result.result_sets,
554        )
555    }
556
557    /// Build the `sp_executesql` request for a parameterized statement.
558    ///
559    /// When the connection has Always Encrypted enabled, parameters the server
560    /// reports as encrypted are encrypted client-side first (an extra
561    /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
562    /// plain parameter conversion.
563    pub(crate) async fn build_parameterized_rpc(
564        &mut self,
565        sql: &str,
566        params: &[&(dyn crate::ToSql + Sync)],
567    ) -> Result<RpcRequest> {
568        #[cfg(feature = "always-encrypted")]
569        if self.encryption_context.is_some() {
570            return self.build_encrypted_sql_rpc(sql, params).await;
571        }
572        let rpc_params =
573            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
574        Ok(RpcRequest::execute_sql(sql, rpc_params))
575    }
576
577    /// Send a parameterized `query` request, consulting the prepared-statement
578    /// cache when [`Config::statement_cache`](crate::Config::statement_cache)
579    /// is enabled. Leaves the execution response ready for the caller's
580    /// `read_query_response`.
581    ///
582    /// Falls back to the default path (SQL batch for no params, `sp_executesql`
583    /// otherwise) when the cache is disabled, the query has no parameters, or
584    /// Always Encrypted is active (prepared + AE parameter encryption is out of
585    /// scope for this first increment).
586    async fn send_query_request(
587        &mut self,
588        sql: &str,
589        params: &[&(dyn crate::ToSql + Sync)],
590    ) -> Result<()> {
591        #[cfg(feature = "always-encrypted")]
592        let ae_active = self.encryption_context.is_some();
593        #[cfg(not(feature = "always-encrypted"))]
594        let ae_active = false;
595
596        if !self.config.statement_cache || params.is_empty() || ae_active {
597            if params.is_empty() {
598                self.send_sql_batch(sql).await?;
599            } else {
600                let rpc = self.build_parameterized_rpc(sql, params).await?;
601                self.send_rpc(&rpc).await?;
602            }
603            return Ok(());
604        }
605
606        let rpc_params =
607            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
608        // Key on the parameter declaration + SQL: a cached handle is only valid
609        // for the exact prepared parameter types, so two calls with the same
610        // SQL but different param types must not share a handle.
611        let key = format!(
612            "{}\u{1}{sql}",
613            RpcRequest::build_param_declarations(&rpc_params)
614        );
615
616        if let Some(handle) = self.statement_cache.get(&key) {
617            // Hit: sp_execute the cached handle. Clear any stale pending key
618            // (e.g. from a prior request whose read aborted) so the read path
619            // does not try to capture a handle from this response.
620            self.statement_cache.set_pending(None);
621            let rpc = RpcRequest::execute(handle, rpc_params);
622            self.send_rpc(&rpc).await?;
623        } else {
624            // Miss: sp_prepexec prepares and executes in ONE round-trip. The
625            // caller's read_query_response reads the row response and captures
626            // the `@handle` RETURNVALUE, then stores it under `key` (see
627            // `store_pending_prepared_handle`).
628            self.statement_cache.set_pending(Some(key));
629            let rpc = RpcRequest::prepexec(sql, rpc_params);
630            self.send_rpc(&rpc).await?;
631        }
632        Ok(())
633    }
634
635    /// Store the handle captured from an `sp_prepexec` execution response under
636    /// the pending cache key (set by [`send_query_request`](Self::send_query_request)
637    /// on a cold miss), releasing any LRU-evicted server-side handle.
638    ///
639    /// Called by `read_query_response` after it has read the row response and
640    /// the trailing `@handle` RETURNVALUE. A no-op when no prepexec is pending.
641    /// Eviction `sp_unprepare` is best-effort: a failure leaks one handle until
642    /// connection reset, never corrupts data.
643    pub(super) async fn store_pending_prepared_handle(
644        &mut self,
645        handle: Option<i32>,
646    ) -> Result<()> {
647        let Some(key) = self.statement_cache.take_pending() else {
648            return Ok(());
649        };
650        let Some(handle) = handle else {
651            // No @handle came back (unexpected for sp_prepexec): leave the
652            // statement uncached so the next call simply re-prepares.
653            return Ok(());
654        };
655        if let Some(evicted) = self
656            .statement_cache
657            .insert(crate::statement_cache::PreparedStatement::new(handle, key))
658        {
659            let unprepare = RpcRequest::unprepare(evicted.handle());
660            self.send_rpc(&unprepare).await?;
661            let _ = self.read_procedure_result().await?;
662        }
663        Ok(())
664    }
665
666    /// Encrypt the Always Encrypted parameters of a statement, then build its
667    /// `sp_executesql` request.
668    ///
669    /// Asks the server which parameters are encrypted
670    /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
671    /// then for each one normalizes the value, resolves its column encryption
672    /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
673    /// server reports as plaintext are sent unchanged.
674    #[cfg(feature = "always-encrypted")]
675    async fn build_encrypted_sql_rpc(
676        &mut self,
677        sql: &str,
678        params: &[&(dyn crate::ToSql + Sync)],
679    ) -> Result<RpcRequest> {
680        use tds_protocol::rpc::RpcParam;
681
682        let Some(ctx) = self.encryption_context.clone() else {
683            let rpc_params =
684                Self::convert_params(params, self.send_unicode(), self.server_collation())?;
685            return Ok(RpcRequest::execute_sql(sql, rpc_params));
686        };
687
688        // Resolve each parameter's value once (AE normalization needs the typed
689        // value, not the wire encoding) and build the plaintext RPC params.
690        let send_unicode = self.send_unicode();
691        let collation = self.server_collation().cloned();
692        let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
693        let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
694        let mut hints: Vec<Option<mssql_types::EncryptedParamType>> =
695            Vec::with_capacity(params.len());
696        for (i, p) in params.iter().enumerate() {
697            let name = format!("@p{}", i + 1);
698            let value = p.to_sql()?;
699            let hint = p.encrypted_param_type();
700            // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
701            // describe accepts it against the target encrypted column; an untyped
702            // NULL falls back to the default in `sql_value_to_rpc_param`.
703            let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
704                (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
705                _ => {
706                    let mut param = Self::sql_value_to_rpc_param(
707                        &name,
708                        &value,
709                        send_unicode,
710                        collation.as_ref(),
711                    )?;
712                    // A typed-parameter wrapper (e.g. `numeric(v, p, s)`,
713                    // `datetime2(v, scale)`) declares an explicit SQL type so
714                    // describe matches the encrypted column exactly — the value
715                    // alone cannot convey precision/scale or the legacy-`datetime`
716                    // vs `datetime2` distinction.
717                    if let Some(ty) = hint {
718                        param.type_info = encrypted_param_type_info(ty)?;
719                    }
720                    param
721                }
722            };
723            plaintext.push(rpc_param);
724            values.push(value);
725            hints.push(hint);
726        }
727
728        if plaintext.is_empty() {
729            return Ok(RpcRequest::execute_sql(sql, plaintext));
730        }
731
732        // Ask the server which parameters need encryption.
733        let declarations = RpcRequest::build_param_declarations(&plaintext);
734        let info = self
735            .describe_parameter_encryption(sql, &declarations)
736            .await?;
737        if info.parameters.is_empty() {
738            return Ok(RpcRequest::execute_sql(sql, plaintext));
739        }
740
741        // Encrypt the flagged parameters; pass the rest through untouched.
742        let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
743        for ((value, param), hint) in values.into_iter().zip(plaintext).zip(hints) {
744            let Some(crypto) = info.get_parameter(&param.name) else {
745                final_params.push(param);
746                continue;
747            };
748            let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
749                Error::Protocol(format!(
750                    "encrypted parameter {} references missing CEK ordinal {}",
751                    param.name, crypto.cek_ordinal
752                ))
753            })?;
754            let metadata = tds_protocol::rpc::EncryptedParamMetadata {
755                base_type_info: param.type_info.clone(),
756                algorithm_id: crypto.algorithm_id,
757                encryption_type: crypto.encryption_type,
758                database_id: entry.database_id,
759                cek_id: entry.cek_id,
760                cek_version: entry.cek_version,
761                cek_md_version: entry.cek_md_version,
762                normalization_rule_version: crypto.normalization_rule_version,
763            };
764            // A NULL value bound to an encrypted column is sent as an encrypted
765            // NULL (the server rejects a plaintext parameter for an encrypted
766            // column); there is nothing to encrypt.
767            if matches!(value, mssql_types::SqlValue::Null) {
768                final_params.push(RpcParam::encrypted_null(param.name, metadata));
769                continue;
770            }
771            let normalized = crate::encryption::normalize_for_encryption(&value, hint)?;
772            let ciphertext = ctx
773                .encrypt_value(&normalized, entry, crypto.encryption_type)
774                .await?;
775            final_params.push(RpcParam::encrypted(
776                param.name,
777                bytes::Bytes::from(ciphertext),
778                metadata,
779            ));
780        }
781
782        Ok(RpcRequest::execute_sql(sql, final_params))
783    }
784
785    /// Start a bulk insert operation for the specified table.
786    ///
787    /// Sends the `INSERT BULK` statement to the server and returns a
788    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
789    /// a mutable borrow on the client, preventing other operations while
790    /// the bulk insert is in progress.
791    ///
792    /// # Example
793    ///
794    /// ```rust,no_run
795    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
796    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
797    ///
798    /// let builder = BulkInsertBuilder::new("dbo.Users")
799    ///     .with_typed_columns(vec![
800    ///         BulkColumn::new("id", "INT", 0)?,
801    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
802    ///     ]);
803    ///
804    /// let mut writer = client.bulk_insert(&builder).await?;
805    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
806    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
807    /// let result = writer.finish().await?;
808    /// println!("Inserted {} rows", result.rows_affected);
809    /// # Ok(())
810    /// # }
811    /// ```
812    pub async fn bulk_insert(
813        &mut self,
814        builder: &crate::bulk::BulkInsertBuilder,
815    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
816        use tds_protocol::token::{ColMetaData, Token};
817
818        tracing::debug!(
819            table = builder.table_name(),
820            columns = builder.columns().len(),
821            "starting bulk insert"
822        );
823
824        // Step 1: Query the server for column metadata.
825        // This gives us the exact type encoding the server expects for BulkLoad,
826        // following the pattern established by Tiberius.
827        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
828        let deadline = self.command_deadline();
829        let canceller = self.connection_cancel_handle();
830        let message = run_with_deadline(
831            async {
832                self.send_sql_batch(&meta_query).await?;
833                self.read_response_message().await
834            },
835            deadline,
836            canceller,
837        )
838        .await?;
839        self.in_flight = false;
840
841        // Capture both the raw COLMETADATA bytes and parsed column info
842        let raw_payload = message.payload.clone();
843        let mut parser = self.create_parser(message.payload);
844        let mut server_metadata: Option<ColMetaData> = None;
845        let mut meta_start: usize = 0;
846        let mut meta_end: usize = 0;
847
848        loop {
849            let pos_before = raw_payload.len() - parser.remaining();
850            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
851            let pos_after = raw_payload.len() - parser.remaining();
852            let Some(token) = token else { break };
853
854            match token {
855                Token::ColMetaData(meta) => {
856                    meta_start = pos_before;
857                    meta_end = pos_after;
858                    server_metadata = Some(meta);
859                }
860                Token::Done(_) => break,
861                _ => {}
862            }
863        }
864
865        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
866        // These types require a legacy TEXTPTR wire format that this driver
867        // does not support — users should migrate the column to VARCHAR(MAX) /
868        // NVARCHAR(MAX) / VARBINARY(MAX).
869        if let Some(ref meta) = server_metadata {
870            use tds_protocol::types::TypeId;
871            for col in meta.columns.iter() {
872                let (rejected, replacement) = match col.type_id {
873                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
874                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
875                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
876                    _ => (None, ""),
877                };
878                if let Some(sql_type) = rejected {
879                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
880                        sql_type: sql_type.to_string(),
881                        reason: format!(
882                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
883                             are not supported. Alter the column to {} instead \
884                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
885                             Server 2005).",
886                            col.name,
887                            builder.table_name(),
888                            sql_type,
889                            replacement,
890                        ),
891                    }));
892                }
893            }
894        }
895
896        // Step 2: Send INSERT BULK statement to put server in bulk load mode
897        let stmt = builder.build_insert_bulk_statement()?;
898        let deadline = self.command_deadline();
899        let canceller = self.connection_cancel_handle();
900        run_with_deadline(
901            async {
902                self.send_sql_batch(&stmt).await?;
903                self.read_execute_result().await
904            },
905            deadline,
906            canceller,
907        )
908        .await?;
909
910        // Step 3: Create bulk writer with server's metadata
911        let raw_meta = if meta_end > meta_start {
912            Some(raw_payload.slice(meta_start..meta_end))
913        } else {
914            None
915        };
916
917        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
918        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
919            builder.columns().to_vec(),
920            builder.options().batch_size,
921            raw_meta,
922            server_cols,
923        );
924
925        Ok(crate::bulk::BulkWriter::new(self, bulk))
926    }
927
928    /// Start a bulk insert without querying the server for column metadata.
929    ///
930    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
931    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
932    /// column metadata is constructed from the `BulkColumn` types provided
933    /// on the builder. This saves a round-trip when the schema is known.
934    ///
935    /// # Caveats
936    ///
937    /// The caller must ensure `BulkColumn` entries match the target table's
938    /// column definitions exactly. Mismatched types, lengths, precision/scale,
939    /// or column ordering will cause the server to reject the BulkLoad packet.
940    ///
941    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
942    /// extra round-trip is usually negligible and the server-supplied metadata
943    /// is guaranteed correct.
944    pub async fn bulk_insert_without_schema_discovery(
945        &mut self,
946        builder: &crate::bulk::BulkInsertBuilder,
947    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
948        tracing::debug!(
949            table = builder.table_name(),
950            columns = builder.columns().len(),
951            "starting bulk insert (no schema discovery)"
952        );
953
954        // Send INSERT BULK statement to put server in bulk load mode
955        let stmt = builder.build_insert_bulk_statement()?;
956        let deadline = self.command_deadline();
957        let canceller = self.connection_cancel_handle();
958        run_with_deadline(
959            async {
960                self.send_sql_batch(&stmt).await?;
961                self.read_execute_result().await
962            },
963            deadline,
964            canceller,
965        )
966        .await?;
967
968        // Create bulk writer with hand-crafted metadata
969        let bulk =
970            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
971
972        Ok(crate::bulk::BulkWriter::new(self, bulk))
973    }
974
975    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
976    ///
977    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
978    /// row data after the `INSERT BULK` statement has been acknowledged.
979    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
980        let max_packet = self.config.packet_size as usize;
981
982        self.in_flight = true;
983        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
984
985        match connection {
986            #[cfg(feature = "tls")]
987            ConnectionHandle::Tls(conn) => {
988                conn.send_message(PacketType::BulkLoad, payload, max_packet)
989                    .await?;
990            }
991            #[cfg(feature = "tls")]
992            ConnectionHandle::TlsPrelogin(conn) => {
993                conn.send_message(PacketType::BulkLoad, payload, max_packet)
994                    .await?;
995            }
996            ConnectionHandle::Plain(conn) => {
997                conn.send_message(PacketType::BulkLoad, payload, max_packet)
998                    .await?;
999            }
1000        }
1001
1002        // Read the server's Done response with row count
1003        self.read_execute_result().await
1004    }
1005
1006    /// Execute a query with named parameters and return a streaming result set.
1007    ///
1008    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
1009    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
1010    /// and the `#[derive(ToParams)]` macro.
1011    ///
1012    /// # Example
1013    ///
1014    /// ```rust,no_run
1015    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1016    /// use mssql_client::{NamedParam, ToParams};
1017    ///
1018    /// // With derive macro:
1019    /// #[derive(mssql_derive::ToParams)]
1020    /// struct UserQuery { name: String }
1021    ///
1022    /// let q = UserQuery { name: "Alice".into() };
1023    /// let rows = client.query_named(
1024    ///     "SELECT * FROM users WHERE name = @name",
1025    ///     &q.to_params()?,
1026    /// ).await?;
1027    ///
1028    /// // Or manually:
1029    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
1030    /// let rows = client.query_named(
1031    ///     "SELECT * FROM users WHERE name = @name",
1032    ///     &params,
1033    /// ).await?;
1034    /// # let _ = rows;
1035    /// # Ok(())
1036    /// # }
1037    /// ```
1038    pub async fn query_named<'a>(
1039        &'a mut self,
1040        sql: &str,
1041        params: &[crate::to_params::NamedParam],
1042    ) -> Result<QueryStream<'a>> {
1043        tracing::debug!(
1044            sql = sql,
1045            params_count = params.len(),
1046            "executing query with named parameters"
1047        );
1048
1049        #[cfg(feature = "otel")]
1050        let instrumentation = self.instrumentation.clone();
1051        #[cfg(feature = "otel")]
1052        let mut span = instrumentation.query_span(sql);
1053        #[cfg(feature = "otel")]
1054        let timer = crate::instrumentation::OperationTimer::start(
1055            crate::instrumentation::extract_operation(sql),
1056        );
1057
1058        let result = async {
1059            if params.is_empty() {
1060                self.send_sql_batch(sql).await?;
1061            } else {
1062                let rpc_params = Self::convert_named_params(
1063                    params,
1064                    self.send_unicode(),
1065                    self.server_collation(),
1066                )?;
1067                let rpc = RpcRequest::execute_sql(sql, rpc_params);
1068                self.send_rpc(&rpc).await?;
1069            }
1070
1071            self.read_query_response().await
1072        }
1073        .await;
1074
1075        #[cfg(feature = "otel")]
1076        match &result {
1077            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1078            Err(e) => InstrumentationContext::record_error(&mut span, e),
1079        }
1080        #[cfg(feature = "otel")]
1081        timer.finish(instrumentation.metrics(), result.is_ok());
1082        #[cfg(feature = "otel")]
1083        drop(span);
1084
1085        let resp = result?;
1086        #[cfg(feature = "always-encrypted")]
1087        {
1088            Ok(QueryStream::from_raw(
1089                resp.columns,
1090                resp.pending_rows,
1091                resp.meta,
1092                resp.decryptor,
1093            ))
1094        }
1095        #[cfg(not(feature = "always-encrypted"))]
1096        {
1097            Ok(QueryStream::from_raw(
1098                resp.columns,
1099                resp.pending_rows,
1100                resp.meta,
1101            ))
1102        }
1103    }
1104
1105    /// Execute a statement with named parameters.
1106    ///
1107    /// Returns the number of affected rows. This is the named-parameter
1108    /// counterpart of [`execute()`](Client::execute), compatible with the
1109    /// [`ToParams`](crate::to_params::ToParams) trait.
1110    ///
1111    /// # Example
1112    ///
1113    /// ```rust,no_run
1114    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1115    /// use mssql_client::NamedParam;
1116    ///
1117    /// let params = vec![
1118    ///     NamedParam::from_value("name", &"Alice")?,
1119    ///     NamedParam::from_value("email", &"alice@example.com")?,
1120    /// ];
1121    /// let rows_affected = client.execute_named(
1122    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
1123    ///     &params,
1124    /// ).await?;
1125    /// # let _ = rows_affected;
1126    /// # Ok(())
1127    /// # }
1128    /// ```
1129    pub async fn execute_named(
1130        &mut self,
1131        sql: &str,
1132        params: &[crate::to_params::NamedParam],
1133    ) -> Result<u64> {
1134        tracing::debug!(
1135            sql = sql,
1136            params_count = params.len(),
1137            "executing statement with named parameters"
1138        );
1139
1140        #[cfg(feature = "otel")]
1141        let instrumentation = self.instrumentation.clone();
1142        #[cfg(feature = "otel")]
1143        let mut span = instrumentation.query_span(sql);
1144        #[cfg(feature = "otel")]
1145        let timer = crate::instrumentation::OperationTimer::start(
1146            crate::instrumentation::extract_operation(sql),
1147        );
1148
1149        let deadline = self.command_deadline();
1150        let canceller = self.connection_cancel_handle();
1151        let result = run_with_deadline(
1152            async {
1153                if params.is_empty() {
1154                    self.send_sql_batch(sql).await?;
1155                } else {
1156                    let rpc_params = Self::convert_named_params(
1157                        params,
1158                        self.send_unicode(),
1159                        self.server_collation(),
1160                    )?;
1161                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1162                    self.send_rpc(&rpc).await?;
1163                }
1164
1165                self.read_execute_result().await
1166            },
1167            deadline,
1168            canceller,
1169        )
1170        .await;
1171
1172        #[cfg(feature = "otel")]
1173        match &result {
1174            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1175            Err(e) => InstrumentationContext::record_error(&mut span, e),
1176        }
1177        #[cfg(feature = "otel")]
1178        timer.finish(instrumentation.metrics(), result.is_ok());
1179        #[cfg(feature = "otel")]
1180        drop(span);
1181
1182        result
1183    }
1184
1185    /// The connection's OpenTelemetry instrumentation context.
1186    #[cfg(feature = "otel")]
1187    pub(crate) fn instrumentation(&self) -> &InstrumentationContext {
1188        &self.instrumentation
1189    }
1190
1191    /// Snapshot this connection's prepared-statement cache statistics.
1192    ///
1193    /// Reflects activity since the connection was established (or its last
1194    /// reset). Meaningful only when
1195    /// [`Config::statement_cache`](crate::Config::statement_cache) is enabled;
1196    /// otherwise the cache is never consulted and all counts stay zero.
1197    #[must_use]
1198    pub fn statement_cache_stats(&self) -> crate::StatementCacheStats {
1199        self.statement_cache.stats()
1200    }
1201
1202    /// Whether string parameters are sent as NVARCHAR (Unicode).
1203    pub(crate) fn send_unicode(&self) -> bool {
1204        self.config.send_string_parameters_as_unicode
1205    }
1206
1207    /// Server's default collation, captured from ENVCHANGE during login.
1208    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
1209        self.server_collation.as_ref()
1210    }
1211
1212    /// Shared implementation behind `query_stream` for both `Ready` and
1213    /// `InTransaction`. Sends the request, then pulls packets until the first
1214    /// result set's `ColMetaData` (resolving columns and any Always Encrypted
1215    /// decryptor up front) before handing back a [`RowStream`].
1216    pub(crate) async fn query_stream_inner<'a>(
1217        &'a mut self,
1218        sql: &str,
1219        params: &[&(dyn crate::ToSql + Sync)],
1220    ) -> Result<crate::row_stream::RowStream<'a, S>> {
1221        use crate::client::response::server_token_to_error;
1222        use crate::row_source::{Pull, RowSource};
1223        use tds_protocol::token::Token;
1224
1225        tracing::debug!(sql = sql, params_count = params.len(), "streaming query");
1226
1227        // Send the request (same wire format as the buffered path).
1228        if params.is_empty() {
1229            self.send_sql_batch(sql).await?;
1230        } else {
1231            let rpc = self.build_parameterized_rpc(sql, params).await?;
1232            self.send_rpc(&rpc).await?;
1233        }
1234        self.in_flight = true;
1235
1236        #[cfg(feature = "always-encrypted")]
1237        let encryption_enabled = self.encryption_context.is_some();
1238        #[cfg(not(feature = "always-encrypted"))]
1239        let encryption_enabled = false;
1240
1241        let mut source = RowSource::new(encryption_enabled);
1242
1243        // Prelude: pull packets until the first result set's ColMetaData (so the
1244        // columns and any Always Encrypted decryptor are resolved up front), or
1245        // until a terminal Done/Error if there is no result set.
1246        loop {
1247            match source.pull()? {
1248                Pull::Token(Token::ColMetaData(meta)) => {
1249                    let columns = Self::build_columns(&meta);
1250                    #[cfg(feature = "always-encrypted")]
1251                    let decryptor = self
1252                        .resolve_decryptor(&meta)
1253                        .await?
1254                        .map(std::sync::Arc::new);
1255                    return Ok(crate::row_stream::RowStream::new(
1256                        self,
1257                        source,
1258                        columns,
1259                        meta,
1260                        #[cfg(feature = "always-encrypted")]
1261                        decryptor,
1262                    ));
1263                }
1264                Pull::Token(Token::Error(err)) => {
1265                    self.in_flight = false;
1266                    return Err(server_token_to_error(&err));
1267                }
1268                Pull::Token(Token::Done(done)) => {
1269                    if done.status.error {
1270                        self.in_flight = false;
1271                        return Err(Error::Query(
1272                            "query failed (server set error flag in DONE token)".to_string(),
1273                        ));
1274                    }
1275                    if !done.status.more {
1276                        // No result set (e.g. an INSERT) — an empty stream.
1277                        self.in_flight = false;
1278                        return Ok(crate::row_stream::RowStream::empty(self));
1279                    }
1280                    // More results may follow; keep looking for ColMetaData.
1281                }
1282                Pull::Token(Token::EnvChange(env)) => {
1283                    Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
1284                }
1285                Pull::Token(_) => {
1286                    // Info / Order / DoneProc / DoneInProc, etc. — keep pulling.
1287                }
1288                Pull::NeedMore => match self.read_response_packet().await? {
1289                    Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1290                    None => {
1291                        self.in_flight = false;
1292                        return Err(Error::ConnectionClosed);
1293                    }
1294                },
1295                Pull::End => {
1296                    self.in_flight = false;
1297                    return Ok(crate::row_stream::RowStream::empty(self));
1298                }
1299            }
1300        }
1301    }
1302
1303    /// Shared implementation behind `query_stream_blob` for both `Ready` and
1304    /// `InTransaction`.
1305    pub(crate) async fn query_stream_blob_inner<'a>(
1306        &'a mut self,
1307        sql: &str,
1308        params: &[&(dyn crate::ToSql + Sync)],
1309    ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1310        let (meta, buf, eom, encryption_enabled) = self.open_blob_stream(sql, params).await?;
1311        let first_blob = Self::validate_blob_result_set(&meta)?;
1312        Ok(crate::blob_stream::BlobStream::new(
1313            self,
1314            buf,
1315            eom,
1316            encryption_enabled,
1317            meta,
1318            first_blob,
1319            // Single trailing MAX column; auto-position it so the existing
1320            // `next` → `copy_blob_to` flow works without an explicit `next_blob`.
1321            1,
1322            true,
1323        ))
1324    }
1325
1326    /// Shared implementation behind `query_stream_rows` for both `Ready` and
1327    /// `InTransaction`.
1328    pub(crate) async fn query_stream_rows_inner<'a>(
1329        &'a mut self,
1330        sql: &str,
1331        params: &[&(dyn crate::ToSql + Sync)],
1332    ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1333        let (meta, buf, eom, encryption_enabled) = self.open_blob_stream(sql, params).await?;
1334        let (first_blob, blob_count) = Self::validate_blob_rows_result_set(&meta)?;
1335        Ok(crate::blob_stream::BlobStream::new(
1336            self,
1337            buf,
1338            eom,
1339            encryption_enabled,
1340            meta,
1341            first_blob,
1342            blob_count,
1343            // Caller drives blobs explicitly via `next_blob`.
1344            false,
1345        ))
1346    }
1347
1348    /// Send the query and pull tokens until the first `ColMetaData`, returning
1349    /// the result-set metadata plus the unconsumed post-metadata wire bytes.
1350    /// Shared by the single-blob and multi-blob streaming paths.
1351    async fn open_blob_stream(
1352        &mut self,
1353        sql: &str,
1354        params: &[&(dyn crate::ToSql + Sync)],
1355    ) -> Result<(tds_protocol::token::ColMetaData, bytes::Bytes, bool, bool)> {
1356        use crate::client::response::server_token_to_error;
1357        use crate::row_source::{Pull, RowSource};
1358        use tds_protocol::token::Token;
1359
1360        if params.is_empty() {
1361            self.send_sql_batch(sql).await?;
1362        } else {
1363            let rpc = self.build_parameterized_rpc(sql, params).await?;
1364            self.send_rpc(&rpc).await?;
1365        }
1366        self.in_flight = true;
1367
1368        #[cfg(feature = "always-encrypted")]
1369        let encryption_enabled = self.encryption_context.is_some();
1370        #[cfg(not(feature = "always-encrypted"))]
1371        let encryption_enabled = false;
1372
1373        let mut source = RowSource::new(encryption_enabled);
1374
1375        loop {
1376            match source.pull()? {
1377                Pull::Token(Token::ColMetaData(meta)) => {
1378                    let (buf, eom) = source.into_parts();
1379                    return Ok((meta, buf, eom, encryption_enabled));
1380                }
1381                Pull::Token(Token::Error(err)) => {
1382                    self.in_flight = false;
1383                    return Err(server_token_to_error(&err));
1384                }
1385                Pull::Token(Token::Done(_)) => {
1386                    self.in_flight = false;
1387                    return Err(Error::Protocol(
1388                        "blob streaming: query produced no result set".to_string(),
1389                    ));
1390                }
1391                Pull::Token(_) => {}
1392                Pull::NeedMore => match self.read_response_packet().await? {
1393                    Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1394                    None => {
1395                        self.in_flight = false;
1396                        return Err(Error::ConnectionClosed);
1397                    }
1398                },
1399                Pull::End => {
1400                    self.in_flight = false;
1401                    return Err(Error::Protocol(
1402                        "blob streaming: query produced no result set".to_string(),
1403                    ));
1404                }
1405            }
1406        }
1407    }
1408
1409    /// Validate that a result set is shaped for [`query_stream_blob`] and return
1410    /// the index of its single trailing MAX column.
1411    fn validate_blob_result_set(meta: &tds_protocol::token::ColMetaData) -> Result<usize> {
1412        if meta.cek_table.is_some() {
1413            return Err(Error::Protocol(
1414                "query_stream_blob does not support Always Encrypted result sets".to_string(),
1415            ));
1416        }
1417        let max_cols: Vec<usize> = meta
1418            .columns
1419            .iter()
1420            .enumerate()
1421            .filter(|(_, c)| crate::blob_stream::is_plp_max(c))
1422            .map(|(i, _)| i)
1423            .collect();
1424        match max_cols.as_slice() {
1425            [] => Err(Error::Protocol(
1426                "query_stream_blob: result set has no MAX column — use query_stream".to_string(),
1427            )),
1428            [idx] if *idx == meta.columns.len() - 1 => Ok(*idx),
1429            [_] => Err(Error::Protocol(
1430                "query_stream_blob: the MAX column must be the last column".to_string(),
1431            )),
1432            _ => Err(Error::Protocol(
1433                "query_stream_blob: result set has more than one MAX column".to_string(),
1434            )),
1435        }
1436    }
1437
1438    /// Validate that a result set is shaped for [`query_stream_rows`] and return
1439    /// `(first_blob_index, blob_count)` — the start and length of the trailing
1440    /// run of MAX columns.
1441    ///
1442    /// Requires at least one MAX column and that every MAX column be trailing
1443    /// (no scalar column may follow a MAX column). The interleaved case (a
1444    /// scalar column after a MAX column) is rejected — supporting it needs a
1445    /// resumable per-column decoder (tracked in #258).
1446    fn validate_blob_rows_result_set(
1447        meta: &tds_protocol::token::ColMetaData,
1448    ) -> Result<(usize, usize)> {
1449        if meta.cek_table.is_some() {
1450            return Err(Error::Protocol(
1451                "query_stream_rows does not support Always Encrypted result sets".to_string(),
1452            ));
1453        }
1454        let first_blob = meta
1455            .columns
1456            .iter()
1457            .position(crate::blob_stream::is_plp_max)
1458            .ok_or_else(|| {
1459                Error::Protocol(
1460                    "query_stream_rows: result set has no MAX column — use query_stream"
1461                        .to_string(),
1462                )
1463            })?;
1464        // Every column from the first MAX column onward must itself be a MAX
1465        // column; a scalar column after a blob cannot be decoded until the blob
1466        // is consumed.
1467        if !meta.columns[first_blob..]
1468            .iter()
1469            .all(crate::blob_stream::is_plp_max)
1470        {
1471            return Err(Error::Protocol(
1472                "query_stream_rows: a non-MAX column follows a MAX column; interleaved MAX \
1473                 columns are not supported (the MAX columns must be trailing)"
1474                    .to_string(),
1475            ));
1476        }
1477        Ok((first_blob, meta.columns.len() - first_blob))
1478    }
1479}
1480
1481impl Client<Ready> {
1482    /// Mark this connection as needing a reset on next use.
1483    ///
1484    /// Called by the connection pool when a connection is returned.
1485    /// The next SQL batch or RPC will include the RESETCONNECTION flag
1486    /// in the TDS packet header, causing SQL Server to reset connection
1487    /// state (temp tables, SET options, transaction isolation level, etc.)
1488    /// before executing the command.
1489    ///
1490    /// This is more efficient than calling `sp_reset_connection` as a
1491    /// separate command because it's handled at the TDS protocol level.
1492    pub fn mark_needs_reset(&mut self) {
1493        self.needs_reset = true;
1494    }
1495
1496    /// Check if this connection needs a reset.
1497    ///
1498    /// Returns true if `mark_needs_reset()` was called and the reset
1499    /// hasn't been performed yet.
1500    #[must_use]
1501    pub fn needs_reset(&self) -> bool {
1502        self.needs_reset
1503    }
1504
1505    /// Execute a query and return a result set with lazy per-row decoding.
1506    ///
1507    /// Per ADR-007 the full response is buffered in memory and each row is
1508    /// *decoded* on demand as you iterate — this is not incremental network
1509    /// streaming, so peak memory tracks the response size. Use
1510    /// `.collect_all()` if you want all rows materialized into a `Vec` up
1511    /// front.
1512    ///
1513    /// # Example
1514    ///
1515    /// ```rust,no_run
1516    /// # use mssql_client::Row;
1517    /// # fn process(_: &Row) {}
1518    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1519    /// // Streaming (synchronous iteration over the result set)
1520    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
1521    /// for row in stream {
1522    ///     let row = row?;
1523    ///     process(&row);
1524    /// }
1525    ///
1526    /// // Buffered (loads all into memory)
1527    /// let rows: Vec<Row> = client
1528    ///     .query("SELECT * FROM small_table", &[])
1529    ///     .await?
1530    ///     .collect_all()
1531    ///     .await?;
1532    /// # let _ = rows;
1533    /// # Ok(())
1534    /// # }
1535    /// ```
1536    pub async fn query<'a>(
1537        &'a mut self,
1538        sql: &str,
1539        params: &[&(dyn crate::ToSql + Sync)],
1540    ) -> Result<QueryStream<'a>> {
1541        let deadline = self.command_deadline();
1542        self.query_inner(sql, params, deadline).await
1543    }
1544
1545    /// Shared query implementation with an explicit command deadline.
1546    async fn query_inner<'a>(
1547        &'a mut self,
1548        sql: &str,
1549        params: &[&(dyn crate::ToSql + Sync)],
1550        deadline: Option<std::time::Duration>,
1551    ) -> Result<QueryStream<'a>> {
1552        tracing::debug!(sql = sql, params_count = params.len(), "executing query");
1553
1554        #[cfg(feature = "otel")]
1555        let instrumentation = self.instrumentation.clone();
1556        #[cfg(feature = "otel")]
1557        let mut span = instrumentation.query_span(sql);
1558        #[cfg(feature = "otel")]
1559        let timer = crate::instrumentation::OperationTimer::start(
1560            crate::instrumentation::extract_operation(sql),
1561        );
1562
1563        let canceller = self.cancel_handle();
1564        let result = run_with_deadline(
1565            async {
1566                // Sends via the prepared-statement cache when enabled, else the
1567                // SQL batch / sp_executesql default.
1568                self.send_query_request(sql, params).await?;
1569
1570                // Read complete response including columns and rows
1571                self.read_query_response().await
1572            },
1573            deadline,
1574            canceller,
1575        )
1576        .await;
1577
1578        #[cfg(feature = "otel")]
1579        match &result {
1580            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1581            Err(e) => InstrumentationContext::record_error(&mut span, e),
1582        }
1583        #[cfg(feature = "otel")]
1584        timer.finish(instrumentation.metrics(), result.is_ok());
1585
1586        // Drop the span before returning
1587        #[cfg(feature = "otel")]
1588        drop(span);
1589
1590        let resp = result?;
1591        #[cfg(feature = "always-encrypted")]
1592        {
1593            Ok(QueryStream::from_raw(
1594                resp.columns,
1595                resp.pending_rows,
1596                resp.meta,
1597                resp.decryptor,
1598            ))
1599        }
1600        #[cfg(not(feature = "always-encrypted"))]
1601        {
1602            Ok(QueryStream::from_raw(
1603                resp.columns,
1604                resp.pending_rows,
1605                resp.meta,
1606            ))
1607        }
1608    }
1609
1610    /// Execute a query and stream rows incrementally from the network.
1611    ///
1612    /// Unlike [`query`](Self::query) — which buffers the whole response in
1613    /// memory before returning — this reads TDS packets on demand as rows are
1614    /// pulled, so peak memory is roughly one packet plus one row regardless of
1615    /// result-set size. Use it for large result sets; use [`query`](Self::query)
1616    /// for the common small-result case where the buffered, synchronously
1617    /// iterable [`QueryStream`] is more convenient.
1618    ///
1619    /// The returned [`RowStream`](crate::RowStream) borrows the client for its
1620    /// lifetime, so no other request can run on this connection until the stream
1621    /// is consumed or dropped. Also available on `Client<InTransaction>` to
1622    /// stream within a transaction.
1623    ///
1624    /// # Example
1625    ///
1626    /// ```rust,no_run
1627    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1628    /// let mut stream = client.query_stream("SELECT id FROM big_table", &[]).await?;
1629    /// while let Some(row) = stream.try_next().await? {
1630    ///     let id: i32 = row.get_by_name("id")?;
1631    ///     let _ = id;
1632    /// }
1633    /// # Ok(())
1634    /// # }
1635    /// ```
1636    pub async fn query_stream<'a>(
1637        &'a mut self,
1638        sql: &str,
1639        params: &[&(dyn crate::ToSql + Sync)],
1640    ) -> Result<crate::row_stream::RowStream<'a, Ready>> {
1641        self.query_stream_inner(sql, params).await
1642    }
1643
1644    /// Execute a query and stream a row's trailing MAX column from the network.
1645    ///
1646    /// For result sets whose last column is a single MAX type
1647    /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this reads
1648    /// that column's bytes incrementally from the socket instead of
1649    /// materializing the cell — so a multi-GB BLOB can be streamed to a sink in
1650    /// bounded memory. The leading (scalar) columns are decoded eagerly into the
1651    /// per-row [`Row`](crate::Row).
1652    ///
1653    /// The MAX column must be the **last** column. The returned
1654    /// [`BlobStream`](crate::BlobStream) yields scalar [`Row`](crate::Row)s via
1655    /// [`next`](crate::BlobStream::next); read each row's blob with
1656    /// [`read_chunk`](crate::BlobStream::read_chunk) /
1657    /// [`copy_blob_to`](crate::BlobStream::copy_blob_to) before advancing. Also
1658    /// available on `Client<InTransaction>`.
1659    ///
1660    /// # Errors
1661    ///
1662    /// Returns an error if the result set has no trailing MAX column, has more
1663    /// than one MAX column, the MAX column is not last, or the result set uses
1664    /// Always Encrypted (not yet supported on this path).
1665    pub async fn query_stream_blob<'a>(
1666        &'a mut self,
1667        sql: &str,
1668        params: &[&(dyn crate::ToSql + Sync)],
1669    ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1670        self.query_stream_blob_inner(sql, params).await
1671    }
1672
1673    /// Execute a query and stream a row's **trailing MAX columns** from the
1674    /// network — the multi-column generalization of
1675    /// [`query_stream_blob`](Self::query_stream_blob).
1676    ///
1677    /// For result sets whose trailing columns are one or more MAX types
1678    /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this decodes
1679    /// the leading scalar columns eagerly into the per-row [`Row`](crate::Row)
1680    /// and streams each trailing MAX column's bytes incrementally from the
1681    /// socket, in bounded memory. The returned
1682    /// [`BlobStream`](crate::BlobStream) yields scalar rows via
1683    /// [`next`](crate::BlobStream::next); within each row, iterate the trailing
1684    /// MAX columns with [`next_blob`](crate::BlobStream::next_blob), reading each
1685    /// with [`copy_blob_to`](crate::BlobStream::copy_blob_to) /
1686    /// [`read_chunk`](crate::BlobStream::read_chunk). Also available on
1687    /// `Client<InTransaction>`.
1688    ///
1689    /// # Errors
1690    ///
1691    /// Returns an error if the result set has no trailing MAX column, has a
1692    /// non-MAX column after a MAX column (interleaved MAX columns are not
1693    /// supported — the MAX columns must be trailing), or uses Always Encrypted
1694    /// (not yet supported on this path).
1695    pub async fn query_stream_rows<'a>(
1696        &'a mut self,
1697        sql: &str,
1698        params: &[&(dyn crate::ToSql + Sync)],
1699    ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1700        self.query_stream_rows_inner(sql, params).await
1701    }
1702
1703    /// Execute a query with a specific timeout.
1704    ///
1705    /// This overrides the default `command_timeout` from the connection configuration
1706    /// for this specific query. If the query does not complete within the specified
1707    /// duration, the driver sends an Attention packet to cancel it server-side,
1708    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1709    /// connection left usable for the next request.
1710    ///
1711    /// # Arguments
1712    ///
1713    /// * `sql` - The SQL query to execute
1714    /// * `params` - Query parameters
1715    /// * `timeout_duration` - Maximum time to wait for the query to complete
1716    ///
1717    /// # Example
1718    ///
1719    /// ```rust,no_run
1720    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1721    /// use std::time::Duration;
1722    ///
1723    /// // Execute with a 5-second timeout
1724    /// let rows = client
1725    ///     .query_with_timeout(
1726    ///         "SELECT * FROM large_table",
1727    ///         &[],
1728    ///         Duration::from_secs(5),
1729    ///     )
1730    ///     .await?;
1731    /// # let _ = rows;
1732    /// # Ok(())
1733    /// # }
1734    /// ```
1735    pub async fn query_with_timeout<'a>(
1736        &'a mut self,
1737        sql: &str,
1738        params: &[&(dyn crate::ToSql + Sync)],
1739        timeout_duration: std::time::Duration,
1740    ) -> Result<QueryStream<'a>> {
1741        self.query_inner(sql, params, Some(timeout_duration)).await
1742    }
1743
1744    /// Execute a batch that may return multiple result sets.
1745    ///
1746    /// This is useful for stored procedures or SQL batches that contain
1747    /// multiple SELECT statements.
1748    ///
1749    /// # Example
1750    ///
1751    /// ```rust,no_run
1752    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1753    /// // Execute a batch with multiple SELECT statements
1754    /// let mut results = client.query_multiple(
1755    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1756    ///     &[]
1757    /// ).await?;
1758    ///
1759    /// // Process first result set
1760    /// while let Some(row) = results.next_row().await? {
1761    ///     println!("Result 1: {:?}", row);
1762    /// }
1763    ///
1764    /// // Move to second result set
1765    /// if results.next_result().await? {
1766    ///     while let Some(row) = results.next_row().await? {
1767    ///         println!("Result 2: {:?}", row);
1768    ///     }
1769    /// }
1770    /// # Ok(())
1771    /// # }
1772    /// ```
1773    pub async fn query_multiple<'a>(
1774        &'a mut self,
1775        sql: &str,
1776        params: &[&(dyn crate::ToSql + Sync)],
1777    ) -> Result<MultiResultStream<'a>> {
1778        tracing::debug!(
1779            sql = sql,
1780            params_count = params.len(),
1781            "executing multi-result query"
1782        );
1783
1784        #[cfg(feature = "otel")]
1785        let instrumentation = self.instrumentation.clone();
1786        #[cfg(feature = "otel")]
1787        let mut span = instrumentation.query_span(sql);
1788        #[cfg(feature = "otel")]
1789        let timer = crate::instrumentation::OperationTimer::start(
1790            crate::instrumentation::extract_operation(sql),
1791        );
1792
1793        let deadline = self.command_deadline();
1794        let canceller = self.connection_cancel_handle();
1795        let result = run_with_deadline(
1796            async {
1797                if params.is_empty() {
1798                    // Simple batch without parameters - use SQL batch
1799                    self.send_sql_batch(sql).await?;
1800                } else {
1801                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1802                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1803                    self.send_rpc(&rpc).await?;
1804                }
1805
1806                // Read all result sets
1807                self.read_multi_result_response().await
1808            },
1809            deadline,
1810            canceller,
1811        )
1812        .await;
1813
1814        #[cfg(feature = "otel")]
1815        match &result {
1816            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1817            Err(e) => InstrumentationContext::record_error(&mut span, e),
1818        }
1819        #[cfg(feature = "otel")]
1820        timer.finish(instrumentation.metrics(), result.is_ok());
1821        #[cfg(feature = "otel")]
1822        drop(span);
1823
1824        let result_sets = result?;
1825        Ok(MultiResultStream::new(result_sets))
1826    }
1827
1828    /// Execute a query that doesn't return rows.
1829    ///
1830    /// Returns the number of affected rows.
1831    pub async fn execute(
1832        &mut self,
1833        sql: &str,
1834        params: &[&(dyn crate::ToSql + Sync)],
1835    ) -> Result<u64> {
1836        let deadline = self.command_deadline();
1837        self.execute_inner(sql, params, deadline).await
1838    }
1839
1840    /// Shared execute implementation with an explicit command deadline.
1841    async fn execute_inner(
1842        &mut self,
1843        sql: &str,
1844        params: &[&(dyn crate::ToSql + Sync)],
1845        deadline: Option<std::time::Duration>,
1846    ) -> Result<u64> {
1847        tracing::debug!(
1848            sql = sql,
1849            params_count = params.len(),
1850            "executing statement"
1851        );
1852
1853        #[cfg(feature = "otel")]
1854        let instrumentation = self.instrumentation.clone();
1855        #[cfg(feature = "otel")]
1856        let mut span = instrumentation.query_span(sql);
1857        #[cfg(feature = "otel")]
1858        let timer = crate::instrumentation::OperationTimer::start(
1859            crate::instrumentation::extract_operation(sql),
1860        );
1861
1862        let canceller = self.cancel_handle();
1863        let result = run_with_deadline(
1864            async {
1865                if params.is_empty() {
1866                    // Simple statement without parameters - use SQL batch
1867                    self.send_sql_batch(sql).await?;
1868                } else {
1869                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1870                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1871                    self.send_rpc(&rpc).await?;
1872                }
1873
1874                // Read response and get row count
1875                self.read_execute_result().await
1876            },
1877            deadline,
1878            canceller,
1879        )
1880        .await;
1881
1882        #[cfg(feature = "otel")]
1883        match &result {
1884            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1885            Err(e) => InstrumentationContext::record_error(&mut span, e),
1886        }
1887        #[cfg(feature = "otel")]
1888        timer.finish(instrumentation.metrics(), result.is_ok());
1889
1890        // Drop the span before returning
1891        #[cfg(feature = "otel")]
1892        drop(span);
1893
1894        result
1895    }
1896
1897    /// Execute a statement with a specific timeout.
1898    ///
1899    /// This overrides the default `command_timeout` from the connection configuration
1900    /// for this specific statement. If the statement does not complete within the
1901    /// specified duration, the driver sends an Attention packet to cancel it
1902    /// server-side, drains the acknowledgement, and returns
1903    /// [`Error::CommandTimeout`] with the connection left usable.
1904    ///
1905    /// # Arguments
1906    ///
1907    /// * `sql` - The SQL statement to execute
1908    /// * `params` - Statement parameters
1909    /// * `timeout_duration` - Maximum time to wait for the statement to complete
1910    ///
1911    /// # Example
1912    ///
1913    /// ```rust,no_run
1914    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1915    /// use std::time::Duration;
1916    ///
1917    /// // Execute with a 10-second timeout
1918    /// let rows_affected = client
1919    ///     .execute_with_timeout(
1920    ///         "UPDATE large_table SET status = @p1",
1921    ///         &[&"processed"],
1922    ///         Duration::from_secs(10),
1923    ///     )
1924    ///     .await?;
1925    /// # let _ = rows_affected;
1926    /// # Ok(())
1927    /// # }
1928    /// ```
1929    pub async fn execute_with_timeout(
1930        &mut self,
1931        sql: &str,
1932        params: &[&(dyn crate::ToSql + Sync)],
1933        timeout_duration: std::time::Duration,
1934    ) -> Result<u64> {
1935        self.execute_inner(sql, params, Some(timeout_duration))
1936            .await
1937    }
1938
1939    /// Begin a transaction.
1940    ///
1941    /// This transitions the client from `Ready` to `InTransaction` state.
1942    /// Per MS-TDS spec, the server returns a transaction descriptor in the
1943    /// BeginTransaction EnvChange token that must be included in subsequent
1944    /// ALL_HEADERS sections.
1945    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1946        tracing::debug!("beginning transaction");
1947
1948        #[cfg(feature = "otel")]
1949        let instrumentation = self.instrumentation.clone();
1950        #[cfg(feature = "otel")]
1951        let mut span = instrumentation.transaction_span("BEGIN");
1952
1953        // Execute BEGIN TRANSACTION and extract the transaction descriptor
1954        let result = async {
1955            self.send_sql_batch("BEGIN TRANSACTION").await?;
1956            self.read_transaction_begin_result().await
1957        }
1958        .await;
1959
1960        #[cfg(feature = "otel")]
1961        match &result {
1962            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1963            Err(e) => InstrumentationContext::record_error(&mut span, e),
1964        }
1965
1966        // Drop the span before moving instrumentation
1967        #[cfg(feature = "otel")]
1968        drop(span);
1969
1970        let transaction_descriptor = result?;
1971
1972        Ok(Client {
1973            config: self.config,
1974            _state: PhantomData,
1975            connection: self.connection,
1976            server_version: self.server_version,
1977            current_database: self.current_database,
1978            server_collation: self.server_collation,
1979            statement_cache: self.statement_cache,
1980            transaction_descriptor, // Store the descriptor from server
1981            needs_reset: self.needs_reset,
1982            in_flight: self.in_flight,
1983            #[cfg(feature = "otel")]
1984            instrumentation: self.instrumentation,
1985            #[cfg(feature = "always-encrypted")]
1986            encryption_context: self.encryption_context,
1987        })
1988    }
1989
1990    /// Begin a transaction with a specific isolation level.
1991    ///
1992    /// This transitions the client from `Ready` to `InTransaction` state
1993    /// with the specified isolation level.
1994    ///
1995    /// # Example
1996    ///
1997    /// ```rust,no_run
1998    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1999    /// use mssql_client::IsolationLevel;
2000    ///
2001    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
2002    /// // All operations in this transaction use SERIALIZABLE isolation
2003    /// tx.commit().await?;
2004    /// # Ok(())
2005    /// # }
2006    /// ```
2007    pub async fn begin_transaction_with_isolation(
2008        mut self,
2009        isolation_level: crate::transaction::IsolationLevel,
2010    ) -> Result<Client<InTransaction>> {
2011        tracing::debug!(
2012            isolation_level = %isolation_level.name(),
2013            "beginning transaction with isolation level"
2014        );
2015
2016        #[cfg(feature = "otel")]
2017        let instrumentation = self.instrumentation.clone();
2018        #[cfg(feature = "otel")]
2019        let mut span = instrumentation.transaction_span("BEGIN");
2020
2021        // First set the isolation level
2022        let result = async {
2023            self.send_sql_batch(isolation_level.as_sql()).await?;
2024            self.read_execute_result().await?;
2025
2026            // Then begin the transaction
2027            self.send_sql_batch("BEGIN TRANSACTION").await?;
2028            self.read_transaction_begin_result().await
2029        }
2030        .await;
2031
2032        #[cfg(feature = "otel")]
2033        match &result {
2034            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2035            Err(e) => InstrumentationContext::record_error(&mut span, e),
2036        }
2037
2038        #[cfg(feature = "otel")]
2039        drop(span);
2040
2041        let transaction_descriptor = result?;
2042
2043        Ok(Client {
2044            config: self.config,
2045            _state: PhantomData,
2046            connection: self.connection,
2047            server_version: self.server_version,
2048            current_database: self.current_database,
2049            server_collation: self.server_collation,
2050            statement_cache: self.statement_cache,
2051            transaction_descriptor,
2052            needs_reset: self.needs_reset,
2053            in_flight: self.in_flight,
2054            #[cfg(feature = "otel")]
2055            instrumentation: self.instrumentation,
2056            #[cfg(feature = "always-encrypted")]
2057            encryption_context: self.encryption_context,
2058        })
2059    }
2060
2061    /// Execute a simple query without parameters.
2062    ///
2063    /// This is useful for DDL statements and simple queries where you
2064    /// don't need to retrieve the affected row count.
2065    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
2066        tracing::debug!(sql = sql, "executing simple query");
2067
2068        // Send SQL batch
2069        self.send_sql_batch(sql).await?;
2070
2071        // Read and discard response
2072        let _ = self.read_execute_result().await?;
2073
2074        Ok(())
2075    }
2076
2077    /// Close the connection gracefully.
2078    pub async fn close(self) -> Result<()> {
2079        tracing::debug!("closing connection");
2080        Ok(())
2081    }
2082
2083    /// Get the current database name.
2084    #[must_use]
2085    pub fn database(&self) -> Option<&str> {
2086        self.config.database.as_deref()
2087    }
2088
2089    /// Get the server host.
2090    #[must_use]
2091    pub fn host(&self) -> &str {
2092        &self.config.host
2093    }
2094
2095    /// Get the server port.
2096    #[must_use]
2097    pub fn port(&self) -> u16 {
2098        self.config.port
2099    }
2100
2101    /// Check if the connection is currently in a transaction.
2102    ///
2103    /// This returns `true` if a transaction was started via raw SQL
2104    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
2105    ///
2106    /// Note: This only tracks transactions started via raw SQL. Transactions
2107    /// started via the type-state API (`begin_transaction()`) result in a
2108    /// `Client<InTransaction>` which is a different type.
2109    ///
2110    /// # Example
2111    ///
2112    /// ```rust,no_run
2113    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2114    /// client.execute("BEGIN TRANSACTION", &[]).await?;
2115    /// assert!(client.is_in_transaction());
2116    ///
2117    /// client.execute("COMMIT", &[]).await?;
2118    /// assert!(!client.is_in_transaction());
2119    /// # Ok(())
2120    /// # }
2121    /// ```
2122    #[must_use]
2123    pub fn is_in_transaction(&self) -> bool {
2124        self.transaction_descriptor != 0
2125    }
2126
2127    /// Check if a request is in-flight (sent but response not fully read).
2128    ///
2129    /// Used by the connection pool to detect dirty connections that were
2130    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
2131    /// A connection with an in-flight request has unread data in the TCP
2132    /// buffer and must be discarded rather than returned to the pool.
2133    #[must_use]
2134    pub fn is_in_flight(&self) -> bool {
2135        self.in_flight
2136    }
2137
2138    /// Report whether an Always Encrypted key-store provider with the given
2139    /// name is currently reachable through this client's encryption context.
2140    ///
2141    /// Returns `false` when the `always-encrypted` feature isn't enabled, when
2142    /// the connection was opened without `column_encryption` configured, or
2143    /// when no matching provider was registered.
2144    #[cfg(feature = "always-encrypted")]
2145    #[must_use]
2146    pub fn has_encryption_provider(&self, name: &str) -> bool {
2147        self.encryption_context
2148            .as_ref()
2149            .is_some_and(|ctx| ctx.has_provider(name))
2150    }
2151
2152    /// Get a handle for cancelling the current query.
2153    ///
2154    /// The cancel handle can be cloned and sent to other tasks, enabling
2155    /// cancellation of long-running queries from a separate async context.
2156    ///
2157    /// # Example
2158    ///
2159    /// ```rust,no_run
2160    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2161    /// use std::time::Duration;
2162    ///
2163    /// let cancel_handle = client.cancel_handle();
2164    ///
2165    /// // Spawn a task to cancel after 10 seconds
2166    /// let handle = tokio::spawn(async move {
2167    ///     tokio::time::sleep(Duration::from_secs(10)).await;
2168    ///     let _ = cancel_handle.cancel().await;
2169    /// });
2170    ///
2171    /// // This query will be cancelled if it runs longer than 10 seconds
2172    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
2173    /// # let _ = (handle, result);
2174    /// # Ok(())
2175    /// # }
2176    /// ```
2177    #[must_use]
2178    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2179        self.connection_cancel_handle()
2180    }
2181}
2182
2183/// # Drop Behavior
2184///
2185/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
2186/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
2187/// the transaction remains open on the server until the TCP connection closes
2188/// (at which point SQL Server automatically rolls back).
2189///
2190/// This is because `Drop` is synchronous and cannot perform the async I/O needed
2191/// to send a `ROLLBACK TRANSACTION` command.
2192///
2193/// ## Consequences of dropping without commit/rollback
2194///
2195/// - **Direct connections:** The transaction leaks until the OS TCP timeout
2196///   (potentially 30+ minutes), holding locks on any modified rows.
2197/// - **Pooled connections:** The pool detects the active transaction descriptor
2198///   and discards the connection rather than returning it to the idle pool
2199///   (see `PooledConnection::drop` in `mssql-driver-pool`).
2200///
2201/// ## Best practice
2202///
2203/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
2204/// for error paths:
2205///
2206/// ```rust,no_run
2207/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
2208/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2209/// let tx = client.begin_transaction().await?;
2210/// match do_work(&tx).await {
2211///     Ok(_) => { tx.commit().await?; }
2212///     Err(e) => { tx.rollback().await?; return Err(e); }
2213/// }
2214/// # Ok(())
2215/// # }
2216/// ```
2217impl Client<InTransaction> {
2218    /// Execute a query within the transaction and return a streaming result set.
2219    ///
2220    /// See [`Client<Ready>::query`] for usage examples.
2221    pub async fn query<'a>(
2222        &'a mut self,
2223        sql: &str,
2224        params: &[&(dyn crate::ToSql + Sync)],
2225    ) -> Result<QueryStream<'a>> {
2226        let deadline = self.command_deadline();
2227        self.query_inner(sql, params, deadline).await
2228    }
2229
2230    /// Shared query implementation with an explicit command deadline.
2231    async fn query_inner<'a>(
2232        &'a mut self,
2233        sql: &str,
2234        params: &[&(dyn crate::ToSql + Sync)],
2235        deadline: Option<std::time::Duration>,
2236    ) -> Result<QueryStream<'a>> {
2237        tracing::debug!(
2238            sql = sql,
2239            params_count = params.len(),
2240            "executing query in transaction"
2241        );
2242
2243        #[cfg(feature = "otel")]
2244        let instrumentation = self.instrumentation.clone();
2245        #[cfg(feature = "otel")]
2246        let mut span = instrumentation.query_span(sql);
2247        #[cfg(feature = "otel")]
2248        let timer = crate::instrumentation::OperationTimer::start(
2249            crate::instrumentation::extract_operation(sql),
2250        );
2251
2252        let canceller = self.cancel_handle();
2253        let result = run_with_deadline(
2254            async {
2255                // Sends via the prepared-statement cache when enabled, else the
2256                // SQL batch / sp_executesql default.
2257                self.send_query_request(sql, params).await?;
2258
2259                // Read complete response including columns and rows
2260                self.read_query_response().await
2261            },
2262            deadline,
2263            canceller,
2264        )
2265        .await;
2266
2267        #[cfg(feature = "otel")]
2268        match &result {
2269            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2270            Err(e) => InstrumentationContext::record_error(&mut span, e),
2271        }
2272        #[cfg(feature = "otel")]
2273        timer.finish(instrumentation.metrics(), result.is_ok());
2274
2275        // Drop the span before returning
2276        #[cfg(feature = "otel")]
2277        drop(span);
2278
2279        let resp = result?;
2280        #[cfg(feature = "always-encrypted")]
2281        {
2282            Ok(QueryStream::from_raw(
2283                resp.columns,
2284                resp.pending_rows,
2285                resp.meta,
2286                resp.decryptor,
2287            ))
2288        }
2289        #[cfg(not(feature = "always-encrypted"))]
2290        {
2291            Ok(QueryStream::from_raw(
2292                resp.columns,
2293                resp.pending_rows,
2294                resp.meta,
2295            ))
2296        }
2297    }
2298
2299    /// Stream rows incrementally from the network within the transaction.
2300    ///
2301    /// Identical to [`Client<Ready>::query_stream`] except the query runs inside
2302    /// the open transaction. The returned [`RowStream`](crate::RowStream)
2303    /// borrows the transaction client for its lifetime, so the stream must be
2304    /// consumed or dropped before the transaction can be committed or rolled
2305    /// back.
2306    pub async fn query_stream<'a>(
2307        &'a mut self,
2308        sql: &str,
2309        params: &[&(dyn crate::ToSql + Sync)],
2310    ) -> Result<crate::row_stream::RowStream<'a, InTransaction>> {
2311        self.query_stream_inner(sql, params).await
2312    }
2313
2314    /// Stream a row's trailing MAX column from the network within the
2315    /// transaction.
2316    ///
2317    /// See [`Client<Ready>::query_stream_blob`] for semantics and constraints;
2318    /// the only difference is that the query runs inside the open transaction.
2319    pub async fn query_stream_blob<'a>(
2320        &'a mut self,
2321        sql: &str,
2322        params: &[&(dyn crate::ToSql + Sync)],
2323    ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
2324        self.query_stream_blob_inner(sql, params).await
2325    }
2326
2327    /// Stream a row's trailing MAX columns from the network within the
2328    /// transaction.
2329    ///
2330    /// See [`Client<Ready>::query_stream_rows`] for semantics and constraints;
2331    /// the only difference is that the query runs inside the open transaction.
2332    pub async fn query_stream_rows<'a>(
2333        &'a mut self,
2334        sql: &str,
2335        params: &[&(dyn crate::ToSql + Sync)],
2336    ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
2337        self.query_stream_rows_inner(sql, params).await
2338    }
2339
2340    /// Execute a statement within the transaction.
2341    ///
2342    /// Returns the number of affected rows.
2343    pub async fn execute(
2344        &mut self,
2345        sql: &str,
2346        params: &[&(dyn crate::ToSql + Sync)],
2347    ) -> Result<u64> {
2348        let deadline = self.command_deadline();
2349        self.execute_inner(sql, params, deadline).await
2350    }
2351
2352    /// Shared execute implementation with an explicit command deadline.
2353    async fn execute_inner(
2354        &mut self,
2355        sql: &str,
2356        params: &[&(dyn crate::ToSql + Sync)],
2357        deadline: Option<std::time::Duration>,
2358    ) -> Result<u64> {
2359        tracing::debug!(
2360            sql = sql,
2361            params_count = params.len(),
2362            "executing statement in transaction"
2363        );
2364
2365        #[cfg(feature = "otel")]
2366        let instrumentation = self.instrumentation.clone();
2367        #[cfg(feature = "otel")]
2368        let mut span = instrumentation.query_span(sql);
2369        #[cfg(feature = "otel")]
2370        let timer = crate::instrumentation::OperationTimer::start(
2371            crate::instrumentation::extract_operation(sql),
2372        );
2373
2374        let canceller = self.cancel_handle();
2375        let result = run_with_deadline(
2376            async {
2377                if params.is_empty() {
2378                    // Simple statement without parameters - use SQL batch
2379                    self.send_sql_batch(sql).await?;
2380                } else {
2381                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
2382                    let rpc = self.build_parameterized_rpc(sql, params).await?;
2383                    self.send_rpc(&rpc).await?;
2384                }
2385
2386                // Read response and get row count
2387                self.read_execute_result().await
2388            },
2389            deadline,
2390            canceller,
2391        )
2392        .await;
2393
2394        #[cfg(feature = "otel")]
2395        match &result {
2396            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
2397            Err(e) => InstrumentationContext::record_error(&mut span, e),
2398        }
2399        #[cfg(feature = "otel")]
2400        timer.finish(instrumentation.metrics(), result.is_ok());
2401
2402        // Drop the span before returning
2403        #[cfg(feature = "otel")]
2404        drop(span);
2405
2406        result
2407    }
2408
2409    /// Execute a query within the transaction with a specific timeout.
2410    ///
2411    /// See [`Client<Ready>::query_with_timeout`] for details.
2412    pub async fn query_with_timeout<'a>(
2413        &'a mut self,
2414        sql: &str,
2415        params: &[&(dyn crate::ToSql + Sync)],
2416        timeout_duration: std::time::Duration,
2417    ) -> Result<QueryStream<'a>> {
2418        self.query_inner(sql, params, Some(timeout_duration)).await
2419    }
2420
2421    /// Execute a statement within the transaction with a specific timeout.
2422    ///
2423    /// See [`Client<Ready>::execute_with_timeout`] for details.
2424    pub async fn execute_with_timeout(
2425        &mut self,
2426        sql: &str,
2427        params: &[&(dyn crate::ToSql + Sync)],
2428        timeout_duration: std::time::Duration,
2429    ) -> Result<u64> {
2430        self.execute_inner(sql, params, Some(timeout_duration))
2431            .await
2432    }
2433
2434    /// Open a FILESTREAM BLOB for async reading and/or writing.
2435    ///
2436    /// This method queries the server for the transaction context, then opens
2437    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
2438    ///
2439    /// # Arguments
2440    ///
2441    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
2442    ///   Query this yourself before calling `open_filestream`:
2443    ///   ```sql
2444    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
2445    ///   ```
2446    /// * `access` — Read, write, or read/write access mode.
2447    ///
2448    /// # Requirements
2449    ///
2450    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
2451    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
2452    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
2453    ///
2454    /// # Example
2455    ///
2456    /// ```text
2457    /// use mssql_client::FileStreamAccess;
2458    /// use tokio::io::AsyncReadExt;
2459    ///
2460    /// let mut tx = client.begin_transaction().await?;
2461    ///
2462    /// // Get the FILESTREAM path
2463    /// let rows = tx.query(
2464    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
2465    ///     &[&doc_id],
2466    /// ).await?;
2467    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
2468    ///
2469    /// // Open and read the BLOB
2470    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
2471    /// let mut data = Vec::new();
2472    /// stream.read_to_end(&mut data).await?;
2473    /// drop(stream);
2474    ///
2475    /// tx.commit().await?;
2476    /// ```
2477    #[cfg(all(windows, feature = "filestream"))]
2478    pub async fn open_filestream(
2479        &mut self,
2480        path: &str,
2481        access: crate::filestream::FileStreamAccess,
2482    ) -> Result<crate::filestream::FileStream> {
2483        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
2484
2485        // Get the transaction context from SQL Server.
2486        // This binds the file access to the current SQL transaction.
2487        let txn_context: Vec<u8> = {
2488            let rows = self
2489                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
2490                .await?;
2491            let mut ctx = None;
2492            for result in rows {
2493                let row = result?;
2494                ctx = Some(row.get::<Vec<u8>>(0)?);
2495            }
2496            ctx.ok_or_else(|| {
2497                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
2498            })?
2499        };
2500
2501        crate::filestream::FileStream::open(path, access, &txn_context)
2502    }
2503
2504    /// Commit the transaction.
2505    ///
2506    /// This transitions the client back to `Ready` state.
2507    pub async fn commit(mut self) -> Result<Client<Ready>> {
2508        tracing::debug!("committing transaction");
2509
2510        #[cfg(feature = "otel")]
2511        let instrumentation = self.instrumentation.clone();
2512        #[cfg(feature = "otel")]
2513        let mut span = instrumentation.transaction_span("COMMIT");
2514
2515        // Execute COMMIT TRANSACTION
2516        let result = async {
2517            self.send_sql_batch("COMMIT TRANSACTION").await?;
2518            self.read_execute_result().await
2519        }
2520        .await;
2521
2522        #[cfg(feature = "otel")]
2523        match &result {
2524            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2525            Err(e) => InstrumentationContext::record_error(&mut span, e),
2526        }
2527
2528        // Drop the span before moving instrumentation
2529        #[cfg(feature = "otel")]
2530        drop(span);
2531
2532        result?;
2533
2534        Ok(Client {
2535            config: self.config,
2536            _state: PhantomData,
2537            connection: self.connection,
2538            server_version: self.server_version,
2539            current_database: self.current_database,
2540            server_collation: self.server_collation,
2541            statement_cache: self.statement_cache,
2542            transaction_descriptor: 0, // Reset to auto-commit mode
2543            needs_reset: self.needs_reset,
2544            in_flight: self.in_flight,
2545            #[cfg(feature = "otel")]
2546            instrumentation: self.instrumentation,
2547            #[cfg(feature = "always-encrypted")]
2548            encryption_context: self.encryption_context,
2549        })
2550    }
2551
2552    /// Rollback the transaction.
2553    ///
2554    /// This transitions the client back to `Ready` state.
2555    pub async fn rollback(mut self) -> Result<Client<Ready>> {
2556        tracing::debug!("rolling back transaction");
2557
2558        #[cfg(feature = "otel")]
2559        let instrumentation = self.instrumentation.clone();
2560        #[cfg(feature = "otel")]
2561        let mut span = instrumentation.transaction_span("ROLLBACK");
2562
2563        // Execute ROLLBACK TRANSACTION
2564        let result = async {
2565            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
2566            self.read_execute_result().await
2567        }
2568        .await;
2569
2570        #[cfg(feature = "otel")]
2571        match &result {
2572            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2573            Err(e) => InstrumentationContext::record_error(&mut span, e),
2574        }
2575
2576        // Drop the span before moving instrumentation
2577        #[cfg(feature = "otel")]
2578        drop(span);
2579
2580        result?;
2581
2582        Ok(Client {
2583            config: self.config,
2584            _state: PhantomData,
2585            connection: self.connection,
2586            server_version: self.server_version,
2587            current_database: self.current_database,
2588            server_collation: self.server_collation,
2589            statement_cache: self.statement_cache,
2590            transaction_descriptor: 0, // Reset to auto-commit mode
2591            needs_reset: self.needs_reset,
2592            in_flight: self.in_flight,
2593            #[cfg(feature = "otel")]
2594            instrumentation: self.instrumentation,
2595            #[cfg(feature = "always-encrypted")]
2596            encryption_context: self.encryption_context,
2597        })
2598    }
2599
2600    /// Create a savepoint and return a handle for later rollback.
2601    ///
2602    /// The returned `SavePoint` handle contains the validated savepoint name.
2603    /// Use it with `rollback_to()` to partially undo transaction work.
2604    ///
2605    /// # Example
2606    ///
2607    /// ```rust,no_run
2608    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2609    /// let mut tx = client.begin_transaction().await?;
2610    /// tx.execute("INSERT INTO orders ...", &[]).await?;
2611    /// let sp = tx.save_point("before_items").await?;
2612    /// tx.execute("INSERT INTO items ...", &[]).await?;
2613    /// // Oops, rollback just the items
2614    /// tx.rollback_to(&sp).await?;
2615    /// tx.commit().await?;
2616    /// # Ok(())
2617    /// # }
2618    /// ```
2619    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
2620        crate::validation::validate_identifier(name)?;
2621        tracing::debug!(name = name, "creating savepoint");
2622
2623        // Execute SAVE TRANSACTION <name>
2624        // Note: name is validated by validate_identifier() to prevent SQL injection
2625        let sql = format!("SAVE TRANSACTION {name}");
2626        self.send_sql_batch(&sql).await?;
2627        self.read_execute_result().await?;
2628
2629        Ok(SavePoint::new(name.to_string()))
2630    }
2631
2632    /// Rollback to a savepoint.
2633    ///
2634    /// This rolls back all changes made after the savepoint was created,
2635    /// but keeps the transaction active. The savepoint remains valid and
2636    /// can be rolled back to again.
2637    ///
2638    /// # Example
2639    ///
2640    /// ```rust,no_run
2641    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
2642    /// let sp = tx.save_point("checkpoint").await?;
2643    /// // ... do some work ...
2644    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
2645    /// // Transaction is still active, savepoint is still valid
2646    /// # Ok(())
2647    /// # }
2648    /// ```
2649    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
2650        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
2651
2652        // Execute ROLLBACK TRANSACTION <name>
2653        // Note: savepoint name was validated during creation
2654        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
2655        self.send_sql_batch(&sql).await?;
2656        self.read_execute_result().await?;
2657
2658        Ok(())
2659    }
2660
2661    /// Release a savepoint (optional cleanup).
2662    ///
2663    /// Note: SQL Server doesn't have explicit savepoint release, but this
2664    /// method is provided for API completeness. The savepoint is automatically
2665    /// released when the transaction commits or rolls back.
2666    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
2667        tracing::debug!(name = savepoint.name(), "releasing savepoint");
2668
2669        // SQL Server doesn't require explicit savepoint release
2670        // The savepoint is implicitly released on commit/rollback
2671        // This method exists for API completeness
2672        drop(savepoint);
2673        Ok(())
2674    }
2675
2676    /// Get a handle for cancelling the current query within the transaction.
2677    ///
2678    /// See [`Client<Ready>::cancel_handle`] for usage examples.
2679    #[must_use]
2680    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2681        self.connection_cancel_handle()
2682    }
2683}
2684
2685impl<S: ConnectionState> std::fmt::Debug for Client<S> {
2686    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2687        f.debug_struct("Client")
2688            .field("host", &self.config.host)
2689            .field("port", &self.config.port)
2690            .field("database", &self.config.database)
2691            .finish()
2692    }
2693}
2694
2695#[cfg(test)]
2696mod blob_result_set_validation_tests {
2697    use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
2698    use tds_protocol::types::TypeId;
2699
2700    use super::{Client, Ready};
2701    use crate::error::Error;
2702
2703    /// A scalar (non-MAX) column.
2704    fn scalar(name: &str) -> ColumnData {
2705        col(name, TypeId::Int4, None)
2706    }
2707
2708    /// A MAX (PLP) column: `max_length == 0xFFFF` marks the MAX variant.
2709    fn blob(name: &str) -> ColumnData {
2710        col(name, TypeId::BigVarBinary, Some(0xFFFF))
2711    }
2712
2713    fn col(name: &str, type_id: TypeId, max_length: Option<u32>) -> ColumnData {
2714        ColumnData {
2715            name: name.to_string(),
2716            type_id,
2717            col_type: 0,
2718            flags: 0,
2719            user_type: 0,
2720            type_info: TypeInfo {
2721                max_length,
2722                ..Default::default()
2723            },
2724            crypto_metadata: None,
2725        }
2726    }
2727
2728    fn meta(columns: Vec<ColumnData>) -> ColMetaData {
2729        ColMetaData {
2730            columns,
2731            cek_table: None,
2732        }
2733    }
2734
2735    fn validate(columns: Vec<ColumnData>) -> Result<(usize, usize), Error> {
2736        Client::<Ready>::validate_blob_rows_result_set(&meta(columns))
2737    }
2738
2739    #[test]
2740    fn single_trailing_blob() {
2741        assert_eq!(validate(vec![scalar("id"), blob("doc")]).unwrap(), (1, 1));
2742    }
2743
2744    #[test]
2745    fn multiple_trailing_blobs() {
2746        assert_eq!(
2747            validate(vec![scalar("id"), blob("doc1"), blob("doc2")]).unwrap(),
2748            (1, 2)
2749        );
2750    }
2751
2752    #[test]
2753    fn all_columns_blobs() {
2754        assert_eq!(validate(vec![blob("a"), blob("b")]).unwrap(), (0, 2));
2755    }
2756
2757    #[test]
2758    fn no_max_column_is_rejected() {
2759        assert!(matches!(
2760            validate(vec![scalar("id"), scalar("j")]),
2761            Err(Error::Protocol(_))
2762        ));
2763    }
2764
2765    #[test]
2766    fn scalar_after_blob_is_rejected() {
2767        // Interleaved MAX columns are out of scope: a scalar after a blob.
2768        assert!(matches!(
2769            validate(vec![scalar("id"), blob("doc"), scalar("trailing")]),
2770            Err(Error::Protocol(_))
2771        ));
2772        // ...even when more blobs follow the interloping scalar.
2773        assert!(matches!(
2774            validate(vec![blob("doc1"), scalar("mid"), blob("doc2")]),
2775            Err(Error::Protocol(_))
2776        ));
2777    }
2778}