mssql_client/client.rs
1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27pub(crate) mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// How long to wait for the server to acknowledge an Attention packet after
49/// a command timeout. SqlClient waits 5 seconds before dooming the
50/// connection; we match it.
51const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68 fut: F,
69 deadline: Option<std::time::Duration>,
70 canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73 F: std::future::Future<Output = Result<T>>,
74{
75 let Some(d) = deadline else {
76 return fut.await;
77 };
78 tokio::pin!(fut);
79 tokio::select! {
80 biased;
81 res = &mut fut => res,
82 () = tokio::time::sleep(d) => {
83 // Signal cancellation, then let the in-flight read consume the
84 // server's attention acknowledgement so the connection stays usable.
85 let drain = async {
86 let _ = canceller.cancel().await;
87 let _ = (&mut fut).await;
88 };
89 if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90 tracing::warn!(
91 timeout = ?ATTENTION_ACK_TIMEOUT,
92 "server did not acknowledge attention; abandoning the connection as dirty"
93 );
94 }
95 Err(Error::CommandTimeout)
96 }
97 }
98}
99
100/// SQL Server client with type-state connection management.
101///
102/// The generic parameter `S` represents the current connection state,
103/// ensuring at compile time that certain operations are only available
104/// in appropriate states.
105pub struct Client<S: ConnectionState> {
106 config: Config,
107 _state: PhantomData<S>,
108 /// The underlying connection (present only when connected)
109 connection: Option<ConnectionHandle>,
110 /// Server version from LoginAck (raw u32 TDS version)
111 server_version: Option<u32>,
112 /// Current database from EnvChange
113 current_database: Option<String>,
114 /// Server's default collation from SqlCollation EnvChange during login.
115 /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
116 /// parameters with the correct character encoding and collation bytes.
117 server_collation: Option<tds_protocol::token::Collation>,
118 /// Prepared statement cache for query optimization
119 statement_cache: StatementCache,
120 /// Transaction descriptor from BeginTransaction EnvChange.
121 /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
122 /// requests within an explicit transaction. 0 indicates auto-commit mode.
123 transaction_descriptor: u64,
124 /// Whether a request has been sent and the response has not yet been fully read.
125 /// Used by the connection pool to detect dirty connections after cancel/timeout.
126 in_flight: bool,
127 /// Whether this connection needs a reset on next use.
128 /// Set by connection pool on checkin, cleared after first query/execute.
129 /// When true, the RESETCONNECTION flag is set on the first TDS packet.
130 needs_reset: bool,
131 /// OpenTelemetry instrumentation context (when otel feature is enabled)
132 #[cfg(feature = "otel")]
133 instrumentation: InstrumentationContext,
134 /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
135 #[cfg(feature = "always-encrypted")]
136 pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
137}
138
139/// Internal connection handle wrapping the actual connection.
140///
141/// This is an enum to support different connection types:
142/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
143/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
144/// - Plain TCP (for internal networks or when `tls` feature is disabled)
145#[allow(dead_code)] // Connection will be used once query execution is implemented
146enum ConnectionHandle {
147 /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
148 #[cfg(feature = "tls")]
149 Tls(Connection<TlsStream<TcpStream>>),
150 /// TLS connection with PreLogin wrapping (TDS 7.x style)
151 #[cfg(feature = "tls")]
152 TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
153 /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
154 Plain(Connection<TcpStream>),
155}
156
157/// The parameter `TypeInfo` to declare a typed NULL ([`crate::null`]) with, from
158/// its [`crate::ToSql::sql_type`] name. Returns `None` for an untyped NULL
159/// (`Option::None`, type `"NULL"`), which falls back to the default param type.
160#[cfg(feature = "always-encrypted")]
161fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
162 use tds_protocol::rpc::TypeInfo;
163 Some(match sql_type {
164 "BIT" => TypeInfo::bit(),
165 "TINYINT" => TypeInfo::tinyint(),
166 "SMALLINT" => TypeInfo::smallint(),
167 "INT" => TypeInfo::int(),
168 "BIGINT" => TypeInfo::bigint(),
169 "REAL" => TypeInfo::real(),
170 "FLOAT" => TypeInfo::float(),
171 "NVARCHAR" => TypeInfo::nvarchar(1),
172 "VARBINARY" => TypeInfo::varbinary(1),
173 "UNIQUEIDENTIFIER" => TypeInfo::uuid(),
174 "DATE" => TypeInfo::date(),
175 _ => return None,
176 })
177}
178
179/// Map a typed-parameter wrapper's [`EncryptedParamType`] to the `TypeInfo` the
180/// driver declares it as (for `sp_describe_parameter_encryption` and the
181/// `CryptoMetadata` base type). Unknown future variants error rather than
182/// silently declaring the wrong type.
183#[cfg(feature = "always-encrypted")]
184fn encrypted_param_type_info(
185 ty: mssql_types::EncryptedParamType,
186) -> Result<tds_protocol::rpc::TypeInfo> {
187 use mssql_types::EncryptedParamType as E;
188 use tds_protocol::rpc::TypeInfo;
189 Ok(match ty {
190 E::Decimal { precision, scale } => TypeInfo::decimal(precision, scale),
191 E::Time { scale } => TypeInfo::time(scale),
192 E::DateTime2 { scale } => TypeInfo::datetime2(scale),
193 E::DateTimeOffset { scale } => TypeInfo::datetimeoffset(scale),
194 E::DateTime => TypeInfo::datetime(),
195 E::Char { length } => TypeInfo::char(length),
196 E::NChar { length } => TypeInfo::nchar(length),
197 E::Binary { length } => TypeInfo::binary(length),
198 _ => {
199 return Err(Error::Encryption(
200 "unsupported Always Encrypted parameter type".to_string(),
201 ));
202 }
203 })
204}
205
206// Private helper methods available to all connection states
207impl<S: ConnectionState> Client<S> {
208 /// The default per-command deadline from `command_timeout`.
209 ///
210 /// Returns `None` when `command_timeout` is zero, which means "no limit"
211 /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
212 pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
213 let t = self.config.command_timeout;
214 if t.is_zero() { None } else { Some(t) }
215 }
216
217 /// Build a cancel handle for the current connection, regardless of
218 /// connection state. The public, documented surface is
219 /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
220 /// delegate here.
221 pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
222 let connection = self
223 .connection
224 .as_ref()
225 .expect("connection should be present");
226 match connection {
227 #[cfg(feature = "tls")]
228 ConnectionHandle::Tls(conn) => {
229 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
230 }
231 #[cfg(feature = "tls")]
232 ConnectionHandle::TlsPrelogin(conn) => {
233 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
234 }
235 ConnectionHandle::Plain(conn) => {
236 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
237 }
238 }
239 }
240
241 /// Cancel an in-flight response that was abandoned without being drained —
242 /// e.g. a [`RowStream`](crate::RowStream) dropped or cancelled mid-result.
243 ///
244 /// Sends an Attention and drains to the server's DONE_ATTN acknowledgement so
245 /// the socket is clean and the connection reusable. A no-op when nothing is
246 /// in flight. Bounded by [`ATTENTION_ACK_TIMEOUT`]: if the acknowledgement
247 /// never arrives the connection is left marked in-flight (so the pool
248 /// discards it on return) and an error is returned.
249 pub(crate) async fn cancel_in_flight_response(&mut self) -> Result<()> {
250 if !self.in_flight {
251 return Ok(());
252 }
253 let canceller = self.connection_cancel_handle();
254 let drain = async {
255 canceller.cancel().await?;
256 // With the cancelling flag set, `read_response_message` routes through
257 // the codec's drain-after-cancel path and returns `Err(Cancelled)`
258 // once the DONE_ATTN acknowledgement is consumed (clearing
259 // `in_flight`). Any full messages that arrive before the ack are
260 // discarded.
261 loop {
262 match self.read_response_message().await {
263 Err(Error::Cancelled) => return Ok(()),
264 Ok(_) => continue,
265 Err(e) => return Err(e),
266 }
267 }
268 };
269 match tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await {
270 Ok(result) => result,
271 Err(_) => {
272 tracing::warn!(
273 timeout = ?ATTENTION_ACK_TIMEOUT,
274 "attention acknowledgement not received while cancelling an \
275 abandoned response; connection left dirty"
276 );
277 Err(Error::Cancelled)
278 }
279 }
280 }
281
282 /// Process transaction-related EnvChange tokens.
283 ///
284 /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
285 /// EnvChange tokens, updating the transaction descriptor accordingly.
286 ///
287 /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
288 /// while still having the transaction descriptor tracked correctly.
289 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
290 use tds_protocol::token::EnvChangeValue;
291
292 match env.env_type {
293 EnvChangeType::BeginTransaction => {
294 if let EnvChangeValue::Binary(ref data) = env.new_value {
295 if data.len() >= 8 {
296 let descriptor = u64::from_le_bytes([
297 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
298 ]);
299 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
300 *transaction_descriptor = descriptor;
301 }
302 }
303 }
304 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
305 tracing::debug!(
306 env_type = ?env.env_type,
307 "transaction ended via raw SQL"
308 );
309 *transaction_descriptor = 0;
310 }
311 _ => {}
312 }
313 }
314
315 /// Apply a transaction-related `ENVCHANGE` to this client's descriptor.
316 ///
317 /// Lets the streaming readers (which live in sibling modules) keep the
318 /// transaction descriptor in sync with raw `BEGIN`/`COMMIT`/`ROLLBACK`
319 /// batches seen mid-stream, exactly as the buffered readers do.
320 pub(crate) fn apply_transaction_env_change(&mut self, env: &EnvChange) {
321 Self::process_transaction_env_change(env, &mut self.transaction_descriptor);
322 }
323
324 /// Send a SQL batch to the server.
325 ///
326 /// Uses the client's current transaction descriptor in ALL_HEADERS.
327 /// Per MS-TDS spec, when in an explicit transaction, the descriptor
328 /// returned by BeginTransaction must be included.
329 ///
330 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
331 /// is included in the first packet to reset connection state.
332 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
333 // If a previous streamed response was abandoned (a RowStream dropped
334 // mid-result), drain it before issuing a new request so the next read
335 // does not pick up the old response's bytes.
336 self.cancel_in_flight_response().await?;
337
338 let payload = tds_protocol::__private::encode_sql_batch_with_transaction(
339 sql,
340 self.transaction_descriptor,
341 );
342 let max_packet = self.config.packet_size as usize;
343
344 // Check if we need to reset the connection on this request
345 let reset = self.needs_reset;
346 if reset {
347 self.needs_reset = false; // Clear flag before sending
348 // RESETCONNECTION invalidates all server-side prepared handles, so
349 // drop the cache (no sp_unprepare needed — the server released them).
350 let _ = self.statement_cache.clear();
351 tracing::debug!("sending SQL batch with RESETCONNECTION flag");
352 }
353
354 self.in_flight = true;
355 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
356
357 match connection {
358 #[cfg(feature = "tls")]
359 ConnectionHandle::Tls(conn) => {
360 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
361 .await?;
362 }
363 #[cfg(feature = "tls")]
364 ConnectionHandle::TlsPrelogin(conn) => {
365 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
366 .await?;
367 }
368 ConnectionHandle::Plain(conn) => {
369 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
370 .await?;
371 }
372 }
373
374 Ok(())
375 }
376
377 /// Send an RPC request to the server.
378 ///
379 /// Uses the client's current transaction descriptor in ALL_HEADERS.
380 ///
381 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
382 /// is included in the first packet to reset connection state.
383 pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
384 // Drain an abandoned streamed response (see `send_sql_batch`) before
385 // issuing this request.
386 self.cancel_in_flight_response().await?;
387
388 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
389 let max_packet = self.config.packet_size as usize;
390
391 // Check if we need to reset the connection on this request
392 let reset = self.needs_reset;
393 if reset {
394 self.needs_reset = false; // Clear flag before sending
395 // RESETCONNECTION invalidates all server-side prepared handles, so
396 // drop the cache (no sp_unprepare needed — the server released them).
397 let _ = self.statement_cache.clear();
398 tracing::debug!("sending RPC with RESETCONNECTION flag");
399 }
400
401 self.in_flight = true;
402 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
403
404 match connection {
405 #[cfg(feature = "tls")]
406 ConnectionHandle::Tls(conn) => {
407 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
408 .await?;
409 }
410 #[cfg(feature = "tls")]
411 ConnectionHandle::TlsPrelogin(conn) => {
412 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
413 .await?;
414 }
415 ConnectionHandle::Plain(conn) => {
416 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
417 .await?;
418 }
419 }
420
421 Ok(())
422 }
423
424 /// Start building a stored procedure call with full control over parameters.
425 ///
426 /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
427 /// parameters before executing the call.
428 ///
429 /// The procedure name is validated to prevent SQL injection. It may be
430 /// schema-qualified (e.g., `"dbo.MyProc"`).
431 ///
432 /// # Example
433 ///
434 /// ```rust,no_run
435 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
436 /// let result = client.procedure("dbo.CalculateSum")?
437 /// .input("@a", &10i32)
438 /// .input("@b", &20i32)
439 /// .output_int("@result")
440 /// .execute().await?;
441 ///
442 /// let sum = result.get_output("@result").unwrap();
443 /// # let _ = sum;
444 /// # Ok(())
445 /// # }
446 /// ```
447 pub fn procedure(
448 &mut self,
449 proc_name: &str,
450 ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
451 crate::validation::validate_qualified_identifier(proc_name)?;
452 Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
453 }
454
455 /// Execute a stored procedure with positional input parameters.
456 ///
457 /// This is a convenience method for the common case of calling a procedure
458 /// with input-only parameters. For output parameters or named parameters,
459 /// use [`procedure()`](Client::procedure) instead.
460 ///
461 /// # Example
462 ///
463 /// ```rust,no_run
464 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
465 /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
466 /// assert_eq!(result.return_value, 0);
467 ///
468 /// if let Some(rs) = result.first_result_set() {
469 /// println!("columns: {:?}", rs.columns());
470 /// }
471 /// # Ok(())
472 /// # }
473 /// ```
474 pub async fn call_procedure(
475 &mut self,
476 proc_name: &str,
477 params: &[&(dyn crate::ToSql + Sync)],
478 ) -> Result<crate::stream::ProcedureResult> {
479 crate::validation::validate_qualified_identifier(proc_name)?;
480
481 tracing::debug!(
482 proc_name = proc_name,
483 params_count = params.len(),
484 "executing stored procedure"
485 );
486
487 let rpc_params =
488 Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
489 let mut rpc = RpcRequest::named(proc_name);
490 for param in rpc_params {
491 rpc = rpc.param(param);
492 }
493
494 #[cfg(feature = "otel")]
495 let instrumentation = self.instrumentation.clone();
496 #[cfg(feature = "otel")]
497 let mut span = instrumentation.procedure_span(proc_name);
498 #[cfg(feature = "otel")]
499 let timer = crate::instrumentation::OperationTimer::start("EXECUTE");
500
501 let deadline = self.command_deadline();
502 let canceller = self.connection_cancel_handle();
503 let result = run_with_deadline(
504 async {
505 self.send_rpc(&rpc).await?;
506 self.read_procedure_result().await
507 },
508 deadline,
509 canceller,
510 )
511 .await;
512
513 #[cfg(feature = "otel")]
514 match &result {
515 Ok(r) => InstrumentationContext::record_success(&mut span, Some(r.rows_affected)),
516 Err(e) => InstrumentationContext::record_error(&mut span, e),
517 }
518 #[cfg(feature = "otel")]
519 timer.finish(instrumentation.metrics(), result.is_ok());
520 #[cfg(feature = "otel")]
521 drop(span);
522
523 result
524 }
525
526 /// Ask the server how each parameter of a statement must be encrypted.
527 ///
528 /// Issues the `sp_describe_parameter_encryption` system RPC for the
529 /// parameterized statement `tsql` with the parameter declaration `params`
530 /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
531 /// into a [`ParameterEncryptionInfo`](crate::encryption::ParameterEncryptionInfo): the
532 /// CEK table, plus — for each parameter the server reports as encrypted —
533 /// which CEK and whether deterministic or randomized. Parameters the server
534 /// reports as plaintext are omitted.
535 ///
536 /// This is the first step of Always Encrypted parameter encryption; the
537 /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
538 #[cfg(feature = "always-encrypted")]
539 pub(crate) async fn describe_parameter_encryption(
540 &mut self,
541 tsql: &str,
542 params: &str,
543 ) -> Result<crate::encryption::ParameterEncryptionInfo> {
544 let tsql_arg = tsql.to_string();
545 let params_arg = params.to_string();
546 let mut result = self
547 .call_procedure(
548 "sp_describe_parameter_encryption",
549 &[&tsql_arg, ¶ms_arg],
550 )
551 .await?;
552 crate::encryption::ParameterEncryptionInfo::from_describe_result_sets(
553 &mut result.result_sets,
554 )
555 }
556
557 /// Build the `sp_executesql` request for a parameterized statement.
558 ///
559 /// When the connection has Always Encrypted enabled, parameters the server
560 /// reports as encrypted are encrypted client-side first (an extra
561 /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
562 /// plain parameter conversion.
563 pub(crate) async fn build_parameterized_rpc(
564 &mut self,
565 sql: &str,
566 params: &[&(dyn crate::ToSql + Sync)],
567 ) -> Result<RpcRequest> {
568 #[cfg(feature = "always-encrypted")]
569 if self.encryption_context.is_some() {
570 return self.build_encrypted_sql_rpc(sql, params).await;
571 }
572 let rpc_params =
573 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
574 Ok(RpcRequest::execute_sql(sql, rpc_params))
575 }
576
577 /// Send a parameterized `query` request, consulting the prepared-statement
578 /// cache when [`Config::statement_cache`](crate::Config::statement_cache)
579 /// is enabled. Leaves the execution response ready for the caller's
580 /// `read_query_response`.
581 ///
582 /// Falls back to the default path (SQL batch for no params, `sp_executesql`
583 /// otherwise) when the cache is disabled, the query has no parameters, or
584 /// Always Encrypted is active (prepared + AE parameter encryption is out of
585 /// scope for this first increment).
586 async fn send_query_request(
587 &mut self,
588 sql: &str,
589 params: &[&(dyn crate::ToSql + Sync)],
590 ) -> Result<()> {
591 #[cfg(feature = "always-encrypted")]
592 let ae_active = self.encryption_context.is_some();
593 #[cfg(not(feature = "always-encrypted"))]
594 let ae_active = false;
595
596 if !self.config.statement_cache || params.is_empty() || ae_active {
597 if params.is_empty() {
598 self.send_sql_batch(sql).await?;
599 } else {
600 let rpc = self.build_parameterized_rpc(sql, params).await?;
601 self.send_rpc(&rpc).await?;
602 }
603 return Ok(());
604 }
605
606 let rpc_params =
607 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
608 // Key on the parameter declaration + SQL: a cached handle is only valid
609 // for the exact prepared parameter types, so two calls with the same
610 // SQL but different param types must not share a handle.
611 let key = format!(
612 "{}\u{1}{sql}",
613 RpcRequest::build_param_declarations(&rpc_params)
614 );
615
616 if let Some(handle) = self.statement_cache.get(&key) {
617 // Hit: sp_execute the cached handle. Clear any stale pending key
618 // (e.g. from a prior request whose read aborted) so the read path
619 // does not try to capture a handle from this response.
620 self.statement_cache.set_pending(None);
621 let rpc = RpcRequest::execute(handle, rpc_params);
622 self.send_rpc(&rpc).await?;
623 } else {
624 // Miss: sp_prepexec prepares and executes in ONE round-trip. The
625 // caller's read_query_response reads the row response and captures
626 // the `@handle` RETURNVALUE, then stores it under `key` (see
627 // `store_pending_prepared_handle`).
628 self.statement_cache.set_pending(Some(key));
629 let rpc = RpcRequest::prepexec(sql, rpc_params);
630 self.send_rpc(&rpc).await?;
631 }
632 Ok(())
633 }
634
635 /// Store the handle captured from an `sp_prepexec` execution response under
636 /// the pending cache key (set by [`send_query_request`](Self::send_query_request)
637 /// on a cold miss), releasing any LRU-evicted server-side handle.
638 ///
639 /// Called by `read_query_response` after it has read the row response and
640 /// the trailing `@handle` RETURNVALUE. A no-op when no prepexec is pending.
641 /// Eviction `sp_unprepare` is best-effort: a failure leaks one handle until
642 /// connection reset, never corrupts data.
643 pub(super) async fn store_pending_prepared_handle(
644 &mut self,
645 handle: Option<i32>,
646 ) -> Result<()> {
647 let Some(key) = self.statement_cache.take_pending() else {
648 return Ok(());
649 };
650 let Some(handle) = handle else {
651 // No @handle came back (unexpected for sp_prepexec): leave the
652 // statement uncached so the next call simply re-prepares.
653 return Ok(());
654 };
655 if let Some(evicted) = self
656 .statement_cache
657 .insert(crate::statement_cache::PreparedStatement::new(handle, key))
658 {
659 let unprepare = RpcRequest::unprepare(evicted.handle());
660 self.send_rpc(&unprepare).await?;
661 let _ = self.read_procedure_result().await?;
662 }
663 Ok(())
664 }
665
666 /// Encrypt the Always Encrypted parameters of a statement, then build its
667 /// `sp_executesql` request.
668 ///
669 /// Asks the server which parameters are encrypted
670 /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
671 /// then for each one normalizes the value, resolves its column encryption
672 /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
673 /// server reports as plaintext are sent unchanged.
674 #[cfg(feature = "always-encrypted")]
675 async fn build_encrypted_sql_rpc(
676 &mut self,
677 sql: &str,
678 params: &[&(dyn crate::ToSql + Sync)],
679 ) -> Result<RpcRequest> {
680 use tds_protocol::rpc::RpcParam;
681
682 let Some(ctx) = self.encryption_context.clone() else {
683 let rpc_params =
684 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
685 return Ok(RpcRequest::execute_sql(sql, rpc_params));
686 };
687
688 // Resolve each parameter's value once (AE normalization needs the typed
689 // value, not the wire encoding) and build the plaintext RPC params.
690 let send_unicode = self.send_unicode();
691 let collation = self.server_collation().cloned();
692 let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
693 let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
694 let mut hints: Vec<Option<mssql_types::EncryptedParamType>> =
695 Vec::with_capacity(params.len());
696 for (i, p) in params.iter().enumerate() {
697 let name = format!("@p{}", i + 1);
698 let value = p.to_sql()?;
699 let hint = p.encrypted_param_type();
700 // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
701 // describe accepts it against the target encrypted column; an untyped
702 // NULL falls back to the default in `sql_value_to_rpc_param`.
703 let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
704 (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
705 _ => {
706 let mut param = Self::sql_value_to_rpc_param(
707 &name,
708 &value,
709 send_unicode,
710 collation.as_ref(),
711 )?;
712 // A typed-parameter wrapper (e.g. `numeric(v, p, s)`,
713 // `datetime2(v, scale)`) declares an explicit SQL type so
714 // describe matches the encrypted column exactly — the value
715 // alone cannot convey precision/scale or the legacy-`datetime`
716 // vs `datetime2` distinction.
717 if let Some(ty) = hint {
718 param.type_info = encrypted_param_type_info(ty)?;
719 }
720 param
721 }
722 };
723 plaintext.push(rpc_param);
724 values.push(value);
725 hints.push(hint);
726 }
727
728 if plaintext.is_empty() {
729 return Ok(RpcRequest::execute_sql(sql, plaintext));
730 }
731
732 // Ask the server which parameters need encryption.
733 let declarations = RpcRequest::build_param_declarations(&plaintext);
734 let info = self
735 .describe_parameter_encryption(sql, &declarations)
736 .await?;
737 if info.parameters.is_empty() {
738 return Ok(RpcRequest::execute_sql(sql, plaintext));
739 }
740
741 // Encrypt the flagged parameters; pass the rest through untouched.
742 let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
743 for ((value, param), hint) in values.into_iter().zip(plaintext).zip(hints) {
744 let Some(crypto) = info.get_parameter(¶m.name) else {
745 final_params.push(param);
746 continue;
747 };
748 let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
749 Error::Protocol(format!(
750 "encrypted parameter {} references missing CEK ordinal {}",
751 param.name, crypto.cek_ordinal
752 ))
753 })?;
754 let metadata = tds_protocol::rpc::EncryptedParamMetadata {
755 base_type_info: param.type_info.clone(),
756 algorithm_id: crypto.algorithm_id,
757 encryption_type: crypto.encryption_type,
758 database_id: entry.database_id,
759 cek_id: entry.cek_id,
760 cek_version: entry.cek_version,
761 cek_md_version: entry.cek_md_version,
762 normalization_rule_version: crypto.normalization_rule_version,
763 };
764 // A NULL value bound to an encrypted column is sent as an encrypted
765 // NULL (the server rejects a plaintext parameter for an encrypted
766 // column); there is nothing to encrypt.
767 if matches!(value, mssql_types::SqlValue::Null) {
768 final_params.push(RpcParam::encrypted_null(param.name, metadata));
769 continue;
770 }
771 let normalized = crate::encryption::normalize_for_encryption(&value, hint)?;
772 let ciphertext = ctx
773 .encrypt_value(&normalized, entry, crypto.encryption_type)
774 .await?;
775 final_params.push(RpcParam::encrypted(
776 param.name,
777 bytes::Bytes::from(ciphertext),
778 metadata,
779 ));
780 }
781
782 Ok(RpcRequest::execute_sql(sql, final_params))
783 }
784
785 /// Start a bulk insert operation for the specified table.
786 ///
787 /// Sends the `INSERT BULK` statement to the server and returns a
788 /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
789 /// a mutable borrow on the client, preventing other operations while
790 /// the bulk insert is in progress.
791 ///
792 /// # Example
793 ///
794 /// ```rust,no_run
795 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
796 /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
797 ///
798 /// let builder = BulkInsertBuilder::new("dbo.Users")
799 /// .with_typed_columns(vec![
800 /// BulkColumn::new("id", "INT", 0)?,
801 /// BulkColumn::new("name", "NVARCHAR(100)", 1)?,
802 /// ]);
803 ///
804 /// let mut writer = client.bulk_insert(&builder).await?;
805 /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
806 /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
807 /// let result = writer.finish().await?;
808 /// println!("Inserted {} rows", result.rows_affected);
809 /// # Ok(())
810 /// # }
811 /// ```
812 pub async fn bulk_insert(
813 &mut self,
814 builder: &crate::bulk::BulkInsertBuilder,
815 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
816 use tds_protocol::token::{ColMetaData, Token};
817
818 tracing::debug!(
819 table = builder.table_name(),
820 columns = builder.columns().len(),
821 "starting bulk insert"
822 );
823
824 // Step 1: Query the server for column metadata.
825 // This gives us the exact type encoding the server expects for BulkLoad,
826 // following the pattern established by Tiberius.
827 let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
828 let deadline = self.command_deadline();
829 let canceller = self.connection_cancel_handle();
830 let message = run_with_deadline(
831 async {
832 self.send_sql_batch(&meta_query).await?;
833 self.read_response_message().await
834 },
835 deadline,
836 canceller,
837 )
838 .await?;
839 self.in_flight = false;
840
841 // Capture both the raw COLMETADATA bytes and parsed column info
842 let raw_payload = message.payload.clone();
843 let mut parser = self.create_parser(message.payload);
844 let mut server_metadata: Option<ColMetaData> = None;
845 let mut meta_start: usize = 0;
846 let mut meta_end: usize = 0;
847
848 loop {
849 let pos_before = raw_payload.len() - parser.remaining();
850 let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
851 let pos_after = raw_payload.len() - parser.remaining();
852 let Some(token) = token else { break };
853
854 match token {
855 Token::ColMetaData(meta) => {
856 meta_start = pos_before;
857 meta_end = pos_after;
858 server_metadata = Some(meta);
859 }
860 Token::Done(_) => break,
861 _ => {}
862 }
863 }
864
865 // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
866 // These types require a legacy TEXTPTR wire format that this driver
867 // does not support — users should migrate the column to VARCHAR(MAX) /
868 // NVARCHAR(MAX) / VARBINARY(MAX).
869 if let Some(ref meta) = server_metadata {
870 use tds_protocol::types::TypeId;
871 for col in meta.columns.iter() {
872 let (rejected, replacement) = match col.type_id {
873 TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
874 TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
875 TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
876 _ => (None, ""),
877 };
878 if let Some(sql_type) = rejected {
879 return Err(Error::from(mssql_types::TypeError::UnsupportedType {
880 sql_type: sql_type.to_string(),
881 reason: format!(
882 "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
883 are not supported. Alter the column to {} instead \
884 (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
885 Server 2005).",
886 col.name,
887 builder.table_name(),
888 sql_type,
889 replacement,
890 ),
891 }));
892 }
893 }
894 }
895
896 // Step 2: Send INSERT BULK statement to put server in bulk load mode
897 let stmt = builder.build_insert_bulk_statement()?;
898 let deadline = self.command_deadline();
899 let canceller = self.connection_cancel_handle();
900 run_with_deadline(
901 async {
902 self.send_sql_batch(&stmt).await?;
903 self.read_execute_result().await
904 },
905 deadline,
906 canceller,
907 )
908 .await?;
909
910 // Step 3: Create bulk writer with server's metadata
911 let raw_meta = if meta_end > meta_start {
912 Some(raw_payload.slice(meta_start..meta_end))
913 } else {
914 None
915 };
916
917 let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
918 let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
919 builder.columns().to_vec(),
920 builder.options().batch_size,
921 raw_meta,
922 server_cols,
923 );
924
925 Ok(crate::bulk::BulkWriter::new(self, bulk))
926 }
927
928 /// Start a bulk insert without querying the server for column metadata.
929 ///
930 /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
931 /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
932 /// column metadata is constructed from the `BulkColumn` types provided
933 /// on the builder. This saves a round-trip when the schema is known.
934 ///
935 /// # Caveats
936 ///
937 /// The caller must ensure `BulkColumn` entries match the target table's
938 /// column definitions exactly. Mismatched types, lengths, precision/scale,
939 /// or column ordering will cause the server to reject the BulkLoad packet.
940 ///
941 /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
942 /// extra round-trip is usually negligible and the server-supplied metadata
943 /// is guaranteed correct.
944 pub async fn bulk_insert_without_schema_discovery(
945 &mut self,
946 builder: &crate::bulk::BulkInsertBuilder,
947 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
948 tracing::debug!(
949 table = builder.table_name(),
950 columns = builder.columns().len(),
951 "starting bulk insert (no schema discovery)"
952 );
953
954 // Send INSERT BULK statement to put server in bulk load mode
955 let stmt = builder.build_insert_bulk_statement()?;
956 let deadline = self.command_deadline();
957 let canceller = self.connection_cancel_handle();
958 run_with_deadline(
959 async {
960 self.send_sql_batch(&stmt).await?;
961 self.read_execute_result().await
962 },
963 deadline,
964 canceller,
965 )
966 .await?;
967
968 // Create bulk writer with hand-crafted metadata
969 let bulk =
970 crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
971
972 Ok(crate::bulk::BulkWriter::new(self, bulk))
973 }
974
975 /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
976 ///
977 /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
978 /// row data after the `INSERT BULK` statement has been acknowledged.
979 pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
980 let max_packet = self.config.packet_size as usize;
981
982 self.in_flight = true;
983 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
984
985 match connection {
986 #[cfg(feature = "tls")]
987 ConnectionHandle::Tls(conn) => {
988 conn.send_message(PacketType::BulkLoad, payload, max_packet)
989 .await?;
990 }
991 #[cfg(feature = "tls")]
992 ConnectionHandle::TlsPrelogin(conn) => {
993 conn.send_message(PacketType::BulkLoad, payload, max_packet)
994 .await?;
995 }
996 ConnectionHandle::Plain(conn) => {
997 conn.send_message(PacketType::BulkLoad, payload, max_packet)
998 .await?;
999 }
1000 }
1001
1002 // Read the server's Done response with row count
1003 self.read_execute_result().await
1004 }
1005
1006 /// Execute a query with named parameters and return a streaming result set.
1007 ///
1008 /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
1009 /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
1010 /// and the `#[derive(ToParams)]` macro.
1011 ///
1012 /// # Example
1013 ///
1014 /// ```rust,no_run
1015 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1016 /// use mssql_client::{NamedParam, ToParams};
1017 ///
1018 /// // With derive macro:
1019 /// #[derive(mssql_derive::ToParams)]
1020 /// struct UserQuery { name: String }
1021 ///
1022 /// let q = UserQuery { name: "Alice".into() };
1023 /// let rows = client.query_named(
1024 /// "SELECT * FROM users WHERE name = @name",
1025 /// &q.to_params()?,
1026 /// ).await?;
1027 ///
1028 /// // Or manually:
1029 /// let params = vec![NamedParam::from_value("name", &"Alice")?];
1030 /// let rows = client.query_named(
1031 /// "SELECT * FROM users WHERE name = @name",
1032 /// ¶ms,
1033 /// ).await?;
1034 /// # let _ = rows;
1035 /// # Ok(())
1036 /// # }
1037 /// ```
1038 pub async fn query_named<'a>(
1039 &'a mut self,
1040 sql: &str,
1041 params: &[crate::to_params::NamedParam],
1042 ) -> Result<QueryStream<'a>> {
1043 tracing::debug!(
1044 sql = sql,
1045 params_count = params.len(),
1046 "executing query with named parameters"
1047 );
1048
1049 #[cfg(feature = "otel")]
1050 let instrumentation = self.instrumentation.clone();
1051 #[cfg(feature = "otel")]
1052 let mut span = instrumentation.query_span(sql);
1053 #[cfg(feature = "otel")]
1054 let timer = crate::instrumentation::OperationTimer::start(
1055 crate::instrumentation::extract_operation(sql),
1056 );
1057
1058 let result = async {
1059 if params.is_empty() {
1060 self.send_sql_batch(sql).await?;
1061 } else {
1062 let rpc_params = Self::convert_named_params(
1063 params,
1064 self.send_unicode(),
1065 self.server_collation(),
1066 )?;
1067 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1068 self.send_rpc(&rpc).await?;
1069 }
1070
1071 self.read_query_response().await
1072 }
1073 .await;
1074
1075 #[cfg(feature = "otel")]
1076 match &result {
1077 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1078 Err(e) => InstrumentationContext::record_error(&mut span, e),
1079 }
1080 #[cfg(feature = "otel")]
1081 timer.finish(instrumentation.metrics(), result.is_ok());
1082 #[cfg(feature = "otel")]
1083 drop(span);
1084
1085 let resp = result?;
1086 #[cfg(feature = "always-encrypted")]
1087 {
1088 Ok(QueryStream::from_raw(
1089 resp.columns,
1090 resp.pending_rows,
1091 resp.meta,
1092 resp.decryptor,
1093 ))
1094 }
1095 #[cfg(not(feature = "always-encrypted"))]
1096 {
1097 Ok(QueryStream::from_raw(
1098 resp.columns,
1099 resp.pending_rows,
1100 resp.meta,
1101 ))
1102 }
1103 }
1104
1105 /// Execute a statement with named parameters.
1106 ///
1107 /// Returns the number of affected rows. This is the named-parameter
1108 /// counterpart of [`execute()`](Client::execute), compatible with the
1109 /// [`ToParams`](crate::to_params::ToParams) trait.
1110 ///
1111 /// # Example
1112 ///
1113 /// ```rust,no_run
1114 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1115 /// use mssql_client::NamedParam;
1116 ///
1117 /// let params = vec![
1118 /// NamedParam::from_value("name", &"Alice")?,
1119 /// NamedParam::from_value("email", &"alice@example.com")?,
1120 /// ];
1121 /// let rows_affected = client.execute_named(
1122 /// "INSERT INTO users (name, email) VALUES (@name, @email)",
1123 /// ¶ms,
1124 /// ).await?;
1125 /// # let _ = rows_affected;
1126 /// # Ok(())
1127 /// # }
1128 /// ```
1129 pub async fn execute_named(
1130 &mut self,
1131 sql: &str,
1132 params: &[crate::to_params::NamedParam],
1133 ) -> Result<u64> {
1134 tracing::debug!(
1135 sql = sql,
1136 params_count = params.len(),
1137 "executing statement with named parameters"
1138 );
1139
1140 #[cfg(feature = "otel")]
1141 let instrumentation = self.instrumentation.clone();
1142 #[cfg(feature = "otel")]
1143 let mut span = instrumentation.query_span(sql);
1144 #[cfg(feature = "otel")]
1145 let timer = crate::instrumentation::OperationTimer::start(
1146 crate::instrumentation::extract_operation(sql),
1147 );
1148
1149 let deadline = self.command_deadline();
1150 let canceller = self.connection_cancel_handle();
1151 let result = run_with_deadline(
1152 async {
1153 if params.is_empty() {
1154 self.send_sql_batch(sql).await?;
1155 } else {
1156 let rpc_params = Self::convert_named_params(
1157 params,
1158 self.send_unicode(),
1159 self.server_collation(),
1160 )?;
1161 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1162 self.send_rpc(&rpc).await?;
1163 }
1164
1165 self.read_execute_result().await
1166 },
1167 deadline,
1168 canceller,
1169 )
1170 .await;
1171
1172 #[cfg(feature = "otel")]
1173 match &result {
1174 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1175 Err(e) => InstrumentationContext::record_error(&mut span, e),
1176 }
1177 #[cfg(feature = "otel")]
1178 timer.finish(instrumentation.metrics(), result.is_ok());
1179 #[cfg(feature = "otel")]
1180 drop(span);
1181
1182 result
1183 }
1184
1185 /// The connection's OpenTelemetry instrumentation context.
1186 #[cfg(feature = "otel")]
1187 pub(crate) fn instrumentation(&self) -> &InstrumentationContext {
1188 &self.instrumentation
1189 }
1190
1191 /// Snapshot this connection's prepared-statement cache statistics.
1192 ///
1193 /// Reflects activity since the connection was established (or its last
1194 /// reset). Meaningful only when
1195 /// [`Config::statement_cache`](crate::Config::statement_cache) is enabled;
1196 /// otherwise the cache is never consulted and all counts stay zero.
1197 #[must_use]
1198 pub fn statement_cache_stats(&self) -> crate::StatementCacheStats {
1199 self.statement_cache.stats()
1200 }
1201
1202 /// Whether string parameters are sent as NVARCHAR (Unicode).
1203 pub(crate) fn send_unicode(&self) -> bool {
1204 self.config.send_string_parameters_as_unicode
1205 }
1206
1207 /// Server's default collation, captured from ENVCHANGE during login.
1208 pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
1209 self.server_collation.as_ref()
1210 }
1211
1212 /// Shared implementation behind `query_stream` for both `Ready` and
1213 /// `InTransaction`. Sends the request, then pulls packets until the first
1214 /// result set's `ColMetaData` (resolving columns and any Always Encrypted
1215 /// decryptor up front) before handing back a [`RowStream`].
1216 pub(crate) async fn query_stream_inner<'a>(
1217 &'a mut self,
1218 sql: &str,
1219 params: &[&(dyn crate::ToSql + Sync)],
1220 ) -> Result<crate::row_stream::RowStream<'a, S>> {
1221 use crate::client::response::server_token_to_error;
1222 use crate::row_source::{Pull, RowSource};
1223 use tds_protocol::token::Token;
1224
1225 tracing::debug!(sql = sql, params_count = params.len(), "streaming query");
1226
1227 // Send the request (same wire format as the buffered path).
1228 if params.is_empty() {
1229 self.send_sql_batch(sql).await?;
1230 } else {
1231 let rpc = self.build_parameterized_rpc(sql, params).await?;
1232 self.send_rpc(&rpc).await?;
1233 }
1234 self.in_flight = true;
1235
1236 #[cfg(feature = "always-encrypted")]
1237 let encryption_enabled = self.encryption_context.is_some();
1238 #[cfg(not(feature = "always-encrypted"))]
1239 let encryption_enabled = false;
1240
1241 let mut source = RowSource::new(encryption_enabled);
1242
1243 // Prelude: pull packets until the first result set's ColMetaData (so the
1244 // columns and any Always Encrypted decryptor are resolved up front), or
1245 // until a terminal Done/Error if there is no result set.
1246 loop {
1247 match source.pull()? {
1248 Pull::Token(Token::ColMetaData(meta)) => {
1249 let columns = Self::build_columns(&meta);
1250 #[cfg(feature = "always-encrypted")]
1251 let decryptor = self
1252 .resolve_decryptor(&meta)
1253 .await?
1254 .map(std::sync::Arc::new);
1255 return Ok(crate::row_stream::RowStream::new(
1256 self,
1257 source,
1258 columns,
1259 meta,
1260 #[cfg(feature = "always-encrypted")]
1261 decryptor,
1262 ));
1263 }
1264 Pull::Token(Token::Error(err)) => {
1265 self.in_flight = false;
1266 return Err(server_token_to_error(&err));
1267 }
1268 Pull::Token(Token::Done(done)) => {
1269 if done.status.error {
1270 self.in_flight = false;
1271 return Err(Error::Query(
1272 "query failed (server set error flag in DONE token)".to_string(),
1273 ));
1274 }
1275 if !done.status.more {
1276 // No result set (e.g. an INSERT) — an empty stream.
1277 self.in_flight = false;
1278 return Ok(crate::row_stream::RowStream::empty(self));
1279 }
1280 // More results may follow; keep looking for ColMetaData.
1281 }
1282 Pull::Token(Token::EnvChange(env)) => {
1283 Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
1284 }
1285 Pull::Token(_) => {
1286 // Info / Order / DoneProc / DoneInProc, etc. — keep pulling.
1287 }
1288 Pull::NeedMore => match self.read_response_packet().await? {
1289 Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1290 None => {
1291 self.in_flight = false;
1292 return Err(Error::ConnectionClosed);
1293 }
1294 },
1295 Pull::End => {
1296 self.in_flight = false;
1297 return Ok(crate::row_stream::RowStream::empty(self));
1298 }
1299 }
1300 }
1301 }
1302
1303 /// Shared implementation behind `query_stream_blob` for both `Ready` and
1304 /// `InTransaction`.
1305 pub(crate) async fn query_stream_blob_inner<'a>(
1306 &'a mut self,
1307 sql: &str,
1308 params: &[&(dyn crate::ToSql + Sync)],
1309 ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1310 let (meta, buf, eom, encryption_enabled) = self.open_blob_stream(sql, params).await?;
1311 let first_blob = Self::validate_blob_result_set(&meta)?;
1312 Ok(crate::blob_stream::BlobStream::new(
1313 self,
1314 buf,
1315 eom,
1316 encryption_enabled,
1317 meta,
1318 first_blob,
1319 // Single trailing MAX column; auto-position it so the existing
1320 // `next` → `copy_blob_to` flow works without an explicit `next_blob`.
1321 1,
1322 true,
1323 ))
1324 }
1325
1326 /// Shared implementation behind `query_stream_rows` for both `Ready` and
1327 /// `InTransaction`.
1328 pub(crate) async fn query_stream_rows_inner<'a>(
1329 &'a mut self,
1330 sql: &str,
1331 params: &[&(dyn crate::ToSql + Sync)],
1332 ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1333 let (meta, buf, eom, encryption_enabled) = self.open_blob_stream(sql, params).await?;
1334 let (first_blob, blob_count) = Self::validate_blob_rows_result_set(&meta)?;
1335 Ok(crate::blob_stream::BlobStream::new(
1336 self,
1337 buf,
1338 eom,
1339 encryption_enabled,
1340 meta,
1341 first_blob,
1342 blob_count,
1343 // Caller drives blobs explicitly via `next_blob`.
1344 false,
1345 ))
1346 }
1347
1348 /// Send the query and pull tokens until the first `ColMetaData`, returning
1349 /// the result-set metadata plus the unconsumed post-metadata wire bytes.
1350 /// Shared by the single-blob and multi-blob streaming paths.
1351 async fn open_blob_stream(
1352 &mut self,
1353 sql: &str,
1354 params: &[&(dyn crate::ToSql + Sync)],
1355 ) -> Result<(tds_protocol::token::ColMetaData, bytes::Bytes, bool, bool)> {
1356 use crate::client::response::server_token_to_error;
1357 use crate::row_source::{Pull, RowSource};
1358 use tds_protocol::token::Token;
1359
1360 if params.is_empty() {
1361 self.send_sql_batch(sql).await?;
1362 } else {
1363 let rpc = self.build_parameterized_rpc(sql, params).await?;
1364 self.send_rpc(&rpc).await?;
1365 }
1366 self.in_flight = true;
1367
1368 #[cfg(feature = "always-encrypted")]
1369 let encryption_enabled = self.encryption_context.is_some();
1370 #[cfg(not(feature = "always-encrypted"))]
1371 let encryption_enabled = false;
1372
1373 let mut source = RowSource::new(encryption_enabled);
1374
1375 loop {
1376 match source.pull()? {
1377 Pull::Token(Token::ColMetaData(meta)) => {
1378 let (buf, eom) = source.into_parts();
1379 return Ok((meta, buf, eom, encryption_enabled));
1380 }
1381 Pull::Token(Token::Error(err)) => {
1382 self.in_flight = false;
1383 return Err(server_token_to_error(&err));
1384 }
1385 Pull::Token(Token::Done(_)) => {
1386 self.in_flight = false;
1387 return Err(Error::Protocol(
1388 "blob streaming: query produced no result set".to_string(),
1389 ));
1390 }
1391 Pull::Token(_) => {}
1392 Pull::NeedMore => match self.read_response_packet().await? {
1393 Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1394 None => {
1395 self.in_flight = false;
1396 return Err(Error::ConnectionClosed);
1397 }
1398 },
1399 Pull::End => {
1400 self.in_flight = false;
1401 return Err(Error::Protocol(
1402 "blob streaming: query produced no result set".to_string(),
1403 ));
1404 }
1405 }
1406 }
1407 }
1408
1409 /// Validate that a result set is shaped for [`query_stream_blob`] and return
1410 /// the index of its single trailing MAX column.
1411 fn validate_blob_result_set(meta: &tds_protocol::token::ColMetaData) -> Result<usize> {
1412 if meta.cek_table.is_some() {
1413 return Err(Error::Protocol(
1414 "query_stream_blob does not support Always Encrypted result sets".to_string(),
1415 ));
1416 }
1417 let max_cols: Vec<usize> = meta
1418 .columns
1419 .iter()
1420 .enumerate()
1421 .filter(|(_, c)| crate::blob_stream::is_plp_max(c))
1422 .map(|(i, _)| i)
1423 .collect();
1424 match max_cols.as_slice() {
1425 [] => Err(Error::Protocol(
1426 "query_stream_blob: result set has no MAX column — use query_stream".to_string(),
1427 )),
1428 [idx] if *idx == meta.columns.len() - 1 => Ok(*idx),
1429 [_] => Err(Error::Protocol(
1430 "query_stream_blob: the MAX column must be the last column".to_string(),
1431 )),
1432 _ => Err(Error::Protocol(
1433 "query_stream_blob: result set has more than one MAX column".to_string(),
1434 )),
1435 }
1436 }
1437
1438 /// Validate that a result set is shaped for [`query_stream_rows`] and return
1439 /// `(first_blob_index, blob_count)` — the start and length of the trailing
1440 /// run of MAX columns.
1441 ///
1442 /// Requires at least one MAX column and that every MAX column be trailing
1443 /// (no scalar column may follow a MAX column). The interleaved case (a
1444 /// scalar column after a MAX column) is rejected — supporting it needs a
1445 /// resumable per-column decoder (tracked in #258).
1446 fn validate_blob_rows_result_set(
1447 meta: &tds_protocol::token::ColMetaData,
1448 ) -> Result<(usize, usize)> {
1449 if meta.cek_table.is_some() {
1450 return Err(Error::Protocol(
1451 "query_stream_rows does not support Always Encrypted result sets".to_string(),
1452 ));
1453 }
1454 let first_blob = meta
1455 .columns
1456 .iter()
1457 .position(crate::blob_stream::is_plp_max)
1458 .ok_or_else(|| {
1459 Error::Protocol(
1460 "query_stream_rows: result set has no MAX column — use query_stream"
1461 .to_string(),
1462 )
1463 })?;
1464 // Every column from the first MAX column onward must itself be a MAX
1465 // column; a scalar column after a blob cannot be decoded until the blob
1466 // is consumed.
1467 if !meta.columns[first_blob..]
1468 .iter()
1469 .all(crate::blob_stream::is_plp_max)
1470 {
1471 return Err(Error::Protocol(
1472 "query_stream_rows: a non-MAX column follows a MAX column; interleaved MAX \
1473 columns are not supported (the MAX columns must be trailing)"
1474 .to_string(),
1475 ));
1476 }
1477 Ok((first_blob, meta.columns.len() - first_blob))
1478 }
1479}
1480
1481impl Client<Ready> {
1482 /// Mark this connection as needing a reset on next use.
1483 ///
1484 /// Called by the connection pool when a connection is returned.
1485 /// The next SQL batch or RPC will include the RESETCONNECTION flag
1486 /// in the TDS packet header, causing SQL Server to reset connection
1487 /// state (temp tables, SET options, transaction isolation level, etc.)
1488 /// before executing the command.
1489 ///
1490 /// This is more efficient than calling `sp_reset_connection` as a
1491 /// separate command because it's handled at the TDS protocol level.
1492 pub fn mark_needs_reset(&mut self) {
1493 self.needs_reset = true;
1494 }
1495
1496 /// Check if this connection needs a reset.
1497 ///
1498 /// Returns true if `mark_needs_reset()` was called and the reset
1499 /// hasn't been performed yet.
1500 #[must_use]
1501 pub fn needs_reset(&self) -> bool {
1502 self.needs_reset
1503 }
1504
1505 /// Execute a query and return a result set with lazy per-row decoding.
1506 ///
1507 /// Per ADR-007 the full response is buffered in memory and each row is
1508 /// *decoded* on demand as you iterate — this is not incremental network
1509 /// streaming, so peak memory tracks the response size. Use
1510 /// `.collect_all()` if you want all rows materialized into a `Vec` up
1511 /// front.
1512 ///
1513 /// # Example
1514 ///
1515 /// ```rust,no_run
1516 /// # use mssql_client::Row;
1517 /// # fn process(_: &Row) {}
1518 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1519 /// // Streaming (synchronous iteration over the result set)
1520 /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
1521 /// for row in stream {
1522 /// let row = row?;
1523 /// process(&row);
1524 /// }
1525 ///
1526 /// // Buffered (loads all into memory)
1527 /// let rows: Vec<Row> = client
1528 /// .query("SELECT * FROM small_table", &[])
1529 /// .await?
1530 /// .collect_all()
1531 /// .await?;
1532 /// # let _ = rows;
1533 /// # Ok(())
1534 /// # }
1535 /// ```
1536 pub async fn query<'a>(
1537 &'a mut self,
1538 sql: &str,
1539 params: &[&(dyn crate::ToSql + Sync)],
1540 ) -> Result<QueryStream<'a>> {
1541 let deadline = self.command_deadline();
1542 self.query_inner(sql, params, deadline).await
1543 }
1544
1545 /// Shared query implementation with an explicit command deadline.
1546 async fn query_inner<'a>(
1547 &'a mut self,
1548 sql: &str,
1549 params: &[&(dyn crate::ToSql + Sync)],
1550 deadline: Option<std::time::Duration>,
1551 ) -> Result<QueryStream<'a>> {
1552 tracing::debug!(sql = sql, params_count = params.len(), "executing query");
1553
1554 #[cfg(feature = "otel")]
1555 let instrumentation = self.instrumentation.clone();
1556 #[cfg(feature = "otel")]
1557 let mut span = instrumentation.query_span(sql);
1558 #[cfg(feature = "otel")]
1559 let timer = crate::instrumentation::OperationTimer::start(
1560 crate::instrumentation::extract_operation(sql),
1561 );
1562
1563 let canceller = self.cancel_handle();
1564 let result = run_with_deadline(
1565 async {
1566 // Sends via the prepared-statement cache when enabled, else the
1567 // SQL batch / sp_executesql default.
1568 self.send_query_request(sql, params).await?;
1569
1570 // Read complete response including columns and rows
1571 self.read_query_response().await
1572 },
1573 deadline,
1574 canceller,
1575 )
1576 .await;
1577
1578 #[cfg(feature = "otel")]
1579 match &result {
1580 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1581 Err(e) => InstrumentationContext::record_error(&mut span, e),
1582 }
1583 #[cfg(feature = "otel")]
1584 timer.finish(instrumentation.metrics(), result.is_ok());
1585
1586 // Drop the span before returning
1587 #[cfg(feature = "otel")]
1588 drop(span);
1589
1590 let resp = result?;
1591 #[cfg(feature = "always-encrypted")]
1592 {
1593 Ok(QueryStream::from_raw(
1594 resp.columns,
1595 resp.pending_rows,
1596 resp.meta,
1597 resp.decryptor,
1598 ))
1599 }
1600 #[cfg(not(feature = "always-encrypted"))]
1601 {
1602 Ok(QueryStream::from_raw(
1603 resp.columns,
1604 resp.pending_rows,
1605 resp.meta,
1606 ))
1607 }
1608 }
1609
1610 /// Execute a query and stream rows incrementally from the network.
1611 ///
1612 /// Unlike [`query`](Self::query) — which buffers the whole response in
1613 /// memory before returning — this reads TDS packets on demand as rows are
1614 /// pulled, so peak memory is roughly one packet plus one row regardless of
1615 /// result-set size. Use it for large result sets; use [`query`](Self::query)
1616 /// for the common small-result case where the buffered, synchronously
1617 /// iterable [`QueryStream`] is more convenient.
1618 ///
1619 /// The returned [`RowStream`](crate::RowStream) borrows the client for its
1620 /// lifetime, so no other request can run on this connection until the stream
1621 /// is consumed or dropped. Also available on `Client<InTransaction>` to
1622 /// stream within a transaction.
1623 ///
1624 /// # Example
1625 ///
1626 /// ```rust,no_run
1627 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1628 /// let mut stream = client.query_stream("SELECT id FROM big_table", &[]).await?;
1629 /// while let Some(row) = stream.try_next().await? {
1630 /// let id: i32 = row.get_by_name("id")?;
1631 /// let _ = id;
1632 /// }
1633 /// # Ok(())
1634 /// # }
1635 /// ```
1636 pub async fn query_stream<'a>(
1637 &'a mut self,
1638 sql: &str,
1639 params: &[&(dyn crate::ToSql + Sync)],
1640 ) -> Result<crate::row_stream::RowStream<'a, Ready>> {
1641 self.query_stream_inner(sql, params).await
1642 }
1643
1644 /// Execute a query and stream a row's trailing MAX column from the network.
1645 ///
1646 /// For result sets whose last column is a single MAX type
1647 /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this reads
1648 /// that column's bytes incrementally from the socket instead of
1649 /// materializing the cell — so a multi-GB BLOB can be streamed to a sink in
1650 /// bounded memory. The leading (scalar) columns are decoded eagerly into the
1651 /// per-row [`Row`](crate::Row).
1652 ///
1653 /// The MAX column must be the **last** column. The returned
1654 /// [`BlobStream`](crate::BlobStream) yields scalar [`Row`](crate::Row)s via
1655 /// [`next`](crate::BlobStream::next); read each row's blob with
1656 /// [`read_chunk`](crate::BlobStream::read_chunk) /
1657 /// [`copy_blob_to`](crate::BlobStream::copy_blob_to) before advancing. Also
1658 /// available on `Client<InTransaction>`.
1659 ///
1660 /// # Errors
1661 ///
1662 /// Returns an error if the result set has no trailing MAX column, has more
1663 /// than one MAX column, the MAX column is not last, or the result set uses
1664 /// Always Encrypted (not yet supported on this path).
1665 pub async fn query_stream_blob<'a>(
1666 &'a mut self,
1667 sql: &str,
1668 params: &[&(dyn crate::ToSql + Sync)],
1669 ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1670 self.query_stream_blob_inner(sql, params).await
1671 }
1672
1673 /// Execute a query and stream a row's **trailing MAX columns** from the
1674 /// network — the multi-column generalization of
1675 /// [`query_stream_blob`](Self::query_stream_blob).
1676 ///
1677 /// For result sets whose trailing columns are one or more MAX types
1678 /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this decodes
1679 /// the leading scalar columns eagerly into the per-row [`Row`](crate::Row)
1680 /// and streams each trailing MAX column's bytes incrementally from the
1681 /// socket, in bounded memory. The returned
1682 /// [`BlobStream`](crate::BlobStream) yields scalar rows via
1683 /// [`next`](crate::BlobStream::next); within each row, iterate the trailing
1684 /// MAX columns with [`next_blob`](crate::BlobStream::next_blob), reading each
1685 /// with [`copy_blob_to`](crate::BlobStream::copy_blob_to) /
1686 /// [`read_chunk`](crate::BlobStream::read_chunk). Also available on
1687 /// `Client<InTransaction>`.
1688 ///
1689 /// # Errors
1690 ///
1691 /// Returns an error if the result set has no trailing MAX column, has a
1692 /// non-MAX column after a MAX column (interleaved MAX columns are not
1693 /// supported — the MAX columns must be trailing), or uses Always Encrypted
1694 /// (not yet supported on this path).
1695 pub async fn query_stream_rows<'a>(
1696 &'a mut self,
1697 sql: &str,
1698 params: &[&(dyn crate::ToSql + Sync)],
1699 ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1700 self.query_stream_rows_inner(sql, params).await
1701 }
1702
1703 /// Execute a query with a specific timeout.
1704 ///
1705 /// This overrides the default `command_timeout` from the connection configuration
1706 /// for this specific query. If the query does not complete within the specified
1707 /// duration, the driver sends an Attention packet to cancel it server-side,
1708 /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1709 /// connection left usable for the next request.
1710 ///
1711 /// # Arguments
1712 ///
1713 /// * `sql` - The SQL query to execute
1714 /// * `params` - Query parameters
1715 /// * `timeout_duration` - Maximum time to wait for the query to complete
1716 ///
1717 /// # Example
1718 ///
1719 /// ```rust,no_run
1720 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1721 /// use std::time::Duration;
1722 ///
1723 /// // Execute with a 5-second timeout
1724 /// let rows = client
1725 /// .query_with_timeout(
1726 /// "SELECT * FROM large_table",
1727 /// &[],
1728 /// Duration::from_secs(5),
1729 /// )
1730 /// .await?;
1731 /// # let _ = rows;
1732 /// # Ok(())
1733 /// # }
1734 /// ```
1735 pub async fn query_with_timeout<'a>(
1736 &'a mut self,
1737 sql: &str,
1738 params: &[&(dyn crate::ToSql + Sync)],
1739 timeout_duration: std::time::Duration,
1740 ) -> Result<QueryStream<'a>> {
1741 self.query_inner(sql, params, Some(timeout_duration)).await
1742 }
1743
1744 /// Execute a batch that may return multiple result sets.
1745 ///
1746 /// This is useful for stored procedures or SQL batches that contain
1747 /// multiple SELECT statements.
1748 ///
1749 /// # Example
1750 ///
1751 /// ```rust,no_run
1752 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1753 /// // Execute a batch with multiple SELECT statements
1754 /// let mut results = client.query_multiple(
1755 /// "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1756 /// &[]
1757 /// ).await?;
1758 ///
1759 /// // Process first result set
1760 /// while let Some(row) = results.next_row().await? {
1761 /// println!("Result 1: {:?}", row);
1762 /// }
1763 ///
1764 /// // Move to second result set
1765 /// if results.next_result().await? {
1766 /// while let Some(row) = results.next_row().await? {
1767 /// println!("Result 2: {:?}", row);
1768 /// }
1769 /// }
1770 /// # Ok(())
1771 /// # }
1772 /// ```
1773 pub async fn query_multiple<'a>(
1774 &'a mut self,
1775 sql: &str,
1776 params: &[&(dyn crate::ToSql + Sync)],
1777 ) -> Result<MultiResultStream<'a>> {
1778 tracing::debug!(
1779 sql = sql,
1780 params_count = params.len(),
1781 "executing multi-result query"
1782 );
1783
1784 #[cfg(feature = "otel")]
1785 let instrumentation = self.instrumentation.clone();
1786 #[cfg(feature = "otel")]
1787 let mut span = instrumentation.query_span(sql);
1788 #[cfg(feature = "otel")]
1789 let timer = crate::instrumentation::OperationTimer::start(
1790 crate::instrumentation::extract_operation(sql),
1791 );
1792
1793 let deadline = self.command_deadline();
1794 let canceller = self.connection_cancel_handle();
1795 let result = run_with_deadline(
1796 async {
1797 if params.is_empty() {
1798 // Simple batch without parameters - use SQL batch
1799 self.send_sql_batch(sql).await?;
1800 } else {
1801 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1802 let rpc = self.build_parameterized_rpc(sql, params).await?;
1803 self.send_rpc(&rpc).await?;
1804 }
1805
1806 // Read all result sets
1807 self.read_multi_result_response().await
1808 },
1809 deadline,
1810 canceller,
1811 )
1812 .await;
1813
1814 #[cfg(feature = "otel")]
1815 match &result {
1816 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1817 Err(e) => InstrumentationContext::record_error(&mut span, e),
1818 }
1819 #[cfg(feature = "otel")]
1820 timer.finish(instrumentation.metrics(), result.is_ok());
1821 #[cfg(feature = "otel")]
1822 drop(span);
1823
1824 let result_sets = result?;
1825 Ok(MultiResultStream::new(result_sets))
1826 }
1827
1828 /// Execute a query that doesn't return rows.
1829 ///
1830 /// Returns the number of affected rows.
1831 pub async fn execute(
1832 &mut self,
1833 sql: &str,
1834 params: &[&(dyn crate::ToSql + Sync)],
1835 ) -> Result<u64> {
1836 let deadline = self.command_deadline();
1837 self.execute_inner(sql, params, deadline).await
1838 }
1839
1840 /// Shared execute implementation with an explicit command deadline.
1841 async fn execute_inner(
1842 &mut self,
1843 sql: &str,
1844 params: &[&(dyn crate::ToSql + Sync)],
1845 deadline: Option<std::time::Duration>,
1846 ) -> Result<u64> {
1847 tracing::debug!(
1848 sql = sql,
1849 params_count = params.len(),
1850 "executing statement"
1851 );
1852
1853 #[cfg(feature = "otel")]
1854 let instrumentation = self.instrumentation.clone();
1855 #[cfg(feature = "otel")]
1856 let mut span = instrumentation.query_span(sql);
1857 #[cfg(feature = "otel")]
1858 let timer = crate::instrumentation::OperationTimer::start(
1859 crate::instrumentation::extract_operation(sql),
1860 );
1861
1862 let canceller = self.cancel_handle();
1863 let result = run_with_deadline(
1864 async {
1865 if params.is_empty() {
1866 // Simple statement without parameters - use SQL batch
1867 self.send_sql_batch(sql).await?;
1868 } else {
1869 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1870 let rpc = self.build_parameterized_rpc(sql, params).await?;
1871 self.send_rpc(&rpc).await?;
1872 }
1873
1874 // Read response and get row count
1875 self.read_execute_result().await
1876 },
1877 deadline,
1878 canceller,
1879 )
1880 .await;
1881
1882 #[cfg(feature = "otel")]
1883 match &result {
1884 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1885 Err(e) => InstrumentationContext::record_error(&mut span, e),
1886 }
1887 #[cfg(feature = "otel")]
1888 timer.finish(instrumentation.metrics(), result.is_ok());
1889
1890 // Drop the span before returning
1891 #[cfg(feature = "otel")]
1892 drop(span);
1893
1894 result
1895 }
1896
1897 /// Execute a statement with a specific timeout.
1898 ///
1899 /// This overrides the default `command_timeout` from the connection configuration
1900 /// for this specific statement. If the statement does not complete within the
1901 /// specified duration, the driver sends an Attention packet to cancel it
1902 /// server-side, drains the acknowledgement, and returns
1903 /// [`Error::CommandTimeout`] with the connection left usable.
1904 ///
1905 /// # Arguments
1906 ///
1907 /// * `sql` - The SQL statement to execute
1908 /// * `params` - Statement parameters
1909 /// * `timeout_duration` - Maximum time to wait for the statement to complete
1910 ///
1911 /// # Example
1912 ///
1913 /// ```rust,no_run
1914 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1915 /// use std::time::Duration;
1916 ///
1917 /// // Execute with a 10-second timeout
1918 /// let rows_affected = client
1919 /// .execute_with_timeout(
1920 /// "UPDATE large_table SET status = @p1",
1921 /// &[&"processed"],
1922 /// Duration::from_secs(10),
1923 /// )
1924 /// .await?;
1925 /// # let _ = rows_affected;
1926 /// # Ok(())
1927 /// # }
1928 /// ```
1929 pub async fn execute_with_timeout(
1930 &mut self,
1931 sql: &str,
1932 params: &[&(dyn crate::ToSql + Sync)],
1933 timeout_duration: std::time::Duration,
1934 ) -> Result<u64> {
1935 self.execute_inner(sql, params, Some(timeout_duration))
1936 .await
1937 }
1938
1939 /// Begin a transaction.
1940 ///
1941 /// This transitions the client from `Ready` to `InTransaction` state.
1942 /// Per MS-TDS spec, the server returns a transaction descriptor in the
1943 /// BeginTransaction EnvChange token that must be included in subsequent
1944 /// ALL_HEADERS sections.
1945 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1946 tracing::debug!("beginning transaction");
1947
1948 #[cfg(feature = "otel")]
1949 let instrumentation = self.instrumentation.clone();
1950 #[cfg(feature = "otel")]
1951 let mut span = instrumentation.transaction_span("BEGIN");
1952
1953 // Execute BEGIN TRANSACTION and extract the transaction descriptor
1954 let result = async {
1955 self.send_sql_batch("BEGIN TRANSACTION").await?;
1956 self.read_transaction_begin_result().await
1957 }
1958 .await;
1959
1960 #[cfg(feature = "otel")]
1961 match &result {
1962 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1963 Err(e) => InstrumentationContext::record_error(&mut span, e),
1964 }
1965
1966 // Drop the span before moving instrumentation
1967 #[cfg(feature = "otel")]
1968 drop(span);
1969
1970 let transaction_descriptor = result?;
1971
1972 Ok(Client {
1973 config: self.config,
1974 _state: PhantomData,
1975 connection: self.connection,
1976 server_version: self.server_version,
1977 current_database: self.current_database,
1978 server_collation: self.server_collation,
1979 statement_cache: self.statement_cache,
1980 transaction_descriptor, // Store the descriptor from server
1981 needs_reset: self.needs_reset,
1982 in_flight: self.in_flight,
1983 #[cfg(feature = "otel")]
1984 instrumentation: self.instrumentation,
1985 #[cfg(feature = "always-encrypted")]
1986 encryption_context: self.encryption_context,
1987 })
1988 }
1989
1990 /// Begin a transaction with a specific isolation level.
1991 ///
1992 /// This transitions the client from `Ready` to `InTransaction` state
1993 /// with the specified isolation level.
1994 ///
1995 /// # Example
1996 ///
1997 /// ```rust,no_run
1998 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1999 /// use mssql_client::IsolationLevel;
2000 ///
2001 /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
2002 /// // All operations in this transaction use SERIALIZABLE isolation
2003 /// tx.commit().await?;
2004 /// # Ok(())
2005 /// # }
2006 /// ```
2007 pub async fn begin_transaction_with_isolation(
2008 mut self,
2009 isolation_level: crate::transaction::IsolationLevel,
2010 ) -> Result<Client<InTransaction>> {
2011 tracing::debug!(
2012 isolation_level = %isolation_level.name(),
2013 "beginning transaction with isolation level"
2014 );
2015
2016 #[cfg(feature = "otel")]
2017 let instrumentation = self.instrumentation.clone();
2018 #[cfg(feature = "otel")]
2019 let mut span = instrumentation.transaction_span("BEGIN");
2020
2021 // First set the isolation level
2022 let result = async {
2023 self.send_sql_batch(isolation_level.as_sql()).await?;
2024 self.read_execute_result().await?;
2025
2026 // Then begin the transaction
2027 self.send_sql_batch("BEGIN TRANSACTION").await?;
2028 self.read_transaction_begin_result().await
2029 }
2030 .await;
2031
2032 #[cfg(feature = "otel")]
2033 match &result {
2034 Ok(_) => InstrumentationContext::record_success(&mut span, None),
2035 Err(e) => InstrumentationContext::record_error(&mut span, e),
2036 }
2037
2038 #[cfg(feature = "otel")]
2039 drop(span);
2040
2041 let transaction_descriptor = result?;
2042
2043 Ok(Client {
2044 config: self.config,
2045 _state: PhantomData,
2046 connection: self.connection,
2047 server_version: self.server_version,
2048 current_database: self.current_database,
2049 server_collation: self.server_collation,
2050 statement_cache: self.statement_cache,
2051 transaction_descriptor,
2052 needs_reset: self.needs_reset,
2053 in_flight: self.in_flight,
2054 #[cfg(feature = "otel")]
2055 instrumentation: self.instrumentation,
2056 #[cfg(feature = "always-encrypted")]
2057 encryption_context: self.encryption_context,
2058 })
2059 }
2060
2061 /// Execute a simple query without parameters.
2062 ///
2063 /// This is useful for DDL statements and simple queries where you
2064 /// don't need to retrieve the affected row count.
2065 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
2066 tracing::debug!(sql = sql, "executing simple query");
2067
2068 // Send SQL batch
2069 self.send_sql_batch(sql).await?;
2070
2071 // Read and discard response
2072 let _ = self.read_execute_result().await?;
2073
2074 Ok(())
2075 }
2076
2077 /// Close the connection gracefully.
2078 pub async fn close(self) -> Result<()> {
2079 tracing::debug!("closing connection");
2080 Ok(())
2081 }
2082
2083 /// Get the current database name.
2084 #[must_use]
2085 pub fn database(&self) -> Option<&str> {
2086 self.config.database.as_deref()
2087 }
2088
2089 /// Get the server host.
2090 #[must_use]
2091 pub fn host(&self) -> &str {
2092 &self.config.host
2093 }
2094
2095 /// Get the server port.
2096 #[must_use]
2097 pub fn port(&self) -> u16 {
2098 self.config.port
2099 }
2100
2101 /// Check if the connection is currently in a transaction.
2102 ///
2103 /// This returns `true` if a transaction was started via raw SQL
2104 /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
2105 ///
2106 /// Note: This only tracks transactions started via raw SQL. Transactions
2107 /// started via the type-state API (`begin_transaction()`) result in a
2108 /// `Client<InTransaction>` which is a different type.
2109 ///
2110 /// # Example
2111 ///
2112 /// ```rust,no_run
2113 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2114 /// client.execute("BEGIN TRANSACTION", &[]).await?;
2115 /// assert!(client.is_in_transaction());
2116 ///
2117 /// client.execute("COMMIT", &[]).await?;
2118 /// assert!(!client.is_in_transaction());
2119 /// # Ok(())
2120 /// # }
2121 /// ```
2122 #[must_use]
2123 pub fn is_in_transaction(&self) -> bool {
2124 self.transaction_descriptor != 0
2125 }
2126
2127 /// Check if a request is in-flight (sent but response not fully read).
2128 ///
2129 /// Used by the connection pool to detect dirty connections that were
2130 /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
2131 /// A connection with an in-flight request has unread data in the TCP
2132 /// buffer and must be discarded rather than returned to the pool.
2133 #[must_use]
2134 pub fn is_in_flight(&self) -> bool {
2135 self.in_flight
2136 }
2137
2138 /// Report whether an Always Encrypted key-store provider with the given
2139 /// name is currently reachable through this client's encryption context.
2140 ///
2141 /// Returns `false` when the `always-encrypted` feature isn't enabled, when
2142 /// the connection was opened without `column_encryption` configured, or
2143 /// when no matching provider was registered.
2144 #[cfg(feature = "always-encrypted")]
2145 #[must_use]
2146 pub fn has_encryption_provider(&self, name: &str) -> bool {
2147 self.encryption_context
2148 .as_ref()
2149 .is_some_and(|ctx| ctx.has_provider(name))
2150 }
2151
2152 /// Get a handle for cancelling the current query.
2153 ///
2154 /// The cancel handle can be cloned and sent to other tasks, enabling
2155 /// cancellation of long-running queries from a separate async context.
2156 ///
2157 /// # Example
2158 ///
2159 /// ```rust,no_run
2160 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2161 /// use std::time::Duration;
2162 ///
2163 /// let cancel_handle = client.cancel_handle();
2164 ///
2165 /// // Spawn a task to cancel after 10 seconds
2166 /// let handle = tokio::spawn(async move {
2167 /// tokio::time::sleep(Duration::from_secs(10)).await;
2168 /// let _ = cancel_handle.cancel().await;
2169 /// });
2170 ///
2171 /// // This query will be cancelled if it runs longer than 10 seconds
2172 /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
2173 /// # let _ = (handle, result);
2174 /// # Ok(())
2175 /// # }
2176 /// ```
2177 #[must_use]
2178 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2179 self.connection_cancel_handle()
2180 }
2181}
2182
2183/// # Drop Behavior
2184///
2185/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
2186/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
2187/// the transaction remains open on the server until the TCP connection closes
2188/// (at which point SQL Server automatically rolls back).
2189///
2190/// This is because `Drop` is synchronous and cannot perform the async I/O needed
2191/// to send a `ROLLBACK TRANSACTION` command.
2192///
2193/// ## Consequences of dropping without commit/rollback
2194///
2195/// - **Direct connections:** The transaction leaks until the OS TCP timeout
2196/// (potentially 30+ minutes), holding locks on any modified rows.
2197/// - **Pooled connections:** The pool detects the active transaction descriptor
2198/// and discards the connection rather than returning it to the idle pool
2199/// (see `PooledConnection::drop` in `mssql-driver-pool`).
2200///
2201/// ## Best practice
2202///
2203/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
2204/// for error paths:
2205///
2206/// ```rust,no_run
2207/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
2208/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2209/// let tx = client.begin_transaction().await?;
2210/// match do_work(&tx).await {
2211/// Ok(_) => { tx.commit().await?; }
2212/// Err(e) => { tx.rollback().await?; return Err(e); }
2213/// }
2214/// # Ok(())
2215/// # }
2216/// ```
2217impl Client<InTransaction> {
2218 /// Execute a query within the transaction and return a streaming result set.
2219 ///
2220 /// See [`Client<Ready>::query`] for usage examples.
2221 pub async fn query<'a>(
2222 &'a mut self,
2223 sql: &str,
2224 params: &[&(dyn crate::ToSql + Sync)],
2225 ) -> Result<QueryStream<'a>> {
2226 let deadline = self.command_deadline();
2227 self.query_inner(sql, params, deadline).await
2228 }
2229
2230 /// Shared query implementation with an explicit command deadline.
2231 async fn query_inner<'a>(
2232 &'a mut self,
2233 sql: &str,
2234 params: &[&(dyn crate::ToSql + Sync)],
2235 deadline: Option<std::time::Duration>,
2236 ) -> Result<QueryStream<'a>> {
2237 tracing::debug!(
2238 sql = sql,
2239 params_count = params.len(),
2240 "executing query in transaction"
2241 );
2242
2243 #[cfg(feature = "otel")]
2244 let instrumentation = self.instrumentation.clone();
2245 #[cfg(feature = "otel")]
2246 let mut span = instrumentation.query_span(sql);
2247 #[cfg(feature = "otel")]
2248 let timer = crate::instrumentation::OperationTimer::start(
2249 crate::instrumentation::extract_operation(sql),
2250 );
2251
2252 let canceller = self.cancel_handle();
2253 let result = run_with_deadline(
2254 async {
2255 // Sends via the prepared-statement cache when enabled, else the
2256 // SQL batch / sp_executesql default.
2257 self.send_query_request(sql, params).await?;
2258
2259 // Read complete response including columns and rows
2260 self.read_query_response().await
2261 },
2262 deadline,
2263 canceller,
2264 )
2265 .await;
2266
2267 #[cfg(feature = "otel")]
2268 match &result {
2269 Ok(_) => InstrumentationContext::record_success(&mut span, None),
2270 Err(e) => InstrumentationContext::record_error(&mut span, e),
2271 }
2272 #[cfg(feature = "otel")]
2273 timer.finish(instrumentation.metrics(), result.is_ok());
2274
2275 // Drop the span before returning
2276 #[cfg(feature = "otel")]
2277 drop(span);
2278
2279 let resp = result?;
2280 #[cfg(feature = "always-encrypted")]
2281 {
2282 Ok(QueryStream::from_raw(
2283 resp.columns,
2284 resp.pending_rows,
2285 resp.meta,
2286 resp.decryptor,
2287 ))
2288 }
2289 #[cfg(not(feature = "always-encrypted"))]
2290 {
2291 Ok(QueryStream::from_raw(
2292 resp.columns,
2293 resp.pending_rows,
2294 resp.meta,
2295 ))
2296 }
2297 }
2298
2299 /// Stream rows incrementally from the network within the transaction.
2300 ///
2301 /// Identical to [`Client<Ready>::query_stream`] except the query runs inside
2302 /// the open transaction. The returned [`RowStream`](crate::RowStream)
2303 /// borrows the transaction client for its lifetime, so the stream must be
2304 /// consumed or dropped before the transaction can be committed or rolled
2305 /// back.
2306 pub async fn query_stream<'a>(
2307 &'a mut self,
2308 sql: &str,
2309 params: &[&(dyn crate::ToSql + Sync)],
2310 ) -> Result<crate::row_stream::RowStream<'a, InTransaction>> {
2311 self.query_stream_inner(sql, params).await
2312 }
2313
2314 /// Stream a row's trailing MAX column from the network within the
2315 /// transaction.
2316 ///
2317 /// See [`Client<Ready>::query_stream_blob`] for semantics and constraints;
2318 /// the only difference is that the query runs inside the open transaction.
2319 pub async fn query_stream_blob<'a>(
2320 &'a mut self,
2321 sql: &str,
2322 params: &[&(dyn crate::ToSql + Sync)],
2323 ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
2324 self.query_stream_blob_inner(sql, params).await
2325 }
2326
2327 /// Stream a row's trailing MAX columns from the network within the
2328 /// transaction.
2329 ///
2330 /// See [`Client<Ready>::query_stream_rows`] for semantics and constraints;
2331 /// the only difference is that the query runs inside the open transaction.
2332 pub async fn query_stream_rows<'a>(
2333 &'a mut self,
2334 sql: &str,
2335 params: &[&(dyn crate::ToSql + Sync)],
2336 ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
2337 self.query_stream_rows_inner(sql, params).await
2338 }
2339
2340 /// Execute a statement within the transaction.
2341 ///
2342 /// Returns the number of affected rows.
2343 pub async fn execute(
2344 &mut self,
2345 sql: &str,
2346 params: &[&(dyn crate::ToSql + Sync)],
2347 ) -> Result<u64> {
2348 let deadline = self.command_deadline();
2349 self.execute_inner(sql, params, deadline).await
2350 }
2351
2352 /// Shared execute implementation with an explicit command deadline.
2353 async fn execute_inner(
2354 &mut self,
2355 sql: &str,
2356 params: &[&(dyn crate::ToSql + Sync)],
2357 deadline: Option<std::time::Duration>,
2358 ) -> Result<u64> {
2359 tracing::debug!(
2360 sql = sql,
2361 params_count = params.len(),
2362 "executing statement in transaction"
2363 );
2364
2365 #[cfg(feature = "otel")]
2366 let instrumentation = self.instrumentation.clone();
2367 #[cfg(feature = "otel")]
2368 let mut span = instrumentation.query_span(sql);
2369 #[cfg(feature = "otel")]
2370 let timer = crate::instrumentation::OperationTimer::start(
2371 crate::instrumentation::extract_operation(sql),
2372 );
2373
2374 let canceller = self.cancel_handle();
2375 let result = run_with_deadline(
2376 async {
2377 if params.is_empty() {
2378 // Simple statement without parameters - use SQL batch
2379 self.send_sql_batch(sql).await?;
2380 } else {
2381 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
2382 let rpc = self.build_parameterized_rpc(sql, params).await?;
2383 self.send_rpc(&rpc).await?;
2384 }
2385
2386 // Read response and get row count
2387 self.read_execute_result().await
2388 },
2389 deadline,
2390 canceller,
2391 )
2392 .await;
2393
2394 #[cfg(feature = "otel")]
2395 match &result {
2396 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
2397 Err(e) => InstrumentationContext::record_error(&mut span, e),
2398 }
2399 #[cfg(feature = "otel")]
2400 timer.finish(instrumentation.metrics(), result.is_ok());
2401
2402 // Drop the span before returning
2403 #[cfg(feature = "otel")]
2404 drop(span);
2405
2406 result
2407 }
2408
2409 /// Execute a query within the transaction with a specific timeout.
2410 ///
2411 /// See [`Client<Ready>::query_with_timeout`] for details.
2412 pub async fn query_with_timeout<'a>(
2413 &'a mut self,
2414 sql: &str,
2415 params: &[&(dyn crate::ToSql + Sync)],
2416 timeout_duration: std::time::Duration,
2417 ) -> Result<QueryStream<'a>> {
2418 self.query_inner(sql, params, Some(timeout_duration)).await
2419 }
2420
2421 /// Execute a statement within the transaction with a specific timeout.
2422 ///
2423 /// See [`Client<Ready>::execute_with_timeout`] for details.
2424 pub async fn execute_with_timeout(
2425 &mut self,
2426 sql: &str,
2427 params: &[&(dyn crate::ToSql + Sync)],
2428 timeout_duration: std::time::Duration,
2429 ) -> Result<u64> {
2430 self.execute_inner(sql, params, Some(timeout_duration))
2431 .await
2432 }
2433
2434 /// Open a FILESTREAM BLOB for async reading and/or writing.
2435 ///
2436 /// This method queries the server for the transaction context, then opens
2437 /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
2438 ///
2439 /// # Arguments
2440 ///
2441 /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
2442 /// Query this yourself before calling `open_filestream`:
2443 /// ```sql
2444 /// SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
2445 /// ```
2446 /// * `access` — Read, write, or read/write access mode.
2447 ///
2448 /// # Requirements
2449 ///
2450 /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
2451 /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
2452 /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
2453 ///
2454 /// # Example
2455 ///
2456 /// ```text
2457 /// use mssql_client::FileStreamAccess;
2458 /// use tokio::io::AsyncReadExt;
2459 ///
2460 /// let mut tx = client.begin_transaction().await?;
2461 ///
2462 /// // Get the FILESTREAM path
2463 /// let rows = tx.query(
2464 /// "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
2465 /// &[&doc_id],
2466 /// ).await?;
2467 /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
2468 ///
2469 /// // Open and read the BLOB
2470 /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
2471 /// let mut data = Vec::new();
2472 /// stream.read_to_end(&mut data).await?;
2473 /// drop(stream);
2474 ///
2475 /// tx.commit().await?;
2476 /// ```
2477 #[cfg(all(windows, feature = "filestream"))]
2478 pub async fn open_filestream(
2479 &mut self,
2480 path: &str,
2481 access: crate::filestream::FileStreamAccess,
2482 ) -> Result<crate::filestream::FileStream> {
2483 tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
2484
2485 // Get the transaction context from SQL Server.
2486 // This binds the file access to the current SQL transaction.
2487 let txn_context: Vec<u8> = {
2488 let rows = self
2489 .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
2490 .await?;
2491 let mut ctx = None;
2492 for result in rows {
2493 let row = result?;
2494 ctx = Some(row.get::<Vec<u8>>(0)?);
2495 }
2496 ctx.ok_or_else(|| {
2497 Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
2498 })?
2499 };
2500
2501 crate::filestream::FileStream::open(path, access, &txn_context)
2502 }
2503
2504 /// Commit the transaction.
2505 ///
2506 /// This transitions the client back to `Ready` state.
2507 pub async fn commit(mut self) -> Result<Client<Ready>> {
2508 tracing::debug!("committing transaction");
2509
2510 #[cfg(feature = "otel")]
2511 let instrumentation = self.instrumentation.clone();
2512 #[cfg(feature = "otel")]
2513 let mut span = instrumentation.transaction_span("COMMIT");
2514
2515 // Execute COMMIT TRANSACTION
2516 let result = async {
2517 self.send_sql_batch("COMMIT TRANSACTION").await?;
2518 self.read_execute_result().await
2519 }
2520 .await;
2521
2522 #[cfg(feature = "otel")]
2523 match &result {
2524 Ok(_) => InstrumentationContext::record_success(&mut span, None),
2525 Err(e) => InstrumentationContext::record_error(&mut span, e),
2526 }
2527
2528 // Drop the span before moving instrumentation
2529 #[cfg(feature = "otel")]
2530 drop(span);
2531
2532 result?;
2533
2534 Ok(Client {
2535 config: self.config,
2536 _state: PhantomData,
2537 connection: self.connection,
2538 server_version: self.server_version,
2539 current_database: self.current_database,
2540 server_collation: self.server_collation,
2541 statement_cache: self.statement_cache,
2542 transaction_descriptor: 0, // Reset to auto-commit mode
2543 needs_reset: self.needs_reset,
2544 in_flight: self.in_flight,
2545 #[cfg(feature = "otel")]
2546 instrumentation: self.instrumentation,
2547 #[cfg(feature = "always-encrypted")]
2548 encryption_context: self.encryption_context,
2549 })
2550 }
2551
2552 /// Rollback the transaction.
2553 ///
2554 /// This transitions the client back to `Ready` state.
2555 pub async fn rollback(mut self) -> Result<Client<Ready>> {
2556 tracing::debug!("rolling back transaction");
2557
2558 #[cfg(feature = "otel")]
2559 let instrumentation = self.instrumentation.clone();
2560 #[cfg(feature = "otel")]
2561 let mut span = instrumentation.transaction_span("ROLLBACK");
2562
2563 // Execute ROLLBACK TRANSACTION
2564 let result = async {
2565 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
2566 self.read_execute_result().await
2567 }
2568 .await;
2569
2570 #[cfg(feature = "otel")]
2571 match &result {
2572 Ok(_) => InstrumentationContext::record_success(&mut span, None),
2573 Err(e) => InstrumentationContext::record_error(&mut span, e),
2574 }
2575
2576 // Drop the span before moving instrumentation
2577 #[cfg(feature = "otel")]
2578 drop(span);
2579
2580 result?;
2581
2582 Ok(Client {
2583 config: self.config,
2584 _state: PhantomData,
2585 connection: self.connection,
2586 server_version: self.server_version,
2587 current_database: self.current_database,
2588 server_collation: self.server_collation,
2589 statement_cache: self.statement_cache,
2590 transaction_descriptor: 0, // Reset to auto-commit mode
2591 needs_reset: self.needs_reset,
2592 in_flight: self.in_flight,
2593 #[cfg(feature = "otel")]
2594 instrumentation: self.instrumentation,
2595 #[cfg(feature = "always-encrypted")]
2596 encryption_context: self.encryption_context,
2597 })
2598 }
2599
2600 /// Create a savepoint and return a handle for later rollback.
2601 ///
2602 /// The returned `SavePoint` handle contains the validated savepoint name.
2603 /// Use it with `rollback_to()` to partially undo transaction work.
2604 ///
2605 /// # Example
2606 ///
2607 /// ```rust,no_run
2608 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2609 /// let mut tx = client.begin_transaction().await?;
2610 /// tx.execute("INSERT INTO orders ...", &[]).await?;
2611 /// let sp = tx.save_point("before_items").await?;
2612 /// tx.execute("INSERT INTO items ...", &[]).await?;
2613 /// // Oops, rollback just the items
2614 /// tx.rollback_to(&sp).await?;
2615 /// tx.commit().await?;
2616 /// # Ok(())
2617 /// # }
2618 /// ```
2619 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
2620 crate::validation::validate_identifier(name)?;
2621 tracing::debug!(name = name, "creating savepoint");
2622
2623 // Execute SAVE TRANSACTION <name>
2624 // Note: name is validated by validate_identifier() to prevent SQL injection
2625 let sql = format!("SAVE TRANSACTION {name}");
2626 self.send_sql_batch(&sql).await?;
2627 self.read_execute_result().await?;
2628
2629 Ok(SavePoint::new(name.to_string()))
2630 }
2631
2632 /// Rollback to a savepoint.
2633 ///
2634 /// This rolls back all changes made after the savepoint was created,
2635 /// but keeps the transaction active. The savepoint remains valid and
2636 /// can be rolled back to again.
2637 ///
2638 /// # Example
2639 ///
2640 /// ```rust,no_run
2641 /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
2642 /// let sp = tx.save_point("checkpoint").await?;
2643 /// // ... do some work ...
2644 /// tx.rollback_to(&sp).await?; // Undo changes since checkpoint
2645 /// // Transaction is still active, savepoint is still valid
2646 /// # Ok(())
2647 /// # }
2648 /// ```
2649 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
2650 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
2651
2652 // Execute ROLLBACK TRANSACTION <name>
2653 // Note: savepoint name was validated during creation
2654 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
2655 self.send_sql_batch(&sql).await?;
2656 self.read_execute_result().await?;
2657
2658 Ok(())
2659 }
2660
2661 /// Release a savepoint (optional cleanup).
2662 ///
2663 /// Note: SQL Server doesn't have explicit savepoint release, but this
2664 /// method is provided for API completeness. The savepoint is automatically
2665 /// released when the transaction commits or rolls back.
2666 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
2667 tracing::debug!(name = savepoint.name(), "releasing savepoint");
2668
2669 // SQL Server doesn't require explicit savepoint release
2670 // The savepoint is implicitly released on commit/rollback
2671 // This method exists for API completeness
2672 drop(savepoint);
2673 Ok(())
2674 }
2675
2676 /// Get a handle for cancelling the current query within the transaction.
2677 ///
2678 /// See [`Client<Ready>::cancel_handle`] for usage examples.
2679 #[must_use]
2680 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2681 self.connection_cancel_handle()
2682 }
2683}
2684
2685impl<S: ConnectionState> std::fmt::Debug for Client<S> {
2686 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2687 f.debug_struct("Client")
2688 .field("host", &self.config.host)
2689 .field("port", &self.config.port)
2690 .field("database", &self.config.database)
2691 .finish()
2692 }
2693}
2694
2695#[cfg(test)]
2696mod blob_result_set_validation_tests {
2697 use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
2698 use tds_protocol::types::TypeId;
2699
2700 use super::{Client, Ready};
2701 use crate::error::Error;
2702
2703 /// A scalar (non-MAX) column.
2704 fn scalar(name: &str) -> ColumnData {
2705 col(name, TypeId::Int4, None)
2706 }
2707
2708 /// A MAX (PLP) column: `max_length == 0xFFFF` marks the MAX variant.
2709 fn blob(name: &str) -> ColumnData {
2710 col(name, TypeId::BigVarBinary, Some(0xFFFF))
2711 }
2712
2713 fn col(name: &str, type_id: TypeId, max_length: Option<u32>) -> ColumnData {
2714 ColumnData {
2715 name: name.to_string(),
2716 type_id,
2717 col_type: 0,
2718 flags: 0,
2719 user_type: 0,
2720 type_info: TypeInfo {
2721 max_length,
2722 ..Default::default()
2723 },
2724 crypto_metadata: None,
2725 }
2726 }
2727
2728 fn meta(columns: Vec<ColumnData>) -> ColMetaData {
2729 ColMetaData {
2730 columns,
2731 cek_table: None,
2732 }
2733 }
2734
2735 fn validate(columns: Vec<ColumnData>) -> Result<(usize, usize), Error> {
2736 Client::<Ready>::validate_blob_rows_result_set(&meta(columns))
2737 }
2738
2739 #[test]
2740 fn single_trailing_blob() {
2741 assert_eq!(validate(vec![scalar("id"), blob("doc")]).unwrap(), (1, 1));
2742 }
2743
2744 #[test]
2745 fn multiple_trailing_blobs() {
2746 assert_eq!(
2747 validate(vec![scalar("id"), blob("doc1"), blob("doc2")]).unwrap(),
2748 (1, 2)
2749 );
2750 }
2751
2752 #[test]
2753 fn all_columns_blobs() {
2754 assert_eq!(validate(vec![blob("a"), blob("b")]).unwrap(), (0, 2));
2755 }
2756
2757 #[test]
2758 fn no_max_column_is_rejected() {
2759 assert!(matches!(
2760 validate(vec![scalar("id"), scalar("j")]),
2761 Err(Error::Protocol(_))
2762 ));
2763 }
2764
2765 #[test]
2766 fn scalar_after_blob_is_rejected() {
2767 // Interleaved MAX columns are out of scope: a scalar after a blob.
2768 assert!(matches!(
2769 validate(vec![scalar("id"), blob("doc"), scalar("trailing")]),
2770 Err(Error::Protocol(_))
2771 ));
2772 // ...even when more blobs follow the interloping scalar.
2773 assert!(matches!(
2774 validate(vec![blob("doc1"), scalar("mid"), blob("doc2")]),
2775 Err(Error::Protocol(_))
2776 ));
2777 }
2778}