mssql_client/client.rs
1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// How long to wait for the server to acknowledge an Attention packet after
49/// a command timeout. SqlClient waits 5 seconds before dooming the
50/// connection; we match it.
51const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68 fut: F,
69 deadline: Option<std::time::Duration>,
70 canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73 F: std::future::Future<Output = Result<T>>,
74{
75 let Some(d) = deadline else {
76 return fut.await;
77 };
78 tokio::pin!(fut);
79 tokio::select! {
80 biased;
81 res = &mut fut => res,
82 () = tokio::time::sleep(d) => {
83 // Signal cancellation, then let the in-flight read consume the
84 // server's attention acknowledgement so the connection stays usable.
85 let drain = async {
86 let _ = canceller.cancel().await;
87 let _ = (&mut fut).await;
88 };
89 if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90 tracing::warn!(
91 timeout = ?ATTENTION_ACK_TIMEOUT,
92 "server did not acknowledge attention; abandoning the connection as dirty"
93 );
94 }
95 Err(Error::CommandTimeout)
96 }
97 }
98}
99
/// SQL Server client with type-state connection management.
///
/// The generic parameter `S` represents the current connection state,
/// ensuring at compile time that certain operations are only available
/// in appropriate states. `S` is carried only through a `PhantomData`
/// marker; no value of type `S` is stored.
pub struct Client<S: ConnectionState> {
    /// Connection configuration; read here for `command_timeout` and
    /// `packet_size` when sending requests.
    config: Config,
    /// Zero-sized marker tying this value to the type-state `S`.
    _state: PhantomData<S>,
    /// The underlying connection (present only when connected).
    /// Send paths return `Error::ConnectionClosed` when this is `None`.
    connection: Option<ConnectionHandle>,
    /// Server version from LoginAck (raw u32 TDS version)
    server_version: Option<u32>,
    /// Current database from EnvChange
    current_database: Option<String>,
    /// Server's default collation from SqlCollation EnvChange during login.
    /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
    /// parameters with the correct character encoding and collation bytes.
    server_collation: Option<tds_protocol::token::Collation>,
    /// Prepared statement cache for query optimization
    statement_cache: StatementCache,
    /// Transaction descriptor from BeginTransaction EnvChange.
    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
    /// requests within an explicit transaction. 0 indicates auto-commit mode.
    transaction_descriptor: u64,
    /// Whether a request has been sent and the response has not yet been fully read.
    /// Used by the connection pool to detect dirty connections after cancel/timeout.
    /// Set to `true` by the send helpers just before a message is written.
    in_flight: bool,
    /// Whether this connection needs a reset on next use.
    /// Set by connection pool on checkin, cleared after first query/execute.
    /// When true, the RESETCONNECTION flag is set on the first TDS packet.
    needs_reset: bool,
    /// OpenTelemetry instrumentation context (when otel feature is enabled)
    #[cfg(feature = "otel")]
    instrumentation: InstrumentationContext,
    /// Always Encrypted context for column decryption (when always-encrypted feature is enabled).
    /// When `Some`, parameterized statements are routed through the
    /// client-side parameter encryption path.
    #[cfg(feature = "always-encrypted")]
    pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
}
138
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
/// - Plain TCP (for internal networks or when `tls` feature is disabled)
///
/// Every variant wraps a `Connection` over a different stream type, which is
/// why the send and cancel helpers in this file match on the variant and
/// repeat the same call in each arm.
// NOTE(review): the original justification for this allow ("Connection will be
// used once query execution is implemented") looks stale — the variants are
// matched by the send/cancel helpers in this file. Confirm whether the allow
// is still needed for some feature combination before removing it.
#[allow(dead_code)]
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    #[cfg(feature = "tls")]
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style)
    #[cfg(feature = "tls")]
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
    Plain(Connection<TcpStream>),
}
156
/// Look up the `TypeInfo` used to declare a typed NULL ([`crate::null`]),
/// keyed by the name reported by [`crate::ToSql::sql_type`].
///
/// Any name not listed here yields `None`; that includes the untyped NULL
/// (`Option::None`, type `"NULL"`), for which the caller falls back to the
/// default parameter type.
#[cfg(feature = "always-encrypted")]
fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
    use tds_protocol::rpc::TypeInfo;

    let type_info = match sql_type {
        "BIT" => TypeInfo::bit(),
        "TINYINT" => TypeInfo::tinyint(),
        "SMALLINT" => TypeInfo::smallint(),
        "INT" => TypeInfo::int(),
        "BIGINT" => TypeInfo::bigint(),
        "REAL" => TypeInfo::real(),
        "FLOAT" => TypeInfo::float(),
        "NVARCHAR" => TypeInfo::nvarchar(1),
        "VARBINARY" => TypeInfo::varbinary(1),
        "UNIQUEIDENTIFIER" => TypeInfo::uuid(),
        "DATE" => TypeInfo::date(),
        _ => return None,
    };
    Some(type_info)
}
178
/// Translate a typed-parameter wrapper's [`EncryptedParamType`] into the
/// `TypeInfo` the driver declares for it (used for
/// `sp_describe_parameter_encryption` and as the `CryptoMetadata` base type).
///
/// # Errors
///
/// Returns [`Error::Encryption`] for any variant not handled here, so that an
/// unknown future variant fails loudly instead of being declared with the
/// wrong type.
#[cfg(feature = "always-encrypted")]
fn encrypted_param_type_info(
    ty: mssql_types::EncryptedParamType,
) -> Result<tds_protocol::rpc::TypeInfo> {
    use mssql_types::EncryptedParamType as Kind;
    use tds_protocol::rpc::TypeInfo;

    let type_info = match ty {
        Kind::Decimal { precision, scale } => TypeInfo::decimal(precision, scale),
        Kind::Time { scale } => TypeInfo::time(scale),
        Kind::DateTime2 { scale } => TypeInfo::datetime2(scale),
        Kind::DateTimeOffset { scale } => TypeInfo::datetimeoffset(scale),
        Kind::DateTime => TypeInfo::datetime(),
        Kind::Char { length } => TypeInfo::char(length),
        Kind::NChar { length } => TypeInfo::nchar(length),
        Kind::Binary { length } => TypeInfo::binary(length),
        _ => {
            return Err(Error::Encryption(String::from(
                "unsupported Always Encrypted parameter type",
            )));
        }
    };
    Ok(type_info)
}
205
206// Private helper methods available to all connection states
207impl<S: ConnectionState> Client<S> {
208 /// The default per-command deadline from `command_timeout`.
209 ///
210 /// Returns `None` when `command_timeout` is zero, which means "no limit"
211 /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
212 pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
213 let t = self.config.command_timeout;
214 if t.is_zero() { None } else { Some(t) }
215 }
216
217 /// Build a cancel handle for the current connection, regardless of
218 /// connection state. The public, documented surface is
219 /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
220 /// delegate here.
221 pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
222 let connection = self
223 .connection
224 .as_ref()
225 .expect("connection should be present");
226 match connection {
227 #[cfg(feature = "tls")]
228 ConnectionHandle::Tls(conn) => {
229 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
230 }
231 #[cfg(feature = "tls")]
232 ConnectionHandle::TlsPrelogin(conn) => {
233 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
234 }
235 ConnectionHandle::Plain(conn) => {
236 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
237 }
238 }
239 }
240
241 /// Process transaction-related EnvChange tokens.
242 ///
243 /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
244 /// EnvChange tokens, updating the transaction descriptor accordingly.
245 ///
246 /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
247 /// while still having the transaction descriptor tracked correctly.
248 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
249 use tds_protocol::token::EnvChangeValue;
250
251 match env.env_type {
252 EnvChangeType::BeginTransaction => {
253 if let EnvChangeValue::Binary(ref data) = env.new_value {
254 if data.len() >= 8 {
255 let descriptor = u64::from_le_bytes([
256 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
257 ]);
258 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
259 *transaction_descriptor = descriptor;
260 }
261 }
262 }
263 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
264 tracing::debug!(
265 env_type = ?env.env_type,
266 "transaction ended via raw SQL"
267 );
268 *transaction_descriptor = 0;
269 }
270 _ => {}
271 }
272 }
273
274 /// Send a SQL batch to the server.
275 ///
276 /// Uses the client's current transaction descriptor in ALL_HEADERS.
277 /// Per MS-TDS spec, when in an explicit transaction, the descriptor
278 /// returned by BeginTransaction must be included.
279 ///
280 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
281 /// is included in the first packet to reset connection state.
282 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
283 let payload =
284 tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
285 let max_packet = self.config.packet_size as usize;
286
287 // Check if we need to reset the connection on this request
288 let reset = self.needs_reset;
289 if reset {
290 self.needs_reset = false; // Clear flag before sending
291 tracing::debug!("sending SQL batch with RESETCONNECTION flag");
292 }
293
294 self.in_flight = true;
295 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
296
297 match connection {
298 #[cfg(feature = "tls")]
299 ConnectionHandle::Tls(conn) => {
300 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
301 .await?;
302 }
303 #[cfg(feature = "tls")]
304 ConnectionHandle::TlsPrelogin(conn) => {
305 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
306 .await?;
307 }
308 ConnectionHandle::Plain(conn) => {
309 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
310 .await?;
311 }
312 }
313
314 Ok(())
315 }
316
317 /// Send an RPC request to the server.
318 ///
319 /// Uses the client's current transaction descriptor in ALL_HEADERS.
320 ///
321 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
322 /// is included in the first packet to reset connection state.
323 pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
324 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
325 let max_packet = self.config.packet_size as usize;
326
327 // Check if we need to reset the connection on this request
328 let reset = self.needs_reset;
329 if reset {
330 self.needs_reset = false; // Clear flag before sending
331 tracing::debug!("sending RPC with RESETCONNECTION flag");
332 }
333
334 self.in_flight = true;
335 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
336
337 match connection {
338 #[cfg(feature = "tls")]
339 ConnectionHandle::Tls(conn) => {
340 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
341 .await?;
342 }
343 #[cfg(feature = "tls")]
344 ConnectionHandle::TlsPrelogin(conn) => {
345 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
346 .await?;
347 }
348 ConnectionHandle::Plain(conn) => {
349 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
350 .await?;
351 }
352 }
353
354 Ok(())
355 }
356
357 /// Start building a stored procedure call with full control over parameters.
358 ///
359 /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
360 /// parameters before executing the call.
361 ///
362 /// The procedure name is validated to prevent SQL injection. It may be
363 /// schema-qualified (e.g., `"dbo.MyProc"`).
364 ///
365 /// # Example
366 ///
367 /// ```rust,no_run
368 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
369 /// let result = client.procedure("dbo.CalculateSum")?
370 /// .input("@a", &10i32)
371 /// .input("@b", &20i32)
372 /// .output_int("@result")
373 /// .execute().await?;
374 ///
375 /// let sum = result.get_output("@result").unwrap();
376 /// # let _ = sum;
377 /// # Ok(())
378 /// # }
379 /// ```
380 pub fn procedure(
381 &mut self,
382 proc_name: &str,
383 ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
384 crate::validation::validate_qualified_identifier(proc_name)?;
385 Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
386 }
387
388 /// Execute a stored procedure with positional input parameters.
389 ///
390 /// This is a convenience method for the common case of calling a procedure
391 /// with input-only parameters. For output parameters or named parameters,
392 /// use [`procedure()`](Client::procedure) instead.
393 ///
394 /// # Example
395 ///
396 /// ```rust,no_run
397 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
398 /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
399 /// assert_eq!(result.return_value, 0);
400 ///
401 /// if let Some(rs) = result.first_result_set() {
402 /// println!("columns: {:?}", rs.columns());
403 /// }
404 /// # Ok(())
405 /// # }
406 /// ```
407 pub async fn call_procedure(
408 &mut self,
409 proc_name: &str,
410 params: &[&(dyn crate::ToSql + Sync)],
411 ) -> Result<crate::stream::ProcedureResult> {
412 crate::validation::validate_qualified_identifier(proc_name)?;
413
414 tracing::debug!(
415 proc_name = proc_name,
416 params_count = params.len(),
417 "executing stored procedure"
418 );
419
420 let rpc_params =
421 Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
422 let mut rpc = RpcRequest::named(proc_name);
423 for param in rpc_params {
424 rpc = rpc.param(param);
425 }
426
427 let deadline = self.command_deadline();
428 let canceller = self.connection_cancel_handle();
429 run_with_deadline(
430 async {
431 self.send_rpc(&rpc).await?;
432 self.read_procedure_result().await
433 },
434 deadline,
435 canceller,
436 )
437 .await
438 }
439
/// Ask the server how each parameter of a statement must be encrypted.
///
/// Issues the `sp_describe_parameter_encryption` system RPC for the
/// parameterized statement `tsql` with the parameter declaration `params`
/// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
/// into a [`ParameterEncryptionInfo`](crate::ParameterEncryptionInfo): the
/// CEK table, plus — for each parameter the server reports as encrypted —
/// which CEK and whether deterministic or randomized. Parameters the server
/// reports as plaintext are omitted.
///
/// This is the first step of Always Encrypted parameter encryption; the
/// connection must have negotiated it (`Column Encryption Setting=Enabled`).
///
/// # Errors
///
/// Propagates any error from the underlying
/// [`call_procedure`](Self::call_procedure) round-trip and from parsing the
/// returned result sets.
#[cfg(feature = "always-encrypted")]
pub async fn describe_parameter_encryption(
    &mut self,
    tsql: &str,
    params: &str,
) -> Result<crate::ParameterEncryptionInfo> {
    // Both arguments are sent as owned strings, passed positionally.
    let tsql_arg = tsql.to_string();
    let params_arg = params.to_string();
    let mut result = self
        .call_procedure(
            "sp_describe_parameter_encryption",
            &[&tsql_arg, &params_arg],
        )
        .await?;
    crate::ParameterEncryptionInfo::from_describe_result_sets(&mut result.result_sets)
}
468
469 /// Build the `sp_executesql` request for a parameterized statement.
470 ///
471 /// When the connection has Always Encrypted enabled, parameters the server
472 /// reports as encrypted are encrypted client-side first (an extra
473 /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
474 /// plain parameter conversion.
475 pub(crate) async fn build_parameterized_rpc(
476 &mut self,
477 sql: &str,
478 params: &[&(dyn crate::ToSql + Sync)],
479 ) -> Result<RpcRequest> {
480 #[cfg(feature = "always-encrypted")]
481 if self.encryption_context.is_some() {
482 return self.build_encrypted_sql_rpc(sql, params).await;
483 }
484 let rpc_params =
485 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
486 Ok(RpcRequest::execute_sql(sql, rpc_params))
487 }
488
/// Encrypt the Always Encrypted parameters of a statement, then build its
/// `sp_executesql` request.
///
/// Asks the server which parameters are encrypted
/// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
/// then for each one normalizes the value, resolves its column encryption
/// key, encrypts, and emits an encrypted RPC parameter. Parameters the
/// server reports as plaintext are sent unchanged.
///
/// Shortcuts that skip encryption entirely:
/// * no encryption context on the client — plain parameter conversion;
/// * no parameters — the describe round-trip is not issued;
/// * the server reports no encrypted parameters — plaintext params are sent.
///
/// # Errors
///
/// Returns conversion errors from the parameter values,
/// [`Error::Protocol`] when an encrypted parameter references a CEK ordinal
/// missing from the describe result, and any error from normalization,
/// encryption, or the describe round-trip.
#[cfg(feature = "always-encrypted")]
async fn build_encrypted_sql_rpc(
    &mut self,
    sql: &str,
    params: &[&(dyn crate::ToSql + Sync)],
) -> Result<RpcRequest> {
    use tds_protocol::rpc::RpcParam;

    let Some(ctx) = self.encryption_context.clone() else {
        let rpc_params =
            Self::convert_params(params, self.send_unicode(), self.server_collation())?;
        return Ok(RpcRequest::execute_sql(sql, rpc_params));
    };

    // Resolve each parameter's value once (AE normalization needs the typed
    // value, not the wire encoding) and build the plaintext RPC params.
    let send_unicode = self.send_unicode();
    let collation = self.server_collation().cloned();
    let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
    let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
    let mut hints: Vec<Option<mssql_types::EncryptedParamType>> =
        Vec::with_capacity(params.len());
    for (i, p) in params.iter().enumerate() {
        // Positional parameters are named @p1, @p2, ...
        let name = format!("@p{}", i + 1);
        let value = p.to_sql()?;
        let hint = p.encrypted_param_type();
        // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
        // describe accepts it against the target encrypted column; an untyped
        // NULL falls back to the default in `sql_value_to_rpc_param`.
        let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
            (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
            _ => {
                let mut param = Self::sql_value_to_rpc_param(
                    &name,
                    &value,
                    send_unicode,
                    collation.as_ref(),
                )?;
                // A typed-parameter wrapper (e.g. `numeric(v, p, s)`,
                // `datetime2(v, scale)`) declares an explicit SQL type so
                // describe matches the encrypted column exactly — the value
                // alone cannot convey precision/scale or the legacy-`datetime`
                // vs `datetime2` distinction.
                if let Some(ty) = hint {
                    param.type_info = encrypted_param_type_info(ty)?;
                }
                param
            }
        };
        plaintext.push(rpc_param);
        values.push(value);
        hints.push(hint);
    }

    // Nothing to describe or encrypt.
    if plaintext.is_empty() {
        return Ok(RpcRequest::execute_sql(sql, plaintext));
    }

    // Ask the server which parameters need encryption.
    let declarations = RpcRequest::build_param_declarations(&plaintext);
    let info = self
        .describe_parameter_encryption(sql, &declarations)
        .await?;
    if info.parameters.is_empty() {
        return Ok(RpcRequest::execute_sql(sql, plaintext));
    }

    // Encrypt the flagged parameters; pass the rest through untouched.
    let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
    for ((value, param), hint) in values.into_iter().zip(plaintext).zip(hints) {
        let Some(crypto) = info.get_parameter(&param.name) else {
            final_params.push(param);
            continue;
        };
        let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
            Error::Protocol(format!(
                "encrypted parameter {} references missing CEK ordinal {}",
                param.name, crypto.cek_ordinal
            ))
        })?;
        let metadata = tds_protocol::rpc::EncryptedParamMetadata {
            base_type_info: param.type_info.clone(),
            algorithm_id: crypto.algorithm_id,
            encryption_type: crypto.encryption_type,
            database_id: entry.database_id,
            cek_id: entry.cek_id,
            cek_version: entry.cek_version,
            cek_md_version: entry.cek_md_version,
            normalization_rule_version: crypto.normalization_rule_version,
        };
        // A NULL value bound to an encrypted column is sent as an encrypted
        // NULL (the server rejects a plaintext parameter for an encrypted
        // column); there is nothing to encrypt.
        if matches!(value, mssql_types::SqlValue::Null) {
            final_params.push(RpcParam::encrypted_null(param.name, metadata));
            continue;
        }
        let normalized = crate::encryption::normalize_for_encryption(&value, hint)?;
        let ciphertext = ctx
            .encrypt_value(&normalized, entry, crypto.encryption_type)
            .await?;
        final_params.push(RpcParam::encrypted(
            param.name,
            bytes::Bytes::from(ciphertext),
            metadata,
        ));
    }

    Ok(RpcRequest::execute_sql(sql, final_params))
}
607
608 /// Start a bulk insert operation for the specified table.
609 ///
610 /// Sends the `INSERT BULK` statement to the server and returns a
611 /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
612 /// a mutable borrow on the client, preventing other operations while
613 /// the bulk insert is in progress.
614 ///
615 /// # Example
616 ///
617 /// ```rust,no_run
618 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
619 /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
620 ///
621 /// let builder = BulkInsertBuilder::new("dbo.Users")
622 /// .with_typed_columns(vec![
623 /// BulkColumn::new("id", "INT", 0)?,
624 /// BulkColumn::new("name", "NVARCHAR(100)", 1)?,
625 /// ]);
626 ///
627 /// let mut writer = client.bulk_insert(&builder).await?;
628 /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
629 /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
630 /// let result = writer.finish().await?;
631 /// println!("Inserted {} rows", result.rows_affected);
632 /// # Ok(())
633 /// # }
634 /// ```
635 pub async fn bulk_insert(
636 &mut self,
637 builder: &crate::bulk::BulkInsertBuilder,
638 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
639 use tds_protocol::token::{ColMetaData, Token};
640
641 tracing::debug!(
642 table = builder.table_name(),
643 columns = builder.columns().len(),
644 "starting bulk insert"
645 );
646
647 // Step 1: Query the server for column metadata.
648 // This gives us the exact type encoding the server expects for BulkLoad,
649 // following the pattern established by Tiberius.
650 let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
651 let deadline = self.command_deadline();
652 let canceller = self.connection_cancel_handle();
653 let message = run_with_deadline(
654 async {
655 self.send_sql_batch(&meta_query).await?;
656 self.read_response_message().await
657 },
658 deadline,
659 canceller,
660 )
661 .await?;
662 self.in_flight = false;
663
664 // Capture both the raw COLMETADATA bytes and parsed column info
665 let raw_payload = message.payload.clone();
666 let mut parser = self.create_parser(message.payload);
667 let mut server_metadata: Option<ColMetaData> = None;
668 let mut meta_start: usize = 0;
669 let mut meta_end: usize = 0;
670
671 loop {
672 let pos_before = raw_payload.len() - parser.remaining();
673 let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
674 let pos_after = raw_payload.len() - parser.remaining();
675 let Some(token) = token else { break };
676
677 match token {
678 Token::ColMetaData(meta) => {
679 meta_start = pos_before;
680 meta_end = pos_after;
681 server_metadata = Some(meta);
682 }
683 Token::Done(_) => break,
684 _ => {}
685 }
686 }
687
688 // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
689 // These types require a legacy TEXTPTR wire format that this driver
690 // does not support — users should migrate the column to VARCHAR(MAX) /
691 // NVARCHAR(MAX) / VARBINARY(MAX).
692 if let Some(ref meta) = server_metadata {
693 use tds_protocol::types::TypeId;
694 for col in meta.columns.iter() {
695 let (rejected, replacement) = match col.type_id {
696 TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
697 TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
698 TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
699 _ => (None, ""),
700 };
701 if let Some(sql_type) = rejected {
702 return Err(Error::from(mssql_types::TypeError::UnsupportedType {
703 sql_type: sql_type.to_string(),
704 reason: format!(
705 "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
706 are not supported. Alter the column to {} instead \
707 (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
708 Server 2005).",
709 col.name,
710 builder.table_name(),
711 sql_type,
712 replacement,
713 ),
714 }));
715 }
716 }
717 }
718
719 // Step 2: Send INSERT BULK statement to put server in bulk load mode
720 let stmt = builder.build_insert_bulk_statement()?;
721 let deadline = self.command_deadline();
722 let canceller = self.connection_cancel_handle();
723 run_with_deadline(
724 async {
725 self.send_sql_batch(&stmt).await?;
726 self.read_execute_result().await
727 },
728 deadline,
729 canceller,
730 )
731 .await?;
732
733 // Step 3: Create bulk writer with server's metadata
734 let raw_meta = if meta_end > meta_start {
735 Some(raw_payload.slice(meta_start..meta_end))
736 } else {
737 None
738 };
739
740 let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
741 let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
742 builder.columns().to_vec(),
743 builder.options().batch_size,
744 raw_meta,
745 server_cols,
746 );
747
748 Ok(crate::bulk::BulkWriter::new(self, bulk))
749 }
750
751 /// Start a bulk insert without querying the server for column metadata.
752 ///
753 /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
754 /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
755 /// column metadata is constructed from the `BulkColumn` types provided
756 /// on the builder. This saves a round-trip when the schema is known.
757 ///
758 /// # Caveats
759 ///
760 /// The caller must ensure `BulkColumn` entries match the target table's
761 /// column definitions exactly. Mismatched types, lengths, precision/scale,
762 /// or column ordering will cause the server to reject the BulkLoad packet.
763 ///
764 /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
765 /// extra round-trip is usually negligible and the server-supplied metadata
766 /// is guaranteed correct.
767 pub async fn bulk_insert_without_schema_discovery(
768 &mut self,
769 builder: &crate::bulk::BulkInsertBuilder,
770 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
771 tracing::debug!(
772 table = builder.table_name(),
773 columns = builder.columns().len(),
774 "starting bulk insert (no schema discovery)"
775 );
776
777 // Send INSERT BULK statement to put server in bulk load mode
778 let stmt = builder.build_insert_bulk_statement()?;
779 let deadline = self.command_deadline();
780 let canceller = self.connection_cancel_handle();
781 run_with_deadline(
782 async {
783 self.send_sql_batch(&stmt).await?;
784 self.read_execute_result().await
785 },
786 deadline,
787 canceller,
788 )
789 .await?;
790
791 // Create bulk writer with hand-crafted metadata
792 let bulk =
793 crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
794
795 Ok(crate::bulk::BulkWriter::new(self, bulk))
796 }
797
798 /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
799 ///
800 /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
801 /// row data after the `INSERT BULK` statement has been acknowledged.
802 pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
803 let max_packet = self.config.packet_size as usize;
804
805 self.in_flight = true;
806 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
807
808 match connection {
809 #[cfg(feature = "tls")]
810 ConnectionHandle::Tls(conn) => {
811 conn.send_message(PacketType::BulkLoad, payload, max_packet)
812 .await?;
813 }
814 #[cfg(feature = "tls")]
815 ConnectionHandle::TlsPrelogin(conn) => {
816 conn.send_message(PacketType::BulkLoad, payload, max_packet)
817 .await?;
818 }
819 ConnectionHandle::Plain(conn) => {
820 conn.send_message(PacketType::BulkLoad, payload, max_packet)
821 .await?;
822 }
823 }
824
825 // Read the server's Done response with row count
826 self.read_execute_result().await
827 }
828
829 /// Execute a query with named parameters and return a streaming result set.
830 ///
831 /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
832 /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
833 /// and the `#[derive(ToParams)]` macro.
834 ///
835 /// # Example
836 ///
837 /// ```rust,no_run
838 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
839 /// use mssql_client::{NamedParam, ToParams};
840 ///
841 /// // With derive macro:
842 /// #[derive(mssql_derive::ToParams)]
843 /// struct UserQuery { name: String }
844 ///
845 /// let q = UserQuery { name: "Alice".into() };
846 /// let rows = client.query_named(
847 /// "SELECT * FROM users WHERE name = @name",
848 /// &q.to_params()?,
849 /// ).await?;
850 ///
851 /// // Or manually:
852 /// let params = vec![NamedParam::from_value("name", &"Alice")?];
853 /// let rows = client.query_named(
854 /// "SELECT * FROM users WHERE name = @name",
855 /// ¶ms,
856 /// ).await?;
857 /// # let _ = rows;
858 /// # Ok(())
859 /// # }
860 /// ```
861 pub async fn query_named<'a>(
862 &'a mut self,
863 sql: &str,
864 params: &[crate::to_params::NamedParam],
865 ) -> Result<QueryStream<'a>> {
866 tracing::debug!(
867 sql = sql,
868 params_count = params.len(),
869 "executing query with named parameters"
870 );
871
872 if params.is_empty() {
873 self.send_sql_batch(sql).await?;
874 } else {
875 let rpc_params =
876 Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
877 let rpc = RpcRequest::execute_sql(sql, rpc_params);
878 self.send_rpc(&rpc).await?;
879 }
880
881 let resp = self.read_query_response().await?;
882 #[cfg(feature = "always-encrypted")]
883 {
884 Ok(QueryStream::from_raw(
885 resp.columns,
886 resp.pending_rows,
887 resp.meta,
888 resp.decryptor,
889 ))
890 }
891 #[cfg(not(feature = "always-encrypted"))]
892 {
893 Ok(QueryStream::from_raw(
894 resp.columns,
895 resp.pending_rows,
896 resp.meta,
897 ))
898 }
899 }
900
901 /// Execute a statement with named parameters.
902 ///
903 /// Returns the number of affected rows. This is the named-parameter
904 /// counterpart of [`execute()`](Client::execute), compatible with the
905 /// [`ToParams`](crate::to_params::ToParams) trait.
906 ///
907 /// # Example
908 ///
909 /// ```rust,no_run
910 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
911 /// use mssql_client::NamedParam;
912 ///
913 /// let params = vec![
914 /// NamedParam::from_value("name", &"Alice")?,
915 /// NamedParam::from_value("email", &"alice@example.com")?,
916 /// ];
917 /// let rows_affected = client.execute_named(
918 /// "INSERT INTO users (name, email) VALUES (@name, @email)",
919 /// ¶ms,
920 /// ).await?;
921 /// # let _ = rows_affected;
922 /// # Ok(())
923 /// # }
924 /// ```
925 pub async fn execute_named(
926 &mut self,
927 sql: &str,
928 params: &[crate::to_params::NamedParam],
929 ) -> Result<u64> {
930 tracing::debug!(
931 sql = sql,
932 params_count = params.len(),
933 "executing statement with named parameters"
934 );
935
936 let deadline = self.command_deadline();
937 let canceller = self.connection_cancel_handle();
938 run_with_deadline(
939 async {
940 if params.is_empty() {
941 self.send_sql_batch(sql).await?;
942 } else {
943 let rpc_params = Self::convert_named_params(
944 params,
945 self.send_unicode(),
946 self.server_collation(),
947 )?;
948 let rpc = RpcRequest::execute_sql(sql, rpc_params);
949 self.send_rpc(&rpc).await?;
950 }
951
952 self.read_execute_result().await
953 },
954 deadline,
955 canceller,
956 )
957 .await
958 }
959
960 /// Whether string parameters are sent as NVARCHAR (Unicode).
961 pub(crate) fn send_unicode(&self) -> bool {
962 self.config.send_string_parameters_as_unicode
963 }
964
965 /// Server's default collation, captured from ENVCHANGE during login.
966 pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
967 self.server_collation.as_ref()
968 }
969}
970
971impl Client<Ready> {
972 /// Mark this connection as needing a reset on next use.
973 ///
974 /// Called by the connection pool when a connection is returned.
975 /// The next SQL batch or RPC will include the RESETCONNECTION flag
976 /// in the TDS packet header, causing SQL Server to reset connection
977 /// state (temp tables, SET options, transaction isolation level, etc.)
978 /// before executing the command.
979 ///
980 /// This is more efficient than calling `sp_reset_connection` as a
981 /// separate command because it's handled at the TDS protocol level.
982 pub fn mark_needs_reset(&mut self) {
983 self.needs_reset = true;
984 }
985
986 /// Check if this connection needs a reset.
987 ///
988 /// Returns true if `mark_needs_reset()` was called and the reset
989 /// hasn't been performed yet.
990 #[must_use]
991 pub fn needs_reset(&self) -> bool {
992 self.needs_reset
993 }
994
995 /// Execute a query and return a result set with lazy per-row decoding.
996 ///
997 /// Per ADR-007 the full response is buffered in memory and each row is
998 /// *decoded* on demand as you iterate — this is not incremental network
999 /// streaming, so peak memory tracks the response size. Use
1000 /// `.collect_all()` if you want all rows materialized into a `Vec` up
1001 /// front.
1002 ///
1003 /// # Example
1004 ///
1005 /// ```rust,no_run
1006 /// # use mssql_client::Row;
1007 /// # fn process(_: &Row) {}
1008 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1009 /// // Streaming (synchronous iteration over the result set)
1010 /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
1011 /// for row in stream {
1012 /// let row = row?;
1013 /// process(&row);
1014 /// }
1015 ///
1016 /// // Buffered (loads all into memory)
1017 /// let rows: Vec<Row> = client
1018 /// .query("SELECT * FROM small_table", &[])
1019 /// .await?
1020 /// .collect_all()
1021 /// .await?;
1022 /// # let _ = rows;
1023 /// # Ok(())
1024 /// # }
1025 /// ```
1026 pub async fn query<'a>(
1027 &'a mut self,
1028 sql: &str,
1029 params: &[&(dyn crate::ToSql + Sync)],
1030 ) -> Result<QueryStream<'a>> {
1031 let deadline = self.command_deadline();
1032 self.query_inner(sql, params, deadline).await
1033 }
1034
1035 /// Shared query implementation with an explicit command deadline.
1036 async fn query_inner<'a>(
1037 &'a mut self,
1038 sql: &str,
1039 params: &[&(dyn crate::ToSql + Sync)],
1040 deadline: Option<std::time::Duration>,
1041 ) -> Result<QueryStream<'a>> {
1042 tracing::debug!(sql = sql, params_count = params.len(), "executing query");
1043
1044 #[cfg(feature = "otel")]
1045 let instrumentation = self.instrumentation.clone();
1046 #[cfg(feature = "otel")]
1047 let mut span = instrumentation.query_span(sql);
1048 #[cfg(feature = "otel")]
1049 let timer = crate::instrumentation::OperationTimer::start(
1050 crate::instrumentation::extract_operation(sql),
1051 );
1052
1053 let canceller = self.cancel_handle();
1054 let result = run_with_deadline(
1055 async {
1056 if params.is_empty() {
1057 // Simple query without parameters - use SQL batch
1058 self.send_sql_batch(sql).await?;
1059 } else {
1060 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1061 let rpc = self.build_parameterized_rpc(sql, params).await?;
1062 self.send_rpc(&rpc).await?;
1063 }
1064
1065 // Read complete response including columns and rows
1066 self.read_query_response().await
1067 },
1068 deadline,
1069 canceller,
1070 )
1071 .await;
1072
1073 #[cfg(feature = "otel")]
1074 match &result {
1075 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1076 Err(e) => InstrumentationContext::record_error(&mut span, e),
1077 }
1078 #[cfg(feature = "otel")]
1079 timer.finish(instrumentation.metrics(), result.is_ok());
1080
1081 // Drop the span before returning
1082 #[cfg(feature = "otel")]
1083 drop(span);
1084
1085 let resp = result?;
1086 #[cfg(feature = "always-encrypted")]
1087 {
1088 Ok(QueryStream::from_raw(
1089 resp.columns,
1090 resp.pending_rows,
1091 resp.meta,
1092 resp.decryptor,
1093 ))
1094 }
1095 #[cfg(not(feature = "always-encrypted"))]
1096 {
1097 Ok(QueryStream::from_raw(
1098 resp.columns,
1099 resp.pending_rows,
1100 resp.meta,
1101 ))
1102 }
1103 }
1104
1105 /// Execute a query with a specific timeout.
1106 ///
1107 /// This overrides the default `command_timeout` from the connection configuration
1108 /// for this specific query. If the query does not complete within the specified
1109 /// duration, the driver sends an Attention packet to cancel it server-side,
1110 /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1111 /// connection left usable for the next request.
1112 ///
1113 /// # Arguments
1114 ///
1115 /// * `sql` - The SQL query to execute
1116 /// * `params` - Query parameters
1117 /// * `timeout_duration` - Maximum time to wait for the query to complete
1118 ///
1119 /// # Example
1120 ///
1121 /// ```rust,no_run
1122 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1123 /// use std::time::Duration;
1124 ///
1125 /// // Execute with a 5-second timeout
1126 /// let rows = client
1127 /// .query_with_timeout(
1128 /// "SELECT * FROM large_table",
1129 /// &[],
1130 /// Duration::from_secs(5),
1131 /// )
1132 /// .await?;
1133 /// # let _ = rows;
1134 /// # Ok(())
1135 /// # }
1136 /// ```
1137 pub async fn query_with_timeout<'a>(
1138 &'a mut self,
1139 sql: &str,
1140 params: &[&(dyn crate::ToSql + Sync)],
1141 timeout_duration: std::time::Duration,
1142 ) -> Result<QueryStream<'a>> {
1143 self.query_inner(sql, params, Some(timeout_duration)).await
1144 }
1145
1146 /// Execute a batch that may return multiple result sets.
1147 ///
1148 /// This is useful for stored procedures or SQL batches that contain
1149 /// multiple SELECT statements.
1150 ///
1151 /// # Example
1152 ///
1153 /// ```rust,no_run
1154 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1155 /// // Execute a batch with multiple SELECT statements
1156 /// let mut results = client.query_multiple(
1157 /// "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1158 /// &[]
1159 /// ).await?;
1160 ///
1161 /// // Process first result set
1162 /// while let Some(row) = results.next_row().await? {
1163 /// println!("Result 1: {:?}", row);
1164 /// }
1165 ///
1166 /// // Move to second result set
1167 /// if results.next_result().await? {
1168 /// while let Some(row) = results.next_row().await? {
1169 /// println!("Result 2: {:?}", row);
1170 /// }
1171 /// }
1172 /// # Ok(())
1173 /// # }
1174 /// ```
1175 pub async fn query_multiple<'a>(
1176 &'a mut self,
1177 sql: &str,
1178 params: &[&(dyn crate::ToSql + Sync)],
1179 ) -> Result<MultiResultStream<'a>> {
1180 tracing::debug!(
1181 sql = sql,
1182 params_count = params.len(),
1183 "executing multi-result query"
1184 );
1185
1186 let deadline = self.command_deadline();
1187 let canceller = self.connection_cancel_handle();
1188 let result_sets = run_with_deadline(
1189 async {
1190 if params.is_empty() {
1191 // Simple batch without parameters - use SQL batch
1192 self.send_sql_batch(sql).await?;
1193 } else {
1194 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1195 let rpc = self.build_parameterized_rpc(sql, params).await?;
1196 self.send_rpc(&rpc).await?;
1197 }
1198
1199 // Read all result sets
1200 self.read_multi_result_response().await
1201 },
1202 deadline,
1203 canceller,
1204 )
1205 .await?;
1206 Ok(MultiResultStream::new(result_sets))
1207 }
1208
1209 /// Execute a query that doesn't return rows.
1210 ///
1211 /// Returns the number of affected rows.
1212 pub async fn execute(
1213 &mut self,
1214 sql: &str,
1215 params: &[&(dyn crate::ToSql + Sync)],
1216 ) -> Result<u64> {
1217 let deadline = self.command_deadline();
1218 self.execute_inner(sql, params, deadline).await
1219 }
1220
1221 /// Shared execute implementation with an explicit command deadline.
1222 async fn execute_inner(
1223 &mut self,
1224 sql: &str,
1225 params: &[&(dyn crate::ToSql + Sync)],
1226 deadline: Option<std::time::Duration>,
1227 ) -> Result<u64> {
1228 tracing::debug!(
1229 sql = sql,
1230 params_count = params.len(),
1231 "executing statement"
1232 );
1233
1234 #[cfg(feature = "otel")]
1235 let instrumentation = self.instrumentation.clone();
1236 #[cfg(feature = "otel")]
1237 let mut span = instrumentation.query_span(sql);
1238 #[cfg(feature = "otel")]
1239 let timer = crate::instrumentation::OperationTimer::start(
1240 crate::instrumentation::extract_operation(sql),
1241 );
1242
1243 let canceller = self.cancel_handle();
1244 let result = run_with_deadline(
1245 async {
1246 if params.is_empty() {
1247 // Simple statement without parameters - use SQL batch
1248 self.send_sql_batch(sql).await?;
1249 } else {
1250 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1251 let rpc = self.build_parameterized_rpc(sql, params).await?;
1252 self.send_rpc(&rpc).await?;
1253 }
1254
1255 // Read response and get row count
1256 self.read_execute_result().await
1257 },
1258 deadline,
1259 canceller,
1260 )
1261 .await;
1262
1263 #[cfg(feature = "otel")]
1264 match &result {
1265 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1266 Err(e) => InstrumentationContext::record_error(&mut span, e),
1267 }
1268 #[cfg(feature = "otel")]
1269 timer.finish(instrumentation.metrics(), result.is_ok());
1270
1271 // Drop the span before returning
1272 #[cfg(feature = "otel")]
1273 drop(span);
1274
1275 result
1276 }
1277
1278 /// Execute a statement with a specific timeout.
1279 ///
1280 /// This overrides the default `command_timeout` from the connection configuration
1281 /// for this specific statement. If the statement does not complete within the
1282 /// specified duration, the driver sends an Attention packet to cancel it
1283 /// server-side, drains the acknowledgement, and returns
1284 /// [`Error::CommandTimeout`] with the connection left usable.
1285 ///
1286 /// # Arguments
1287 ///
1288 /// * `sql` - The SQL statement to execute
1289 /// * `params` - Statement parameters
1290 /// * `timeout_duration` - Maximum time to wait for the statement to complete
1291 ///
1292 /// # Example
1293 ///
1294 /// ```rust,no_run
1295 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1296 /// use std::time::Duration;
1297 ///
1298 /// // Execute with a 10-second timeout
1299 /// let rows_affected = client
1300 /// .execute_with_timeout(
1301 /// "UPDATE large_table SET status = @p1",
1302 /// &[&"processed"],
1303 /// Duration::from_secs(10),
1304 /// )
1305 /// .await?;
1306 /// # let _ = rows_affected;
1307 /// # Ok(())
1308 /// # }
1309 /// ```
1310 pub async fn execute_with_timeout(
1311 &mut self,
1312 sql: &str,
1313 params: &[&(dyn crate::ToSql + Sync)],
1314 timeout_duration: std::time::Duration,
1315 ) -> Result<u64> {
1316 self.execute_inner(sql, params, Some(timeout_duration))
1317 .await
1318 }
1319
1320 /// Begin a transaction.
1321 ///
1322 /// This transitions the client from `Ready` to `InTransaction` state.
1323 /// Per MS-TDS spec, the server returns a transaction descriptor in the
1324 /// BeginTransaction EnvChange token that must be included in subsequent
1325 /// ALL_HEADERS sections.
1326 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1327 tracing::debug!("beginning transaction");
1328
1329 #[cfg(feature = "otel")]
1330 let instrumentation = self.instrumentation.clone();
1331 #[cfg(feature = "otel")]
1332 let mut span = instrumentation.transaction_span("BEGIN");
1333
1334 // Execute BEGIN TRANSACTION and extract the transaction descriptor
1335 let result = async {
1336 self.send_sql_batch("BEGIN TRANSACTION").await?;
1337 self.read_transaction_begin_result().await
1338 }
1339 .await;
1340
1341 #[cfg(feature = "otel")]
1342 match &result {
1343 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1344 Err(e) => InstrumentationContext::record_error(&mut span, e),
1345 }
1346
1347 // Drop the span before moving instrumentation
1348 #[cfg(feature = "otel")]
1349 drop(span);
1350
1351 let transaction_descriptor = result?;
1352
1353 Ok(Client {
1354 config: self.config,
1355 _state: PhantomData,
1356 connection: self.connection,
1357 server_version: self.server_version,
1358 current_database: self.current_database,
1359 server_collation: self.server_collation,
1360 statement_cache: self.statement_cache,
1361 transaction_descriptor, // Store the descriptor from server
1362 needs_reset: self.needs_reset,
1363 in_flight: self.in_flight,
1364 #[cfg(feature = "otel")]
1365 instrumentation: self.instrumentation,
1366 #[cfg(feature = "always-encrypted")]
1367 encryption_context: self.encryption_context,
1368 })
1369 }
1370
1371 /// Begin a transaction with a specific isolation level.
1372 ///
1373 /// This transitions the client from `Ready` to `InTransaction` state
1374 /// with the specified isolation level.
1375 ///
1376 /// # Example
1377 ///
1378 /// ```rust,no_run
1379 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1380 /// use mssql_client::IsolationLevel;
1381 ///
1382 /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1383 /// // All operations in this transaction use SERIALIZABLE isolation
1384 /// tx.commit().await?;
1385 /// # Ok(())
1386 /// # }
1387 /// ```
1388 pub async fn begin_transaction_with_isolation(
1389 mut self,
1390 isolation_level: crate::transaction::IsolationLevel,
1391 ) -> Result<Client<InTransaction>> {
1392 tracing::debug!(
1393 isolation_level = %isolation_level.name(),
1394 "beginning transaction with isolation level"
1395 );
1396
1397 #[cfg(feature = "otel")]
1398 let instrumentation = self.instrumentation.clone();
1399 #[cfg(feature = "otel")]
1400 let mut span = instrumentation.transaction_span("BEGIN");
1401
1402 // First set the isolation level
1403 let result = async {
1404 self.send_sql_batch(isolation_level.as_sql()).await?;
1405 self.read_execute_result().await?;
1406
1407 // Then begin the transaction
1408 self.send_sql_batch("BEGIN TRANSACTION").await?;
1409 self.read_transaction_begin_result().await
1410 }
1411 .await;
1412
1413 #[cfg(feature = "otel")]
1414 match &result {
1415 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1416 Err(e) => InstrumentationContext::record_error(&mut span, e),
1417 }
1418
1419 #[cfg(feature = "otel")]
1420 drop(span);
1421
1422 let transaction_descriptor = result?;
1423
1424 Ok(Client {
1425 config: self.config,
1426 _state: PhantomData,
1427 connection: self.connection,
1428 server_version: self.server_version,
1429 current_database: self.current_database,
1430 server_collation: self.server_collation,
1431 statement_cache: self.statement_cache,
1432 transaction_descriptor,
1433 needs_reset: self.needs_reset,
1434 in_flight: self.in_flight,
1435 #[cfg(feature = "otel")]
1436 instrumentation: self.instrumentation,
1437 #[cfg(feature = "always-encrypted")]
1438 encryption_context: self.encryption_context,
1439 })
1440 }
1441
1442 /// Execute a simple query without parameters.
1443 ///
1444 /// This is useful for DDL statements and simple queries where you
1445 /// don't need to retrieve the affected row count.
1446 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1447 tracing::debug!(sql = sql, "executing simple query");
1448
1449 // Send SQL batch
1450 self.send_sql_batch(sql).await?;
1451
1452 // Read and discard response
1453 let _ = self.read_execute_result().await?;
1454
1455 Ok(())
1456 }
1457
1458 /// Close the connection gracefully.
1459 pub async fn close(self) -> Result<()> {
1460 tracing::debug!("closing connection");
1461 Ok(())
1462 }
1463
1464 /// Get the current database name.
1465 #[must_use]
1466 pub fn database(&self) -> Option<&str> {
1467 self.config.database.as_deref()
1468 }
1469
1470 /// Get the server host.
1471 #[must_use]
1472 pub fn host(&self) -> &str {
1473 &self.config.host
1474 }
1475
1476 /// Get the server port.
1477 #[must_use]
1478 pub fn port(&self) -> u16 {
1479 self.config.port
1480 }
1481
1482 /// Check if the connection is currently in a transaction.
1483 ///
1484 /// This returns `true` if a transaction was started via raw SQL
1485 /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1486 ///
1487 /// Note: This only tracks transactions started via raw SQL. Transactions
1488 /// started via the type-state API (`begin_transaction()`) result in a
1489 /// `Client<InTransaction>` which is a different type.
1490 ///
1491 /// # Example
1492 ///
1493 /// ```rust,no_run
1494 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1495 /// client.execute("BEGIN TRANSACTION", &[]).await?;
1496 /// assert!(client.is_in_transaction());
1497 ///
1498 /// client.execute("COMMIT", &[]).await?;
1499 /// assert!(!client.is_in_transaction());
1500 /// # Ok(())
1501 /// # }
1502 /// ```
1503 #[must_use]
1504 pub fn is_in_transaction(&self) -> bool {
1505 self.transaction_descriptor != 0
1506 }
1507
1508 /// Check if a request is in-flight (sent but response not fully read).
1509 ///
1510 /// Used by the connection pool to detect dirty connections that were
1511 /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1512 /// A connection with an in-flight request has unread data in the TCP
1513 /// buffer and must be discarded rather than returned to the pool.
1514 #[must_use]
1515 pub fn is_in_flight(&self) -> bool {
1516 self.in_flight
1517 }
1518
/// Report whether an Always Encrypted key-store provider called `name` is
/// reachable through this client's encryption context.
///
/// Returns `false` when the client has no encryption context, or when the
/// context reports no provider under that name.
///
/// This method only exists when the `always-encrypted` feature is enabled.
#[cfg(feature = "always-encrypted")]
#[must_use]
pub fn has_encryption_provider(&self, name: &str) -> bool {
    match self.encryption_context.as_ref() {
        Some(ctx) => ctx.has_provider(name),
        None => false,
    }
}
1532
1533 /// Get a handle for cancelling the current query.
1534 ///
1535 /// The cancel handle can be cloned and sent to other tasks, enabling
1536 /// cancellation of long-running queries from a separate async context.
1537 ///
1538 /// # Example
1539 ///
1540 /// ```rust,no_run
1541 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1542 /// use std::time::Duration;
1543 ///
1544 /// let cancel_handle = client.cancel_handle();
1545 ///
1546 /// // Spawn a task to cancel after 10 seconds
1547 /// let handle = tokio::spawn(async move {
1548 /// tokio::time::sleep(Duration::from_secs(10)).await;
1549 /// let _ = cancel_handle.cancel().await;
1550 /// });
1551 ///
1552 /// // This query will be cancelled if it runs longer than 10 seconds
1553 /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1554 /// # let _ = (handle, result);
1555 /// # Ok(())
1556 /// # }
1557 /// ```
1558 #[must_use]
1559 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1560 self.connection_cancel_handle()
1561 }
1562}
1563
1564/// # Drop Behavior
1565///
1566/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1567/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1568/// the transaction remains open on the server until the TCP connection closes
1569/// (at which point SQL Server automatically rolls back).
1570///
1571/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1572/// to send a `ROLLBACK TRANSACTION` command.
1573///
1574/// ## Consequences of dropping without commit/rollback
1575///
1576/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1577/// (potentially 30+ minutes), holding locks on any modified rows.
1578/// - **Pooled connections:** The pool detects the active transaction descriptor
1579/// and discards the connection rather than returning it to the idle pool
1580/// (see `PooledConnection::drop` in `mssql-driver-pool`).
1581///
1582/// ## Best practice
1583///
1584/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1585/// for error paths:
1586///
1587/// ```rust,no_run
1588/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1589/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1590/// let tx = client.begin_transaction().await?;
1591/// match do_work(&tx).await {
1592/// Ok(_) => { tx.commit().await?; }
1593/// Err(e) => { tx.rollback().await?; return Err(e); }
1594/// }
1595/// # Ok(())
1596/// # }
1597/// ```
1598impl Client<InTransaction> {
1599 /// Execute a query within the transaction and return a streaming result set.
1600 ///
1601 /// See [`Client<Ready>::query`] for usage examples.
1602 pub async fn query<'a>(
1603 &'a mut self,
1604 sql: &str,
1605 params: &[&(dyn crate::ToSql + Sync)],
1606 ) -> Result<QueryStream<'a>> {
1607 let deadline = self.command_deadline();
1608 self.query_inner(sql, params, deadline).await
1609 }
1610
1611 /// Shared query implementation with an explicit command deadline.
1612 async fn query_inner<'a>(
1613 &'a mut self,
1614 sql: &str,
1615 params: &[&(dyn crate::ToSql + Sync)],
1616 deadline: Option<std::time::Duration>,
1617 ) -> Result<QueryStream<'a>> {
1618 tracing::debug!(
1619 sql = sql,
1620 params_count = params.len(),
1621 "executing query in transaction"
1622 );
1623
1624 #[cfg(feature = "otel")]
1625 let instrumentation = self.instrumentation.clone();
1626 #[cfg(feature = "otel")]
1627 let mut span = instrumentation.query_span(sql);
1628 #[cfg(feature = "otel")]
1629 let timer = crate::instrumentation::OperationTimer::start(
1630 crate::instrumentation::extract_operation(sql),
1631 );
1632
1633 let canceller = self.cancel_handle();
1634 let result = run_with_deadline(
1635 async {
1636 if params.is_empty() {
1637 // Simple query without parameters - use SQL batch
1638 self.send_sql_batch(sql).await?;
1639 } else {
1640 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1641 let rpc = self.build_parameterized_rpc(sql, params).await?;
1642 self.send_rpc(&rpc).await?;
1643 }
1644
1645 // Read complete response including columns and rows
1646 self.read_query_response().await
1647 },
1648 deadline,
1649 canceller,
1650 )
1651 .await;
1652
1653 #[cfg(feature = "otel")]
1654 match &result {
1655 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1656 Err(e) => InstrumentationContext::record_error(&mut span, e),
1657 }
1658 #[cfg(feature = "otel")]
1659 timer.finish(instrumentation.metrics(), result.is_ok());
1660
1661 // Drop the span before returning
1662 #[cfg(feature = "otel")]
1663 drop(span);
1664
1665 let resp = result?;
1666 #[cfg(feature = "always-encrypted")]
1667 {
1668 Ok(QueryStream::from_raw(
1669 resp.columns,
1670 resp.pending_rows,
1671 resp.meta,
1672 resp.decryptor,
1673 ))
1674 }
1675 #[cfg(not(feature = "always-encrypted"))]
1676 {
1677 Ok(QueryStream::from_raw(
1678 resp.columns,
1679 resp.pending_rows,
1680 resp.meta,
1681 ))
1682 }
1683 }
1684
1685 /// Execute a statement within the transaction.
1686 ///
1687 /// Returns the number of affected rows.
1688 pub async fn execute(
1689 &mut self,
1690 sql: &str,
1691 params: &[&(dyn crate::ToSql + Sync)],
1692 ) -> Result<u64> {
1693 let deadline = self.command_deadline();
1694 self.execute_inner(sql, params, deadline).await
1695 }
1696
1697 /// Shared execute implementation with an explicit command deadline.
1698 async fn execute_inner(
1699 &mut self,
1700 sql: &str,
1701 params: &[&(dyn crate::ToSql + Sync)],
1702 deadline: Option<std::time::Duration>,
1703 ) -> Result<u64> {
1704 tracing::debug!(
1705 sql = sql,
1706 params_count = params.len(),
1707 "executing statement in transaction"
1708 );
1709
1710 #[cfg(feature = "otel")]
1711 let instrumentation = self.instrumentation.clone();
1712 #[cfg(feature = "otel")]
1713 let mut span = instrumentation.query_span(sql);
1714 #[cfg(feature = "otel")]
1715 let timer = crate::instrumentation::OperationTimer::start(
1716 crate::instrumentation::extract_operation(sql),
1717 );
1718
1719 let canceller = self.cancel_handle();
1720 let result = run_with_deadline(
1721 async {
1722 if params.is_empty() {
1723 // Simple statement without parameters - use SQL batch
1724 self.send_sql_batch(sql).await?;
1725 } else {
1726 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1727 let rpc = self.build_parameterized_rpc(sql, params).await?;
1728 self.send_rpc(&rpc).await?;
1729 }
1730
1731 // Read response and get row count
1732 self.read_execute_result().await
1733 },
1734 deadline,
1735 canceller,
1736 )
1737 .await;
1738
1739 #[cfg(feature = "otel")]
1740 match &result {
1741 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1742 Err(e) => InstrumentationContext::record_error(&mut span, e),
1743 }
1744 #[cfg(feature = "otel")]
1745 timer.finish(instrumentation.metrics(), result.is_ok());
1746
1747 // Drop the span before returning
1748 #[cfg(feature = "otel")]
1749 drop(span);
1750
1751 result
1752 }
1753
1754 /// Execute a query within the transaction with a specific timeout.
1755 ///
1756 /// See [`Client<Ready>::query_with_timeout`] for details.
1757 pub async fn query_with_timeout<'a>(
1758 &'a mut self,
1759 sql: &str,
1760 params: &[&(dyn crate::ToSql + Sync)],
1761 timeout_duration: std::time::Duration,
1762 ) -> Result<QueryStream<'a>> {
1763 self.query_inner(sql, params, Some(timeout_duration)).await
1764 }
1765
1766 /// Execute a statement within the transaction with a specific timeout.
1767 ///
1768 /// See [`Client<Ready>::execute_with_timeout`] for details.
1769 pub async fn execute_with_timeout(
1770 &mut self,
1771 sql: &str,
1772 params: &[&(dyn crate::ToSql + Sync)],
1773 timeout_duration: std::time::Duration,
1774 ) -> Result<u64> {
1775 self.execute_inner(sql, params, Some(timeout_duration))
1776 .await
1777 }
1778
/// Open a FILESTREAM BLOB for async reading and/or writing.
///
/// The method first asks the server for the FILESTREAM transaction context
/// (`SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()`), then passes `path`,
/// `access` and that context to [`crate::filestream::FileStream::open`],
/// which opens the handle through the native Win32 `OpenSqlFilestream` API.
///
/// # Arguments
///
/// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
///   Query this yourself before calling `open_filestream`:
///   ```sql
///   SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
///   ```
/// * `access` — Read, write, or read/write access mode.
///
/// # Requirements
///
/// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
/// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
/// - The `FileStream` must be dropped before calling
///   [`commit()`](Client::commit) or [`rollback()`](Client::rollback)
///
/// # Errors
///
/// Returns [`Error::FileStream`] when the context query yields no rows, and
/// propagates query, row-decoding and open failures.
///
/// # Example
///
/// ```text
/// use mssql_client::FileStreamAccess;
/// use tokio::io::AsyncReadExt;
///
/// let mut tx = client.begin_transaction().await?;
///
/// // Get the FILESTREAM path
/// let rows = tx.query(
///     "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
///     &[&doc_id],
/// ).await?;
/// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
///
/// // Open and read the BLOB
/// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
/// let mut data = Vec::new();
/// stream.read_to_end(&mut data).await?;
/// drop(stream);
///
/// tx.commit().await?;
/// ```
#[cfg(all(windows, feature = "filestream"))]
pub async fn open_filestream(
    &mut self,
    path: &str,
    access: crate::filestream::FileStreamAccess,
) -> Result<crate::filestream::FileStream> {
    tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");

    // The transaction context is what binds the file access to the current
    // SQL transaction.
    let context_rows = self
        .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
        .await?;

    // Every row is visited; the value from the last one is kept.
    let mut fetched: Option<Vec<u8>> = None;
    for item in context_rows {
        let row = item?;
        fetched = Some(row.get::<Vec<u8>>(0)?);
    }

    let Some(txn_context) = fetched else {
        return Err(Error::FileStream(
            "GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into(),
        ));
    };

    crate::filestream::FileStream::open(path, access, &txn_context)
}
1848
1849 /// Commit the transaction.
1850 ///
1851 /// This transitions the client back to `Ready` state.
1852 pub async fn commit(mut self) -> Result<Client<Ready>> {
1853 tracing::debug!("committing transaction");
1854
1855 #[cfg(feature = "otel")]
1856 let instrumentation = self.instrumentation.clone();
1857 #[cfg(feature = "otel")]
1858 let mut span = instrumentation.transaction_span("COMMIT");
1859
1860 // Execute COMMIT TRANSACTION
1861 let result = async {
1862 self.send_sql_batch("COMMIT TRANSACTION").await?;
1863 self.read_execute_result().await
1864 }
1865 .await;
1866
1867 #[cfg(feature = "otel")]
1868 match &result {
1869 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1870 Err(e) => InstrumentationContext::record_error(&mut span, e),
1871 }
1872
1873 // Drop the span before moving instrumentation
1874 #[cfg(feature = "otel")]
1875 drop(span);
1876
1877 result?;
1878
1879 Ok(Client {
1880 config: self.config,
1881 _state: PhantomData,
1882 connection: self.connection,
1883 server_version: self.server_version,
1884 current_database: self.current_database,
1885 server_collation: self.server_collation,
1886 statement_cache: self.statement_cache,
1887 transaction_descriptor: 0, // Reset to auto-commit mode
1888 needs_reset: self.needs_reset,
1889 in_flight: self.in_flight,
1890 #[cfg(feature = "otel")]
1891 instrumentation: self.instrumentation,
1892 #[cfg(feature = "always-encrypted")]
1893 encryption_context: self.encryption_context,
1894 })
1895 }
1896
1897 /// Rollback the transaction.
1898 ///
1899 /// This transitions the client back to `Ready` state.
1900 pub async fn rollback(mut self) -> Result<Client<Ready>> {
1901 tracing::debug!("rolling back transaction");
1902
1903 #[cfg(feature = "otel")]
1904 let instrumentation = self.instrumentation.clone();
1905 #[cfg(feature = "otel")]
1906 let mut span = instrumentation.transaction_span("ROLLBACK");
1907
1908 // Execute ROLLBACK TRANSACTION
1909 let result = async {
1910 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1911 self.read_execute_result().await
1912 }
1913 .await;
1914
1915 #[cfg(feature = "otel")]
1916 match &result {
1917 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1918 Err(e) => InstrumentationContext::record_error(&mut span, e),
1919 }
1920
1921 // Drop the span before moving instrumentation
1922 #[cfg(feature = "otel")]
1923 drop(span);
1924
1925 result?;
1926
1927 Ok(Client {
1928 config: self.config,
1929 _state: PhantomData,
1930 connection: self.connection,
1931 server_version: self.server_version,
1932 current_database: self.current_database,
1933 server_collation: self.server_collation,
1934 statement_cache: self.statement_cache,
1935 transaction_descriptor: 0, // Reset to auto-commit mode
1936 needs_reset: self.needs_reset,
1937 in_flight: self.in_flight,
1938 #[cfg(feature = "otel")]
1939 instrumentation: self.instrumentation,
1940 #[cfg(feature = "always-encrypted")]
1941 encryption_context: self.encryption_context,
1942 })
1943 }
1944
1945 /// Create a savepoint and return a handle for later rollback.
1946 ///
1947 /// The returned `SavePoint` handle contains the validated savepoint name.
1948 /// Use it with `rollback_to()` to partially undo transaction work.
1949 ///
1950 /// # Example
1951 ///
1952 /// ```rust,no_run
1953 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1954 /// let mut tx = client.begin_transaction().await?;
1955 /// tx.execute("INSERT INTO orders ...", &[]).await?;
1956 /// let sp = tx.save_point("before_items").await?;
1957 /// tx.execute("INSERT INTO items ...", &[]).await?;
1958 /// // Oops, rollback just the items
1959 /// tx.rollback_to(&sp).await?;
1960 /// tx.commit().await?;
1961 /// # Ok(())
1962 /// # }
1963 /// ```
1964 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1965 crate::validation::validate_identifier(name)?;
1966 tracing::debug!(name = name, "creating savepoint");
1967
1968 // Execute SAVE TRANSACTION <name>
1969 // Note: name is validated by validate_identifier() to prevent SQL injection
1970 let sql = format!("SAVE TRANSACTION {name}");
1971 self.send_sql_batch(&sql).await?;
1972 self.read_execute_result().await?;
1973
1974 Ok(SavePoint::new(name.to_string()))
1975 }
1976
1977 /// Rollback to a savepoint.
1978 ///
1979 /// This rolls back all changes made after the savepoint was created,
1980 /// but keeps the transaction active. The savepoint remains valid and
1981 /// can be rolled back to again.
1982 ///
1983 /// # Example
1984 ///
1985 /// ```rust,no_run
1986 /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1987 /// let sp = tx.save_point("checkpoint").await?;
1988 /// // ... do some work ...
1989 /// tx.rollback_to(&sp).await?; // Undo changes since checkpoint
1990 /// // Transaction is still active, savepoint is still valid
1991 /// # Ok(())
1992 /// # }
1993 /// ```
1994 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1995 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1996
1997 // Execute ROLLBACK TRANSACTION <name>
1998 // Note: savepoint name was validated during creation
1999 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
2000 self.send_sql_batch(&sql).await?;
2001 self.read_execute_result().await?;
2002
2003 Ok(())
2004 }
2005
2006 /// Release a savepoint (optional cleanup).
2007 ///
2008 /// Note: SQL Server doesn't have explicit savepoint release, but this
2009 /// method is provided for API completeness. The savepoint is automatically
2010 /// released when the transaction commits or rolls back.
2011 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
2012 tracing::debug!(name = savepoint.name(), "releasing savepoint");
2013
2014 // SQL Server doesn't require explicit savepoint release
2015 // The savepoint is implicitly released on commit/rollback
2016 // This method exists for API completeness
2017 drop(savepoint);
2018 Ok(())
2019 }
2020
2021 /// Get a handle for cancelling the current query within the transaction.
2022 ///
2023 /// See [`Client<Ready>::cancel_handle`] for usage examples.
2024 #[must_use]
2025 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2026 self.connection_cancel_handle()
2027 }
2028}
2029
2030impl<S: ConnectionState> std::fmt::Debug for Client<S> {
2031 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2032 f.debug_struct("Client")
2033 .field("host", &self.config.host)
2034 .field("port", &self.config.port)
2035 .field("database", &self.config.database)
2036 .finish()
2037 }
2038}