Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27pub(crate) mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
/// Upper bound on how long to wait for the server to acknowledge an Attention
/// packet before giving up on the connection.
///
/// Applied both after a command timeout ([`run_with_deadline`]) and while
/// draining an abandoned response (`Client::cancel_in_flight_response`).
/// SqlClient waits 5 seconds before dooming the connection; we match it.
const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68    fut: F,
69    deadline: Option<std::time::Duration>,
70    canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73    F: std::future::Future<Output = Result<T>>,
74{
75    let Some(d) = deadline else {
76        return fut.await;
77    };
78    tokio::pin!(fut);
79    tokio::select! {
80        biased;
81        res = &mut fut => res,
82        () = tokio::time::sleep(d) => {
83            // Signal cancellation, then let the in-flight read consume the
84            // server's attention acknowledgement so the connection stays usable.
85            let drain = async {
86                let _ = canceller.cancel().await;
87                let _ = (&mut fut).await;
88            };
89            if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90                tracing::warn!(
91                    timeout = ?ATTENTION_ACK_TIMEOUT,
92                    "server did not acknowledge attention; abandoning the connection as dirty"
93                );
94            }
95            Err(Error::CommandTimeout)
96        }
97    }
98}
99
/// SQL Server client with type-state connection management.
///
/// The generic parameter `S` represents the current connection state,
/// ensuring at compile time that certain operations are only available
/// in appropriate states.
pub struct Client<S: ConnectionState> {
    /// Connection configuration; read per request for settings such as
    /// `command_timeout` and `packet_size`.
    config: Config,
    /// Zero-sized marker that carries the type-state `S`; stores no data.
    _state: PhantomData<S>,
    /// The underlying connection (present only when connected)
    connection: Option<ConnectionHandle>,
    /// Server version from LoginAck (raw u32 TDS version)
    server_version: Option<u32>,
    /// Current database from EnvChange
    current_database: Option<String>,
    /// Server's default collation from SqlCollation EnvChange during login.
    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
    /// parameters with the correct character encoding and collation bytes.
    server_collation: Option<tds_protocol::token::Collation>,
    /// Prepared statement cache for query optimization
    statement_cache: StatementCache,
    /// Transaction descriptor from BeginTransaction EnvChange.
    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
    /// requests within an explicit transaction. 0 indicates auto-commit mode.
    transaction_descriptor: u64,
    /// Whether a request has been sent and the response has not yet been fully read.
    /// Used by the connection pool to detect dirty connections after cancel/timeout.
    in_flight: bool,
    /// Whether this connection needs a reset on next use.
    /// Set by connection pool on checkin, cleared after first query/execute.
    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
    needs_reset: bool,
    /// OpenTelemetry instrumentation context (when otel feature is enabled)
    #[cfg(feature = "otel")]
    instrumentation: InstrumentationContext,
    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
    #[cfg(feature = "always-encrypted")]
    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
}
138
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
/// - Plain TCP (for internal networks or when `tls` feature is disabled)
///
/// Every variant wraps the same `Connection` type over a different transport,
/// so code that uses the handle matches on the variant and makes the same call
/// in each arm.
// NOTE(review): the variants are matched by the send and cancel paths in this
// file, so this `dead_code` allow may no longer be needed — confirm before
// removing it.
#[allow(dead_code)]
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    #[cfg(feature = "tls")]
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style)
    #[cfg(feature = "tls")]
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
    Plain(Connection<TcpStream>),
}
156
/// Resolve the `TypeInfo` used to declare a typed NULL ([`crate::null`]).
///
/// `sql_type` is the name reported by [`crate::ToSql::sql_type`]. Names with no
/// entry in the table below — including `"NULL"`, the type of an untyped
/// `Option::None` — yield `None`, in which case the caller falls back to the
/// default parameter type.
#[cfg(feature = "always-encrypted")]
fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
    use tds_protocol::rpc::TypeInfo;
    match sql_type {
        "BIT" => Some(TypeInfo::bit()),
        "TINYINT" => Some(TypeInfo::tinyint()),
        "SMALLINT" => Some(TypeInfo::smallint()),
        "INT" => Some(TypeInfo::int()),
        "BIGINT" => Some(TypeInfo::bigint()),
        "REAL" => Some(TypeInfo::real()),
        "FLOAT" => Some(TypeInfo::float()),
        "NVARCHAR" => Some(TypeInfo::nvarchar(1)),
        "VARBINARY" => Some(TypeInfo::varbinary(1)),
        "UNIQUEIDENTIFIER" => Some(TypeInfo::uuid()),
        "DATE" => Some(TypeInfo::date()),
        _ => None,
    }
}
178
/// Translate a typed-parameter wrapper's [`EncryptedParamType`] into the
/// `TypeInfo` the driver declares for it (used for
/// `sp_describe_parameter_encryption` and as the `CryptoMetadata` base type).
///
/// # Errors
///
/// Returns [`Error::Encryption`] for any variant not listed here, so that an
/// unknown future variant is rejected rather than silently declared with the
/// wrong type.
#[cfg(feature = "always-encrypted")]
fn encrypted_param_type_info(
    ty: mssql_types::EncryptedParamType,
) -> Result<tds_protocol::rpc::TypeInfo> {
    use mssql_types::EncryptedParamType as Hint;
    use tds_protocol::rpc::TypeInfo;

    let declared = match ty {
        Hint::Decimal { precision, scale } => TypeInfo::decimal(precision, scale),
        Hint::Time { scale } => TypeInfo::time(scale),
        Hint::DateTime2 { scale } => TypeInfo::datetime2(scale),
        Hint::DateTimeOffset { scale } => TypeInfo::datetimeoffset(scale),
        Hint::DateTime => TypeInfo::datetime(),
        Hint::Char { length } => TypeInfo::char(length),
        Hint::NChar { length } => TypeInfo::nchar(length),
        Hint::Binary { length } => TypeInfo::binary(length),
        _ => {
            let message = String::from("unsupported Always Encrypted parameter type");
            return Err(Error::Encryption(message));
        }
    };
    Ok(declared)
}
205
206// Private helper methods available to all connection states
207impl<S: ConnectionState> Client<S> {
208    /// The default per-command deadline from `command_timeout`.
209    ///
210    /// Returns `None` when `command_timeout` is zero, which means "no limit"
211    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
212    pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
213        let t = self.config.command_timeout;
214        if t.is_zero() { None } else { Some(t) }
215    }
216
217    /// Build a cancel handle for the current connection, regardless of
218    /// connection state. The public, documented surface is
219    /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
220    /// delegate here.
221    pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
222        let connection = self
223            .connection
224            .as_ref()
225            .expect("connection should be present");
226        match connection {
227            #[cfg(feature = "tls")]
228            ConnectionHandle::Tls(conn) => {
229                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
230            }
231            #[cfg(feature = "tls")]
232            ConnectionHandle::TlsPrelogin(conn) => {
233                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
234            }
235            ConnectionHandle::Plain(conn) => {
236                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
237            }
238        }
239    }
240
241    /// Cancel an in-flight response that was abandoned without being drained —
242    /// e.g. a [`RowStream`](crate::RowStream) dropped or cancelled mid-result.
243    ///
244    /// Sends an Attention and drains to the server's DONE_ATTN acknowledgement so
245    /// the socket is clean and the connection reusable. A no-op when nothing is
246    /// in flight. Bounded by [`ATTENTION_ACK_TIMEOUT`]: if the acknowledgement
247    /// never arrives the connection is left marked in-flight (so the pool
248    /// discards it on return) and an error is returned.
249    pub(crate) async fn cancel_in_flight_response(&mut self) -> Result<()> {
250        if !self.in_flight {
251            return Ok(());
252        }
253        let canceller = self.connection_cancel_handle();
254        let drain = async {
255            canceller.cancel().await?;
256            // With the cancelling flag set, `read_response_message` routes through
257            // the codec's drain-after-cancel path and returns `Err(Cancelled)`
258            // once the DONE_ATTN acknowledgement is consumed (clearing
259            // `in_flight`). Any full messages that arrive before the ack are
260            // discarded.
261            loop {
262                match self.read_response_message().await {
263                    Err(Error::Cancelled) => return Ok(()),
264                    Ok(_) => continue,
265                    Err(e) => return Err(e),
266                }
267            }
268        };
269        match tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await {
270            Ok(result) => result,
271            Err(_) => {
272                tracing::warn!(
273                    timeout = ?ATTENTION_ACK_TIMEOUT,
274                    "attention acknowledgement not received while cancelling an \
275                     abandoned response; connection left dirty"
276                );
277                Err(Error::Cancelled)
278            }
279        }
280    }
281
282    /// Process transaction-related EnvChange tokens.
283    ///
284    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
285    /// EnvChange tokens, updating the transaction descriptor accordingly.
286    ///
287    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
288    /// while still having the transaction descriptor tracked correctly.
289    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
290        use tds_protocol::token::EnvChangeValue;
291
292        match env.env_type {
293            EnvChangeType::BeginTransaction => {
294                if let EnvChangeValue::Binary(ref data) = env.new_value {
295                    if data.len() >= 8 {
296                        let descriptor = u64::from_le_bytes([
297                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
298                        ]);
299                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
300                        *transaction_descriptor = descriptor;
301                    }
302                }
303            }
304            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
305                tracing::debug!(
306                    env_type = ?env.env_type,
307                    "transaction ended via raw SQL"
308                );
309                *transaction_descriptor = 0;
310            }
311            _ => {}
312        }
313    }
314
315    /// Apply a transaction-related `ENVCHANGE` to this client's descriptor.
316    ///
317    /// Lets the streaming readers (which live in sibling modules) keep the
318    /// transaction descriptor in sync with raw `BEGIN`/`COMMIT`/`ROLLBACK`
319    /// batches seen mid-stream, exactly as the buffered readers do.
320    pub(crate) fn apply_transaction_env_change(&mut self, env: &EnvChange) {
321        Self::process_transaction_env_change(env, &mut self.transaction_descriptor);
322    }
323
324    /// Send a SQL batch to the server.
325    ///
326    /// Uses the client's current transaction descriptor in ALL_HEADERS.
327    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
328    /// returned by BeginTransaction must be included.
329    ///
330    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
331    /// is included in the first packet to reset connection state.
332    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
333        // If a previous streamed response was abandoned (a RowStream dropped
334        // mid-result), drain it before issuing a new request so the next read
335        // does not pick up the old response's bytes.
336        self.cancel_in_flight_response().await?;
337
338        let payload = tds_protocol::__private::encode_sql_batch_with_transaction(
339            sql,
340            self.transaction_descriptor,
341        );
342        let max_packet = self.config.packet_size as usize;
343
344        // Check if we need to reset the connection on this request
345        let reset = self.needs_reset;
346        if reset {
347            self.needs_reset = false; // Clear flag before sending
348            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
349        }
350
351        self.in_flight = true;
352        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
353
354        match connection {
355            #[cfg(feature = "tls")]
356            ConnectionHandle::Tls(conn) => {
357                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
358                    .await?;
359            }
360            #[cfg(feature = "tls")]
361            ConnectionHandle::TlsPrelogin(conn) => {
362                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
363                    .await?;
364            }
365            ConnectionHandle::Plain(conn) => {
366                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
367                    .await?;
368            }
369        }
370
371        Ok(())
372    }
373
374    /// Send an RPC request to the server.
375    ///
376    /// Uses the client's current transaction descriptor in ALL_HEADERS.
377    ///
378    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
379    /// is included in the first packet to reset connection state.
380    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
381        // Drain an abandoned streamed response (see `send_sql_batch`) before
382        // issuing this request.
383        self.cancel_in_flight_response().await?;
384
385        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
386        let max_packet = self.config.packet_size as usize;
387
388        // Check if we need to reset the connection on this request
389        let reset = self.needs_reset;
390        if reset {
391            self.needs_reset = false; // Clear flag before sending
392            tracing::debug!("sending RPC with RESETCONNECTION flag");
393        }
394
395        self.in_flight = true;
396        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
397
398        match connection {
399            #[cfg(feature = "tls")]
400            ConnectionHandle::Tls(conn) => {
401                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
402                    .await?;
403            }
404            #[cfg(feature = "tls")]
405            ConnectionHandle::TlsPrelogin(conn) => {
406                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
407                    .await?;
408            }
409            ConnectionHandle::Plain(conn) => {
410                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
411                    .await?;
412            }
413        }
414
415        Ok(())
416    }
417
418    /// Start building a stored procedure call with full control over parameters.
419    ///
420    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
421    /// parameters before executing the call.
422    ///
423    /// The procedure name is validated to prevent SQL injection. It may be
424    /// schema-qualified (e.g., `"dbo.MyProc"`).
425    ///
426    /// # Example
427    ///
428    /// ```rust,no_run
429    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
430    /// let result = client.procedure("dbo.CalculateSum")?
431    ///     .input("@a", &10i32)
432    ///     .input("@b", &20i32)
433    ///     .output_int("@result")
434    ///     .execute().await?;
435    ///
436    /// let sum = result.get_output("@result").unwrap();
437    /// # let _ = sum;
438    /// # Ok(())
439    /// # }
440    /// ```
441    pub fn procedure(
442        &mut self,
443        proc_name: &str,
444    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
445        crate::validation::validate_qualified_identifier(proc_name)?;
446        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
447    }
448
449    /// Execute a stored procedure with positional input parameters.
450    ///
451    /// This is a convenience method for the common case of calling a procedure
452    /// with input-only parameters. For output parameters or named parameters,
453    /// use [`procedure()`](Client::procedure) instead.
454    ///
455    /// # Example
456    ///
457    /// ```rust,no_run
458    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
459    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
460    /// assert_eq!(result.return_value, 0);
461    ///
462    /// if let Some(rs) = result.first_result_set() {
463    ///     println!("columns: {:?}", rs.columns());
464    /// }
465    /// # Ok(())
466    /// # }
467    /// ```
468    pub async fn call_procedure(
469        &mut self,
470        proc_name: &str,
471        params: &[&(dyn crate::ToSql + Sync)],
472    ) -> Result<crate::stream::ProcedureResult> {
473        crate::validation::validate_qualified_identifier(proc_name)?;
474
475        tracing::debug!(
476            proc_name = proc_name,
477            params_count = params.len(),
478            "executing stored procedure"
479        );
480
481        let rpc_params =
482            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
483        let mut rpc = RpcRequest::named(proc_name);
484        for param in rpc_params {
485            rpc = rpc.param(param);
486        }
487
488        let deadline = self.command_deadline();
489        let canceller = self.connection_cancel_handle();
490        run_with_deadline(
491            async {
492                self.send_rpc(&rpc).await?;
493                self.read_procedure_result().await
494            },
495            deadline,
496            canceller,
497        )
498        .await
499    }
500
501    /// Ask the server how each parameter of a statement must be encrypted.
502    ///
503    /// Issues the `sp_describe_parameter_encryption` system RPC for the
504    /// parameterized statement `tsql` with the parameter declaration `params`
505    /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
506    /// into a [`ParameterEncryptionInfo`](crate::encryption::ParameterEncryptionInfo): the
507    /// CEK table, plus — for each parameter the server reports as encrypted —
508    /// which CEK and whether deterministic or randomized. Parameters the server
509    /// reports as plaintext are omitted.
510    ///
511    /// This is the first step of Always Encrypted parameter encryption; the
512    /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
513    #[cfg(feature = "always-encrypted")]
514    pub(crate) async fn describe_parameter_encryption(
515        &mut self,
516        tsql: &str,
517        params: &str,
518    ) -> Result<crate::encryption::ParameterEncryptionInfo> {
519        let tsql_arg = tsql.to_string();
520        let params_arg = params.to_string();
521        let mut result = self
522            .call_procedure(
523                "sp_describe_parameter_encryption",
524                &[&tsql_arg, &params_arg],
525            )
526            .await?;
527        crate::encryption::ParameterEncryptionInfo::from_describe_result_sets(
528            &mut result.result_sets,
529        )
530    }
531
532    /// Build the `sp_executesql` request for a parameterized statement.
533    ///
534    /// When the connection has Always Encrypted enabled, parameters the server
535    /// reports as encrypted are encrypted client-side first (an extra
536    /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
537    /// plain parameter conversion.
538    pub(crate) async fn build_parameterized_rpc(
539        &mut self,
540        sql: &str,
541        params: &[&(dyn crate::ToSql + Sync)],
542    ) -> Result<RpcRequest> {
543        #[cfg(feature = "always-encrypted")]
544        if self.encryption_context.is_some() {
545            return self.build_encrypted_sql_rpc(sql, params).await;
546        }
547        let rpc_params =
548            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
549        Ok(RpcRequest::execute_sql(sql, rpc_params))
550    }
551
    /// Encrypt the Always Encrypted parameters of a statement, then build its
    /// `sp_executesql` request.
    ///
    /// Asks the server which parameters are encrypted
    /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
    /// then for each one normalizes the value, resolves its column encryption
    /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
    /// server reports as plaintext are sent unchanged.
    ///
    /// Parameters are named positionally (`@p1`, `@p2`, …). No describe
    /// round-trip is made when `params` is empty.
    ///
    /// # Errors
    ///
    /// - Any error from converting a parameter value or from the describe
    ///   round-trip.
    /// - [`Error::Encryption`] when a typed-parameter hint has no known
    ///   `TypeInfo` mapping.
    /// - [`Error::Protocol`] when an encrypted parameter references a CEK
    ///   ordinal that is not in the returned CEK table.
    /// - Any error from normalizing or encrypting a value.
    #[cfg(feature = "always-encrypted")]
    async fn build_encrypted_sql_rpc(
        &mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<RpcRequest> {
        use tds_protocol::rpc::RpcParam;

        // Without an encryption context there is nothing to encrypt with, so
        // fall back to the plain parameter conversion.
        let Some(ctx) = self.encryption_context.clone() else {
            let rpc_params =
                Self::convert_params(params, self.send_unicode(), self.server_collation())?;
            return Ok(RpcRequest::execute_sql(sql, rpc_params));
        };

        // Resolve each parameter's value once (AE normalization needs the typed
        // value, not the wire encoding) and build the plaintext RPC params.
        let send_unicode = self.send_unicode();
        let collation = self.server_collation().cloned();
        // `values`, `plaintext` and `hints` are filled in lock-step: index i of
        // each describes the same parameter.
        let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
        let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
        let mut hints: Vec<Option<mssql_types::EncryptedParamType>> =
            Vec::with_capacity(params.len());
        for (i, p) in params.iter().enumerate() {
            let name = format!("@p{}", i + 1);
            let value = p.to_sql()?;
            let hint = p.encrypted_param_type();
            // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
            // describe accepts it against the target encrypted column; an untyped
            // NULL falls back to the default in `sql_value_to_rpc_param`.
            let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
                (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
                _ => {
                    let mut param = Self::sql_value_to_rpc_param(
                        &name,
                        &value,
                        send_unicode,
                        collation.as_ref(),
                    )?;
                    // A typed-parameter wrapper (e.g. `numeric(v, p, s)`,
                    // `datetime2(v, scale)`) declares an explicit SQL type so
                    // describe matches the encrypted column exactly — the value
                    // alone cannot convey precision/scale or the legacy-`datetime`
                    // vs `datetime2` distinction.
                    if let Some(ty) = hint {
                        param.type_info = encrypted_param_type_info(ty)?;
                    }
                    param
                }
            };
            plaintext.push(rpc_param);
            values.push(value);
            hints.push(hint);
        }

        // No parameters means nothing to describe or encrypt; skip the extra
        // round-trip.
        if plaintext.is_empty() {
            return Ok(RpcRequest::execute_sql(sql, plaintext));
        }

        // Ask the server which parameters need encryption.
        let declarations = RpcRequest::build_param_declarations(&plaintext);
        let info = self
            .describe_parameter_encryption(sql, &declarations)
            .await?;
        // The server reported no encrypted parameters: send everything as
        // plaintext.
        if info.parameters.is_empty() {
            return Ok(RpcRequest::execute_sql(sql, plaintext));
        }

        // Encrypt the flagged parameters; pass the rest through untouched.
        let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
        // Zipping re-associates each plaintext parameter with the value and
        // hint recorded for it above.
        for ((value, param), hint) in values.into_iter().zip(plaintext).zip(hints) {
            // Not listed by the server: a plaintext parameter, forwarded as-is.
            let Some(crypto) = info.get_parameter(&param.name) else {
                final_params.push(param);
                continue;
            };
            let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
                Error::Protocol(format!(
                    "encrypted parameter {} references missing CEK ordinal {}",
                    param.name, crypto.cek_ordinal
                ))
            })?;
            // The plaintext parameter's declared type becomes the base type of
            // the encrypted parameter's metadata.
            let metadata = tds_protocol::rpc::EncryptedParamMetadata {
                base_type_info: param.type_info.clone(),
                algorithm_id: crypto.algorithm_id,
                encryption_type: crypto.encryption_type,
                database_id: entry.database_id,
                cek_id: entry.cek_id,
                cek_version: entry.cek_version,
                cek_md_version: entry.cek_md_version,
                normalization_rule_version: crypto.normalization_rule_version,
            };
            // A NULL value bound to an encrypted column is sent as an encrypted
            // NULL (the server rejects a plaintext parameter for an encrypted
            // column); there is nothing to encrypt.
            if matches!(value, mssql_types::SqlValue::Null) {
                final_params.push(RpcParam::encrypted_null(param.name, metadata));
                continue;
            }
            let normalized = crate::encryption::normalize_for_encryption(&value, hint)?;
            let ciphertext = ctx
                .encrypt_value(&normalized, entry, crypto.encryption_type)
                .await?;
            final_params.push(RpcParam::encrypted(
                param.name,
                bytes::Bytes::from(ciphertext),
                metadata,
            ));
        }

        Ok(RpcRequest::execute_sql(sql, final_params))
    }
670
671    /// Start a bulk insert operation for the specified table.
672    ///
673    /// Sends the `INSERT BULK` statement to the server and returns a
674    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
675    /// a mutable borrow on the client, preventing other operations while
676    /// the bulk insert is in progress.
677    ///
678    /// # Example
679    ///
680    /// ```rust,no_run
681    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
682    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
683    ///
684    /// let builder = BulkInsertBuilder::new("dbo.Users")
685    ///     .with_typed_columns(vec![
686    ///         BulkColumn::new("id", "INT", 0)?,
687    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
688    ///     ]);
689    ///
690    /// let mut writer = client.bulk_insert(&builder).await?;
691    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
692    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
693    /// let result = writer.finish().await?;
694    /// println!("Inserted {} rows", result.rows_affected);
695    /// # Ok(())
696    /// # }
697    /// ```
698    pub async fn bulk_insert(
699        &mut self,
700        builder: &crate::bulk::BulkInsertBuilder,
701    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
702        use tds_protocol::token::{ColMetaData, Token};
703
704        tracing::debug!(
705            table = builder.table_name(),
706            columns = builder.columns().len(),
707            "starting bulk insert"
708        );
709
710        // Step 1: Query the server for column metadata.
711        // This gives us the exact type encoding the server expects for BulkLoad,
712        // following the pattern established by Tiberius.
713        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
714        let deadline = self.command_deadline();
715        let canceller = self.connection_cancel_handle();
716        let message = run_with_deadline(
717            async {
718                self.send_sql_batch(&meta_query).await?;
719                self.read_response_message().await
720            },
721            deadline,
722            canceller,
723        )
724        .await?;
725        self.in_flight = false;
726
727        // Capture both the raw COLMETADATA bytes and parsed column info
728        let raw_payload = message.payload.clone();
729        let mut parser = self.create_parser(message.payload);
730        let mut server_metadata: Option<ColMetaData> = None;
731        let mut meta_start: usize = 0;
732        let mut meta_end: usize = 0;
733
734        loop {
735            let pos_before = raw_payload.len() - parser.remaining();
736            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
737            let pos_after = raw_payload.len() - parser.remaining();
738            let Some(token) = token else { break };
739
740            match token {
741                Token::ColMetaData(meta) => {
742                    meta_start = pos_before;
743                    meta_end = pos_after;
744                    server_metadata = Some(meta);
745                }
746                Token::Done(_) => break,
747                _ => {}
748            }
749        }
750
751        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
752        // These types require a legacy TEXTPTR wire format that this driver
753        // does not support — users should migrate the column to VARCHAR(MAX) /
754        // NVARCHAR(MAX) / VARBINARY(MAX).
755        if let Some(ref meta) = server_metadata {
756            use tds_protocol::types::TypeId;
757            for col in meta.columns.iter() {
758                let (rejected, replacement) = match col.type_id {
759                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
760                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
761                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
762                    _ => (None, ""),
763                };
764                if let Some(sql_type) = rejected {
765                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
766                        sql_type: sql_type.to_string(),
767                        reason: format!(
768                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
769                             are not supported. Alter the column to {} instead \
770                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
771                             Server 2005).",
772                            col.name,
773                            builder.table_name(),
774                            sql_type,
775                            replacement,
776                        ),
777                    }));
778                }
779            }
780        }
781
782        // Step 2: Send INSERT BULK statement to put server in bulk load mode
783        let stmt = builder.build_insert_bulk_statement()?;
784        let deadline = self.command_deadline();
785        let canceller = self.connection_cancel_handle();
786        run_with_deadline(
787            async {
788                self.send_sql_batch(&stmt).await?;
789                self.read_execute_result().await
790            },
791            deadline,
792            canceller,
793        )
794        .await?;
795
796        // Step 3: Create bulk writer with server's metadata
797        let raw_meta = if meta_end > meta_start {
798            Some(raw_payload.slice(meta_start..meta_end))
799        } else {
800            None
801        };
802
803        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
804        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
805            builder.columns().to_vec(),
806            builder.options().batch_size,
807            raw_meta,
808            server_cols,
809        );
810
811        Ok(crate::bulk::BulkWriter::new(self, bulk))
812    }
813
814    /// Start a bulk insert without querying the server for column metadata.
815    ///
816    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
817    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
818    /// column metadata is constructed from the `BulkColumn` types provided
819    /// on the builder. This saves a round-trip when the schema is known.
820    ///
821    /// # Caveats
822    ///
823    /// The caller must ensure `BulkColumn` entries match the target table's
824    /// column definitions exactly. Mismatched types, lengths, precision/scale,
825    /// or column ordering will cause the server to reject the BulkLoad packet.
826    ///
827    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
828    /// extra round-trip is usually negligible and the server-supplied metadata
829    /// is guaranteed correct.
830    pub async fn bulk_insert_without_schema_discovery(
831        &mut self,
832        builder: &crate::bulk::BulkInsertBuilder,
833    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
834        tracing::debug!(
835            table = builder.table_name(),
836            columns = builder.columns().len(),
837            "starting bulk insert (no schema discovery)"
838        );
839
840        // Send INSERT BULK statement to put server in bulk load mode
841        let stmt = builder.build_insert_bulk_statement()?;
842        let deadline = self.command_deadline();
843        let canceller = self.connection_cancel_handle();
844        run_with_deadline(
845            async {
846                self.send_sql_batch(&stmt).await?;
847                self.read_execute_result().await
848            },
849            deadline,
850            canceller,
851        )
852        .await?;
853
854        // Create bulk writer with hand-crafted metadata
855        let bulk =
856            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
857
858        Ok(crate::bulk::BulkWriter::new(self, bulk))
859    }
860
861    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
862    ///
863    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
864    /// row data after the `INSERT BULK` statement has been acknowledged.
865    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
866        let max_packet = self.config.packet_size as usize;
867
868        self.in_flight = true;
869        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
870
871        match connection {
872            #[cfg(feature = "tls")]
873            ConnectionHandle::Tls(conn) => {
874                conn.send_message(PacketType::BulkLoad, payload, max_packet)
875                    .await?;
876            }
877            #[cfg(feature = "tls")]
878            ConnectionHandle::TlsPrelogin(conn) => {
879                conn.send_message(PacketType::BulkLoad, payload, max_packet)
880                    .await?;
881            }
882            ConnectionHandle::Plain(conn) => {
883                conn.send_message(PacketType::BulkLoad, payload, max_packet)
884                    .await?;
885            }
886        }
887
888        // Read the server's Done response with row count
889        self.read_execute_result().await
890    }
891
892    /// Execute a query with named parameters and return a streaming result set.
893    ///
894    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
895    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
896    /// and the `#[derive(ToParams)]` macro.
897    ///
898    /// # Example
899    ///
900    /// ```rust,no_run
901    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
902    /// use mssql_client::{NamedParam, ToParams};
903    ///
904    /// // With derive macro:
905    /// #[derive(mssql_derive::ToParams)]
906    /// struct UserQuery { name: String }
907    ///
908    /// let q = UserQuery { name: "Alice".into() };
909    /// let rows = client.query_named(
910    ///     "SELECT * FROM users WHERE name = @name",
911    ///     &q.to_params()?,
912    /// ).await?;
913    ///
914    /// // Or manually:
915    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
916    /// let rows = client.query_named(
917    ///     "SELECT * FROM users WHERE name = @name",
918    ///     &params,
919    /// ).await?;
920    /// # let _ = rows;
921    /// # Ok(())
922    /// # }
923    /// ```
924    pub async fn query_named<'a>(
925        &'a mut self,
926        sql: &str,
927        params: &[crate::to_params::NamedParam],
928    ) -> Result<QueryStream<'a>> {
929        tracing::debug!(
930            sql = sql,
931            params_count = params.len(),
932            "executing query with named parameters"
933        );
934
935        if params.is_empty() {
936            self.send_sql_batch(sql).await?;
937        } else {
938            let rpc_params =
939                Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
940            let rpc = RpcRequest::execute_sql(sql, rpc_params);
941            self.send_rpc(&rpc).await?;
942        }
943
944        let resp = self.read_query_response().await?;
945        #[cfg(feature = "always-encrypted")]
946        {
947            Ok(QueryStream::from_raw(
948                resp.columns,
949                resp.pending_rows,
950                resp.meta,
951                resp.decryptor,
952            ))
953        }
954        #[cfg(not(feature = "always-encrypted"))]
955        {
956            Ok(QueryStream::from_raw(
957                resp.columns,
958                resp.pending_rows,
959                resp.meta,
960            ))
961        }
962    }
963
964    /// Execute a statement with named parameters.
965    ///
966    /// Returns the number of affected rows. This is the named-parameter
967    /// counterpart of [`execute()`](Client::execute), compatible with the
968    /// [`ToParams`](crate::to_params::ToParams) trait.
969    ///
970    /// # Example
971    ///
972    /// ```rust,no_run
973    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
974    /// use mssql_client::NamedParam;
975    ///
976    /// let params = vec![
977    ///     NamedParam::from_value("name", &"Alice")?,
978    ///     NamedParam::from_value("email", &"alice@example.com")?,
979    /// ];
980    /// let rows_affected = client.execute_named(
981    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
982    ///     &params,
983    /// ).await?;
984    /// # let _ = rows_affected;
985    /// # Ok(())
986    /// # }
987    /// ```
988    pub async fn execute_named(
989        &mut self,
990        sql: &str,
991        params: &[crate::to_params::NamedParam],
992    ) -> Result<u64> {
993        tracing::debug!(
994            sql = sql,
995            params_count = params.len(),
996            "executing statement with named parameters"
997        );
998
999        let deadline = self.command_deadline();
1000        let canceller = self.connection_cancel_handle();
1001        run_with_deadline(
1002            async {
1003                if params.is_empty() {
1004                    self.send_sql_batch(sql).await?;
1005                } else {
1006                    let rpc_params = Self::convert_named_params(
1007                        params,
1008                        self.send_unicode(),
1009                        self.server_collation(),
1010                    )?;
1011                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1012                    self.send_rpc(&rpc).await?;
1013                }
1014
1015                self.read_execute_result().await
1016            },
1017            deadline,
1018            canceller,
1019        )
1020        .await
1021    }
1022
1023    /// Whether string parameters are sent as NVARCHAR (Unicode).
1024    pub(crate) fn send_unicode(&self) -> bool {
1025        self.config.send_string_parameters_as_unicode
1026    }
1027
1028    /// Server's default collation, captured from ENVCHANGE during login.
1029    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
1030        self.server_collation.as_ref()
1031    }
1032
1033    /// Shared implementation behind `query_stream` for both `Ready` and
1034    /// `InTransaction`. Sends the request, then pulls packets until the first
1035    /// result set's `ColMetaData` (resolving columns and any Always Encrypted
1036    /// decryptor up front) before handing back a [`RowStream`].
1037    pub(crate) async fn query_stream_inner<'a>(
1038        &'a mut self,
1039        sql: &str,
1040        params: &[&(dyn crate::ToSql + Sync)],
1041    ) -> Result<crate::row_stream::RowStream<'a, S>> {
1042        use crate::client::response::server_token_to_error;
1043        use crate::row_source::{Pull, RowSource};
1044        use tds_protocol::token::Token;
1045
1046        tracing::debug!(sql = sql, params_count = params.len(), "streaming query");
1047
1048        // Send the request (same wire format as the buffered path).
1049        if params.is_empty() {
1050            self.send_sql_batch(sql).await?;
1051        } else {
1052            let rpc = self.build_parameterized_rpc(sql, params).await?;
1053            self.send_rpc(&rpc).await?;
1054        }
1055        self.in_flight = true;
1056
1057        #[cfg(feature = "always-encrypted")]
1058        let encryption_enabled = self.encryption_context.is_some();
1059        #[cfg(not(feature = "always-encrypted"))]
1060        let encryption_enabled = false;
1061
1062        let mut source = RowSource::new(encryption_enabled);
1063
1064        // Prelude: pull packets until the first result set's ColMetaData (so the
1065        // columns and any Always Encrypted decryptor are resolved up front), or
1066        // until a terminal Done/Error if there is no result set.
1067        loop {
1068            match source.pull()? {
1069                Pull::Token(Token::ColMetaData(meta)) => {
1070                    let columns = Self::build_columns(&meta);
1071                    #[cfg(feature = "always-encrypted")]
1072                    let decryptor = self
1073                        .resolve_decryptor(&meta)
1074                        .await?
1075                        .map(std::sync::Arc::new);
1076                    return Ok(crate::row_stream::RowStream::new(
1077                        self,
1078                        source,
1079                        columns,
1080                        meta,
1081                        #[cfg(feature = "always-encrypted")]
1082                        decryptor,
1083                    ));
1084                }
1085                Pull::Token(Token::Error(err)) => {
1086                    self.in_flight = false;
1087                    return Err(server_token_to_error(&err));
1088                }
1089                Pull::Token(Token::Done(done)) => {
1090                    if done.status.error {
1091                        self.in_flight = false;
1092                        return Err(Error::Query(
1093                            "query failed (server set error flag in DONE token)".to_string(),
1094                        ));
1095                    }
1096                    if !done.status.more {
1097                        // No result set (e.g. an INSERT) — an empty stream.
1098                        self.in_flight = false;
1099                        return Ok(crate::row_stream::RowStream::empty(self));
1100                    }
1101                    // More results may follow; keep looking for ColMetaData.
1102                }
1103                Pull::Token(Token::EnvChange(env)) => {
1104                    Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
1105                }
1106                Pull::Token(_) => {
1107                    // Info / Order / DoneProc / DoneInProc, etc. — keep pulling.
1108                }
1109                Pull::NeedMore => match self.read_response_packet().await? {
1110                    Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1111                    None => {
1112                        self.in_flight = false;
1113                        return Err(Error::ConnectionClosed);
1114                    }
1115                },
1116                Pull::End => {
1117                    self.in_flight = false;
1118                    return Ok(crate::row_stream::RowStream::empty(self));
1119                }
1120            }
1121        }
1122    }
1123
1124    /// Shared implementation behind `query_stream_blob` for both `Ready` and
1125    /// `InTransaction`.
1126    pub(crate) async fn query_stream_blob_inner<'a>(
1127        &'a mut self,
1128        sql: &str,
1129        params: &[&(dyn crate::ToSql + Sync)],
1130    ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1131        use crate::client::response::server_token_to_error;
1132        use crate::row_source::{Pull, RowSource};
1133        use tds_protocol::token::Token;
1134
1135        if params.is_empty() {
1136            self.send_sql_batch(sql).await?;
1137        } else {
1138            let rpc = self.build_parameterized_rpc(sql, params).await?;
1139            self.send_rpc(&rpc).await?;
1140        }
1141        self.in_flight = true;
1142
1143        #[cfg(feature = "always-encrypted")]
1144        let encryption_enabled = self.encryption_context.is_some();
1145        #[cfg(not(feature = "always-encrypted"))]
1146        let encryption_enabled = false;
1147
1148        let mut source = RowSource::new(encryption_enabled);
1149
1150        loop {
1151            match source.pull()? {
1152                Pull::Token(Token::ColMetaData(meta)) => {
1153                    let blob_index = Self::validate_blob_result_set(&meta)?;
1154                    let (buf, eom) = source.into_parts();
1155                    return Ok(crate::blob_stream::BlobStream::new(
1156                        self,
1157                        buf,
1158                        eom,
1159                        encryption_enabled,
1160                        meta,
1161                        blob_index,
1162                    ));
1163                }
1164                Pull::Token(Token::Error(err)) => {
1165                    self.in_flight = false;
1166                    return Err(server_token_to_error(&err));
1167                }
1168                Pull::Token(Token::Done(_)) => {
1169                    self.in_flight = false;
1170                    return Err(Error::Protocol(
1171                        "query_stream_blob: query produced no result set".to_string(),
1172                    ));
1173                }
1174                Pull::Token(_) => {}
1175                Pull::NeedMore => match self.read_response_packet().await? {
1176                    Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1177                    None => {
1178                        self.in_flight = false;
1179                        return Err(Error::ConnectionClosed);
1180                    }
1181                },
1182                Pull::End => {
1183                    self.in_flight = false;
1184                    return Err(Error::Protocol(
1185                        "query_stream_blob: query produced no result set".to_string(),
1186                    ));
1187                }
1188            }
1189        }
1190    }
1191
1192    /// Validate that a result set is shaped for [`query_stream_blob`] and return
1193    /// the index of its single trailing MAX column.
1194    fn validate_blob_result_set(meta: &tds_protocol::token::ColMetaData) -> Result<usize> {
1195        if meta.cek_table.is_some() {
1196            return Err(Error::Protocol(
1197                "query_stream_blob does not support Always Encrypted result sets".to_string(),
1198            ));
1199        }
1200        let max_cols: Vec<usize> = meta
1201            .columns
1202            .iter()
1203            .enumerate()
1204            .filter(|(_, c)| crate::blob_stream::is_plp_max(c))
1205            .map(|(i, _)| i)
1206            .collect();
1207        match max_cols.as_slice() {
1208            [] => Err(Error::Protocol(
1209                "query_stream_blob: result set has no MAX column — use query_stream".to_string(),
1210            )),
1211            [idx] if *idx == meta.columns.len() - 1 => Ok(*idx),
1212            [_] => Err(Error::Protocol(
1213                "query_stream_blob: the MAX column must be the last column".to_string(),
1214            )),
1215            _ => Err(Error::Protocol(
1216                "query_stream_blob: result set has more than one MAX column".to_string(),
1217            )),
1218        }
1219    }
1220}
1221
1222impl Client<Ready> {
1223    /// Mark this connection as needing a reset on next use.
1224    ///
1225    /// Called by the connection pool when a connection is returned.
1226    /// The next SQL batch or RPC will include the RESETCONNECTION flag
1227    /// in the TDS packet header, causing SQL Server to reset connection
1228    /// state (temp tables, SET options, transaction isolation level, etc.)
1229    /// before executing the command.
1230    ///
1231    /// This is more efficient than calling `sp_reset_connection` as a
1232    /// separate command because it's handled at the TDS protocol level.
1233    pub fn mark_needs_reset(&mut self) {
1234        self.needs_reset = true;
1235    }
1236
1237    /// Check if this connection needs a reset.
1238    ///
1239    /// Returns true if `mark_needs_reset()` was called and the reset
1240    /// hasn't been performed yet.
1241    #[must_use]
1242    pub fn needs_reset(&self) -> bool {
1243        self.needs_reset
1244    }
1245
1246    /// Execute a query and return a result set with lazy per-row decoding.
1247    ///
1248    /// Per ADR-007 the full response is buffered in memory and each row is
1249    /// *decoded* on demand as you iterate — this is not incremental network
1250    /// streaming, so peak memory tracks the response size. Use
1251    /// `.collect_all()` if you want all rows materialized into a `Vec` up
1252    /// front.
1253    ///
1254    /// # Example
1255    ///
1256    /// ```rust,no_run
1257    /// # use mssql_client::Row;
1258    /// # fn process(_: &Row) {}
1259    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1260    /// // Streaming (synchronous iteration over the result set)
1261    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
1262    /// for row in stream {
1263    ///     let row = row?;
1264    ///     process(&row);
1265    /// }
1266    ///
1267    /// // Buffered (loads all into memory)
1268    /// let rows: Vec<Row> = client
1269    ///     .query("SELECT * FROM small_table", &[])
1270    ///     .await?
1271    ///     .collect_all()
1272    ///     .await?;
1273    /// # let _ = rows;
1274    /// # Ok(())
1275    /// # }
1276    /// ```
1277    pub async fn query<'a>(
1278        &'a mut self,
1279        sql: &str,
1280        params: &[&(dyn crate::ToSql + Sync)],
1281    ) -> Result<QueryStream<'a>> {
1282        let deadline = self.command_deadline();
1283        self.query_inner(sql, params, deadline).await
1284    }
1285
1286    /// Shared query implementation with an explicit command deadline.
1287    async fn query_inner<'a>(
1288        &'a mut self,
1289        sql: &str,
1290        params: &[&(dyn crate::ToSql + Sync)],
1291        deadline: Option<std::time::Duration>,
1292    ) -> Result<QueryStream<'a>> {
1293        tracing::debug!(sql = sql, params_count = params.len(), "executing query");
1294
1295        #[cfg(feature = "otel")]
1296        let instrumentation = self.instrumentation.clone();
1297        #[cfg(feature = "otel")]
1298        let mut span = instrumentation.query_span(sql);
1299        #[cfg(feature = "otel")]
1300        let timer = crate::instrumentation::OperationTimer::start(
1301            crate::instrumentation::extract_operation(sql),
1302        );
1303
1304        let canceller = self.cancel_handle();
1305        let result = run_with_deadline(
1306            async {
1307                if params.is_empty() {
1308                    // Simple query without parameters - use SQL batch
1309                    self.send_sql_batch(sql).await?;
1310                } else {
1311                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1312                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1313                    self.send_rpc(&rpc).await?;
1314                }
1315
1316                // Read complete response including columns and rows
1317                self.read_query_response().await
1318            },
1319            deadline,
1320            canceller,
1321        )
1322        .await;
1323
1324        #[cfg(feature = "otel")]
1325        match &result {
1326            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1327            Err(e) => InstrumentationContext::record_error(&mut span, e),
1328        }
1329        #[cfg(feature = "otel")]
1330        timer.finish(instrumentation.metrics(), result.is_ok());
1331
1332        // Drop the span before returning
1333        #[cfg(feature = "otel")]
1334        drop(span);
1335
1336        let resp = result?;
1337        #[cfg(feature = "always-encrypted")]
1338        {
1339            Ok(QueryStream::from_raw(
1340                resp.columns,
1341                resp.pending_rows,
1342                resp.meta,
1343                resp.decryptor,
1344            ))
1345        }
1346        #[cfg(not(feature = "always-encrypted"))]
1347        {
1348            Ok(QueryStream::from_raw(
1349                resp.columns,
1350                resp.pending_rows,
1351                resp.meta,
1352            ))
1353        }
1354    }
1355
1356    /// Execute a query and stream rows incrementally from the network.
1357    ///
1358    /// Unlike [`query`](Self::query) — which buffers the whole response in
1359    /// memory before returning — this reads TDS packets on demand as rows are
1360    /// pulled, so peak memory is roughly one packet plus one row regardless of
1361    /// result-set size. Use it for large result sets; use [`query`](Self::query)
1362    /// for the common small-result case where the buffered, synchronously
1363    /// iterable [`QueryStream`] is more convenient.
1364    ///
1365    /// The returned [`RowStream`](crate::RowStream) borrows the client for its
1366    /// lifetime, so no other request can run on this connection until the stream
1367    /// is consumed or dropped. Also available on `Client<InTransaction>` to
1368    /// stream within a transaction.
1369    ///
1370    /// # Example
1371    ///
1372    /// ```rust,no_run
1373    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1374    /// let mut stream = client.query_stream("SELECT id FROM big_table", &[]).await?;
1375    /// while let Some(row) = stream.try_next().await? {
1376    ///     let id: i32 = row.get_by_name("id")?;
1377    ///     let _ = id;
1378    /// }
1379    /// # Ok(())
1380    /// # }
1381    /// ```
1382    pub async fn query_stream<'a>(
1383        &'a mut self,
1384        sql: &str,
1385        params: &[&(dyn crate::ToSql + Sync)],
1386    ) -> Result<crate::row_stream::RowStream<'a, Ready>> {
1387        self.query_stream_inner(sql, params).await
1388    }
1389
1390    /// Execute a query and stream a row's trailing MAX column from the network.
1391    ///
1392    /// For result sets whose last column is a single MAX type
1393    /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this reads
1394    /// that column's bytes incrementally from the socket instead of
1395    /// materializing the cell — so a multi-GB BLOB can be streamed to a sink in
1396    /// bounded memory. The leading (scalar) columns are decoded eagerly into the
1397    /// per-row [`Row`](crate::Row).
1398    ///
1399    /// The MAX column must be the **last** column. The returned
1400    /// [`BlobStream`](crate::BlobStream) yields scalar [`Row`](crate::Row)s via
1401    /// [`next`](crate::BlobStream::next); read each row's blob with
1402    /// [`read_chunk`](crate::BlobStream::read_chunk) /
1403    /// [`copy_blob_to`](crate::BlobStream::copy_blob_to) before advancing. Also
1404    /// available on `Client<InTransaction>`.
1405    ///
1406    /// # Errors
1407    ///
1408    /// Returns an error if the result set has no trailing MAX column, has more
1409    /// than one MAX column, the MAX column is not last, or the result set uses
1410    /// Always Encrypted (not yet supported on this path).
1411    pub async fn query_stream_blob<'a>(
1412        &'a mut self,
1413        sql: &str,
1414        params: &[&(dyn crate::ToSql + Sync)],
1415    ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1416        self.query_stream_blob_inner(sql, params).await
1417    }
1418
1419    /// Execute a query with a specific timeout.
1420    ///
1421    /// This overrides the default `command_timeout` from the connection configuration
1422    /// for this specific query. If the query does not complete within the specified
1423    /// duration, the driver sends an Attention packet to cancel it server-side,
1424    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1425    /// connection left usable for the next request.
1426    ///
1427    /// # Arguments
1428    ///
1429    /// * `sql` - The SQL query to execute
1430    /// * `params` - Query parameters
1431    /// * `timeout_duration` - Maximum time to wait for the query to complete
1432    ///
1433    /// # Example
1434    ///
1435    /// ```rust,no_run
1436    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1437    /// use std::time::Duration;
1438    ///
1439    /// // Execute with a 5-second timeout
1440    /// let rows = client
1441    ///     .query_with_timeout(
1442    ///         "SELECT * FROM large_table",
1443    ///         &[],
1444    ///         Duration::from_secs(5),
1445    ///     )
1446    ///     .await?;
1447    /// # let _ = rows;
1448    /// # Ok(())
1449    /// # }
1450    /// ```
1451    pub async fn query_with_timeout<'a>(
1452        &'a mut self,
1453        sql: &str,
1454        params: &[&(dyn crate::ToSql + Sync)],
1455        timeout_duration: std::time::Duration,
1456    ) -> Result<QueryStream<'a>> {
1457        self.query_inner(sql, params, Some(timeout_duration)).await
1458    }
1459
1460    /// Execute a batch that may return multiple result sets.
1461    ///
1462    /// This is useful for stored procedures or SQL batches that contain
1463    /// multiple SELECT statements.
1464    ///
1465    /// # Example
1466    ///
1467    /// ```rust,no_run
1468    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1469    /// // Execute a batch with multiple SELECT statements
1470    /// let mut results = client.query_multiple(
1471    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1472    ///     &[]
1473    /// ).await?;
1474    ///
1475    /// // Process first result set
1476    /// while let Some(row) = results.next_row().await? {
1477    ///     println!("Result 1: {:?}", row);
1478    /// }
1479    ///
1480    /// // Move to second result set
1481    /// if results.next_result().await? {
1482    ///     while let Some(row) = results.next_row().await? {
1483    ///         println!("Result 2: {:?}", row);
1484    ///     }
1485    /// }
1486    /// # Ok(())
1487    /// # }
1488    /// ```
1489    pub async fn query_multiple<'a>(
1490        &'a mut self,
1491        sql: &str,
1492        params: &[&(dyn crate::ToSql + Sync)],
1493    ) -> Result<MultiResultStream<'a>> {
1494        tracing::debug!(
1495            sql = sql,
1496            params_count = params.len(),
1497            "executing multi-result query"
1498        );
1499
1500        let deadline = self.command_deadline();
1501        let canceller = self.connection_cancel_handle();
1502        let result_sets = run_with_deadline(
1503            async {
1504                if params.is_empty() {
1505                    // Simple batch without parameters - use SQL batch
1506                    self.send_sql_batch(sql).await?;
1507                } else {
1508                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1509                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1510                    self.send_rpc(&rpc).await?;
1511                }
1512
1513                // Read all result sets
1514                self.read_multi_result_response().await
1515            },
1516            deadline,
1517            canceller,
1518        )
1519        .await?;
1520        Ok(MultiResultStream::new(result_sets))
1521    }
1522
1523    /// Execute a query that doesn't return rows.
1524    ///
1525    /// Returns the number of affected rows.
1526    pub async fn execute(
1527        &mut self,
1528        sql: &str,
1529        params: &[&(dyn crate::ToSql + Sync)],
1530    ) -> Result<u64> {
1531        let deadline = self.command_deadline();
1532        self.execute_inner(sql, params, deadline).await
1533    }
1534
1535    /// Shared execute implementation with an explicit command deadline.
1536    async fn execute_inner(
1537        &mut self,
1538        sql: &str,
1539        params: &[&(dyn crate::ToSql + Sync)],
1540        deadline: Option<std::time::Duration>,
1541    ) -> Result<u64> {
1542        tracing::debug!(
1543            sql = sql,
1544            params_count = params.len(),
1545            "executing statement"
1546        );
1547
1548        #[cfg(feature = "otel")]
1549        let instrumentation = self.instrumentation.clone();
1550        #[cfg(feature = "otel")]
1551        let mut span = instrumentation.query_span(sql);
1552        #[cfg(feature = "otel")]
1553        let timer = crate::instrumentation::OperationTimer::start(
1554            crate::instrumentation::extract_operation(sql),
1555        );
1556
1557        let canceller = self.cancel_handle();
1558        let result = run_with_deadline(
1559            async {
1560                if params.is_empty() {
1561                    // Simple statement without parameters - use SQL batch
1562                    self.send_sql_batch(sql).await?;
1563                } else {
1564                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1565                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1566                    self.send_rpc(&rpc).await?;
1567                }
1568
1569                // Read response and get row count
1570                self.read_execute_result().await
1571            },
1572            deadline,
1573            canceller,
1574        )
1575        .await;
1576
1577        #[cfg(feature = "otel")]
1578        match &result {
1579            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1580            Err(e) => InstrumentationContext::record_error(&mut span, e),
1581        }
1582        #[cfg(feature = "otel")]
1583        timer.finish(instrumentation.metrics(), result.is_ok());
1584
1585        // Drop the span before returning
1586        #[cfg(feature = "otel")]
1587        drop(span);
1588
1589        result
1590    }
1591
1592    /// Execute a statement with a specific timeout.
1593    ///
1594    /// This overrides the default `command_timeout` from the connection configuration
1595    /// for this specific statement. If the statement does not complete within the
1596    /// specified duration, the driver sends an Attention packet to cancel it
1597    /// server-side, drains the acknowledgement, and returns
1598    /// [`Error::CommandTimeout`] with the connection left usable.
1599    ///
1600    /// # Arguments
1601    ///
1602    /// * `sql` - The SQL statement to execute
1603    /// * `params` - Statement parameters
1604    /// * `timeout_duration` - Maximum time to wait for the statement to complete
1605    ///
1606    /// # Example
1607    ///
1608    /// ```rust,no_run
1609    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1610    /// use std::time::Duration;
1611    ///
1612    /// // Execute with a 10-second timeout
1613    /// let rows_affected = client
1614    ///     .execute_with_timeout(
1615    ///         "UPDATE large_table SET status = @p1",
1616    ///         &[&"processed"],
1617    ///         Duration::from_secs(10),
1618    ///     )
1619    ///     .await?;
1620    /// # let _ = rows_affected;
1621    /// # Ok(())
1622    /// # }
1623    /// ```
1624    pub async fn execute_with_timeout(
1625        &mut self,
1626        sql: &str,
1627        params: &[&(dyn crate::ToSql + Sync)],
1628        timeout_duration: std::time::Duration,
1629    ) -> Result<u64> {
1630        self.execute_inner(sql, params, Some(timeout_duration))
1631            .await
1632    }
1633
1634    /// Begin a transaction.
1635    ///
1636    /// This transitions the client from `Ready` to `InTransaction` state.
1637    /// Per MS-TDS spec, the server returns a transaction descriptor in the
1638    /// BeginTransaction EnvChange token that must be included in subsequent
1639    /// ALL_HEADERS sections.
1640    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1641        tracing::debug!("beginning transaction");
1642
1643        #[cfg(feature = "otel")]
1644        let instrumentation = self.instrumentation.clone();
1645        #[cfg(feature = "otel")]
1646        let mut span = instrumentation.transaction_span("BEGIN");
1647
1648        // Execute BEGIN TRANSACTION and extract the transaction descriptor
1649        let result = async {
1650            self.send_sql_batch("BEGIN TRANSACTION").await?;
1651            self.read_transaction_begin_result().await
1652        }
1653        .await;
1654
1655        #[cfg(feature = "otel")]
1656        match &result {
1657            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1658            Err(e) => InstrumentationContext::record_error(&mut span, e),
1659        }
1660
1661        // Drop the span before moving instrumentation
1662        #[cfg(feature = "otel")]
1663        drop(span);
1664
1665        let transaction_descriptor = result?;
1666
1667        Ok(Client {
1668            config: self.config,
1669            _state: PhantomData,
1670            connection: self.connection,
1671            server_version: self.server_version,
1672            current_database: self.current_database,
1673            server_collation: self.server_collation,
1674            statement_cache: self.statement_cache,
1675            transaction_descriptor, // Store the descriptor from server
1676            needs_reset: self.needs_reset,
1677            in_flight: self.in_flight,
1678            #[cfg(feature = "otel")]
1679            instrumentation: self.instrumentation,
1680            #[cfg(feature = "always-encrypted")]
1681            encryption_context: self.encryption_context,
1682        })
1683    }
1684
1685    /// Begin a transaction with a specific isolation level.
1686    ///
1687    /// This transitions the client from `Ready` to `InTransaction` state
1688    /// with the specified isolation level.
1689    ///
1690    /// # Example
1691    ///
1692    /// ```rust,no_run
1693    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1694    /// use mssql_client::IsolationLevel;
1695    ///
1696    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1697    /// // All operations in this transaction use SERIALIZABLE isolation
1698    /// tx.commit().await?;
1699    /// # Ok(())
1700    /// # }
1701    /// ```
1702    pub async fn begin_transaction_with_isolation(
1703        mut self,
1704        isolation_level: crate::transaction::IsolationLevel,
1705    ) -> Result<Client<InTransaction>> {
1706        tracing::debug!(
1707            isolation_level = %isolation_level.name(),
1708            "beginning transaction with isolation level"
1709        );
1710
1711        #[cfg(feature = "otel")]
1712        let instrumentation = self.instrumentation.clone();
1713        #[cfg(feature = "otel")]
1714        let mut span = instrumentation.transaction_span("BEGIN");
1715
1716        // First set the isolation level
1717        let result = async {
1718            self.send_sql_batch(isolation_level.as_sql()).await?;
1719            self.read_execute_result().await?;
1720
1721            // Then begin the transaction
1722            self.send_sql_batch("BEGIN TRANSACTION").await?;
1723            self.read_transaction_begin_result().await
1724        }
1725        .await;
1726
1727        #[cfg(feature = "otel")]
1728        match &result {
1729            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1730            Err(e) => InstrumentationContext::record_error(&mut span, e),
1731        }
1732
1733        #[cfg(feature = "otel")]
1734        drop(span);
1735
1736        let transaction_descriptor = result?;
1737
1738        Ok(Client {
1739            config: self.config,
1740            _state: PhantomData,
1741            connection: self.connection,
1742            server_version: self.server_version,
1743            current_database: self.current_database,
1744            server_collation: self.server_collation,
1745            statement_cache: self.statement_cache,
1746            transaction_descriptor,
1747            needs_reset: self.needs_reset,
1748            in_flight: self.in_flight,
1749            #[cfg(feature = "otel")]
1750            instrumentation: self.instrumentation,
1751            #[cfg(feature = "always-encrypted")]
1752            encryption_context: self.encryption_context,
1753        })
1754    }
1755
1756    /// Execute a simple query without parameters.
1757    ///
1758    /// This is useful for DDL statements and simple queries where you
1759    /// don't need to retrieve the affected row count.
1760    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1761        tracing::debug!(sql = sql, "executing simple query");
1762
1763        // Send SQL batch
1764        self.send_sql_batch(sql).await?;
1765
1766        // Read and discard response
1767        let _ = self.read_execute_result().await?;
1768
1769        Ok(())
1770    }
1771
    /// Close the connection by consuming the client.
    ///
    /// The body only emits a debug log and returns `Ok(())`; no explicit
    /// shutdown I/O is performed here. Because `self` is taken by value, the
    /// client and everything it owns are dropped once the call completes.
    pub async fn close(self) -> Result<()> {
        tracing::debug!("closing connection");
        Ok(())
    }
1777
    /// Get the database name from the connection configuration.
    ///
    /// Returns `None` when the configuration has no database set.
    ///
    /// NOTE(review): this reads `config.database`, not the `current_database`
    /// field carried on the client — confirm which one callers expect.
    #[must_use]
    pub fn database(&self) -> Option<&str> {
        self.config.database.as_deref()
    }
1783
    /// Get the server host as stored in the connection configuration.
    ///
    /// The returned string borrows from the client's `config`.
    #[must_use]
    pub fn host(&self) -> &str {
        &self.config.host
    }
1789
    /// Get the server port as stored in the connection configuration.
    #[must_use]
    pub fn port(&self) -> u16 {
        self.config.port
    }
1795
1796    /// Check if the connection is currently in a transaction.
1797    ///
1798    /// This returns `true` if a transaction was started via raw SQL
1799    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1800    ///
1801    /// Note: This only tracks transactions started via raw SQL. Transactions
1802    /// started via the type-state API (`begin_transaction()`) result in a
1803    /// `Client<InTransaction>` which is a different type.
1804    ///
1805    /// # Example
1806    ///
1807    /// ```rust,no_run
1808    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1809    /// client.execute("BEGIN TRANSACTION", &[]).await?;
1810    /// assert!(client.is_in_transaction());
1811    ///
1812    /// client.execute("COMMIT", &[]).await?;
1813    /// assert!(!client.is_in_transaction());
1814    /// # Ok(())
1815    /// # }
1816    /// ```
1817    #[must_use]
    pub fn is_in_transaction(&self) -> bool {
        // `commit` and `rollback` store a descriptor of 0 to mark auto-commit
        // mode, so any other value is treated as an open transaction.
        self.transaction_descriptor != 0
    }
1821
1822    /// Check if a request is in-flight (sent but response not fully read).
1823    ///
1824    /// Used by the connection pool to detect dirty connections that were
1825    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1826    /// A connection with an in-flight request has unread data in the TCP
1827    /// buffer and must be discarded rather than returned to the pool.
1828    #[must_use]
    pub fn is_in_flight(&self) -> bool {
        // Plain read of the flag carried on the client; performs no I/O.
        self.in_flight
    }
1832
1833    /// Report whether an Always Encrypted key-store provider with the given
1834    /// name is currently reachable through this client's encryption context.
1835    ///
1836    /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1837    /// the connection was opened without `column_encryption` configured, or
1838    /// when no matching provider was registered.
1839    #[cfg(feature = "always-encrypted")]
1840    #[must_use]
1841    pub fn has_encryption_provider(&self, name: &str) -> bool {
1842        self.encryption_context
1843            .as_ref()
1844            .is_some_and(|ctx| ctx.has_provider(name))
1845    }
1846
1847    /// Get a handle for cancelling the current query.
1848    ///
1849    /// The cancel handle can be cloned and sent to other tasks, enabling
1850    /// cancellation of long-running queries from a separate async context.
1851    ///
1852    /// # Example
1853    ///
1854    /// ```rust,no_run
1855    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1856    /// use std::time::Duration;
1857    ///
1858    /// let cancel_handle = client.cancel_handle();
1859    ///
1860    /// // Spawn a task to cancel after 10 seconds
1861    /// let handle = tokio::spawn(async move {
1862    ///     tokio::time::sleep(Duration::from_secs(10)).await;
1863    ///     let _ = cancel_handle.cancel().await;
1864    /// });
1865    ///
1866    /// // This query will be cancelled if it runs longer than 10 seconds
1867    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1868    /// # let _ = (handle, result);
1869    /// # Ok(())
1870    /// # }
1871    /// ```
1872    #[must_use]
    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
        // Public wrapper: the handle itself comes from `connection_cancel_handle`.
        self.connection_cancel_handle()
    }
1876}
1877
1878/// # Drop Behavior
1879///
1880/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1881/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1882/// the transaction remains open on the server until the TCP connection closes
1883/// (at which point SQL Server automatically rolls back).
1884///
1885/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1886/// to send a `ROLLBACK TRANSACTION` command.
1887///
1888/// ## Consequences of dropping without commit/rollback
1889///
1890/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1891///   (potentially 30+ minutes), holding locks on any modified rows.
1892/// - **Pooled connections:** The pool detects the active transaction descriptor
1893///   and discards the connection rather than returning it to the idle pool
1894///   (see `PooledConnection::drop` in `mssql-driver-pool`).
1895///
1896/// ## Best practice
1897///
1898/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1899/// for error paths:
1900///
1901/// ```rust,no_run
1902/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1903/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1904/// let tx = client.begin_transaction().await?;
1905/// match do_work(&tx).await {
1906///     Ok(_) => { tx.commit().await?; }
1907///     Err(e) => { tx.rollback().await?; return Err(e); }
1908/// }
1909/// # Ok(())
1910/// # }
1911/// ```
1912impl Client<InTransaction> {
1913    /// Execute a query within the transaction and return a streaming result set.
1914    ///
1915    /// See [`Client<Ready>::query`] for usage examples.
1916    pub async fn query<'a>(
1917        &'a mut self,
1918        sql: &str,
1919        params: &[&(dyn crate::ToSql + Sync)],
1920    ) -> Result<QueryStream<'a>> {
1921        let deadline = self.command_deadline();
1922        self.query_inner(sql, params, deadline).await
1923    }
1924
1925    /// Shared query implementation with an explicit command deadline.
1926    async fn query_inner<'a>(
1927        &'a mut self,
1928        sql: &str,
1929        params: &[&(dyn crate::ToSql + Sync)],
1930        deadline: Option<std::time::Duration>,
1931    ) -> Result<QueryStream<'a>> {
1932        tracing::debug!(
1933            sql = sql,
1934            params_count = params.len(),
1935            "executing query in transaction"
1936        );
1937
1938        #[cfg(feature = "otel")]
1939        let instrumentation = self.instrumentation.clone();
1940        #[cfg(feature = "otel")]
1941        let mut span = instrumentation.query_span(sql);
1942        #[cfg(feature = "otel")]
1943        let timer = crate::instrumentation::OperationTimer::start(
1944            crate::instrumentation::extract_operation(sql),
1945        );
1946
1947        let canceller = self.cancel_handle();
1948        let result = run_with_deadline(
1949            async {
1950                if params.is_empty() {
1951                    // Simple query without parameters - use SQL batch
1952                    self.send_sql_batch(sql).await?;
1953                } else {
1954                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1955                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1956                    self.send_rpc(&rpc).await?;
1957                }
1958
1959                // Read complete response including columns and rows
1960                self.read_query_response().await
1961            },
1962            deadline,
1963            canceller,
1964        )
1965        .await;
1966
1967        #[cfg(feature = "otel")]
1968        match &result {
1969            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1970            Err(e) => InstrumentationContext::record_error(&mut span, e),
1971        }
1972        #[cfg(feature = "otel")]
1973        timer.finish(instrumentation.metrics(), result.is_ok());
1974
1975        // Drop the span before returning
1976        #[cfg(feature = "otel")]
1977        drop(span);
1978
1979        let resp = result?;
1980        #[cfg(feature = "always-encrypted")]
1981        {
1982            Ok(QueryStream::from_raw(
1983                resp.columns,
1984                resp.pending_rows,
1985                resp.meta,
1986                resp.decryptor,
1987            ))
1988        }
1989        #[cfg(not(feature = "always-encrypted"))]
1990        {
1991            Ok(QueryStream::from_raw(
1992                resp.columns,
1993                resp.pending_rows,
1994                resp.meta,
1995            ))
1996        }
1997    }
1998
1999    /// Stream rows incrementally from the network within the transaction.
2000    ///
2001    /// Identical to [`Client<Ready>::query_stream`] except the query runs inside
2002    /// the open transaction. The returned [`RowStream`](crate::RowStream)
2003    /// borrows the transaction client for its lifetime, so the stream must be
2004    /// consumed or dropped before the transaction can be committed or rolled
2005    /// back.
    pub async fn query_stream<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<crate::row_stream::RowStream<'a, InTransaction>> {
        // Public wrapper: all work is delegated to `query_stream_inner`.
        self.query_stream_inner(sql, params).await
    }
2013
2014    /// Stream a row's trailing MAX column from the network within the
2015    /// transaction.
2016    ///
2017    /// See [`Client<Ready>::query_stream_blob`] for semantics and constraints;
2018    /// the only difference is that the query runs inside the open transaction.
    pub async fn query_stream_blob<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
        // Public wrapper: all work is delegated to `query_stream_blob_inner`.
        self.query_stream_blob_inner(sql, params).await
    }
2026
2027    /// Execute a statement within the transaction.
2028    ///
2029    /// Returns the number of affected rows.
2030    pub async fn execute(
2031        &mut self,
2032        sql: &str,
2033        params: &[&(dyn crate::ToSql + Sync)],
2034    ) -> Result<u64> {
2035        let deadline = self.command_deadline();
2036        self.execute_inner(sql, params, deadline).await
2037    }
2038
2039    /// Shared execute implementation with an explicit command deadline.
2040    async fn execute_inner(
2041        &mut self,
2042        sql: &str,
2043        params: &[&(dyn crate::ToSql + Sync)],
2044        deadline: Option<std::time::Duration>,
2045    ) -> Result<u64> {
2046        tracing::debug!(
2047            sql = sql,
2048            params_count = params.len(),
2049            "executing statement in transaction"
2050        );
2051
2052        #[cfg(feature = "otel")]
2053        let instrumentation = self.instrumentation.clone();
2054        #[cfg(feature = "otel")]
2055        let mut span = instrumentation.query_span(sql);
2056        #[cfg(feature = "otel")]
2057        let timer = crate::instrumentation::OperationTimer::start(
2058            crate::instrumentation::extract_operation(sql),
2059        );
2060
2061        let canceller = self.cancel_handle();
2062        let result = run_with_deadline(
2063            async {
2064                if params.is_empty() {
2065                    // Simple statement without parameters - use SQL batch
2066                    self.send_sql_batch(sql).await?;
2067                } else {
2068                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
2069                    let rpc = self.build_parameterized_rpc(sql, params).await?;
2070                    self.send_rpc(&rpc).await?;
2071                }
2072
2073                // Read response and get row count
2074                self.read_execute_result().await
2075            },
2076            deadline,
2077            canceller,
2078        )
2079        .await;
2080
2081        #[cfg(feature = "otel")]
2082        match &result {
2083            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
2084            Err(e) => InstrumentationContext::record_error(&mut span, e),
2085        }
2086        #[cfg(feature = "otel")]
2087        timer.finish(instrumentation.metrics(), result.is_ok());
2088
2089        // Drop the span before returning
2090        #[cfg(feature = "otel")]
2091        drop(span);
2092
2093        result
2094    }
2095
2096    /// Execute a query within the transaction with a specific timeout.
2097    ///
2098    /// See [`Client<Ready>::query_with_timeout`] for details.
2099    pub async fn query_with_timeout<'a>(
2100        &'a mut self,
2101        sql: &str,
2102        params: &[&(dyn crate::ToSql + Sync)],
2103        timeout_duration: std::time::Duration,
2104    ) -> Result<QueryStream<'a>> {
2105        self.query_inner(sql, params, Some(timeout_duration)).await
2106    }
2107
2108    /// Execute a statement within the transaction with a specific timeout.
2109    ///
2110    /// See [`Client<Ready>::execute_with_timeout`] for details.
2111    pub async fn execute_with_timeout(
2112        &mut self,
2113        sql: &str,
2114        params: &[&(dyn crate::ToSql + Sync)],
2115        timeout_duration: std::time::Duration,
2116    ) -> Result<u64> {
2117        self.execute_inner(sql, params, Some(timeout_duration))
2118            .await
2119    }
2120
2121    /// Open a FILESTREAM BLOB for async reading and/or writing.
2122    ///
2123    /// This method queries the server for the transaction context, then opens
2124    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
2125    ///
2126    /// # Arguments
2127    ///
2128    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
2129    ///   Query this yourself before calling `open_filestream`:
2130    ///   ```sql
2131    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
2132    ///   ```
2133    /// * `access` — Read, write, or read/write access mode.
2134    ///
2135    /// # Requirements
2136    ///
2137    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
2138    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
2139    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
2140    ///
2141    /// # Example
2142    ///
2143    /// ```text
2144    /// use mssql_client::FileStreamAccess;
2145    /// use tokio::io::AsyncReadExt;
2146    ///
2147    /// let mut tx = client.begin_transaction().await?;
2148    ///
2149    /// // Get the FILESTREAM path
2150    /// let rows = tx.query(
2151    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
2152    ///     &[&doc_id],
2153    /// ).await?;
2154    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
2155    ///
2156    /// // Open and read the BLOB
2157    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
2158    /// let mut data = Vec::new();
2159    /// stream.read_to_end(&mut data).await?;
2160    /// drop(stream);
2161    ///
2162    /// tx.commit().await?;
2163    /// ```
2164    #[cfg(all(windows, feature = "filestream"))]
2165    pub async fn open_filestream(
2166        &mut self,
2167        path: &str,
2168        access: crate::filestream::FileStreamAccess,
2169    ) -> Result<crate::filestream::FileStream> {
2170        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
2171
2172        // Get the transaction context from SQL Server.
2173        // This binds the file access to the current SQL transaction.
2174        let txn_context: Vec<u8> = {
2175            let rows = self
2176                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
2177                .await?;
2178            let mut ctx = None;
2179            for result in rows {
2180                let row = result?;
2181                ctx = Some(row.get::<Vec<u8>>(0)?);
2182            }
2183            ctx.ok_or_else(|| {
2184                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
2185            })?
2186        };
2187
2188        crate::filestream::FileStream::open(path, access, &txn_context)
2189    }
2190
2191    /// Commit the transaction.
2192    ///
2193    /// This transitions the client back to `Ready` state.
2194    pub async fn commit(mut self) -> Result<Client<Ready>> {
2195        tracing::debug!("committing transaction");
2196
2197        #[cfg(feature = "otel")]
2198        let instrumentation = self.instrumentation.clone();
2199        #[cfg(feature = "otel")]
2200        let mut span = instrumentation.transaction_span("COMMIT");
2201
2202        // Execute COMMIT TRANSACTION
2203        let result = async {
2204            self.send_sql_batch("COMMIT TRANSACTION").await?;
2205            self.read_execute_result().await
2206        }
2207        .await;
2208
2209        #[cfg(feature = "otel")]
2210        match &result {
2211            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2212            Err(e) => InstrumentationContext::record_error(&mut span, e),
2213        }
2214
2215        // Drop the span before moving instrumentation
2216        #[cfg(feature = "otel")]
2217        drop(span);
2218
2219        result?;
2220
2221        Ok(Client {
2222            config: self.config,
2223            _state: PhantomData,
2224            connection: self.connection,
2225            server_version: self.server_version,
2226            current_database: self.current_database,
2227            server_collation: self.server_collation,
2228            statement_cache: self.statement_cache,
2229            transaction_descriptor: 0, // Reset to auto-commit mode
2230            needs_reset: self.needs_reset,
2231            in_flight: self.in_flight,
2232            #[cfg(feature = "otel")]
2233            instrumentation: self.instrumentation,
2234            #[cfg(feature = "always-encrypted")]
2235            encryption_context: self.encryption_context,
2236        })
2237    }
2238
2239    /// Rollback the transaction.
2240    ///
2241    /// This transitions the client back to `Ready` state.
2242    pub async fn rollback(mut self) -> Result<Client<Ready>> {
2243        tracing::debug!("rolling back transaction");
2244
2245        #[cfg(feature = "otel")]
2246        let instrumentation = self.instrumentation.clone();
2247        #[cfg(feature = "otel")]
2248        let mut span = instrumentation.transaction_span("ROLLBACK");
2249
2250        // Execute ROLLBACK TRANSACTION
2251        let result = async {
2252            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
2253            self.read_execute_result().await
2254        }
2255        .await;
2256
2257        #[cfg(feature = "otel")]
2258        match &result {
2259            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2260            Err(e) => InstrumentationContext::record_error(&mut span, e),
2261        }
2262
2263        // Drop the span before moving instrumentation
2264        #[cfg(feature = "otel")]
2265        drop(span);
2266
2267        result?;
2268
2269        Ok(Client {
2270            config: self.config,
2271            _state: PhantomData,
2272            connection: self.connection,
2273            server_version: self.server_version,
2274            current_database: self.current_database,
2275            server_collation: self.server_collation,
2276            statement_cache: self.statement_cache,
2277            transaction_descriptor: 0, // Reset to auto-commit mode
2278            needs_reset: self.needs_reset,
2279            in_flight: self.in_flight,
2280            #[cfg(feature = "otel")]
2281            instrumentation: self.instrumentation,
2282            #[cfg(feature = "always-encrypted")]
2283            encryption_context: self.encryption_context,
2284        })
2285    }
2286
2287    /// Create a savepoint and return a handle for later rollback.
2288    ///
2289    /// The returned `SavePoint` handle contains the validated savepoint name.
2290    /// Use it with `rollback_to()` to partially undo transaction work.
2291    ///
2292    /// # Example
2293    ///
2294    /// ```rust,no_run
2295    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2296    /// let mut tx = client.begin_transaction().await?;
2297    /// tx.execute("INSERT INTO orders ...", &[]).await?;
2298    /// let sp = tx.save_point("before_items").await?;
2299    /// tx.execute("INSERT INTO items ...", &[]).await?;
2300    /// // Oops, rollback just the items
2301    /// tx.rollback_to(&sp).await?;
2302    /// tx.commit().await?;
2303    /// # Ok(())
2304    /// # }
2305    /// ```
2306    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
2307        crate::validation::validate_identifier(name)?;
2308        tracing::debug!(name = name, "creating savepoint");
2309
2310        // Execute SAVE TRANSACTION <name>
2311        // Note: name is validated by validate_identifier() to prevent SQL injection
2312        let sql = format!("SAVE TRANSACTION {name}");
2313        self.send_sql_batch(&sql).await?;
2314        self.read_execute_result().await?;
2315
2316        Ok(SavePoint::new(name.to_string()))
2317    }
2318
2319    /// Rollback to a savepoint.
2320    ///
2321    /// This rolls back all changes made after the savepoint was created,
2322    /// but keeps the transaction active. The savepoint remains valid and
2323    /// can be rolled back to again.
2324    ///
2325    /// # Example
2326    ///
2327    /// ```rust,no_run
2328    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
2329    /// let sp = tx.save_point("checkpoint").await?;
2330    /// // ... do some work ...
2331    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
2332    /// // Transaction is still active, savepoint is still valid
2333    /// # Ok(())
2334    /// # }
2335    /// ```
2336    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
2337        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
2338
2339        // Execute ROLLBACK TRANSACTION <name>
2340        // Note: savepoint name was validated during creation
2341        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
2342        self.send_sql_batch(&sql).await?;
2343        self.read_execute_result().await?;
2344
2345        Ok(())
2346    }
2347
2348    /// Release a savepoint (optional cleanup).
2349    ///
2350    /// Note: SQL Server doesn't have explicit savepoint release, but this
2351    /// method is provided for API completeness. The savepoint is automatically
2352    /// released when the transaction commits or rolls back.
2353    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
2354        tracing::debug!(name = savepoint.name(), "releasing savepoint");
2355
2356        // SQL Server doesn't require explicit savepoint release
2357        // The savepoint is implicitly released on commit/rollback
2358        // This method exists for API completeness
2359        drop(savepoint);
2360        Ok(())
2361    }
2362
2363    /// Get a handle for cancelling the current query within the transaction.
2364    ///
2365    /// See [`Client<Ready>::cancel_handle`] for usage examples.
2366    #[must_use]
2367    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2368        self.connection_cancel_handle()
2369    }
2370}
2371
2372impl<S: ConnectionState> std::fmt::Debug for Client<S> {
2373    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2374        f.debug_struct("Client")
2375            .field("host", &self.config.host)
2376            .field("port", &self.config.port)
2377            .field("database", &self.config.database)
2378            .finish()
2379    }
2380}