mssql_client/client.rs
1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
/// How long to wait for the server to acknowledge an Attention packet after
/// a command timeout. SqlClient waits 5 seconds before dooming the
/// connection; we match it.
///
/// [`run_with_deadline`] applies this as the upper bound on its post-timeout
/// drain (sending the cancel plus reading the acknowledgement).
const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68 fut: F,
69 deadline: Option<std::time::Duration>,
70 canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73 F: std::future::Future<Output = Result<T>>,
74{
75 let Some(d) = deadline else {
76 return fut.await;
77 };
78 tokio::pin!(fut);
79 tokio::select! {
80 biased;
81 res = &mut fut => res,
82 () = tokio::time::sleep(d) => {
83 // Signal cancellation, then let the in-flight read consume the
84 // server's attention acknowledgement so the connection stays usable.
85 let drain = async {
86 let _ = canceller.cancel().await;
87 let _ = (&mut fut).await;
88 };
89 if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90 tracing::warn!(
91 timeout = ?ATTENTION_ACK_TIMEOUT,
92 "server did not acknowledge attention; abandoning the connection as dirty"
93 );
94 }
95 Err(Error::CommandTimeout)
96 }
97 }
98}
99
/// SQL Server client with type-state connection management.
///
/// The generic parameter `S` represents the current connection state,
/// ensuring at compile time that certain operations are only available
/// in appropriate states.
pub struct Client<S: ConnectionState> {
    /// Connection configuration. Code in this file reads `command_timeout`,
    /// `packet_size` and `send_string_parameters_as_unicode` from it.
    config: Config,
    /// Zero-sized marker that carries the type-state `S`; holds no data.
    _state: PhantomData<S>,
    /// The underlying connection (present only when connected)
    connection: Option<ConnectionHandle>,
    /// Server version from LoginAck (raw u32 TDS version)
    server_version: Option<u32>,
    /// Current database from EnvChange
    current_database: Option<String>,
    /// Server's default collation from SqlCollation EnvChange during login.
    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
    /// parameters with the correct character encoding and collation bytes.
    server_collation: Option<tds_protocol::token::Collation>,
    /// Prepared statement cache for query optimization
    statement_cache: StatementCache,
    /// Transaction descriptor from BeginTransaction EnvChange.
    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
    /// requests within an explicit transaction. 0 indicates auto-commit mode.
    transaction_descriptor: u64,
    /// Whether a request has been sent and the response has not yet been fully read.
    /// Used by the connection pool to detect dirty connections after cancel/timeout.
    /// Set to `true` by every send helper in this file just before it writes
    /// to the connection.
    in_flight: bool,
    /// Whether this connection needs a reset on next use.
    /// Set by connection pool on checkin, cleared after first query/execute.
    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
    needs_reset: bool,
    /// OpenTelemetry instrumentation context (when otel feature is enabled)
    #[cfg(feature = "otel")]
    instrumentation: InstrumentationContext,
    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
    #[cfg(feature = "always-encrypted")]
    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
}
138
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
/// - Plain TCP (for internal networks or when `tls` feature is disabled)
///
/// Every send helper and the cancel-handle builder in this file match on all
/// variants, so adding a variant requires updating each of those matches.
// NOTE(review): this allow was added before query execution existed; the
// variants are now matched throughout this file, so confirm whether the
// attribute is still needed for any feature combination.
#[allow(dead_code)]
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    #[cfg(feature = "tls")]
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style)
    #[cfg(feature = "tls")]
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
    Plain(Connection<TcpStream>),
}
156
157// Private helper methods available to all connection states
158impl<S: ConnectionState> Client<S> {
159 /// The default per-command deadline from `command_timeout`.
160 ///
161 /// Returns `None` when `command_timeout` is zero, which means "no limit"
162 /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
163 pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
164 let t = self.config.command_timeout;
165 if t.is_zero() { None } else { Some(t) }
166 }
167
168 /// Build a cancel handle for the current connection, regardless of
169 /// connection state. The public, documented surface is
170 /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
171 /// delegate here.
172 pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
173 let connection = self
174 .connection
175 .as_ref()
176 .expect("connection should be present");
177 match connection {
178 #[cfg(feature = "tls")]
179 ConnectionHandle::Tls(conn) => {
180 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
181 }
182 #[cfg(feature = "tls")]
183 ConnectionHandle::TlsPrelogin(conn) => {
184 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
185 }
186 ConnectionHandle::Plain(conn) => {
187 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
188 }
189 }
190 }
191
192 /// Process transaction-related EnvChange tokens.
193 ///
194 /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
195 /// EnvChange tokens, updating the transaction descriptor accordingly.
196 ///
197 /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
198 /// while still having the transaction descriptor tracked correctly.
199 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
200 use tds_protocol::token::EnvChangeValue;
201
202 match env.env_type {
203 EnvChangeType::BeginTransaction => {
204 if let EnvChangeValue::Binary(ref data) = env.new_value {
205 if data.len() >= 8 {
206 let descriptor = u64::from_le_bytes([
207 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
208 ]);
209 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
210 *transaction_descriptor = descriptor;
211 }
212 }
213 }
214 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
215 tracing::debug!(
216 env_type = ?env.env_type,
217 "transaction ended via raw SQL"
218 );
219 *transaction_descriptor = 0;
220 }
221 _ => {}
222 }
223 }
224
225 /// Send a SQL batch to the server.
226 ///
227 /// Uses the client's current transaction descriptor in ALL_HEADERS.
228 /// Per MS-TDS spec, when in an explicit transaction, the descriptor
229 /// returned by BeginTransaction must be included.
230 ///
231 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
232 /// is included in the first packet to reset connection state.
233 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
234 let payload =
235 tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
236 let max_packet = self.config.packet_size as usize;
237
238 // Check if we need to reset the connection on this request
239 let reset = self.needs_reset;
240 if reset {
241 self.needs_reset = false; // Clear flag before sending
242 tracing::debug!("sending SQL batch with RESETCONNECTION flag");
243 }
244
245 self.in_flight = true;
246 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
247
248 match connection {
249 #[cfg(feature = "tls")]
250 ConnectionHandle::Tls(conn) => {
251 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
252 .await?;
253 }
254 #[cfg(feature = "tls")]
255 ConnectionHandle::TlsPrelogin(conn) => {
256 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
257 .await?;
258 }
259 ConnectionHandle::Plain(conn) => {
260 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
261 .await?;
262 }
263 }
264
265 Ok(())
266 }
267
268 /// Send an RPC request to the server.
269 ///
270 /// Uses the client's current transaction descriptor in ALL_HEADERS.
271 ///
272 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
273 /// is included in the first packet to reset connection state.
274 pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
275 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
276 let max_packet = self.config.packet_size as usize;
277
278 // Check if we need to reset the connection on this request
279 let reset = self.needs_reset;
280 if reset {
281 self.needs_reset = false; // Clear flag before sending
282 tracing::debug!("sending RPC with RESETCONNECTION flag");
283 }
284
285 self.in_flight = true;
286 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
287
288 match connection {
289 #[cfg(feature = "tls")]
290 ConnectionHandle::Tls(conn) => {
291 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
292 .await?;
293 }
294 #[cfg(feature = "tls")]
295 ConnectionHandle::TlsPrelogin(conn) => {
296 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
297 .await?;
298 }
299 ConnectionHandle::Plain(conn) => {
300 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
301 .await?;
302 }
303 }
304
305 Ok(())
306 }
307
308 /// Start building a stored procedure call with full control over parameters.
309 ///
310 /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
311 /// parameters before executing the call.
312 ///
313 /// The procedure name is validated to prevent SQL injection. It may be
314 /// schema-qualified (e.g., `"dbo.MyProc"`).
315 ///
316 /// # Example
317 ///
318 /// ```rust,no_run
319 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
320 /// let result = client.procedure("dbo.CalculateSum")?
321 /// .input("@a", &10i32)
322 /// .input("@b", &20i32)
323 /// .output_int("@result")
324 /// .execute().await?;
325 ///
326 /// let sum = result.get_output("@result").unwrap();
327 /// # let _ = sum;
328 /// # Ok(())
329 /// # }
330 /// ```
331 pub fn procedure(
332 &mut self,
333 proc_name: &str,
334 ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
335 crate::validation::validate_qualified_identifier(proc_name)?;
336 Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
337 }
338
339 /// Execute a stored procedure with positional input parameters.
340 ///
341 /// This is a convenience method for the common case of calling a procedure
342 /// with input-only parameters. For output parameters or named parameters,
343 /// use [`procedure()`](Client::procedure) instead.
344 ///
345 /// # Example
346 ///
347 /// ```rust,no_run
348 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
349 /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
350 /// assert_eq!(result.return_value, 0);
351 ///
352 /// if let Some(rs) = result.first_result_set() {
353 /// println!("columns: {:?}", rs.columns());
354 /// }
355 /// # Ok(())
356 /// # }
357 /// ```
358 pub async fn call_procedure(
359 &mut self,
360 proc_name: &str,
361 params: &[&(dyn crate::ToSql + Sync)],
362 ) -> Result<crate::stream::ProcedureResult> {
363 crate::validation::validate_qualified_identifier(proc_name)?;
364
365 tracing::debug!(
366 proc_name = proc_name,
367 params_count = params.len(),
368 "executing stored procedure"
369 );
370
371 let rpc_params =
372 Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
373 let mut rpc = RpcRequest::named(proc_name);
374 for param in rpc_params {
375 rpc = rpc.param(param);
376 }
377
378 let deadline = self.command_deadline();
379 let canceller = self.connection_cancel_handle();
380 run_with_deadline(
381 async {
382 self.send_rpc(&rpc).await?;
383 self.read_procedure_result().await
384 },
385 deadline,
386 canceller,
387 )
388 .await
389 }
390
391 /// Start a bulk insert operation for the specified table.
392 ///
393 /// Sends the `INSERT BULK` statement to the server and returns a
394 /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
395 /// a mutable borrow on the client, preventing other operations while
396 /// the bulk insert is in progress.
397 ///
398 /// # Example
399 ///
400 /// ```rust,no_run
401 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
402 /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
403 ///
404 /// let builder = BulkInsertBuilder::new("dbo.Users")
405 /// .with_typed_columns(vec![
406 /// BulkColumn::new("id", "INT", 0)?,
407 /// BulkColumn::new("name", "NVARCHAR(100)", 1)?,
408 /// ]);
409 ///
410 /// let mut writer = client.bulk_insert(&builder).await?;
411 /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
412 /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
413 /// let result = writer.finish().await?;
414 /// println!("Inserted {} rows", result.rows_affected);
415 /// # Ok(())
416 /// # }
417 /// ```
418 pub async fn bulk_insert(
419 &mut self,
420 builder: &crate::bulk::BulkInsertBuilder,
421 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
422 use tds_protocol::token::{ColMetaData, Token};
423
424 tracing::debug!(
425 table = builder.table_name(),
426 columns = builder.columns().len(),
427 "starting bulk insert"
428 );
429
430 // Step 1: Query the server for column metadata.
431 // This gives us the exact type encoding the server expects for BulkLoad,
432 // following the pattern established by Tiberius.
433 let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
434 let deadline = self.command_deadline();
435 let canceller = self.connection_cancel_handle();
436 let message = run_with_deadline(
437 async {
438 self.send_sql_batch(&meta_query).await?;
439 self.read_response_message().await
440 },
441 deadline,
442 canceller,
443 )
444 .await?;
445 self.in_flight = false;
446
447 // Capture both the raw COLMETADATA bytes and parsed column info
448 let raw_payload = message.payload.clone();
449 let mut parser = self.create_parser(message.payload);
450 let mut server_metadata: Option<ColMetaData> = None;
451 let mut meta_start: usize = 0;
452 let mut meta_end: usize = 0;
453
454 loop {
455 let pos_before = raw_payload.len() - parser.remaining();
456 let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
457 let pos_after = raw_payload.len() - parser.remaining();
458 let Some(token) = token else { break };
459
460 match token {
461 Token::ColMetaData(meta) => {
462 meta_start = pos_before;
463 meta_end = pos_after;
464 server_metadata = Some(meta);
465 }
466 Token::Done(_) => break,
467 _ => {}
468 }
469 }
470
471 // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
472 // These types require a legacy TEXTPTR wire format that this driver
473 // does not support — users should migrate the column to VARCHAR(MAX) /
474 // NVARCHAR(MAX) / VARBINARY(MAX).
475 if let Some(ref meta) = server_metadata {
476 use tds_protocol::types::TypeId;
477 for col in meta.columns.iter() {
478 let (rejected, replacement) = match col.type_id {
479 TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
480 TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
481 TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
482 _ => (None, ""),
483 };
484 if let Some(sql_type) = rejected {
485 return Err(Error::from(mssql_types::TypeError::UnsupportedType {
486 sql_type: sql_type.to_string(),
487 reason: format!(
488 "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
489 are not supported. Alter the column to {} instead \
490 (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
491 Server 2005).",
492 col.name,
493 builder.table_name(),
494 sql_type,
495 replacement,
496 ),
497 }));
498 }
499 }
500 }
501
502 // Step 2: Send INSERT BULK statement to put server in bulk load mode
503 let stmt = builder.build_insert_bulk_statement()?;
504 let deadline = self.command_deadline();
505 let canceller = self.connection_cancel_handle();
506 run_with_deadline(
507 async {
508 self.send_sql_batch(&stmt).await?;
509 self.read_execute_result().await
510 },
511 deadline,
512 canceller,
513 )
514 .await?;
515
516 // Step 3: Create bulk writer with server's metadata
517 let raw_meta = if meta_end > meta_start {
518 Some(raw_payload.slice(meta_start..meta_end))
519 } else {
520 None
521 };
522
523 let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
524 let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
525 builder.columns().to_vec(),
526 builder.options().batch_size,
527 raw_meta,
528 server_cols,
529 );
530
531 Ok(crate::bulk::BulkWriter::new(self, bulk))
532 }
533
534 /// Start a bulk insert without querying the server for column metadata.
535 ///
536 /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
537 /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
538 /// column metadata is constructed from the `BulkColumn` types provided
539 /// on the builder. This saves a round-trip when the schema is known.
540 ///
541 /// # Caveats
542 ///
543 /// The caller must ensure `BulkColumn` entries match the target table's
544 /// column definitions exactly. Mismatched types, lengths, precision/scale,
545 /// or column ordering will cause the server to reject the BulkLoad packet.
546 ///
547 /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
548 /// extra round-trip is usually negligible and the server-supplied metadata
549 /// is guaranteed correct.
550 pub async fn bulk_insert_without_schema_discovery(
551 &mut self,
552 builder: &crate::bulk::BulkInsertBuilder,
553 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
554 tracing::debug!(
555 table = builder.table_name(),
556 columns = builder.columns().len(),
557 "starting bulk insert (no schema discovery)"
558 );
559
560 // Send INSERT BULK statement to put server in bulk load mode
561 let stmt = builder.build_insert_bulk_statement()?;
562 let deadline = self.command_deadline();
563 let canceller = self.connection_cancel_handle();
564 run_with_deadline(
565 async {
566 self.send_sql_batch(&stmt).await?;
567 self.read_execute_result().await
568 },
569 deadline,
570 canceller,
571 )
572 .await?;
573
574 // Create bulk writer with hand-crafted metadata
575 let bulk =
576 crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
577
578 Ok(crate::bulk::BulkWriter::new(self, bulk))
579 }
580
581 /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
582 ///
583 /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
584 /// row data after the `INSERT BULK` statement has been acknowledged.
585 pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
586 let max_packet = self.config.packet_size as usize;
587
588 self.in_flight = true;
589 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
590
591 match connection {
592 #[cfg(feature = "tls")]
593 ConnectionHandle::Tls(conn) => {
594 conn.send_message(PacketType::BulkLoad, payload, max_packet)
595 .await?;
596 }
597 #[cfg(feature = "tls")]
598 ConnectionHandle::TlsPrelogin(conn) => {
599 conn.send_message(PacketType::BulkLoad, payload, max_packet)
600 .await?;
601 }
602 ConnectionHandle::Plain(conn) => {
603 conn.send_message(PacketType::BulkLoad, payload, max_packet)
604 .await?;
605 }
606 }
607
608 // Read the server's Done response with row count
609 self.read_execute_result().await
610 }
611
612 /// Execute a query with named parameters and return a streaming result set.
613 ///
614 /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
615 /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
616 /// and the `#[derive(ToParams)]` macro.
617 ///
618 /// # Example
619 ///
620 /// ```rust,no_run
621 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
622 /// use mssql_client::{NamedParam, ToParams};
623 ///
624 /// // With derive macro:
625 /// #[derive(mssql_derive::ToParams)]
626 /// struct UserQuery { name: String }
627 ///
628 /// let q = UserQuery { name: "Alice".into() };
629 /// let rows = client.query_named(
630 /// "SELECT * FROM users WHERE name = @name",
631 /// &q.to_params()?,
632 /// ).await?;
633 ///
634 /// // Or manually:
635 /// let params = vec![NamedParam::from_value("name", &"Alice")?];
636 /// let rows = client.query_named(
637 /// "SELECT * FROM users WHERE name = @name",
638 /// ¶ms,
639 /// ).await?;
640 /// # let _ = rows;
641 /// # Ok(())
642 /// # }
643 /// ```
644 pub async fn query_named<'a>(
645 &'a mut self,
646 sql: &str,
647 params: &[crate::to_params::NamedParam],
648 ) -> Result<QueryStream<'a>> {
649 tracing::debug!(
650 sql = sql,
651 params_count = params.len(),
652 "executing query with named parameters"
653 );
654
655 if params.is_empty() {
656 self.send_sql_batch(sql).await?;
657 } else {
658 let rpc_params =
659 Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
660 let rpc = RpcRequest::execute_sql(sql, rpc_params);
661 self.send_rpc(&rpc).await?;
662 }
663
664 let resp = self.read_query_response().await?;
665 #[cfg(feature = "always-encrypted")]
666 {
667 Ok(QueryStream::from_raw(
668 resp.columns,
669 resp.pending_rows,
670 resp.meta,
671 resp.decryptor,
672 ))
673 }
674 #[cfg(not(feature = "always-encrypted"))]
675 {
676 Ok(QueryStream::from_raw(
677 resp.columns,
678 resp.pending_rows,
679 resp.meta,
680 ))
681 }
682 }
683
684 /// Execute a statement with named parameters.
685 ///
686 /// Returns the number of affected rows. This is the named-parameter
687 /// counterpart of [`execute()`](Client::execute), compatible with the
688 /// [`ToParams`](crate::to_params::ToParams) trait.
689 ///
690 /// # Example
691 ///
692 /// ```rust,no_run
693 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
694 /// use mssql_client::NamedParam;
695 ///
696 /// let params = vec![
697 /// NamedParam::from_value("name", &"Alice")?,
698 /// NamedParam::from_value("email", &"alice@example.com")?,
699 /// ];
700 /// let rows_affected = client.execute_named(
701 /// "INSERT INTO users (name, email) VALUES (@name, @email)",
702 /// ¶ms,
703 /// ).await?;
704 /// # let _ = rows_affected;
705 /// # Ok(())
706 /// # }
707 /// ```
708 pub async fn execute_named(
709 &mut self,
710 sql: &str,
711 params: &[crate::to_params::NamedParam],
712 ) -> Result<u64> {
713 tracing::debug!(
714 sql = sql,
715 params_count = params.len(),
716 "executing statement with named parameters"
717 );
718
719 let deadline = self.command_deadline();
720 let canceller = self.connection_cancel_handle();
721 run_with_deadline(
722 async {
723 if params.is_empty() {
724 self.send_sql_batch(sql).await?;
725 } else {
726 let rpc_params = Self::convert_named_params(
727 params,
728 self.send_unicode(),
729 self.server_collation(),
730 )?;
731 let rpc = RpcRequest::execute_sql(sql, rpc_params);
732 self.send_rpc(&rpc).await?;
733 }
734
735 self.read_execute_result().await
736 },
737 deadline,
738 canceller,
739 )
740 .await
741 }
742
    /// Whether string parameters are sent as NVARCHAR (Unicode).
    ///
    /// Mirrors the `send_string_parameters_as_unicode` configuration value;
    /// the parameter-conversion helpers in this file receive it on every call.
    pub(crate) fn send_unicode(&self) -> bool {
        self.config.send_string_parameters_as_unicode
    }
747
    /// Server's default collation, captured from ENVCHANGE during login.
    ///
    /// Returns `None` when no collation has been recorded on this client.
    pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
        self.server_collation.as_ref()
    }
752}
753
754impl Client<Ready> {
    /// Mark this connection as needing a reset on next use.
    ///
    /// Called by the connection pool when a connection is returned.
    /// The next SQL batch or RPC will include the RESETCONNECTION flag
    /// in the TDS packet header, causing SQL Server to reset connection
    /// state (temp tables, SET options, transaction isolation level, etc.)
    /// before executing the command.
    ///
    /// This is more efficient than calling `sp_reset_connection` as a
    /// separate command because it's handled at the TDS protocol level.
    ///
    /// Calling this repeatedly before the next request has no additional
    /// effect: it only sets a boolean flag.
    pub fn mark_needs_reset(&mut self) {
        self.needs_reset = true;
    }
768
    /// Check if this connection needs a reset.
    ///
    /// Returns true if `mark_needs_reset()` was called and the reset
    /// hasn't been performed yet. The flag is cleared by the next SQL batch
    /// or RPC send, just before the packet is written.
    #[must_use]
    pub fn needs_reset(&self) -> bool {
        self.needs_reset
    }
777
778 /// Execute a query and return a result set with lazy per-row decoding.
779 ///
780 /// Per ADR-007 the full response is buffered in memory and each row is
781 /// *decoded* on demand as you iterate — this is not incremental network
782 /// streaming, so peak memory tracks the response size. Use
783 /// `.collect_all()` if you want all rows materialized into a `Vec` up
784 /// front.
785 ///
786 /// # Example
787 ///
788 /// ```rust,no_run
789 /// # use mssql_client::Row;
790 /// # fn process(_: &Row) {}
791 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
792 /// // Streaming (synchronous iteration over the result set)
793 /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
794 /// for row in stream {
795 /// let row = row?;
796 /// process(&row);
797 /// }
798 ///
799 /// // Buffered (loads all into memory)
800 /// let rows: Vec<Row> = client
801 /// .query("SELECT * FROM small_table", &[])
802 /// .await?
803 /// .collect_all()
804 /// .await?;
805 /// # let _ = rows;
806 /// # Ok(())
807 /// # }
808 /// ```
809 pub async fn query<'a>(
810 &'a mut self,
811 sql: &str,
812 params: &[&(dyn crate::ToSql + Sync)],
813 ) -> Result<QueryStream<'a>> {
814 let deadline = self.command_deadline();
815 self.query_inner(sql, params, deadline).await
816 }
817
    /// Shared query implementation with an explicit command deadline.
    ///
    /// Backs both [`query`](Client::query) and
    /// [`query_with_timeout`](Client::query_with_timeout). `deadline == None`
    /// means the request is awaited without a time limit.
    ///
    /// With the `otel` feature enabled, a query span and an operation timer
    /// are started before the request and are completed with the outcome
    /// (success or error) before the result is returned to the caller.
    ///
    /// # Errors
    ///
    /// Returns [`Error::CommandTimeout`] when the deadline elapses, and
    /// otherwise any parameter-conversion, send or read error.
    async fn query_inner<'a>(
        &'a mut self,
        sql: &str,
        params: &[&(dyn crate::ToSql + Sync)],
        deadline: Option<std::time::Duration>,
    ) -> Result<QueryStream<'a>> {
        tracing::debug!(sql = sql, params_count = params.len(), "executing query");

        // Instrumentation is cloned so the span/timer do not keep `self`
        // borrowed while the request below borrows it mutably.
        #[cfg(feature = "otel")]
        let instrumentation = self.instrumentation.clone();
        #[cfg(feature = "otel")]
        let mut span = instrumentation.query_span(sql);
        #[cfg(feature = "otel")]
        let timer = crate::instrumentation::OperationTimer::start(
            crate::instrumentation::extract_operation(sql),
        );

        let canceller = self.cancel_handle();
        let result = run_with_deadline(
            async {
                if params.is_empty() {
                    // Simple query without parameters - use SQL batch
                    self.send_sql_batch(sql).await?;
                } else {
                    // Parameterized query - use sp_executesql via RPC
                    let rpc_params =
                        Self::convert_params(params, self.send_unicode(), self.server_collation())?;
                    let rpc = RpcRequest::execute_sql(sql, rpc_params);
                    self.send_rpc(&rpc).await?;
                }

                // Read complete response including columns and rows
                self.read_query_response().await
            },
            deadline,
            canceller,
        )
        .await;

        // Record the outcome before `?` below can return early, so failed
        // and timed-out queries are reported as well.
        #[cfg(feature = "otel")]
        match &result {
            Ok(_) => InstrumentationContext::record_success(&mut span, None),
            Err(e) => InstrumentationContext::record_error(&mut span, e),
        }
        #[cfg(feature = "otel")]
        timer.finish(instrumentation.metrics(), result.is_ok());

        // Drop the span before returning
        #[cfg(feature = "otel")]
        drop(span);

        let resp = result?;
        // `from_raw` takes an extra decryptor argument when Always Encrypted
        // support is compiled in.
        #[cfg(feature = "always-encrypted")]
        {
            Ok(QueryStream::from_raw(
                resp.columns,
                resp.pending_rows,
                resp.meta,
                resp.decryptor,
            ))
        }
        #[cfg(not(feature = "always-encrypted"))]
        {
            Ok(QueryStream::from_raw(
                resp.columns,
                resp.pending_rows,
                resp.meta,
            ))
        }
    }
889
890 /// Execute a query with a specific timeout.
891 ///
892 /// This overrides the default `command_timeout` from the connection configuration
893 /// for this specific query. If the query does not complete within the specified
894 /// duration, the driver sends an Attention packet to cancel it server-side,
895 /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
896 /// connection left usable for the next request.
897 ///
898 /// # Arguments
899 ///
900 /// * `sql` - The SQL query to execute
901 /// * `params` - Query parameters
902 /// * `timeout_duration` - Maximum time to wait for the query to complete
903 ///
904 /// # Example
905 ///
906 /// ```rust,no_run
907 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
908 /// use std::time::Duration;
909 ///
910 /// // Execute with a 5-second timeout
911 /// let rows = client
912 /// .query_with_timeout(
913 /// "SELECT * FROM large_table",
914 /// &[],
915 /// Duration::from_secs(5),
916 /// )
917 /// .await?;
918 /// # let _ = rows;
919 /// # Ok(())
920 /// # }
921 /// ```
922 pub async fn query_with_timeout<'a>(
923 &'a mut self,
924 sql: &str,
925 params: &[&(dyn crate::ToSql + Sync)],
926 timeout_duration: std::time::Duration,
927 ) -> Result<QueryStream<'a>> {
928 self.query_inner(sql, params, Some(timeout_duration)).await
929 }
930
931 /// Execute a batch that may return multiple result sets.
932 ///
933 /// This is useful for stored procedures or SQL batches that contain
934 /// multiple SELECT statements.
935 ///
936 /// # Example
937 ///
938 /// ```rust,no_run
939 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
940 /// // Execute a batch with multiple SELECT statements
941 /// let mut results = client.query_multiple(
942 /// "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
943 /// &[]
944 /// ).await?;
945 ///
946 /// // Process first result set
947 /// while let Some(row) = results.next_row().await? {
948 /// println!("Result 1: {:?}", row);
949 /// }
950 ///
951 /// // Move to second result set
952 /// if results.next_result().await? {
953 /// while let Some(row) = results.next_row().await? {
954 /// println!("Result 2: {:?}", row);
955 /// }
956 /// }
957 /// # Ok(())
958 /// # }
959 /// ```
960 pub async fn query_multiple<'a>(
961 &'a mut self,
962 sql: &str,
963 params: &[&(dyn crate::ToSql + Sync)],
964 ) -> Result<MultiResultStream<'a>> {
965 tracing::debug!(
966 sql = sql,
967 params_count = params.len(),
968 "executing multi-result query"
969 );
970
971 let deadline = self.command_deadline();
972 let canceller = self.connection_cancel_handle();
973 let result_sets = run_with_deadline(
974 async {
975 if params.is_empty() {
976 // Simple batch without parameters - use SQL batch
977 self.send_sql_batch(sql).await?;
978 } else {
979 // Parameterized query - use sp_executesql via RPC
980 let rpc_params =
981 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
982 let rpc = RpcRequest::execute_sql(sql, rpc_params);
983 self.send_rpc(&rpc).await?;
984 }
985
986 // Read all result sets
987 self.read_multi_result_response().await
988 },
989 deadline,
990 canceller,
991 )
992 .await?;
993 Ok(MultiResultStream::new(result_sets))
994 }
995
996 /// Execute a query that doesn't return rows.
997 ///
998 /// Returns the number of affected rows.
999 pub async fn execute(
1000 &mut self,
1001 sql: &str,
1002 params: &[&(dyn crate::ToSql + Sync)],
1003 ) -> Result<u64> {
1004 let deadline = self.command_deadline();
1005 self.execute_inner(sql, params, deadline).await
1006 }
1007
1008 /// Shared execute implementation with an explicit command deadline.
1009 async fn execute_inner(
1010 &mut self,
1011 sql: &str,
1012 params: &[&(dyn crate::ToSql + Sync)],
1013 deadline: Option<std::time::Duration>,
1014 ) -> Result<u64> {
1015 tracing::debug!(
1016 sql = sql,
1017 params_count = params.len(),
1018 "executing statement"
1019 );
1020
1021 #[cfg(feature = "otel")]
1022 let instrumentation = self.instrumentation.clone();
1023 #[cfg(feature = "otel")]
1024 let mut span = instrumentation.query_span(sql);
1025 #[cfg(feature = "otel")]
1026 let timer = crate::instrumentation::OperationTimer::start(
1027 crate::instrumentation::extract_operation(sql),
1028 );
1029
1030 let canceller = self.cancel_handle();
1031 let result = run_with_deadline(
1032 async {
1033 if params.is_empty() {
1034 // Simple statement without parameters - use SQL batch
1035 self.send_sql_batch(sql).await?;
1036 } else {
1037 // Parameterized statement - use sp_executesql via RPC
1038 let rpc_params =
1039 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1040 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1041 self.send_rpc(&rpc).await?;
1042 }
1043
1044 // Read response and get row count
1045 self.read_execute_result().await
1046 },
1047 deadline,
1048 canceller,
1049 )
1050 .await;
1051
1052 #[cfg(feature = "otel")]
1053 match &result {
1054 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1055 Err(e) => InstrumentationContext::record_error(&mut span, e),
1056 }
1057 #[cfg(feature = "otel")]
1058 timer.finish(instrumentation.metrics(), result.is_ok());
1059
1060 // Drop the span before returning
1061 #[cfg(feature = "otel")]
1062 drop(span);
1063
1064 result
1065 }
1066
1067 /// Execute a statement with a specific timeout.
1068 ///
1069 /// This overrides the default `command_timeout` from the connection configuration
1070 /// for this specific statement. If the statement does not complete within the
1071 /// specified duration, the driver sends an Attention packet to cancel it
1072 /// server-side, drains the acknowledgement, and returns
1073 /// [`Error::CommandTimeout`] with the connection left usable.
1074 ///
1075 /// # Arguments
1076 ///
1077 /// * `sql` - The SQL statement to execute
1078 /// * `params` - Statement parameters
1079 /// * `timeout_duration` - Maximum time to wait for the statement to complete
1080 ///
1081 /// # Example
1082 ///
1083 /// ```rust,no_run
1084 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1085 /// use std::time::Duration;
1086 ///
1087 /// // Execute with a 10-second timeout
1088 /// let rows_affected = client
1089 /// .execute_with_timeout(
1090 /// "UPDATE large_table SET status = @p1",
1091 /// &[&"processed"],
1092 /// Duration::from_secs(10),
1093 /// )
1094 /// .await?;
1095 /// # let _ = rows_affected;
1096 /// # Ok(())
1097 /// # }
1098 /// ```
1099 pub async fn execute_with_timeout(
1100 &mut self,
1101 sql: &str,
1102 params: &[&(dyn crate::ToSql + Sync)],
1103 timeout_duration: std::time::Duration,
1104 ) -> Result<u64> {
1105 self.execute_inner(sql, params, Some(timeout_duration))
1106 .await
1107 }
1108
1109 /// Begin a transaction.
1110 ///
1111 /// This transitions the client from `Ready` to `InTransaction` state.
1112 /// Per MS-TDS spec, the server returns a transaction descriptor in the
1113 /// BeginTransaction EnvChange token that must be included in subsequent
1114 /// ALL_HEADERS sections.
1115 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1116 tracing::debug!("beginning transaction");
1117
1118 #[cfg(feature = "otel")]
1119 let instrumentation = self.instrumentation.clone();
1120 #[cfg(feature = "otel")]
1121 let mut span = instrumentation.transaction_span("BEGIN");
1122
1123 // Execute BEGIN TRANSACTION and extract the transaction descriptor
1124 let result = async {
1125 self.send_sql_batch("BEGIN TRANSACTION").await?;
1126 self.read_transaction_begin_result().await
1127 }
1128 .await;
1129
1130 #[cfg(feature = "otel")]
1131 match &result {
1132 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1133 Err(e) => InstrumentationContext::record_error(&mut span, e),
1134 }
1135
1136 // Drop the span before moving instrumentation
1137 #[cfg(feature = "otel")]
1138 drop(span);
1139
1140 let transaction_descriptor = result?;
1141
1142 Ok(Client {
1143 config: self.config,
1144 _state: PhantomData,
1145 connection: self.connection,
1146 server_version: self.server_version,
1147 current_database: self.current_database,
1148 server_collation: self.server_collation,
1149 statement_cache: self.statement_cache,
1150 transaction_descriptor, // Store the descriptor from server
1151 needs_reset: self.needs_reset,
1152 in_flight: self.in_flight,
1153 #[cfg(feature = "otel")]
1154 instrumentation: self.instrumentation,
1155 #[cfg(feature = "always-encrypted")]
1156 encryption_context: self.encryption_context,
1157 })
1158 }
1159
1160 /// Begin a transaction with a specific isolation level.
1161 ///
1162 /// This transitions the client from `Ready` to `InTransaction` state
1163 /// with the specified isolation level.
1164 ///
1165 /// # Example
1166 ///
1167 /// ```rust,no_run
1168 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1169 /// use mssql_client::IsolationLevel;
1170 ///
1171 /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1172 /// // All operations in this transaction use SERIALIZABLE isolation
1173 /// tx.commit().await?;
1174 /// # Ok(())
1175 /// # }
1176 /// ```
1177 pub async fn begin_transaction_with_isolation(
1178 mut self,
1179 isolation_level: crate::transaction::IsolationLevel,
1180 ) -> Result<Client<InTransaction>> {
1181 tracing::debug!(
1182 isolation_level = %isolation_level.name(),
1183 "beginning transaction with isolation level"
1184 );
1185
1186 #[cfg(feature = "otel")]
1187 let instrumentation = self.instrumentation.clone();
1188 #[cfg(feature = "otel")]
1189 let mut span = instrumentation.transaction_span("BEGIN");
1190
1191 // First set the isolation level
1192 let result = async {
1193 self.send_sql_batch(isolation_level.as_sql()).await?;
1194 self.read_execute_result().await?;
1195
1196 // Then begin the transaction
1197 self.send_sql_batch("BEGIN TRANSACTION").await?;
1198 self.read_transaction_begin_result().await
1199 }
1200 .await;
1201
1202 #[cfg(feature = "otel")]
1203 match &result {
1204 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1205 Err(e) => InstrumentationContext::record_error(&mut span, e),
1206 }
1207
1208 #[cfg(feature = "otel")]
1209 drop(span);
1210
1211 let transaction_descriptor = result?;
1212
1213 Ok(Client {
1214 config: self.config,
1215 _state: PhantomData,
1216 connection: self.connection,
1217 server_version: self.server_version,
1218 current_database: self.current_database,
1219 server_collation: self.server_collation,
1220 statement_cache: self.statement_cache,
1221 transaction_descriptor,
1222 needs_reset: self.needs_reset,
1223 in_flight: self.in_flight,
1224 #[cfg(feature = "otel")]
1225 instrumentation: self.instrumentation,
1226 #[cfg(feature = "always-encrypted")]
1227 encryption_context: self.encryption_context,
1228 })
1229 }
1230
1231 /// Execute a simple query without parameters.
1232 ///
1233 /// This is useful for DDL statements and simple queries where you
1234 /// don't need to retrieve the affected row count.
1235 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1236 tracing::debug!(sql = sql, "executing simple query");
1237
1238 // Send SQL batch
1239 self.send_sql_batch(sql).await?;
1240
1241 // Read and discard response
1242 let _ = self.read_execute_result().await?;
1243
1244 Ok(())
1245 }
1246
1247 /// Close the connection gracefully.
1248 pub async fn close(self) -> Result<()> {
1249 tracing::debug!("closing connection");
1250 Ok(())
1251 }
1252
1253 /// Get the current database name.
1254 #[must_use]
1255 pub fn database(&self) -> Option<&str> {
1256 self.config.database.as_deref()
1257 }
1258
1259 /// Get the server host.
1260 #[must_use]
1261 pub fn host(&self) -> &str {
1262 &self.config.host
1263 }
1264
1265 /// Get the server port.
1266 #[must_use]
1267 pub fn port(&self) -> u16 {
1268 self.config.port
1269 }
1270
1271 /// Check if the connection is currently in a transaction.
1272 ///
1273 /// This returns `true` if a transaction was started via raw SQL
1274 /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1275 ///
1276 /// Note: This only tracks transactions started via raw SQL. Transactions
1277 /// started via the type-state API (`begin_transaction()`) result in a
1278 /// `Client<InTransaction>` which is a different type.
1279 ///
1280 /// # Example
1281 ///
1282 /// ```rust,no_run
1283 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1284 /// client.execute("BEGIN TRANSACTION", &[]).await?;
1285 /// assert!(client.is_in_transaction());
1286 ///
1287 /// client.execute("COMMIT", &[]).await?;
1288 /// assert!(!client.is_in_transaction());
1289 /// # Ok(())
1290 /// # }
1291 /// ```
1292 #[must_use]
1293 pub fn is_in_transaction(&self) -> bool {
1294 self.transaction_descriptor != 0
1295 }
1296
1297 /// Check if a request is in-flight (sent but response not fully read).
1298 ///
1299 /// Used by the connection pool to detect dirty connections that were
1300 /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1301 /// A connection with an in-flight request has unread data in the TCP
1302 /// buffer and must be discarded rather than returned to the pool.
1303 #[must_use]
1304 pub fn is_in_flight(&self) -> bool {
1305 self.in_flight
1306 }
1307
/// Report whether this client's encryption context knows an Always
/// Encrypted key-store provider called `name`.
///
/// Returns `false` when the client holds no encryption context, or when
/// the context reports no provider under that name. The method is only
/// compiled when the `always-encrypted` feature is enabled.
#[cfg(feature = "always-encrypted")]
#[must_use]
pub fn has_encryption_provider(&self, name: &str) -> bool {
    match self.encryption_context.as_ref() {
        Some(ctx) => ctx.has_provider(name),
        None => false,
    }
}
1321
1322 /// Get a handle for cancelling the current query.
1323 ///
1324 /// The cancel handle can be cloned and sent to other tasks, enabling
1325 /// cancellation of long-running queries from a separate async context.
1326 ///
1327 /// # Example
1328 ///
1329 /// ```rust,no_run
1330 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1331 /// use std::time::Duration;
1332 ///
1333 /// let cancel_handle = client.cancel_handle();
1334 ///
1335 /// // Spawn a task to cancel after 10 seconds
1336 /// let handle = tokio::spawn(async move {
1337 /// tokio::time::sleep(Duration::from_secs(10)).await;
1338 /// let _ = cancel_handle.cancel().await;
1339 /// });
1340 ///
1341 /// // This query will be cancelled if it runs longer than 10 seconds
1342 /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1343 /// # let _ = (handle, result);
1344 /// # Ok(())
1345 /// # }
1346 /// ```
1347 #[must_use]
1348 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1349 self.connection_cancel_handle()
1350 }
1351}
1352
1353/// # Drop Behavior
1354///
1355/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1356/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1357/// the transaction remains open on the server until the TCP connection closes
1358/// (at which point SQL Server automatically rolls back).
1359///
1360/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1361/// to send a `ROLLBACK TRANSACTION` command.
1362///
1363/// ## Consequences of dropping without commit/rollback
1364///
1365/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1366/// (potentially 30+ minutes), holding locks on any modified rows.
1367/// - **Pooled connections:** The pool detects the active transaction descriptor
1368/// and discards the connection rather than returning it to the idle pool
1369/// (see `PooledConnection::drop` in `mssql-driver-pool`).
1370///
1371/// ## Best practice
1372///
1373/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1374/// for error paths:
1375///
1376/// ```rust,no_run
1377/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1378/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1379/// let tx = client.begin_transaction().await?;
1380/// match do_work(&tx).await {
1381/// Ok(_) => { tx.commit().await?; }
1382/// Err(e) => { tx.rollback().await?; return Err(e); }
1383/// }
1384/// # Ok(())
1385/// # }
1386/// ```
1387impl Client<InTransaction> {
1388 /// Execute a query within the transaction and return a streaming result set.
1389 ///
1390 /// See [`Client<Ready>::query`] for usage examples.
1391 pub async fn query<'a>(
1392 &'a mut self,
1393 sql: &str,
1394 params: &[&(dyn crate::ToSql + Sync)],
1395 ) -> Result<QueryStream<'a>> {
1396 let deadline = self.command_deadline();
1397 self.query_inner(sql, params, deadline).await
1398 }
1399
1400 /// Shared query implementation with an explicit command deadline.
1401 async fn query_inner<'a>(
1402 &'a mut self,
1403 sql: &str,
1404 params: &[&(dyn crate::ToSql + Sync)],
1405 deadline: Option<std::time::Duration>,
1406 ) -> Result<QueryStream<'a>> {
1407 tracing::debug!(
1408 sql = sql,
1409 params_count = params.len(),
1410 "executing query in transaction"
1411 );
1412
1413 #[cfg(feature = "otel")]
1414 let instrumentation = self.instrumentation.clone();
1415 #[cfg(feature = "otel")]
1416 let mut span = instrumentation.query_span(sql);
1417 #[cfg(feature = "otel")]
1418 let timer = crate::instrumentation::OperationTimer::start(
1419 crate::instrumentation::extract_operation(sql),
1420 );
1421
1422 let canceller = self.cancel_handle();
1423 let result = run_with_deadline(
1424 async {
1425 if params.is_empty() {
1426 // Simple query without parameters - use SQL batch
1427 self.send_sql_batch(sql).await?;
1428 } else {
1429 // Parameterized query - use sp_executesql via RPC
1430 let rpc_params =
1431 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1432 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1433 self.send_rpc(&rpc).await?;
1434 }
1435
1436 // Read complete response including columns and rows
1437 self.read_query_response().await
1438 },
1439 deadline,
1440 canceller,
1441 )
1442 .await;
1443
1444 #[cfg(feature = "otel")]
1445 match &result {
1446 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1447 Err(e) => InstrumentationContext::record_error(&mut span, e),
1448 }
1449 #[cfg(feature = "otel")]
1450 timer.finish(instrumentation.metrics(), result.is_ok());
1451
1452 // Drop the span before returning
1453 #[cfg(feature = "otel")]
1454 drop(span);
1455
1456 let resp = result?;
1457 #[cfg(feature = "always-encrypted")]
1458 {
1459 Ok(QueryStream::from_raw(
1460 resp.columns,
1461 resp.pending_rows,
1462 resp.meta,
1463 resp.decryptor,
1464 ))
1465 }
1466 #[cfg(not(feature = "always-encrypted"))]
1467 {
1468 Ok(QueryStream::from_raw(
1469 resp.columns,
1470 resp.pending_rows,
1471 resp.meta,
1472 ))
1473 }
1474 }
1475
1476 /// Execute a statement within the transaction.
1477 ///
1478 /// Returns the number of affected rows.
1479 pub async fn execute(
1480 &mut self,
1481 sql: &str,
1482 params: &[&(dyn crate::ToSql + Sync)],
1483 ) -> Result<u64> {
1484 let deadline = self.command_deadline();
1485 self.execute_inner(sql, params, deadline).await
1486 }
1487
1488 /// Shared execute implementation with an explicit command deadline.
1489 async fn execute_inner(
1490 &mut self,
1491 sql: &str,
1492 params: &[&(dyn crate::ToSql + Sync)],
1493 deadline: Option<std::time::Duration>,
1494 ) -> Result<u64> {
1495 tracing::debug!(
1496 sql = sql,
1497 params_count = params.len(),
1498 "executing statement in transaction"
1499 );
1500
1501 #[cfg(feature = "otel")]
1502 let instrumentation = self.instrumentation.clone();
1503 #[cfg(feature = "otel")]
1504 let mut span = instrumentation.query_span(sql);
1505 #[cfg(feature = "otel")]
1506 let timer = crate::instrumentation::OperationTimer::start(
1507 crate::instrumentation::extract_operation(sql),
1508 );
1509
1510 let canceller = self.cancel_handle();
1511 let result = run_with_deadline(
1512 async {
1513 if params.is_empty() {
1514 // Simple statement without parameters - use SQL batch
1515 self.send_sql_batch(sql).await?;
1516 } else {
1517 // Parameterized statement - use sp_executesql via RPC
1518 let rpc_params =
1519 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1520 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1521 self.send_rpc(&rpc).await?;
1522 }
1523
1524 // Read response and get row count
1525 self.read_execute_result().await
1526 },
1527 deadline,
1528 canceller,
1529 )
1530 .await;
1531
1532 #[cfg(feature = "otel")]
1533 match &result {
1534 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1535 Err(e) => InstrumentationContext::record_error(&mut span, e),
1536 }
1537 #[cfg(feature = "otel")]
1538 timer.finish(instrumentation.metrics(), result.is_ok());
1539
1540 // Drop the span before returning
1541 #[cfg(feature = "otel")]
1542 drop(span);
1543
1544 result
1545 }
1546
1547 /// Execute a query within the transaction with a specific timeout.
1548 ///
1549 /// See [`Client<Ready>::query_with_timeout`] for details.
1550 pub async fn query_with_timeout<'a>(
1551 &'a mut self,
1552 sql: &str,
1553 params: &[&(dyn crate::ToSql + Sync)],
1554 timeout_duration: std::time::Duration,
1555 ) -> Result<QueryStream<'a>> {
1556 self.query_inner(sql, params, Some(timeout_duration)).await
1557 }
1558
1559 /// Execute a statement within the transaction with a specific timeout.
1560 ///
1561 /// See [`Client<Ready>::execute_with_timeout`] for details.
1562 pub async fn execute_with_timeout(
1563 &mut self,
1564 sql: &str,
1565 params: &[&(dyn crate::ToSql + Sync)],
1566 timeout_duration: std::time::Duration,
1567 ) -> Result<u64> {
1568 self.execute_inner(sql, params, Some(timeout_duration))
1569 .await
1570 }
1571
/// Open a FILESTREAM BLOB for async reading and/or writing.
///
/// The method first asks the server for the FILESTREAM transaction context
/// (`GET_FILESTREAM_TRANSACTION_CONTEXT()`), then passes that context, the
/// path and the access mode to `FileStream::open`, which opens the handle
/// through the native Win32 `OpenSqlFilestream` API.
///
/// # Arguments
///
/// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
///   Query this yourself before calling `open_filestream`:
///   ```sql
///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
///   ```
/// * `access` — Read, write, or read/write access mode.
///
/// # Errors
///
/// Returns [`Error::FileStream`] when the context query yields no rows, and
/// propagates any error from the query, from reading a row, or from
/// opening the stream.
///
/// # Requirements
///
/// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
/// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
/// - The `FileStream` must be dropped before calling
///   [`commit`](Self::commit) or [`rollback`](Self::rollback)
///
/// # Example
///
/// ```text
/// use mssql_client::FileStreamAccess;
/// use tokio::io::AsyncReadExt;
///
/// let mut tx = client.begin_transaction().await?;
///
/// // Get the FILESTREAM path
/// let rows = tx.query(
///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
///     &[&doc_id],
/// ).await?;
/// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
///
/// // Open and read the BLOB
/// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
/// let mut data = Vec::new();
/// stream.read_to_end(&mut data).await?;
/// drop(stream);
///
/// tx.commit().await?;
/// ```
#[cfg(all(windows, feature = "filestream"))]
pub async fn open_filestream(
    &mut self,
    path: &str,
    access: crate::filestream::FileStreamAccess,
) -> Result<crate::filestream::FileStream> {
    tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");

    // The context ties the file access to the current SQL transaction.
    let rows = self
        .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
        .await?;

    // Walk every row; the value from the last one is the one kept.
    let mut fetched: Option<Vec<u8>> = None;
    for item in rows {
        let row = item?;
        fetched = Some(row.get::<Vec<u8>>(0)?);
    }

    let Some(txn_context) = fetched else {
        return Err(Error::FileStream(
            "GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into(),
        ));
    };

    crate::filestream::FileStream::open(path, access, &txn_context)
}
1641
1642 /// Commit the transaction.
1643 ///
1644 /// This transitions the client back to `Ready` state.
1645 pub async fn commit(mut self) -> Result<Client<Ready>> {
1646 tracing::debug!("committing transaction");
1647
1648 #[cfg(feature = "otel")]
1649 let instrumentation = self.instrumentation.clone();
1650 #[cfg(feature = "otel")]
1651 let mut span = instrumentation.transaction_span("COMMIT");
1652
1653 // Execute COMMIT TRANSACTION
1654 let result = async {
1655 self.send_sql_batch("COMMIT TRANSACTION").await?;
1656 self.read_execute_result().await
1657 }
1658 .await;
1659
1660 #[cfg(feature = "otel")]
1661 match &result {
1662 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1663 Err(e) => InstrumentationContext::record_error(&mut span, e),
1664 }
1665
1666 // Drop the span before moving instrumentation
1667 #[cfg(feature = "otel")]
1668 drop(span);
1669
1670 result?;
1671
1672 Ok(Client {
1673 config: self.config,
1674 _state: PhantomData,
1675 connection: self.connection,
1676 server_version: self.server_version,
1677 current_database: self.current_database,
1678 server_collation: self.server_collation,
1679 statement_cache: self.statement_cache,
1680 transaction_descriptor: 0, // Reset to auto-commit mode
1681 needs_reset: self.needs_reset,
1682 in_flight: self.in_flight,
1683 #[cfg(feature = "otel")]
1684 instrumentation: self.instrumentation,
1685 #[cfg(feature = "always-encrypted")]
1686 encryption_context: self.encryption_context,
1687 })
1688 }
1689
1690 /// Rollback the transaction.
1691 ///
1692 /// This transitions the client back to `Ready` state.
1693 pub async fn rollback(mut self) -> Result<Client<Ready>> {
1694 tracing::debug!("rolling back transaction");
1695
1696 #[cfg(feature = "otel")]
1697 let instrumentation = self.instrumentation.clone();
1698 #[cfg(feature = "otel")]
1699 let mut span = instrumentation.transaction_span("ROLLBACK");
1700
1701 // Execute ROLLBACK TRANSACTION
1702 let result = async {
1703 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1704 self.read_execute_result().await
1705 }
1706 .await;
1707
1708 #[cfg(feature = "otel")]
1709 match &result {
1710 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1711 Err(e) => InstrumentationContext::record_error(&mut span, e),
1712 }
1713
1714 // Drop the span before moving instrumentation
1715 #[cfg(feature = "otel")]
1716 drop(span);
1717
1718 result?;
1719
1720 Ok(Client {
1721 config: self.config,
1722 _state: PhantomData,
1723 connection: self.connection,
1724 server_version: self.server_version,
1725 current_database: self.current_database,
1726 server_collation: self.server_collation,
1727 statement_cache: self.statement_cache,
1728 transaction_descriptor: 0, // Reset to auto-commit mode
1729 needs_reset: self.needs_reset,
1730 in_flight: self.in_flight,
1731 #[cfg(feature = "otel")]
1732 instrumentation: self.instrumentation,
1733 #[cfg(feature = "always-encrypted")]
1734 encryption_context: self.encryption_context,
1735 })
1736 }
1737
1738 /// Create a savepoint and return a handle for later rollback.
1739 ///
1740 /// The returned `SavePoint` handle contains the validated savepoint name.
1741 /// Use it with `rollback_to()` to partially undo transaction work.
1742 ///
1743 /// # Example
1744 ///
1745 /// ```rust,no_run
1746 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1747 /// let mut tx = client.begin_transaction().await?;
1748 /// tx.execute("INSERT INTO orders ...", &[]).await?;
1749 /// let sp = tx.save_point("before_items").await?;
1750 /// tx.execute("INSERT INTO items ...", &[]).await?;
1751 /// // Oops, rollback just the items
1752 /// tx.rollback_to(&sp).await?;
1753 /// tx.commit().await?;
1754 /// # Ok(())
1755 /// # }
1756 /// ```
1757 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1758 crate::validation::validate_identifier(name)?;
1759 tracing::debug!(name = name, "creating savepoint");
1760
1761 // Execute SAVE TRANSACTION <name>
1762 // Note: name is validated by validate_identifier() to prevent SQL injection
1763 let sql = format!("SAVE TRANSACTION {name}");
1764 self.send_sql_batch(&sql).await?;
1765 self.read_execute_result().await?;
1766
1767 Ok(SavePoint::new(name.to_string()))
1768 }
1769
1770 /// Rollback to a savepoint.
1771 ///
1772 /// This rolls back all changes made after the savepoint was created,
1773 /// but keeps the transaction active. The savepoint remains valid and
1774 /// can be rolled back to again.
1775 ///
1776 /// # Example
1777 ///
1778 /// ```rust,no_run
1779 /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1780 /// let sp = tx.save_point("checkpoint").await?;
1781 /// // ... do some work ...
1782 /// tx.rollback_to(&sp).await?; // Undo changes since checkpoint
1783 /// // Transaction is still active, savepoint is still valid
1784 /// # Ok(())
1785 /// # }
1786 /// ```
1787 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1788 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1789
1790 // Execute ROLLBACK TRANSACTION <name>
1791 // Note: savepoint name was validated during creation
1792 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1793 self.send_sql_batch(&sql).await?;
1794 self.read_execute_result().await?;
1795
1796 Ok(())
1797 }
1798
1799 /// Release a savepoint (optional cleanup).
1800 ///
1801 /// Note: SQL Server doesn't have explicit savepoint release, but this
1802 /// method is provided for API completeness. The savepoint is automatically
1803 /// released when the transaction commits or rolls back.
1804 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
1805 tracing::debug!(name = savepoint.name(), "releasing savepoint");
1806
1807 // SQL Server doesn't require explicit savepoint release
1808 // The savepoint is implicitly released on commit/rollback
1809 // This method exists for API completeness
1810 drop(savepoint);
1811 Ok(())
1812 }
1813
1814 /// Get a handle for cancelling the current query within the transaction.
1815 ///
1816 /// See [`Client<Ready>::cancel_handle`] for usage examples.
1817 #[must_use]
1818 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1819 self.connection_cancel_handle()
1820 }
1821}
1822
1823impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1824 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1825 f.debug_struct("Client")
1826 .field("host", &self.config.host)
1827 .field("port", &self.config.port)
1828 .field("database", &self.config.database)
1829 .finish()
1830 }
1831}