Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
//! / `DROP`); with parameters it uses `sp_executesql`, and SQL Server forbids
//! DDL inside that procedure context. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
/// Upper bound on the wait for the server's acknowledgement of an Attention
/// packet once a command has timed out. SqlClient gives the server 5 seconds
/// before dooming the connection, and this driver uses the same value.
const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::new(5, 0);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68    fut: F,
69    deadline: Option<std::time::Duration>,
70    canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73    F: std::future::Future<Output = Result<T>>,
74{
75    let Some(d) = deadline else {
76        return fut.await;
77    };
78    tokio::pin!(fut);
79    tokio::select! {
80        biased;
81        res = &mut fut => res,
82        () = tokio::time::sleep(d) => {
83            // Signal cancellation, then let the in-flight read consume the
84            // server's attention acknowledgement so the connection stays usable.
85            let drain = async {
86                let _ = canceller.cancel().await;
87                let _ = (&mut fut).await;
88            };
89            if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90                tracing::warn!(
91                    timeout = ?ATTENTION_ACK_TIMEOUT,
92                    "server did not acknowledge attention; abandoning the connection as dirty"
93                );
94            }
95            Err(Error::CommandTimeout)
96        }
97    }
98}
99
/// SQL Server client with type-state connection management.
///
/// The generic parameter `S` represents the current connection state,
/// ensuring at compile time that certain operations are only available
/// in appropriate states. `S` is stored only as a `PhantomData` marker.
pub struct Client<S: ConnectionState> {
    /// Connection configuration. The request paths in this file read
    /// `command_timeout` (per-command deadline) and `packet_size` (maximum
    /// packet size passed to the transport) from it.
    config: Config,
    /// Marker for the type-state `S`; carries no data.
    _state: PhantomData<S>,
    /// The underlying connection (present only when connected).
    /// The send paths return `Error::ConnectionClosed` when this is `None`.
    connection: Option<ConnectionHandle>,
    /// Server version from LoginAck (raw u32 TDS version)
    server_version: Option<u32>,
    /// Current database from EnvChange
    current_database: Option<String>,
    /// Server's default collation from SqlCollation EnvChange during login.
    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
    /// parameters with the correct character encoding and collation bytes.
    server_collation: Option<tds_protocol::token::Collation>,
    /// Prepared statement cache for query optimization
    statement_cache: StatementCache,
    /// Transaction descriptor from BeginTransaction EnvChange.
    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
    /// requests within an explicit transaction. 0 indicates auto-commit mode.
    /// It is passed to the batch/RPC encoders on every request and reset to 0
    /// when a commit or rollback EnvChange is processed.
    transaction_descriptor: u64,
    /// Whether a request has been sent and the response has not yet been fully read.
    /// Used by the connection pool to detect dirty connections after cancel/timeout.
    /// Set to `true` by the send helpers just before a request is written.
    in_flight: bool,
    /// Whether this connection needs a reset on next use.
    /// Set by connection pool on checkin, cleared after first query/execute.
    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
    /// The send helpers clear the flag before the request is written.
    needs_reset: bool,
    /// OpenTelemetry instrumentation context (when otel feature is enabled)
    #[cfg(feature = "otel")]
    instrumentation: InstrumentationContext,
    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled).
    /// When this is `Some`, parameterized statements are routed through the
    /// client-side parameter encryption path.
    #[cfg(feature = "always-encrypted")]
    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
}
138
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
/// - Plain TCP (for internal networks or when `tls` feature is disabled)
///
/// Code that talks to the server matches on the variant and calls the same
/// `Connection` method in every arm, because each variant wraps a different
/// stream type.
// NOTE(review): this allow predates query execution. The variants are now
// matched by the send and cancel paths, so it may be removable — confirm
// with a build of every feature combination before dropping it.
#[allow(dead_code)]
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    #[cfg(feature = "tls")]
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style)
    #[cfg(feature = "tls")]
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
    Plain(Connection<TcpStream>),
}
156
/// Map the [`crate::ToSql::sql_type`] name of a typed NULL ([`crate::null`])
/// to the parameter `TypeInfo` it should be declared with.
///
/// Any name not listed here yields `None`. That includes an untyped NULL
/// (`Option::None`, type `"NULL"`), for which the caller falls back to the
/// default parameter type.
#[cfg(feature = "always-encrypted")]
fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
    use tds_protocol::rpc::TypeInfo;

    match sql_type {
        "BIT" => Some(TypeInfo::bit()),
        "TINYINT" => Some(TypeInfo::tinyint()),
        "SMALLINT" => Some(TypeInfo::smallint()),
        "INT" => Some(TypeInfo::int()),
        "BIGINT" => Some(TypeInfo::bigint()),
        "REAL" => Some(TypeInfo::real()),
        "FLOAT" => Some(TypeInfo::float()),
        "NVARCHAR" => Some(TypeInfo::nvarchar(1)),
        "VARBINARY" => Some(TypeInfo::varbinary(1)),
        "UNIQUEIDENTIFIER" => Some(TypeInfo::uuid()),
        "DATE" => Some(TypeInfo::date()),
        _ => None,
    }
}
178
179// Private helper methods available to all connection states
180impl<S: ConnectionState> Client<S> {
181    /// The default per-command deadline from `command_timeout`.
182    ///
183    /// Returns `None` when `command_timeout` is zero, which means "no limit"
184    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
185    pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
186        let t = self.config.command_timeout;
187        if t.is_zero() { None } else { Some(t) }
188    }
189
190    /// Build a cancel handle for the current connection, regardless of
191    /// connection state. The public, documented surface is
192    /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
193    /// delegate here.
194    pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
195        let connection = self
196            .connection
197            .as_ref()
198            .expect("connection should be present");
199        match connection {
200            #[cfg(feature = "tls")]
201            ConnectionHandle::Tls(conn) => {
202                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
203            }
204            #[cfg(feature = "tls")]
205            ConnectionHandle::TlsPrelogin(conn) => {
206                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
207            }
208            ConnectionHandle::Plain(conn) => {
209                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
210            }
211        }
212    }
213
214    /// Process transaction-related EnvChange tokens.
215    ///
216    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
217    /// EnvChange tokens, updating the transaction descriptor accordingly.
218    ///
219    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
220    /// while still having the transaction descriptor tracked correctly.
221    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
222        use tds_protocol::token::EnvChangeValue;
223
224        match env.env_type {
225            EnvChangeType::BeginTransaction => {
226                if let EnvChangeValue::Binary(ref data) = env.new_value {
227                    if data.len() >= 8 {
228                        let descriptor = u64::from_le_bytes([
229                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
230                        ]);
231                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
232                        *transaction_descriptor = descriptor;
233                    }
234                }
235            }
236            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
237                tracing::debug!(
238                    env_type = ?env.env_type,
239                    "transaction ended via raw SQL"
240                );
241                *transaction_descriptor = 0;
242            }
243            _ => {}
244        }
245    }
246
247    /// Send a SQL batch to the server.
248    ///
249    /// Uses the client's current transaction descriptor in ALL_HEADERS.
250    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
251    /// returned by BeginTransaction must be included.
252    ///
253    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
254    /// is included in the first packet to reset connection state.
255    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
256        let payload =
257            tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
258        let max_packet = self.config.packet_size as usize;
259
260        // Check if we need to reset the connection on this request
261        let reset = self.needs_reset;
262        if reset {
263            self.needs_reset = false; // Clear flag before sending
264            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
265        }
266
267        self.in_flight = true;
268        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
269
270        match connection {
271            #[cfg(feature = "tls")]
272            ConnectionHandle::Tls(conn) => {
273                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
274                    .await?;
275            }
276            #[cfg(feature = "tls")]
277            ConnectionHandle::TlsPrelogin(conn) => {
278                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
279                    .await?;
280            }
281            ConnectionHandle::Plain(conn) => {
282                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
283                    .await?;
284            }
285        }
286
287        Ok(())
288    }
289
290    /// Send an RPC request to the server.
291    ///
292    /// Uses the client's current transaction descriptor in ALL_HEADERS.
293    ///
294    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
295    /// is included in the first packet to reset connection state.
296    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
297        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
298        let max_packet = self.config.packet_size as usize;
299
300        // Check if we need to reset the connection on this request
301        let reset = self.needs_reset;
302        if reset {
303            self.needs_reset = false; // Clear flag before sending
304            tracing::debug!("sending RPC with RESETCONNECTION flag");
305        }
306
307        self.in_flight = true;
308        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
309
310        match connection {
311            #[cfg(feature = "tls")]
312            ConnectionHandle::Tls(conn) => {
313                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
314                    .await?;
315            }
316            #[cfg(feature = "tls")]
317            ConnectionHandle::TlsPrelogin(conn) => {
318                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
319                    .await?;
320            }
321            ConnectionHandle::Plain(conn) => {
322                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
323                    .await?;
324            }
325        }
326
327        Ok(())
328    }
329
330    /// Start building a stored procedure call with full control over parameters.
331    ///
332    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
333    /// parameters before executing the call.
334    ///
335    /// The procedure name is validated to prevent SQL injection. It may be
336    /// schema-qualified (e.g., `"dbo.MyProc"`).
337    ///
338    /// # Example
339    ///
340    /// ```rust,no_run
341    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
342    /// let result = client.procedure("dbo.CalculateSum")?
343    ///     .input("@a", &10i32)
344    ///     .input("@b", &20i32)
345    ///     .output_int("@result")
346    ///     .execute().await?;
347    ///
348    /// let sum = result.get_output("@result").unwrap();
349    /// # let _ = sum;
350    /// # Ok(())
351    /// # }
352    /// ```
353    pub fn procedure(
354        &mut self,
355        proc_name: &str,
356    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
357        crate::validation::validate_qualified_identifier(proc_name)?;
358        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
359    }
360
361    /// Execute a stored procedure with positional input parameters.
362    ///
363    /// This is a convenience method for the common case of calling a procedure
364    /// with input-only parameters. For output parameters or named parameters,
365    /// use [`procedure()`](Client::procedure) instead.
366    ///
367    /// # Example
368    ///
369    /// ```rust,no_run
370    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
371    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
372    /// assert_eq!(result.return_value, 0);
373    ///
374    /// if let Some(rs) = result.first_result_set() {
375    ///     println!("columns: {:?}", rs.columns());
376    /// }
377    /// # Ok(())
378    /// # }
379    /// ```
380    pub async fn call_procedure(
381        &mut self,
382        proc_name: &str,
383        params: &[&(dyn crate::ToSql + Sync)],
384    ) -> Result<crate::stream::ProcedureResult> {
385        crate::validation::validate_qualified_identifier(proc_name)?;
386
387        tracing::debug!(
388            proc_name = proc_name,
389            params_count = params.len(),
390            "executing stored procedure"
391        );
392
393        let rpc_params =
394            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
395        let mut rpc = RpcRequest::named(proc_name);
396        for param in rpc_params {
397            rpc = rpc.param(param);
398        }
399
400        let deadline = self.command_deadline();
401        let canceller = self.connection_cancel_handle();
402        run_with_deadline(
403            async {
404                self.send_rpc(&rpc).await?;
405                self.read_procedure_result().await
406            },
407            deadline,
408            canceller,
409        )
410        .await
411    }
412
413    /// Ask the server how each parameter of a statement must be encrypted.
414    ///
415    /// Issues the `sp_describe_parameter_encryption` system RPC for the
416    /// parameterized statement `tsql` with the parameter declaration `params`
417    /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
418    /// into a [`ParameterEncryptionInfo`](crate::ParameterEncryptionInfo): the
419    /// CEK table, plus — for each parameter the server reports as encrypted —
420    /// which CEK and whether deterministic or randomized. Parameters the server
421    /// reports as plaintext are omitted.
422    ///
423    /// This is the first step of Always Encrypted parameter encryption; the
424    /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
425    #[cfg(feature = "always-encrypted")]
426    pub async fn describe_parameter_encryption(
427        &mut self,
428        tsql: &str,
429        params: &str,
430    ) -> Result<crate::ParameterEncryptionInfo> {
431        let tsql_arg = tsql.to_string();
432        let params_arg = params.to_string();
433        let mut result = self
434            .call_procedure(
435                "sp_describe_parameter_encryption",
436                &[&tsql_arg, &params_arg],
437            )
438            .await?;
439        crate::ParameterEncryptionInfo::from_describe_result_sets(&mut result.result_sets)
440    }
441
442    /// Build the `sp_executesql` request for a parameterized statement.
443    ///
444    /// When the connection has Always Encrypted enabled, parameters the server
445    /// reports as encrypted are encrypted client-side first (an extra
446    /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
447    /// plain parameter conversion.
448    pub(crate) async fn build_parameterized_rpc(
449        &mut self,
450        sql: &str,
451        params: &[&(dyn crate::ToSql + Sync)],
452    ) -> Result<RpcRequest> {
453        #[cfg(feature = "always-encrypted")]
454        if self.encryption_context.is_some() {
455            return self.build_encrypted_sql_rpc(sql, params).await;
456        }
457        let rpc_params =
458            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
459        Ok(RpcRequest::execute_sql(sql, rpc_params))
460    }
461
    /// Encrypt the Always Encrypted parameters of a statement, then build its
    /// `sp_executesql` request.
    ///
    /// Asks the server which parameters are encrypted
    /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
    /// then for each one normalizes the value, resolves its column encryption
    /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
    /// server reports as plaintext are sent unchanged.
    ///
    /// Parameters are named positionally as `@p1`, `@p2`, … and the server's
    /// answer is matched back to them by that name.
    ///
    /// The describe round-trip is skipped when there are no parameters, and
    /// the plaintext request is returned as-is when the server reports no
    /// encrypted parameters. If the client has no encryption context, this
    /// behaves like the plain parameter conversion.
    ///
    /// # Errors
    ///
    /// Propagates errors from converting a parameter to its SQL value or RPC
    /// form, from the describe call, from normalization, and from encryption.
    /// Returns [`Error::Protocol`] when an encrypted parameter refers to a CEK
    /// ordinal that is not present in the returned CEK table.
    #[cfg(feature = "always-encrypted")]
    async fn build_encrypted_sql_rpc(
        &mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<RpcRequest> {
        use tds_protocol::rpc::RpcParam;

        // Without an encryption context there is nothing to encrypt with:
        // fall back to the plain conversion.
        let Some(ctx) = self.encryption_context.clone() else {
            let rpc_params =
                Self::convert_params(params, self.send_unicode(), self.server_collation())?;
            return Ok(RpcRequest::execute_sql(sql, rpc_params));
        };

        // Resolve each parameter's value once (AE normalization needs the typed
        // value, not the wire encoding) and build the plaintext RPC params.
        // `values` and `plaintext` are filled in lockstep, one entry per input
        // parameter, so index i of one corresponds to index i of the other.
        let send_unicode = self.send_unicode();
        let collation = self.server_collation().cloned();
        let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
        let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
        for (i, p) in params.iter().enumerate() {
            // Positional names are 1-based: @p1, @p2, ...
            let name = format!("@p{}", i + 1);
            let value = p.to_sql()?;
            // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
            // describe accepts it against the target encrypted column; an untyped
            // NULL falls back to the default in `sql_value_to_rpc_param`.
            let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
                (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
                _ => {
                    let mut param = Self::sql_value_to_rpc_param(
                        &name,
                        &value,
                        send_unicode,
                        collation.as_ref(),
                    )?;
                    // `numeric(v, p, s)` declares an explicit `decimal(p, s)` so
                    // describe matches an encrypted decimal column exactly — the
                    // value alone conveys scale but not precision.
                    if let Some(d) = p.decimal_param_info() {
                        param.type_info =
                            tds_protocol::rpc::TypeInfo::decimal(d.precision, d.scale);
                    }
                    param
                }
            };
            plaintext.push(rpc_param);
            values.push(value);
        }

        // No parameters at all: skip the describe round-trip entirely.
        if plaintext.is_empty() {
            return Ok(RpcRequest::execute_sql(sql, plaintext));
        }

        // Ask the server which parameters need encryption.
        let declarations = RpcRequest::build_param_declarations(&plaintext);
        let info = self
            .describe_parameter_encryption(sql, &declarations)
            .await?;
        // The server reported no encrypted parameters: send everything as built.
        if info.parameters.is_empty() {
            return Ok(RpcRequest::execute_sql(sql, plaintext));
        }

        // Encrypt the flagged parameters; pass the rest through untouched.
        let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
        for (value, param) in values.into_iter().zip(plaintext) {
            // Not listed by the server => plaintext parameter, forwarded as-is.
            let Some(crypto) = info.get_parameter(&param.name) else {
                final_params.push(param);
                continue;
            };
            // Look up the column encryption key entry this parameter refers to.
            let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
                Error::Protocol(format!(
                    "encrypted parameter {} references missing CEK ordinal {}",
                    param.name, crypto.cek_ordinal
                ))
            })?;
            // The metadata carries the parameter's declared (plaintext) type
            // together with the key identification and algorithm reported by
            // the server.
            let metadata = tds_protocol::rpc::EncryptedParamMetadata {
                base_type_info: param.type_info.clone(),
                algorithm_id: crypto.algorithm_id,
                encryption_type: crypto.encryption_type,
                database_id: entry.database_id,
                cek_id: entry.cek_id,
                cek_version: entry.cek_version,
                cek_md_version: entry.cek_md_version,
                normalization_rule_version: crypto.normalization_rule_version,
            };
            // A NULL value bound to an encrypted column is sent as an encrypted
            // NULL (the server rejects a plaintext parameter for an encrypted
            // column); there is nothing to encrypt.
            if matches!(value, mssql_types::SqlValue::Null) {
                final_params.push(RpcParam::encrypted_null(param.name, metadata));
                continue;
            }
            // Normalize the typed value, then encrypt it with the resolved key.
            let normalized = crate::encryption::normalize_for_encryption(&value)?;
            let ciphertext = ctx
                .encrypt_value(&normalized, entry, crypto.encryption_type)
                .await?;
            final_params.push(RpcParam::encrypted(
                param.name,
                bytes::Bytes::from(ciphertext),
                metadata,
            ));
        }

        Ok(RpcRequest::execute_sql(sql, final_params))
    }
575
    /// Start a bulk insert operation for the specified table.
    ///
    /// Sends the `INSERT BULK` statement to the server and returns a
    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
    /// a mutable borrow on the client, preventing other operations while
    /// the bulk insert is in progress.
    ///
    /// Before `INSERT BULK` is sent, the table's column metadata is fetched
    /// with a `SELECT TOP 0 * FROM <table>` batch, so this method performs two
    /// round-trips. Each round-trip runs under the configured command
    /// deadline.
    ///
    /// # Errors
    ///
    /// Fails if either batch cannot be sent or its response cannot be read or
    /// parsed, if a command deadline expires, if the builder cannot produce
    /// the `INSERT BULK` statement, or with an unsupported-type error when the
    /// server reports a TEXT, NTEXT or IMAGE column.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
    ///
    /// let builder = BulkInsertBuilder::new("dbo.Users")
    ///     .with_typed_columns(vec![
    ///         BulkColumn::new("id", "INT", 0)?,
    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
    ///     ]);
    ///
    /// let mut writer = client.bulk_insert(&builder).await?;
    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
    /// let result = writer.finish().await?;
    /// println!("Inserted {} rows", result.rows_affected);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn bulk_insert(
        &mut self,
        builder: &crate::bulk::BulkInsertBuilder,
    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
        use tds_protocol::token::{ColMetaData, Token};

        tracing::debug!(
            table = builder.table_name(),
            columns = builder.columns().len(),
            "starting bulk insert"
        );

        // Step 1: Query the server for column metadata.
        // This gives us the exact type encoding the server expects for BulkLoad,
        // following the pattern established by Tiberius.
        // NOTE(review): the table name is interpolated directly into the SQL
        // text; this assumes the builder has already validated it — confirm.
        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
        let deadline = self.command_deadline();
        let canceller = self.connection_cancel_handle();
        let message = run_with_deadline(
            async {
                self.send_sql_batch(&meta_query).await?;
                self.read_response_message().await
            },
            deadline,
            canceller,
        )
        .await?;
        // The send helper raised `in_flight`; clear it now that the response
        // message is in hand (presumably read in full — confirm against
        // `read_response_message`).
        self.in_flight = false;

        // Capture both the raw COLMETADATA bytes and parsed column info
        let raw_payload = message.payload.clone();
        let mut parser = self.create_parser(message.payload);
        let mut server_metadata: Option<ColMetaData> = None;
        // Byte range of the COLMETADATA token inside `raw_payload`; both stay
        // 0 when no such token is seen.
        let mut meta_start: usize = 0;
        let mut meta_end: usize = 0;

        loop {
            // Offsets are derived from how many bytes the parser has left
            // before and after consuming one token.
            let pos_before = raw_payload.len() - parser.remaining();
            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
            let pos_after = raw_payload.len() - parser.remaining();
            let Some(token) = token else { break };

            match token {
                Token::ColMetaData(meta) => {
                    meta_start = pos_before;
                    meta_end = pos_after;
                    server_metadata = Some(meta);
                }
                // Stop at the first DONE token.
                Token::Done(_) => break,
                _ => {}
            }
        }

        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
        // These types require a legacy TEXTPTR wire format that this driver
        // does not support — users should migrate the column to VARCHAR(MAX) /
        // NVARCHAR(MAX) / VARBINARY(MAX).
        // This check runs before INSERT BULK is sent, so an error here is
        // returned without starting the bulk load.
        if let Some(ref meta) = server_metadata {
            use tds_protocol::types::TypeId;
            for col in meta.columns.iter() {
                // (name of the rejected type, suggested replacement type)
                let (rejected, replacement) = match col.type_id {
                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
                    _ => (None, ""),
                };
                if let Some(sql_type) = rejected {
                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
                        sql_type: sql_type.to_string(),
                        reason: format!(
                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
                             are not supported. Alter the column to {} instead \
                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
                             Server 2005).",
                            col.name,
                            builder.table_name(),
                            sql_type,
                            replacement,
                        ),
                    }));
                }
            }
        }

        // Step 2: Send INSERT BULK statement to put server in bulk load mode
        let stmt = builder.build_insert_bulk_statement()?;
        let deadline = self.command_deadline();
        let canceller = self.connection_cancel_handle();
        // The execute result itself is not needed; only success matters.
        run_with_deadline(
            async {
                self.send_sql_batch(&stmt).await?;
                self.read_execute_result().await
            },
            deadline,
            canceller,
        )
        .await?;

        // Step 3: Create bulk writer with server's metadata
        // An empty range means no COLMETADATA token was captured above.
        let raw_meta = if meta_end > meta_start {
            Some(raw_payload.slice(meta_start..meta_end))
        } else {
            None
        };

        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
            builder.columns().to_vec(),
            builder.options().batch_size,
            raw_meta,
            server_cols,
        );

        Ok(crate::bulk::BulkWriter::new(self, bulk))
    }
718
719    /// Start a bulk insert without querying the server for column metadata.
720    ///
721    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
722    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
723    /// column metadata is constructed from the `BulkColumn` types provided
724    /// on the builder. This saves a round-trip when the schema is known.
725    ///
726    /// # Caveats
727    ///
728    /// The caller must ensure `BulkColumn` entries match the target table's
729    /// column definitions exactly. Mismatched types, lengths, precision/scale,
730    /// or column ordering will cause the server to reject the BulkLoad packet.
731    ///
732    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
733    /// extra round-trip is usually negligible and the server-supplied metadata
734    /// is guaranteed correct.
735    pub async fn bulk_insert_without_schema_discovery(
736        &mut self,
737        builder: &crate::bulk::BulkInsertBuilder,
738    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
739        tracing::debug!(
740            table = builder.table_name(),
741            columns = builder.columns().len(),
742            "starting bulk insert (no schema discovery)"
743        );
744
745        // Send INSERT BULK statement to put server in bulk load mode
746        let stmt = builder.build_insert_bulk_statement()?;
747        let deadline = self.command_deadline();
748        let canceller = self.connection_cancel_handle();
749        run_with_deadline(
750            async {
751                self.send_sql_batch(&stmt).await?;
752                self.read_execute_result().await
753            },
754            deadline,
755            canceller,
756        )
757        .await?;
758
759        // Create bulk writer with hand-crafted metadata
760        let bulk =
761            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
762
763        Ok(crate::bulk::BulkWriter::new(self, bulk))
764    }
765
766    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
767    ///
768    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
769    /// row data after the `INSERT BULK` statement has been acknowledged.
770    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
771        let max_packet = self.config.packet_size as usize;
772
773        self.in_flight = true;
774        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
775
776        match connection {
777            #[cfg(feature = "tls")]
778            ConnectionHandle::Tls(conn) => {
779                conn.send_message(PacketType::BulkLoad, payload, max_packet)
780                    .await?;
781            }
782            #[cfg(feature = "tls")]
783            ConnectionHandle::TlsPrelogin(conn) => {
784                conn.send_message(PacketType::BulkLoad, payload, max_packet)
785                    .await?;
786            }
787            ConnectionHandle::Plain(conn) => {
788                conn.send_message(PacketType::BulkLoad, payload, max_packet)
789                    .await?;
790            }
791        }
792
793        // Read the server's Done response with row count
794        self.read_execute_result().await
795    }
796
797    /// Execute a query with named parameters and return a streaming result set.
798    ///
799    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
800    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
801    /// and the `#[derive(ToParams)]` macro.
802    ///
803    /// # Example
804    ///
805    /// ```rust,no_run
806    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
807    /// use mssql_client::{NamedParam, ToParams};
808    ///
809    /// // With derive macro:
810    /// #[derive(mssql_derive::ToParams)]
811    /// struct UserQuery { name: String }
812    ///
813    /// let q = UserQuery { name: "Alice".into() };
814    /// let rows = client.query_named(
815    ///     "SELECT * FROM users WHERE name = @name",
816    ///     &q.to_params()?,
817    /// ).await?;
818    ///
819    /// // Or manually:
820    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
821    /// let rows = client.query_named(
822    ///     "SELECT * FROM users WHERE name = @name",
823    ///     &params,
824    /// ).await?;
825    /// # let _ = rows;
826    /// # Ok(())
827    /// # }
828    /// ```
829    pub async fn query_named<'a>(
830        &'a mut self,
831        sql: &str,
832        params: &[crate::to_params::NamedParam],
833    ) -> Result<QueryStream<'a>> {
834        tracing::debug!(
835            sql = sql,
836            params_count = params.len(),
837            "executing query with named parameters"
838        );
839
840        if params.is_empty() {
841            self.send_sql_batch(sql).await?;
842        } else {
843            let rpc_params =
844                Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
845            let rpc = RpcRequest::execute_sql(sql, rpc_params);
846            self.send_rpc(&rpc).await?;
847        }
848
849        let resp = self.read_query_response().await?;
850        #[cfg(feature = "always-encrypted")]
851        {
852            Ok(QueryStream::from_raw(
853                resp.columns,
854                resp.pending_rows,
855                resp.meta,
856                resp.decryptor,
857            ))
858        }
859        #[cfg(not(feature = "always-encrypted"))]
860        {
861            Ok(QueryStream::from_raw(
862                resp.columns,
863                resp.pending_rows,
864                resp.meta,
865            ))
866        }
867    }
868
869    /// Execute a statement with named parameters.
870    ///
871    /// Returns the number of affected rows. This is the named-parameter
872    /// counterpart of [`execute()`](Client::execute), compatible with the
873    /// [`ToParams`](crate::to_params::ToParams) trait.
874    ///
875    /// # Example
876    ///
877    /// ```rust,no_run
878    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
879    /// use mssql_client::NamedParam;
880    ///
881    /// let params = vec![
882    ///     NamedParam::from_value("name", &"Alice")?,
883    ///     NamedParam::from_value("email", &"alice@example.com")?,
884    /// ];
885    /// let rows_affected = client.execute_named(
886    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
887    ///     &params,
888    /// ).await?;
889    /// # let _ = rows_affected;
890    /// # Ok(())
891    /// # }
892    /// ```
893    pub async fn execute_named(
894        &mut self,
895        sql: &str,
896        params: &[crate::to_params::NamedParam],
897    ) -> Result<u64> {
898        tracing::debug!(
899            sql = sql,
900            params_count = params.len(),
901            "executing statement with named parameters"
902        );
903
904        let deadline = self.command_deadline();
905        let canceller = self.connection_cancel_handle();
906        run_with_deadline(
907            async {
908                if params.is_empty() {
909                    self.send_sql_batch(sql).await?;
910                } else {
911                    let rpc_params = Self::convert_named_params(
912                        params,
913                        self.send_unicode(),
914                        self.server_collation(),
915                    )?;
916                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
917                    self.send_rpc(&rpc).await?;
918                }
919
920                self.read_execute_result().await
921            },
922            deadline,
923            canceller,
924        )
925        .await
926    }
927
    /// Whether string parameters are sent as NVARCHAR (Unicode).
    ///
    /// Returns the `send_string_parameters_as_unicode` setting from the
    /// connection configuration.
    pub(crate) fn send_unicode(&self) -> bool {
        self.config.send_string_parameters_as_unicode
    }
932
    /// Server's default collation, captured from ENVCHANGE during login.
    ///
    /// Returns `None` when no collation is stored on this client.
    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
        self.server_collation.as_ref()
    }
937}
938
939impl Client<Ready> {
    /// Mark this connection as needing a reset on next use.
    ///
    /// Called by the connection pool when a connection is returned.
    /// The next SQL batch or RPC will include the RESETCONNECTION flag
    /// in the TDS packet header, causing SQL Server to reset connection
    /// state (temp tables, SET options, transaction isolation level, etc.)
    /// before executing the command.
    ///
    /// This is more efficient than calling `sp_reset_connection` as a
    /// separate command because it's handled at the TDS protocol level.
    ///
    /// This method only sets the flag; it performs no I/O itself.
    pub fn mark_needs_reset(&mut self) {
        self.needs_reset = true;
    }
953
    /// Check if this connection needs a reset.
    ///
    /// Returns the current value of the reset flag: `true` if
    /// [`mark_needs_reset()`](Self::mark_needs_reset) was called and the
    /// reset hasn't been performed yet.
    #[must_use]
    pub fn needs_reset(&self) -> bool {
        self.needs_reset
    }
962
    /// Execute a query and return a result set with lazy per-row decoding.
    ///
    /// Per ADR-007 the full response is buffered in memory and each row is
    /// *decoded* on demand as you iterate — this is not incremental network
    /// streaming, so peak memory tracks the response size. Use
    /// `.collect_all()` if you want all rows materialized into a `Vec` up
    /// front.
    ///
    /// The query runs under the connection's default command deadline; use
    /// [`query_with_timeout()`](Self::query_with_timeout) to override it for
    /// a single call.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use mssql_client::Row;
    /// # fn process(_: &Row) {}
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// // Lazy decoding (synchronous iteration over the buffered result set)
    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
    /// for row in stream {
    ///     let row = row?;
    ///     process(&row);
    /// }
    ///
    /// // Materialized (decodes every row into a `Vec` up front)
    /// let rows: Vec<Row> = client
    ///     .query("SELECT * FROM small_table", &[])
    ///     .await?
    ///     .collect_all()
    ///     .await?;
    /// # let _ = rows;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn query<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<QueryStream<'a>> {
        // Resolve the default deadline first, then share the implementation
        // with `query_with_timeout`.
        let deadline = self.command_deadline();
        self.query_inner(sql, params, deadline).await
    }
1002
    /// Shared query implementation with an explicit command deadline.
    ///
    /// Sends `sql` as a plain SQL batch when `params` is empty and as a
    /// parameterized RPC otherwise, then reads the complete query response.
    /// The whole exchange is passed to `run_with_deadline` together with
    /// `deadline` and a cancel handle. With the `otel` feature enabled, a
    /// query span and an operation timer record the outcome.
    async fn query_inner<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
        deadline: Option<std::time::Duration>,
    ) -> Result<QueryStream<'a>> {
        tracing::debug!(sql = sql, params_count = params.len(), "executing query");

        // Clone the instrumentation context so the span and timer do not keep
        // `self` borrowed while the request below needs it mutably.
        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.query_span(sql);
        #[cfg(feature = "otel")]
        let timer = crate::instrumentation::OperationTimer::start(
            crate::instrumentation::extract_operation(sql),
        );

        let canceller = self.cancel_handle();
        let result = run_with_deadline(
            async {
                if params.is_empty() {
                    // Simple query without parameters - use SQL batch
                    self.send_sql_batch(sql).await?;
                } else {
                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
                    let rpc = self.build_parameterized_rpc(sql, params).await?;
                    self.send_rpc(&rpc).await?;
                }

                // Read complete response including columns and rows
                self.read_query_response().await
            },
            deadline,
            canceller,
        )
        .await;

        // Record the outcome before `?` below can return early on error.
        #[cfg(feature = "otel")]
        match &result {
            Ok(_) => InstrumentationContext::record_success(&mut span, None),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }
        #[cfg(feature = "otel")]
        timer.finish(instrumentation.metrics(), result.is_ok());

        // Drop the span before returning
        #[cfg(feature = "otel")]
        drop(span);

        let resp = result?;
        // `QueryStream::from_raw` takes an extra decryptor argument when
        // Always Encrypted support is compiled in.
        #[cfg(feature = "always-encrypted")]
        {
            Ok(QueryStream::from_raw(
                resp.columns,
                resp.pending_rows,
                resp.meta,
                resp.decryptor,
            ))
        }
        #[cfg(not(feature = "always-encrypted"))]
        {
            Ok(QueryStream::from_raw(
                resp.columns,
                resp.pending_rows,
                resp.meta,
            ))
        }
    }
1072
    /// Execute a query with a specific timeout.
    ///
    /// This overrides the default `command_timeout` from the connection configuration
    /// for this specific query. If the query does not complete within the specified
    /// duration, the driver sends an Attention packet to cancel it server-side,
    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
    /// connection left usable for the next request.
    ///
    /// Apart from the deadline, this behaves like [`query()`](Self::query):
    /// both share the same implementation.
    ///
    /// # Arguments
    ///
    /// * `sql` - The SQL query to execute
    /// * `params` - Query parameters
    /// * `timeout_duration` - Maximum time to wait for the query to complete
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// use std::time::Duration;
    ///
    /// // Execute with a 5-second timeout
    /// let rows = client
    ///     .query_with_timeout(
    ///         "SELECT * FROM large_table",
    ///         &[],
    ///         Duration::from_secs(5),
    ///     )
    ///     .await?;
    /// # let _ = rows;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn query_with_timeout<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
        timeout_duration: std::time::Duration,
    ) -> Result<QueryStream<'a>> {
        // The explicit duration replaces the configured default deadline.
        self.query_inner(sql, params, Some(timeout_duration)).await
    }
1113
1114    /// Execute a batch that may return multiple result sets.
1115    ///
1116    /// This is useful for stored procedures or SQL batches that contain
1117    /// multiple SELECT statements.
1118    ///
1119    /// # Example
1120    ///
1121    /// ```rust,no_run
1122    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1123    /// // Execute a batch with multiple SELECT statements
1124    /// let mut results = client.query_multiple(
1125    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1126    ///     &[]
1127    /// ).await?;
1128    ///
1129    /// // Process first result set
1130    /// while let Some(row) = results.next_row().await? {
1131    ///     println!("Result 1: {:?}", row);
1132    /// }
1133    ///
1134    /// // Move to second result set
1135    /// if results.next_result().await? {
1136    ///     while let Some(row) = results.next_row().await? {
1137    ///         println!("Result 2: {:?}", row);
1138    ///     }
1139    /// }
1140    /// # Ok(())
1141    /// # }
1142    /// ```
1143    pub async fn query_multiple<'a>(
1144        &'a mut self,
1145        sql: &str,
1146        params: &[&(dyn crate::ToSql + Sync)],
1147    ) -> Result<MultiResultStream<'a>> {
1148        tracing::debug!(
1149            sql = sql,
1150            params_count = params.len(),
1151            "executing multi-result query"
1152        );
1153
1154        let deadline = self.command_deadline();
1155        let canceller = self.connection_cancel_handle();
1156        let result_sets = run_with_deadline(
1157            async {
1158                if params.is_empty() {
1159                    // Simple batch without parameters - use SQL batch
1160                    self.send_sql_batch(sql).await?;
1161                } else {
1162                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1163                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1164                    self.send_rpc(&rpc).await?;
1165                }
1166
1167                // Read all result sets
1168                self.read_multi_result_response().await
1169            },
1170            deadline,
1171            canceller,
1172        )
1173        .await?;
1174        Ok(MultiResultStream::new(result_sets))
1175    }
1176
    /// Execute a query that doesn't return rows.
    ///
    /// Returns the number of affected rows.
    ///
    /// Routing depends on `params`: with an empty slice the SQL is sent as a
    /// plain SQL batch; with parameters it is sent as a parameterized RPC
    /// (`sp_executesql`). The statement runs under the connection's default
    /// command deadline; use
    /// [`execute_with_timeout()`](Self::execute_with_timeout) to override it
    /// for a single call.
    pub async fn execute(
        &mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<u64> {
        // Resolve the default deadline first, then share the implementation
        // with `execute_with_timeout`.
        let deadline = self.command_deadline();
        self.execute_inner(sql, params, deadline).await
    }
1188
    /// Shared execute implementation with an explicit command deadline.
    ///
    /// Sends `sql` as a plain SQL batch when `params` is empty and as a
    /// parameterized RPC otherwise, then reads the execute result (the
    /// affected-row count). The exchange is passed to `run_with_deadline`
    /// together with `deadline` and a cancel handle. With the `otel` feature
    /// enabled, a query span and an operation timer record the outcome.
    async fn execute_inner(
        &mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
        deadline: Option<std::time::Duration>,
    ) -> Result<u64> {
        tracing::debug!(
            sql = sql,
            params_count = params.len(),
            "executing statement"
        );

        // Clone the instrumentation context so the span and timer do not keep
        // `self` borrowed while the request below needs it mutably.
        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.query_span(sql);
        #[cfg(feature = "otel")]
        let timer = crate::instrumentation::OperationTimer::start(
            crate::instrumentation::extract_operation(sql),
        );

        let canceller = self.cancel_handle();
        let result = run_with_deadline(
            async {
                if params.is_empty() {
                    // Simple statement without parameters - use SQL batch
                    self.send_sql_batch(sql).await?;
                } else {
                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
                    let rpc = self.build_parameterized_rpc(sql, params).await?;
                    self.send_rpc(&rpc).await?;
                }

                // Read response and get row count
                self.read_execute_result().await
            },
            deadline,
            canceller,
        )
        .await;

        // Record the outcome (including the row count on success) before the
        // result is handed back to the caller.
        #[cfg(feature = "otel")]
        match &result {
            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }
        #[cfg(feature = "otel")]
        timer.finish(instrumentation.metrics(), result.is_ok());

        // Drop the span before returning
        #[cfg(feature = "otel")]
        drop(span);

        result
    }
1245
    /// Execute a statement with a specific timeout.
    ///
    /// This overrides the default `command_timeout` from the connection configuration
    /// for this specific statement. If the statement does not complete within the
    /// specified duration, the driver sends an Attention packet to cancel it
    /// server-side, drains the acknowledgement, and returns
    /// [`Error::CommandTimeout`] with the connection left usable.
    ///
    /// Apart from the deadline, this behaves like [`execute()`](Self::execute):
    /// both share the same implementation and return the affected-row count.
    ///
    /// # Arguments
    ///
    /// * `sql` - The SQL statement to execute
    /// * `params` - Statement parameters
    /// * `timeout_duration` - Maximum time to wait for the statement to complete
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// use std::time::Duration;
    ///
    /// // Execute with a 10-second timeout
    /// let rows_affected = client
    ///     .execute_with_timeout(
    ///         "UPDATE large_table SET status = @p1",
    ///         &[&"processed"],
    ///         Duration::from_secs(10),
    ///     )
    ///     .await?;
    /// # let _ = rows_affected;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn execute_with_timeout(
        &mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
        timeout_duration: std::time::Duration,
    ) -> Result<u64> {
        // The explicit duration replaces the configured default deadline.
        self.execute_inner(sql, params, Some(timeout_duration))
            .await
    }
1287
    /// Begin a transaction.
    ///
    /// This transitions the client from `Ready` to `InTransaction` state.
    /// Per MS-TDS spec, the server returns a transaction descriptor in the
    /// BeginTransaction EnvChange token that must be included in subsequent
    /// ALL_HEADERS sections.
    ///
    /// The client is consumed. On success every field is moved into the
    /// returned `Client<InTransaction>`, with the descriptor read from the
    /// server stored in it. On error the client is dropped along with the
    /// error return.
    ///
    /// NOTE(review): unlike `execute`/`query`, this exchange is not wrapped in
    /// `run_with_deadline`, so the command deadline does not appear to apply
    /// here — confirm whether that is intentional.
    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
        tracing::debug!("beginning transaction");

        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.transaction_span("BEGIN");

        // Execute BEGIN TRANSACTION and extract the transaction descriptor
        let result = async {
            self.send_sql_batch("BEGIN TRANSACTION").await?;
            self.read_transaction_begin_result().await
        }
        .await;

        // Record the outcome before `?` below can return early on error.
        #[cfg(feature = "otel")]
        match &result {
            Ok(_) => InstrumentationContext::record_success(&mut span, None),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }

        // Drop the span before moving instrumentation
        #[cfg(feature = "otel")]
        drop(span);

        let transaction_descriptor = result?;

        // Rebuild the client with the `InTransaction` type-state; all other
        // fields carry over unchanged.
        Ok(Client {
            config: self.config,
            _state: PhantomData,
            connection: self.connection,
            server_version: self.server_version,
            current_database: self.current_database,
            server_collation: self.server_collation,
            statement_cache: self.statement_cache,
            transaction_descriptor, // Store the descriptor from server
            needs_reset: self.needs_reset,
            in_flight: self.in_flight,
            #[cfg(feature = "otel")]
            instrumentation: self.instrumentation,
            #[cfg(feature = "always-encrypted")]
            encryption_context: self.encryption_context,
        })
    }
1338
    /// Begin a transaction with a specific isolation level.
    ///
    /// This transitions the client from `Ready` to `InTransaction` state
    /// with the specified isolation level.
    ///
    /// Two batches are sent in sequence: first the statement returned by
    /// `isolation_level.as_sql()`, then `BEGIN TRANSACTION`. An error from
    /// either one aborts the sequence. If the second batch fails, the first
    /// has already been executed on the server.
    ///
    /// NOTE(review): unlike `execute`/`query`, this exchange is not wrapped in
    /// `run_with_deadline`, so the command deadline does not appear to apply
    /// here — confirm whether that is intentional.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// use mssql_client::IsolationLevel;
    ///
    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
    /// // All operations in this transaction use SERIALIZABLE isolation
    /// tx.commit().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn begin_transaction_with_isolation(
        mut self,
        isolation_level: crate::transaction::IsolationLevel,
    ) -> Result<Client<InTransaction>> {
        tracing::debug!(
            isolation_level = %isolation_level.name(),
            "beginning transaction with isolation level"
        );

        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.transaction_span("BEGIN");

        // First set the isolation level
        let result = async {
            self.send_sql_batch(isolation_level.as_sql()).await?;
            self.read_execute_result().await?;

            // Then begin the transaction
            self.send_sql_batch("BEGIN TRANSACTION").await?;
            self.read_transaction_begin_result().await
        }
        .await;

        // Record the outcome before `?` below can return early on error.
        #[cfg(feature = "otel")]
        match &result {
            Ok(_) => InstrumentationContext::record_success(&mut span, None),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }

        // Drop the span before `self.instrumentation` is moved below.
        #[cfg(feature = "otel")]
        drop(span);

        let transaction_descriptor = result?;

        // Rebuild the client with the `InTransaction` type-state; all other
        // fields carry over unchanged.
        Ok(Client {
            config: self.config,
            _state: PhantomData,
            connection: self.connection,
            server_version: self.server_version,
            current_database: self.current_database,
            server_collation: self.server_collation,
            statement_cache: self.statement_cache,
            transaction_descriptor,
            needs_reset: self.needs_reset,
            in_flight: self.in_flight,
            #[cfg(feature = "otel")]
            instrumentation: self.instrumentation,
            #[cfg(feature = "always-encrypted")]
            encryption_context: self.encryption_context,
        })
    }
1409
1410    /// Execute a simple query without parameters.
1411    ///
1412    /// This is useful for DDL statements and simple queries where you
1413    /// don't need to retrieve the affected row count.
1414    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1415        tracing::debug!(sql = sql, "executing simple query");
1416
1417        // Send SQL batch
1418        self.send_sql_batch(sql).await?;
1419
1420        // Read and discard response
1421        let _ = self.read_execute_result().await?;
1422
1423        Ok(())
1424    }
1425
    /// Close the connection gracefully.
    ///
    /// Consumes the client. The method itself only emits a debug log and
    /// sends nothing to the server; the client and the connection it owns
    /// are dropped when the call returns. Always returns `Ok(())`.
    pub async fn close(self) -> Result<()> {
        tracing::debug!("closing connection");
        Ok(())
    }
1431
    /// Get the database name from the connection configuration.
    ///
    /// This reads `config.database`, i.e. the database set on the
    /// configuration this client was created with, and returns `None` when
    /// none was set.
    ///
    /// NOTE(review): the client also carries a `current_database` field that
    /// this accessor does not consult — confirm whether callers expect the
    /// configured name or the session's current database (e.g. after `USE`).
    #[must_use]
    pub fn database(&self) -> Option<&str> {
        self.config.database.as_deref()
    }
1437
    /// Get the server host.
    ///
    /// Returns the `host` value from the connection configuration.
    #[must_use]
    pub fn host(&self) -> &str {
        &self.config.host
    }
1443
    /// Get the server port.
    ///
    /// Returns the `port` value from the connection configuration.
    #[must_use]
    pub fn port(&self) -> u16 {
        self.config.port
    }
1449
    /// Check if the connection is currently in a transaction.
    ///
    /// This returns `true` if a transaction was started via raw SQL
    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
    /// Concretely, it reports whether the stored transaction descriptor is
    /// non-zero.
    ///
    /// Note: This only tracks transactions started via raw SQL. Transactions
    /// started via the type-state API (`begin_transaction()`) result in a
    /// `Client<InTransaction>` which is a different type.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// client.execute("BEGIN TRANSACTION", &[]).await?;
    /// assert!(client.is_in_transaction());
    ///
    /// client.execute("COMMIT", &[]).await?;
    /// assert!(!client.is_in_transaction());
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn is_in_transaction(&self) -> bool {
        self.transaction_descriptor != 0
    }
1475
    /// Check if a request is in-flight (sent but response not fully read).
    ///
    /// Used by the connection pool to detect dirty connections that were
    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
    /// A connection with an in-flight request has unread data in the TCP
    /// buffer and must be discarded rather than returned to the pool.
    ///
    /// Returns the current value of the client's `in_flight` flag.
    #[must_use]
    pub fn is_in_flight(&self) -> bool {
        self.in_flight
    }
1486
    /// Report whether an Always Encrypted key-store provider with the given
    /// name is currently reachable through this client's encryption context.
    ///
    /// This method is only compiled when the `always-encrypted` feature is
    /// enabled.
    ///
    /// Returns `false` when the client has no encryption context (for
    /// example, presumably, when the connection was opened without
    /// `column_encryption` configured) or when the context reports no
    /// provider registered under `name`.
    #[cfg(feature = "always-encrypted")]
    #[must_use]
    pub fn has_encryption_provider(&self, name: &str) -> bool {
        self.encryption_context
            .as_ref()
            .is_some_and(|ctx| ctx.has_provider(name))
    }
1500
    /// Get a handle for cancelling the current query.
    ///
    /// The cancel handle can be cloned and sent to other tasks, enabling
    /// cancellation of long-running queries from a separate async context.
    ///
    /// This is the public entry point for the handle returned by the
    /// internal `connection_cancel_handle()`.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// use std::time::Duration;
    ///
    /// let cancel_handle = client.cancel_handle();
    ///
    /// // Spawn a task to cancel after 10 seconds
    /// let handle = tokio::spawn(async move {
    ///     tokio::time::sleep(Duration::from_secs(10)).await;
    ///     let _ = cancel_handle.cancel().await;
    /// });
    ///
    /// // This query will be cancelled if it runs longer than 10 seconds
    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
    /// # let _ = (handle, result);
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
        self.connection_cancel_handle()
    }
1530}
1531
1532/// # Drop Behavior
1533///
1534/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1535/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1536/// the transaction remains open on the server until the TCP connection closes
1537/// (at which point SQL Server automatically rolls back).
1538///
1539/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1540/// to send a `ROLLBACK TRANSACTION` command.
1541///
1542/// ## Consequences of dropping without commit/rollback
1543///
1544/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1545///   (potentially 30+ minutes), holding locks on any modified rows.
1546/// - **Pooled connections:** The pool detects the active transaction descriptor
1547///   and discards the connection rather than returning it to the idle pool
1548///   (see `PooledConnection::drop` in `mssql-driver-pool`).
1549///
1550/// ## Best practice
1551///
1552/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1553/// for error paths:
1554///
1555/// ```rust,no_run
1556/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1557/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1558/// let tx = client.begin_transaction().await?;
1559/// match do_work(&tx).await {
1560///     Ok(_) => { tx.commit().await?; }
1561///     Err(e) => { tx.rollback().await?; return Err(e); }
1562/// }
1563/// # Ok(())
1564/// # }
1565/// ```
1566impl Client<InTransaction> {
1567    /// Execute a query within the transaction and return a streaming result set.
1568    ///
1569    /// See [`Client<Ready>::query`] for usage examples.
1570    pub async fn query<'a>(
1571        &'a mut self,
1572        sql: &str,
1573        params: &[&(dyn crate::ToSql + Sync)],
1574    ) -> Result<QueryStream<'a>> {
1575        let deadline = self.command_deadline();
1576        self.query_inner(sql, params, deadline).await
1577    }
1578
1579    /// Shared query implementation with an explicit command deadline.
1580    async fn query_inner<'a>(
1581        &'a mut self,
1582        sql: &str,
1583        params: &[&(dyn crate::ToSql + Sync)],
1584        deadline: Option<std::time::Duration>,
1585    ) -> Result<QueryStream<'a>> {
1586        tracing::debug!(
1587            sql = sql,
1588            params_count = params.len(),
1589            "executing query in transaction"
1590        );
1591
1592        #[cfg(feature = "otel")]
1593        let instrumentation = self.instrumentation.clone();
1594        #[cfg(feature = "otel")]
1595        let mut span = instrumentation.query_span(sql);
1596        #[cfg(feature = "otel")]
1597        let timer = crate::instrumentation::OperationTimer::start(
1598            crate::instrumentation::extract_operation(sql),
1599        );
1600
1601        let canceller = self.cancel_handle();
1602        let result = run_with_deadline(
1603            async {
1604                if params.is_empty() {
1605                    // Simple query without parameters - use SQL batch
1606                    self.send_sql_batch(sql).await?;
1607                } else {
1608                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1609                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1610                    self.send_rpc(&rpc).await?;
1611                }
1612
1613                // Read complete response including columns and rows
1614                self.read_query_response().await
1615            },
1616            deadline,
1617            canceller,
1618        )
1619        .await;
1620
1621        #[cfg(feature = "otel")]
1622        match &result {
1623            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1624            Err(e) => InstrumentationContext::record_error(&mut span, e),
1625        }
1626        #[cfg(feature = "otel")]
1627        timer.finish(instrumentation.metrics(), result.is_ok());
1628
1629        // Drop the span before returning
1630        #[cfg(feature = "otel")]
1631        drop(span);
1632
1633        let resp = result?;
1634        #[cfg(feature = "always-encrypted")]
1635        {
1636            Ok(QueryStream::from_raw(
1637                resp.columns,
1638                resp.pending_rows,
1639                resp.meta,
1640                resp.decryptor,
1641            ))
1642        }
1643        #[cfg(not(feature = "always-encrypted"))]
1644        {
1645            Ok(QueryStream::from_raw(
1646                resp.columns,
1647                resp.pending_rows,
1648                resp.meta,
1649            ))
1650        }
1651    }
1652
1653    /// Execute a statement within the transaction.
1654    ///
1655    /// Returns the number of affected rows.
1656    pub async fn execute(
1657        &mut self,
1658        sql: &str,
1659        params: &[&(dyn crate::ToSql + Sync)],
1660    ) -> Result<u64> {
1661        let deadline = self.command_deadline();
1662        self.execute_inner(sql, params, deadline).await
1663    }
1664
1665    /// Shared execute implementation with an explicit command deadline.
1666    async fn execute_inner(
1667        &mut self,
1668        sql: &str,
1669        params: &[&(dyn crate::ToSql + Sync)],
1670        deadline: Option<std::time::Duration>,
1671    ) -> Result<u64> {
1672        tracing::debug!(
1673            sql = sql,
1674            params_count = params.len(),
1675            "executing statement in transaction"
1676        );
1677
1678        #[cfg(feature = "otel")]
1679        let instrumentation = self.instrumentation.clone();
1680        #[cfg(feature = "otel")]
1681        let mut span = instrumentation.query_span(sql);
1682        #[cfg(feature = "otel")]
1683        let timer = crate::instrumentation::OperationTimer::start(
1684            crate::instrumentation::extract_operation(sql),
1685        );
1686
1687        let canceller = self.cancel_handle();
1688        let result = run_with_deadline(
1689            async {
1690                if params.is_empty() {
1691                    // Simple statement without parameters - use SQL batch
1692                    self.send_sql_batch(sql).await?;
1693                } else {
1694                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1695                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1696                    self.send_rpc(&rpc).await?;
1697                }
1698
1699                // Read response and get row count
1700                self.read_execute_result().await
1701            },
1702            deadline,
1703            canceller,
1704        )
1705        .await;
1706
1707        #[cfg(feature = "otel")]
1708        match &result {
1709            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1710            Err(e) => InstrumentationContext::record_error(&mut span, e),
1711        }
1712        #[cfg(feature = "otel")]
1713        timer.finish(instrumentation.metrics(), result.is_ok());
1714
1715        // Drop the span before returning
1716        #[cfg(feature = "otel")]
1717        drop(span);
1718
1719        result
1720    }
1721
1722    /// Execute a query within the transaction with a specific timeout.
1723    ///
1724    /// See [`Client<Ready>::query_with_timeout`] for details.
1725    pub async fn query_with_timeout<'a>(
1726        &'a mut self,
1727        sql: &str,
1728        params: &[&(dyn crate::ToSql + Sync)],
1729        timeout_duration: std::time::Duration,
1730    ) -> Result<QueryStream<'a>> {
1731        self.query_inner(sql, params, Some(timeout_duration)).await
1732    }
1733
1734    /// Execute a statement within the transaction with a specific timeout.
1735    ///
1736    /// See [`Client<Ready>::execute_with_timeout`] for details.
1737    pub async fn execute_with_timeout(
1738        &mut self,
1739        sql: &str,
1740        params: &[&(dyn crate::ToSql + Sync)],
1741        timeout_duration: std::time::Duration,
1742    ) -> Result<u64> {
1743        self.execute_inner(sql, params, Some(timeout_duration))
1744            .await
1745    }
1746
1747    /// Open a FILESTREAM BLOB for async reading and/or writing.
1748    ///
1749    /// This method queries the server for the transaction context, then opens
1750    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1751    ///
1752    /// # Arguments
1753    ///
1754    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1755    ///   Query this yourself before calling `open_filestream`:
1756    ///   ```sql
1757    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1758    ///   ```
1759    /// * `access` — Read, write, or read/write access mode.
1760    ///
1761    /// # Requirements
1762    ///
1763    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1764    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1765    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1766    ///
1767    /// # Example
1768    ///
1769    /// ```text
1770    /// use mssql_client::FileStreamAccess;
1771    /// use tokio::io::AsyncReadExt;
1772    ///
1773    /// let mut tx = client.begin_transaction().await?;
1774    ///
1775    /// // Get the FILESTREAM path
1776    /// let rows = tx.query(
1777    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1778    ///     &[&doc_id],
1779    /// ).await?;
1780    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1781    ///
1782    /// // Open and read the BLOB
1783    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1784    /// let mut data = Vec::new();
1785    /// stream.read_to_end(&mut data).await?;
1786    /// drop(stream);
1787    ///
1788    /// tx.commit().await?;
1789    /// ```
1790    #[cfg(all(windows, feature = "filestream"))]
1791    pub async fn open_filestream(
1792        &mut self,
1793        path: &str,
1794        access: crate::filestream::FileStreamAccess,
1795    ) -> Result<crate::filestream::FileStream> {
1796        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1797
1798        // Get the transaction context from SQL Server.
1799        // This binds the file access to the current SQL transaction.
1800        let txn_context: Vec<u8> = {
1801            let rows = self
1802                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1803                .await?;
1804            let mut ctx = None;
1805            for result in rows {
1806                let row = result?;
1807                ctx = Some(row.get::<Vec<u8>>(0)?);
1808            }
1809            ctx.ok_or_else(|| {
1810                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1811            })?
1812        };
1813
1814        crate::filestream::FileStream::open(path, access, &txn_context)
1815    }
1816
1817    /// Commit the transaction.
1818    ///
1819    /// This transitions the client back to `Ready` state.
1820    pub async fn commit(mut self) -> Result<Client<Ready>> {
1821        tracing::debug!("committing transaction");
1822
1823        #[cfg(feature = "otel")]
1824        let instrumentation = self.instrumentation.clone();
1825        #[cfg(feature = "otel")]
1826        let mut span = instrumentation.transaction_span("COMMIT");
1827
1828        // Execute COMMIT TRANSACTION
1829        let result = async {
1830            self.send_sql_batch("COMMIT TRANSACTION").await?;
1831            self.read_execute_result().await
1832        }
1833        .await;
1834
1835        #[cfg(feature = "otel")]
1836        match &result {
1837            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1838            Err(e) => InstrumentationContext::record_error(&mut span, e),
1839        }
1840
1841        // Drop the span before moving instrumentation
1842        #[cfg(feature = "otel")]
1843        drop(span);
1844
1845        result?;
1846
1847        Ok(Client {
1848            config: self.config,
1849            _state: PhantomData,
1850            connection: self.connection,
1851            server_version: self.server_version,
1852            current_database: self.current_database,
1853            server_collation: self.server_collation,
1854            statement_cache: self.statement_cache,
1855            transaction_descriptor: 0, // Reset to auto-commit mode
1856            needs_reset: self.needs_reset,
1857            in_flight: self.in_flight,
1858            #[cfg(feature = "otel")]
1859            instrumentation: self.instrumentation,
1860            #[cfg(feature = "always-encrypted")]
1861            encryption_context: self.encryption_context,
1862        })
1863    }
1864
1865    /// Rollback the transaction.
1866    ///
1867    /// This transitions the client back to `Ready` state.
1868    pub async fn rollback(mut self) -> Result<Client<Ready>> {
1869        tracing::debug!("rolling back transaction");
1870
1871        #[cfg(feature = "otel")]
1872        let instrumentation = self.instrumentation.clone();
1873        #[cfg(feature = "otel")]
1874        let mut span = instrumentation.transaction_span("ROLLBACK");
1875
1876        // Execute ROLLBACK TRANSACTION
1877        let result = async {
1878            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1879            self.read_execute_result().await
1880        }
1881        .await;
1882
1883        #[cfg(feature = "otel")]
1884        match &result {
1885            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1886            Err(e) => InstrumentationContext::record_error(&mut span, e),
1887        }
1888
1889        // Drop the span before moving instrumentation
1890        #[cfg(feature = "otel")]
1891        drop(span);
1892
1893        result?;
1894
1895        Ok(Client {
1896            config: self.config,
1897            _state: PhantomData,
1898            connection: self.connection,
1899            server_version: self.server_version,
1900            current_database: self.current_database,
1901            server_collation: self.server_collation,
1902            statement_cache: self.statement_cache,
1903            transaction_descriptor: 0, // Reset to auto-commit mode
1904            needs_reset: self.needs_reset,
1905            in_flight: self.in_flight,
1906            #[cfg(feature = "otel")]
1907            instrumentation: self.instrumentation,
1908            #[cfg(feature = "always-encrypted")]
1909            encryption_context: self.encryption_context,
1910        })
1911    }
1912
1913    /// Create a savepoint and return a handle for later rollback.
1914    ///
1915    /// The returned `SavePoint` handle contains the validated savepoint name.
1916    /// Use it with `rollback_to()` to partially undo transaction work.
1917    ///
1918    /// # Example
1919    ///
1920    /// ```rust,no_run
1921    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1922    /// let mut tx = client.begin_transaction().await?;
1923    /// tx.execute("INSERT INTO orders ...", &[]).await?;
1924    /// let sp = tx.save_point("before_items").await?;
1925    /// tx.execute("INSERT INTO items ...", &[]).await?;
1926    /// // Oops, rollback just the items
1927    /// tx.rollback_to(&sp).await?;
1928    /// tx.commit().await?;
1929    /// # Ok(())
1930    /// # }
1931    /// ```
1932    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1933        crate::validation::validate_identifier(name)?;
1934        tracing::debug!(name = name, "creating savepoint");
1935
1936        // Execute SAVE TRANSACTION <name>
1937        // Note: name is validated by validate_identifier() to prevent SQL injection
1938        let sql = format!("SAVE TRANSACTION {name}");
1939        self.send_sql_batch(&sql).await?;
1940        self.read_execute_result().await?;
1941
1942        Ok(SavePoint::new(name.to_string()))
1943    }
1944
1945    /// Rollback to a savepoint.
1946    ///
1947    /// This rolls back all changes made after the savepoint was created,
1948    /// but keeps the transaction active. The savepoint remains valid and
1949    /// can be rolled back to again.
1950    ///
1951    /// # Example
1952    ///
1953    /// ```rust,no_run
1954    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1955    /// let sp = tx.save_point("checkpoint").await?;
1956    /// // ... do some work ...
1957    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
1958    /// // Transaction is still active, savepoint is still valid
1959    /// # Ok(())
1960    /// # }
1961    /// ```
1962    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1963        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1964
1965        // Execute ROLLBACK TRANSACTION <name>
1966        // Note: savepoint name was validated during creation
1967        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1968        self.send_sql_batch(&sql).await?;
1969        self.read_execute_result().await?;
1970
1971        Ok(())
1972    }
1973
    /// Release a savepoint (optional cleanup).
    ///
    /// Note: SQL Server doesn't have explicit savepoint release, but this
    /// method is provided for API completeness. The savepoint is automatically
    /// released when the transaction commits or rolls back.
    ///
    /// Nothing is sent to the server: the call logs the savepoint name,
    /// consumes the handle, and always returns `Ok(())`.
    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
        tracing::debug!(name = savepoint.name(), "releasing savepoint");

        // SQL Server doesn't require explicit savepoint release
        // The savepoint is implicitly released on commit/rollback
        // This method exists for API completeness
        drop(savepoint);
        Ok(())
    }
1988
    /// Get a handle for cancelling the current query within the transaction.
    ///
    /// Obtain the handle before issuing the query you may want to cancel; the
    /// query and execute methods of this impl do the same internally before
    /// they start sending.
    ///
    /// See [`Client<Ready>::cancel_handle`] for usage examples.
    #[must_use]
    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
        // Thin public wrapper: the handle itself is produced by the
        // `connection_cancel_handle` helper.
        self.connection_cancel_handle()
    }
1996}
1997
1998impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1999    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2000        f.debug_struct("Client")
2001            .field("host", &self.config.host)
2002            .field("port", &self.config.port)
2003            .field("database", &self.config.database)
2004            .finish()
2005    }
2006}