Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
/// Upper bound on how long we wait for the server to acknowledge an Attention
/// packet once a command has timed out.
///
/// Five seconds mirrors SqlClient, which waits that long before dooming the
/// connection.
const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68    fut: F,
69    deadline: Option<std::time::Duration>,
70    canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73    F: std::future::Future<Output = Result<T>>,
74{
75    let Some(d) = deadline else {
76        return fut.await;
77    };
78    tokio::pin!(fut);
79    tokio::select! {
80        biased;
81        res = &mut fut => res,
82        () = tokio::time::sleep(d) => {
83            // Signal cancellation, then let the in-flight read consume the
84            // server's attention acknowledgement so the connection stays usable.
85            let drain = async {
86                let _ = canceller.cancel().await;
87                let _ = (&mut fut).await;
88            };
89            if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90                tracing::warn!(
91                    timeout = ?ATTENTION_ACK_TIMEOUT,
92                    "server did not acknowledge attention; abandoning the connection as dirty"
93                );
94            }
95            Err(Error::CommandTimeout)
96        }
97    }
98}
99
100/// SQL Server client with type-state connection management.
101///
102/// The generic parameter `S` represents the current connection state,
103/// ensuring at compile time that certain operations are only available
104/// in appropriate states.
105pub struct Client<S: ConnectionState> {
106    config: Config,
107    _state: PhantomData<S>,
108    /// The underlying connection (present only when connected)
109    connection: Option<ConnectionHandle>,
110    /// Server version from LoginAck (raw u32 TDS version)
111    server_version: Option<u32>,
112    /// Current database from EnvChange
113    current_database: Option<String>,
114    /// Server's default collation from SqlCollation EnvChange during login.
115    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
116    /// parameters with the correct character encoding and collation bytes.
117    server_collation: Option<tds_protocol::token::Collation>,
118    /// Prepared statement cache for query optimization
119    statement_cache: StatementCache,
120    /// Transaction descriptor from BeginTransaction EnvChange.
121    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
122    /// requests within an explicit transaction. 0 indicates auto-commit mode.
123    transaction_descriptor: u64,
124    /// Whether a request has been sent and the response has not yet been fully read.
125    /// Used by the connection pool to detect dirty connections after cancel/timeout.
126    in_flight: bool,
127    /// Whether this connection needs a reset on next use.
128    /// Set by connection pool on checkin, cleared after first query/execute.
129    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
130    needs_reset: bool,
131    /// OpenTelemetry instrumentation context (when otel feature is enabled)
132    #[cfg(feature = "otel")]
133    instrumentation: InstrumentationContext,
134    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
135    #[cfg(feature = "always-encrypted")]
136    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
137}
138
139/// Internal connection handle wrapping the actual connection.
140///
141/// This is an enum to support different connection types:
142/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
143/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
144/// - Plain TCP (for internal networks or when `tls` feature is disabled)
145#[allow(dead_code)] // Connection will be used once query execution is implemented
146enum ConnectionHandle {
147    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
148    #[cfg(feature = "tls")]
149    Tls(Connection<TlsStream<TcpStream>>),
150    /// TLS connection with PreLogin wrapping (TDS 7.x style)
151    #[cfg(feature = "tls")]
152    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
153    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
154    Plain(Connection<TcpStream>),
155}
156
157// Private helper methods available to all connection states
158impl<S: ConnectionState> Client<S> {
159    /// The default per-command deadline from `command_timeout`.
160    ///
161    /// Returns `None` when `command_timeout` is zero, which means "no limit"
162    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
163    pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
164        let t = self.config.command_timeout;
165        if t.is_zero() { None } else { Some(t) }
166    }
167
168    /// Build a cancel handle for the current connection, regardless of
169    /// connection state. The public, documented surface is
170    /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
171    /// delegate here.
172    pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
173        let connection = self
174            .connection
175            .as_ref()
176            .expect("connection should be present");
177        match connection {
178            #[cfg(feature = "tls")]
179            ConnectionHandle::Tls(conn) => {
180                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
181            }
182            #[cfg(feature = "tls")]
183            ConnectionHandle::TlsPrelogin(conn) => {
184                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
185            }
186            ConnectionHandle::Plain(conn) => {
187                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
188            }
189        }
190    }
191
192    /// Process transaction-related EnvChange tokens.
193    ///
194    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
195    /// EnvChange tokens, updating the transaction descriptor accordingly.
196    ///
197    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
198    /// while still having the transaction descriptor tracked correctly.
199    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
200        use tds_protocol::token::EnvChangeValue;
201
202        match env.env_type {
203            EnvChangeType::BeginTransaction => {
204                if let EnvChangeValue::Binary(ref data) = env.new_value {
205                    if data.len() >= 8 {
206                        let descriptor = u64::from_le_bytes([
207                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
208                        ]);
209                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
210                        *transaction_descriptor = descriptor;
211                    }
212                }
213            }
214            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
215                tracing::debug!(
216                    env_type = ?env.env_type,
217                    "transaction ended via raw SQL"
218                );
219                *transaction_descriptor = 0;
220            }
221            _ => {}
222        }
223    }
224
225    /// Send a SQL batch to the server.
226    ///
227    /// Uses the client's current transaction descriptor in ALL_HEADERS.
228    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
229    /// returned by BeginTransaction must be included.
230    ///
231    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
232    /// is included in the first packet to reset connection state.
233    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
234        let payload =
235            tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
236        let max_packet = self.config.packet_size as usize;
237
238        // Check if we need to reset the connection on this request
239        let reset = self.needs_reset;
240        if reset {
241            self.needs_reset = false; // Clear flag before sending
242            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
243        }
244
245        self.in_flight = true;
246        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
247
248        match connection {
249            #[cfg(feature = "tls")]
250            ConnectionHandle::Tls(conn) => {
251                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
252                    .await?;
253            }
254            #[cfg(feature = "tls")]
255            ConnectionHandle::TlsPrelogin(conn) => {
256                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
257                    .await?;
258            }
259            ConnectionHandle::Plain(conn) => {
260                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
261                    .await?;
262            }
263        }
264
265        Ok(())
266    }
267
268    /// Send an RPC request to the server.
269    ///
270    /// Uses the client's current transaction descriptor in ALL_HEADERS.
271    ///
272    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
273    /// is included in the first packet to reset connection state.
274    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
275        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
276        let max_packet = self.config.packet_size as usize;
277
278        // Check if we need to reset the connection on this request
279        let reset = self.needs_reset;
280        if reset {
281            self.needs_reset = false; // Clear flag before sending
282            tracing::debug!("sending RPC with RESETCONNECTION flag");
283        }
284
285        self.in_flight = true;
286        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
287
288        match connection {
289            #[cfg(feature = "tls")]
290            ConnectionHandle::Tls(conn) => {
291                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
292                    .await?;
293            }
294            #[cfg(feature = "tls")]
295            ConnectionHandle::TlsPrelogin(conn) => {
296                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
297                    .await?;
298            }
299            ConnectionHandle::Plain(conn) => {
300                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
301                    .await?;
302            }
303        }
304
305        Ok(())
306    }
307
308    /// Start building a stored procedure call with full control over parameters.
309    ///
310    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
311    /// parameters before executing the call.
312    ///
313    /// The procedure name is validated to prevent SQL injection. It may be
314    /// schema-qualified (e.g., `"dbo.MyProc"`).
315    ///
316    /// # Example
317    ///
318    /// ```rust,no_run
319    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
320    /// let result = client.procedure("dbo.CalculateSum")?
321    ///     .input("@a", &10i32)
322    ///     .input("@b", &20i32)
323    ///     .output_int("@result")
324    ///     .execute().await?;
325    ///
326    /// let sum = result.get_output("@result").unwrap();
327    /// # let _ = sum;
328    /// # Ok(())
329    /// # }
330    /// ```
331    pub fn procedure(
332        &mut self,
333        proc_name: &str,
334    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
335        crate::validation::validate_qualified_identifier(proc_name)?;
336        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
337    }
338
339    /// Execute a stored procedure with positional input parameters.
340    ///
341    /// This is a convenience method for the common case of calling a procedure
342    /// with input-only parameters. For output parameters or named parameters,
343    /// use [`procedure()`](Client::procedure) instead.
344    ///
345    /// # Example
346    ///
347    /// ```rust,no_run
348    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
349    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
350    /// assert_eq!(result.return_value, 0);
351    ///
352    /// if let Some(rs) = result.first_result_set() {
353    ///     println!("columns: {:?}", rs.columns());
354    /// }
355    /// # Ok(())
356    /// # }
357    /// ```
358    pub async fn call_procedure(
359        &mut self,
360        proc_name: &str,
361        params: &[&(dyn crate::ToSql + Sync)],
362    ) -> Result<crate::stream::ProcedureResult> {
363        crate::validation::validate_qualified_identifier(proc_name)?;
364
365        tracing::debug!(
366            proc_name = proc_name,
367            params_count = params.len(),
368            "executing stored procedure"
369        );
370
371        let rpc_params =
372            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
373        let mut rpc = RpcRequest::named(proc_name);
374        for param in rpc_params {
375            rpc = rpc.param(param);
376        }
377
378        let deadline = self.command_deadline();
379        let canceller = self.connection_cancel_handle();
380        run_with_deadline(
381            async {
382                self.send_rpc(&rpc).await?;
383                self.read_procedure_result().await
384            },
385            deadline,
386            canceller,
387        )
388        .await
389    }
390
391    /// Start a bulk insert operation for the specified table.
392    ///
393    /// Sends the `INSERT BULK` statement to the server and returns a
394    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
395    /// a mutable borrow on the client, preventing other operations while
396    /// the bulk insert is in progress.
397    ///
398    /// # Example
399    ///
400    /// ```rust,no_run
401    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
402    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
403    ///
404    /// let builder = BulkInsertBuilder::new("dbo.Users")
405    ///     .with_typed_columns(vec![
406    ///         BulkColumn::new("id", "INT", 0)?,
407    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
408    ///     ]);
409    ///
410    /// let mut writer = client.bulk_insert(&builder).await?;
411    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
412    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
413    /// let result = writer.finish().await?;
414    /// println!("Inserted {} rows", result.rows_affected);
415    /// # Ok(())
416    /// # }
417    /// ```
418    pub async fn bulk_insert(
419        &mut self,
420        builder: &crate::bulk::BulkInsertBuilder,
421    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
422        use tds_protocol::token::{ColMetaData, Token};
423
424        tracing::debug!(
425            table = builder.table_name(),
426            columns = builder.columns().len(),
427            "starting bulk insert"
428        );
429
430        // Step 1: Query the server for column metadata.
431        // This gives us the exact type encoding the server expects for BulkLoad,
432        // following the pattern established by Tiberius.
433        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
434        let deadline = self.command_deadline();
435        let canceller = self.connection_cancel_handle();
436        let message = run_with_deadline(
437            async {
438                self.send_sql_batch(&meta_query).await?;
439                self.read_response_message().await
440            },
441            deadline,
442            canceller,
443        )
444        .await?;
445        self.in_flight = false;
446
447        // Capture both the raw COLMETADATA bytes and parsed column info
448        let raw_payload = message.payload.clone();
449        let mut parser = self.create_parser(message.payload);
450        let mut server_metadata: Option<ColMetaData> = None;
451        let mut meta_start: usize = 0;
452        let mut meta_end: usize = 0;
453
454        loop {
455            let pos_before = raw_payload.len() - parser.remaining();
456            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
457            let pos_after = raw_payload.len() - parser.remaining();
458            let Some(token) = token else { break };
459
460            match token {
461                Token::ColMetaData(meta) => {
462                    meta_start = pos_before;
463                    meta_end = pos_after;
464                    server_metadata = Some(meta);
465                }
466                Token::Done(_) => break,
467                _ => {}
468            }
469        }
470
471        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
472        // These types require a legacy TEXTPTR wire format that this driver
473        // does not support — users should migrate the column to VARCHAR(MAX) /
474        // NVARCHAR(MAX) / VARBINARY(MAX).
475        if let Some(ref meta) = server_metadata {
476            use tds_protocol::types::TypeId;
477            for col in meta.columns.iter() {
478                let (rejected, replacement) = match col.type_id {
479                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
480                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
481                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
482                    _ => (None, ""),
483                };
484                if let Some(sql_type) = rejected {
485                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
486                        sql_type: sql_type.to_string(),
487                        reason: format!(
488                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
489                             are not supported. Alter the column to {} instead \
490                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
491                             Server 2005).",
492                            col.name,
493                            builder.table_name(),
494                            sql_type,
495                            replacement,
496                        ),
497                    }));
498                }
499            }
500        }
501
502        // Step 2: Send INSERT BULK statement to put server in bulk load mode
503        let stmt = builder.build_insert_bulk_statement()?;
504        let deadline = self.command_deadline();
505        let canceller = self.connection_cancel_handle();
506        run_with_deadline(
507            async {
508                self.send_sql_batch(&stmt).await?;
509                self.read_execute_result().await
510            },
511            deadline,
512            canceller,
513        )
514        .await?;
515
516        // Step 3: Create bulk writer with server's metadata
517        let raw_meta = if meta_end > meta_start {
518            Some(raw_payload.slice(meta_start..meta_end))
519        } else {
520            None
521        };
522
523        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
524        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
525            builder.columns().to_vec(),
526            builder.options().batch_size,
527            raw_meta,
528            server_cols,
529        );
530
531        Ok(crate::bulk::BulkWriter::new(self, bulk))
532    }
533
534    /// Start a bulk insert without querying the server for column metadata.
535    ///
536    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
537    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
538    /// column metadata is constructed from the `BulkColumn` types provided
539    /// on the builder. This saves a round-trip when the schema is known.
540    ///
541    /// # Caveats
542    ///
543    /// The caller must ensure `BulkColumn` entries match the target table's
544    /// column definitions exactly. Mismatched types, lengths, precision/scale,
545    /// or column ordering will cause the server to reject the BulkLoad packet.
546    ///
547    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
548    /// extra round-trip is usually negligible and the server-supplied metadata
549    /// is guaranteed correct.
550    pub async fn bulk_insert_without_schema_discovery(
551        &mut self,
552        builder: &crate::bulk::BulkInsertBuilder,
553    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
554        tracing::debug!(
555            table = builder.table_name(),
556            columns = builder.columns().len(),
557            "starting bulk insert (no schema discovery)"
558        );
559
560        // Send INSERT BULK statement to put server in bulk load mode
561        let stmt = builder.build_insert_bulk_statement()?;
562        let deadline = self.command_deadline();
563        let canceller = self.connection_cancel_handle();
564        run_with_deadline(
565            async {
566                self.send_sql_batch(&stmt).await?;
567                self.read_execute_result().await
568            },
569            deadline,
570            canceller,
571        )
572        .await?;
573
574        // Create bulk writer with hand-crafted metadata
575        let bulk =
576            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
577
578        Ok(crate::bulk::BulkWriter::new(self, bulk))
579    }
580
581    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
582    ///
583    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
584    /// row data after the `INSERT BULK` statement has been acknowledged.
585    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
586        let max_packet = self.config.packet_size as usize;
587
588        self.in_flight = true;
589        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
590
591        match connection {
592            #[cfg(feature = "tls")]
593            ConnectionHandle::Tls(conn) => {
594                conn.send_message(PacketType::BulkLoad, payload, max_packet)
595                    .await?;
596            }
597            #[cfg(feature = "tls")]
598            ConnectionHandle::TlsPrelogin(conn) => {
599                conn.send_message(PacketType::BulkLoad, payload, max_packet)
600                    .await?;
601            }
602            ConnectionHandle::Plain(conn) => {
603                conn.send_message(PacketType::BulkLoad, payload, max_packet)
604                    .await?;
605            }
606        }
607
608        // Read the server's Done response with row count
609        self.read_execute_result().await
610    }
611
612    /// Execute a query with named parameters and return a streaming result set.
613    ///
614    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
615    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
616    /// and the `#[derive(ToParams)]` macro.
617    ///
618    /// # Example
619    ///
620    /// ```rust,no_run
621    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
622    /// use mssql_client::{NamedParam, ToParams};
623    ///
624    /// // With derive macro:
625    /// #[derive(mssql_derive::ToParams)]
626    /// struct UserQuery { name: String }
627    ///
628    /// let q = UserQuery { name: "Alice".into() };
629    /// let rows = client.query_named(
630    ///     "SELECT * FROM users WHERE name = @name",
631    ///     &q.to_params()?,
632    /// ).await?;
633    ///
634    /// // Or manually:
635    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
636    /// let rows = client.query_named(
637    ///     "SELECT * FROM users WHERE name = @name",
638    ///     &params,
639    /// ).await?;
640    /// # let _ = rows;
641    /// # Ok(())
642    /// # }
643    /// ```
644    pub async fn query_named<'a>(
645        &'a mut self,
646        sql: &str,
647        params: &[crate::to_params::NamedParam],
648    ) -> Result<QueryStream<'a>> {
649        tracing::debug!(
650            sql = sql,
651            params_count = params.len(),
652            "executing query with named parameters"
653        );
654
655        if params.is_empty() {
656            self.send_sql_batch(sql).await?;
657        } else {
658            let rpc_params =
659                Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
660            let rpc = RpcRequest::execute_sql(sql, rpc_params);
661            self.send_rpc(&rpc).await?;
662        }
663
664        let resp = self.read_query_response().await?;
665        #[cfg(feature = "always-encrypted")]
666        {
667            Ok(QueryStream::from_raw(
668                resp.columns,
669                resp.pending_rows,
670                resp.meta,
671                resp.decryptor,
672            ))
673        }
674        #[cfg(not(feature = "always-encrypted"))]
675        {
676            Ok(QueryStream::from_raw(
677                resp.columns,
678                resp.pending_rows,
679                resp.meta,
680            ))
681        }
682    }
683
684    /// Execute a statement with named parameters.
685    ///
686    /// Returns the number of affected rows. This is the named-parameter
687    /// counterpart of [`execute()`](Client::execute), compatible with the
688    /// [`ToParams`](crate::to_params::ToParams) trait.
689    ///
690    /// # Example
691    ///
692    /// ```rust,no_run
693    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
694    /// use mssql_client::NamedParam;
695    ///
696    /// let params = vec![
697    ///     NamedParam::from_value("name", &"Alice")?,
698    ///     NamedParam::from_value("email", &"alice@example.com")?,
699    /// ];
700    /// let rows_affected = client.execute_named(
701    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
702    ///     &params,
703    /// ).await?;
704    /// # let _ = rows_affected;
705    /// # Ok(())
706    /// # }
707    /// ```
708    pub async fn execute_named(
709        &mut self,
710        sql: &str,
711        params: &[crate::to_params::NamedParam],
712    ) -> Result<u64> {
713        tracing::debug!(
714            sql = sql,
715            params_count = params.len(),
716            "executing statement with named parameters"
717        );
718
719        let deadline = self.command_deadline();
720        let canceller = self.connection_cancel_handle();
721        run_with_deadline(
722            async {
723                if params.is_empty() {
724                    self.send_sql_batch(sql).await?;
725                } else {
726                    let rpc_params = Self::convert_named_params(
727                        params,
728                        self.send_unicode(),
729                        self.server_collation(),
730                    )?;
731                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
732                    self.send_rpc(&rpc).await?;
733                }
734
735                self.read_execute_result().await
736            },
737            deadline,
738            canceller,
739        )
740        .await
741    }
742
743    /// Whether string parameters are sent as NVARCHAR (Unicode).
744    pub(crate) fn send_unicode(&self) -> bool {
745        self.config.send_string_parameters_as_unicode
746    }
747
748    /// Server's default collation, captured from ENVCHANGE during login.
749    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
750        self.server_collation.as_ref()
751    }
752}
753
754impl Client<Ready> {
755    /// Mark this connection as needing a reset on next use.
756    ///
757    /// Called by the connection pool when a connection is returned.
758    /// The next SQL batch or RPC will include the RESETCONNECTION flag
759    /// in the TDS packet header, causing SQL Server to reset connection
760    /// state (temp tables, SET options, transaction isolation level, etc.)
761    /// before executing the command.
762    ///
763    /// This is more efficient than calling `sp_reset_connection` as a
764    /// separate command because it's handled at the TDS protocol level.
765    pub fn mark_needs_reset(&mut self) {
766        self.needs_reset = true;
767    }
768
769    /// Check if this connection needs a reset.
770    ///
771    /// Returns true if `mark_needs_reset()` was called and the reset
772    /// hasn't been performed yet.
773    #[must_use]
774    pub fn needs_reset(&self) -> bool {
775        self.needs_reset
776    }
777
778    /// Execute a query and return a result set with lazy per-row decoding.
779    ///
780    /// Per ADR-007 the full response is buffered in memory and each row is
781    /// *decoded* on demand as you iterate — this is not incremental network
782    /// streaming, so peak memory tracks the response size. Use
783    /// `.collect_all()` if you want all rows materialized into a `Vec` up
784    /// front.
785    ///
786    /// # Example
787    ///
788    /// ```rust,no_run
789    /// # use mssql_client::Row;
790    /// # fn process(_: &Row) {}
791    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
792    /// // Streaming (synchronous iteration over the result set)
793    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
794    /// for row in stream {
795    ///     let row = row?;
796    ///     process(&row);
797    /// }
798    ///
799    /// // Buffered (loads all into memory)
800    /// let rows: Vec<Row> = client
801    ///     .query("SELECT * FROM small_table", &[])
802    ///     .await?
803    ///     .collect_all()
804    ///     .await?;
805    /// # let _ = rows;
806    /// # Ok(())
807    /// # }
808    /// ```
809    pub async fn query<'a>(
810        &'a mut self,
811        sql: &str,
812        params: &[&(dyn crate::ToSql + Sync)],
813    ) -> Result<QueryStream<'a>> {
814        let deadline = self.command_deadline();
815        self.query_inner(sql, params, deadline).await
816    }
817
818    /// Shared query implementation with an explicit command deadline.
819    async fn query_inner<'a>(
820        &'a mut self,
821        sql: &str,
822        params: &[&(dyn crate::ToSql + Sync)],
823        deadline: Option<std::time::Duration>,
824    ) -> Result<QueryStream<'a>> {
825        tracing::debug!(sql = sql, params_count = params.len(), "executing query");
826
827        #[cfg(feature = "otel")]
828        let instrumentation = self.instrumentation.clone();
829        #[cfg(feature = "otel")]
830        let mut span = instrumentation.query_span(sql);
831        #[cfg(feature = "otel")]
832        let timer = crate::instrumentation::OperationTimer::start(
833            crate::instrumentation::extract_operation(sql),
834        );
835
836        let canceller = self.cancel_handle();
837        let result = run_with_deadline(
838            async {
839                if params.is_empty() {
840                    // Simple query without parameters - use SQL batch
841                    self.send_sql_batch(sql).await?;
842                } else {
843                    // Parameterized query - use sp_executesql via RPC
844                    let rpc_params =
845                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
846                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
847                    self.send_rpc(&rpc).await?;
848                }
849
850                // Read complete response including columns and rows
851                self.read_query_response().await
852            },
853            deadline,
854            canceller,
855        )
856        .await;
857
858        #[cfg(feature = "otel")]
859        match &result {
860            Ok(_) => InstrumentationContext::record_success(&mut span, None),
861            Err(e) => InstrumentationContext::record_error(&mut span, e),
862        }
863        #[cfg(feature = "otel")]
864        timer.finish(instrumentation.metrics(), result.is_ok());
865
866        // Drop the span before returning
867        #[cfg(feature = "otel")]
868        drop(span);
869
870        let resp = result?;
871        #[cfg(feature = "always-encrypted")]
872        {
873            Ok(QueryStream::from_raw(
874                resp.columns,
875                resp.pending_rows,
876                resp.meta,
877                resp.decryptor,
878            ))
879        }
880        #[cfg(not(feature = "always-encrypted"))]
881        {
882            Ok(QueryStream::from_raw(
883                resp.columns,
884                resp.pending_rows,
885                resp.meta,
886            ))
887        }
888    }
889
890    /// Execute a query with a specific timeout.
891    ///
892    /// This overrides the default `command_timeout` from the connection configuration
893    /// for this specific query. If the query does not complete within the specified
894    /// duration, the driver sends an Attention packet to cancel it server-side,
895    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
896    /// connection left usable for the next request.
897    ///
898    /// # Arguments
899    ///
900    /// * `sql` - The SQL query to execute
901    /// * `params` - Query parameters
902    /// * `timeout_duration` - Maximum time to wait for the query to complete
903    ///
904    /// # Example
905    ///
906    /// ```rust,no_run
907    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
908    /// use std::time::Duration;
909    ///
910    /// // Execute with a 5-second timeout
911    /// let rows = client
912    ///     .query_with_timeout(
913    ///         "SELECT * FROM large_table",
914    ///         &[],
915    ///         Duration::from_secs(5),
916    ///     )
917    ///     .await?;
918    /// # let _ = rows;
919    /// # Ok(())
920    /// # }
921    /// ```
922    pub async fn query_with_timeout<'a>(
923        &'a mut self,
924        sql: &str,
925        params: &[&(dyn crate::ToSql + Sync)],
926        timeout_duration: std::time::Duration,
927    ) -> Result<QueryStream<'a>> {
928        self.query_inner(sql, params, Some(timeout_duration)).await
929    }
930
931    /// Execute a batch that may return multiple result sets.
932    ///
933    /// This is useful for stored procedures or SQL batches that contain
934    /// multiple SELECT statements.
935    ///
936    /// # Example
937    ///
938    /// ```rust,no_run
939    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
940    /// // Execute a batch with multiple SELECT statements
941    /// let mut results = client.query_multiple(
942    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
943    ///     &[]
944    /// ).await?;
945    ///
946    /// // Process first result set
947    /// while let Some(row) = results.next_row().await? {
948    ///     println!("Result 1: {:?}", row);
949    /// }
950    ///
951    /// // Move to second result set
952    /// if results.next_result().await? {
953    ///     while let Some(row) = results.next_row().await? {
954    ///         println!("Result 2: {:?}", row);
955    ///     }
956    /// }
957    /// # Ok(())
958    /// # }
959    /// ```
960    pub async fn query_multiple<'a>(
961        &'a mut self,
962        sql: &str,
963        params: &[&(dyn crate::ToSql + Sync)],
964    ) -> Result<MultiResultStream<'a>> {
965        tracing::debug!(
966            sql = sql,
967            params_count = params.len(),
968            "executing multi-result query"
969        );
970
971        let deadline = self.command_deadline();
972        let canceller = self.connection_cancel_handle();
973        let result_sets = run_with_deadline(
974            async {
975                if params.is_empty() {
976                    // Simple batch without parameters - use SQL batch
977                    self.send_sql_batch(sql).await?;
978                } else {
979                    // Parameterized query - use sp_executesql via RPC
980                    let rpc_params =
981                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
982                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
983                    self.send_rpc(&rpc).await?;
984                }
985
986                // Read all result sets
987                self.read_multi_result_response().await
988            },
989            deadline,
990            canceller,
991        )
992        .await?;
993        Ok(MultiResultStream::new(result_sets))
994    }
995
996    /// Execute a query that doesn't return rows.
997    ///
998    /// Returns the number of affected rows.
999    pub async fn execute(
1000        &mut self,
1001        sql: &str,
1002        params: &[&(dyn crate::ToSql + Sync)],
1003    ) -> Result<u64> {
1004        let deadline = self.command_deadline();
1005        self.execute_inner(sql, params, deadline).await
1006    }
1007
1008    /// Shared execute implementation with an explicit command deadline.
1009    async fn execute_inner(
1010        &mut self,
1011        sql: &str,
1012        params: &[&(dyn crate::ToSql + Sync)],
1013        deadline: Option<std::time::Duration>,
1014    ) -> Result<u64> {
1015        tracing::debug!(
1016            sql = sql,
1017            params_count = params.len(),
1018            "executing statement"
1019        );
1020
1021        #[cfg(feature = "otel")]
1022        let instrumentation = self.instrumentation.clone();
1023        #[cfg(feature = "otel")]
1024        let mut span = instrumentation.query_span(sql);
1025        #[cfg(feature = "otel")]
1026        let timer = crate::instrumentation::OperationTimer::start(
1027            crate::instrumentation::extract_operation(sql),
1028        );
1029
1030        let canceller = self.cancel_handle();
1031        let result = run_with_deadline(
1032            async {
1033                if params.is_empty() {
1034                    // Simple statement without parameters - use SQL batch
1035                    self.send_sql_batch(sql).await?;
1036                } else {
1037                    // Parameterized statement - use sp_executesql via RPC
1038                    let rpc_params =
1039                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1040                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1041                    self.send_rpc(&rpc).await?;
1042                }
1043
1044                // Read response and get row count
1045                self.read_execute_result().await
1046            },
1047            deadline,
1048            canceller,
1049        )
1050        .await;
1051
1052        #[cfg(feature = "otel")]
1053        match &result {
1054            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1055            Err(e) => InstrumentationContext::record_error(&mut span, e),
1056        }
1057        #[cfg(feature = "otel")]
1058        timer.finish(instrumentation.metrics(), result.is_ok());
1059
1060        // Drop the span before returning
1061        #[cfg(feature = "otel")]
1062        drop(span);
1063
1064        result
1065    }
1066
1067    /// Execute a statement with a specific timeout.
1068    ///
1069    /// This overrides the default `command_timeout` from the connection configuration
1070    /// for this specific statement. If the statement does not complete within the
1071    /// specified duration, the driver sends an Attention packet to cancel it
1072    /// server-side, drains the acknowledgement, and returns
1073    /// [`Error::CommandTimeout`] with the connection left usable.
1074    ///
1075    /// # Arguments
1076    ///
1077    /// * `sql` - The SQL statement to execute
1078    /// * `params` - Statement parameters
1079    /// * `timeout_duration` - Maximum time to wait for the statement to complete
1080    ///
1081    /// # Example
1082    ///
1083    /// ```rust,no_run
1084    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1085    /// use std::time::Duration;
1086    ///
1087    /// // Execute with a 10-second timeout
1088    /// let rows_affected = client
1089    ///     .execute_with_timeout(
1090    ///         "UPDATE large_table SET status = @p1",
1091    ///         &[&"processed"],
1092    ///         Duration::from_secs(10),
1093    ///     )
1094    ///     .await?;
1095    /// # let _ = rows_affected;
1096    /// # Ok(())
1097    /// # }
1098    /// ```
1099    pub async fn execute_with_timeout(
1100        &mut self,
1101        sql: &str,
1102        params: &[&(dyn crate::ToSql + Sync)],
1103        timeout_duration: std::time::Duration,
1104    ) -> Result<u64> {
1105        self.execute_inner(sql, params, Some(timeout_duration))
1106            .await
1107    }
1108
1109    /// Begin a transaction.
1110    ///
1111    /// This transitions the client from `Ready` to `InTransaction` state.
1112    /// Per MS-TDS spec, the server returns a transaction descriptor in the
1113    /// BeginTransaction EnvChange token that must be included in subsequent
1114    /// ALL_HEADERS sections.
1115    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1116        tracing::debug!("beginning transaction");
1117
1118        #[cfg(feature = "otel")]
1119        let instrumentation = self.instrumentation.clone();
1120        #[cfg(feature = "otel")]
1121        let mut span = instrumentation.transaction_span("BEGIN");
1122
1123        // Execute BEGIN TRANSACTION and extract the transaction descriptor
1124        let result = async {
1125            self.send_sql_batch("BEGIN TRANSACTION").await?;
1126            self.read_transaction_begin_result().await
1127        }
1128        .await;
1129
1130        #[cfg(feature = "otel")]
1131        match &result {
1132            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1133            Err(e) => InstrumentationContext::record_error(&mut span, e),
1134        }
1135
1136        // Drop the span before moving instrumentation
1137        #[cfg(feature = "otel")]
1138        drop(span);
1139
1140        let transaction_descriptor = result?;
1141
1142        Ok(Client {
1143            config: self.config,
1144            _state: PhantomData,
1145            connection: self.connection,
1146            server_version: self.server_version,
1147            current_database: self.current_database,
1148            server_collation: self.server_collation,
1149            statement_cache: self.statement_cache,
1150            transaction_descriptor, // Store the descriptor from server
1151            needs_reset: self.needs_reset,
1152            in_flight: self.in_flight,
1153            #[cfg(feature = "otel")]
1154            instrumentation: self.instrumentation,
1155            #[cfg(feature = "always-encrypted")]
1156            encryption_context: self.encryption_context,
1157        })
1158    }
1159
1160    /// Begin a transaction with a specific isolation level.
1161    ///
1162    /// This transitions the client from `Ready` to `InTransaction` state
1163    /// with the specified isolation level.
1164    ///
1165    /// # Example
1166    ///
1167    /// ```rust,no_run
1168    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1169    /// use mssql_client::IsolationLevel;
1170    ///
1171    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1172    /// // All operations in this transaction use SERIALIZABLE isolation
1173    /// tx.commit().await?;
1174    /// # Ok(())
1175    /// # }
1176    /// ```
1177    pub async fn begin_transaction_with_isolation(
1178        mut self,
1179        isolation_level: crate::transaction::IsolationLevel,
1180    ) -> Result<Client<InTransaction>> {
1181        tracing::debug!(
1182            isolation_level = %isolation_level.name(),
1183            "beginning transaction with isolation level"
1184        );
1185
1186        #[cfg(feature = "otel")]
1187        let instrumentation = self.instrumentation.clone();
1188        #[cfg(feature = "otel")]
1189        let mut span = instrumentation.transaction_span("BEGIN");
1190
1191        // First set the isolation level
1192        let result = async {
1193            self.send_sql_batch(isolation_level.as_sql()).await?;
1194            self.read_execute_result().await?;
1195
1196            // Then begin the transaction
1197            self.send_sql_batch("BEGIN TRANSACTION").await?;
1198            self.read_transaction_begin_result().await
1199        }
1200        .await;
1201
1202        #[cfg(feature = "otel")]
1203        match &result {
1204            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1205            Err(e) => InstrumentationContext::record_error(&mut span, e),
1206        }
1207
1208        #[cfg(feature = "otel")]
1209        drop(span);
1210
1211        let transaction_descriptor = result?;
1212
1213        Ok(Client {
1214            config: self.config,
1215            _state: PhantomData,
1216            connection: self.connection,
1217            server_version: self.server_version,
1218            current_database: self.current_database,
1219            server_collation: self.server_collation,
1220            statement_cache: self.statement_cache,
1221            transaction_descriptor,
1222            needs_reset: self.needs_reset,
1223            in_flight: self.in_flight,
1224            #[cfg(feature = "otel")]
1225            instrumentation: self.instrumentation,
1226            #[cfg(feature = "always-encrypted")]
1227            encryption_context: self.encryption_context,
1228        })
1229    }
1230
1231    /// Execute a simple query without parameters.
1232    ///
1233    /// This is useful for DDL statements and simple queries where you
1234    /// don't need to retrieve the affected row count.
1235    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1236        tracing::debug!(sql = sql, "executing simple query");
1237
1238        // Send SQL batch
1239        self.send_sql_batch(sql).await?;
1240
1241        // Read and discard response
1242        let _ = self.read_execute_result().await?;
1243
1244        Ok(())
1245    }
1246
1247    /// Close the connection gracefully.
1248    pub async fn close(self) -> Result<()> {
1249        tracing::debug!("closing connection");
1250        Ok(())
1251    }
1252
1253    /// Get the current database name.
1254    #[must_use]
1255    pub fn database(&self) -> Option<&str> {
1256        self.config.database.as_deref()
1257    }
1258
1259    /// Get the server host.
1260    #[must_use]
1261    pub fn host(&self) -> &str {
1262        &self.config.host
1263    }
1264
1265    /// Get the server port.
1266    #[must_use]
1267    pub fn port(&self) -> u16 {
1268        self.config.port
1269    }
1270
1271    /// Check if the connection is currently in a transaction.
1272    ///
1273    /// This returns `true` if a transaction was started via raw SQL
1274    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1275    ///
1276    /// Note: This only tracks transactions started via raw SQL. Transactions
1277    /// started via the type-state API (`begin_transaction()`) result in a
1278    /// `Client<InTransaction>` which is a different type.
1279    ///
1280    /// # Example
1281    ///
1282    /// ```rust,no_run
1283    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1284    /// client.execute("BEGIN TRANSACTION", &[]).await?;
1285    /// assert!(client.is_in_transaction());
1286    ///
1287    /// client.execute("COMMIT", &[]).await?;
1288    /// assert!(!client.is_in_transaction());
1289    /// # Ok(())
1290    /// # }
1291    /// ```
1292    #[must_use]
1293    pub fn is_in_transaction(&self) -> bool {
1294        self.transaction_descriptor != 0
1295    }
1296
1297    /// Check if a request is in-flight (sent but response not fully read).
1298    ///
1299    /// Used by the connection pool to detect dirty connections that were
1300    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1301    /// A connection with an in-flight request has unread data in the TCP
1302    /// buffer and must be discarded rather than returned to the pool.
1303    #[must_use]
1304    pub fn is_in_flight(&self) -> bool {
1305        self.in_flight
1306    }
1307
1308    /// Report whether an Always Encrypted key-store provider with the given
1309    /// name is currently reachable through this client's encryption context.
1310    ///
1311    /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1312    /// the connection was opened without `column_encryption` configured, or
1313    /// when no matching provider was registered.
1314    #[cfg(feature = "always-encrypted")]
1315    #[must_use]
1316    pub fn has_encryption_provider(&self, name: &str) -> bool {
1317        self.encryption_context
1318            .as_ref()
1319            .is_some_and(|ctx| ctx.has_provider(name))
1320    }
1321
1322    /// Get a handle for cancelling the current query.
1323    ///
1324    /// The cancel handle can be cloned and sent to other tasks, enabling
1325    /// cancellation of long-running queries from a separate async context.
1326    ///
1327    /// # Example
1328    ///
1329    /// ```rust,no_run
1330    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1331    /// use std::time::Duration;
1332    ///
1333    /// let cancel_handle = client.cancel_handle();
1334    ///
1335    /// // Spawn a task to cancel after 10 seconds
1336    /// let handle = tokio::spawn(async move {
1337    ///     tokio::time::sleep(Duration::from_secs(10)).await;
1338    ///     let _ = cancel_handle.cancel().await;
1339    /// });
1340    ///
1341    /// // This query will be cancelled if it runs longer than 10 seconds
1342    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1343    /// # let _ = (handle, result);
1344    /// # Ok(())
1345    /// # }
1346    /// ```
1347    #[must_use]
1348    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1349        self.connection_cancel_handle()
1350    }
1351}
1352
1353/// # Drop Behavior
1354///
1355/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1356/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1357/// the transaction remains open on the server until the TCP connection closes
1358/// (at which point SQL Server automatically rolls back).
1359///
1360/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1361/// to send a `ROLLBACK TRANSACTION` command.
1362///
1363/// ## Consequences of dropping without commit/rollback
1364///
1365/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1366///   (potentially 30+ minutes), holding locks on any modified rows.
1367/// - **Pooled connections:** The pool detects the active transaction descriptor
1368///   and discards the connection rather than returning it to the idle pool
1369///   (see `PooledConnection::drop` in `mssql-driver-pool`).
1370///
1371/// ## Best practice
1372///
1373/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1374/// for error paths:
1375///
1376/// ```rust,no_run
1377/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1378/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1379/// let tx = client.begin_transaction().await?;
1380/// match do_work(&tx).await {
1381///     Ok(_) => { tx.commit().await?; }
1382///     Err(e) => { tx.rollback().await?; return Err(e); }
1383/// }
1384/// # Ok(())
1385/// # }
1386/// ```
1387impl Client<InTransaction> {
1388    /// Execute a query within the transaction and return a streaming result set.
1389    ///
1390    /// See [`Client<Ready>::query`] for usage examples.
1391    pub async fn query<'a>(
1392        &'a mut self,
1393        sql: &str,
1394        params: &[&(dyn crate::ToSql + Sync)],
1395    ) -> Result<QueryStream<'a>> {
1396        let deadline = self.command_deadline();
1397        self.query_inner(sql, params, deadline).await
1398    }
1399
1400    /// Shared query implementation with an explicit command deadline.
1401    async fn query_inner<'a>(
1402        &'a mut self,
1403        sql: &str,
1404        params: &[&(dyn crate::ToSql + Sync)],
1405        deadline: Option<std::time::Duration>,
1406    ) -> Result<QueryStream<'a>> {
1407        tracing::debug!(
1408            sql = sql,
1409            params_count = params.len(),
1410            "executing query in transaction"
1411        );
1412
1413        #[cfg(feature = "otel")]
1414        let instrumentation = self.instrumentation.clone();
1415        #[cfg(feature = "otel")]
1416        let mut span = instrumentation.query_span(sql);
1417        #[cfg(feature = "otel")]
1418        let timer = crate::instrumentation::OperationTimer::start(
1419            crate::instrumentation::extract_operation(sql),
1420        );
1421
1422        let canceller = self.cancel_handle();
1423        let result = run_with_deadline(
1424            async {
1425                if params.is_empty() {
1426                    // Simple query without parameters - use SQL batch
1427                    self.send_sql_batch(sql).await?;
1428                } else {
1429                    // Parameterized query - use sp_executesql via RPC
1430                    let rpc_params =
1431                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1432                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1433                    self.send_rpc(&rpc).await?;
1434                }
1435
1436                // Read complete response including columns and rows
1437                self.read_query_response().await
1438            },
1439            deadline,
1440            canceller,
1441        )
1442        .await;
1443
1444        #[cfg(feature = "otel")]
1445        match &result {
1446            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1447            Err(e) => InstrumentationContext::record_error(&mut span, e),
1448        }
1449        #[cfg(feature = "otel")]
1450        timer.finish(instrumentation.metrics(), result.is_ok());
1451
1452        // Drop the span before returning
1453        #[cfg(feature = "otel")]
1454        drop(span);
1455
1456        let resp = result?;
1457        #[cfg(feature = "always-encrypted")]
1458        {
1459            Ok(QueryStream::from_raw(
1460                resp.columns,
1461                resp.pending_rows,
1462                resp.meta,
1463                resp.decryptor,
1464            ))
1465        }
1466        #[cfg(not(feature = "always-encrypted"))]
1467        {
1468            Ok(QueryStream::from_raw(
1469                resp.columns,
1470                resp.pending_rows,
1471                resp.meta,
1472            ))
1473        }
1474    }
1475
1476    /// Execute a statement within the transaction.
1477    ///
1478    /// Returns the number of affected rows.
1479    pub async fn execute(
1480        &mut self,
1481        sql: &str,
1482        params: &[&(dyn crate::ToSql + Sync)],
1483    ) -> Result<u64> {
1484        let deadline = self.command_deadline();
1485        self.execute_inner(sql, params, deadline).await
1486    }
1487
1488    /// Shared execute implementation with an explicit command deadline.
1489    async fn execute_inner(
1490        &mut self,
1491        sql: &str,
1492        params: &[&(dyn crate::ToSql + Sync)],
1493        deadline: Option<std::time::Duration>,
1494    ) -> Result<u64> {
1495        tracing::debug!(
1496            sql = sql,
1497            params_count = params.len(),
1498            "executing statement in transaction"
1499        );
1500
1501        #[cfg(feature = "otel")]
1502        let instrumentation = self.instrumentation.clone();
1503        #[cfg(feature = "otel")]
1504        let mut span = instrumentation.query_span(sql);
1505        #[cfg(feature = "otel")]
1506        let timer = crate::instrumentation::OperationTimer::start(
1507            crate::instrumentation::extract_operation(sql),
1508        );
1509
1510        let canceller = self.cancel_handle();
1511        let result = run_with_deadline(
1512            async {
1513                if params.is_empty() {
1514                    // Simple statement without parameters - use SQL batch
1515                    self.send_sql_batch(sql).await?;
1516                } else {
1517                    // Parameterized statement - use sp_executesql via RPC
1518                    let rpc_params =
1519                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1520                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1521                    self.send_rpc(&rpc).await?;
1522                }
1523
1524                // Read response and get row count
1525                self.read_execute_result().await
1526            },
1527            deadline,
1528            canceller,
1529        )
1530        .await;
1531
1532        #[cfg(feature = "otel")]
1533        match &result {
1534            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1535            Err(e) => InstrumentationContext::record_error(&mut span, e),
1536        }
1537        #[cfg(feature = "otel")]
1538        timer.finish(instrumentation.metrics(), result.is_ok());
1539
1540        // Drop the span before returning
1541        #[cfg(feature = "otel")]
1542        drop(span);
1543
1544        result
1545    }
1546
1547    /// Execute a query within the transaction with a specific timeout.
1548    ///
1549    /// See [`Client<Ready>::query_with_timeout`] for details.
1550    pub async fn query_with_timeout<'a>(
1551        &'a mut self,
1552        sql: &str,
1553        params: &[&(dyn crate::ToSql + Sync)],
1554        timeout_duration: std::time::Duration,
1555    ) -> Result<QueryStream<'a>> {
1556        self.query_inner(sql, params, Some(timeout_duration)).await
1557    }
1558
1559    /// Execute a statement within the transaction with a specific timeout.
1560    ///
1561    /// See [`Client<Ready>::execute_with_timeout`] for details.
1562    pub async fn execute_with_timeout(
1563        &mut self,
1564        sql: &str,
1565        params: &[&(dyn crate::ToSql + Sync)],
1566        timeout_duration: std::time::Duration,
1567    ) -> Result<u64> {
1568        self.execute_inner(sql, params, Some(timeout_duration))
1569            .await
1570    }
1571
1572    /// Open a FILESTREAM BLOB for async reading and/or writing.
1573    ///
1574    /// This method queries the server for the transaction context, then opens
1575    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1576    ///
1577    /// # Arguments
1578    ///
1579    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1580    ///   Query this yourself before calling `open_filestream`:
1581    ///   ```sql
1582    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1583    ///   ```
1584    /// * `access` — Read, write, or read/write access mode.
1585    ///
1586    /// # Requirements
1587    ///
1588    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1589    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1590    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1591    ///
1592    /// # Example
1593    ///
1594    /// ```text
1595    /// use mssql_client::FileStreamAccess;
1596    /// use tokio::io::AsyncReadExt;
1597    ///
1598    /// let mut tx = client.begin_transaction().await?;
1599    ///
1600    /// // Get the FILESTREAM path
1601    /// let rows = tx.query(
1602    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1603    ///     &[&doc_id],
1604    /// ).await?;
1605    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1606    ///
1607    /// // Open and read the BLOB
1608    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1609    /// let mut data = Vec::new();
1610    /// stream.read_to_end(&mut data).await?;
1611    /// drop(stream);
1612    ///
1613    /// tx.commit().await?;
1614    /// ```
1615    #[cfg(all(windows, feature = "filestream"))]
1616    pub async fn open_filestream(
1617        &mut self,
1618        path: &str,
1619        access: crate::filestream::FileStreamAccess,
1620    ) -> Result<crate::filestream::FileStream> {
1621        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1622
1623        // Get the transaction context from SQL Server.
1624        // This binds the file access to the current SQL transaction.
1625        let txn_context: Vec<u8> = {
1626            let rows = self
1627                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1628                .await?;
1629            let mut ctx = None;
1630            for result in rows {
1631                let row = result?;
1632                ctx = Some(row.get::<Vec<u8>>(0)?);
1633            }
1634            ctx.ok_or_else(|| {
1635                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1636            })?
1637        };
1638
1639        crate::filestream::FileStream::open(path, access, &txn_context)
1640    }
1641
1642    /// Commit the transaction.
1643    ///
1644    /// This transitions the client back to `Ready` state.
1645    pub async fn commit(mut self) -> Result<Client<Ready>> {
1646        tracing::debug!("committing transaction");
1647
1648        #[cfg(feature = "otel")]
1649        let instrumentation = self.instrumentation.clone();
1650        #[cfg(feature = "otel")]
1651        let mut span = instrumentation.transaction_span("COMMIT");
1652
1653        // Execute COMMIT TRANSACTION
1654        let result = async {
1655            self.send_sql_batch("COMMIT TRANSACTION").await?;
1656            self.read_execute_result().await
1657        }
1658        .await;
1659
1660        #[cfg(feature = "otel")]
1661        match &result {
1662            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1663            Err(e) => InstrumentationContext::record_error(&mut span, e),
1664        }
1665
1666        // Drop the span before moving instrumentation
1667        #[cfg(feature = "otel")]
1668        drop(span);
1669
1670        result?;
1671
1672        Ok(Client {
1673            config: self.config,
1674            _state: PhantomData,
1675            connection: self.connection,
1676            server_version: self.server_version,
1677            current_database: self.current_database,
1678            server_collation: self.server_collation,
1679            statement_cache: self.statement_cache,
1680            transaction_descriptor: 0, // Reset to auto-commit mode
1681            needs_reset: self.needs_reset,
1682            in_flight: self.in_flight,
1683            #[cfg(feature = "otel")]
1684            instrumentation: self.instrumentation,
1685            #[cfg(feature = "always-encrypted")]
1686            encryption_context: self.encryption_context,
1687        })
1688    }
1689
1690    /// Rollback the transaction.
1691    ///
1692    /// This transitions the client back to `Ready` state.
1693    pub async fn rollback(mut self) -> Result<Client<Ready>> {
1694        tracing::debug!("rolling back transaction");
1695
1696        #[cfg(feature = "otel")]
1697        let instrumentation = self.instrumentation.clone();
1698        #[cfg(feature = "otel")]
1699        let mut span = instrumentation.transaction_span("ROLLBACK");
1700
1701        // Execute ROLLBACK TRANSACTION
1702        let result = async {
1703            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1704            self.read_execute_result().await
1705        }
1706        .await;
1707
1708        #[cfg(feature = "otel")]
1709        match &result {
1710            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1711            Err(e) => InstrumentationContext::record_error(&mut span, e),
1712        }
1713
1714        // Drop the span before moving instrumentation
1715        #[cfg(feature = "otel")]
1716        drop(span);
1717
1718        result?;
1719
1720        Ok(Client {
1721            config: self.config,
1722            _state: PhantomData,
1723            connection: self.connection,
1724            server_version: self.server_version,
1725            current_database: self.current_database,
1726            server_collation: self.server_collation,
1727            statement_cache: self.statement_cache,
1728            transaction_descriptor: 0, // Reset to auto-commit mode
1729            needs_reset: self.needs_reset,
1730            in_flight: self.in_flight,
1731            #[cfg(feature = "otel")]
1732            instrumentation: self.instrumentation,
1733            #[cfg(feature = "always-encrypted")]
1734            encryption_context: self.encryption_context,
1735        })
1736    }
1737
1738    /// Create a savepoint and return a handle for later rollback.
1739    ///
1740    /// The returned `SavePoint` handle contains the validated savepoint name.
1741    /// Use it with `rollback_to()` to partially undo transaction work.
1742    ///
1743    /// # Example
1744    ///
1745    /// ```rust,no_run
1746    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1747    /// let mut tx = client.begin_transaction().await?;
1748    /// tx.execute("INSERT INTO orders ...", &[]).await?;
1749    /// let sp = tx.save_point("before_items").await?;
1750    /// tx.execute("INSERT INTO items ...", &[]).await?;
1751    /// // Oops, rollback just the items
1752    /// tx.rollback_to(&sp).await?;
1753    /// tx.commit().await?;
1754    /// # Ok(())
1755    /// # }
1756    /// ```
1757    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1758        crate::validation::validate_identifier(name)?;
1759        tracing::debug!(name = name, "creating savepoint");
1760
1761        // Execute SAVE TRANSACTION <name>
1762        // Note: name is validated by validate_identifier() to prevent SQL injection
1763        let sql = format!("SAVE TRANSACTION {name}");
1764        self.send_sql_batch(&sql).await?;
1765        self.read_execute_result().await?;
1766
1767        Ok(SavePoint::new(name.to_string()))
1768    }
1769
1770    /// Rollback to a savepoint.
1771    ///
1772    /// This rolls back all changes made after the savepoint was created,
1773    /// but keeps the transaction active. The savepoint remains valid and
1774    /// can be rolled back to again.
1775    ///
1776    /// # Example
1777    ///
1778    /// ```rust,no_run
1779    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1780    /// let sp = tx.save_point("checkpoint").await?;
1781    /// // ... do some work ...
1782    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
1783    /// // Transaction is still active, savepoint is still valid
1784    /// # Ok(())
1785    /// # }
1786    /// ```
1787    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1788        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1789
1790        // Execute ROLLBACK TRANSACTION <name>
1791        // Note: savepoint name was validated during creation
1792        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1793        self.send_sql_batch(&sql).await?;
1794        self.read_execute_result().await?;
1795
1796        Ok(())
1797    }
1798
1799    /// Release a savepoint (optional cleanup).
1800    ///
1801    /// Note: SQL Server doesn't have explicit savepoint release, but this
1802    /// method is provided for API completeness. The savepoint is automatically
1803    /// released when the transaction commits or rolls back.
1804    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
1805        tracing::debug!(name = savepoint.name(), "releasing savepoint");
1806
1807        // SQL Server doesn't require explicit savepoint release
1808        // The savepoint is implicitly released on commit/rollback
1809        // This method exists for API completeness
1810        drop(savepoint);
1811        Ok(())
1812    }
1813
1814    /// Get a handle for cancelling the current query within the transaction.
1815    ///
1816    /// See [`Client<Ready>::cancel_handle`] for usage examples.
1817    #[must_use]
1818    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1819        self.connection_cancel_handle()
1820    }
1821}
1822
1823impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1824    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1825        f.debug_struct("Client")
1826            .field("host", &self.config.host)
1827            .field("port", &self.config.port)
1828            .field("database", &self.config.database)
1829            .finish()
1830    }
1831}