Skip to main content

mssql_client/
client.rs

1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// Run a network future under an optional command deadline.
49///
50/// On timeout this sends an Attention packet via `canceller` and then awaits
51/// the future so its own read loop drains the server's DONE_ATTN
52/// acknowledgement, leaving the connection clean before returning
53/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
54/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
55/// TDS data in the connection buffer and desync the next request.
56async fn run_with_deadline<F, T>(
57    fut: F,
58    deadline: Option<std::time::Duration>,
59    canceller: crate::cancel::CancelHandle,
60) -> Result<T>
61where
62    F: std::future::Future<Output = Result<T>>,
63{
64    let Some(d) = deadline else {
65        return fut.await;
66    };
67    tokio::pin!(fut);
68    tokio::select! {
69        biased;
70        res = &mut fut => res,
71        () = tokio::time::sleep(d) => {
72            // Signal cancellation, then let the in-flight read consume the
73            // server's attention acknowledgement so the connection stays usable.
74            let _ = canceller.cancel().await;
75            let _ = fut.await;
76            Err(Error::CommandTimeout)
77        }
78    }
79}
80
81/// SQL Server client with type-state connection management.
82///
83/// The generic parameter `S` represents the current connection state,
84/// ensuring at compile time that certain operations are only available
85/// in appropriate states.
86pub struct Client<S: ConnectionState> {
87    config: Config,
88    _state: PhantomData<S>,
89    /// The underlying connection (present only when connected)
90    connection: Option<ConnectionHandle>,
91    /// Server version from LoginAck (raw u32 TDS version)
92    server_version: Option<u32>,
93    /// Current database from EnvChange
94    current_database: Option<String>,
95    /// Server's default collation from SqlCollation EnvChange during login.
96    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
97    /// parameters with the correct character encoding and collation bytes.
98    server_collation: Option<tds_protocol::token::Collation>,
99    /// Prepared statement cache for query optimization
100    statement_cache: StatementCache,
101    /// Transaction descriptor from BeginTransaction EnvChange.
102    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
103    /// requests within an explicit transaction. 0 indicates auto-commit mode.
104    transaction_descriptor: u64,
105    /// Whether a request has been sent and the response has not yet been fully read.
106    /// Used by the connection pool to detect dirty connections after cancel/timeout.
107    in_flight: bool,
108    /// Whether this connection needs a reset on next use.
109    /// Set by connection pool on checkin, cleared after first query/execute.
110    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
111    needs_reset: bool,
112    /// OpenTelemetry instrumentation context (when otel feature is enabled)
113    #[cfg(feature = "otel")]
114    instrumentation: InstrumentationContext,
115    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
116    #[cfg(feature = "always-encrypted")]
117    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
118}
119
120/// Internal connection handle wrapping the actual connection.
121///
122/// This is an enum to support different connection types:
123/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
124/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
125/// - Plain TCP (for internal networks or when `tls` feature is disabled)
126#[allow(dead_code)] // Connection will be used once query execution is implemented
127enum ConnectionHandle {
128    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
129    #[cfg(feature = "tls")]
130    Tls(Connection<TlsStream<TcpStream>>),
131    /// TLS connection with PreLogin wrapping (TDS 7.x style)
132    #[cfg(feature = "tls")]
133    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
134    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
135    Plain(Connection<TcpStream>),
136}
137
138// Private helper methods available to all connection states
139impl<S: ConnectionState> Client<S> {
140    /// The default per-command deadline from `command_timeout`.
141    ///
142    /// Returns `None` when `command_timeout` is zero, which means "no limit"
143    /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
144    fn command_deadline(&self) -> Option<std::time::Duration> {
145        let t = self.config.command_timeout;
146        if t.is_zero() { None } else { Some(t) }
147    }
148
149    /// Process transaction-related EnvChange tokens.
150    ///
151    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
152    /// EnvChange tokens, updating the transaction descriptor accordingly.
153    ///
154    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
155    /// while still having the transaction descriptor tracked correctly.
156    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
157        use tds_protocol::token::EnvChangeValue;
158
159        match env.env_type {
160            EnvChangeType::BeginTransaction => {
161                if let EnvChangeValue::Binary(ref data) = env.new_value {
162                    if data.len() >= 8 {
163                        let descriptor = u64::from_le_bytes([
164                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
165                        ]);
166                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
167                        *transaction_descriptor = descriptor;
168                    }
169                }
170            }
171            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
172                tracing::debug!(
173                    env_type = ?env.env_type,
174                    "transaction ended via raw SQL"
175                );
176                *transaction_descriptor = 0;
177            }
178            _ => {}
179        }
180    }
181
182    /// Send a SQL batch to the server.
183    ///
184    /// Uses the client's current transaction descriptor in ALL_HEADERS.
185    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
186    /// returned by BeginTransaction must be included.
187    ///
188    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
189    /// is included in the first packet to reset connection state.
190    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
191        let payload =
192            tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
193        let max_packet = self.config.packet_size as usize;
194
195        // Check if we need to reset the connection on this request
196        let reset = self.needs_reset;
197        if reset {
198            self.needs_reset = false; // Clear flag before sending
199            tracing::debug!("sending SQL batch with RESETCONNECTION flag");
200        }
201
202        self.in_flight = true;
203        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
204
205        match connection {
206            #[cfg(feature = "tls")]
207            ConnectionHandle::Tls(conn) => {
208                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
209                    .await?;
210            }
211            #[cfg(feature = "tls")]
212            ConnectionHandle::TlsPrelogin(conn) => {
213                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
214                    .await?;
215            }
216            ConnectionHandle::Plain(conn) => {
217                conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
218                    .await?;
219            }
220        }
221
222        Ok(())
223    }
224
225    /// Send an RPC request to the server.
226    ///
227    /// Uses the client's current transaction descriptor in ALL_HEADERS.
228    ///
229    /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
230    /// is included in the first packet to reset connection state.
231    pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
232        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
233        let max_packet = self.config.packet_size as usize;
234
235        // Check if we need to reset the connection on this request
236        let reset = self.needs_reset;
237        if reset {
238            self.needs_reset = false; // Clear flag before sending
239            tracing::debug!("sending RPC with RESETCONNECTION flag");
240        }
241
242        self.in_flight = true;
243        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
244
245        match connection {
246            #[cfg(feature = "tls")]
247            ConnectionHandle::Tls(conn) => {
248                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
249                    .await?;
250            }
251            #[cfg(feature = "tls")]
252            ConnectionHandle::TlsPrelogin(conn) => {
253                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
254                    .await?;
255            }
256            ConnectionHandle::Plain(conn) => {
257                conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
258                    .await?;
259            }
260        }
261
262        Ok(())
263    }
264
265    /// Start building a stored procedure call with full control over parameters.
266    ///
267    /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
268    /// parameters before executing the call.
269    ///
270    /// The procedure name is validated to prevent SQL injection. It may be
271    /// schema-qualified (e.g., `"dbo.MyProc"`).
272    ///
273    /// # Example
274    ///
275    /// ```rust,no_run
276    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
277    /// let result = client.procedure("dbo.CalculateSum")?
278    ///     .input("@a", &10i32)
279    ///     .input("@b", &20i32)
280    ///     .output_int("@result")
281    ///     .execute().await?;
282    ///
283    /// let sum = result.get_output("@result").unwrap();
284    /// # let _ = sum;
285    /// # Ok(())
286    /// # }
287    /// ```
288    pub fn procedure(
289        &mut self,
290        proc_name: &str,
291    ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
292        crate::validation::validate_qualified_identifier(proc_name)?;
293        Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
294    }
295
296    /// Execute a stored procedure with positional input parameters.
297    ///
298    /// This is a convenience method for the common case of calling a procedure
299    /// with input-only parameters. For output parameters or named parameters,
300    /// use [`procedure()`](Client::procedure) instead.
301    ///
302    /// # Example
303    ///
304    /// ```rust,no_run
305    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
306    /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
307    /// assert_eq!(result.return_value, 0);
308    ///
309    /// if let Some(rs) = result.first_result_set() {
310    ///     println!("columns: {:?}", rs.columns());
311    /// }
312    /// # Ok(())
313    /// # }
314    /// ```
315    pub async fn call_procedure(
316        &mut self,
317        proc_name: &str,
318        params: &[&(dyn crate::ToSql + Sync)],
319    ) -> Result<crate::stream::ProcedureResult> {
320        crate::validation::validate_qualified_identifier(proc_name)?;
321
322        tracing::debug!(
323            proc_name = proc_name,
324            params_count = params.len(),
325            "executing stored procedure"
326        );
327
328        let rpc_params =
329            Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
330        let mut rpc = RpcRequest::named(proc_name);
331        for param in rpc_params {
332            rpc = rpc.param(param);
333        }
334
335        self.send_rpc(&rpc).await?;
336        self.read_procedure_result().await
337    }
338
339    /// Start a bulk insert operation for the specified table.
340    ///
341    /// Sends the `INSERT BULK` statement to the server and returns a
342    /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
343    /// a mutable borrow on the client, preventing other operations while
344    /// the bulk insert is in progress.
345    ///
346    /// # Example
347    ///
348    /// ```rust,no_run
349    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
350    /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
351    ///
352    /// let builder = BulkInsertBuilder::new("dbo.Users")
353    ///     .with_typed_columns(vec![
354    ///         BulkColumn::new("id", "INT", 0)?,
355    ///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
356    ///     ]);
357    ///
358    /// let mut writer = client.bulk_insert(&builder).await?;
359    /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
360    /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
361    /// let result = writer.finish().await?;
362    /// println!("Inserted {} rows", result.rows_affected);
363    /// # Ok(())
364    /// # }
365    /// ```
366    pub async fn bulk_insert(
367        &mut self,
368        builder: &crate::bulk::BulkInsertBuilder,
369    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
370        use tds_protocol::token::{ColMetaData, Token};
371
372        tracing::debug!(
373            table = builder.table_name(),
374            columns = builder.columns().len(),
375            "starting bulk insert"
376        );
377
378        // Step 1: Query the server for column metadata.
379        // This gives us the exact type encoding the server expects for BulkLoad,
380        // following the pattern established by Tiberius.
381        let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
382        self.send_sql_batch(&meta_query).await?;
383
384        let message = self.read_response_message().await?;
385        self.in_flight = false;
386
387        // Capture both the raw COLMETADATA bytes and parsed column info
388        let raw_payload = message.payload.clone();
389        let mut parser = self.create_parser(message.payload);
390        let mut server_metadata: Option<ColMetaData> = None;
391        let mut meta_start: usize = 0;
392        let mut meta_end: usize = 0;
393
394        loop {
395            let pos_before = raw_payload.len() - parser.remaining();
396            let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
397            let pos_after = raw_payload.len() - parser.remaining();
398            let Some(token) = token else { break };
399
400            match token {
401                Token::ColMetaData(meta) => {
402                    meta_start = pos_before;
403                    meta_end = pos_after;
404                    server_metadata = Some(meta);
405                }
406                Token::Done(_) => break,
407                _ => {}
408            }
409        }
410
411        // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
412        // These types require a legacy TEXTPTR wire format that this driver
413        // does not support — users should migrate the column to VARCHAR(MAX) /
414        // NVARCHAR(MAX) / VARBINARY(MAX).
415        if let Some(ref meta) = server_metadata {
416            use tds_protocol::types::TypeId;
417            for col in meta.columns.iter() {
418                let (rejected, replacement) = match col.type_id {
419                    TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
420                    TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
421                    TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
422                    _ => (None, ""),
423                };
424                if let Some(sql_type) = rejected {
425                    return Err(Error::from(mssql_types::TypeError::UnsupportedType {
426                        sql_type: sql_type.to_string(),
427                        reason: format!(
428                            "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
429                             are not supported. Alter the column to {} instead \
430                             (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
431                             Server 2005).",
432                            col.name,
433                            builder.table_name(),
434                            sql_type,
435                            replacement,
436                        ),
437                    }));
438                }
439            }
440        }
441
442        // Step 2: Send INSERT BULK statement to put server in bulk load mode
443        let stmt = builder.build_insert_bulk_statement()?;
444        self.send_sql_batch(&stmt).await?;
445        self.read_execute_result().await?;
446
447        // Step 3: Create bulk writer with server's metadata
448        let raw_meta = if meta_end > meta_start {
449            Some(raw_payload.slice(meta_start..meta_end))
450        } else {
451            None
452        };
453
454        let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
455        let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
456            builder.columns().to_vec(),
457            builder.options().batch_size,
458            raw_meta,
459            server_cols,
460        );
461
462        Ok(crate::bulk::BulkWriter::new(self, bulk))
463    }
464
465    /// Start a bulk insert without querying the server for column metadata.
466    ///
467    /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
468    /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
469    /// column metadata is constructed from the `BulkColumn` types provided
470    /// on the builder. This saves a round-trip when the schema is known.
471    ///
472    /// # Caveats
473    ///
474    /// The caller must ensure `BulkColumn` entries match the target table's
475    /// column definitions exactly. Mismatched types, lengths, precision/scale,
476    /// or column ordering will cause the server to reject the BulkLoad packet.
477    ///
478    /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
479    /// extra round-trip is usually negligible and the server-supplied metadata
480    /// is guaranteed correct.
481    pub async fn bulk_insert_without_schema_discovery(
482        &mut self,
483        builder: &crate::bulk::BulkInsertBuilder,
484    ) -> Result<crate::bulk::BulkWriter<'_, S>> {
485        tracing::debug!(
486            table = builder.table_name(),
487            columns = builder.columns().len(),
488            "starting bulk insert (no schema discovery)"
489        );
490
491        // Send INSERT BULK statement to put server in bulk load mode
492        let stmt = builder.build_insert_bulk_statement()?;
493        self.send_sql_batch(&stmt).await?;
494        self.read_execute_result().await?;
495
496        // Create bulk writer with hand-crafted metadata
497        let bulk =
498            crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
499
500        Ok(crate::bulk::BulkWriter::new(self, bulk))
501    }
502
503    /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
504    ///
505    /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
506    /// row data after the `INSERT BULK` statement has been acknowledged.
507    pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
508        let max_packet = self.config.packet_size as usize;
509
510        self.in_flight = true;
511        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
512
513        match connection {
514            #[cfg(feature = "tls")]
515            ConnectionHandle::Tls(conn) => {
516                conn.send_message(PacketType::BulkLoad, payload, max_packet)
517                    .await?;
518            }
519            #[cfg(feature = "tls")]
520            ConnectionHandle::TlsPrelogin(conn) => {
521                conn.send_message(PacketType::BulkLoad, payload, max_packet)
522                    .await?;
523            }
524            ConnectionHandle::Plain(conn) => {
525                conn.send_message(PacketType::BulkLoad, payload, max_packet)
526                    .await?;
527            }
528        }
529
530        // Read the server's Done response with row count
531        self.read_execute_result().await
532    }
533
534    /// Execute a query with named parameters and return a streaming result set.
535    ///
536    /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
537    /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
538    /// and the `#[derive(ToParams)]` macro.
539    ///
540    /// # Example
541    ///
542    /// ```rust,no_run
543    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
544    /// use mssql_client::{NamedParam, ToParams};
545    ///
546    /// // With derive macro:
547    /// #[derive(mssql_derive::ToParams)]
548    /// struct UserQuery { name: String }
549    ///
550    /// let q = UserQuery { name: "Alice".into() };
551    /// let rows = client.query_named(
552    ///     "SELECT * FROM users WHERE name = @name",
553    ///     &q.to_params()?,
554    /// ).await?;
555    ///
556    /// // Or manually:
557    /// let params = vec![NamedParam::from_value("name", &"Alice")?];
558    /// let rows = client.query_named(
559    ///     "SELECT * FROM users WHERE name = @name",
560    ///     &params,
561    /// ).await?;
562    /// # let _ = rows;
563    /// # Ok(())
564    /// # }
565    /// ```
566    pub async fn query_named<'a>(
567        &'a mut self,
568        sql: &str,
569        params: &[crate::to_params::NamedParam],
570    ) -> Result<QueryStream<'a>> {
571        tracing::debug!(
572            sql = sql,
573            params_count = params.len(),
574            "executing query with named parameters"
575        );
576
577        if params.is_empty() {
578            self.send_sql_batch(sql).await?;
579        } else {
580            let rpc_params =
581                Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
582            let rpc = RpcRequest::execute_sql(sql, rpc_params);
583            self.send_rpc(&rpc).await?;
584        }
585
586        let resp = self.read_query_response().await?;
587        #[cfg(feature = "always-encrypted")]
588        {
589            Ok(QueryStream::from_raw(
590                resp.columns,
591                resp.pending_rows,
592                resp.meta,
593                resp.decryptor,
594            ))
595        }
596        #[cfg(not(feature = "always-encrypted"))]
597        {
598            Ok(QueryStream::from_raw(
599                resp.columns,
600                resp.pending_rows,
601                resp.meta,
602            ))
603        }
604    }
605
606    /// Execute a statement with named parameters.
607    ///
608    /// Returns the number of affected rows. This is the named-parameter
609    /// counterpart of [`execute()`](Client::execute), compatible with the
610    /// [`ToParams`](crate::to_params::ToParams) trait.
611    ///
612    /// # Example
613    ///
614    /// ```rust,no_run
615    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
616    /// use mssql_client::NamedParam;
617    ///
618    /// let params = vec![
619    ///     NamedParam::from_value("name", &"Alice")?,
620    ///     NamedParam::from_value("email", &"alice@example.com")?,
621    /// ];
622    /// let rows_affected = client.execute_named(
623    ///     "INSERT INTO users (name, email) VALUES (@name, @email)",
624    ///     &params,
625    /// ).await?;
626    /// # let _ = rows_affected;
627    /// # Ok(())
628    /// # }
629    /// ```
630    pub async fn execute_named(
631        &mut self,
632        sql: &str,
633        params: &[crate::to_params::NamedParam],
634    ) -> Result<u64> {
635        tracing::debug!(
636            sql = sql,
637            params_count = params.len(),
638            "executing statement with named parameters"
639        );
640
641        if params.is_empty() {
642            self.send_sql_batch(sql).await?;
643        } else {
644            let rpc_params =
645                Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
646            let rpc = RpcRequest::execute_sql(sql, rpc_params);
647            self.send_rpc(&rpc).await?;
648        }
649
650        self.read_execute_result().await
651    }
652
653    /// Whether string parameters are sent as NVARCHAR (Unicode).
654    pub(crate) fn send_unicode(&self) -> bool {
655        self.config.send_string_parameters_as_unicode
656    }
657
658    /// Server's default collation, captured from ENVCHANGE during login.
659    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
660        self.server_collation.as_ref()
661    }
662}
663
664impl Client<Ready> {
665    /// Mark this connection as needing a reset on next use.
666    ///
667    /// Called by the connection pool when a connection is returned.
668    /// The next SQL batch or RPC will include the RESETCONNECTION flag
669    /// in the TDS packet header, causing SQL Server to reset connection
670    /// state (temp tables, SET options, transaction isolation level, etc.)
671    /// before executing the command.
672    ///
673    /// This is more efficient than calling `sp_reset_connection` as a
674    /// separate command because it's handled at the TDS protocol level.
675    pub fn mark_needs_reset(&mut self) {
676        self.needs_reset = true;
677    }
678
679    /// Check if this connection needs a reset.
680    ///
681    /// Returns true if `mark_needs_reset()` was called and the reset
682    /// hasn't been performed yet.
683    #[must_use]
684    pub fn needs_reset(&self) -> bool {
685        self.needs_reset
686    }
687
688    /// Execute a query and return a result set with lazy per-row decoding.
689    ///
690    /// Per ADR-007 the full response is buffered in memory and each row is
691    /// *decoded* on demand as you iterate — this is not incremental network
692    /// streaming, so peak memory tracks the response size. Use
693    /// `.collect_all()` if you want all rows materialized into a `Vec` up
694    /// front.
695    ///
696    /// # Example
697    ///
698    /// ```rust,no_run
699    /// # use mssql_client::Row;
700    /// # fn process(_: &Row) {}
701    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
702    /// // Streaming (synchronous iteration over the result set)
703    /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
704    /// for row in stream {
705    ///     let row = row?;
706    ///     process(&row);
707    /// }
708    ///
709    /// // Buffered (loads all into memory)
710    /// let rows: Vec<Row> = client
711    ///     .query("SELECT * FROM small_table", &[])
712    ///     .await?
713    ///     .collect_all()
714    ///     .await?;
715    /// # let _ = rows;
716    /// # Ok(())
717    /// # }
718    /// ```
719    pub async fn query<'a>(
720        &'a mut self,
721        sql: &str,
722        params: &[&(dyn crate::ToSql + Sync)],
723    ) -> Result<QueryStream<'a>> {
724        let deadline = self.command_deadline();
725        self.query_inner(sql, params, deadline).await
726    }
727
728    /// Shared query implementation with an explicit command deadline.
729    async fn query_inner<'a>(
730        &'a mut self,
731        sql: &str,
732        params: &[&(dyn crate::ToSql + Sync)],
733        deadline: Option<std::time::Duration>,
734    ) -> Result<QueryStream<'a>> {
735        tracing::debug!(sql = sql, params_count = params.len(), "executing query");
736
737        #[cfg(feature = "otel")]
738        let instrumentation = self.instrumentation.clone();
739        #[cfg(feature = "otel")]
740        let mut span = instrumentation.query_span(sql);
741        #[cfg(feature = "otel")]
742        let timer = crate::instrumentation::OperationTimer::start(
743            crate::instrumentation::extract_operation(sql),
744        );
745
746        let canceller = self.cancel_handle();
747        let result = run_with_deadline(
748            async {
749                if params.is_empty() {
750                    // Simple query without parameters - use SQL batch
751                    self.send_sql_batch(sql).await?;
752                } else {
753                    // Parameterized query - use sp_executesql via RPC
754                    let rpc_params =
755                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
756                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
757                    self.send_rpc(&rpc).await?;
758                }
759
760                // Read complete response including columns and rows
761                self.read_query_response().await
762            },
763            deadline,
764            canceller,
765        )
766        .await;
767
768        #[cfg(feature = "otel")]
769        match &result {
770            Ok(_) => InstrumentationContext::record_success(&mut span, None),
771            Err(e) => InstrumentationContext::record_error(&mut span, e),
772        }
773        #[cfg(feature = "otel")]
774        timer.finish(instrumentation.metrics(), result.is_ok());
775
776        // Drop the span before returning
777        #[cfg(feature = "otel")]
778        drop(span);
779
780        let resp = result?;
781        #[cfg(feature = "always-encrypted")]
782        {
783            Ok(QueryStream::from_raw(
784                resp.columns,
785                resp.pending_rows,
786                resp.meta,
787                resp.decryptor,
788            ))
789        }
790        #[cfg(not(feature = "always-encrypted"))]
791        {
792            Ok(QueryStream::from_raw(
793                resp.columns,
794                resp.pending_rows,
795                resp.meta,
796            ))
797        }
798    }
799
800    /// Execute a query with a specific timeout.
801    ///
802    /// This overrides the default `command_timeout` from the connection configuration
803    /// for this specific query. If the query does not complete within the specified
804    /// duration, the driver sends an Attention packet to cancel it server-side,
805    /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
806    /// connection left usable for the next request.
807    ///
808    /// # Arguments
809    ///
810    /// * `sql` - The SQL query to execute
811    /// * `params` - Query parameters
812    /// * `timeout_duration` - Maximum time to wait for the query to complete
813    ///
814    /// # Example
815    ///
816    /// ```rust,no_run
817    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
818    /// use std::time::Duration;
819    ///
820    /// // Execute with a 5-second timeout
821    /// let rows = client
822    ///     .query_with_timeout(
823    ///         "SELECT * FROM large_table",
824    ///         &[],
825    ///         Duration::from_secs(5),
826    ///     )
827    ///     .await?;
828    /// # let _ = rows;
829    /// # Ok(())
830    /// # }
831    /// ```
832    pub async fn query_with_timeout<'a>(
833        &'a mut self,
834        sql: &str,
835        params: &[&(dyn crate::ToSql + Sync)],
836        timeout_duration: std::time::Duration,
837    ) -> Result<QueryStream<'a>> {
838        self.query_inner(sql, params, Some(timeout_duration)).await
839    }
840
841    /// Execute a batch that may return multiple result sets.
842    ///
843    /// This is useful for stored procedures or SQL batches that contain
844    /// multiple SELECT statements.
845    ///
846    /// # Example
847    ///
848    /// ```rust,no_run
849    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
850    /// // Execute a batch with multiple SELECT statements
851    /// let mut results = client.query_multiple(
852    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
853    ///     &[]
854    /// ).await?;
855    ///
856    /// // Process first result set
857    /// while let Some(row) = results.next_row().await? {
858    ///     println!("Result 1: {:?}", row);
859    /// }
860    ///
861    /// // Move to second result set
862    /// if results.next_result().await? {
863    ///     while let Some(row) = results.next_row().await? {
864    ///         println!("Result 2: {:?}", row);
865    ///     }
866    /// }
867    /// # Ok(())
868    /// # }
869    /// ```
870    pub async fn query_multiple<'a>(
871        &'a mut self,
872        sql: &str,
873        params: &[&(dyn crate::ToSql + Sync)],
874    ) -> Result<MultiResultStream<'a>> {
875        tracing::debug!(
876            sql = sql,
877            params_count = params.len(),
878            "executing multi-result query"
879        );
880
881        if params.is_empty() {
882            // Simple batch without parameters - use SQL batch
883            self.send_sql_batch(sql).await?;
884        } else {
885            // Parameterized query - use sp_executesql via RPC
886            let rpc_params =
887                Self::convert_params(params, self.send_unicode(), self.server_collation())?;
888            let rpc = RpcRequest::execute_sql(sql, rpc_params);
889            self.send_rpc(&rpc).await?;
890        }
891
892        // Read all result sets
893        let result_sets = self.read_multi_result_response().await?;
894        Ok(MultiResultStream::new(result_sets))
895    }
896
897    /// Execute a query that doesn't return rows.
898    ///
899    /// Returns the number of affected rows.
900    pub async fn execute(
901        &mut self,
902        sql: &str,
903        params: &[&(dyn crate::ToSql + Sync)],
904    ) -> Result<u64> {
905        let deadline = self.command_deadline();
906        self.execute_inner(sql, params, deadline).await
907    }
908
909    /// Shared execute implementation with an explicit command deadline.
910    async fn execute_inner(
911        &mut self,
912        sql: &str,
913        params: &[&(dyn crate::ToSql + Sync)],
914        deadline: Option<std::time::Duration>,
915    ) -> Result<u64> {
916        tracing::debug!(
917            sql = sql,
918            params_count = params.len(),
919            "executing statement"
920        );
921
922        #[cfg(feature = "otel")]
923        let instrumentation = self.instrumentation.clone();
924        #[cfg(feature = "otel")]
925        let mut span = instrumentation.query_span(sql);
926        #[cfg(feature = "otel")]
927        let timer = crate::instrumentation::OperationTimer::start(
928            crate::instrumentation::extract_operation(sql),
929        );
930
931        let canceller = self.cancel_handle();
932        let result = run_with_deadline(
933            async {
934                if params.is_empty() {
935                    // Simple statement without parameters - use SQL batch
936                    self.send_sql_batch(sql).await?;
937                } else {
938                    // Parameterized statement - use sp_executesql via RPC
939                    let rpc_params =
940                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
941                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
942                    self.send_rpc(&rpc).await?;
943                }
944
945                // Read response and get row count
946                self.read_execute_result().await
947            },
948            deadline,
949            canceller,
950        )
951        .await;
952
953        #[cfg(feature = "otel")]
954        match &result {
955            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
956            Err(e) => InstrumentationContext::record_error(&mut span, e),
957        }
958        #[cfg(feature = "otel")]
959        timer.finish(instrumentation.metrics(), result.is_ok());
960
961        // Drop the span before returning
962        #[cfg(feature = "otel")]
963        drop(span);
964
965        result
966    }
967
968    /// Execute a statement with a specific timeout.
969    ///
970    /// This overrides the default `command_timeout` from the connection configuration
971    /// for this specific statement. If the statement does not complete within the
972    /// specified duration, the driver sends an Attention packet to cancel it
973    /// server-side, drains the acknowledgement, and returns
974    /// [`Error::CommandTimeout`] with the connection left usable.
975    ///
976    /// # Arguments
977    ///
978    /// * `sql` - The SQL statement to execute
979    /// * `params` - Statement parameters
980    /// * `timeout_duration` - Maximum time to wait for the statement to complete
981    ///
982    /// # Example
983    ///
984    /// ```rust,no_run
985    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
986    /// use std::time::Duration;
987    ///
988    /// // Execute with a 10-second timeout
989    /// let rows_affected = client
990    ///     .execute_with_timeout(
991    ///         "UPDATE large_table SET status = @p1",
992    ///         &[&"processed"],
993    ///         Duration::from_secs(10),
994    ///     )
995    ///     .await?;
996    /// # let _ = rows_affected;
997    /// # Ok(())
998    /// # }
999    /// ```
1000    pub async fn execute_with_timeout(
1001        &mut self,
1002        sql: &str,
1003        params: &[&(dyn crate::ToSql + Sync)],
1004        timeout_duration: std::time::Duration,
1005    ) -> Result<u64> {
1006        self.execute_inner(sql, params, Some(timeout_duration))
1007            .await
1008    }
1009
1010    /// Begin a transaction.
1011    ///
1012    /// This transitions the client from `Ready` to `InTransaction` state.
1013    /// Per MS-TDS spec, the server returns a transaction descriptor in the
1014    /// BeginTransaction EnvChange token that must be included in subsequent
1015    /// ALL_HEADERS sections.
1016    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1017        tracing::debug!("beginning transaction");
1018
1019        #[cfg(feature = "otel")]
1020        let instrumentation = self.instrumentation.clone();
1021        #[cfg(feature = "otel")]
1022        let mut span = instrumentation.transaction_span("BEGIN");
1023
1024        // Execute BEGIN TRANSACTION and extract the transaction descriptor
1025        let result = async {
1026            self.send_sql_batch("BEGIN TRANSACTION").await?;
1027            self.read_transaction_begin_result().await
1028        }
1029        .await;
1030
1031        #[cfg(feature = "otel")]
1032        match &result {
1033            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1034            Err(e) => InstrumentationContext::record_error(&mut span, e),
1035        }
1036
1037        // Drop the span before moving instrumentation
1038        #[cfg(feature = "otel")]
1039        drop(span);
1040
1041        let transaction_descriptor = result?;
1042
1043        Ok(Client {
1044            config: self.config,
1045            _state: PhantomData,
1046            connection: self.connection,
1047            server_version: self.server_version,
1048            current_database: self.current_database,
1049            server_collation: self.server_collation,
1050            statement_cache: self.statement_cache,
1051            transaction_descriptor, // Store the descriptor from server
1052            needs_reset: self.needs_reset,
1053            in_flight: self.in_flight,
1054            #[cfg(feature = "otel")]
1055            instrumentation: self.instrumentation,
1056            #[cfg(feature = "always-encrypted")]
1057            encryption_context: self.encryption_context,
1058        })
1059    }
1060
1061    /// Begin a transaction with a specific isolation level.
1062    ///
1063    /// This transitions the client from `Ready` to `InTransaction` state
1064    /// with the specified isolation level.
1065    ///
1066    /// # Example
1067    ///
1068    /// ```rust,no_run
1069    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1070    /// use mssql_client::IsolationLevel;
1071    ///
1072    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1073    /// // All operations in this transaction use SERIALIZABLE isolation
1074    /// tx.commit().await?;
1075    /// # Ok(())
1076    /// # }
1077    /// ```
1078    pub async fn begin_transaction_with_isolation(
1079        mut self,
1080        isolation_level: crate::transaction::IsolationLevel,
1081    ) -> Result<Client<InTransaction>> {
1082        tracing::debug!(
1083            isolation_level = %isolation_level.name(),
1084            "beginning transaction with isolation level"
1085        );
1086
1087        #[cfg(feature = "otel")]
1088        let instrumentation = self.instrumentation.clone();
1089        #[cfg(feature = "otel")]
1090        let mut span = instrumentation.transaction_span("BEGIN");
1091
1092        // First set the isolation level
1093        let result = async {
1094            self.send_sql_batch(isolation_level.as_sql()).await?;
1095            self.read_execute_result().await?;
1096
1097            // Then begin the transaction
1098            self.send_sql_batch("BEGIN TRANSACTION").await?;
1099            self.read_transaction_begin_result().await
1100        }
1101        .await;
1102
1103        #[cfg(feature = "otel")]
1104        match &result {
1105            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1106            Err(e) => InstrumentationContext::record_error(&mut span, e),
1107        }
1108
1109        #[cfg(feature = "otel")]
1110        drop(span);
1111
1112        let transaction_descriptor = result?;
1113
1114        Ok(Client {
1115            config: self.config,
1116            _state: PhantomData,
1117            connection: self.connection,
1118            server_version: self.server_version,
1119            current_database: self.current_database,
1120            server_collation: self.server_collation,
1121            statement_cache: self.statement_cache,
1122            transaction_descriptor,
1123            needs_reset: self.needs_reset,
1124            in_flight: self.in_flight,
1125            #[cfg(feature = "otel")]
1126            instrumentation: self.instrumentation,
1127            #[cfg(feature = "always-encrypted")]
1128            encryption_context: self.encryption_context,
1129        })
1130    }
1131
1132    /// Execute a simple query without parameters.
1133    ///
1134    /// This is useful for DDL statements and simple queries where you
1135    /// don't need to retrieve the affected row count.
1136    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1137        tracing::debug!(sql = sql, "executing simple query");
1138
1139        // Send SQL batch
1140        self.send_sql_batch(sql).await?;
1141
1142        // Read and discard response
1143        let _ = self.read_execute_result().await?;
1144
1145        Ok(())
1146    }
1147
1148    /// Close the connection gracefully.
1149    pub async fn close(self) -> Result<()> {
1150        tracing::debug!("closing connection");
1151        Ok(())
1152    }
1153
1154    /// Get the current database name.
1155    #[must_use]
1156    pub fn database(&self) -> Option<&str> {
1157        self.config.database.as_deref()
1158    }
1159
1160    /// Get the server host.
1161    #[must_use]
1162    pub fn host(&self) -> &str {
1163        &self.config.host
1164    }
1165
1166    /// Get the server port.
1167    #[must_use]
1168    pub fn port(&self) -> u16 {
1169        self.config.port
1170    }
1171
1172    /// Check if the connection is currently in a transaction.
1173    ///
1174    /// This returns `true` if a transaction was started via raw SQL
1175    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1176    ///
1177    /// Note: This only tracks transactions started via raw SQL. Transactions
1178    /// started via the type-state API (`begin_transaction()`) result in a
1179    /// `Client<InTransaction>` which is a different type.
1180    ///
1181    /// # Example
1182    ///
1183    /// ```rust,no_run
1184    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1185    /// client.execute("BEGIN TRANSACTION", &[]).await?;
1186    /// assert!(client.is_in_transaction());
1187    ///
1188    /// client.execute("COMMIT", &[]).await?;
1189    /// assert!(!client.is_in_transaction());
1190    /// # Ok(())
1191    /// # }
1192    /// ```
1193    #[must_use]
1194    pub fn is_in_transaction(&self) -> bool {
1195        self.transaction_descriptor != 0
1196    }
1197
1198    /// Check if a request is in-flight (sent but response not fully read).
1199    ///
1200    /// Used by the connection pool to detect dirty connections that were
1201    /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1202    /// A connection with an in-flight request has unread data in the TCP
1203    /// buffer and must be discarded rather than returned to the pool.
1204    #[must_use]
1205    pub fn is_in_flight(&self) -> bool {
1206        self.in_flight
1207    }
1208
1209    /// Report whether an Always Encrypted key-store provider with the given
1210    /// name is currently reachable through this client's encryption context.
1211    ///
1212    /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1213    /// the connection was opened without `column_encryption` configured, or
1214    /// when no matching provider was registered.
1215    #[cfg(feature = "always-encrypted")]
1216    #[must_use]
1217    pub fn has_encryption_provider(&self, name: &str) -> bool {
1218        self.encryption_context
1219            .as_ref()
1220            .is_some_and(|ctx| ctx.has_provider(name))
1221    }
1222
1223    /// Get a handle for cancelling the current query.
1224    ///
1225    /// The cancel handle can be cloned and sent to other tasks, enabling
1226    /// cancellation of long-running queries from a separate async context.
1227    ///
1228    /// # Example
1229    ///
1230    /// ```rust,no_run
1231    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1232    /// use std::time::Duration;
1233    ///
1234    /// let cancel_handle = client.cancel_handle();
1235    ///
1236    /// // Spawn a task to cancel after 10 seconds
1237    /// let handle = tokio::spawn(async move {
1238    ///     tokio::time::sleep(Duration::from_secs(10)).await;
1239    ///     let _ = cancel_handle.cancel().await;
1240    /// });
1241    ///
1242    /// // This query will be cancelled if it runs longer than 10 seconds
1243    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1244    /// # let _ = (handle, result);
1245    /// # Ok(())
1246    /// # }
1247    /// ```
1248    #[must_use]
1249    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1250        let connection = self
1251            .connection
1252            .as_ref()
1253            .expect("connection should be present");
1254        match connection {
1255            #[cfg(feature = "tls")]
1256            ConnectionHandle::Tls(conn) => {
1257                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
1258            }
1259            #[cfg(feature = "tls")]
1260            ConnectionHandle::TlsPrelogin(conn) => {
1261                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
1262            }
1263            ConnectionHandle::Plain(conn) => {
1264                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
1265            }
1266        }
1267    }
1268}
1269
1270/// # Drop Behavior
1271///
1272/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1273/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1274/// the transaction remains open on the server until the TCP connection closes
1275/// (at which point SQL Server automatically rolls back).
1276///
1277/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1278/// to send a `ROLLBACK TRANSACTION` command.
1279///
1280/// ## Consequences of dropping without commit/rollback
1281///
1282/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1283///   (potentially 30+ minutes), holding locks on any modified rows.
1284/// - **Pooled connections:** The pool detects the active transaction descriptor
1285///   and discards the connection rather than returning it to the idle pool
1286///   (see `PooledConnection::drop` in `mssql-driver-pool`).
1287///
1288/// ## Best practice
1289///
1290/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1291/// for error paths:
1292///
1293/// ```rust,no_run
1294/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1295/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1296/// let tx = client.begin_transaction().await?;
1297/// match do_work(&tx).await {
1298///     Ok(_) => { tx.commit().await?; }
1299///     Err(e) => { tx.rollback().await?; return Err(e); }
1300/// }
1301/// # Ok(())
1302/// # }
1303/// ```
1304impl Client<InTransaction> {
1305    /// Execute a query within the transaction and return a streaming result set.
1306    ///
1307    /// See [`Client<Ready>::query`] for usage examples.
1308    pub async fn query<'a>(
1309        &'a mut self,
1310        sql: &str,
1311        params: &[&(dyn crate::ToSql + Sync)],
1312    ) -> Result<QueryStream<'a>> {
1313        let deadline = self.command_deadline();
1314        self.query_inner(sql, params, deadline).await
1315    }
1316
1317    /// Shared query implementation with an explicit command deadline.
1318    async fn query_inner<'a>(
1319        &'a mut self,
1320        sql: &str,
1321        params: &[&(dyn crate::ToSql + Sync)],
1322        deadline: Option<std::time::Duration>,
1323    ) -> Result<QueryStream<'a>> {
1324        tracing::debug!(
1325            sql = sql,
1326            params_count = params.len(),
1327            "executing query in transaction"
1328        );
1329
1330        #[cfg(feature = "otel")]
1331        let instrumentation = self.instrumentation.clone();
1332        #[cfg(feature = "otel")]
1333        let mut span = instrumentation.query_span(sql);
1334        #[cfg(feature = "otel")]
1335        let timer = crate::instrumentation::OperationTimer::start(
1336            crate::instrumentation::extract_operation(sql),
1337        );
1338
1339        let canceller = self.cancel_handle();
1340        let result = run_with_deadline(
1341            async {
1342                if params.is_empty() {
1343                    // Simple query without parameters - use SQL batch
1344                    self.send_sql_batch(sql).await?;
1345                } else {
1346                    // Parameterized query - use sp_executesql via RPC
1347                    let rpc_params =
1348                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1349                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1350                    self.send_rpc(&rpc).await?;
1351                }
1352
1353                // Read complete response including columns and rows
1354                self.read_query_response().await
1355            },
1356            deadline,
1357            canceller,
1358        )
1359        .await;
1360
1361        #[cfg(feature = "otel")]
1362        match &result {
1363            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1364            Err(e) => InstrumentationContext::record_error(&mut span, e),
1365        }
1366        #[cfg(feature = "otel")]
1367        timer.finish(instrumentation.metrics(), result.is_ok());
1368
1369        // Drop the span before returning
1370        #[cfg(feature = "otel")]
1371        drop(span);
1372
1373        let resp = result?;
1374        #[cfg(feature = "always-encrypted")]
1375        {
1376            Ok(QueryStream::from_raw(
1377                resp.columns,
1378                resp.pending_rows,
1379                resp.meta,
1380                resp.decryptor,
1381            ))
1382        }
1383        #[cfg(not(feature = "always-encrypted"))]
1384        {
1385            Ok(QueryStream::from_raw(
1386                resp.columns,
1387                resp.pending_rows,
1388                resp.meta,
1389            ))
1390        }
1391    }
1392
1393    /// Execute a statement within the transaction.
1394    ///
1395    /// Returns the number of affected rows.
1396    pub async fn execute(
1397        &mut self,
1398        sql: &str,
1399        params: &[&(dyn crate::ToSql + Sync)],
1400    ) -> Result<u64> {
1401        let deadline = self.command_deadline();
1402        self.execute_inner(sql, params, deadline).await
1403    }
1404
1405    /// Shared execute implementation with an explicit command deadline.
1406    async fn execute_inner(
1407        &mut self,
1408        sql: &str,
1409        params: &[&(dyn crate::ToSql + Sync)],
1410        deadline: Option<std::time::Duration>,
1411    ) -> Result<u64> {
1412        tracing::debug!(
1413            sql = sql,
1414            params_count = params.len(),
1415            "executing statement in transaction"
1416        );
1417
1418        #[cfg(feature = "otel")]
1419        let instrumentation = self.instrumentation.clone();
1420        #[cfg(feature = "otel")]
1421        let mut span = instrumentation.query_span(sql);
1422        #[cfg(feature = "otel")]
1423        let timer = crate::instrumentation::OperationTimer::start(
1424            crate::instrumentation::extract_operation(sql),
1425        );
1426
1427        let canceller = self.cancel_handle();
1428        let result = run_with_deadline(
1429            async {
1430                if params.is_empty() {
1431                    // Simple statement without parameters - use SQL batch
1432                    self.send_sql_batch(sql).await?;
1433                } else {
1434                    // Parameterized statement - use sp_executesql via RPC
1435                    let rpc_params =
1436                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1437                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
1438                    self.send_rpc(&rpc).await?;
1439                }
1440
1441                // Read response and get row count
1442                self.read_execute_result().await
1443            },
1444            deadline,
1445            canceller,
1446        )
1447        .await;
1448
1449        #[cfg(feature = "otel")]
1450        match &result {
1451            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1452            Err(e) => InstrumentationContext::record_error(&mut span, e),
1453        }
1454        #[cfg(feature = "otel")]
1455        timer.finish(instrumentation.metrics(), result.is_ok());
1456
1457        // Drop the span before returning
1458        #[cfg(feature = "otel")]
1459        drop(span);
1460
1461        result
1462    }
1463
1464    /// Execute a query within the transaction with a specific timeout.
1465    ///
1466    /// See [`Client<Ready>::query_with_timeout`] for details.
1467    pub async fn query_with_timeout<'a>(
1468        &'a mut self,
1469        sql: &str,
1470        params: &[&(dyn crate::ToSql + Sync)],
1471        timeout_duration: std::time::Duration,
1472    ) -> Result<QueryStream<'a>> {
1473        self.query_inner(sql, params, Some(timeout_duration)).await
1474    }
1475
1476    /// Execute a statement within the transaction with a specific timeout.
1477    ///
1478    /// See [`Client<Ready>::execute_with_timeout`] for details.
1479    pub async fn execute_with_timeout(
1480        &mut self,
1481        sql: &str,
1482        params: &[&(dyn crate::ToSql + Sync)],
1483        timeout_duration: std::time::Duration,
1484    ) -> Result<u64> {
1485        self.execute_inner(sql, params, Some(timeout_duration))
1486            .await
1487    }
1488
1489    /// Open a FILESTREAM BLOB for async reading and/or writing.
1490    ///
1491    /// This method queries the server for the transaction context, then opens
1492    /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1493    ///
1494    /// # Arguments
1495    ///
1496    /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1497    ///   Query this yourself before calling `open_filestream`:
1498    ///   ```sql
1499    ///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1500    ///   ```
1501    /// * `access` — Read, write, or read/write access mode.
1502    ///
1503    /// # Requirements
1504    ///
1505    /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1506    /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1507    /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1508    ///
1509    /// # Example
1510    ///
1511    /// ```text
1512    /// use mssql_client::FileStreamAccess;
1513    /// use tokio::io::AsyncReadExt;
1514    ///
1515    /// let mut tx = client.begin_transaction().await?;
1516    ///
1517    /// // Get the FILESTREAM path
1518    /// let rows = tx.query(
1519    ///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1520    ///     &[&doc_id],
1521    /// ).await?;
1522    /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1523    ///
1524    /// // Open and read the BLOB
1525    /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1526    /// let mut data = Vec::new();
1527    /// stream.read_to_end(&mut data).await?;
1528    /// drop(stream);
1529    ///
1530    /// tx.commit().await?;
1531    /// ```
1532    #[cfg(all(windows, feature = "filestream"))]
1533    pub async fn open_filestream(
1534        &mut self,
1535        path: &str,
1536        access: crate::filestream::FileStreamAccess,
1537    ) -> Result<crate::filestream::FileStream> {
1538        tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1539
1540        // Get the transaction context from SQL Server.
1541        // This binds the file access to the current SQL transaction.
1542        let txn_context: Vec<u8> = {
1543            let rows = self
1544                .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1545                .await?;
1546            let mut ctx = None;
1547            for result in rows {
1548                let row = result?;
1549                ctx = Some(row.get::<Vec<u8>>(0)?);
1550            }
1551            ctx.ok_or_else(|| {
1552                Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1553            })?
1554        };
1555
1556        crate::filestream::FileStream::open(path, access, &txn_context)
1557    }
1558
1559    /// Commit the transaction.
1560    ///
1561    /// This transitions the client back to `Ready` state.
1562    pub async fn commit(mut self) -> Result<Client<Ready>> {
1563        tracing::debug!("committing transaction");
1564
1565        #[cfg(feature = "otel")]
1566        let instrumentation = self.instrumentation.clone();
1567        #[cfg(feature = "otel")]
1568        let mut span = instrumentation.transaction_span("COMMIT");
1569
1570        // Execute COMMIT TRANSACTION
1571        let result = async {
1572            self.send_sql_batch("COMMIT TRANSACTION").await?;
1573            self.read_execute_result().await
1574        }
1575        .await;
1576
1577        #[cfg(feature = "otel")]
1578        match &result {
1579            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1580            Err(e) => InstrumentationContext::record_error(&mut span, e),
1581        }
1582
1583        // Drop the span before moving instrumentation
1584        #[cfg(feature = "otel")]
1585        drop(span);
1586
1587        result?;
1588
1589        Ok(Client {
1590            config: self.config,
1591            _state: PhantomData,
1592            connection: self.connection,
1593            server_version: self.server_version,
1594            current_database: self.current_database,
1595            server_collation: self.server_collation,
1596            statement_cache: self.statement_cache,
1597            transaction_descriptor: 0, // Reset to auto-commit mode
1598            needs_reset: self.needs_reset,
1599            in_flight: self.in_flight,
1600            #[cfg(feature = "otel")]
1601            instrumentation: self.instrumentation,
1602            #[cfg(feature = "always-encrypted")]
1603            encryption_context: self.encryption_context,
1604        })
1605    }
1606
1607    /// Rollback the transaction.
1608    ///
1609    /// This transitions the client back to `Ready` state.
1610    pub async fn rollback(mut self) -> Result<Client<Ready>> {
1611        tracing::debug!("rolling back transaction");
1612
1613        #[cfg(feature = "otel")]
1614        let instrumentation = self.instrumentation.clone();
1615        #[cfg(feature = "otel")]
1616        let mut span = instrumentation.transaction_span("ROLLBACK");
1617
1618        // Execute ROLLBACK TRANSACTION
1619        let result = async {
1620            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1621            self.read_execute_result().await
1622        }
1623        .await;
1624
1625        #[cfg(feature = "otel")]
1626        match &result {
1627            Ok(_) => InstrumentationContext::record_success(&mut span, None),
1628            Err(e) => InstrumentationContext::record_error(&mut span, e),
1629        }
1630
1631        // Drop the span before moving instrumentation
1632        #[cfg(feature = "otel")]
1633        drop(span);
1634
1635        result?;
1636
1637        Ok(Client {
1638            config: self.config,
1639            _state: PhantomData,
1640            connection: self.connection,
1641            server_version: self.server_version,
1642            current_database: self.current_database,
1643            server_collation: self.server_collation,
1644            statement_cache: self.statement_cache,
1645            transaction_descriptor: 0, // Reset to auto-commit mode
1646            needs_reset: self.needs_reset,
1647            in_flight: self.in_flight,
1648            #[cfg(feature = "otel")]
1649            instrumentation: self.instrumentation,
1650            #[cfg(feature = "always-encrypted")]
1651            encryption_context: self.encryption_context,
1652        })
1653    }
1654
1655    /// Create a savepoint and return a handle for later rollback.
1656    ///
1657    /// The returned `SavePoint` handle contains the validated savepoint name.
1658    /// Use it with `rollback_to()` to partially undo transaction work.
1659    ///
1660    /// # Example
1661    ///
1662    /// ```rust,no_run
1663    /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1664    /// let mut tx = client.begin_transaction().await?;
1665    /// tx.execute("INSERT INTO orders ...", &[]).await?;
1666    /// let sp = tx.save_point("before_items").await?;
1667    /// tx.execute("INSERT INTO items ...", &[]).await?;
1668    /// // Oops, rollback just the items
1669    /// tx.rollback_to(&sp).await?;
1670    /// tx.commit().await?;
1671    /// # Ok(())
1672    /// # }
1673    /// ```
1674    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1675        crate::validation::validate_identifier(name)?;
1676        tracing::debug!(name = name, "creating savepoint");
1677
1678        // Execute SAVE TRANSACTION <name>
1679        // Note: name is validated by validate_identifier() to prevent SQL injection
1680        let sql = format!("SAVE TRANSACTION {name}");
1681        self.send_sql_batch(&sql).await?;
1682        self.read_execute_result().await?;
1683
1684        Ok(SavePoint::new(name.to_string()))
1685    }
1686
1687    /// Rollback to a savepoint.
1688    ///
1689    /// This rolls back all changes made after the savepoint was created,
1690    /// but keeps the transaction active. The savepoint remains valid and
1691    /// can be rolled back to again.
1692    ///
1693    /// # Example
1694    ///
1695    /// ```rust,no_run
1696    /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1697    /// let sp = tx.save_point("checkpoint").await?;
1698    /// // ... do some work ...
1699    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
1700    /// // Transaction is still active, savepoint is still valid
1701    /// # Ok(())
1702    /// # }
1703    /// ```
1704    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1705        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1706
1707        // Execute ROLLBACK TRANSACTION <name>
1708        // Note: savepoint name was validated during creation
1709        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1710        self.send_sql_batch(&sql).await?;
1711        self.read_execute_result().await?;
1712
1713        Ok(())
1714    }
1715
1716    /// Release a savepoint (optional cleanup).
1717    ///
1718    /// Note: SQL Server doesn't have explicit savepoint release, but this
1719    /// method is provided for API completeness. The savepoint is automatically
1720    /// released when the transaction commits or rolls back.
1721    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
1722        tracing::debug!(name = savepoint.name(), "releasing savepoint");
1723
1724        // SQL Server doesn't require explicit savepoint release
1725        // The savepoint is implicitly released on commit/rollback
1726        // This method exists for API completeness
1727        drop(savepoint);
1728        Ok(())
1729    }
1730
1731    /// Get a handle for cancelling the current query within the transaction.
1732    ///
1733    /// See [`Client<Ready>::cancel_handle`] for usage examples.
1734    #[must_use]
1735    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1736        let connection = self
1737            .connection
1738            .as_ref()
1739            .expect("connection should be present");
1740        match connection {
1741            #[cfg(feature = "tls")]
1742            ConnectionHandle::Tls(conn) => {
1743                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
1744            }
1745            #[cfg(feature = "tls")]
1746            ConnectionHandle::TlsPrelogin(conn) => {
1747                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
1748            }
1749            ConnectionHandle::Plain(conn) => {
1750                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
1751            }
1752        }
1753    }
1754}
1755
1756impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1757    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1758        f.debug_struct("Client")
1759            .field("host", &self.config.host)
1760            .field("port", &self.config.port)
1761            .field("database", &self.config.database)
1762            .finish()
1763    }
1764}