mssql_client/client.rs
1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
//! / `DROP`); with parameters it uses `sp_executesql`, and SQL Server forbids
//! DDL in that procedure context. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
/// Upper bound on how long we wait for the server to acknowledge an
/// Attention packet once a command has timed out.
///
/// Five seconds mirrors SqlClient, which waits that long before dooming the
/// connection.
const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68 fut: F,
69 deadline: Option<std::time::Duration>,
70 canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73 F: std::future::Future<Output = Result<T>>,
74{
75 let Some(d) = deadline else {
76 return fut.await;
77 };
78 tokio::pin!(fut);
79 tokio::select! {
80 biased;
81 res = &mut fut => res,
82 () = tokio::time::sleep(d) => {
83 // Signal cancellation, then let the in-flight read consume the
84 // server's attention acknowledgement so the connection stays usable.
85 let drain = async {
86 let _ = canceller.cancel().await;
87 let _ = (&mut fut).await;
88 };
89 if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90 tracing::warn!(
91 timeout = ?ATTENTION_ACK_TIMEOUT,
92 "server did not acknowledge attention; abandoning the connection as dirty"
93 );
94 }
95 Err(Error::CommandTimeout)
96 }
97 }
98}
99
100/// SQL Server client with type-state connection management.
101///
102/// The generic parameter `S` represents the current connection state,
103/// ensuring at compile time that certain operations are only available
104/// in appropriate states.
105pub struct Client<S: ConnectionState> {
106 config: Config,
107 _state: PhantomData<S>,
108 /// The underlying connection (present only when connected)
109 connection: Option<ConnectionHandle>,
110 /// Server version from LoginAck (raw u32 TDS version)
111 server_version: Option<u32>,
112 /// Current database from EnvChange
113 current_database: Option<String>,
114 /// Server's default collation from SqlCollation EnvChange during login.
115 /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
116 /// parameters with the correct character encoding and collation bytes.
117 server_collation: Option<tds_protocol::token::Collation>,
118 /// Prepared statement cache for query optimization
119 statement_cache: StatementCache,
120 /// Transaction descriptor from BeginTransaction EnvChange.
121 /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
122 /// requests within an explicit transaction. 0 indicates auto-commit mode.
123 transaction_descriptor: u64,
124 /// Whether a request has been sent and the response has not yet been fully read.
125 /// Used by the connection pool to detect dirty connections after cancel/timeout.
126 in_flight: bool,
127 /// Whether this connection needs a reset on next use.
128 /// Set by connection pool on checkin, cleared after first query/execute.
129 /// When true, the RESETCONNECTION flag is set on the first TDS packet.
130 needs_reset: bool,
131 /// OpenTelemetry instrumentation context (when otel feature is enabled)
132 #[cfg(feature = "otel")]
133 instrumentation: InstrumentationContext,
134 /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
135 #[cfg(feature = "always-encrypted")]
136 pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
137}
138
139/// Internal connection handle wrapping the actual connection.
140///
141/// This is an enum to support different connection types:
142/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
143/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
144/// - Plain TCP (for internal networks or when `tls` feature is disabled)
145#[allow(dead_code)] // Connection will be used once query execution is implemented
146enum ConnectionHandle {
147 /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
148 #[cfg(feature = "tls")]
149 Tls(Connection<TlsStream<TcpStream>>),
150 /// TLS connection with PreLogin wrapping (TDS 7.x style)
151 #[cfg(feature = "tls")]
152 TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
153 /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
154 Plain(Connection<TcpStream>),
155}
156
157// Private helper methods available to all connection states
158impl<S: ConnectionState> Client<S> {
159 /// The default per-command deadline from `command_timeout`.
160 ///
161 /// Returns `None` when `command_timeout` is zero, which means "no limit"
162 /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
163 pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
164 let t = self.config.command_timeout;
165 if t.is_zero() { None } else { Some(t) }
166 }
167
168 /// Build a cancel handle for the current connection, regardless of
169 /// connection state. The public, documented surface is
170 /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
171 /// delegate here.
172 pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
173 let connection = self
174 .connection
175 .as_ref()
176 .expect("connection should be present");
177 match connection {
178 #[cfg(feature = "tls")]
179 ConnectionHandle::Tls(conn) => {
180 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
181 }
182 #[cfg(feature = "tls")]
183 ConnectionHandle::TlsPrelogin(conn) => {
184 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
185 }
186 ConnectionHandle::Plain(conn) => {
187 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
188 }
189 }
190 }
191
192 /// Process transaction-related EnvChange tokens.
193 ///
194 /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
195 /// EnvChange tokens, updating the transaction descriptor accordingly.
196 ///
197 /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
198 /// while still having the transaction descriptor tracked correctly.
199 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
200 use tds_protocol::token::EnvChangeValue;
201
202 match env.env_type {
203 EnvChangeType::BeginTransaction => {
204 if let EnvChangeValue::Binary(ref data) = env.new_value {
205 if data.len() >= 8 {
206 let descriptor = u64::from_le_bytes([
207 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
208 ]);
209 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
210 *transaction_descriptor = descriptor;
211 }
212 }
213 }
214 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
215 tracing::debug!(
216 env_type = ?env.env_type,
217 "transaction ended via raw SQL"
218 );
219 *transaction_descriptor = 0;
220 }
221 _ => {}
222 }
223 }
224
225 /// Send a SQL batch to the server.
226 ///
227 /// Uses the client's current transaction descriptor in ALL_HEADERS.
228 /// Per MS-TDS spec, when in an explicit transaction, the descriptor
229 /// returned by BeginTransaction must be included.
230 ///
231 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
232 /// is included in the first packet to reset connection state.
233 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
234 let payload =
235 tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
236 let max_packet = self.config.packet_size as usize;
237
238 // Check if we need to reset the connection on this request
239 let reset = self.needs_reset;
240 if reset {
241 self.needs_reset = false; // Clear flag before sending
242 tracing::debug!("sending SQL batch with RESETCONNECTION flag");
243 }
244
245 self.in_flight = true;
246 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
247
248 match connection {
249 #[cfg(feature = "tls")]
250 ConnectionHandle::Tls(conn) => {
251 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
252 .await?;
253 }
254 #[cfg(feature = "tls")]
255 ConnectionHandle::TlsPrelogin(conn) => {
256 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
257 .await?;
258 }
259 ConnectionHandle::Plain(conn) => {
260 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
261 .await?;
262 }
263 }
264
265 Ok(())
266 }
267
268 /// Send an RPC request to the server.
269 ///
270 /// Uses the client's current transaction descriptor in ALL_HEADERS.
271 ///
272 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
273 /// is included in the first packet to reset connection state.
274 pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
275 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
276 let max_packet = self.config.packet_size as usize;
277
278 // Check if we need to reset the connection on this request
279 let reset = self.needs_reset;
280 if reset {
281 self.needs_reset = false; // Clear flag before sending
282 tracing::debug!("sending RPC with RESETCONNECTION flag");
283 }
284
285 self.in_flight = true;
286 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
287
288 match connection {
289 #[cfg(feature = "tls")]
290 ConnectionHandle::Tls(conn) => {
291 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
292 .await?;
293 }
294 #[cfg(feature = "tls")]
295 ConnectionHandle::TlsPrelogin(conn) => {
296 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
297 .await?;
298 }
299 ConnectionHandle::Plain(conn) => {
300 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
301 .await?;
302 }
303 }
304
305 Ok(())
306 }
307
308 /// Start building a stored procedure call with full control over parameters.
309 ///
310 /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
311 /// parameters before executing the call.
312 ///
313 /// The procedure name is validated to prevent SQL injection. It may be
314 /// schema-qualified (e.g., `"dbo.MyProc"`).
315 ///
316 /// # Example
317 ///
318 /// ```rust,no_run
319 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
320 /// let result = client.procedure("dbo.CalculateSum")?
321 /// .input("@a", &10i32)
322 /// .input("@b", &20i32)
323 /// .output_int("@result")
324 /// .execute().await?;
325 ///
326 /// let sum = result.get_output("@result").unwrap();
327 /// # let _ = sum;
328 /// # Ok(())
329 /// # }
330 /// ```
331 pub fn procedure(
332 &mut self,
333 proc_name: &str,
334 ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
335 crate::validation::validate_qualified_identifier(proc_name)?;
336 Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
337 }
338
339 /// Execute a stored procedure with positional input parameters.
340 ///
341 /// This is a convenience method for the common case of calling a procedure
342 /// with input-only parameters. For output parameters or named parameters,
343 /// use [`procedure()`](Client::procedure) instead.
344 ///
345 /// # Example
346 ///
347 /// ```rust,no_run
348 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
349 /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
350 /// assert_eq!(result.return_value, 0);
351 ///
352 /// if let Some(rs) = result.first_result_set() {
353 /// println!("columns: {:?}", rs.columns());
354 /// }
355 /// # Ok(())
356 /// # }
357 /// ```
358 pub async fn call_procedure(
359 &mut self,
360 proc_name: &str,
361 params: &[&(dyn crate::ToSql + Sync)],
362 ) -> Result<crate::stream::ProcedureResult> {
363 crate::validation::validate_qualified_identifier(proc_name)?;
364
365 tracing::debug!(
366 proc_name = proc_name,
367 params_count = params.len(),
368 "executing stored procedure"
369 );
370
371 let rpc_params =
372 Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
373 let mut rpc = RpcRequest::named(proc_name);
374 for param in rpc_params {
375 rpc = rpc.param(param);
376 }
377
378 let deadline = self.command_deadline();
379 let canceller = self.connection_cancel_handle();
380 run_with_deadline(
381 async {
382 self.send_rpc(&rpc).await?;
383 self.read_procedure_result().await
384 },
385 deadline,
386 canceller,
387 )
388 .await
389 }
390
391 /// Start a bulk insert operation for the specified table.
392 ///
393 /// Sends the `INSERT BULK` statement to the server and returns a
394 /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
395 /// a mutable borrow on the client, preventing other operations while
396 /// the bulk insert is in progress.
397 ///
398 /// # Example
399 ///
400 /// ```rust,no_run
401 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
402 /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
403 ///
404 /// let builder = BulkInsertBuilder::new("dbo.Users")
405 /// .with_typed_columns(vec![
406 /// BulkColumn::new("id", "INT", 0)?,
407 /// BulkColumn::new("name", "NVARCHAR(100)", 1)?,
408 /// ]);
409 ///
410 /// let mut writer = client.bulk_insert(&builder).await?;
411 /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
412 /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
413 /// let result = writer.finish().await?;
414 /// println!("Inserted {} rows", result.rows_affected);
415 /// # Ok(())
416 /// # }
417 /// ```
418 pub async fn bulk_insert(
419 &mut self,
420 builder: &crate::bulk::BulkInsertBuilder,
421 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
422 use tds_protocol::token::{ColMetaData, Token};
423
424 tracing::debug!(
425 table = builder.table_name(),
426 columns = builder.columns().len(),
427 "starting bulk insert"
428 );
429
430 // Step 1: Query the server for column metadata.
431 // This gives us the exact type encoding the server expects for BulkLoad,
432 // following the pattern established by Tiberius.
433 let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
434 self.send_sql_batch(&meta_query).await?;
435
436 let message = self.read_response_message().await?;
437 self.in_flight = false;
438
439 // Capture both the raw COLMETADATA bytes and parsed column info
440 let raw_payload = message.payload.clone();
441 let mut parser = self.create_parser(message.payload);
442 let mut server_metadata: Option<ColMetaData> = None;
443 let mut meta_start: usize = 0;
444 let mut meta_end: usize = 0;
445
446 loop {
447 let pos_before = raw_payload.len() - parser.remaining();
448 let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
449 let pos_after = raw_payload.len() - parser.remaining();
450 let Some(token) = token else { break };
451
452 match token {
453 Token::ColMetaData(meta) => {
454 meta_start = pos_before;
455 meta_end = pos_after;
456 server_metadata = Some(meta);
457 }
458 Token::Done(_) => break,
459 _ => {}
460 }
461 }
462
463 // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
464 // These types require a legacy TEXTPTR wire format that this driver
465 // does not support — users should migrate the column to VARCHAR(MAX) /
466 // NVARCHAR(MAX) / VARBINARY(MAX).
467 if let Some(ref meta) = server_metadata {
468 use tds_protocol::types::TypeId;
469 for col in meta.columns.iter() {
470 let (rejected, replacement) = match col.type_id {
471 TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
472 TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
473 TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
474 _ => (None, ""),
475 };
476 if let Some(sql_type) = rejected {
477 return Err(Error::from(mssql_types::TypeError::UnsupportedType {
478 sql_type: sql_type.to_string(),
479 reason: format!(
480 "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
481 are not supported. Alter the column to {} instead \
482 (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
483 Server 2005).",
484 col.name,
485 builder.table_name(),
486 sql_type,
487 replacement,
488 ),
489 }));
490 }
491 }
492 }
493
494 // Step 2: Send INSERT BULK statement to put server in bulk load mode
495 let stmt = builder.build_insert_bulk_statement()?;
496 self.send_sql_batch(&stmt).await?;
497 self.read_execute_result().await?;
498
499 // Step 3: Create bulk writer with server's metadata
500 let raw_meta = if meta_end > meta_start {
501 Some(raw_payload.slice(meta_start..meta_end))
502 } else {
503 None
504 };
505
506 let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
507 let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
508 builder.columns().to_vec(),
509 builder.options().batch_size,
510 raw_meta,
511 server_cols,
512 );
513
514 Ok(crate::bulk::BulkWriter::new(self, bulk))
515 }
516
517 /// Start a bulk insert without querying the server for column metadata.
518 ///
519 /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
520 /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
521 /// column metadata is constructed from the `BulkColumn` types provided
522 /// on the builder. This saves a round-trip when the schema is known.
523 ///
524 /// # Caveats
525 ///
526 /// The caller must ensure `BulkColumn` entries match the target table's
527 /// column definitions exactly. Mismatched types, lengths, precision/scale,
528 /// or column ordering will cause the server to reject the BulkLoad packet.
529 ///
530 /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
531 /// extra round-trip is usually negligible and the server-supplied metadata
532 /// is guaranteed correct.
533 pub async fn bulk_insert_without_schema_discovery(
534 &mut self,
535 builder: &crate::bulk::BulkInsertBuilder,
536 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
537 tracing::debug!(
538 table = builder.table_name(),
539 columns = builder.columns().len(),
540 "starting bulk insert (no schema discovery)"
541 );
542
543 // Send INSERT BULK statement to put server in bulk load mode
544 let stmt = builder.build_insert_bulk_statement()?;
545 self.send_sql_batch(&stmt).await?;
546 self.read_execute_result().await?;
547
548 // Create bulk writer with hand-crafted metadata
549 let bulk =
550 crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
551
552 Ok(crate::bulk::BulkWriter::new(self, bulk))
553 }
554
555 /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
556 ///
557 /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
558 /// row data after the `INSERT BULK` statement has been acknowledged.
559 pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
560 let max_packet = self.config.packet_size as usize;
561
562 self.in_flight = true;
563 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
564
565 match connection {
566 #[cfg(feature = "tls")]
567 ConnectionHandle::Tls(conn) => {
568 conn.send_message(PacketType::BulkLoad, payload, max_packet)
569 .await?;
570 }
571 #[cfg(feature = "tls")]
572 ConnectionHandle::TlsPrelogin(conn) => {
573 conn.send_message(PacketType::BulkLoad, payload, max_packet)
574 .await?;
575 }
576 ConnectionHandle::Plain(conn) => {
577 conn.send_message(PacketType::BulkLoad, payload, max_packet)
578 .await?;
579 }
580 }
581
582 // Read the server's Done response with row count
583 self.read_execute_result().await
584 }
585
586 /// Execute a query with named parameters and return a streaming result set.
587 ///
588 /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
589 /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
590 /// and the `#[derive(ToParams)]` macro.
591 ///
592 /// # Example
593 ///
594 /// ```rust,no_run
595 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
596 /// use mssql_client::{NamedParam, ToParams};
597 ///
598 /// // With derive macro:
599 /// #[derive(mssql_derive::ToParams)]
600 /// struct UserQuery { name: String }
601 ///
602 /// let q = UserQuery { name: "Alice".into() };
603 /// let rows = client.query_named(
604 /// "SELECT * FROM users WHERE name = @name",
605 /// &q.to_params()?,
606 /// ).await?;
607 ///
608 /// // Or manually:
609 /// let params = vec![NamedParam::from_value("name", &"Alice")?];
610 /// let rows = client.query_named(
611 /// "SELECT * FROM users WHERE name = @name",
612 /// ¶ms,
613 /// ).await?;
614 /// # let _ = rows;
615 /// # Ok(())
616 /// # }
617 /// ```
618 pub async fn query_named<'a>(
619 &'a mut self,
620 sql: &str,
621 params: &[crate::to_params::NamedParam],
622 ) -> Result<QueryStream<'a>> {
623 tracing::debug!(
624 sql = sql,
625 params_count = params.len(),
626 "executing query with named parameters"
627 );
628
629 if params.is_empty() {
630 self.send_sql_batch(sql).await?;
631 } else {
632 let rpc_params =
633 Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
634 let rpc = RpcRequest::execute_sql(sql, rpc_params);
635 self.send_rpc(&rpc).await?;
636 }
637
638 let resp = self.read_query_response().await?;
639 #[cfg(feature = "always-encrypted")]
640 {
641 Ok(QueryStream::from_raw(
642 resp.columns,
643 resp.pending_rows,
644 resp.meta,
645 resp.decryptor,
646 ))
647 }
648 #[cfg(not(feature = "always-encrypted"))]
649 {
650 Ok(QueryStream::from_raw(
651 resp.columns,
652 resp.pending_rows,
653 resp.meta,
654 ))
655 }
656 }
657
658 /// Execute a statement with named parameters.
659 ///
660 /// Returns the number of affected rows. This is the named-parameter
661 /// counterpart of [`execute()`](Client::execute), compatible with the
662 /// [`ToParams`](crate::to_params::ToParams) trait.
663 ///
664 /// # Example
665 ///
666 /// ```rust,no_run
667 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
668 /// use mssql_client::NamedParam;
669 ///
670 /// let params = vec![
671 /// NamedParam::from_value("name", &"Alice")?,
672 /// NamedParam::from_value("email", &"alice@example.com")?,
673 /// ];
674 /// let rows_affected = client.execute_named(
675 /// "INSERT INTO users (name, email) VALUES (@name, @email)",
676 /// ¶ms,
677 /// ).await?;
678 /// # let _ = rows_affected;
679 /// # Ok(())
680 /// # }
681 /// ```
682 pub async fn execute_named(
683 &mut self,
684 sql: &str,
685 params: &[crate::to_params::NamedParam],
686 ) -> Result<u64> {
687 tracing::debug!(
688 sql = sql,
689 params_count = params.len(),
690 "executing statement with named parameters"
691 );
692
693 let deadline = self.command_deadline();
694 let canceller = self.connection_cancel_handle();
695 run_with_deadline(
696 async {
697 if params.is_empty() {
698 self.send_sql_batch(sql).await?;
699 } else {
700 let rpc_params = Self::convert_named_params(
701 params,
702 self.send_unicode(),
703 self.server_collation(),
704 )?;
705 let rpc = RpcRequest::execute_sql(sql, rpc_params);
706 self.send_rpc(&rpc).await?;
707 }
708
709 self.read_execute_result().await
710 },
711 deadline,
712 canceller,
713 )
714 .await
715 }
716
717 /// Whether string parameters are sent as NVARCHAR (Unicode).
718 pub(crate) fn send_unicode(&self) -> bool {
719 self.config.send_string_parameters_as_unicode
720 }
721
722 /// Server's default collation, captured from ENVCHANGE during login.
723 pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
724 self.server_collation.as_ref()
725 }
726}
727
728impl Client<Ready> {
729 /// Mark this connection as needing a reset on next use.
730 ///
731 /// Called by the connection pool when a connection is returned.
732 /// The next SQL batch or RPC will include the RESETCONNECTION flag
733 /// in the TDS packet header, causing SQL Server to reset connection
734 /// state (temp tables, SET options, transaction isolation level, etc.)
735 /// before executing the command.
736 ///
737 /// This is more efficient than calling `sp_reset_connection` as a
738 /// separate command because it's handled at the TDS protocol level.
739 pub fn mark_needs_reset(&mut self) {
740 self.needs_reset = true;
741 }
742
743 /// Check if this connection needs a reset.
744 ///
745 /// Returns true if `mark_needs_reset()` was called and the reset
746 /// hasn't been performed yet.
747 #[must_use]
748 pub fn needs_reset(&self) -> bool {
749 self.needs_reset
750 }
751
752 /// Execute a query and return a result set with lazy per-row decoding.
753 ///
754 /// Per ADR-007 the full response is buffered in memory and each row is
755 /// *decoded* on demand as you iterate — this is not incremental network
756 /// streaming, so peak memory tracks the response size. Use
757 /// `.collect_all()` if you want all rows materialized into a `Vec` up
758 /// front.
759 ///
760 /// # Example
761 ///
762 /// ```rust,no_run
763 /// # use mssql_client::Row;
764 /// # fn process(_: &Row) {}
765 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
766 /// // Streaming (synchronous iteration over the result set)
767 /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
768 /// for row in stream {
769 /// let row = row?;
770 /// process(&row);
771 /// }
772 ///
773 /// // Buffered (loads all into memory)
774 /// let rows: Vec<Row> = client
775 /// .query("SELECT * FROM small_table", &[])
776 /// .await?
777 /// .collect_all()
778 /// .await?;
779 /// # let _ = rows;
780 /// # Ok(())
781 /// # }
782 /// ```
783 pub async fn query<'a>(
784 &'a mut self,
785 sql: &str,
786 params: &[&(dyn crate::ToSql + Sync)],
787 ) -> Result<QueryStream<'a>> {
788 let deadline = self.command_deadline();
789 self.query_inner(sql, params, deadline).await
790 }
791
792 /// Shared query implementation with an explicit command deadline.
793 async fn query_inner<'a>(
794 &'a mut self,
795 sql: &str,
796 params: &[&(dyn crate::ToSql + Sync)],
797 deadline: Option<std::time::Duration>,
798 ) -> Result<QueryStream<'a>> {
799 tracing::debug!(sql = sql, params_count = params.len(), "executing query");
800
801 #[cfg(feature = "otel")]
802 let instrumentation = self.instrumentation.clone();
803 #[cfg(feature = "otel")]
804 let mut span = instrumentation.query_span(sql);
805 #[cfg(feature = "otel")]
806 let timer = crate::instrumentation::OperationTimer::start(
807 crate::instrumentation::extract_operation(sql),
808 );
809
810 let canceller = self.cancel_handle();
811 let result = run_with_deadline(
812 async {
813 if params.is_empty() {
814 // Simple query without parameters - use SQL batch
815 self.send_sql_batch(sql).await?;
816 } else {
817 // Parameterized query - use sp_executesql via RPC
818 let rpc_params =
819 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
820 let rpc = RpcRequest::execute_sql(sql, rpc_params);
821 self.send_rpc(&rpc).await?;
822 }
823
824 // Read complete response including columns and rows
825 self.read_query_response().await
826 },
827 deadline,
828 canceller,
829 )
830 .await;
831
832 #[cfg(feature = "otel")]
833 match &result {
834 Ok(_) => InstrumentationContext::record_success(&mut span, None),
835 Err(e) => InstrumentationContext::record_error(&mut span, e),
836 }
837 #[cfg(feature = "otel")]
838 timer.finish(instrumentation.metrics(), result.is_ok());
839
840 // Drop the span before returning
841 #[cfg(feature = "otel")]
842 drop(span);
843
844 let resp = result?;
845 #[cfg(feature = "always-encrypted")]
846 {
847 Ok(QueryStream::from_raw(
848 resp.columns,
849 resp.pending_rows,
850 resp.meta,
851 resp.decryptor,
852 ))
853 }
854 #[cfg(not(feature = "always-encrypted"))]
855 {
856 Ok(QueryStream::from_raw(
857 resp.columns,
858 resp.pending_rows,
859 resp.meta,
860 ))
861 }
862 }
863
864 /// Execute a query with a specific timeout.
865 ///
866 /// This overrides the default `command_timeout` from the connection configuration
867 /// for this specific query. If the query does not complete within the specified
868 /// duration, the driver sends an Attention packet to cancel it server-side,
869 /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
870 /// connection left usable for the next request.
871 ///
872 /// # Arguments
873 ///
874 /// * `sql` - The SQL query to execute
875 /// * `params` - Query parameters
876 /// * `timeout_duration` - Maximum time to wait for the query to complete
877 ///
878 /// # Example
879 ///
880 /// ```rust,no_run
881 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
882 /// use std::time::Duration;
883 ///
884 /// // Execute with a 5-second timeout
885 /// let rows = client
886 /// .query_with_timeout(
887 /// "SELECT * FROM large_table",
888 /// &[],
889 /// Duration::from_secs(5),
890 /// )
891 /// .await?;
892 /// # let _ = rows;
893 /// # Ok(())
894 /// # }
895 /// ```
896 pub async fn query_with_timeout<'a>(
897 &'a mut self,
898 sql: &str,
899 params: &[&(dyn crate::ToSql + Sync)],
900 timeout_duration: std::time::Duration,
901 ) -> Result<QueryStream<'a>> {
902 self.query_inner(sql, params, Some(timeout_duration)).await
903 }
904
905 /// Execute a batch that may return multiple result sets.
906 ///
907 /// This is useful for stored procedures or SQL batches that contain
908 /// multiple SELECT statements.
909 ///
910 /// # Example
911 ///
912 /// ```rust,no_run
913 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
914 /// // Execute a batch with multiple SELECT statements
915 /// let mut results = client.query_multiple(
916 /// "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
917 /// &[]
918 /// ).await?;
919 ///
920 /// // Process first result set
921 /// while let Some(row) = results.next_row().await? {
922 /// println!("Result 1: {:?}", row);
923 /// }
924 ///
925 /// // Move to second result set
926 /// if results.next_result().await? {
927 /// while let Some(row) = results.next_row().await? {
928 /// println!("Result 2: {:?}", row);
929 /// }
930 /// }
931 /// # Ok(())
932 /// # }
933 /// ```
934 pub async fn query_multiple<'a>(
935 &'a mut self,
936 sql: &str,
937 params: &[&(dyn crate::ToSql + Sync)],
938 ) -> Result<MultiResultStream<'a>> {
939 tracing::debug!(
940 sql = sql,
941 params_count = params.len(),
942 "executing multi-result query"
943 );
944
945 let deadline = self.command_deadline();
946 let canceller = self.connection_cancel_handle();
947 let result_sets = run_with_deadline(
948 async {
949 if params.is_empty() {
950 // Simple batch without parameters - use SQL batch
951 self.send_sql_batch(sql).await?;
952 } else {
953 // Parameterized query - use sp_executesql via RPC
954 let rpc_params =
955 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
956 let rpc = RpcRequest::execute_sql(sql, rpc_params);
957 self.send_rpc(&rpc).await?;
958 }
959
960 // Read all result sets
961 self.read_multi_result_response().await
962 },
963 deadline,
964 canceller,
965 )
966 .await?;
967 Ok(MultiResultStream::new(result_sets))
968 }
969
970 /// Execute a query that doesn't return rows.
971 ///
972 /// Returns the number of affected rows.
973 pub async fn execute(
974 &mut self,
975 sql: &str,
976 params: &[&(dyn crate::ToSql + Sync)],
977 ) -> Result<u64> {
978 let deadline = self.command_deadline();
979 self.execute_inner(sql, params, deadline).await
980 }
981
982 /// Shared execute implementation with an explicit command deadline.
983 async fn execute_inner(
984 &mut self,
985 sql: &str,
986 params: &[&(dyn crate::ToSql + Sync)],
987 deadline: Option<std::time::Duration>,
988 ) -> Result<u64> {
989 tracing::debug!(
990 sql = sql,
991 params_count = params.len(),
992 "executing statement"
993 );
994
995 #[cfg(feature = "otel")]
996 let instrumentation = self.instrumentation.clone();
997 #[cfg(feature = "otel")]
998 let mut span = instrumentation.query_span(sql);
999 #[cfg(feature = "otel")]
1000 let timer = crate::instrumentation::OperationTimer::start(
1001 crate::instrumentation::extract_operation(sql),
1002 );
1003
1004 let canceller = self.cancel_handle();
1005 let result = run_with_deadline(
1006 async {
1007 if params.is_empty() {
1008 // Simple statement without parameters - use SQL batch
1009 self.send_sql_batch(sql).await?;
1010 } else {
1011 // Parameterized statement - use sp_executesql via RPC
1012 let rpc_params =
1013 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1014 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1015 self.send_rpc(&rpc).await?;
1016 }
1017
1018 // Read response and get row count
1019 self.read_execute_result().await
1020 },
1021 deadline,
1022 canceller,
1023 )
1024 .await;
1025
1026 #[cfg(feature = "otel")]
1027 match &result {
1028 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1029 Err(e) => InstrumentationContext::record_error(&mut span, e),
1030 }
1031 #[cfg(feature = "otel")]
1032 timer.finish(instrumentation.metrics(), result.is_ok());
1033
1034 // Drop the span before returning
1035 #[cfg(feature = "otel")]
1036 drop(span);
1037
1038 result
1039 }
1040
1041 /// Execute a statement with a specific timeout.
1042 ///
1043 /// This overrides the default `command_timeout` from the connection configuration
1044 /// for this specific statement. If the statement does not complete within the
1045 /// specified duration, the driver sends an Attention packet to cancel it
1046 /// server-side, drains the acknowledgement, and returns
1047 /// [`Error::CommandTimeout`] with the connection left usable.
1048 ///
1049 /// # Arguments
1050 ///
1051 /// * `sql` - The SQL statement to execute
1052 /// * `params` - Statement parameters
1053 /// * `timeout_duration` - Maximum time to wait for the statement to complete
1054 ///
1055 /// # Example
1056 ///
1057 /// ```rust,no_run
1058 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1059 /// use std::time::Duration;
1060 ///
1061 /// // Execute with a 10-second timeout
1062 /// let rows_affected = client
1063 /// .execute_with_timeout(
1064 /// "UPDATE large_table SET status = @p1",
1065 /// &[&"processed"],
1066 /// Duration::from_secs(10),
1067 /// )
1068 /// .await?;
1069 /// # let _ = rows_affected;
1070 /// # Ok(())
1071 /// # }
1072 /// ```
1073 pub async fn execute_with_timeout(
1074 &mut self,
1075 sql: &str,
1076 params: &[&(dyn crate::ToSql + Sync)],
1077 timeout_duration: std::time::Duration,
1078 ) -> Result<u64> {
1079 self.execute_inner(sql, params, Some(timeout_duration))
1080 .await
1081 }
1082
1083 /// Begin a transaction.
1084 ///
1085 /// This transitions the client from `Ready` to `InTransaction` state.
1086 /// Per MS-TDS spec, the server returns a transaction descriptor in the
1087 /// BeginTransaction EnvChange token that must be included in subsequent
1088 /// ALL_HEADERS sections.
1089 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1090 tracing::debug!("beginning transaction");
1091
1092 #[cfg(feature = "otel")]
1093 let instrumentation = self.instrumentation.clone();
1094 #[cfg(feature = "otel")]
1095 let mut span = instrumentation.transaction_span("BEGIN");
1096
1097 // Execute BEGIN TRANSACTION and extract the transaction descriptor
1098 let result = async {
1099 self.send_sql_batch("BEGIN TRANSACTION").await?;
1100 self.read_transaction_begin_result().await
1101 }
1102 .await;
1103
1104 #[cfg(feature = "otel")]
1105 match &result {
1106 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1107 Err(e) => InstrumentationContext::record_error(&mut span, e),
1108 }
1109
1110 // Drop the span before moving instrumentation
1111 #[cfg(feature = "otel")]
1112 drop(span);
1113
1114 let transaction_descriptor = result?;
1115
1116 Ok(Client {
1117 config: self.config,
1118 _state: PhantomData,
1119 connection: self.connection,
1120 server_version: self.server_version,
1121 current_database: self.current_database,
1122 server_collation: self.server_collation,
1123 statement_cache: self.statement_cache,
1124 transaction_descriptor, // Store the descriptor from server
1125 needs_reset: self.needs_reset,
1126 in_flight: self.in_flight,
1127 #[cfg(feature = "otel")]
1128 instrumentation: self.instrumentation,
1129 #[cfg(feature = "always-encrypted")]
1130 encryption_context: self.encryption_context,
1131 })
1132 }
1133
1134 /// Begin a transaction with a specific isolation level.
1135 ///
1136 /// This transitions the client from `Ready` to `InTransaction` state
1137 /// with the specified isolation level.
1138 ///
1139 /// # Example
1140 ///
1141 /// ```rust,no_run
1142 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1143 /// use mssql_client::IsolationLevel;
1144 ///
1145 /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1146 /// // All operations in this transaction use SERIALIZABLE isolation
1147 /// tx.commit().await?;
1148 /// # Ok(())
1149 /// # }
1150 /// ```
1151 pub async fn begin_transaction_with_isolation(
1152 mut self,
1153 isolation_level: crate::transaction::IsolationLevel,
1154 ) -> Result<Client<InTransaction>> {
1155 tracing::debug!(
1156 isolation_level = %isolation_level.name(),
1157 "beginning transaction with isolation level"
1158 );
1159
1160 #[cfg(feature = "otel")]
1161 let instrumentation = self.instrumentation.clone();
1162 #[cfg(feature = "otel")]
1163 let mut span = instrumentation.transaction_span("BEGIN");
1164
1165 // First set the isolation level
1166 let result = async {
1167 self.send_sql_batch(isolation_level.as_sql()).await?;
1168 self.read_execute_result().await?;
1169
1170 // Then begin the transaction
1171 self.send_sql_batch("BEGIN TRANSACTION").await?;
1172 self.read_transaction_begin_result().await
1173 }
1174 .await;
1175
1176 #[cfg(feature = "otel")]
1177 match &result {
1178 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1179 Err(e) => InstrumentationContext::record_error(&mut span, e),
1180 }
1181
1182 #[cfg(feature = "otel")]
1183 drop(span);
1184
1185 let transaction_descriptor = result?;
1186
1187 Ok(Client {
1188 config: self.config,
1189 _state: PhantomData,
1190 connection: self.connection,
1191 server_version: self.server_version,
1192 current_database: self.current_database,
1193 server_collation: self.server_collation,
1194 statement_cache: self.statement_cache,
1195 transaction_descriptor,
1196 needs_reset: self.needs_reset,
1197 in_flight: self.in_flight,
1198 #[cfg(feature = "otel")]
1199 instrumentation: self.instrumentation,
1200 #[cfg(feature = "always-encrypted")]
1201 encryption_context: self.encryption_context,
1202 })
1203 }
1204
1205 /// Execute a simple query without parameters.
1206 ///
1207 /// This is useful for DDL statements and simple queries where you
1208 /// don't need to retrieve the affected row count.
1209 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1210 tracing::debug!(sql = sql, "executing simple query");
1211
1212 // Send SQL batch
1213 self.send_sql_batch(sql).await?;
1214
1215 // Read and discard response
1216 let _ = self.read_execute_result().await?;
1217
1218 Ok(())
1219 }
1220
1221 /// Close the connection gracefully.
1222 pub async fn close(self) -> Result<()> {
1223 tracing::debug!("closing connection");
1224 Ok(())
1225 }
1226
1227 /// Get the current database name.
1228 #[must_use]
1229 pub fn database(&self) -> Option<&str> {
1230 self.config.database.as_deref()
1231 }
1232
1233 /// Get the server host.
1234 #[must_use]
1235 pub fn host(&self) -> &str {
1236 &self.config.host
1237 }
1238
1239 /// Get the server port.
1240 #[must_use]
1241 pub fn port(&self) -> u16 {
1242 self.config.port
1243 }
1244
1245 /// Check if the connection is currently in a transaction.
1246 ///
1247 /// This returns `true` if a transaction was started via raw SQL
1248 /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1249 ///
1250 /// Note: This only tracks transactions started via raw SQL. Transactions
1251 /// started via the type-state API (`begin_transaction()`) result in a
1252 /// `Client<InTransaction>` which is a different type.
1253 ///
1254 /// # Example
1255 ///
1256 /// ```rust,no_run
1257 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1258 /// client.execute("BEGIN TRANSACTION", &[]).await?;
1259 /// assert!(client.is_in_transaction());
1260 ///
1261 /// client.execute("COMMIT", &[]).await?;
1262 /// assert!(!client.is_in_transaction());
1263 /// # Ok(())
1264 /// # }
1265 /// ```
1266 #[must_use]
1267 pub fn is_in_transaction(&self) -> bool {
1268 self.transaction_descriptor != 0
1269 }
1270
1271 /// Check if a request is in-flight (sent but response not fully read).
1272 ///
1273 /// Used by the connection pool to detect dirty connections that were
1274 /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1275 /// A connection with an in-flight request has unread data in the TCP
1276 /// buffer and must be discarded rather than returned to the pool.
1277 #[must_use]
1278 pub fn is_in_flight(&self) -> bool {
1279 self.in_flight
1280 }
1281
1282 /// Report whether an Always Encrypted key-store provider with the given
1283 /// name is currently reachable through this client's encryption context.
1284 ///
1285 /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1286 /// the connection was opened without `column_encryption` configured, or
1287 /// when no matching provider was registered.
1288 #[cfg(feature = "always-encrypted")]
1289 #[must_use]
1290 pub fn has_encryption_provider(&self, name: &str) -> bool {
1291 self.encryption_context
1292 .as_ref()
1293 .is_some_and(|ctx| ctx.has_provider(name))
1294 }
1295
1296 /// Get a handle for cancelling the current query.
1297 ///
1298 /// The cancel handle can be cloned and sent to other tasks, enabling
1299 /// cancellation of long-running queries from a separate async context.
1300 ///
1301 /// # Example
1302 ///
1303 /// ```rust,no_run
1304 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1305 /// use std::time::Duration;
1306 ///
1307 /// let cancel_handle = client.cancel_handle();
1308 ///
1309 /// // Spawn a task to cancel after 10 seconds
1310 /// let handle = tokio::spawn(async move {
1311 /// tokio::time::sleep(Duration::from_secs(10)).await;
1312 /// let _ = cancel_handle.cancel().await;
1313 /// });
1314 ///
1315 /// // This query will be cancelled if it runs longer than 10 seconds
1316 /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1317 /// # let _ = (handle, result);
1318 /// # Ok(())
1319 /// # }
1320 /// ```
1321 #[must_use]
1322 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1323 self.connection_cancel_handle()
1324 }
1325}
1326
1327/// # Drop Behavior
1328///
1329/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1330/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1331/// the transaction remains open on the server until the TCP connection closes
1332/// (at which point SQL Server automatically rolls back).
1333///
1334/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1335/// to send a `ROLLBACK TRANSACTION` command.
1336///
1337/// ## Consequences of dropping without commit/rollback
1338///
1339/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1340/// (potentially 30+ minutes), holding locks on any modified rows.
1341/// - **Pooled connections:** The pool detects the active transaction descriptor
1342/// and discards the connection rather than returning it to the idle pool
1343/// (see `PooledConnection::drop` in `mssql-driver-pool`).
1344///
1345/// ## Best practice
1346///
1347/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1348/// for error paths:
1349///
1350/// ```rust,no_run
1351/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1352/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1353/// let tx = client.begin_transaction().await?;
1354/// match do_work(&tx).await {
1355/// Ok(_) => { tx.commit().await?; }
1356/// Err(e) => { tx.rollback().await?; return Err(e); }
1357/// }
1358/// # Ok(())
1359/// # }
1360/// ```
1361impl Client<InTransaction> {
1362 /// Execute a query within the transaction and return a streaming result set.
1363 ///
1364 /// See [`Client<Ready>::query`] for usage examples.
1365 pub async fn query<'a>(
1366 &'a mut self,
1367 sql: &str,
1368 params: &[&(dyn crate::ToSql + Sync)],
1369 ) -> Result<QueryStream<'a>> {
1370 let deadline = self.command_deadline();
1371 self.query_inner(sql, params, deadline).await
1372 }
1373
1374 /// Shared query implementation with an explicit command deadline.
1375 async fn query_inner<'a>(
1376 &'a mut self,
1377 sql: &str,
1378 params: &[&(dyn crate::ToSql + Sync)],
1379 deadline: Option<std::time::Duration>,
1380 ) -> Result<QueryStream<'a>> {
1381 tracing::debug!(
1382 sql = sql,
1383 params_count = params.len(),
1384 "executing query in transaction"
1385 );
1386
1387 #[cfg(feature = "otel")]
1388 let instrumentation = self.instrumentation.clone();
1389 #[cfg(feature = "otel")]
1390 let mut span = instrumentation.query_span(sql);
1391 #[cfg(feature = "otel")]
1392 let timer = crate::instrumentation::OperationTimer::start(
1393 crate::instrumentation::extract_operation(sql),
1394 );
1395
1396 let canceller = self.cancel_handle();
1397 let result = run_with_deadline(
1398 async {
1399 if params.is_empty() {
1400 // Simple query without parameters - use SQL batch
1401 self.send_sql_batch(sql).await?;
1402 } else {
1403 // Parameterized query - use sp_executesql via RPC
1404 let rpc_params =
1405 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1406 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1407 self.send_rpc(&rpc).await?;
1408 }
1409
1410 // Read complete response including columns and rows
1411 self.read_query_response().await
1412 },
1413 deadline,
1414 canceller,
1415 )
1416 .await;
1417
1418 #[cfg(feature = "otel")]
1419 match &result {
1420 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1421 Err(e) => InstrumentationContext::record_error(&mut span, e),
1422 }
1423 #[cfg(feature = "otel")]
1424 timer.finish(instrumentation.metrics(), result.is_ok());
1425
1426 // Drop the span before returning
1427 #[cfg(feature = "otel")]
1428 drop(span);
1429
1430 let resp = result?;
1431 #[cfg(feature = "always-encrypted")]
1432 {
1433 Ok(QueryStream::from_raw(
1434 resp.columns,
1435 resp.pending_rows,
1436 resp.meta,
1437 resp.decryptor,
1438 ))
1439 }
1440 #[cfg(not(feature = "always-encrypted"))]
1441 {
1442 Ok(QueryStream::from_raw(
1443 resp.columns,
1444 resp.pending_rows,
1445 resp.meta,
1446 ))
1447 }
1448 }
1449
1450 /// Execute a statement within the transaction.
1451 ///
1452 /// Returns the number of affected rows.
1453 pub async fn execute(
1454 &mut self,
1455 sql: &str,
1456 params: &[&(dyn crate::ToSql + Sync)],
1457 ) -> Result<u64> {
1458 let deadline = self.command_deadline();
1459 self.execute_inner(sql, params, deadline).await
1460 }
1461
1462 /// Shared execute implementation with an explicit command deadline.
1463 async fn execute_inner(
1464 &mut self,
1465 sql: &str,
1466 params: &[&(dyn crate::ToSql + Sync)],
1467 deadline: Option<std::time::Duration>,
1468 ) -> Result<u64> {
1469 tracing::debug!(
1470 sql = sql,
1471 params_count = params.len(),
1472 "executing statement in transaction"
1473 );
1474
1475 #[cfg(feature = "otel")]
1476 let instrumentation = self.instrumentation.clone();
1477 #[cfg(feature = "otel")]
1478 let mut span = instrumentation.query_span(sql);
1479 #[cfg(feature = "otel")]
1480 let timer = crate::instrumentation::OperationTimer::start(
1481 crate::instrumentation::extract_operation(sql),
1482 );
1483
1484 let canceller = self.cancel_handle();
1485 let result = run_with_deadline(
1486 async {
1487 if params.is_empty() {
1488 // Simple statement without parameters - use SQL batch
1489 self.send_sql_batch(sql).await?;
1490 } else {
1491 // Parameterized statement - use sp_executesql via RPC
1492 let rpc_params =
1493 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1494 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1495 self.send_rpc(&rpc).await?;
1496 }
1497
1498 // Read response and get row count
1499 self.read_execute_result().await
1500 },
1501 deadline,
1502 canceller,
1503 )
1504 .await;
1505
1506 #[cfg(feature = "otel")]
1507 match &result {
1508 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1509 Err(e) => InstrumentationContext::record_error(&mut span, e),
1510 }
1511 #[cfg(feature = "otel")]
1512 timer.finish(instrumentation.metrics(), result.is_ok());
1513
1514 // Drop the span before returning
1515 #[cfg(feature = "otel")]
1516 drop(span);
1517
1518 result
1519 }
1520
1521 /// Execute a query within the transaction with a specific timeout.
1522 ///
1523 /// See [`Client<Ready>::query_with_timeout`] for details.
1524 pub async fn query_with_timeout<'a>(
1525 &'a mut self,
1526 sql: &str,
1527 params: &[&(dyn crate::ToSql + Sync)],
1528 timeout_duration: std::time::Duration,
1529 ) -> Result<QueryStream<'a>> {
1530 self.query_inner(sql, params, Some(timeout_duration)).await
1531 }
1532
1533 /// Execute a statement within the transaction with a specific timeout.
1534 ///
1535 /// See [`Client<Ready>::execute_with_timeout`] for details.
1536 pub async fn execute_with_timeout(
1537 &mut self,
1538 sql: &str,
1539 params: &[&(dyn crate::ToSql + Sync)],
1540 timeout_duration: std::time::Duration,
1541 ) -> Result<u64> {
1542 self.execute_inner(sql, params, Some(timeout_duration))
1543 .await
1544 }
1545
1546 /// Open a FILESTREAM BLOB for async reading and/or writing.
1547 ///
1548 /// This method queries the server for the transaction context, then opens
1549 /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1550 ///
1551 /// # Arguments
1552 ///
1553 /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1554 /// Query this yourself before calling `open_filestream`:
1555 /// ```sql
1556 /// SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1557 /// ```
1558 /// * `access` — Read, write, or read/write access mode.
1559 ///
1560 /// # Requirements
1561 ///
1562 /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1563 /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1564 /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1565 ///
1566 /// # Example
1567 ///
1568 /// ```text
1569 /// use mssql_client::FileStreamAccess;
1570 /// use tokio::io::AsyncReadExt;
1571 ///
1572 /// let mut tx = client.begin_transaction().await?;
1573 ///
1574 /// // Get the FILESTREAM path
1575 /// let rows = tx.query(
1576 /// "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1577 /// &[&doc_id],
1578 /// ).await?;
1579 /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1580 ///
1581 /// // Open and read the BLOB
1582 /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1583 /// let mut data = Vec::new();
1584 /// stream.read_to_end(&mut data).await?;
1585 /// drop(stream);
1586 ///
1587 /// tx.commit().await?;
1588 /// ```
1589 #[cfg(all(windows, feature = "filestream"))]
1590 pub async fn open_filestream(
1591 &mut self,
1592 path: &str,
1593 access: crate::filestream::FileStreamAccess,
1594 ) -> Result<crate::filestream::FileStream> {
1595 tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1596
1597 // Get the transaction context from SQL Server.
1598 // This binds the file access to the current SQL transaction.
1599 let txn_context: Vec<u8> = {
1600 let rows = self
1601 .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1602 .await?;
1603 let mut ctx = None;
1604 for result in rows {
1605 let row = result?;
1606 ctx = Some(row.get::<Vec<u8>>(0)?);
1607 }
1608 ctx.ok_or_else(|| {
1609 Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1610 })?
1611 };
1612
1613 crate::filestream::FileStream::open(path, access, &txn_context)
1614 }
1615
1616 /// Commit the transaction.
1617 ///
1618 /// This transitions the client back to `Ready` state.
1619 pub async fn commit(mut self) -> Result<Client<Ready>> {
1620 tracing::debug!("committing transaction");
1621
1622 #[cfg(feature = "otel")]
1623 let instrumentation = self.instrumentation.clone();
1624 #[cfg(feature = "otel")]
1625 let mut span = instrumentation.transaction_span("COMMIT");
1626
1627 // Execute COMMIT TRANSACTION
1628 let result = async {
1629 self.send_sql_batch("COMMIT TRANSACTION").await?;
1630 self.read_execute_result().await
1631 }
1632 .await;
1633
1634 #[cfg(feature = "otel")]
1635 match &result {
1636 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1637 Err(e) => InstrumentationContext::record_error(&mut span, e),
1638 }
1639
1640 // Drop the span before moving instrumentation
1641 #[cfg(feature = "otel")]
1642 drop(span);
1643
1644 result?;
1645
1646 Ok(Client {
1647 config: self.config,
1648 _state: PhantomData,
1649 connection: self.connection,
1650 server_version: self.server_version,
1651 current_database: self.current_database,
1652 server_collation: self.server_collation,
1653 statement_cache: self.statement_cache,
1654 transaction_descriptor: 0, // Reset to auto-commit mode
1655 needs_reset: self.needs_reset,
1656 in_flight: self.in_flight,
1657 #[cfg(feature = "otel")]
1658 instrumentation: self.instrumentation,
1659 #[cfg(feature = "always-encrypted")]
1660 encryption_context: self.encryption_context,
1661 })
1662 }
1663
1664 /// Rollback the transaction.
1665 ///
1666 /// This transitions the client back to `Ready` state.
1667 pub async fn rollback(mut self) -> Result<Client<Ready>> {
1668 tracing::debug!("rolling back transaction");
1669
1670 #[cfg(feature = "otel")]
1671 let instrumentation = self.instrumentation.clone();
1672 #[cfg(feature = "otel")]
1673 let mut span = instrumentation.transaction_span("ROLLBACK");
1674
1675 // Execute ROLLBACK TRANSACTION
1676 let result = async {
1677 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1678 self.read_execute_result().await
1679 }
1680 .await;
1681
1682 #[cfg(feature = "otel")]
1683 match &result {
1684 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1685 Err(e) => InstrumentationContext::record_error(&mut span, e),
1686 }
1687
1688 // Drop the span before moving instrumentation
1689 #[cfg(feature = "otel")]
1690 drop(span);
1691
1692 result?;
1693
1694 Ok(Client {
1695 config: self.config,
1696 _state: PhantomData,
1697 connection: self.connection,
1698 server_version: self.server_version,
1699 current_database: self.current_database,
1700 server_collation: self.server_collation,
1701 statement_cache: self.statement_cache,
1702 transaction_descriptor: 0, // Reset to auto-commit mode
1703 needs_reset: self.needs_reset,
1704 in_flight: self.in_flight,
1705 #[cfg(feature = "otel")]
1706 instrumentation: self.instrumentation,
1707 #[cfg(feature = "always-encrypted")]
1708 encryption_context: self.encryption_context,
1709 })
1710 }
1711
1712 /// Create a savepoint and return a handle for later rollback.
1713 ///
1714 /// The returned `SavePoint` handle contains the validated savepoint name.
1715 /// Use it with `rollback_to()` to partially undo transaction work.
1716 ///
1717 /// # Example
1718 ///
1719 /// ```rust,no_run
1720 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1721 /// let mut tx = client.begin_transaction().await?;
1722 /// tx.execute("INSERT INTO orders ...", &[]).await?;
1723 /// let sp = tx.save_point("before_items").await?;
1724 /// tx.execute("INSERT INTO items ...", &[]).await?;
1725 /// // Oops, rollback just the items
1726 /// tx.rollback_to(&sp).await?;
1727 /// tx.commit().await?;
1728 /// # Ok(())
1729 /// # }
1730 /// ```
1731 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1732 crate::validation::validate_identifier(name)?;
1733 tracing::debug!(name = name, "creating savepoint");
1734
1735 // Execute SAVE TRANSACTION <name>
1736 // Note: name is validated by validate_identifier() to prevent SQL injection
1737 let sql = format!("SAVE TRANSACTION {name}");
1738 self.send_sql_batch(&sql).await?;
1739 self.read_execute_result().await?;
1740
1741 Ok(SavePoint::new(name.to_string()))
1742 }
1743
1744 /// Rollback to a savepoint.
1745 ///
1746 /// This rolls back all changes made after the savepoint was created,
1747 /// but keeps the transaction active. The savepoint remains valid and
1748 /// can be rolled back to again.
1749 ///
1750 /// # Example
1751 ///
1752 /// ```rust,no_run
1753 /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1754 /// let sp = tx.save_point("checkpoint").await?;
1755 /// // ... do some work ...
1756 /// tx.rollback_to(&sp).await?; // Undo changes since checkpoint
1757 /// // Transaction is still active, savepoint is still valid
1758 /// # Ok(())
1759 /// # }
1760 /// ```
1761 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1762 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1763
1764 // Execute ROLLBACK TRANSACTION <name>
1765 // Note: savepoint name was validated during creation
1766 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1767 self.send_sql_batch(&sql).await?;
1768 self.read_execute_result().await?;
1769
1770 Ok(())
1771 }
1772
1773 /// Release a savepoint (optional cleanup).
1774 ///
1775 /// Note: SQL Server doesn't have explicit savepoint release, but this
1776 /// method is provided for API completeness. The savepoint is automatically
1777 /// released when the transaction commits or rolls back.
1778 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
1779 tracing::debug!(name = savepoint.name(), "releasing savepoint");
1780
1781 // SQL Server doesn't require explicit savepoint release
1782 // The savepoint is implicitly released on commit/rollback
1783 // This method exists for API completeness
1784 drop(savepoint);
1785 Ok(())
1786 }
1787
1788 /// Get a handle for cancelling the current query within the transaction.
1789 ///
1790 /// See [`Client<Ready>::cancel_handle`] for usage examples.
1791 #[must_use]
1792 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1793 self.connection_cancel_handle()
1794 }
1795}
1796
1797impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1798 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1799 f.debug_struct("Client")
1800 .field("host", &self.config.host)
1801 .field("port", &self.config.port)
1802 .field("database", &self.config.database)
1803 .finish()
1804 }
1805}