Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// How long to wait for the server to acknowledge an Attention packet after
49/// a command timeout. SqlClient waits 5 seconds before dooming the
50/// connection; we match it.
51const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68    fut: F,
69    deadline: Option<std::time::Duration>,
70    canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73    F: std::future::Future<Output = Result<T>>,
74{
75    let Some(d) = deadline else {
76        return fut.await;
77    };
78    tokio::pin!(fut);
79    tokio::select! {
80        biased;
81        res = &mut fut => res,
82        () = tokio::time::sleep(d) => {
83            // Signal cancellation, then let the in-flight read consume the
84            // server's attention acknowledgement so the connection stays usable.
85            let drain = async {
86                let _ = canceller.cancel().await;
87                let _ = (&mut fut).await;
88            };
89            if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90                tracing::warn!(
91                    timeout = ?ATTENTION_ACK_TIMEOUT,
92                    "server did not acknowledge attention; abandoning the connection as dirty"
93                );
94            }
95            Err(Error::CommandTimeout)
96        }
97    }
98}
99
/// SQL Server client with type-state connection management.
///
/// The generic parameter `S` represents the current connection state,
/// ensuring at compile time that certain operations are only available
/// in appropriate states. `S` is carried purely at the type level (via
/// [`PhantomData`]); it occupies no storage.
pub struct Client<S: ConnectionState> {
    /// Connection configuration. Read in this file for `command_timeout`
    /// (per-command deadline) and `packet_size` (maximum TDS packet size).
    config: Config,
    /// Zero-sized marker tying the value to its type-state `S`.
    _state: PhantomData<S>,
    /// The underlying connection (present only when connected).
    /// Send paths report `Error::ConnectionClosed` when this is `None`.
    connection: Option<ConnectionHandle>,
    /// Server version from LoginAck (raw u32 TDS version)
    server_version: Option<u32>,
    /// Current database from EnvChange
    current_database: Option<String>,
    /// Server's default collation from SqlCollation EnvChange during login.
    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
    /// parameters with the correct character encoding and collation bytes.
    server_collation: Option<tds_protocol::token::Collation>,
    /// Prepared statement cache for query optimization
    statement_cache: StatementCache,
    /// Transaction descriptor from BeginTransaction EnvChange.
    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
    /// requests within an explicit transaction. 0 indicates auto-commit mode.
    /// It is reset to 0 when a Commit/RollbackTransaction EnvChange is processed.
    transaction_descriptor: u64,
    /// Whether a request has been sent and the response has not yet been fully read.
    /// Used by the connection pool to detect dirty connections after cancel/timeout.
    in_flight: bool,
    /// Whether this connection needs a reset on next use.
    /// Set by connection pool on checkin, cleared after first query/execute.
    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
    needs_reset: bool,
    /// OpenTelemetry instrumentation context (when otel feature is enabled)
    #[cfg(feature = "otel")]
    instrumentation: InstrumentationContext,
    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled).
    /// When `Some`, parameterized statements are routed through the
    /// encrypted `sp_executesql` builder so parameters can be encrypted client-side.
    #[cfg(feature = "always-encrypted")]
    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
}
138
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
/// - Plain TCP (for internal networks or when `tls` feature is disabled)
///
/// Every operation on the connection matches on all variants and forwards the
/// same call to the inner `Connection`, so the variants differ only in the
/// transport type they wrap.
// NOTE(review): the original justification for this allow ("Connection will be
// used once query execution is implemented") looks stale — the send paths in
// this file already match on every variant. Confirm whether the allow is still
// needed (e.g. for feature combinations that never construct a variant).
#[allow(dead_code)]
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    #[cfg(feature = "tls")]
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style)
    #[cfg(feature = "tls")]
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
    Plain(Connection<TcpStream>),
}
156
/// Resolve the `TypeInfo` used to declare a typed NULL ([`crate::null`]) from
/// the SQL type name reported by [`crate::ToSql::sql_type`].
///
/// The lookup is an exact, case-sensitive match on the type name. Any name not
/// listed here — including the `"NULL"` name of an untyped `Option::None` —
/// yields `None`, in which case the caller falls back to the default
/// parameter type.
#[cfg(feature = "always-encrypted")]
fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
    use tds_protocol::rpc::TypeInfo;

    match sql_type {
        // Fixed-width integer and bit types.
        "BIT" => Some(TypeInfo::bit()),
        "TINYINT" => Some(TypeInfo::tinyint()),
        "SMALLINT" => Some(TypeInfo::smallint()),
        "INT" => Some(TypeInfo::int()),
        "BIGINT" => Some(TypeInfo::bigint()),
        // Floating point.
        "REAL" => Some(TypeInfo::real()),
        "FLOAT" => Some(TypeInfo::float()),
        // Variable-length types are declared with length 1.
        "NVARCHAR" => Some(TypeInfo::nvarchar(1)),
        "VARBINARY" => Some(TypeInfo::varbinary(1)),
        "UNIQUEIDENTIFIER" => Some(TypeInfo::uuid()),
        "DATE" => Some(TypeInfo::date()),
        _ => None,
    }
}
178
/// Translate a typed-parameter wrapper's [`EncryptedParamType`] into the
/// `TypeInfo` the driver declares for it (used for
/// `sp_describe_parameter_encryption` and as the `CryptoMetadata` base type).
///
/// # Errors
///
/// Returns [`Error::Encryption`] for any variant not handled here, so that an
/// unknown future variant fails loudly instead of silently declaring the
/// wrong type.
#[cfg(feature = "always-encrypted")]
fn encrypted_param_type_info(
    ty: mssql_types::EncryptedParamType,
) -> Result<tds_protocol::rpc::TypeInfo> {
    use mssql_types::EncryptedParamType as E;
    use tds_protocol::rpc::TypeInfo;

    match ty {
        E::Decimal { precision, scale } => Ok(TypeInfo::decimal(precision, scale)),
        E::Time { scale } => Ok(TypeInfo::time(scale)),
        E::DateTime2 { scale } => Ok(TypeInfo::datetime2(scale)),
        E::DateTimeOffset { scale } => Ok(TypeInfo::datetimeoffset(scale)),
        E::DateTime => Ok(TypeInfo::datetime()),
        E::Char { length } => Ok(TypeInfo::char(length)),
        E::NChar { length } => Ok(TypeInfo::nchar(length)),
        E::Binary { length } => Ok(TypeInfo::binary(length)),
        _ => Err(Error::Encryption(
            "unsupported Always Encrypted parameter type".to_string(),
        )),
    }
}
205
206// Private helper methods available to all connection states
207impl<S: ConnectionState> Client<S> {
208    /// The default per-command deadline from `command_timeout`.
209    ///
210    /// Returns `None` when `command_timeout` is zero, which means "no limit"
211    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
212    pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
213        let t = self.config.command_timeout;
214        if t.is_zero() { None } else { Some(t) }
215    }
216
217    /// Build a cancel handle for the current connection, regardless of
218    /// connection state. The public, documented surface is
219    /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
220    /// delegate here.
221    pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
222        let connection = self
223            .connection
224            .as_ref()
225            .expect("connection should be present");
226        match connection {
227            #[cfg(feature = "tls")]
228            ConnectionHandle::Tls(conn) => {
229                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
230            }
231            #[cfg(feature = "tls")]
232            ConnectionHandle::TlsPrelogin(conn) => {
233                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
234            }
235            ConnectionHandle::Plain(conn) => {
236                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
237            }
238        }
239    }
240
241    /// Process transaction-related EnvChange tokens.
242    ///
243    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
244    /// EnvChange tokens, updating the transaction descriptor accordingly.
245    ///
246    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
247    /// while still having the transaction descriptor tracked correctly.
248    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
249        use tds_protocol::token::EnvChangeValue;
250
251        match env.env_type {
252            EnvChangeType::BeginTransaction => {
253                if let EnvChangeValue::Binary(ref data) = env.new_value {
254                    if data.len() >= 8 {
255                        let descriptor = u64::from_le_bytes([
256                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
257                        ]);
258                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
259                        *transaction_descriptor = descriptor;
260                    }
261                }
262            }
263            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
264                tracing::debug!(
265                    env_type = ?env.env_type,
266                    "transaction ended via raw SQL"
267                );
268                *transaction_descriptor = 0;
269            }
270            _ => {}
271        }
272    }
273
274    /// Send a SQL batch to the server.
275    ///
276    /// Uses the client's current transaction descriptor in ALL_HEADERS.
277    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
278    /// returned by BeginTransaction must be included.
279    ///
280    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
281    /// is included in the first packet to reset connection state.
282    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
283        let payload =
284            tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
285        let max_packet = self.config.packet_size as usize;
286
287        // Check if we need to reset the connection on this request
288        let reset = self.needs_reset;
289        if reset {
290            self.needs_reset = false; // Clear flag before sending
291            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
292        }
293
294        self.in_flight = true;
295        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
296
297        match connection {
298            #[cfg(feature = "tls")]
299            ConnectionHandle::Tls(conn) => {
300                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
301                    .await?;
302            }
303            #[cfg(feature = "tls")]
304            ConnectionHandle::TlsPrelogin(conn) => {
305                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
306                    .await?;
307            }
308            ConnectionHandle::Plain(conn) => {
309                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
310                    .await?;
311            }
312        }
313
314        Ok(())
315    }
316
317    /// Send an RPC request to the server.
318    ///
319    /// Uses the client's current transaction descriptor in ALL_HEADERS.
320    ///
321    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
322    /// is included in the first packet to reset connection state.
323    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
324        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
325        let max_packet = self.config.packet_size as usize;
326
327        // Check if we need to reset the connection on this request
328        let reset = self.needs_reset;
329        if reset {
330            self.needs_reset = false; // Clear flag before sending
331            tracing::debug!("sending RPC with RESETCONNECTION flag");
332        }
333
334        self.in_flight = true;
335        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
336
337        match connection {
338            #[cfg(feature = "tls")]
339            ConnectionHandle::Tls(conn) => {
340                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
341                    .await?;
342            }
343            #[cfg(feature = "tls")]
344            ConnectionHandle::TlsPrelogin(conn) => {
345                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
346                    .await?;
347            }
348            ConnectionHandle::Plain(conn) => {
349                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
350                    .await?;
351            }
352        }
353
354        Ok(())
355    }
356
357    /// Start building a stored procedure call with full control over parameters.
358    ///
359    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
360    /// parameters before executing the call.
361    ///
362    /// The procedure name is validated to prevent SQL injection. It may be
363    /// schema-qualified (e.g., `"dbo.MyProc"`).
364    ///
365    /// # Example
366    ///
367    /// ```rust,no_run
368    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
369    /// let result = client.procedure("dbo.CalculateSum")?
370    ///     .input("@a", &10i32)
371    ///     .input("@b", &20i32)
372    ///     .output_int("@result")
373    ///     .execute().await?;
374    ///
375    /// let sum = result.get_output("@result").unwrap();
376    /// # let _ = sum;
377    /// # Ok(())
378    /// # }
379    /// ```
380    pub fn procedure(
381        &mut self,
382        proc_name: &str,
383    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
384        crate::validation::validate_qualified_identifier(proc_name)?;
385        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
386    }
387
388    /// Execute a stored procedure with positional input parameters.
389    ///
390    /// This is a convenience method for the common case of calling a procedure
391    /// with input-only parameters. For output parameters or named parameters,
392    /// use [`procedure()`](Client::procedure) instead.
393    ///
394    /// # Example
395    ///
396    /// ```rust,no_run
397    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
398    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
399    /// assert_eq!(result.return_value, 0);
400    ///
401    /// if let Some(rs) = result.first_result_set() {
402    ///     println!("columns: {:?}", rs.columns());
403    /// }
404    /// # Ok(())
405    /// # }
406    /// ```
407    pub async fn call_procedure(
408        &mut self,
409        proc_name: &str,
410        params: &[&(dyn crate::ToSql + Sync)],
411    ) -> Result<crate::stream::ProcedureResult> {
412        crate::validation::validate_qualified_identifier(proc_name)?;
413
414        tracing::debug!(
415            proc_name = proc_name,
416            params_count = params.len(),
417            "executing stored procedure"
418        );
419
420        let rpc_params =
421            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
422        let mut rpc = RpcRequest::named(proc_name);
423        for param in rpc_params {
424            rpc = rpc.param(param);
425        }
426
427        let deadline = self.command_deadline();
428        let canceller = self.connection_cancel_handle();
429        run_with_deadline(
430            async {
431                self.send_rpc(&rpc).await?;
432                self.read_procedure_result().await
433            },
434            deadline,
435            canceller,
436        )
437        .await
438    }
439
440    /// Ask the server how each parameter of a statement must be encrypted.
441    ///
442    /// Issues the `sp_describe_parameter_encryption` system RPC for the
443    /// parameterized statement `tsql` with the parameter declaration `params`
444    /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
445    /// into a [`ParameterEncryptionInfo`](crate::ParameterEncryptionInfo): the
446    /// CEK table, plus — for each parameter the server reports as encrypted —
447    /// which CEK and whether deterministic or randomized. Parameters the server
448    /// reports as plaintext are omitted.
449    ///
450    /// This is the first step of Always Encrypted parameter encryption; the
451    /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
452    #[cfg(feature = "always-encrypted")]
453    pub async fn describe_parameter_encryption(
454        &mut self,
455        tsql: &str,
456        params: &str,
457    ) -> Result<crate::ParameterEncryptionInfo> {
458        let tsql_arg = tsql.to_string();
459        let params_arg = params.to_string();
460        let mut result = self
461            .call_procedure(
462                "sp_describe_parameter_encryption",
463                &[&tsql_arg, &params_arg],
464            )
465            .await?;
466        crate::ParameterEncryptionInfo::from_describe_result_sets(&mut result.result_sets)
467    }
468
469    /// Build the `sp_executesql` request for a parameterized statement.
470    ///
471    /// When the connection has Always Encrypted enabled, parameters the server
472    /// reports as encrypted are encrypted client-side first (an extra
473    /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
474    /// plain parameter conversion.
475    pub(crate) async fn build_parameterized_rpc(
476        &mut self,
477        sql: &str,
478        params: &[&(dyn crate::ToSql + Sync)],
479    ) -> Result<RpcRequest> {
480        #[cfg(feature = "always-encrypted")]
481        if self.encryption_context.is_some() {
482            return self.build_encrypted_sql_rpc(sql, params).await;
483        }
484        let rpc_params =
485            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
486        Ok(RpcRequest::execute_sql(sql, rpc_params))
487    }
488
489    /// Encrypt the Always Encrypted parameters of a statement, then build its
490    /// `sp_executesql` request.
491    ///
492    /// Asks the server which parameters are encrypted
493    /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
494    /// then for each one normalizes the value, resolves its column encryption
495    /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
496    /// server reports as plaintext are sent unchanged.
497    #[cfg(feature = "always-encrypted")]
498    async fn build_encrypted_sql_rpc(
499        &mut self,
500        sql: &str,
501        params: &[&(dyn crate::ToSql + Sync)],
502    ) -> Result<RpcRequest> {
503        use tds_protocol::rpc::RpcParam;
504
505        let Some(ctx) = self.encryption_context.clone() else {
506            let rpc_params =
507                Self::convert_params(params, self.send_unicode(), self.server_collation())?;
508            return Ok(RpcRequest::execute_sql(sql, rpc_params));
509        };
510
511        // Resolve each parameter's value once (AE normalization needs the typed
512        // value, not the wire encoding) and build the plaintext RPC params.
513        let send_unicode = self.send_unicode();
514        let collation = self.server_collation().cloned();
515        let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
516        let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
517        let mut hints: Vec<Option<mssql_types::EncryptedParamType>> =
518            Vec::with_capacity(params.len());
519        for (i, p) in params.iter().enumerate() {
520            let name = format!("@p{}", i + 1);
521            let value = p.to_sql()?;
522            let hint = p.encrypted_param_type();
523            // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
524            // describe accepts it against the target encrypted column; an untyped
525            // NULL falls back to the default in `sql_value_to_rpc_param`.
526            let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
527                (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
528                _ => {
529                    let mut param = Self::sql_value_to_rpc_param(
530                        &name,
531                        &value,
532                        send_unicode,
533                        collation.as_ref(),
534                    )?;
535                    // A typed-parameter wrapper (e.g. `numeric(v, p, s)`,
536                    // `datetime2(v, scale)`) declares an explicit SQL type so
537                    // describe matches the encrypted column exactly — the value
538                    // alone cannot convey precision/scale or the legacy-`datetime`
539                    // vs `datetime2` distinction.
540                    if let Some(ty) = hint {
541                        param.type_info = encrypted_param_type_info(ty)?;
542                    }
543                    param
544                }
545            };
546            plaintext.push(rpc_param);
547            values.push(value);
548            hints.push(hint);
549        }
550
551        if plaintext.is_empty() {
552            return Ok(RpcRequest::execute_sql(sql, plaintext));
553        }
554
555        // Ask the server which parameters need encryption.
556        let declarations = RpcRequest::build_param_declarations(&plaintext);
557        let info = self
558            .describe_parameter_encryption(sql, &declarations)
559            .await?;
560        if info.parameters.is_empty() {
561            return Ok(RpcRequest::execute_sql(sql, plaintext));
562        }
563
564        // Encrypt the flagged parameters; pass the rest through untouched.
565        let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
566        for ((value, param), hint) in values.into_iter().zip(plaintext).zip(hints) {
567            let Some(crypto) = info.get_parameter(&param.name) else {
568                final_params.push(param);
569                continue;
570            };
571            let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
572                Error::Protocol(format!(
573                    "encrypted parameter {} references missing CEK ordinal {}",
574                    param.name, crypto.cek_ordinal
575                ))
576            })?;
577            let metadata = tds_protocol::rpc::EncryptedParamMetadata {
578                base_type_info: param.type_info.clone(),
579                algorithm_id: crypto.algorithm_id,
580                encryption_type: crypto.encryption_type,
581                database_id: entry.database_id,
582                cek_id: entry.cek_id,
583                cek_version: entry.cek_version,
584                cek_md_version: entry.cek_md_version,
585                normalization_rule_version: crypto.normalization_rule_version,
586            };
587            // A NULL value bound to an encrypted column is sent as an encrypted
588            // NULL (the server rejects a plaintext parameter for an encrypted
589            // column); there is nothing to encrypt.
590            if matches!(value, mssql_types::SqlValue::Null) {
591                final_params.push(RpcParam::encrypted_null(param.name, metadata));
592                continue;
593            }
594            let normalized = crate::encryption::normalize_for_encryption(&value, hint)?;
595            let ciphertext = ctx
596                .encrypt_value(&normalized, entry, crypto.encryption_type)
597                .await?;
598            final_params.push(RpcParam::encrypted(
599                param.name,
600                bytes::Bytes::from(ciphertext),
601                metadata,
602            ));
603        }
604
605        Ok(RpcRequest::execute_sql(sql, final_params))
606    }
607
608    /// Start a bulk insert operation for the specified table.
609    ///
610    /// Sends the `INSERT BULK` statement to the server and returns a
611    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
612    /// a mutable borrow on the client, preventing other operations while
613    /// the bulk insert is in progress.
614    ///
615    /// # Example
616    ///
617    /// ```rust,no_run
618    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
619    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
620    ///
621    /// let builder = BulkInsertBuilder::new("dbo.Users")
622    ///     .with_typed_columns(vec![
623    ///         BulkColumn::new("id", "INT", 0)?,
624    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
625    ///     ]);
626    ///
627    /// let mut writer = client.bulk_insert(&builder).await?;
628    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
629    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
630    /// let result = writer.finish().await?;
631    /// println!("Inserted {} rows", result.rows_affected);
632    /// # Ok(())
633    /// # }
634    /// ```
635    pub async fn bulk_insert(
636        &mut self,
637        builder: &crate::bulk::BulkInsertBuilder,
638    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
639        use tds_protocol::token::{ColMetaData, Token};
640
641        tracing::debug!(
642            table = builder.table_name(),
643            columns = builder.columns().len(),
644            "starting bulk insert"
645        );
646
647        // Step 1: Query the server for column metadata.
648        // This gives us the exact type encoding the server expects for BulkLoad,
649        // following the pattern established by Tiberius.
650        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
651        let deadline = self.command_deadline();
652        let canceller = self.connection_cancel_handle();
653        let message = run_with_deadline(
654            async {
655                self.send_sql_batch(&meta_query).await?;
656                self.read_response_message().await
657            },
658            deadline,
659            canceller,
660        )
661        .await?;
662        self.in_flight = false;
663
664        // Capture both the raw COLMETADATA bytes and parsed column info
665        let raw_payload = message.payload.clone();
666        let mut parser = self.create_parser(message.payload);
667        let mut server_metadata: Option<ColMetaData> = None;
668        let mut meta_start: usize = 0;
669        let mut meta_end: usize = 0;
670
671        loop {
672            let pos_before = raw_payload.len() - parser.remaining();
673            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
674            let pos_after = raw_payload.len() - parser.remaining();
675            let Some(token) = token else { break };
676
677            match token {
678                Token::ColMetaData(meta) => {
679                    meta_start = pos_before;
680                    meta_end = pos_after;
681                    server_metadata = Some(meta);
682                }
683                Token::Done(_) => break,
684                _ => {}
685            }
686        }
687
688        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
689        // These types require a legacy TEXTPTR wire format that this driver
690        // does not support — users should migrate the column to VARCHAR(MAX) /
691        // NVARCHAR(MAX) / VARBINARY(MAX).
692        if let Some(ref meta) = server_metadata {
693            use tds_protocol::types::TypeId;
694            for col in meta.columns.iter() {
695                let (rejected, replacement) = match col.type_id {
696                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
697                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
698                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
699                    _ => (None, ""),
700                };
701                if let Some(sql_type) = rejected {
702                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
703                        sql_type: sql_type.to_string(),
704                        reason: format!(
705                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
706                             are not supported. Alter the column to {} instead \
707                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
708                             Server 2005).",
709                            col.name,
710                            builder.table_name(),
711                            sql_type,
712                            replacement,
713                        ),
714                    }));
715                }
716            }
717        }
718
719        // Step 2: Send INSERT BULK statement to put server in bulk load mode
720        let stmt = builder.build_insert_bulk_statement()?;
721        let deadline = self.command_deadline();
722        let canceller = self.connection_cancel_handle();
723        run_with_deadline(
724            async {
725                self.send_sql_batch(&stmt).await?;
726                self.read_execute_result().await
727            },
728            deadline,
729            canceller,
730        )
731        .await?;
732
733        // Step 3: Create bulk writer with server's metadata
734        let raw_meta = if meta_end > meta_start {
735            Some(raw_payload.slice(meta_start..meta_end))
736        } else {
737            None
738        };
739
740        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
741        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
742            builder.columns().to_vec(),
743            builder.options().batch_size,
744            raw_meta,
745            server_cols,
746        );
747
748        Ok(crate::bulk::BulkWriter::new(self, bulk))
749    }
750
751    /// Start a bulk insert without querying the server for column metadata.
752    ///
753    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
754    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
755    /// column metadata is constructed from the `BulkColumn` types provided
756    /// on the builder. This saves a round-trip when the schema is known.
757    ///
758    /// # Caveats
759    ///
760    /// The caller must ensure `BulkColumn` entries match the target table's
761    /// column definitions exactly. Mismatched types, lengths, precision/scale,
762    /// or column ordering will cause the server to reject the BulkLoad packet.
763    ///
764    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
765    /// extra round-trip is usually negligible and the server-supplied metadata
766    /// is guaranteed correct.
767    pub async fn bulk_insert_without_schema_discovery(
768        &mut self,
769        builder: &crate::bulk::BulkInsertBuilder,
770    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
771        tracing::debug!(
772            table = builder.table_name(),
773            columns = builder.columns().len(),
774            "starting bulk insert (no schema discovery)"
775        );
776
777        // Send INSERT BULK statement to put server in bulk load mode
778        let stmt = builder.build_insert_bulk_statement()?;
779        let deadline = self.command_deadline();
780        let canceller = self.connection_cancel_handle();
781        run_with_deadline(
782            async {
783                self.send_sql_batch(&stmt).await?;
784                self.read_execute_result().await
785            },
786            deadline,
787            canceller,
788        )
789        .await?;
790
791        // Create bulk writer with hand-crafted metadata
792        let bulk =
793            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
794
795        Ok(crate::bulk::BulkWriter::new(self, bulk))
796    }
797
798    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
799    ///
800    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
801    /// row data after the `INSERT BULK` statement has been acknowledged.
802    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
803        let max_packet = self.config.packet_size as usize;
804
805        self.in_flight = true;
806        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
807
808        match connection {
809            #[cfg(feature = "tls")]
810            ConnectionHandle::Tls(conn) => {
811                conn.send_message(PacketType::BulkLoad, payload, max_packet)
812                    .await?;
813            }
814            #[cfg(feature = "tls")]
815            ConnectionHandle::TlsPrelogin(conn) => {
816                conn.send_message(PacketType::BulkLoad, payload, max_packet)
817                    .await?;
818            }
819            ConnectionHandle::Plain(conn) => {
820                conn.send_message(PacketType::BulkLoad, payload, max_packet)
821                    .await?;
822            }
823        }
824
825        // Read the server's Done response with row count
826        self.read_execute_result().await
827    }
828
829    /// Execute a query with named parameters and return a streaming result set.
830    ///
831    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
832    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
833    /// and the `#[derive(ToParams)]` macro.
834    ///
835    /// # Example
836    ///
837    /// ```rust,no_run
838    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
839    /// use mssql_client::{NamedParam, ToParams};
840    ///
841    /// // With derive macro:
842    /// #[derive(mssql_derive::ToParams)]
843    /// struct UserQuery { name: String }
844    ///
845    /// let q = UserQuery { name: "Alice".into() };
846    /// let rows = client.query_named(
847    ///     "SELECT * FROM users WHERE name = @name",
848    ///     &q.to_params()?,
849    /// ).await?;
850    ///
851    /// // Or manually:
852    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
853    /// let rows = client.query_named(
854    ///     "SELECT * FROM users WHERE name = @name",
855    ///     &params,
856    /// ).await?;
857    /// # let _ = rows;
858    /// # Ok(())
859    /// # }
860    /// ```
861    pub async fn query_named<'a>(
862        &'a mut self,
863        sql: &str,
864        params: &[crate::to_params::NamedParam],
865    ) -> Result<QueryStream<'a>> {
866        tracing::debug!(
867            sql = sql,
868            params_count = params.len(),
869            "executing query with named parameters"
870        );
871
872        if params.is_empty() {
873            self.send_sql_batch(sql).await?;
874        } else {
875            let rpc_params =
876                Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
877            let rpc = RpcRequest::execute_sql(sql, rpc_params);
878            self.send_rpc(&rpc).await?;
879        }
880
881        let resp = self.read_query_response().await?;
882        #[cfg(feature = "always-encrypted")]
883        {
884            Ok(QueryStream::from_raw(
885                resp.columns,
886                resp.pending_rows,
887                resp.meta,
888                resp.decryptor,
889            ))
890        }
891        #[cfg(not(feature = "always-encrypted"))]
892        {
893            Ok(QueryStream::from_raw(
894                resp.columns,
895                resp.pending_rows,
896                resp.meta,
897            ))
898        }
899    }
900
901    /// Execute a statement with named parameters.
902    ///
903    /// Returns the number of affected rows. This is the named-parameter
904    /// counterpart of [`execute()`](Client::execute), compatible with the
905    /// [`ToParams`](crate::to_params::ToParams) trait.
906    ///
907    /// # Example
908    ///
909    /// ```rust,no_run
910    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
911    /// use mssql_client::NamedParam;
912    ///
913    /// let params = vec![
914    ///     NamedParam::from_value("name", &"Alice")?,
915    ///     NamedParam::from_value("email", &"alice@example.com")?,
916    /// ];
917    /// let rows_affected = client.execute_named(
918    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
919    ///     &params,
920    /// ).await?;
921    /// # let _ = rows_affected;
922    /// # Ok(())
923    /// # }
924    /// ```
925    pub async fn execute_named(
926        &mut self,
927        sql: &str,
928        params: &[crate::to_params::NamedParam],
929    ) -> Result<u64> {
930        tracing::debug!(
931            sql = sql,
932            params_count = params.len(),
933            "executing statement with named parameters"
934        );
935
936        let deadline = self.command_deadline();
937        let canceller = self.connection_cancel_handle();
938        run_with_deadline(
939            async {
940                if params.is_empty() {
941                    self.send_sql_batch(sql).await?;
942                } else {
943                    let rpc_params = Self::convert_named_params(
944                        params,
945                        self.send_unicode(),
946                        self.server_collation(),
947                    )?;
948                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
949                    self.send_rpc(&rpc).await?;
950                }
951
952                self.read_execute_result().await
953            },
954            deadline,
955            canceller,
956        )
957        .await
958    }
959
960    /// Whether string parameters are sent as NVARCHAR (Unicode).
961    pub(crate) fn send_unicode(&self) -> bool {
962        self.config.send_string_parameters_as_unicode
963    }
964
965    /// Server's default collation, captured from ENVCHANGE during login.
966    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
967        self.server_collation.as_ref()
968    }
969}
970
971impl Client<Ready> {
972    /// Mark this connection as needing a reset on next use.
973    ///
974    /// Called by the connection pool when a connection is returned.
975    /// The next SQL batch or RPC will include the RESETCONNECTION flag
976    /// in the TDS packet header, causing SQL Server to reset connection
977    /// state (temp tables, SET options, transaction isolation level, etc.)
978    /// before executing the command.
979    ///
980    /// This is more efficient than calling `sp_reset_connection` as a
981    /// separate command because it's handled at the TDS protocol level.
982    pub fn mark_needs_reset(&mut self) {
983        self.needs_reset = true;
984    }
985
986    /// Check if this connection needs a reset.
987    ///
988    /// Returns true if `mark_needs_reset()` was called and the reset
989    /// hasn't been performed yet.
990    #[must_use]
991    pub fn needs_reset(&self) -> bool {
992        self.needs_reset
993    }
994
995    /// Execute a query and return a result set with lazy per-row decoding.
996    ///
997    /// Per ADR-007 the full response is buffered in memory and each row is
998    /// *decoded* on demand as you iterate — this is not incremental network
999    /// streaming, so peak memory tracks the response size. Use
1000    /// `.collect_all()` if you want all rows materialized into a `Vec` up
1001    /// front.
1002    ///
1003    /// # Example
1004    ///
1005    /// ```rust,no_run
1006    /// # use mssql_client::Row;
1007    /// # fn process(_: &Row) {}
1008    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1009    /// // Streaming (synchronous iteration over the result set)
1010    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
1011    /// for row in stream {
1012    ///     let row = row?;
1013    ///     process(&row);
1014    /// }
1015    ///
1016    /// // Buffered (loads all into memory)
1017    /// let rows: Vec<Row> = client
1018    ///     .query("SELECT * FROM small_table", &[])
1019    ///     .await?
1020    ///     .collect_all()
1021    ///     .await?;
1022    /// # let _ = rows;
1023    /// # Ok(())
1024    /// # }
1025    /// ```
1026    pub async fn query<'a>(
1027        &'a mut self,
1028        sql: &str,
1029        params: &[&(dyn crate::ToSql + Sync)],
1030    ) -> Result<QueryStream<'a>> {
1031        let deadline = self.command_deadline();
1032        self.query_inner(sql, params, deadline).await
1033    }
1034
1035    /// Shared query implementation with an explicit command deadline.
1036    async fn query_inner<'a>(
1037        &'a mut self,
1038        sql: &str,
1039        params: &[&(dyn crate::ToSql + Sync)],
1040        deadline: Option<std::time::Duration>,
1041    ) -> Result<QueryStream<'a>> {
1042        tracing::debug!(sql = sql, params_count = params.len(), "executing query");
1043
1044        #[cfg(feature = "otel")]
1045        let instrumentation = self.instrumentation.clone();
1046        #[cfg(feature = "otel")]
1047        let mut span = instrumentation.query_span(sql);
1048        #[cfg(feature = "otel")]
1049        let timer = crate::instrumentation::OperationTimer::start(
1050            crate::instrumentation::extract_operation(sql),
1051        );
1052
1053        let canceller = self.cancel_handle();
1054        let result = run_with_deadline(
1055            async {
1056                if params.is_empty() {
1057                    // Simple query without parameters - use SQL batch
1058                    self.send_sql_batch(sql).await?;
1059                } else {
1060                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1061                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1062                    self.send_rpc(&rpc).await?;
1063                }
1064
1065                // Read complete response including columns and rows
1066                self.read_query_response().await
1067            },
1068            deadline,
1069            canceller,
1070        )
1071        .await;
1072
1073        #[cfg(feature = "otel")]
1074        match &result {
1075            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1076            Err(e) => InstrumentationContext::record_error(&mut span, e),
1077        }
1078        #[cfg(feature = "otel")]
1079        timer.finish(instrumentation.metrics(), result.is_ok());
1080
1081        // Drop the span before returning
1082        #[cfg(feature = "otel")]
1083        drop(span);
1084
1085        let resp = result?;
1086        #[cfg(feature = "always-encrypted")]
1087        {
1088            Ok(QueryStream::from_raw(
1089                resp.columns,
1090                resp.pending_rows,
1091                resp.meta,
1092                resp.decryptor,
1093            ))
1094        }
1095        #[cfg(not(feature = "always-encrypted"))]
1096        {
1097            Ok(QueryStream::from_raw(
1098                resp.columns,
1099                resp.pending_rows,
1100                resp.meta,
1101            ))
1102        }
1103    }
1104
1105    /// Execute a query with a specific timeout.
1106    ///
1107    /// This overrides the default `command_timeout` from the connection configuration
1108    /// for this specific query. If the query does not complete within the specified
1109    /// duration, the driver sends an Attention packet to cancel it server-side,
1110    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1111    /// connection left usable for the next request.
1112    ///
1113    /// # Arguments
1114    ///
1115    /// * `sql` - The SQL query to execute
1116    /// * `params` - Query parameters
1117    /// * `timeout_duration` - Maximum time to wait for the query to complete
1118    ///
1119    /// # Example
1120    ///
1121    /// ```rust,no_run
1122    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1123    /// use std::time::Duration;
1124    ///
1125    /// // Execute with a 5-second timeout
1126    /// let rows = client
1127    ///     .query_with_timeout(
1128    ///         "SELECT * FROM large_table",
1129    ///         &[],
1130    ///         Duration::from_secs(5),
1131    ///     )
1132    ///     .await?;
1133    /// # let _ = rows;
1134    /// # Ok(())
1135    /// # }
1136    /// ```
1137    pub async fn query_with_timeout<'a>(
1138        &'a mut self,
1139        sql: &str,
1140        params: &[&(dyn crate::ToSql + Sync)],
1141        timeout_duration: std::time::Duration,
1142    ) -> Result<QueryStream<'a>> {
1143        self.query_inner(sql, params, Some(timeout_duration)).await
1144    }
1145
1146    /// Execute a batch that may return multiple result sets.
1147    ///
1148    /// This is useful for stored procedures or SQL batches that contain
1149    /// multiple SELECT statements.
1150    ///
1151    /// # Example
1152    ///
1153    /// ```rust,no_run
1154    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1155    /// // Execute a batch with multiple SELECT statements
1156    /// let mut results = client.query_multiple(
1157    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1158    ///     &[]
1159    /// ).await?;
1160    ///
1161    /// // Process first result set
1162    /// while let Some(row) = results.next_row().await? {
1163    ///     println!("Result 1: {:?}", row);
1164    /// }
1165    ///
1166    /// // Move to second result set
1167    /// if results.next_result().await? {
1168    ///     while let Some(row) = results.next_row().await? {
1169    ///         println!("Result 2: {:?}", row);
1170    ///     }
1171    /// }
1172    /// # Ok(())
1173    /// # }
1174    /// ```
1175    pub async fn query_multiple<'a>(
1176        &'a mut self,
1177        sql: &str,
1178        params: &[&(dyn crate::ToSql + Sync)],
1179    ) -> Result<MultiResultStream<'a>> {
1180        tracing::debug!(
1181            sql = sql,
1182            params_count = params.len(),
1183            "executing multi-result query"
1184        );
1185
1186        let deadline = self.command_deadline();
1187        let canceller = self.connection_cancel_handle();
1188        let result_sets = run_with_deadline(
1189            async {
1190                if params.is_empty() {
1191                    // Simple batch without parameters - use SQL batch
1192                    self.send_sql_batch(sql).await?;
1193                } else {
1194                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1195                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1196                    self.send_rpc(&rpc).await?;
1197                }
1198
1199                // Read all result sets
1200                self.read_multi_result_response().await
1201            },
1202            deadline,
1203            canceller,
1204        )
1205        .await?;
1206        Ok(MultiResultStream::new(result_sets))
1207    }
1208
1209    /// Execute a query that doesn't return rows.
1210    ///
1211    /// Returns the number of affected rows.
1212    pub async fn execute(
1213        &mut self,
1214        sql: &str,
1215        params: &[&(dyn crate::ToSql + Sync)],
1216    ) -> Result<u64> {
1217        let deadline = self.command_deadline();
1218        self.execute_inner(sql, params, deadline).await
1219    }
1220
1221    /// Shared execute implementation with an explicit command deadline.
1222    async fn execute_inner(
1223        &mut self,
1224        sql: &str,
1225        params: &[&(dyn crate::ToSql + Sync)],
1226        deadline: Option<std::time::Duration>,
1227    ) -> Result<u64> {
1228        tracing::debug!(
1229            sql = sql,
1230            params_count = params.len(),
1231            "executing statement"
1232        );
1233
1234        #[cfg(feature = "otel")]
1235        let instrumentation = self.instrumentation.clone();
1236        #[cfg(feature = "otel")]
1237        let mut span = instrumentation.query_span(sql);
1238        #[cfg(feature = "otel")]
1239        let timer = crate::instrumentation::OperationTimer::start(
1240            crate::instrumentation::extract_operation(sql),
1241        );
1242
1243        let canceller = self.cancel_handle();
1244        let result = run_with_deadline(
1245            async {
1246                if params.is_empty() {
1247                    // Simple statement without parameters - use SQL batch
1248                    self.send_sql_batch(sql).await?;
1249                } else {
1250                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1251                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1252                    self.send_rpc(&rpc).await?;
1253                }
1254
1255                // Read response and get row count
1256                self.read_execute_result().await
1257            },
1258            deadline,
1259            canceller,
1260        )
1261        .await;
1262
1263        #[cfg(feature = "otel")]
1264        match &result {
1265            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1266            Err(e) => InstrumentationContext::record_error(&mut span, e),
1267        }
1268        #[cfg(feature = "otel")]
1269        timer.finish(instrumentation.metrics(), result.is_ok());
1270
1271        // Drop the span before returning
1272        #[cfg(feature = "otel")]
1273        drop(span);
1274
1275        result
1276    }
1277
1278    /// Execute a statement with a specific timeout.
1279    ///
1280    /// This overrides the default `command_timeout` from the connection configuration
1281    /// for this specific statement. If the statement does not complete within the
1282    /// specified duration, the driver sends an Attention packet to cancel it
1283    /// server-side, drains the acknowledgement, and returns
1284    /// [`Error::CommandTimeout`] with the connection left usable.
1285    ///
1286    /// # Arguments
1287    ///
1288    /// * `sql` - The SQL statement to execute
1289    /// * `params` - Statement parameters
1290    /// * `timeout_duration` - Maximum time to wait for the statement to complete
1291    ///
1292    /// # Example
1293    ///
1294    /// ```rust,no_run
1295    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1296    /// use std::time::Duration;
1297    ///
1298    /// // Execute with a 10-second timeout
1299    /// let rows_affected = client
1300    ///     .execute_with_timeout(
1301    ///         "UPDATE large_table SET status = @p1",
1302    ///         &[&"processed"],
1303    ///         Duration::from_secs(10),
1304    ///     )
1305    ///     .await?;
1306    /// # let _ = rows_affected;
1307    /// # Ok(())
1308    /// # }
1309    /// ```
1310    pub async fn execute_with_timeout(
1311        &mut self,
1312        sql: &str,
1313        params: &[&(dyn crate::ToSql + Sync)],
1314        timeout_duration: std::time::Duration,
1315    ) -> Result<u64> {
1316        self.execute_inner(sql, params, Some(timeout_duration))
1317            .await
1318    }
1319
1320    /// Begin a transaction.
1321    ///
1322    /// This transitions the client from `Ready` to `InTransaction` state.
1323    /// Per MS-TDS spec, the server returns a transaction descriptor in the
1324    /// BeginTransaction EnvChange token that must be included in subsequent
1325    /// ALL_HEADERS sections.
1326    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1327        tracing::debug!("beginning transaction");
1328
1329        #[cfg(feature = "otel")]
1330        let instrumentation = self.instrumentation.clone();
1331        #[cfg(feature = "otel")]
1332        let mut span = instrumentation.transaction_span("BEGIN");
1333
1334        // Execute BEGIN TRANSACTION and extract the transaction descriptor
1335        let result = async {
1336            self.send_sql_batch("BEGIN TRANSACTION").await?;
1337            self.read_transaction_begin_result().await
1338        }
1339        .await;
1340
1341        #[cfg(feature = "otel")]
1342        match &result {
1343            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1344            Err(e) => InstrumentationContext::record_error(&mut span, e),
1345        }
1346
1347        // Drop the span before moving instrumentation
1348        #[cfg(feature = "otel")]
1349        drop(span);
1350
1351        let transaction_descriptor = result?;
1352
1353        Ok(Client {
1354            config: self.config,
1355            _state: PhantomData,
1356            connection: self.connection,
1357            server_version: self.server_version,
1358            current_database: self.current_database,
1359            server_collation: self.server_collation,
1360            statement_cache: self.statement_cache,
1361            transaction_descriptor, // Store the descriptor from server
1362            needs_reset: self.needs_reset,
1363            in_flight: self.in_flight,
1364            #[cfg(feature = "otel")]
1365            instrumentation: self.instrumentation,
1366            #[cfg(feature = "always-encrypted")]
1367            encryption_context: self.encryption_context,
1368        })
1369    }
1370
1371    /// Begin a transaction with a specific isolation level.
1372    ///
1373    /// This transitions the client from `Ready` to `InTransaction` state
1374    /// with the specified isolation level.
1375    ///
1376    /// # Example
1377    ///
1378    /// ```rust,no_run
1379    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1380    /// use mssql_client::IsolationLevel;
1381    ///
1382    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1383    /// // All operations in this transaction use SERIALIZABLE isolation
1384    /// tx.commit().await?;
1385    /// # Ok(())
1386    /// # }
1387    /// ```
1388    pub async fn begin_transaction_with_isolation(
1389        mut self,
1390        isolation_level: crate::transaction::IsolationLevel,
1391    ) -> Result<Client<InTransaction>> {
1392        tracing::debug!(
1393            isolation_level = %isolation_level.name(),
1394            "beginning transaction with isolation level"
1395        );
1396
1397        #[cfg(feature = "otel")]
1398        let instrumentation = self.instrumentation.clone();
1399        #[cfg(feature = "otel")]
1400        let mut span = instrumentation.transaction_span("BEGIN");
1401
1402        // First set the isolation level
1403        let result = async {
1404            self.send_sql_batch(isolation_level.as_sql()).await?;
1405            self.read_execute_result().await?;
1406
1407            // Then begin the transaction
1408            self.send_sql_batch("BEGIN TRANSACTION").await?;
1409            self.read_transaction_begin_result().await
1410        }
1411        .await;
1412
1413        #[cfg(feature = "otel")]
1414        match &result {
1415            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1416            Err(e) => InstrumentationContext::record_error(&mut span, e),
1417        }
1418
1419        #[cfg(feature = "otel")]
1420        drop(span);
1421
1422        let transaction_descriptor = result?;
1423
1424        Ok(Client {
1425            config: self.config,
1426            _state: PhantomData,
1427            connection: self.connection,
1428            server_version: self.server_version,
1429            current_database: self.current_database,
1430            server_collation: self.server_collation,
1431            statement_cache: self.statement_cache,
1432            transaction_descriptor,
1433            needs_reset: self.needs_reset,
1434            in_flight: self.in_flight,
1435            #[cfg(feature = "otel")]
1436            instrumentation: self.instrumentation,
1437            #[cfg(feature = "always-encrypted")]
1438            encryption_context: self.encryption_context,
1439        })
1440    }
1441
1442    /// Execute a simple query without parameters.
1443    ///
1444    /// This is useful for DDL statements and simple queries where you
1445    /// don't need to retrieve the affected row count.
1446    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1447        tracing::debug!(sql = sql, "executing simple query");
1448
1449        // Send SQL batch
1450        self.send_sql_batch(sql).await?;
1451
1452        // Read and discard response
1453        let _ = self.read_execute_result().await?;
1454
1455        Ok(())
1456    }
1457
1458    /// Close the connection gracefully.
1459    pub async fn close(self) -> Result<()> {
1460        tracing::debug!("closing connection");
1461        Ok(())
1462    }
1463
1464    /// Get the current database name.
1465    #[must_use]
1466    pub fn database(&self) -> Option<&str> {
1467        self.config.database.as_deref()
1468    }
1469
1470    /// Get the server host.
1471    #[must_use]
1472    pub fn host(&self) -> &str {
1473        &self.config.host
1474    }
1475
1476    /// Get the server port.
1477    #[must_use]
1478    pub fn port(&self) -> u16 {
1479        self.config.port
1480    }
1481
1482    /// Check if the connection is currently in a transaction.
1483    ///
1484    /// This returns `true` if a transaction was started via raw SQL
1485    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1486    ///
1487    /// Note: This only tracks transactions started via raw SQL. Transactions
1488    /// started via the type-state API (`begin_transaction()`) result in a
1489    /// `Client<InTransaction>` which is a different type.
1490    ///
1491    /// # Example
1492    ///
1493    /// ```rust,no_run
1494    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1495    /// client.execute("BEGIN TRANSACTION", &[]).await?;
1496    /// assert!(client.is_in_transaction());
1497    ///
1498    /// client.execute("COMMIT", &[]).await?;
1499    /// assert!(!client.is_in_transaction());
1500    /// # Ok(())
1501    /// # }
1502    /// ```
1503    #[must_use]
1504    pub fn is_in_transaction(&self) -> bool {
1505        self.transaction_descriptor != 0
1506    }
1507
1508    /// Check if a request is in-flight (sent but response not fully read).
1509    ///
1510    /// Used by the connection pool to detect dirty connections that were
1511    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1512    /// A connection with an in-flight request has unread data in the TCP
1513    /// buffer and must be discarded rather than returned to the pool.
1514    #[must_use]
1515    pub fn is_in_flight(&self) -> bool {
1516        self.in_flight
1517    }
1518
1519    /// Report whether an Always Encrypted key-store provider with the given
1520    /// name is currently reachable through this client's encryption context.
1521    ///
1522    /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1523    /// the connection was opened without `column_encryption` configured, or
1524    /// when no matching provider was registered.
1525    #[cfg(feature = "always-encrypted")]
1526    #[must_use]
1527    pub fn has_encryption_provider(&self, name: &str) -> bool {
1528        self.encryption_context
1529            .as_ref()
1530            .is_some_and(|ctx| ctx.has_provider(name))
1531    }
1532
1533    /// Get a handle for cancelling the current query.
1534    ///
1535    /// The cancel handle can be cloned and sent to other tasks, enabling
1536    /// cancellation of long-running queries from a separate async context.
1537    ///
1538    /// # Example
1539    ///
1540    /// ```rust,no_run
1541    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1542    /// use std::time::Duration;
1543    ///
1544    /// let cancel_handle = client.cancel_handle();
1545    ///
1546    /// // Spawn a task to cancel after 10 seconds
1547    /// let handle = tokio::spawn(async move {
1548    ///     tokio::time::sleep(Duration::from_secs(10)).await;
1549    ///     let _ = cancel_handle.cancel().await;
1550    /// });
1551    ///
1552    /// // This query will be cancelled if it runs longer than 10 seconds
1553    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1554    /// # let _ = (handle, result);
1555    /// # Ok(())
1556    /// # }
1557    /// ```
1558    #[must_use]
1559    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1560        self.connection_cancel_handle()
1561    }
1562}
1563
1564/// # Drop Behavior
1565///
1566/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1567/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1568/// the transaction remains open on the server until the TCP connection closes
1569/// (at which point SQL Server automatically rolls back).
1570///
1571/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1572/// to send a `ROLLBACK TRANSACTION` command.
1573///
1574/// ## Consequences of dropping without commit/rollback
1575///
1576/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1577///   (potentially 30+ minutes), holding locks on any modified rows.
1578/// - **Pooled connections:** The pool detects the active transaction descriptor
1579///   and discards the connection rather than returning it to the idle pool
1580///   (see `PooledConnection::drop` in `mssql-driver-pool`).
1581///
1582/// ## Best practice
1583///
1584/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1585/// for error paths:
1586///
1587/// ```rust,no_run
1588/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1589/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1590/// let tx = client.begin_transaction().await?;
1591/// match do_work(&tx).await {
1592///     Ok(_) => { tx.commit().await?; }
1593///     Err(e) => { tx.rollback().await?; return Err(e); }
1594/// }
1595/// # Ok(())
1596/// # }
1597/// ```
1598impl Client<InTransaction> {
1599    /// Execute a query within the transaction and return a streaming result set.
1600    ///
1601    /// See [`Client<Ready>::query`] for usage examples.
1602    pub async fn query<'a>(
1603        &'a mut self,
1604        sql: &str,
1605        params: &[&(dyn crate::ToSql + Sync)],
1606    ) -> Result<QueryStream<'a>> {
1607        let deadline = self.command_deadline();
1608        self.query_inner(sql, params, deadline).await
1609    }
1610
1611    /// Shared query implementation with an explicit command deadline.
1612    async fn query_inner<'a>(
1613        &'a mut self,
1614        sql: &str,
1615        params: &[&(dyn crate::ToSql + Sync)],
1616        deadline: Option<std::time::Duration>,
1617    ) -> Result<QueryStream<'a>> {
1618        tracing::debug!(
1619            sql = sql,
1620            params_count = params.len(),
1621            "executing query in transaction"
1622        );
1623
1624        #[cfg(feature = "otel")]
1625        let instrumentation = self.instrumentation.clone();
1626        #[cfg(feature = "otel")]
1627        let mut span = instrumentation.query_span(sql);
1628        #[cfg(feature = "otel")]
1629        let timer = crate::instrumentation::OperationTimer::start(
1630            crate::instrumentation::extract_operation(sql),
1631        );
1632
1633        let canceller = self.cancel_handle();
1634        let result = run_with_deadline(
1635            async {
1636                if params.is_empty() {
1637                    // Simple query without parameters - use SQL batch
1638                    self.send_sql_batch(sql).await?;
1639                } else {
1640                    // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1641                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1642                    self.send_rpc(&rpc).await?;
1643                }
1644
1645                // Read complete response including columns and rows
1646                self.read_query_response().await
1647            },
1648            deadline,
1649            canceller,
1650        )
1651        .await;
1652
1653        #[cfg(feature = "otel")]
1654        match &result {
1655            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1656            Err(e) => InstrumentationContext::record_error(&mut span, e),
1657        }
1658        #[cfg(feature = "otel")]
1659        timer.finish(instrumentation.metrics(), result.is_ok());
1660
1661        // Drop the span before returning
1662        #[cfg(feature = "otel")]
1663        drop(span);
1664
1665        let resp = result?;
1666        #[cfg(feature = "always-encrypted")]
1667        {
1668            Ok(QueryStream::from_raw(
1669                resp.columns,
1670                resp.pending_rows,
1671                resp.meta,
1672                resp.decryptor,
1673            ))
1674        }
1675        #[cfg(not(feature = "always-encrypted"))]
1676        {
1677            Ok(QueryStream::from_raw(
1678                resp.columns,
1679                resp.pending_rows,
1680                resp.meta,
1681            ))
1682        }
1683    }
1684
1685    /// Execute a statement within the transaction.
1686    ///
1687    /// Returns the number of affected rows.
1688    pub async fn execute(
1689        &mut self,
1690        sql: &str,
1691        params: &[&(dyn crate::ToSql + Sync)],
1692    ) -> Result<u64> {
1693        let deadline = self.command_deadline();
1694        self.execute_inner(sql, params, deadline).await
1695    }
1696
1697    /// Shared execute implementation with an explicit command deadline.
1698    async fn execute_inner(
1699        &mut self,
1700        sql: &str,
1701        params: &[&(dyn crate::ToSql + Sync)],
1702        deadline: Option<std::time::Duration>,
1703    ) -> Result<u64> {
1704        tracing::debug!(
1705            sql = sql,
1706            params_count = params.len(),
1707            "executing statement in transaction"
1708        );
1709
1710        #[cfg(feature = "otel")]
1711        let instrumentation = self.instrumentation.clone();
1712        #[cfg(feature = "otel")]
1713        let mut span = instrumentation.query_span(sql);
1714        #[cfg(feature = "otel")]
1715        let timer = crate::instrumentation::OperationTimer::start(
1716            crate::instrumentation::extract_operation(sql),
1717        );
1718
1719        let canceller = self.cancel_handle();
1720        let result = run_with_deadline(
1721            async {
1722                if params.is_empty() {
1723                    // Simple statement without parameters - use SQL batch
1724                    self.send_sql_batch(sql).await?;
1725                } else {
1726                    // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1727                    let rpc = self.build_parameterized_rpc(sql, params).await?;
1728                    self.send_rpc(&rpc).await?;
1729                }
1730
1731                // Read response and get row count
1732                self.read_execute_result().await
1733            },
1734            deadline,
1735            canceller,
1736        )
1737        .await;
1738
1739        #[cfg(feature = "otel")]
1740        match &result {
1741            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1742            Err(e) => InstrumentationContext::record_error(&mut span, e),
1743        }
1744        #[cfg(feature = "otel")]
1745        timer.finish(instrumentation.metrics(), result.is_ok());
1746
1747        // Drop the span before returning
1748        #[cfg(feature = "otel")]
1749        drop(span);
1750
1751        result
1752    }
1753
1754    /// Execute a query within the transaction with a specific timeout.
1755    ///
1756    /// See [`Client<Ready>::query_with_timeout`] for details.
1757    pub async fn query_with_timeout<'a>(
1758        &'a mut self,
1759        sql: &str,
1760        params: &[&(dyn crate::ToSql + Sync)],
1761        timeout_duration: std::time::Duration,
1762    ) -> Result<QueryStream<'a>> {
1763        self.query_inner(sql, params, Some(timeout_duration)).await
1764    }
1765
1766    /// Execute a statement within the transaction with a specific timeout.
1767    ///
1768    /// See [`Client<Ready>::execute_with_timeout`] for details.
1769    pub async fn execute_with_timeout(
1770        &mut self,
1771        sql: &str,
1772        params: &[&(dyn crate::ToSql + Sync)],
1773        timeout_duration: std::time::Duration,
1774    ) -> Result<u64> {
1775        self.execute_inner(sql, params, Some(timeout_duration))
1776            .await
1777    }
1778
1779    /// Open a FILESTREAM BLOB for async reading and/or writing.
1780    ///
1781    /// This method queries the server for the transaction context, then opens
1782    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1783    ///
1784    /// # Arguments
1785    ///
1786    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1787    ///   Query this yourself before calling `open_filestream`:
1788    ///   ```sql
1789    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1790    ///   ```
1791    /// * `access` — Read, write, or read/write access mode.
1792    ///
1793    /// # Requirements
1794    ///
1795    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1796    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1797    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1798    ///
1799    /// # Example
1800    ///
1801    /// ```text
1802    /// use mssql_client::FileStreamAccess;
1803    /// use tokio::io::AsyncReadExt;
1804    ///
1805    /// let mut tx = client.begin_transaction().await?;
1806    ///
1807    /// // Get the FILESTREAM path
1808    /// let rows = tx.query(
1809    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1810    ///     &[&doc_id],
1811    /// ).await?;
1812    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1813    ///
1814    /// // Open and read the BLOB
1815    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1816    /// let mut data = Vec::new();
1817    /// stream.read_to_end(&mut data).await?;
1818    /// drop(stream);
1819    ///
1820    /// tx.commit().await?;
1821    /// ```
1822    #[cfg(all(windows, feature = "filestream"))]
1823    pub async fn open_filestream(
1824        &mut self,
1825        path: &str,
1826        access: crate::filestream::FileStreamAccess,
1827    ) -> Result<crate::filestream::FileStream> {
1828        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1829
1830        // Get the transaction context from SQL Server.
1831        // This binds the file access to the current SQL transaction.
1832        let txn_context: Vec<u8> = {
1833            let rows = self
1834                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1835                .await?;
1836            let mut ctx = None;
1837            for result in rows {
1838                let row = result?;
1839                ctx = Some(row.get::<Vec<u8>>(0)?);
1840            }
1841            ctx.ok_or_else(|| {
1842                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1843            })?
1844        };
1845
1846        crate::filestream::FileStream::open(path, access, &txn_context)
1847    }
1848
1849    /// Commit the transaction.
1850    ///
1851    /// This transitions the client back to `Ready` state.
1852    pub async fn commit(mut self) -> Result<Client<Ready>> {
1853        tracing::debug!("committing transaction");
1854
1855        #[cfg(feature = "otel")]
1856        let instrumentation = self.instrumentation.clone();
1857        #[cfg(feature = "otel")]
1858        let mut span = instrumentation.transaction_span("COMMIT");
1859
1860        // Execute COMMIT TRANSACTION
1861        let result = async {
1862            self.send_sql_batch("COMMIT TRANSACTION").await?;
1863            self.read_execute_result().await
1864        }
1865        .await;
1866
1867        #[cfg(feature = "otel")]
1868        match &result {
1869            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1870            Err(e) => InstrumentationContext::record_error(&mut span, e),
1871        }
1872
1873        // Drop the span before moving instrumentation
1874        #[cfg(feature = "otel")]
1875        drop(span);
1876
1877        result?;
1878
1879        Ok(Client {
1880            config: self.config,
1881            _state: PhantomData,
1882            connection: self.connection,
1883            server_version: self.server_version,
1884            current_database: self.current_database,
1885            server_collation: self.server_collation,
1886            statement_cache: self.statement_cache,
1887            transaction_descriptor: 0, // Reset to auto-commit mode
1888            needs_reset: self.needs_reset,
1889            in_flight: self.in_flight,
1890            #[cfg(feature = "otel")]
1891            instrumentation: self.instrumentation,
1892            #[cfg(feature = "always-encrypted")]
1893            encryption_context: self.encryption_context,
1894        })
1895    }
1896
1897    /// Rollback the transaction.
1898    ///
1899    /// This transitions the client back to `Ready` state.
1900    pub async fn rollback(mut self) -> Result<Client<Ready>> {
1901        tracing::debug!("rolling back transaction");
1902
1903        #[cfg(feature = "otel")]
1904        let instrumentation = self.instrumentation.clone();
1905        #[cfg(feature = "otel")]
1906        let mut span = instrumentation.transaction_span("ROLLBACK");
1907
1908        // Execute ROLLBACK TRANSACTION
1909        let result = async {
1910            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1911            self.read_execute_result().await
1912        }
1913        .await;
1914
1915        #[cfg(feature = "otel")]
1916        match &result {
1917            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1918            Err(e) => InstrumentationContext::record_error(&mut span, e),
1919        }
1920
1921        // Drop the span before moving instrumentation
1922        #[cfg(feature = "otel")]
1923        drop(span);
1924
1925        result?;
1926
1927        Ok(Client {
1928            config: self.config,
1929            _state: PhantomData,
1930            connection: self.connection,
1931            server_version: self.server_version,
1932            current_database: self.current_database,
1933            server_collation: self.server_collation,
1934            statement_cache: self.statement_cache,
1935            transaction_descriptor: 0, // Reset to auto-commit mode
1936            needs_reset: self.needs_reset,
1937            in_flight: self.in_flight,
1938            #[cfg(feature = "otel")]
1939            instrumentation: self.instrumentation,
1940            #[cfg(feature = "always-encrypted")]
1941            encryption_context: self.encryption_context,
1942        })
1943    }
1944
1945    /// Create a savepoint and return a handle for later rollback.
1946    ///
1947    /// The returned `SavePoint` handle contains the validated savepoint name.
1948    /// Use it with `rollback_to()` to partially undo transaction work.
1949    ///
1950    /// # Example
1951    ///
1952    /// ```rust,no_run
1953    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1954    /// let mut tx = client.begin_transaction().await?;
1955    /// tx.execute("INSERT INTO orders ...", &[]).await?;
1956    /// let sp = tx.save_point("before_items").await?;
1957    /// tx.execute("INSERT INTO items ...", &[]).await?;
1958    /// // Oops, rollback just the items
1959    /// tx.rollback_to(&sp).await?;
1960    /// tx.commit().await?;
1961    /// # Ok(())
1962    /// # }
1963    /// ```
1964    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1965        crate::validation::validate_identifier(name)?;
1966        tracing::debug!(name = name, "creating savepoint");
1967
1968        // Execute SAVE TRANSACTION <name>
1969        // Note: name is validated by validate_identifier() to prevent SQL injection
1970        let sql = format!("SAVE TRANSACTION {name}");
1971        self.send_sql_batch(&sql).await?;
1972        self.read_execute_result().await?;
1973
1974        Ok(SavePoint::new(name.to_string()))
1975    }
1976
1977    /// Rollback to a savepoint.
1978    ///
1979    /// This rolls back all changes made after the savepoint was created,
1980    /// but keeps the transaction active. The savepoint remains valid and
1981    /// can be rolled back to again.
1982    ///
1983    /// # Example
1984    ///
1985    /// ```rust,no_run
1986    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1987    /// let sp = tx.save_point("checkpoint").await?;
1988    /// // ... do some work ...
1989    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
1990    /// // Transaction is still active, savepoint is still valid
1991    /// # Ok(())
1992    /// # }
1993    /// ```
1994    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1995        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1996
1997        // Execute ROLLBACK TRANSACTION <name>
1998        // Note: savepoint name was validated during creation
1999        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
2000        self.send_sql_batch(&sql).await?;
2001        self.read_execute_result().await?;
2002
2003        Ok(())
2004    }
2005
2006    /// Release a savepoint (optional cleanup).
2007    ///
2008    /// Note: SQL Server doesn't have explicit savepoint release, but this
2009    /// method is provided for API completeness. The savepoint is automatically
2010    /// released when the transaction commits or rolls back.
2011    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
2012        tracing::debug!(name = savepoint.name(), "releasing savepoint");
2013
2014        // SQL Server doesn't require explicit savepoint release
2015        // The savepoint is implicitly released on commit/rollback
2016        // This method exists for API completeness
2017        drop(savepoint);
2018        Ok(())
2019    }
2020
2021    /// Get a handle for cancelling the current query within the transaction.
2022    ///
2023    /// See [`Client<Ready>::cancel_handle`] for usage examples.
2024    #[must_use]
2025    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2026        self.connection_cancel_handle()
2027    }
2028}
2029
2030impl<S: ConnectionState> std::fmt::Debug for Client<S> {
2031    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2032        f.debug_struct("Client")
2033            .field("host", &self.config.host)
2034            .field("port", &self.config.port)
2035            .field("database", &self.config.database)
2036            .finish()
2037    }
2038}