Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
/// Upper bound on how long to wait for the server to acknowledge an
/// Attention packet after a command timeout.
///
/// SqlClient waits 5 seconds before dooming the connection; we match it.
/// [`run_with_deadline`] uses this value to bound its post-timeout drain.
const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68    fut: F,
69    deadline: Option<std::time::Duration>,
70    canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73    F: std::future::Future<Output = Result<T>>,
74{
75    let Some(d) = deadline else {
76        return fut.await;
77    };
78    tokio::pin!(fut);
79    tokio::select! {
80        biased;
81        res = &mut fut => res,
82        () = tokio::time::sleep(d) => {
83            // Signal cancellation, then let the in-flight read consume the
84            // server's attention acknowledgement so the connection stays usable.
85            let drain = async {
86                let _ = canceller.cancel().await;
87                let _ = (&mut fut).await;
88            };
89            if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90                tracing::warn!(
91                    timeout = ?ATTENTION_ACK_TIMEOUT,
92                    "server did not acknowledge attention; abandoning the connection as dirty"
93                );
94            }
95            Err(Error::CommandTimeout)
96        }
97    }
98}
99
/// SQL Server client with type-state connection management.
///
/// The generic parameter `S` represents the current connection state,
/// ensuring at compile time that certain operations are only available
/// in appropriate states. `S` is carried only through a `PhantomData`
/// marker field; no value of type `S` is stored.
pub struct Client<S: ConnectionState> {
    /// Connection configuration. Methods in this file read `command_timeout`,
    /// `packet_size` and `send_string_parameters_as_unicode` from it.
    config: Config,
    /// Marker tying this value to the type-state `S`.
    _state: PhantomData<S>,
    /// The underlying connection (present only when connected).
    /// The send paths return `Error::ConnectionClosed` when this is `None`.
    connection: Option<ConnectionHandle>,
    /// Server version from LoginAck (raw u32 TDS version)
    server_version: Option<u32>,
    /// Current database from EnvChange
    current_database: Option<String>,
    /// Server's default collation from SqlCollation EnvChange during login.
    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
    /// parameters with the correct character encoding and collation bytes.
    server_collation: Option<tds_protocol::token::Collation>,
    /// Prepared statement cache for query optimization
    statement_cache: StatementCache,
    /// Transaction descriptor from BeginTransaction EnvChange.
    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
    /// requests within an explicit transaction. 0 indicates auto-commit mode.
    /// It is passed to the SQL batch and RPC encoders on every request.
    transaction_descriptor: u64,
    /// Whether a request has been sent and the response has not yet been fully read.
    /// Used by the connection pool to detect dirty connections after cancel/timeout.
    /// Set to `true` right before a SQL batch, RPC or bulk-load message is written.
    in_flight: bool,
    /// Whether this connection needs a reset on next use.
    /// Set by connection pool on checkin, cleared after first query/execute.
    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
    /// The flag is cleared before that packet is sent.
    needs_reset: bool,
    /// OpenTelemetry instrumentation context (when otel feature is enabled)
    #[cfg(feature = "otel")]
    instrumentation: InstrumentationContext,
    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
    #[cfg(feature = "always-encrypted")]
    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
}
138
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
/// - Plain TCP (for internal networks or when `tls` feature is disabled)
///
/// Each variant wraps a `Connection` over a different stream type, so code
/// that sends messages or builds cancel handles matches on the variant and
/// repeats the same call per arm.
// NOTE(review): this `allow` was added before query execution existed. The
// variants are now matched by the send paths in this file — confirm whether
// the attribute is still needed for any feature combination.
#[allow(dead_code)]
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    #[cfg(feature = "tls")]
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style)
    #[cfg(feature = "tls")]
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
    Plain(Connection<TcpStream>),
}
156
157// Private helper methods available to all connection states
158impl<S: ConnectionState> Client<S> {
159    /// The default per-command deadline from `command_timeout`.
160    ///
161    /// Returns `None` when `command_timeout` is zero, which means "no limit"
162    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
163    pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
164        let t = self.config.command_timeout;
165        if t.is_zero() { None } else { Some(t) }
166    }
167
168    /// Build a cancel handle for the current connection, regardless of
169    /// connection state. The public, documented surface is
170    /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
171    /// delegate here.
172    pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
173        let connection = self
174            .connection
175            .as_ref()
176            .expect("connection should be present");
177        match connection {
178            #[cfg(feature = "tls")]
179            ConnectionHandle::Tls(conn) => {
180                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
181            }
182            #[cfg(feature = "tls")]
183            ConnectionHandle::TlsPrelogin(conn) => {
184                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
185            }
186            ConnectionHandle::Plain(conn) => {
187                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
188            }
189        }
190    }
191
192    /// Process transaction-related EnvChange tokens.
193    ///
194    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
195    /// EnvChange tokens, updating the transaction descriptor accordingly.
196    ///
197    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
198    /// while still having the transaction descriptor tracked correctly.
199    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
200        use tds_protocol::token::EnvChangeValue;
201
202        match env.env_type {
203            EnvChangeType::BeginTransaction => {
204                if let EnvChangeValue::Binary(ref data) = env.new_value {
205                    if data.len() >= 8 {
206                        let descriptor = u64::from_le_bytes([
207                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
208                        ]);
209                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
210                        *transaction_descriptor = descriptor;
211                    }
212                }
213            }
214            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
215                tracing::debug!(
216                    env_type = ?env.env_type,
217                    "transaction ended via raw SQL"
218                );
219                *transaction_descriptor = 0;
220            }
221            _ => {}
222        }
223    }
224
225    /// Send a SQL batch to the server.
226    ///
227    /// Uses the client's current transaction descriptor in ALL_HEADERS.
228    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
229    /// returned by BeginTransaction must be included.
230    ///
231    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
232    /// is included in the first packet to reset connection state.
233    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
234        let payload =
235            tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
236        let max_packet = self.config.packet_size as usize;
237
238        // Check if we need to reset the connection on this request
239        let reset = self.needs_reset;
240        if reset {
241            self.needs_reset = false; // Clear flag before sending
242            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
243        }
244
245        self.in_flight = true;
246        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
247
248        match connection {
249            #[cfg(feature = "tls")]
250            ConnectionHandle::Tls(conn) => {
251                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
252                    .await?;
253            }
254            #[cfg(feature = "tls")]
255            ConnectionHandle::TlsPrelogin(conn) => {
256                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
257                    .await?;
258            }
259            ConnectionHandle::Plain(conn) => {
260                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
261                    .await?;
262            }
263        }
264
265        Ok(())
266    }
267
268    /// Send an RPC request to the server.
269    ///
270    /// Uses the client's current transaction descriptor in ALL_HEADERS.
271    ///
272    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
273    /// is included in the first packet to reset connection state.
274    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
275        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
276        let max_packet = self.config.packet_size as usize;
277
278        // Check if we need to reset the connection on this request
279        let reset = self.needs_reset;
280        if reset {
281            self.needs_reset = false; // Clear flag before sending
282            tracing::debug!("sending RPC with RESETCONNECTION flag");
283        }
284
285        self.in_flight = true;
286        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
287
288        match connection {
289            #[cfg(feature = "tls")]
290            ConnectionHandle::Tls(conn) => {
291                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
292                    .await?;
293            }
294            #[cfg(feature = "tls")]
295            ConnectionHandle::TlsPrelogin(conn) => {
296                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
297                    .await?;
298            }
299            ConnectionHandle::Plain(conn) => {
300                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
301                    .await?;
302            }
303        }
304
305        Ok(())
306    }
307
308    /// Start building a stored procedure call with full control over parameters.
309    ///
310    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
311    /// parameters before executing the call.
312    ///
313    /// The procedure name is validated to prevent SQL injection. It may be
314    /// schema-qualified (e.g., `"dbo.MyProc"`).
315    ///
316    /// # Example
317    ///
318    /// ```rust,no_run
319    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
320    /// let result = client.procedure("dbo.CalculateSum")?
321    ///     .input("@a", &10i32)
322    ///     .input("@b", &20i32)
323    ///     .output_int("@result")
324    ///     .execute().await?;
325    ///
326    /// let sum = result.get_output("@result").unwrap();
327    /// # let _ = sum;
328    /// # Ok(())
329    /// # }
330    /// ```
331    pub fn procedure(
332        &mut self,
333        proc_name: &str,
334    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
335        crate::validation::validate_qualified_identifier(proc_name)?;
336        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
337    }
338
339    /// Execute a stored procedure with positional input parameters.
340    ///
341    /// This is a convenience method for the common case of calling a procedure
342    /// with input-only parameters. For output parameters or named parameters,
343    /// use [`procedure()`](Client::procedure) instead.
344    ///
345    /// # Example
346    ///
347    /// ```rust,no_run
348    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
349    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
350    /// assert_eq!(result.return_value, 0);
351    ///
352    /// if let Some(rs) = result.first_result_set() {
353    ///     println!("columns: {:?}", rs.columns());
354    /// }
355    /// # Ok(())
356    /// # }
357    /// ```
358    pub async fn call_procedure(
359        &mut self,
360        proc_name: &str,
361        params: &[&(dyn crate::ToSql + Sync)],
362    ) -> Result<crate::stream::ProcedureResult> {
363        crate::validation::validate_qualified_identifier(proc_name)?;
364
365        tracing::debug!(
366            proc_name = proc_name,
367            params_count = params.len(),
368            "executing stored procedure"
369        );
370
371        let rpc_params =
372            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
373        let mut rpc = RpcRequest::named(proc_name);
374        for param in rpc_params {
375            rpc = rpc.param(param);
376        }
377
378        let deadline = self.command_deadline();
379        let canceller = self.connection_cancel_handle();
380        run_with_deadline(
381            async {
382                self.send_rpc(&rpc).await?;
383                self.read_procedure_result().await
384            },
385            deadline,
386            canceller,
387        )
388        .await
389    }
390
391    /// Start a bulk insert operation for the specified table.
392    ///
393    /// Sends the `INSERT BULK` statement to the server and returns a
394    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
395    /// a mutable borrow on the client, preventing other operations while
396    /// the bulk insert is in progress.
397    ///
398    /// # Example
399    ///
400    /// ```rust,no_run
401    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
402    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
403    ///
404    /// let builder = BulkInsertBuilder::new("dbo.Users")
405    ///     .with_typed_columns(vec![
406    ///         BulkColumn::new("id", "INT", 0)?,
407    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
408    ///     ]);
409    ///
410    /// let mut writer = client.bulk_insert(&builder).await?;
411    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
412    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
413    /// let result = writer.finish().await?;
414    /// println!("Inserted {} rows", result.rows_affected);
415    /// # Ok(())
416    /// # }
417    /// ```
418    pub async fn bulk_insert(
419        &mut self,
420        builder: &crate::bulk::BulkInsertBuilder,
421    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
422        use tds_protocol::token::{ColMetaData, Token};
423
424        tracing::debug!(
425            table = builder.table_name(),
426            columns = builder.columns().len(),
427            "starting bulk insert"
428        );
429
430        // Step 1: Query the server for column metadata.
431        // This gives us the exact type encoding the server expects for BulkLoad,
432        // following the pattern established by Tiberius.
433        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
434        self.send_sql_batch(&meta_query).await?;
435
436        let message = self.read_response_message().await?;
437        self.in_flight = false;
438
439        // Capture both the raw COLMETADATA bytes and parsed column info
440        let raw_payload = message.payload.clone();
441        let mut parser = self.create_parser(message.payload);
442        let mut server_metadata: Option<ColMetaData> = None;
443        let mut meta_start: usize = 0;
444        let mut meta_end: usize = 0;
445
446        loop {
447            let pos_before = raw_payload.len() - parser.remaining();
448            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
449            let pos_after = raw_payload.len() - parser.remaining();
450            let Some(token) = token else { break };
451
452            match token {
453                Token::ColMetaData(meta) => {
454                    meta_start = pos_before;
455                    meta_end = pos_after;
456                    server_metadata = Some(meta);
457                }
458                Token::Done(_) => break,
459                _ => {}
460            }
461        }
462
463        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
464        // These types require a legacy TEXTPTR wire format that this driver
465        // does not support — users should migrate the column to VARCHAR(MAX) /
466        // NVARCHAR(MAX) / VARBINARY(MAX).
467        if let Some(ref meta) = server_metadata {
468            use tds_protocol::types::TypeId;
469            for col in meta.columns.iter() {
470                let (rejected, replacement) = match col.type_id {
471                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
472                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
473                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
474                    _ => (None, ""),
475                };
476                if let Some(sql_type) = rejected {
477                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
478                        sql_type: sql_type.to_string(),
479                        reason: format!(
480                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
481                             are not supported. Alter the column to {} instead \
482                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
483                             Server 2005).",
484                            col.name,
485                            builder.table_name(),
486                            sql_type,
487                            replacement,
488                        ),
489                    }));
490                }
491            }
492        }
493
494        // Step 2: Send INSERT BULK statement to put server in bulk load mode
495        let stmt = builder.build_insert_bulk_statement()?;
496        self.send_sql_batch(&stmt).await?;
497        self.read_execute_result().await?;
498
499        // Step 3: Create bulk writer with server's metadata
500        let raw_meta = if meta_end > meta_start {
501            Some(raw_payload.slice(meta_start..meta_end))
502        } else {
503            None
504        };
505
506        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
507        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
508            builder.columns().to_vec(),
509            builder.options().batch_size,
510            raw_meta,
511            server_cols,
512        );
513
514        Ok(crate::bulk::BulkWriter::new(self, bulk))
515    }
516
517    /// Start a bulk insert without querying the server for column metadata.
518    ///
519    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
520    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
521    /// column metadata is constructed from the `BulkColumn` types provided
522    /// on the builder. This saves a round-trip when the schema is known.
523    ///
524    /// # Caveats
525    ///
526    /// The caller must ensure `BulkColumn` entries match the target table's
527    /// column definitions exactly. Mismatched types, lengths, precision/scale,
528    /// or column ordering will cause the server to reject the BulkLoad packet.
529    ///
530    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
531    /// extra round-trip is usually negligible and the server-supplied metadata
532    /// is guaranteed correct.
533    pub async fn bulk_insert_without_schema_discovery(
534        &mut self,
535        builder: &crate::bulk::BulkInsertBuilder,
536    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
537        tracing::debug!(
538            table = builder.table_name(),
539            columns = builder.columns().len(),
540            "starting bulk insert (no schema discovery)"
541        );
542
543        // Send INSERT BULK statement to put server in bulk load mode
544        let stmt = builder.build_insert_bulk_statement()?;
545        self.send_sql_batch(&stmt).await?;
546        self.read_execute_result().await?;
547
548        // Create bulk writer with hand-crafted metadata
549        let bulk =
550            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
551
552        Ok(crate::bulk::BulkWriter::new(self, bulk))
553    }
554
555    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
556    ///
557    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
558    /// row data after the `INSERT BULK` statement has been acknowledged.
559    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
560        let max_packet = self.config.packet_size as usize;
561
562        self.in_flight = true;
563        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
564
565        match connection {
566            #[cfg(feature = "tls")]
567            ConnectionHandle::Tls(conn) => {
568                conn.send_message(PacketType::BulkLoad, payload, max_packet)
569                    .await?;
570            }
571            #[cfg(feature = "tls")]
572            ConnectionHandle::TlsPrelogin(conn) => {
573                conn.send_message(PacketType::BulkLoad, payload, max_packet)
574                    .await?;
575            }
576            ConnectionHandle::Plain(conn) => {
577                conn.send_message(PacketType::BulkLoad, payload, max_packet)
578                    .await?;
579            }
580        }
581
582        // Read the server's Done response with row count
583        self.read_execute_result().await
584    }
585
586    /// Execute a query with named parameters and return a streaming result set.
587    ///
588    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
589    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
590    /// and the `#[derive(ToParams)]` macro.
591    ///
592    /// # Example
593    ///
594    /// ```rust,no_run
595    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
596    /// use mssql_client::{NamedParam, ToParams};
597    ///
598    /// // With derive macro:
599    /// #[derive(mssql_derive::ToParams)]
600    /// struct UserQuery { name: String }
601    ///
602    /// let q = UserQuery { name: "Alice".into() };
603    /// let rows = client.query_named(
604    ///     "SELECT * FROM users WHERE name = @name",
605    ///     &q.to_params()?,
606    /// ).await?;
607    ///
608    /// // Or manually:
609    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
610    /// let rows = client.query_named(
611    ///     "SELECT * FROM users WHERE name = @name",
612    ///     &params,
613    /// ).await?;
614    /// # let _ = rows;
615    /// # Ok(())
616    /// # }
617    /// ```
618    pub async fn query_named<'a>(
619        &'a mut self,
620        sql: &str,
621        params: &[crate::to_params::NamedParam],
622    ) -> Result<QueryStream<'a>> {
623        tracing::debug!(
624            sql = sql,
625            params_count = params.len(),
626            "executing query with named parameters"
627        );
628
629        if params.is_empty() {
630            self.send_sql_batch(sql).await?;
631        } else {
632            let rpc_params =
633                Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
634            let rpc = RpcRequest::execute_sql(sql, rpc_params);
635            self.send_rpc(&rpc).await?;
636        }
637
638        let resp = self.read_query_response().await?;
639        #[cfg(feature = "always-encrypted")]
640        {
641            Ok(QueryStream::from_raw(
642                resp.columns,
643                resp.pending_rows,
644                resp.meta,
645                resp.decryptor,
646            ))
647        }
648        #[cfg(not(feature = "always-encrypted"))]
649        {
650            Ok(QueryStream::from_raw(
651                resp.columns,
652                resp.pending_rows,
653                resp.meta,
654            ))
655        }
656    }
657
658    /// Execute a statement with named parameters.
659    ///
660    /// Returns the number of affected rows. This is the named-parameter
661    /// counterpart of [`execute()`](Client::execute), compatible with the
662    /// [`ToParams`](crate::to_params::ToParams) trait.
663    ///
664    /// # Example
665    ///
666    /// ```rust,no_run
667    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
668    /// use mssql_client::NamedParam;
669    ///
670    /// let params = vec![
671    ///     NamedParam::from_value("name", &"Alice")?,
672    ///     NamedParam::from_value("email", &"alice@example.com")?,
673    /// ];
674    /// let rows_affected = client.execute_named(
675    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
676    ///     &params,
677    /// ).await?;
678    /// # let _ = rows_affected;
679    /// # Ok(())
680    /// # }
681    /// ```
682    pub async fn execute_named(
683        &mut self,
684        sql: &str,
685        params: &[crate::to_params::NamedParam],
686    ) -> Result<u64> {
687        tracing::debug!(
688            sql = sql,
689            params_count = params.len(),
690            "executing statement with named parameters"
691        );
692
693        let deadline = self.command_deadline();
694        let canceller = self.connection_cancel_handle();
695        run_with_deadline(
696            async {
697                if params.is_empty() {
698                    self.send_sql_batch(sql).await?;
699                } else {
700                    let rpc_params = Self::convert_named_params(
701                        params,
702                        self.send_unicode(),
703                        self.server_collation(),
704                    )?;
705                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
706                    self.send_rpc(&rpc).await?;
707                }
708
709                self.read_execute_result().await
710            },
711            deadline,
712            canceller,
713        )
714        .await
715    }
716
    /// Whether string parameters are sent as NVARCHAR (Unicode).
    ///
    /// Returns the `send_string_parameters_as_unicode` setting from the
    /// client's configuration.
    pub(crate) fn send_unicode(&self) -> bool {
        self.config.send_string_parameters_as_unicode
    }
721
    /// Server's default collation, captured from ENVCHANGE during login.
    ///
    /// Returns `None` when no collation has been recorded on this client.
    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
        self.server_collation.as_ref()
    }
726}
727
728impl Client<Ready> {
    /// Mark this connection as needing a reset on next use.
    ///
    /// Called by the connection pool when a connection is returned.
    /// The next SQL batch or RPC will include the RESETCONNECTION flag
    /// in the TDS packet header, causing SQL Server to reset connection
    /// state (temp tables, SET options, transaction isolation level, etc.)
    /// before executing the command.
    ///
    /// This is more efficient than calling `sp_reset_connection` as a
    /// separate command because it's handled at the TDS protocol level.
    ///
    /// Calling this more than once before the next request has no additional
    /// effect: it only sets a boolean flag.
    pub fn mark_needs_reset(&mut self) {
        self.needs_reset = true;
    }
742
    /// Check if this connection needs a reset.
    ///
    /// Returns true if `mark_needs_reset()` was called and the reset
    /// hasn't been performed yet. The flag is cleared when the next SQL
    /// batch or RPC is sent.
    #[must_use]
    pub fn needs_reset(&self) -> bool {
        self.needs_reset
    }
751
    /// Execute a query and return a result set with lazy per-row decoding.
    ///
    /// Per ADR-007 the full response is buffered in memory and each row is
    /// *decoded* on demand as you iterate — this is not incremental network
    /// streaming, so peak memory tracks the response size. Use
    /// `.collect_all()` if you want all rows materialized into a `Vec` up
    /// front.
    ///
    /// The query runs with the client's default command deadline (see
    /// `command_timeout`; zero means no limit).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use mssql_client::Row;
    /// # fn process(_: &Row) {}
    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
    /// // Streaming (synchronous iteration over the result set)
    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
    /// for row in stream {
    ///     let row = row?;
    ///     process(&row);
    /// }
    ///
    /// // Buffered (loads all into memory)
    /// let rows: Vec<Row> = client
    ///     .query("SELECT * FROM small_table", &[])
    ///     .await?
    ///     .collect_all()
    ///     .await?;
    /// # let _ = rows;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn query<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
    ) -> Result<QueryStream<'a>> {
        // Resolve the default deadline first, then delegate to the shared
        // implementation that takes the deadline explicitly.
        let deadline = self.command_deadline();
        self.query_inner(sql, params, deadline).await
    }
791
792    /// Shared query implementation with an explicit command deadline.
793    async fn query_inner<'a>(
794        &'a mut self,
795        sql: &str,
796        params: &[&(dyn crate::ToSql + Sync)],
797        deadline: Option<std::time::Duration>,
798    ) -> Result<QueryStream<'a>> {
799        tracing::debug!(sql = sql, params_count = params.len(), "executing query");
800
801        #[cfg(feature = "otel")]
802        let instrumentation = self.instrumentation.clone();
803        #[cfg(feature = "otel")]
804        let mut span = instrumentation.query_span(sql);
805        #[cfg(feature = "otel")]
806        let timer = crate::instrumentation::OperationTimer::start(
807            crate::instrumentation::extract_operation(sql),
808        );
809
810        let canceller = self.cancel_handle();
811        let result = run_with_deadline(
812            async {
813                if params.is_empty() {
814                    // Simple query without parameters - use SQL batch
815                    self.send_sql_batch(sql).await?;
816                } else {
817                    // Parameterized query - use sp_executesql via RPC
818                    let rpc_params =
819                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
820                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
821                    self.send_rpc(&rpc).await?;
822                }
823
824                // Read complete response including columns and rows
825                self.read_query_response().await
826            },
827            deadline,
828            canceller,
829        )
830        .await;
831
832        #[cfg(feature = "otel")]
833        match &result {
834            Ok(_) => InstrumentationContext::record_success(&mut span, None),
835            Err(e) => InstrumentationContext::record_error(&mut span, e),
836        }
837        #[cfg(feature = "otel")]
838        timer.finish(instrumentation.metrics(), result.is_ok());
839
840        // Drop the span before returning
841        #[cfg(feature = "otel")]
842        drop(span);
843
844        let resp = result?;
845        #[cfg(feature = "always-encrypted")]
846        {
847            Ok(QueryStream::from_raw(
848                resp.columns,
849                resp.pending_rows,
850                resp.meta,
851                resp.decryptor,
852            ))
853        }
854        #[cfg(not(feature = "always-encrypted"))]
855        {
856            Ok(QueryStream::from_raw(
857                resp.columns,
858                resp.pending_rows,
859                resp.meta,
860            ))
861        }
862    }
863
864    /// Execute a query with a specific timeout.
865    ///
866    /// This overrides the default `command_timeout` from the connection configuration
867    /// for this specific query. If the query does not complete within the specified
868    /// duration, the driver sends an Attention packet to cancel it server-side,
869    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
870    /// connection left usable for the next request.
871    ///
872    /// # Arguments
873    ///
874    /// * `sql` - The SQL query to execute
875    /// * `params` - Query parameters
876    /// * `timeout_duration` - Maximum time to wait for the query to complete
877    ///
878    /// # Example
879    ///
880    /// ```rust,no_run
881    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
882    /// use std::time::Duration;
883    ///
884    /// // Execute with a 5-second timeout
885    /// let rows = client
886    ///     .query_with_timeout(
887    ///         "SELECT * FROM large_table",
888    ///         &[],
889    ///         Duration::from_secs(5),
890    ///     )
891    ///     .await?;
892    /// # let _ = rows;
893    /// # Ok(())
894    /// # }
895    /// ```
896    pub async fn query_with_timeout<'a>(
897        &'a mut self,
898        sql: &str,
899        params: &[&(dyn crate::ToSql + Sync)],
900        timeout_duration: std::time::Duration,
901    ) -> Result<QueryStream<'a>> {
902        self.query_inner(sql, params, Some(timeout_duration)).await
903    }
904
905    /// Execute a batch that may return multiple result sets.
906    ///
907    /// This is useful for stored procedures or SQL batches that contain
908    /// multiple SELECT statements.
909    ///
910    /// # Example
911    ///
912    /// ```rust,no_run
913    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
914    /// // Execute a batch with multiple SELECT statements
915    /// let mut results = client.query_multiple(
916    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
917    ///     &[]
918    /// ).await?;
919    ///
920    /// // Process first result set
921    /// while let Some(row) = results.next_row().await? {
922    ///     println!("Result 1: {:?}", row);
923    /// }
924    ///
925    /// // Move to second result set
926    /// if results.next_result().await? {
927    ///     while let Some(row) = results.next_row().await? {
928    ///         println!("Result 2: {:?}", row);
929    ///     }
930    /// }
931    /// # Ok(())
932    /// # }
933    /// ```
934    pub async fn query_multiple<'a>(
935        &'a mut self,
936        sql: &str,
937        params: &[&(dyn crate::ToSql + Sync)],
938    ) -> Result<MultiResultStream<'a>> {
939        tracing::debug!(
940            sql = sql,
941            params_count = params.len(),
942            "executing multi-result query"
943        );
944
945        let deadline = self.command_deadline();
946        let canceller = self.connection_cancel_handle();
947        let result_sets = run_with_deadline(
948            async {
949                if params.is_empty() {
950                    // Simple batch without parameters - use SQL batch
951                    self.send_sql_batch(sql).await?;
952                } else {
953                    // Parameterized query - use sp_executesql via RPC
954                    let rpc_params =
955                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
956                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
957                    self.send_rpc(&rpc).await?;
958                }
959
960                // Read all result sets
961                self.read_multi_result_response().await
962            },
963            deadline,
964            canceller,
965        )
966        .await?;
967        Ok(MultiResultStream::new(result_sets))
968    }
969
970    /// Execute a query that doesn't return rows.
971    ///
972    /// Returns the number of affected rows.
973    pub async fn execute(
974        &mut self,
975        sql: &str,
976        params: &[&(dyn crate::ToSql + Sync)],
977    ) -> Result<u64> {
978        let deadline = self.command_deadline();
979        self.execute_inner(sql, params, deadline).await
980    }
981
982    /// Shared execute implementation with an explicit command deadline.
983    async fn execute_inner(
984        &mut self,
985        sql: &str,
986        params: &[&(dyn crate::ToSql + Sync)],
987        deadline: Option<std::time::Duration>,
988    ) -> Result<u64> {
989        tracing::debug!(
990            sql = sql,
991            params_count = params.len(),
992            "executing statement"
993        );
994
995        #[cfg(feature = "otel")]
996        let instrumentation = self.instrumentation.clone();
997        #[cfg(feature = "otel")]
998        let mut span = instrumentation.query_span(sql);
999        #[cfg(feature = "otel")]
1000        let timer = crate::instrumentation::OperationTimer::start(
1001            crate::instrumentation::extract_operation(sql),
1002        );
1003
1004        let canceller = self.cancel_handle();
1005        let result = run_with_deadline(
1006            async {
1007                if params.is_empty() {
1008                    // Simple statement without parameters - use SQL batch
1009                    self.send_sql_batch(sql).await?;
1010                } else {
1011                    // Parameterized statement - use sp_executesql via RPC
1012                    let rpc_params =
1013                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1014                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1015                    self.send_rpc(&rpc).await?;
1016                }
1017
1018                // Read response and get row count
1019                self.read_execute_result().await
1020            },
1021            deadline,
1022            canceller,
1023        )
1024        .await;
1025
1026        #[cfg(feature = "otel")]
1027        match &result {
1028            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1029            Err(e) => InstrumentationContext::record_error(&mut span, e),
1030        }
1031        #[cfg(feature = "otel")]
1032        timer.finish(instrumentation.metrics(), result.is_ok());
1033
1034        // Drop the span before returning
1035        #[cfg(feature = "otel")]
1036        drop(span);
1037
1038        result
1039    }
1040
1041    /// Execute a statement with a specific timeout.
1042    ///
1043    /// This overrides the default `command_timeout` from the connection configuration
1044    /// for this specific statement. If the statement does not complete within the
1045    /// specified duration, the driver sends an Attention packet to cancel it
1046    /// server-side, drains the acknowledgement, and returns
1047    /// [`Error::CommandTimeout`] with the connection left usable.
1048    ///
1049    /// # Arguments
1050    ///
1051    /// * `sql` - The SQL statement to execute
1052    /// * `params` - Statement parameters
1053    /// * `timeout_duration` - Maximum time to wait for the statement to complete
1054    ///
1055    /// # Example
1056    ///
1057    /// ```rust,no_run
1058    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1059    /// use std::time::Duration;
1060    ///
1061    /// // Execute with a 10-second timeout
1062    /// let rows_affected = client
1063    ///     .execute_with_timeout(
1064    ///         "UPDATE large_table SET status = @p1",
1065    ///         &[&"processed"],
1066    ///         Duration::from_secs(10),
1067    ///     )
1068    ///     .await?;
1069    /// # let _ = rows_affected;
1070    /// # Ok(())
1071    /// # }
1072    /// ```
1073    pub async fn execute_with_timeout(
1074        &mut self,
1075        sql: &str,
1076        params: &[&(dyn crate::ToSql + Sync)],
1077        timeout_duration: std::time::Duration,
1078    ) -> Result<u64> {
1079        self.execute_inner(sql, params, Some(timeout_duration))
1080            .await
1081    }
1082
1083    /// Begin a transaction.
1084    ///
1085    /// This transitions the client from `Ready` to `InTransaction` state.
1086    /// Per MS-TDS spec, the server returns a transaction descriptor in the
1087    /// BeginTransaction EnvChange token that must be included in subsequent
1088    /// ALL_HEADERS sections.
1089    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1090        tracing::debug!("beginning transaction");
1091
1092        #[cfg(feature = "otel")]
1093        let instrumentation = self.instrumentation.clone();
1094        #[cfg(feature = "otel")]
1095        let mut span = instrumentation.transaction_span("BEGIN");
1096
1097        // Execute BEGIN TRANSACTION and extract the transaction descriptor
1098        let result = async {
1099            self.send_sql_batch("BEGIN TRANSACTION").await?;
1100            self.read_transaction_begin_result().await
1101        }
1102        .await;
1103
1104        #[cfg(feature = "otel")]
1105        match &result {
1106            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1107            Err(e) => InstrumentationContext::record_error(&mut span, e),
1108        }
1109
1110        // Drop the span before moving instrumentation
1111        #[cfg(feature = "otel")]
1112        drop(span);
1113
1114        let transaction_descriptor = result?;
1115
1116        Ok(Client {
1117            config: self.config,
1118            _state: PhantomData,
1119            connection: self.connection,
1120            server_version: self.server_version,
1121            current_database: self.current_database,
1122            server_collation: self.server_collation,
1123            statement_cache: self.statement_cache,
1124            transaction_descriptor, // Store the descriptor from server
1125            needs_reset: self.needs_reset,
1126            in_flight: self.in_flight,
1127            #[cfg(feature = "otel")]
1128            instrumentation: self.instrumentation,
1129            #[cfg(feature = "always-encrypted")]
1130            encryption_context: self.encryption_context,
1131        })
1132    }
1133
1134    /// Begin a transaction with a specific isolation level.
1135    ///
1136    /// This transitions the client from `Ready` to `InTransaction` state
1137    /// with the specified isolation level.
1138    ///
1139    /// # Example
1140    ///
1141    /// ```rust,no_run
1142    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1143    /// use mssql_client::IsolationLevel;
1144    ///
1145    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1146    /// // All operations in this transaction use SERIALIZABLE isolation
1147    /// tx.commit().await?;
1148    /// # Ok(())
1149    /// # }
1150    /// ```
1151    pub async fn begin_transaction_with_isolation(
1152        mut self,
1153        isolation_level: crate::transaction::IsolationLevel,
1154    ) -> Result<Client<InTransaction>> {
1155        tracing::debug!(
1156            isolation_level = %isolation_level.name(),
1157            "beginning transaction with isolation level"
1158        );
1159
1160        #[cfg(feature = "otel")]
1161        let instrumentation = self.instrumentation.clone();
1162        #[cfg(feature = "otel")]
1163        let mut span = instrumentation.transaction_span("BEGIN");
1164
1165        // First set the isolation level
1166        let result = async {
1167            self.send_sql_batch(isolation_level.as_sql()).await?;
1168            self.read_execute_result().await?;
1169
1170            // Then begin the transaction
1171            self.send_sql_batch("BEGIN TRANSACTION").await?;
1172            self.read_transaction_begin_result().await
1173        }
1174        .await;
1175
1176        #[cfg(feature = "otel")]
1177        match &result {
1178            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1179            Err(e) => InstrumentationContext::record_error(&mut span, e),
1180        }
1181
1182        #[cfg(feature = "otel")]
1183        drop(span);
1184
1185        let transaction_descriptor = result?;
1186
1187        Ok(Client {
1188            config: self.config,
1189            _state: PhantomData,
1190            connection: self.connection,
1191            server_version: self.server_version,
1192            current_database: self.current_database,
1193            server_collation: self.server_collation,
1194            statement_cache: self.statement_cache,
1195            transaction_descriptor,
1196            needs_reset: self.needs_reset,
1197            in_flight: self.in_flight,
1198            #[cfg(feature = "otel")]
1199            instrumentation: self.instrumentation,
1200            #[cfg(feature = "always-encrypted")]
1201            encryption_context: self.encryption_context,
1202        })
1203    }
1204
1205    /// Execute a simple query without parameters.
1206    ///
1207    /// This is useful for DDL statements and simple queries where you
1208    /// don't need to retrieve the affected row count.
1209    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1210        tracing::debug!(sql = sql, "executing simple query");
1211
1212        // Send SQL batch
1213        self.send_sql_batch(sql).await?;
1214
1215        // Read and discard response
1216        let _ = self.read_execute_result().await?;
1217
1218        Ok(())
1219    }
1220
1221    /// Close the connection gracefully.
1222    pub async fn close(self) -> Result<()> {
1223        tracing::debug!("closing connection");
1224        Ok(())
1225    }
1226
1227    /// Get the current database name.
1228    #[must_use]
1229    pub fn database(&self) -> Option<&str> {
1230        self.config.database.as_deref()
1231    }
1232
1233    /// Get the server host.
1234    #[must_use]
1235    pub fn host(&self) -> &str {
1236        &self.config.host
1237    }
1238
1239    /// Get the server port.
1240    #[must_use]
1241    pub fn port(&self) -> u16 {
1242        self.config.port
1243    }
1244
1245    /// Check if the connection is currently in a transaction.
1246    ///
1247    /// This returns `true` if a transaction was started via raw SQL
1248    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1249    ///
1250    /// Note: This only tracks transactions started via raw SQL. Transactions
1251    /// started via the type-state API (`begin_transaction()`) result in a
1252    /// `Client<InTransaction>` which is a different type.
1253    ///
1254    /// # Example
1255    ///
1256    /// ```rust,no_run
1257    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1258    /// client.execute("BEGIN TRANSACTION", &[]).await?;
1259    /// assert!(client.is_in_transaction());
1260    ///
1261    /// client.execute("COMMIT", &[]).await?;
1262    /// assert!(!client.is_in_transaction());
1263    /// # Ok(())
1264    /// # }
1265    /// ```
1266    #[must_use]
1267    pub fn is_in_transaction(&self) -> bool {
1268        self.transaction_descriptor != 0
1269    }
1270
1271    /// Check if a request is in-flight (sent but response not fully read).
1272    ///
1273    /// Used by the connection pool to detect dirty connections that were
1274    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1275    /// A connection with an in-flight request has unread data in the TCP
1276    /// buffer and must be discarded rather than returned to the pool.
1277    #[must_use]
1278    pub fn is_in_flight(&self) -> bool {
1279        self.in_flight
1280    }
1281
1282    /// Report whether an Always Encrypted key-store provider with the given
1283    /// name is currently reachable through this client's encryption context.
1284    ///
1285    /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1286    /// the connection was opened without `column_encryption` configured, or
1287    /// when no matching provider was registered.
1288    #[cfg(feature = "always-encrypted")]
1289    #[must_use]
1290    pub fn has_encryption_provider(&self, name: &str) -> bool {
1291        self.encryption_context
1292            .as_ref()
1293            .is_some_and(|ctx| ctx.has_provider(name))
1294    }
1295
1296    /// Get a handle for cancelling the current query.
1297    ///
1298    /// The cancel handle can be cloned and sent to other tasks, enabling
1299    /// cancellation of long-running queries from a separate async context.
1300    ///
1301    /// # Example
1302    ///
1303    /// ```rust,no_run
1304    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1305    /// use std::time::Duration;
1306    ///
1307    /// let cancel_handle = client.cancel_handle();
1308    ///
1309    /// // Spawn a task to cancel after 10 seconds
1310    /// let handle = tokio::spawn(async move {
1311    ///     tokio::time::sleep(Duration::from_secs(10)).await;
1312    ///     let _ = cancel_handle.cancel().await;
1313    /// });
1314    ///
1315    /// // This query will be cancelled if it runs longer than 10 seconds
1316    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1317    /// # let _ = (handle, result);
1318    /// # Ok(())
1319    /// # }
1320    /// ```
1321    #[must_use]
1322    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1323        self.connection_cancel_handle()
1324    }
1325}
1326
1327/// # Drop Behavior
1328///
1329/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1330/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1331/// the transaction remains open on the server until the TCP connection closes
1332/// (at which point SQL Server automatically rolls back).
1333///
1334/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1335/// to send a `ROLLBACK TRANSACTION` command.
1336///
1337/// ## Consequences of dropping without commit/rollback
1338///
1339/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1340///   (potentially 30+ minutes), holding locks on any modified rows.
1341/// - **Pooled connections:** The pool detects the active transaction descriptor
1342///   and discards the connection rather than returning it to the idle pool
1343///   (see `PooledConnection::drop` in `mssql-driver-pool`).
1344///
1345/// ## Best practice
1346///
1347/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1348/// for error paths:
1349///
1350/// ```rust,no_run
1351/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1352/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1353/// let tx = client.begin_transaction().await?;
1354/// match do_work(&tx).await {
1355///     Ok(_) => { tx.commit().await?; }
1356///     Err(e) => { tx.rollback().await?; return Err(e); }
1357/// }
1358/// # Ok(())
1359/// # }
1360/// ```
1361impl Client<InTransaction> {
1362    /// Execute a query within the transaction and return a streaming result set.
1363    ///
1364    /// See [`Client<Ready>::query`] for usage examples.
1365    pub async fn query<'a>(
1366        &'a mut self,
1367        sql: &str,
1368        params: &[&(dyn crate::ToSql + Sync)],
1369    ) -> Result<QueryStream<'a>> {
1370        let deadline = self.command_deadline();
1371        self.query_inner(sql, params, deadline).await
1372    }
1373
1374    /// Shared query implementation with an explicit command deadline.
1375    async fn query_inner<'a>(
1376        &'a mut self,
1377        sql: &str,
1378        params: &[&(dyn crate::ToSql + Sync)],
1379        deadline: Option<std::time::Duration>,
1380    ) -> Result<QueryStream<'a>> {
1381        tracing::debug!(
1382            sql = sql,
1383            params_count = params.len(),
1384            "executing query in transaction"
1385        );
1386
1387        #[cfg(feature = "otel")]
1388        let instrumentation = self.instrumentation.clone();
1389        #[cfg(feature = "otel")]
1390        let mut span = instrumentation.query_span(sql);
1391        #[cfg(feature = "otel")]
1392        let timer = crate::instrumentation::OperationTimer::start(
1393            crate::instrumentation::extract_operation(sql),
1394        );
1395
1396        let canceller = self.cancel_handle();
1397        let result = run_with_deadline(
1398            async {
1399                if params.is_empty() {
1400                    // Simple query without parameters - use SQL batch
1401                    self.send_sql_batch(sql).await?;
1402                } else {
1403                    // Parameterized query - use sp_executesql via RPC
1404                    let rpc_params =
1405                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1406                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1407                    self.send_rpc(&rpc).await?;
1408                }
1409
1410                // Read complete response including columns and rows
1411                self.read_query_response().await
1412            },
1413            deadline,
1414            canceller,
1415        )
1416        .await;
1417
1418        #[cfg(feature = "otel")]
1419        match &result {
1420            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1421            Err(e) => InstrumentationContext::record_error(&mut span, e),
1422        }
1423        #[cfg(feature = "otel")]
1424        timer.finish(instrumentation.metrics(), result.is_ok());
1425
1426        // Drop the span before returning
1427        #[cfg(feature = "otel")]
1428        drop(span);
1429
1430        let resp = result?;
1431        #[cfg(feature = "always-encrypted")]
1432        {
1433            Ok(QueryStream::from_raw(
1434                resp.columns,
1435                resp.pending_rows,
1436                resp.meta,
1437                resp.decryptor,
1438            ))
1439        }
1440        #[cfg(not(feature = "always-encrypted"))]
1441        {
1442            Ok(QueryStream::from_raw(
1443                resp.columns,
1444                resp.pending_rows,
1445                resp.meta,
1446            ))
1447        }
1448    }
1449
1450    /// Execute a statement within the transaction.
1451    ///
1452    /// Returns the number of affected rows.
1453    pub async fn execute(
1454        &mut self,
1455        sql: &str,
1456        params: &[&(dyn crate::ToSql + Sync)],
1457    ) -> Result<u64> {
1458        let deadline = self.command_deadline();
1459        self.execute_inner(sql, params, deadline).await
1460    }
1461
1462    /// Shared execute implementation with an explicit command deadline.
1463    async fn execute_inner(
1464        &mut self,
1465        sql: &str,
1466        params: &[&(dyn crate::ToSql + Sync)],
1467        deadline: Option<std::time::Duration>,
1468    ) -> Result<u64> {
1469        tracing::debug!(
1470            sql = sql,
1471            params_count = params.len(),
1472            "executing statement in transaction"
1473        );
1474
1475        #[cfg(feature = "otel")]
1476        let instrumentation = self.instrumentation.clone();
1477        #[cfg(feature = "otel")]
1478        let mut span = instrumentation.query_span(sql);
1479        #[cfg(feature = "otel")]
1480        let timer = crate::instrumentation::OperationTimer::start(
1481            crate::instrumentation::extract_operation(sql),
1482        );
1483
1484        let canceller = self.cancel_handle();
1485        let result = run_with_deadline(
1486            async {
1487                if params.is_empty() {
1488                    // Simple statement without parameters - use SQL batch
1489                    self.send_sql_batch(sql).await?;
1490                } else {
1491                    // Parameterized statement - use sp_executesql via RPC
1492                    let rpc_params =
1493                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1494                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1495                    self.send_rpc(&rpc).await?;
1496                }
1497
1498                // Read response and get row count
1499                self.read_execute_result().await
1500            },
1501            deadline,
1502            canceller,
1503        )
1504        .await;
1505
1506        #[cfg(feature = "otel")]
1507        match &result {
1508            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1509            Err(e) => InstrumentationContext::record_error(&mut span, e),
1510        }
1511        #[cfg(feature = "otel")]
1512        timer.finish(instrumentation.metrics(), result.is_ok());
1513
1514        // Drop the span before returning
1515        #[cfg(feature = "otel")]
1516        drop(span);
1517
1518        result
1519    }
1520
1521    /// Execute a query within the transaction with a specific timeout.
1522    ///
1523    /// See [`Client<Ready>::query_with_timeout`] for details.
1524    pub async fn query_with_timeout<'a>(
1525        &'a mut self,
1526        sql: &str,
1527        params: &[&(dyn crate::ToSql + Sync)],
1528        timeout_duration: std::time::Duration,
1529    ) -> Result<QueryStream<'a>> {
1530        self.query_inner(sql, params, Some(timeout_duration)).await
1531    }
1532
1533    /// Execute a statement within the transaction with a specific timeout.
1534    ///
1535    /// See [`Client<Ready>::execute_with_timeout`] for details.
1536    pub async fn execute_with_timeout(
1537        &mut self,
1538        sql: &str,
1539        params: &[&(dyn crate::ToSql + Sync)],
1540        timeout_duration: std::time::Duration,
1541    ) -> Result<u64> {
1542        self.execute_inner(sql, params, Some(timeout_duration))
1543            .await
1544    }
1545
1546    /// Open a FILESTREAM BLOB for async reading and/or writing.
1547    ///
1548    /// This method queries the server for the transaction context, then opens
1549    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1550    ///
1551    /// # Arguments
1552    ///
1553    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1554    ///   Query this yourself before calling `open_filestream`:
1555    ///   ```sql
1556    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1557    ///   ```
1558    /// * `access` — Read, write, or read/write access mode.
1559    ///
1560    /// # Requirements
1561    ///
1562    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1563    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1564    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1565    ///
1566    /// # Example
1567    ///
1568    /// ```text
1569    /// use mssql_client::FileStreamAccess;
1570    /// use tokio::io::AsyncReadExt;
1571    ///
1572    /// let mut tx = client.begin_transaction().await?;
1573    ///
1574    /// // Get the FILESTREAM path
1575    /// let rows = tx.query(
1576    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1577    ///     &[&doc_id],
1578    /// ).await?;
1579    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1580    ///
1581    /// // Open and read the BLOB
1582    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1583    /// let mut data = Vec::new();
1584    /// stream.read_to_end(&mut data).await?;
1585    /// drop(stream);
1586    ///
1587    /// tx.commit().await?;
1588    /// ```
1589    #[cfg(all(windows, feature = "filestream"))]
1590    pub async fn open_filestream(
1591        &mut self,
1592        path: &str,
1593        access: crate::filestream::FileStreamAccess,
1594    ) -> Result<crate::filestream::FileStream> {
1595        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1596
1597        // Get the transaction context from SQL Server.
1598        // This binds the file access to the current SQL transaction.
1599        let txn_context: Vec<u8> = {
1600            let rows = self
1601                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1602                .await?;
1603            let mut ctx = None;
1604            for result in rows {
1605                let row = result?;
1606                ctx = Some(row.get::<Vec<u8>>(0)?);
1607            }
1608            ctx.ok_or_else(|| {
1609                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1610            })?
1611        };
1612
1613        crate::filestream::FileStream::open(path, access, &txn_context)
1614    }
1615
1616    /// Commit the transaction.
1617    ///
1618    /// This transitions the client back to `Ready` state.
1619    pub async fn commit(mut self) -> Result<Client<Ready>> {
1620        tracing::debug!("committing transaction");
1621
1622        #[cfg(feature = "otel")]
1623        let instrumentation = self.instrumentation.clone();
1624        #[cfg(feature = "otel")]
1625        let mut span = instrumentation.transaction_span("COMMIT");
1626
1627        // Execute COMMIT TRANSACTION
1628        let result = async {
1629            self.send_sql_batch("COMMIT TRANSACTION").await?;
1630            self.read_execute_result().await
1631        }
1632        .await;
1633
1634        #[cfg(feature = "otel")]
1635        match &result {
1636            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1637            Err(e) => InstrumentationContext::record_error(&mut span, e),
1638        }
1639
1640        // Drop the span before moving instrumentation
1641        #[cfg(feature = "otel")]
1642        drop(span);
1643
1644        result?;
1645
1646        Ok(Client {
1647            config: self.config,
1648            _state: PhantomData,
1649            connection: self.connection,
1650            server_version: self.server_version,
1651            current_database: self.current_database,
1652            server_collation: self.server_collation,
1653            statement_cache: self.statement_cache,
1654            transaction_descriptor: 0, // Reset to auto-commit mode
1655            needs_reset: self.needs_reset,
1656            in_flight: self.in_flight,
1657            #[cfg(feature = "otel")]
1658            instrumentation: self.instrumentation,
1659            #[cfg(feature = "always-encrypted")]
1660            encryption_context: self.encryption_context,
1661        })
1662    }
1663
1664    /// Rollback the transaction.
1665    ///
1666    /// This transitions the client back to `Ready` state.
1667    pub async fn rollback(mut self) -> Result<Client<Ready>> {
1668        tracing::debug!("rolling back transaction");
1669
1670        #[cfg(feature = "otel")]
1671        let instrumentation = self.instrumentation.clone();
1672        #[cfg(feature = "otel")]
1673        let mut span = instrumentation.transaction_span("ROLLBACK");
1674
1675        // Execute ROLLBACK TRANSACTION
1676        let result = async {
1677            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1678            self.read_execute_result().await
1679        }
1680        .await;
1681
1682        #[cfg(feature = "otel")]
1683        match &result {
1684            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1685            Err(e) => InstrumentationContext::record_error(&mut span, e),
1686        }
1687
1688        // Drop the span before moving instrumentation
1689        #[cfg(feature = "otel")]
1690        drop(span);
1691
1692        result?;
1693
1694        Ok(Client {
1695            config: self.config,
1696            _state: PhantomData,
1697            connection: self.connection,
1698            server_version: self.server_version,
1699            current_database: self.current_database,
1700            server_collation: self.server_collation,
1701            statement_cache: self.statement_cache,
1702            transaction_descriptor: 0, // Reset to auto-commit mode
1703            needs_reset: self.needs_reset,
1704            in_flight: self.in_flight,
1705            #[cfg(feature = "otel")]
1706            instrumentation: self.instrumentation,
1707            #[cfg(feature = "always-encrypted")]
1708            encryption_context: self.encryption_context,
1709        })
1710    }
1711
1712    /// Create a savepoint and return a handle for later rollback.
1713    ///
1714    /// The returned `SavePoint` handle contains the validated savepoint name.
1715    /// Use it with `rollback_to()` to partially undo transaction work.
1716    ///
1717    /// # Example
1718    ///
1719    /// ```rust,no_run
1720    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1721    /// let mut tx = client.begin_transaction().await?;
1722    /// tx.execute("INSERT INTO orders ...", &[]).await?;
1723    /// let sp = tx.save_point("before_items").await?;
1724    /// tx.execute("INSERT INTO items ...", &[]).await?;
1725    /// // Oops, rollback just the items
1726    /// tx.rollback_to(&sp).await?;
1727    /// tx.commit().await?;
1728    /// # Ok(())
1729    /// # }
1730    /// ```
1731    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1732        crate::validation::validate_identifier(name)?;
1733        tracing::debug!(name = name, "creating savepoint");
1734
1735        // Execute SAVE TRANSACTION <name>
1736        // Note: name is validated by validate_identifier() to prevent SQL injection
1737        let sql = format!("SAVE TRANSACTION {name}");
1738        self.send_sql_batch(&sql).await?;
1739        self.read_execute_result().await?;
1740
1741        Ok(SavePoint::new(name.to_string()))
1742    }
1743
1744    /// Rollback to a savepoint.
1745    ///
1746    /// This rolls back all changes made after the savepoint was created,
1747    /// but keeps the transaction active. The savepoint remains valid and
1748    /// can be rolled back to again.
1749    ///
1750    /// # Example
1751    ///
1752    /// ```rust,no_run
1753    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1754    /// let sp = tx.save_point("checkpoint").await?;
1755    /// // ... do some work ...
1756    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
1757    /// // Transaction is still active, savepoint is still valid
1758    /// # Ok(())
1759    /// # }
1760    /// ```
1761    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1762        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1763
1764        // Execute ROLLBACK TRANSACTION <name>
1765        // Note: savepoint name was validated during creation
1766        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1767        self.send_sql_batch(&sql).await?;
1768        self.read_execute_result().await?;
1769
1770        Ok(())
1771    }
1772
1773    /// Release a savepoint (optional cleanup).
1774    ///
1775    /// Note: SQL Server doesn't have explicit savepoint release, but this
1776    /// method is provided for API completeness. The savepoint is automatically
1777    /// released when the transaction commits or rolls back.
1778    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
1779        tracing::debug!(name = savepoint.name(), "releasing savepoint");
1780
1781        // SQL Server doesn't require explicit savepoint release
1782        // The savepoint is implicitly released on commit/rollback
1783        // This method exists for API completeness
1784        drop(savepoint);
1785        Ok(())
1786    }
1787
1788    /// Get a handle for cancelling the current query within the transaction.
1789    ///
1790    /// See [`Client<Ready>::cancel_handle`] for usage examples.
1791    #[must_use]
1792    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1793        self.connection_cancel_handle()
1794    }
1795}
1796
1797impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1798    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1799        f.debug_struct("Client")
1800            .field("host", &self.config.host)
1801            .field("port", &self.config.port)
1802            .field("database", &self.config.database)
1803            .finish()
1804    }
1805}