mssql_client/client.rs
1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// How long to wait for the server to acknowledge an Attention packet after
49/// a command timeout. SqlClient waits 5 seconds before dooming the
50/// connection; we match it.
51const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68 fut: F,
69 deadline: Option<std::time::Duration>,
70 canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73 F: std::future::Future<Output = Result<T>>,
74{
75 let Some(d) = deadline else {
76 return fut.await;
77 };
78 tokio::pin!(fut);
79 tokio::select! {
80 biased;
81 res = &mut fut => res,
82 () = tokio::time::sleep(d) => {
83 // Signal cancellation, then let the in-flight read consume the
84 // server's attention acknowledgement so the connection stays usable.
85 let drain = async {
86 let _ = canceller.cancel().await;
87 let _ = (&mut fut).await;
88 };
89 if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90 tracing::warn!(
91 timeout = ?ATTENTION_ACK_TIMEOUT,
92 "server did not acknowledge attention; abandoning the connection as dirty"
93 );
94 }
95 Err(Error::CommandTimeout)
96 }
97 }
98}
99
100/// SQL Server client with type-state connection management.
101///
102/// The generic parameter `S` represents the current connection state,
103/// ensuring at compile time that certain operations are only available
104/// in appropriate states.
105pub struct Client<S: ConnectionState> {
106 config: Config,
107 _state: PhantomData<S>,
108 /// The underlying connection (present only when connected)
109 connection: Option<ConnectionHandle>,
110 /// Server version from LoginAck (raw u32 TDS version)
111 server_version: Option<u32>,
112 /// Current database from EnvChange
113 current_database: Option<String>,
114 /// Server's default collation from SqlCollation EnvChange during login.
115 /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
116 /// parameters with the correct character encoding and collation bytes.
117 server_collation: Option<tds_protocol::token::Collation>,
118 /// Prepared statement cache for query optimization
119 statement_cache: StatementCache,
120 /// Transaction descriptor from BeginTransaction EnvChange.
121 /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
122 /// requests within an explicit transaction. 0 indicates auto-commit mode.
123 transaction_descriptor: u64,
124 /// Whether a request has been sent and the response has not yet been fully read.
125 /// Used by the connection pool to detect dirty connections after cancel/timeout.
126 in_flight: bool,
127 /// Whether this connection needs a reset on next use.
128 /// Set by connection pool on checkin, cleared after first query/execute.
129 /// When true, the RESETCONNECTION flag is set on the first TDS packet.
130 needs_reset: bool,
131 /// OpenTelemetry instrumentation context (when otel feature is enabled)
132 #[cfg(feature = "otel")]
133 instrumentation: InstrumentationContext,
134 /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
135 #[cfg(feature = "always-encrypted")]
136 pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
137}
138
139/// Internal connection handle wrapping the actual connection.
140///
141/// This is an enum to support different connection types:
142/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
143/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
144/// - Plain TCP (for internal networks or when `tls` feature is disabled)
145#[allow(dead_code)] // Connection will be used once query execution is implemented
146enum ConnectionHandle {
147 /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
148 #[cfg(feature = "tls")]
149 Tls(Connection<TlsStream<TcpStream>>),
150 /// TLS connection with PreLogin wrapping (TDS 7.x style)
151 #[cfg(feature = "tls")]
152 TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
153 /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
154 Plain(Connection<TcpStream>),
155}
156
157/// The parameter `TypeInfo` to declare a typed NULL ([`crate::null`]) with, from
158/// its [`crate::ToSql::sql_type`] name. Returns `None` for an untyped NULL
159/// (`Option::None`, type `"NULL"`), which falls back to the default param type.
160#[cfg(feature = "always-encrypted")]
161fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
162 use tds_protocol::rpc::TypeInfo;
163 Some(match sql_type {
164 "BIT" => TypeInfo::bit(),
165 "TINYINT" => TypeInfo::tinyint(),
166 "SMALLINT" => TypeInfo::smallint(),
167 "INT" => TypeInfo::int(),
168 "BIGINT" => TypeInfo::bigint(),
169 "REAL" => TypeInfo::real(),
170 "FLOAT" => TypeInfo::float(),
171 "NVARCHAR" => TypeInfo::nvarchar(1),
172 "VARBINARY" => TypeInfo::varbinary(1),
173 "UNIQUEIDENTIFIER" => TypeInfo::uuid(),
174 "DATE" => TypeInfo::date(),
175 _ => return None,
176 })
177}
178
179// Private helper methods available to all connection states
180impl<S: ConnectionState> Client<S> {
181 /// The default per-command deadline from `command_timeout`.
182 ///
183 /// Returns `None` when `command_timeout` is zero, which means "no limit"
184 /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
185 pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
186 let t = self.config.command_timeout;
187 if t.is_zero() { None } else { Some(t) }
188 }
189
190 /// Build a cancel handle for the current connection, regardless of
191 /// connection state. The public, documented surface is
192 /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
193 /// delegate here.
194 pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
195 let connection = self
196 .connection
197 .as_ref()
198 .expect("connection should be present");
199 match connection {
200 #[cfg(feature = "tls")]
201 ConnectionHandle::Tls(conn) => {
202 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
203 }
204 #[cfg(feature = "tls")]
205 ConnectionHandle::TlsPrelogin(conn) => {
206 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
207 }
208 ConnectionHandle::Plain(conn) => {
209 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
210 }
211 }
212 }
213
214 /// Process transaction-related EnvChange tokens.
215 ///
216 /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
217 /// EnvChange tokens, updating the transaction descriptor accordingly.
218 ///
219 /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
220 /// while still having the transaction descriptor tracked correctly.
221 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
222 use tds_protocol::token::EnvChangeValue;
223
224 match env.env_type {
225 EnvChangeType::BeginTransaction => {
226 if let EnvChangeValue::Binary(ref data) = env.new_value {
227 if data.len() >= 8 {
228 let descriptor = u64::from_le_bytes([
229 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
230 ]);
231 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
232 *transaction_descriptor = descriptor;
233 }
234 }
235 }
236 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
237 tracing::debug!(
238 env_type = ?env.env_type,
239 "transaction ended via raw SQL"
240 );
241 *transaction_descriptor = 0;
242 }
243 _ => {}
244 }
245 }
246
247 /// Send a SQL batch to the server.
248 ///
249 /// Uses the client's current transaction descriptor in ALL_HEADERS.
250 /// Per MS-TDS spec, when in an explicit transaction, the descriptor
251 /// returned by BeginTransaction must be included.
252 ///
253 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
254 /// is included in the first packet to reset connection state.
255 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
256 let payload =
257 tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
258 let max_packet = self.config.packet_size as usize;
259
260 // Check if we need to reset the connection on this request
261 let reset = self.needs_reset;
262 if reset {
263 self.needs_reset = false; // Clear flag before sending
264 tracing::debug!("sending SQL batch with RESETCONNECTION flag");
265 }
266
267 self.in_flight = true;
268 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
269
270 match connection {
271 #[cfg(feature = "tls")]
272 ConnectionHandle::Tls(conn) => {
273 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
274 .await?;
275 }
276 #[cfg(feature = "tls")]
277 ConnectionHandle::TlsPrelogin(conn) => {
278 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
279 .await?;
280 }
281 ConnectionHandle::Plain(conn) => {
282 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
283 .await?;
284 }
285 }
286
287 Ok(())
288 }
289
290 /// Send an RPC request to the server.
291 ///
292 /// Uses the client's current transaction descriptor in ALL_HEADERS.
293 ///
294 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
295 /// is included in the first packet to reset connection state.
296 pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
297 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
298 let max_packet = self.config.packet_size as usize;
299
300 // Check if we need to reset the connection on this request
301 let reset = self.needs_reset;
302 if reset {
303 self.needs_reset = false; // Clear flag before sending
304 tracing::debug!("sending RPC with RESETCONNECTION flag");
305 }
306
307 self.in_flight = true;
308 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
309
310 match connection {
311 #[cfg(feature = "tls")]
312 ConnectionHandle::Tls(conn) => {
313 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
314 .await?;
315 }
316 #[cfg(feature = "tls")]
317 ConnectionHandle::TlsPrelogin(conn) => {
318 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
319 .await?;
320 }
321 ConnectionHandle::Plain(conn) => {
322 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
323 .await?;
324 }
325 }
326
327 Ok(())
328 }
329
330 /// Start building a stored procedure call with full control over parameters.
331 ///
332 /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
333 /// parameters before executing the call.
334 ///
335 /// The procedure name is validated to prevent SQL injection. It may be
336 /// schema-qualified (e.g., `"dbo.MyProc"`).
337 ///
338 /// # Example
339 ///
340 /// ```rust,no_run
341 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
342 /// let result = client.procedure("dbo.CalculateSum")?
343 /// .input("@a", &10i32)
344 /// .input("@b", &20i32)
345 /// .output_int("@result")
346 /// .execute().await?;
347 ///
348 /// let sum = result.get_output("@result").unwrap();
349 /// # let _ = sum;
350 /// # Ok(())
351 /// # }
352 /// ```
353 pub fn procedure(
354 &mut self,
355 proc_name: &str,
356 ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
357 crate::validation::validate_qualified_identifier(proc_name)?;
358 Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
359 }
360
361 /// Execute a stored procedure with positional input parameters.
362 ///
363 /// This is a convenience method for the common case of calling a procedure
364 /// with input-only parameters. For output parameters or named parameters,
365 /// use [`procedure()`](Client::procedure) instead.
366 ///
367 /// # Example
368 ///
369 /// ```rust,no_run
370 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
371 /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
372 /// assert_eq!(result.return_value, 0);
373 ///
374 /// if let Some(rs) = result.first_result_set() {
375 /// println!("columns: {:?}", rs.columns());
376 /// }
377 /// # Ok(())
378 /// # }
379 /// ```
380 pub async fn call_procedure(
381 &mut self,
382 proc_name: &str,
383 params: &[&(dyn crate::ToSql + Sync)],
384 ) -> Result<crate::stream::ProcedureResult> {
385 crate::validation::validate_qualified_identifier(proc_name)?;
386
387 tracing::debug!(
388 proc_name = proc_name,
389 params_count = params.len(),
390 "executing stored procedure"
391 );
392
393 let rpc_params =
394 Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
395 let mut rpc = RpcRequest::named(proc_name);
396 for param in rpc_params {
397 rpc = rpc.param(param);
398 }
399
400 let deadline = self.command_deadline();
401 let canceller = self.connection_cancel_handle();
402 run_with_deadline(
403 async {
404 self.send_rpc(&rpc).await?;
405 self.read_procedure_result().await
406 },
407 deadline,
408 canceller,
409 )
410 .await
411 }
412
413 /// Ask the server how each parameter of a statement must be encrypted.
414 ///
415 /// Issues the `sp_describe_parameter_encryption` system RPC for the
416 /// parameterized statement `tsql` with the parameter declaration `params`
417 /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
418 /// into a [`ParameterEncryptionInfo`](crate::ParameterEncryptionInfo): the
419 /// CEK table, plus — for each parameter the server reports as encrypted —
420 /// which CEK and whether deterministic or randomized. Parameters the server
421 /// reports as plaintext are omitted.
422 ///
423 /// This is the first step of Always Encrypted parameter encryption; the
424 /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
425 #[cfg(feature = "always-encrypted")]
426 pub async fn describe_parameter_encryption(
427 &mut self,
428 tsql: &str,
429 params: &str,
430 ) -> Result<crate::ParameterEncryptionInfo> {
431 let tsql_arg = tsql.to_string();
432 let params_arg = params.to_string();
433 let mut result = self
434 .call_procedure(
435 "sp_describe_parameter_encryption",
436 &[&tsql_arg, ¶ms_arg],
437 )
438 .await?;
439 crate::ParameterEncryptionInfo::from_describe_result_sets(&mut result.result_sets)
440 }
441
442 /// Build the `sp_executesql` request for a parameterized statement.
443 ///
444 /// When the connection has Always Encrypted enabled, parameters the server
445 /// reports as encrypted are encrypted client-side first (an extra
446 /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
447 /// plain parameter conversion.
448 pub(crate) async fn build_parameterized_rpc(
449 &mut self,
450 sql: &str,
451 params: &[&(dyn crate::ToSql + Sync)],
452 ) -> Result<RpcRequest> {
453 #[cfg(feature = "always-encrypted")]
454 if self.encryption_context.is_some() {
455 return self.build_encrypted_sql_rpc(sql, params).await;
456 }
457 let rpc_params =
458 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
459 Ok(RpcRequest::execute_sql(sql, rpc_params))
460 }
461
462 /// Encrypt the Always Encrypted parameters of a statement, then build its
463 /// `sp_executesql` request.
464 ///
465 /// Asks the server which parameters are encrypted
466 /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
467 /// then for each one normalizes the value, resolves its column encryption
468 /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
469 /// server reports as plaintext are sent unchanged.
470 #[cfg(feature = "always-encrypted")]
471 async fn build_encrypted_sql_rpc(
472 &mut self,
473 sql: &str,
474 params: &[&(dyn crate::ToSql + Sync)],
475 ) -> Result<RpcRequest> {
476 use tds_protocol::rpc::RpcParam;
477
478 let Some(ctx) = self.encryption_context.clone() else {
479 let rpc_params =
480 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
481 return Ok(RpcRequest::execute_sql(sql, rpc_params));
482 };
483
484 // Resolve each parameter's value once (AE normalization needs the typed
485 // value, not the wire encoding) and build the plaintext RPC params.
486 let send_unicode = self.send_unicode();
487 let collation = self.server_collation().cloned();
488 let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
489 let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
490 for (i, p) in params.iter().enumerate() {
491 let name = format!("@p{}", i + 1);
492 let value = p.to_sql()?;
493 // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
494 // describe accepts it against the target encrypted column; an untyped
495 // NULL falls back to the default in `sql_value_to_rpc_param`.
496 let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
497 (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
498 _ => {
499 let mut param = Self::sql_value_to_rpc_param(
500 &name,
501 &value,
502 send_unicode,
503 collation.as_ref(),
504 )?;
505 // `numeric(v, p, s)` declares an explicit `decimal(p, s)` so
506 // describe matches an encrypted decimal column exactly — the
507 // value alone conveys scale but not precision.
508 if let Some(d) = p.decimal_param_info() {
509 param.type_info =
510 tds_protocol::rpc::TypeInfo::decimal(d.precision, d.scale);
511 }
512 param
513 }
514 };
515 plaintext.push(rpc_param);
516 values.push(value);
517 }
518
519 if plaintext.is_empty() {
520 return Ok(RpcRequest::execute_sql(sql, plaintext));
521 }
522
523 // Ask the server which parameters need encryption.
524 let declarations = RpcRequest::build_param_declarations(&plaintext);
525 let info = self
526 .describe_parameter_encryption(sql, &declarations)
527 .await?;
528 if info.parameters.is_empty() {
529 return Ok(RpcRequest::execute_sql(sql, plaintext));
530 }
531
532 // Encrypt the flagged parameters; pass the rest through untouched.
533 let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
534 for (value, param) in values.into_iter().zip(plaintext) {
535 let Some(crypto) = info.get_parameter(¶m.name) else {
536 final_params.push(param);
537 continue;
538 };
539 let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
540 Error::Protocol(format!(
541 "encrypted parameter {} references missing CEK ordinal {}",
542 param.name, crypto.cek_ordinal
543 ))
544 })?;
545 let metadata = tds_protocol::rpc::EncryptedParamMetadata {
546 base_type_info: param.type_info.clone(),
547 algorithm_id: crypto.algorithm_id,
548 encryption_type: crypto.encryption_type,
549 database_id: entry.database_id,
550 cek_id: entry.cek_id,
551 cek_version: entry.cek_version,
552 cek_md_version: entry.cek_md_version,
553 normalization_rule_version: crypto.normalization_rule_version,
554 };
555 // A NULL value bound to an encrypted column is sent as an encrypted
556 // NULL (the server rejects a plaintext parameter for an encrypted
557 // column); there is nothing to encrypt.
558 if matches!(value, mssql_types::SqlValue::Null) {
559 final_params.push(RpcParam::encrypted_null(param.name, metadata));
560 continue;
561 }
562 let normalized = crate::encryption::normalize_for_encryption(&value)?;
563 let ciphertext = ctx
564 .encrypt_value(&normalized, entry, crypto.encryption_type)
565 .await?;
566 final_params.push(RpcParam::encrypted(
567 param.name,
568 bytes::Bytes::from(ciphertext),
569 metadata,
570 ));
571 }
572
573 Ok(RpcRequest::execute_sql(sql, final_params))
574 }
575
576 /// Start a bulk insert operation for the specified table.
577 ///
578 /// Sends the `INSERT BULK` statement to the server and returns a
579 /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
580 /// a mutable borrow on the client, preventing other operations while
581 /// the bulk insert is in progress.
582 ///
583 /// # Example
584 ///
585 /// ```rust,no_run
586 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
587 /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
588 ///
589 /// let builder = BulkInsertBuilder::new("dbo.Users")
590 /// .with_typed_columns(vec![
591 /// BulkColumn::new("id", "INT", 0)?,
592 /// BulkColumn::new("name", "NVARCHAR(100)", 1)?,
593 /// ]);
594 ///
595 /// let mut writer = client.bulk_insert(&builder).await?;
596 /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
597 /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
598 /// let result = writer.finish().await?;
599 /// println!("Inserted {} rows", result.rows_affected);
600 /// # Ok(())
601 /// # }
602 /// ```
603 pub async fn bulk_insert(
604 &mut self,
605 builder: &crate::bulk::BulkInsertBuilder,
606 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
607 use tds_protocol::token::{ColMetaData, Token};
608
609 tracing::debug!(
610 table = builder.table_name(),
611 columns = builder.columns().len(),
612 "starting bulk insert"
613 );
614
615 // Step 1: Query the server for column metadata.
616 // This gives us the exact type encoding the server expects for BulkLoad,
617 // following the pattern established by Tiberius.
618 let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
619 let deadline = self.command_deadline();
620 let canceller = self.connection_cancel_handle();
621 let message = run_with_deadline(
622 async {
623 self.send_sql_batch(&meta_query).await?;
624 self.read_response_message().await
625 },
626 deadline,
627 canceller,
628 )
629 .await?;
630 self.in_flight = false;
631
632 // Capture both the raw COLMETADATA bytes and parsed column info
633 let raw_payload = message.payload.clone();
634 let mut parser = self.create_parser(message.payload);
635 let mut server_metadata: Option<ColMetaData> = None;
636 let mut meta_start: usize = 0;
637 let mut meta_end: usize = 0;
638
639 loop {
640 let pos_before = raw_payload.len() - parser.remaining();
641 let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
642 let pos_after = raw_payload.len() - parser.remaining();
643 let Some(token) = token else { break };
644
645 match token {
646 Token::ColMetaData(meta) => {
647 meta_start = pos_before;
648 meta_end = pos_after;
649 server_metadata = Some(meta);
650 }
651 Token::Done(_) => break,
652 _ => {}
653 }
654 }
655
656 // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
657 // These types require a legacy TEXTPTR wire format that this driver
658 // does not support — users should migrate the column to VARCHAR(MAX) /
659 // NVARCHAR(MAX) / VARBINARY(MAX).
660 if let Some(ref meta) = server_metadata {
661 use tds_protocol::types::TypeId;
662 for col in meta.columns.iter() {
663 let (rejected, replacement) = match col.type_id {
664 TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
665 TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
666 TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
667 _ => (None, ""),
668 };
669 if let Some(sql_type) = rejected {
670 return Err(Error::from(mssql_types::TypeError::UnsupportedType {
671 sql_type: sql_type.to_string(),
672 reason: format!(
673 "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
674 are not supported. Alter the column to {} instead \
675 (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
676 Server 2005).",
677 col.name,
678 builder.table_name(),
679 sql_type,
680 replacement,
681 ),
682 }));
683 }
684 }
685 }
686
687 // Step 2: Send INSERT BULK statement to put server in bulk load mode
688 let stmt = builder.build_insert_bulk_statement()?;
689 let deadline = self.command_deadline();
690 let canceller = self.connection_cancel_handle();
691 run_with_deadline(
692 async {
693 self.send_sql_batch(&stmt).await?;
694 self.read_execute_result().await
695 },
696 deadline,
697 canceller,
698 )
699 .await?;
700
701 // Step 3: Create bulk writer with server's metadata
702 let raw_meta = if meta_end > meta_start {
703 Some(raw_payload.slice(meta_start..meta_end))
704 } else {
705 None
706 };
707
708 let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
709 let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
710 builder.columns().to_vec(),
711 builder.options().batch_size,
712 raw_meta,
713 server_cols,
714 );
715
716 Ok(crate::bulk::BulkWriter::new(self, bulk))
717 }
718
719 /// Start a bulk insert without querying the server for column metadata.
720 ///
721 /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
722 /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
723 /// column metadata is constructed from the `BulkColumn` types provided
724 /// on the builder. This saves a round-trip when the schema is known.
725 ///
726 /// # Caveats
727 ///
728 /// The caller must ensure `BulkColumn` entries match the target table's
729 /// column definitions exactly. Mismatched types, lengths, precision/scale,
730 /// or column ordering will cause the server to reject the BulkLoad packet.
731 ///
732 /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
733 /// extra round-trip is usually negligible and the server-supplied metadata
734 /// is guaranteed correct.
735 pub async fn bulk_insert_without_schema_discovery(
736 &mut self,
737 builder: &crate::bulk::BulkInsertBuilder,
738 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
739 tracing::debug!(
740 table = builder.table_name(),
741 columns = builder.columns().len(),
742 "starting bulk insert (no schema discovery)"
743 );
744
745 // Send INSERT BULK statement to put server in bulk load mode
746 let stmt = builder.build_insert_bulk_statement()?;
747 let deadline = self.command_deadline();
748 let canceller = self.connection_cancel_handle();
749 run_with_deadline(
750 async {
751 self.send_sql_batch(&stmt).await?;
752 self.read_execute_result().await
753 },
754 deadline,
755 canceller,
756 )
757 .await?;
758
759 // Create bulk writer with hand-crafted metadata
760 let bulk =
761 crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
762
763 Ok(crate::bulk::BulkWriter::new(self, bulk))
764 }
765
766 /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
767 ///
768 /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
769 /// row data after the `INSERT BULK` statement has been acknowledged.
770 pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
771 let max_packet = self.config.packet_size as usize;
772
773 self.in_flight = true;
774 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
775
776 match connection {
777 #[cfg(feature = "tls")]
778 ConnectionHandle::Tls(conn) => {
779 conn.send_message(PacketType::BulkLoad, payload, max_packet)
780 .await?;
781 }
782 #[cfg(feature = "tls")]
783 ConnectionHandle::TlsPrelogin(conn) => {
784 conn.send_message(PacketType::BulkLoad, payload, max_packet)
785 .await?;
786 }
787 ConnectionHandle::Plain(conn) => {
788 conn.send_message(PacketType::BulkLoad, payload, max_packet)
789 .await?;
790 }
791 }
792
793 // Read the server's Done response with row count
794 self.read_execute_result().await
795 }
796
797 /// Execute a query with named parameters and return a streaming result set.
798 ///
799 /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
800 /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
801 /// and the `#[derive(ToParams)]` macro.
802 ///
803 /// # Example
804 ///
805 /// ```rust,no_run
806 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
807 /// use mssql_client::{NamedParam, ToParams};
808 ///
809 /// // With derive macro:
810 /// #[derive(mssql_derive::ToParams)]
811 /// struct UserQuery { name: String }
812 ///
813 /// let q = UserQuery { name: "Alice".into() };
814 /// let rows = client.query_named(
815 /// "SELECT * FROM users WHERE name = @name",
816 /// &q.to_params()?,
817 /// ).await?;
818 ///
819 /// // Or manually:
820 /// let params = vec![NamedParam::from_value("name", &"Alice")?];
821 /// let rows = client.query_named(
822 /// "SELECT * FROM users WHERE name = @name",
823 /// ¶ms,
824 /// ).await?;
825 /// # let _ = rows;
826 /// # Ok(())
827 /// # }
828 /// ```
829 pub async fn query_named<'a>(
830 &'a mut self,
831 sql: &str,
832 params: &[crate::to_params::NamedParam],
833 ) -> Result<QueryStream<'a>> {
834 tracing::debug!(
835 sql = sql,
836 params_count = params.len(),
837 "executing query with named parameters"
838 );
839
840 if params.is_empty() {
841 self.send_sql_batch(sql).await?;
842 } else {
843 let rpc_params =
844 Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
845 let rpc = RpcRequest::execute_sql(sql, rpc_params);
846 self.send_rpc(&rpc).await?;
847 }
848
849 let resp = self.read_query_response().await?;
850 #[cfg(feature = "always-encrypted")]
851 {
852 Ok(QueryStream::from_raw(
853 resp.columns,
854 resp.pending_rows,
855 resp.meta,
856 resp.decryptor,
857 ))
858 }
859 #[cfg(not(feature = "always-encrypted"))]
860 {
861 Ok(QueryStream::from_raw(
862 resp.columns,
863 resp.pending_rows,
864 resp.meta,
865 ))
866 }
867 }
868
869 /// Execute a statement with named parameters.
870 ///
871 /// Returns the number of affected rows. This is the named-parameter
872 /// counterpart of [`execute()`](Client::execute), compatible with the
873 /// [`ToParams`](crate::to_params::ToParams) trait.
874 ///
875 /// # Example
876 ///
877 /// ```rust,no_run
878 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
879 /// use mssql_client::NamedParam;
880 ///
881 /// let params = vec![
882 /// NamedParam::from_value("name", &"Alice")?,
883 /// NamedParam::from_value("email", &"alice@example.com")?,
884 /// ];
885 /// let rows_affected = client.execute_named(
886 /// "INSERT INTO users (name, email) VALUES (@name, @email)",
887 /// ¶ms,
888 /// ).await?;
889 /// # let _ = rows_affected;
890 /// # Ok(())
891 /// # }
892 /// ```
893 pub async fn execute_named(
894 &mut self,
895 sql: &str,
896 params: &[crate::to_params::NamedParam],
897 ) -> Result<u64> {
898 tracing::debug!(
899 sql = sql,
900 params_count = params.len(),
901 "executing statement with named parameters"
902 );
903
904 let deadline = self.command_deadline();
905 let canceller = self.connection_cancel_handle();
906 run_with_deadline(
907 async {
908 if params.is_empty() {
909 self.send_sql_batch(sql).await?;
910 } else {
911 let rpc_params = Self::convert_named_params(
912 params,
913 self.send_unicode(),
914 self.server_collation(),
915 )?;
916 let rpc = RpcRequest::execute_sql(sql, rpc_params);
917 self.send_rpc(&rpc).await?;
918 }
919
920 self.read_execute_result().await
921 },
922 deadline,
923 canceller,
924 )
925 .await
926 }
927
928 /// Whether string parameters are sent as NVARCHAR (Unicode).
929 pub(crate) fn send_unicode(&self) -> bool {
930 self.config.send_string_parameters_as_unicode
931 }
932
933 /// Server's default collation, captured from ENVCHANGE during login.
934 pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
935 self.server_collation.as_ref()
936 }
937}
938
939impl Client<Ready> {
940 /// Mark this connection as needing a reset on next use.
941 ///
942 /// Called by the connection pool when a connection is returned.
943 /// The next SQL batch or RPC will include the RESETCONNECTION flag
944 /// in the TDS packet header, causing SQL Server to reset connection
945 /// state (temp tables, SET options, transaction isolation level, etc.)
946 /// before executing the command.
947 ///
948 /// This is more efficient than calling `sp_reset_connection` as a
949 /// separate command because it's handled at the TDS protocol level.
950 pub fn mark_needs_reset(&mut self) {
951 self.needs_reset = true;
952 }
953
954 /// Check if this connection needs a reset.
955 ///
956 /// Returns true if `mark_needs_reset()` was called and the reset
957 /// hasn't been performed yet.
958 #[must_use]
959 pub fn needs_reset(&self) -> bool {
960 self.needs_reset
961 }
962
963 /// Execute a query and return a result set with lazy per-row decoding.
964 ///
965 /// Per ADR-007 the full response is buffered in memory and each row is
966 /// *decoded* on demand as you iterate — this is not incremental network
967 /// streaming, so peak memory tracks the response size. Use
968 /// `.collect_all()` if you want all rows materialized into a `Vec` up
969 /// front.
970 ///
971 /// # Example
972 ///
973 /// ```rust,no_run
974 /// # use mssql_client::Row;
975 /// # fn process(_: &Row) {}
976 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
977 /// // Streaming (synchronous iteration over the result set)
978 /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
979 /// for row in stream {
980 /// let row = row?;
981 /// process(&row);
982 /// }
983 ///
984 /// // Buffered (loads all into memory)
985 /// let rows: Vec<Row> = client
986 /// .query("SELECT * FROM small_table", &[])
987 /// .await?
988 /// .collect_all()
989 /// .await?;
990 /// # let _ = rows;
991 /// # Ok(())
992 /// # }
993 /// ```
994 pub async fn query<'a>(
995 &'a mut self,
996 sql: &str,
997 params: &[&(dyn crate::ToSql + Sync)],
998 ) -> Result<QueryStream<'a>> {
999 let deadline = self.command_deadline();
1000 self.query_inner(sql, params, deadline).await
1001 }
1002
1003 /// Shared query implementation with an explicit command deadline.
1004 async fn query_inner<'a>(
1005 &'a mut self,
1006 sql: &str,
1007 params: &[&(dyn crate::ToSql + Sync)],
1008 deadline: Option<std::time::Duration>,
1009 ) -> Result<QueryStream<'a>> {
1010 tracing::debug!(sql = sql, params_count = params.len(), "executing query");
1011
1012 #[cfg(feature = "otel")]
1013 let instrumentation = self.instrumentation.clone();
1014 #[cfg(feature = "otel")]
1015 let mut span = instrumentation.query_span(sql);
1016 #[cfg(feature = "otel")]
1017 let timer = crate::instrumentation::OperationTimer::start(
1018 crate::instrumentation::extract_operation(sql),
1019 );
1020
1021 let canceller = self.cancel_handle();
1022 let result = run_with_deadline(
1023 async {
1024 if params.is_empty() {
1025 // Simple query without parameters - use SQL batch
1026 self.send_sql_batch(sql).await?;
1027 } else {
1028 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1029 let rpc = self.build_parameterized_rpc(sql, params).await?;
1030 self.send_rpc(&rpc).await?;
1031 }
1032
1033 // Read complete response including columns and rows
1034 self.read_query_response().await
1035 },
1036 deadline,
1037 canceller,
1038 )
1039 .await;
1040
1041 #[cfg(feature = "otel")]
1042 match &result {
1043 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1044 Err(e) => InstrumentationContext::record_error(&mut span, e),
1045 }
1046 #[cfg(feature = "otel")]
1047 timer.finish(instrumentation.metrics(), result.is_ok());
1048
1049 // Drop the span before returning
1050 #[cfg(feature = "otel")]
1051 drop(span);
1052
1053 let resp = result?;
1054 #[cfg(feature = "always-encrypted")]
1055 {
1056 Ok(QueryStream::from_raw(
1057 resp.columns,
1058 resp.pending_rows,
1059 resp.meta,
1060 resp.decryptor,
1061 ))
1062 }
1063 #[cfg(not(feature = "always-encrypted"))]
1064 {
1065 Ok(QueryStream::from_raw(
1066 resp.columns,
1067 resp.pending_rows,
1068 resp.meta,
1069 ))
1070 }
1071 }
1072
1073 /// Execute a query with a specific timeout.
1074 ///
1075 /// This overrides the default `command_timeout` from the connection configuration
1076 /// for this specific query. If the query does not complete within the specified
1077 /// duration, the driver sends an Attention packet to cancel it server-side,
1078 /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1079 /// connection left usable for the next request.
1080 ///
1081 /// # Arguments
1082 ///
1083 /// * `sql` - The SQL query to execute
1084 /// * `params` - Query parameters
1085 /// * `timeout_duration` - Maximum time to wait for the query to complete
1086 ///
1087 /// # Example
1088 ///
1089 /// ```rust,no_run
1090 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1091 /// use std::time::Duration;
1092 ///
1093 /// // Execute with a 5-second timeout
1094 /// let rows = client
1095 /// .query_with_timeout(
1096 /// "SELECT * FROM large_table",
1097 /// &[],
1098 /// Duration::from_secs(5),
1099 /// )
1100 /// .await?;
1101 /// # let _ = rows;
1102 /// # Ok(())
1103 /// # }
1104 /// ```
1105 pub async fn query_with_timeout<'a>(
1106 &'a mut self,
1107 sql: &str,
1108 params: &[&(dyn crate::ToSql + Sync)],
1109 timeout_duration: std::time::Duration,
1110 ) -> Result<QueryStream<'a>> {
1111 self.query_inner(sql, params, Some(timeout_duration)).await
1112 }
1113
1114 /// Execute a batch that may return multiple result sets.
1115 ///
1116 /// This is useful for stored procedures or SQL batches that contain
1117 /// multiple SELECT statements.
1118 ///
1119 /// # Example
1120 ///
1121 /// ```rust,no_run
1122 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1123 /// // Execute a batch with multiple SELECT statements
1124 /// let mut results = client.query_multiple(
1125 /// "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1126 /// &[]
1127 /// ).await?;
1128 ///
1129 /// // Process first result set
1130 /// while let Some(row) = results.next_row().await? {
1131 /// println!("Result 1: {:?}", row);
1132 /// }
1133 ///
1134 /// // Move to second result set
1135 /// if results.next_result().await? {
1136 /// while let Some(row) = results.next_row().await? {
1137 /// println!("Result 2: {:?}", row);
1138 /// }
1139 /// }
1140 /// # Ok(())
1141 /// # }
1142 /// ```
1143 pub async fn query_multiple<'a>(
1144 &'a mut self,
1145 sql: &str,
1146 params: &[&(dyn crate::ToSql + Sync)],
1147 ) -> Result<MultiResultStream<'a>> {
1148 tracing::debug!(
1149 sql = sql,
1150 params_count = params.len(),
1151 "executing multi-result query"
1152 );
1153
1154 let deadline = self.command_deadline();
1155 let canceller = self.connection_cancel_handle();
1156 let result_sets = run_with_deadline(
1157 async {
1158 if params.is_empty() {
1159 // Simple batch without parameters - use SQL batch
1160 self.send_sql_batch(sql).await?;
1161 } else {
1162 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1163 let rpc = self.build_parameterized_rpc(sql, params).await?;
1164 self.send_rpc(&rpc).await?;
1165 }
1166
1167 // Read all result sets
1168 self.read_multi_result_response().await
1169 },
1170 deadline,
1171 canceller,
1172 )
1173 .await?;
1174 Ok(MultiResultStream::new(result_sets))
1175 }
1176
1177 /// Execute a query that doesn't return rows.
1178 ///
1179 /// Returns the number of affected rows.
1180 pub async fn execute(
1181 &mut self,
1182 sql: &str,
1183 params: &[&(dyn crate::ToSql + Sync)],
1184 ) -> Result<u64> {
1185 let deadline = self.command_deadline();
1186 self.execute_inner(sql, params, deadline).await
1187 }
1188
1189 /// Shared execute implementation with an explicit command deadline.
1190 async fn execute_inner(
1191 &mut self,
1192 sql: &str,
1193 params: &[&(dyn crate::ToSql + Sync)],
1194 deadline: Option<std::time::Duration>,
1195 ) -> Result<u64> {
1196 tracing::debug!(
1197 sql = sql,
1198 params_count = params.len(),
1199 "executing statement"
1200 );
1201
1202 #[cfg(feature = "otel")]
1203 let instrumentation = self.instrumentation.clone();
1204 #[cfg(feature = "otel")]
1205 let mut span = instrumentation.query_span(sql);
1206 #[cfg(feature = "otel")]
1207 let timer = crate::instrumentation::OperationTimer::start(
1208 crate::instrumentation::extract_operation(sql),
1209 );
1210
1211 let canceller = self.cancel_handle();
1212 let result = run_with_deadline(
1213 async {
1214 if params.is_empty() {
1215 // Simple statement without parameters - use SQL batch
1216 self.send_sql_batch(sql).await?;
1217 } else {
1218 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1219 let rpc = self.build_parameterized_rpc(sql, params).await?;
1220 self.send_rpc(&rpc).await?;
1221 }
1222
1223 // Read response and get row count
1224 self.read_execute_result().await
1225 },
1226 deadline,
1227 canceller,
1228 )
1229 .await;
1230
1231 #[cfg(feature = "otel")]
1232 match &result {
1233 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1234 Err(e) => InstrumentationContext::record_error(&mut span, e),
1235 }
1236 #[cfg(feature = "otel")]
1237 timer.finish(instrumentation.metrics(), result.is_ok());
1238
1239 // Drop the span before returning
1240 #[cfg(feature = "otel")]
1241 drop(span);
1242
1243 result
1244 }
1245
1246 /// Execute a statement with a specific timeout.
1247 ///
1248 /// This overrides the default `command_timeout` from the connection configuration
1249 /// for this specific statement. If the statement does not complete within the
1250 /// specified duration, the driver sends an Attention packet to cancel it
1251 /// server-side, drains the acknowledgement, and returns
1252 /// [`Error::CommandTimeout`] with the connection left usable.
1253 ///
1254 /// # Arguments
1255 ///
1256 /// * `sql` - The SQL statement to execute
1257 /// * `params` - Statement parameters
1258 /// * `timeout_duration` - Maximum time to wait for the statement to complete
1259 ///
1260 /// # Example
1261 ///
1262 /// ```rust,no_run
1263 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1264 /// use std::time::Duration;
1265 ///
1266 /// // Execute with a 10-second timeout
1267 /// let rows_affected = client
1268 /// .execute_with_timeout(
1269 /// "UPDATE large_table SET status = @p1",
1270 /// &[&"processed"],
1271 /// Duration::from_secs(10),
1272 /// )
1273 /// .await?;
1274 /// # let _ = rows_affected;
1275 /// # Ok(())
1276 /// # }
1277 /// ```
1278 pub async fn execute_with_timeout(
1279 &mut self,
1280 sql: &str,
1281 params: &[&(dyn crate::ToSql + Sync)],
1282 timeout_duration: std::time::Duration,
1283 ) -> Result<u64> {
1284 self.execute_inner(sql, params, Some(timeout_duration))
1285 .await
1286 }
1287
1288 /// Begin a transaction.
1289 ///
1290 /// This transitions the client from `Ready` to `InTransaction` state.
1291 /// Per MS-TDS spec, the server returns a transaction descriptor in the
1292 /// BeginTransaction EnvChange token that must be included in subsequent
1293 /// ALL_HEADERS sections.
1294 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1295 tracing::debug!("beginning transaction");
1296
1297 #[cfg(feature = "otel")]
1298 let instrumentation = self.instrumentation.clone();
1299 #[cfg(feature = "otel")]
1300 let mut span = instrumentation.transaction_span("BEGIN");
1301
1302 // Execute BEGIN TRANSACTION and extract the transaction descriptor
1303 let result = async {
1304 self.send_sql_batch("BEGIN TRANSACTION").await?;
1305 self.read_transaction_begin_result().await
1306 }
1307 .await;
1308
1309 #[cfg(feature = "otel")]
1310 match &result {
1311 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1312 Err(e) => InstrumentationContext::record_error(&mut span, e),
1313 }
1314
1315 // Drop the span before moving instrumentation
1316 #[cfg(feature = "otel")]
1317 drop(span);
1318
1319 let transaction_descriptor = result?;
1320
1321 Ok(Client {
1322 config: self.config,
1323 _state: PhantomData,
1324 connection: self.connection,
1325 server_version: self.server_version,
1326 current_database: self.current_database,
1327 server_collation: self.server_collation,
1328 statement_cache: self.statement_cache,
1329 transaction_descriptor, // Store the descriptor from server
1330 needs_reset: self.needs_reset,
1331 in_flight: self.in_flight,
1332 #[cfg(feature = "otel")]
1333 instrumentation: self.instrumentation,
1334 #[cfg(feature = "always-encrypted")]
1335 encryption_context: self.encryption_context,
1336 })
1337 }
1338
1339 /// Begin a transaction with a specific isolation level.
1340 ///
1341 /// This transitions the client from `Ready` to `InTransaction` state
1342 /// with the specified isolation level.
1343 ///
1344 /// # Example
1345 ///
1346 /// ```rust,no_run
1347 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1348 /// use mssql_client::IsolationLevel;
1349 ///
1350 /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1351 /// // All operations in this transaction use SERIALIZABLE isolation
1352 /// tx.commit().await?;
1353 /// # Ok(())
1354 /// # }
1355 /// ```
1356 pub async fn begin_transaction_with_isolation(
1357 mut self,
1358 isolation_level: crate::transaction::IsolationLevel,
1359 ) -> Result<Client<InTransaction>> {
1360 tracing::debug!(
1361 isolation_level = %isolation_level.name(),
1362 "beginning transaction with isolation level"
1363 );
1364
1365 #[cfg(feature = "otel")]
1366 let instrumentation = self.instrumentation.clone();
1367 #[cfg(feature = "otel")]
1368 let mut span = instrumentation.transaction_span("BEGIN");
1369
1370 // First set the isolation level
1371 let result = async {
1372 self.send_sql_batch(isolation_level.as_sql()).await?;
1373 self.read_execute_result().await?;
1374
1375 // Then begin the transaction
1376 self.send_sql_batch("BEGIN TRANSACTION").await?;
1377 self.read_transaction_begin_result().await
1378 }
1379 .await;
1380
1381 #[cfg(feature = "otel")]
1382 match &result {
1383 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1384 Err(e) => InstrumentationContext::record_error(&mut span, e),
1385 }
1386
1387 #[cfg(feature = "otel")]
1388 drop(span);
1389
1390 let transaction_descriptor = result?;
1391
1392 Ok(Client {
1393 config: self.config,
1394 _state: PhantomData,
1395 connection: self.connection,
1396 server_version: self.server_version,
1397 current_database: self.current_database,
1398 server_collation: self.server_collation,
1399 statement_cache: self.statement_cache,
1400 transaction_descriptor,
1401 needs_reset: self.needs_reset,
1402 in_flight: self.in_flight,
1403 #[cfg(feature = "otel")]
1404 instrumentation: self.instrumentation,
1405 #[cfg(feature = "always-encrypted")]
1406 encryption_context: self.encryption_context,
1407 })
1408 }
1409
1410 /// Execute a simple query without parameters.
1411 ///
1412 /// This is useful for DDL statements and simple queries where you
1413 /// don't need to retrieve the affected row count.
1414 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1415 tracing::debug!(sql = sql, "executing simple query");
1416
1417 // Send SQL batch
1418 self.send_sql_batch(sql).await?;
1419
1420 // Read and discard response
1421 let _ = self.read_execute_result().await?;
1422
1423 Ok(())
1424 }
1425
1426 /// Close the connection gracefully.
1427 pub async fn close(self) -> Result<()> {
1428 tracing::debug!("closing connection");
1429 Ok(())
1430 }
1431
1432 /// Get the current database name.
1433 #[must_use]
1434 pub fn database(&self) -> Option<&str> {
1435 self.config.database.as_deref()
1436 }
1437
1438 /// Get the server host.
1439 #[must_use]
1440 pub fn host(&self) -> &str {
1441 &self.config.host
1442 }
1443
1444 /// Get the server port.
1445 #[must_use]
1446 pub fn port(&self) -> u16 {
1447 self.config.port
1448 }
1449
1450 /// Check if the connection is currently in a transaction.
1451 ///
1452 /// This returns `true` if a transaction was started via raw SQL
1453 /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1454 ///
1455 /// Note: This only tracks transactions started via raw SQL. Transactions
1456 /// started via the type-state API (`begin_transaction()`) result in a
1457 /// `Client<InTransaction>` which is a different type.
1458 ///
1459 /// # Example
1460 ///
1461 /// ```rust,no_run
1462 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1463 /// client.execute("BEGIN TRANSACTION", &[]).await?;
1464 /// assert!(client.is_in_transaction());
1465 ///
1466 /// client.execute("COMMIT", &[]).await?;
1467 /// assert!(!client.is_in_transaction());
1468 /// # Ok(())
1469 /// # }
1470 /// ```
1471 #[must_use]
1472 pub fn is_in_transaction(&self) -> bool {
1473 self.transaction_descriptor != 0
1474 }
1475
1476 /// Check if a request is in-flight (sent but response not fully read).
1477 ///
1478 /// Used by the connection pool to detect dirty connections that were
1479 /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1480 /// A connection with an in-flight request has unread data in the TCP
1481 /// buffer and must be discarded rather than returned to the pool.
1482 #[must_use]
1483 pub fn is_in_flight(&self) -> bool {
1484 self.in_flight
1485 }
1486
1487 /// Report whether an Always Encrypted key-store provider with the given
1488 /// name is currently reachable through this client's encryption context.
1489 ///
1490 /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1491 /// the connection was opened without `column_encryption` configured, or
1492 /// when no matching provider was registered.
1493 #[cfg(feature = "always-encrypted")]
1494 #[must_use]
1495 pub fn has_encryption_provider(&self, name: &str) -> bool {
1496 self.encryption_context
1497 .as_ref()
1498 .is_some_and(|ctx| ctx.has_provider(name))
1499 }
1500
1501 /// Get a handle for cancelling the current query.
1502 ///
1503 /// The cancel handle can be cloned and sent to other tasks, enabling
1504 /// cancellation of long-running queries from a separate async context.
1505 ///
1506 /// # Example
1507 ///
1508 /// ```rust,no_run
1509 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1510 /// use std::time::Duration;
1511 ///
1512 /// let cancel_handle = client.cancel_handle();
1513 ///
1514 /// // Spawn a task to cancel after 10 seconds
1515 /// let handle = tokio::spawn(async move {
1516 /// tokio::time::sleep(Duration::from_secs(10)).await;
1517 /// let _ = cancel_handle.cancel().await;
1518 /// });
1519 ///
1520 /// // This query will be cancelled if it runs longer than 10 seconds
1521 /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1522 /// # let _ = (handle, result);
1523 /// # Ok(())
1524 /// # }
1525 /// ```
1526 #[must_use]
1527 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1528 self.connection_cancel_handle()
1529 }
1530}
1531
1532/// # Drop Behavior
1533///
1534/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1535/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1536/// the transaction remains open on the server until the TCP connection closes
1537/// (at which point SQL Server automatically rolls back).
1538///
1539/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1540/// to send a `ROLLBACK TRANSACTION` command.
1541///
1542/// ## Consequences of dropping without commit/rollback
1543///
1544/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1545/// (potentially 30+ minutes), holding locks on any modified rows.
1546/// - **Pooled connections:** The pool detects the active transaction descriptor
1547/// and discards the connection rather than returning it to the idle pool
1548/// (see `PooledConnection::drop` in `mssql-driver-pool`).
1549///
1550/// ## Best practice
1551///
1552/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1553/// for error paths:
1554///
1555/// ```rust,no_run
1556/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1557/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1558/// let tx = client.begin_transaction().await?;
1559/// match do_work(&tx).await {
1560/// Ok(_) => { tx.commit().await?; }
1561/// Err(e) => { tx.rollback().await?; return Err(e); }
1562/// }
1563/// # Ok(())
1564/// # }
1565/// ```
1566impl Client<InTransaction> {
1567 /// Execute a query within the transaction and return a streaming result set.
1568 ///
1569 /// See [`Client<Ready>::query`] for usage examples.
1570 pub async fn query<'a>(
1571 &'a mut self,
1572 sql: &str,
1573 params: &[&(dyn crate::ToSql + Sync)],
1574 ) -> Result<QueryStream<'a>> {
1575 let deadline = self.command_deadline();
1576 self.query_inner(sql, params, deadline).await
1577 }
1578
1579 /// Shared query implementation with an explicit command deadline.
1580 async fn query_inner<'a>(
1581 &'a mut self,
1582 sql: &str,
1583 params: &[&(dyn crate::ToSql + Sync)],
1584 deadline: Option<std::time::Duration>,
1585 ) -> Result<QueryStream<'a>> {
1586 tracing::debug!(
1587 sql = sql,
1588 params_count = params.len(),
1589 "executing query in transaction"
1590 );
1591
1592 #[cfg(feature = "otel")]
1593 let instrumentation = self.instrumentation.clone();
1594 #[cfg(feature = "otel")]
1595 let mut span = instrumentation.query_span(sql);
1596 #[cfg(feature = "otel")]
1597 let timer = crate::instrumentation::OperationTimer::start(
1598 crate::instrumentation::extract_operation(sql),
1599 );
1600
1601 let canceller = self.cancel_handle();
1602 let result = run_with_deadline(
1603 async {
1604 if params.is_empty() {
1605 // Simple query without parameters - use SQL batch
1606 self.send_sql_batch(sql).await?;
1607 } else {
1608 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1609 let rpc = self.build_parameterized_rpc(sql, params).await?;
1610 self.send_rpc(&rpc).await?;
1611 }
1612
1613 // Read complete response including columns and rows
1614 self.read_query_response().await
1615 },
1616 deadline,
1617 canceller,
1618 )
1619 .await;
1620
1621 #[cfg(feature = "otel")]
1622 match &result {
1623 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1624 Err(e) => InstrumentationContext::record_error(&mut span, e),
1625 }
1626 #[cfg(feature = "otel")]
1627 timer.finish(instrumentation.metrics(), result.is_ok());
1628
1629 // Drop the span before returning
1630 #[cfg(feature = "otel")]
1631 drop(span);
1632
1633 let resp = result?;
1634 #[cfg(feature = "always-encrypted")]
1635 {
1636 Ok(QueryStream::from_raw(
1637 resp.columns,
1638 resp.pending_rows,
1639 resp.meta,
1640 resp.decryptor,
1641 ))
1642 }
1643 #[cfg(not(feature = "always-encrypted"))]
1644 {
1645 Ok(QueryStream::from_raw(
1646 resp.columns,
1647 resp.pending_rows,
1648 resp.meta,
1649 ))
1650 }
1651 }
1652
1653 /// Execute a statement within the transaction.
1654 ///
1655 /// Returns the number of affected rows.
1656 pub async fn execute(
1657 &mut self,
1658 sql: &str,
1659 params: &[&(dyn crate::ToSql + Sync)],
1660 ) -> Result<u64> {
1661 let deadline = self.command_deadline();
1662 self.execute_inner(sql, params, deadline).await
1663 }
1664
1665 /// Shared execute implementation with an explicit command deadline.
1666 async fn execute_inner(
1667 &mut self,
1668 sql: &str,
1669 params: &[&(dyn crate::ToSql + Sync)],
1670 deadline: Option<std::time::Duration>,
1671 ) -> Result<u64> {
1672 tracing::debug!(
1673 sql = sql,
1674 params_count = params.len(),
1675 "executing statement in transaction"
1676 );
1677
1678 #[cfg(feature = "otel")]
1679 let instrumentation = self.instrumentation.clone();
1680 #[cfg(feature = "otel")]
1681 let mut span = instrumentation.query_span(sql);
1682 #[cfg(feature = "otel")]
1683 let timer = crate::instrumentation::OperationTimer::start(
1684 crate::instrumentation::extract_operation(sql),
1685 );
1686
1687 let canceller = self.cancel_handle();
1688 let result = run_with_deadline(
1689 async {
1690 if params.is_empty() {
1691 // Simple statement without parameters - use SQL batch
1692 self.send_sql_batch(sql).await?;
1693 } else {
1694 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1695 let rpc = self.build_parameterized_rpc(sql, params).await?;
1696 self.send_rpc(&rpc).await?;
1697 }
1698
1699 // Read response and get row count
1700 self.read_execute_result().await
1701 },
1702 deadline,
1703 canceller,
1704 )
1705 .await;
1706
1707 #[cfg(feature = "otel")]
1708 match &result {
1709 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1710 Err(e) => InstrumentationContext::record_error(&mut span, e),
1711 }
1712 #[cfg(feature = "otel")]
1713 timer.finish(instrumentation.metrics(), result.is_ok());
1714
1715 // Drop the span before returning
1716 #[cfg(feature = "otel")]
1717 drop(span);
1718
1719 result
1720 }
1721
1722 /// Execute a query within the transaction with a specific timeout.
1723 ///
1724 /// See [`Client<Ready>::query_with_timeout`] for details.
1725 pub async fn query_with_timeout<'a>(
1726 &'a mut self,
1727 sql: &str,
1728 params: &[&(dyn crate::ToSql + Sync)],
1729 timeout_duration: std::time::Duration,
1730 ) -> Result<QueryStream<'a>> {
1731 self.query_inner(sql, params, Some(timeout_duration)).await
1732 }
1733
1734 /// Execute a statement within the transaction with a specific timeout.
1735 ///
1736 /// See [`Client<Ready>::execute_with_timeout`] for details.
1737 pub async fn execute_with_timeout(
1738 &mut self,
1739 sql: &str,
1740 params: &[&(dyn crate::ToSql + Sync)],
1741 timeout_duration: std::time::Duration,
1742 ) -> Result<u64> {
1743 self.execute_inner(sql, params, Some(timeout_duration))
1744 .await
1745 }
1746
1747 /// Open a FILESTREAM BLOB for async reading and/or writing.
1748 ///
1749 /// This method queries the server for the transaction context, then opens
1750 /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1751 ///
1752 /// # Arguments
1753 ///
1754 /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1755 /// Query this yourself before calling `open_filestream`:
1756 /// ```sql
1757 /// SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1758 /// ```
1759 /// * `access` — Read, write, or read/write access mode.
1760 ///
1761 /// # Requirements
1762 ///
1763 /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1764 /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1765 /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1766 ///
1767 /// # Example
1768 ///
1769 /// ```text
1770 /// use mssql_client::FileStreamAccess;
1771 /// use tokio::io::AsyncReadExt;
1772 ///
1773 /// let mut tx = client.begin_transaction().await?;
1774 ///
1775 /// // Get the FILESTREAM path
1776 /// let rows = tx.query(
1777 /// "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1778 /// &[&doc_id],
1779 /// ).await?;
1780 /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1781 ///
1782 /// // Open and read the BLOB
1783 /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1784 /// let mut data = Vec::new();
1785 /// stream.read_to_end(&mut data).await?;
1786 /// drop(stream);
1787 ///
1788 /// tx.commit().await?;
1789 /// ```
1790 #[cfg(all(windows, feature = "filestream"))]
1791 pub async fn open_filestream(
1792 &mut self,
1793 path: &str,
1794 access: crate::filestream::FileStreamAccess,
1795 ) -> Result<crate::filestream::FileStream> {
1796 tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1797
1798 // Get the transaction context from SQL Server.
1799 // This binds the file access to the current SQL transaction.
1800 let txn_context: Vec<u8> = {
1801 let rows = self
1802 .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1803 .await?;
1804 let mut ctx = None;
1805 for result in rows {
1806 let row = result?;
1807 ctx = Some(row.get::<Vec<u8>>(0)?);
1808 }
1809 ctx.ok_or_else(|| {
1810 Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1811 })?
1812 };
1813
1814 crate::filestream::FileStream::open(path, access, &txn_context)
1815 }
1816
1817 /// Commit the transaction.
1818 ///
1819 /// This transitions the client back to `Ready` state.
1820 pub async fn commit(mut self) -> Result<Client<Ready>> {
1821 tracing::debug!("committing transaction");
1822
1823 #[cfg(feature = "otel")]
1824 let instrumentation = self.instrumentation.clone();
1825 #[cfg(feature = "otel")]
1826 let mut span = instrumentation.transaction_span("COMMIT");
1827
1828 // Execute COMMIT TRANSACTION
1829 let result = async {
1830 self.send_sql_batch("COMMIT TRANSACTION").await?;
1831 self.read_execute_result().await
1832 }
1833 .await;
1834
1835 #[cfg(feature = "otel")]
1836 match &result {
1837 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1838 Err(e) => InstrumentationContext::record_error(&mut span, e),
1839 }
1840
1841 // Drop the span before moving instrumentation
1842 #[cfg(feature = "otel")]
1843 drop(span);
1844
1845 result?;
1846
1847 Ok(Client {
1848 config: self.config,
1849 _state: PhantomData,
1850 connection: self.connection,
1851 server_version: self.server_version,
1852 current_database: self.current_database,
1853 server_collation: self.server_collation,
1854 statement_cache: self.statement_cache,
1855 transaction_descriptor: 0, // Reset to auto-commit mode
1856 needs_reset: self.needs_reset,
1857 in_flight: self.in_flight,
1858 #[cfg(feature = "otel")]
1859 instrumentation: self.instrumentation,
1860 #[cfg(feature = "always-encrypted")]
1861 encryption_context: self.encryption_context,
1862 })
1863 }
1864
1865 /// Rollback the transaction.
1866 ///
1867 /// This transitions the client back to `Ready` state.
1868 pub async fn rollback(mut self) -> Result<Client<Ready>> {
1869 tracing::debug!("rolling back transaction");
1870
1871 #[cfg(feature = "otel")]
1872 let instrumentation = self.instrumentation.clone();
1873 #[cfg(feature = "otel")]
1874 let mut span = instrumentation.transaction_span("ROLLBACK");
1875
1876 // Execute ROLLBACK TRANSACTION
1877 let result = async {
1878 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1879 self.read_execute_result().await
1880 }
1881 .await;
1882
1883 #[cfg(feature = "otel")]
1884 match &result {
1885 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1886 Err(e) => InstrumentationContext::record_error(&mut span, e),
1887 }
1888
1889 // Drop the span before moving instrumentation
1890 #[cfg(feature = "otel")]
1891 drop(span);
1892
1893 result?;
1894
1895 Ok(Client {
1896 config: self.config,
1897 _state: PhantomData,
1898 connection: self.connection,
1899 server_version: self.server_version,
1900 current_database: self.current_database,
1901 server_collation: self.server_collation,
1902 statement_cache: self.statement_cache,
1903 transaction_descriptor: 0, // Reset to auto-commit mode
1904 needs_reset: self.needs_reset,
1905 in_flight: self.in_flight,
1906 #[cfg(feature = "otel")]
1907 instrumentation: self.instrumentation,
1908 #[cfg(feature = "always-encrypted")]
1909 encryption_context: self.encryption_context,
1910 })
1911 }
1912
1913 /// Create a savepoint and return a handle for later rollback.
1914 ///
1915 /// The returned `SavePoint` handle contains the validated savepoint name.
1916 /// Use it with `rollback_to()` to partially undo transaction work.
1917 ///
1918 /// # Example
1919 ///
1920 /// ```rust,no_run
1921 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1922 /// let mut tx = client.begin_transaction().await?;
1923 /// tx.execute("INSERT INTO orders ...", &[]).await?;
1924 /// let sp = tx.save_point("before_items").await?;
1925 /// tx.execute("INSERT INTO items ...", &[]).await?;
1926 /// // Oops, rollback just the items
1927 /// tx.rollback_to(&sp).await?;
1928 /// tx.commit().await?;
1929 /// # Ok(())
1930 /// # }
1931 /// ```
1932 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1933 crate::validation::validate_identifier(name)?;
1934 tracing::debug!(name = name, "creating savepoint");
1935
1936 // Execute SAVE TRANSACTION <name>
1937 // Note: name is validated by validate_identifier() to prevent SQL injection
1938 let sql = format!("SAVE TRANSACTION {name}");
1939 self.send_sql_batch(&sql).await?;
1940 self.read_execute_result().await?;
1941
1942 Ok(SavePoint::new(name.to_string()))
1943 }
1944
1945 /// Rollback to a savepoint.
1946 ///
1947 /// This rolls back all changes made after the savepoint was created,
1948 /// but keeps the transaction active. The savepoint remains valid and
1949 /// can be rolled back to again.
1950 ///
1951 /// # Example
1952 ///
1953 /// ```rust,no_run
1954 /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1955 /// let sp = tx.save_point("checkpoint").await?;
1956 /// // ... do some work ...
1957 /// tx.rollback_to(&sp).await?; // Undo changes since checkpoint
1958 /// // Transaction is still active, savepoint is still valid
1959 /// # Ok(())
1960 /// # }
1961 /// ```
1962 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1963 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1964
1965 // Execute ROLLBACK TRANSACTION <name>
1966 // Note: savepoint name was validated during creation
1967 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1968 self.send_sql_batch(&sql).await?;
1969 self.read_execute_result().await?;
1970
1971 Ok(())
1972 }
1973
1974 /// Release a savepoint (optional cleanup).
1975 ///
1976 /// Note: SQL Server doesn't have explicit savepoint release, but this
1977 /// method is provided for API completeness. The savepoint is automatically
1978 /// released when the transaction commits or rolls back.
1979 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
1980 tracing::debug!(name = savepoint.name(), "releasing savepoint");
1981
1982 // SQL Server doesn't require explicit savepoint release
1983 // The savepoint is implicitly released on commit/rollback
1984 // This method exists for API completeness
1985 drop(savepoint);
1986 Ok(())
1987 }
1988
1989 /// Get a handle for cancelling the current query within the transaction.
1990 ///
1991 /// See [`Client<Ready>::cancel_handle`] for usage examples.
1992 #[must_use]
1993 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1994 self.connection_cancel_handle()
1995 }
1996}
1997
1998impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1999 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2000 f.debug_struct("Client")
2001 .field("host", &self.config.host)
2002 .field("port", &self.config.port)
2003 .field("database", &self.config.database)
2004 .finish()
2005 }
2006}