mssql_client/client.rs
1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27pub(crate) mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// How long to wait for the server to acknowledge an Attention packet after
49/// a command timeout. SqlClient waits 5 seconds before dooming the
50/// connection; we match it.
51const ATTENTION_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
52
53/// Run a network future under an optional command deadline.
54///
55/// On timeout this sends an Attention packet via `canceller` and then awaits
56/// the future so its own read loop drains the server's DONE_ATTN
57/// acknowledgement, leaving the connection clean before returning
58/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
59/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
60/// TDS data in the connection buffer and desync the next request.
61///
62/// The drain itself is bounded by [`ATTENTION_ACK_TIMEOUT`] — a hung server
63/// that never acknowledges the attention must not turn the timeout into an
64/// infinite wait. When the bound expires the connection is abandoned
65/// mid-response: `in_flight` stays set, so the pool discards the connection
66/// at check-in instead of reusing it.
67pub(crate) async fn run_with_deadline<F, T>(
68 fut: F,
69 deadline: Option<std::time::Duration>,
70 canceller: crate::cancel::CancelHandle,
71) -> Result<T>
72where
73 F: std::future::Future<Output = Result<T>>,
74{
75 let Some(d) = deadline else {
76 return fut.await;
77 };
78 tokio::pin!(fut);
79 tokio::select! {
80 biased;
81 res = &mut fut => res,
82 () = tokio::time::sleep(d) => {
83 // Signal cancellation, then let the in-flight read consume the
84 // server's attention acknowledgement so the connection stays usable.
85 let drain = async {
86 let _ = canceller.cancel().await;
87 let _ = (&mut fut).await;
88 };
89 if tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await.is_err() {
90 tracing::warn!(
91 timeout = ?ATTENTION_ACK_TIMEOUT,
92 "server did not acknowledge attention; abandoning the connection as dirty"
93 );
94 }
95 Err(Error::CommandTimeout)
96 }
97 }
98}
99
100/// SQL Server client with type-state connection management.
101///
102/// The generic parameter `S` represents the current connection state,
103/// ensuring at compile time that certain operations are only available
104/// in appropriate states.
105pub struct Client<S: ConnectionState> {
106 config: Config,
107 _state: PhantomData<S>,
108 /// The underlying connection (present only when connected)
109 connection: Option<ConnectionHandle>,
110 /// Server version from LoginAck (raw u32 TDS version)
111 server_version: Option<u32>,
112 /// Current database from EnvChange
113 current_database: Option<String>,
114 /// Server's default collation from SqlCollation EnvChange during login.
115 /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
116 /// parameters with the correct character encoding and collation bytes.
117 server_collation: Option<tds_protocol::token::Collation>,
118 /// Prepared statement cache for query optimization
119 statement_cache: StatementCache,
120 /// Transaction descriptor from BeginTransaction EnvChange.
121 /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
122 /// requests within an explicit transaction. 0 indicates auto-commit mode.
123 transaction_descriptor: u64,
124 /// Whether a request has been sent and the response has not yet been fully read.
125 /// Used by the connection pool to detect dirty connections after cancel/timeout.
126 in_flight: bool,
127 /// Whether this connection needs a reset on next use.
128 /// Set by connection pool on checkin, cleared after first query/execute.
129 /// When true, the RESETCONNECTION flag is set on the first TDS packet.
130 needs_reset: bool,
131 /// OpenTelemetry instrumentation context (when otel feature is enabled)
132 #[cfg(feature = "otel")]
133 instrumentation: InstrumentationContext,
134 /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
135 #[cfg(feature = "always-encrypted")]
136 pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
137}
138
139/// Internal connection handle wrapping the actual connection.
140///
141/// This is an enum to support different connection types:
142/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
143/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
144/// - Plain TCP (for internal networks or when `tls` feature is disabled)
145#[allow(dead_code)] // Connection will be used once query execution is implemented
146enum ConnectionHandle {
147 /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
148 #[cfg(feature = "tls")]
149 Tls(Connection<TlsStream<TcpStream>>),
150 /// TLS connection with PreLogin wrapping (TDS 7.x style)
151 #[cfg(feature = "tls")]
152 TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
153 /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
154 Plain(Connection<TcpStream>),
155}
156
157/// The parameter `TypeInfo` to declare a typed NULL ([`crate::null`]) with, from
158/// its [`crate::ToSql::sql_type`] name. Returns `None` for an untyped NULL
159/// (`Option::None`, type `"NULL"`), which falls back to the default param type.
160#[cfg(feature = "always-encrypted")]
161fn null_param_type_info(sql_type: &str) -> Option<tds_protocol::rpc::TypeInfo> {
162 use tds_protocol::rpc::TypeInfo;
163 Some(match sql_type {
164 "BIT" => TypeInfo::bit(),
165 "TINYINT" => TypeInfo::tinyint(),
166 "SMALLINT" => TypeInfo::smallint(),
167 "INT" => TypeInfo::int(),
168 "BIGINT" => TypeInfo::bigint(),
169 "REAL" => TypeInfo::real(),
170 "FLOAT" => TypeInfo::float(),
171 "NVARCHAR" => TypeInfo::nvarchar(1),
172 "VARBINARY" => TypeInfo::varbinary(1),
173 "UNIQUEIDENTIFIER" => TypeInfo::uuid(),
174 "DATE" => TypeInfo::date(),
175 _ => return None,
176 })
177}
178
179/// Map a typed-parameter wrapper's [`EncryptedParamType`] to the `TypeInfo` the
180/// driver declares it as (for `sp_describe_parameter_encryption` and the
181/// `CryptoMetadata` base type). Unknown future variants error rather than
182/// silently declaring the wrong type.
183#[cfg(feature = "always-encrypted")]
184fn encrypted_param_type_info(
185 ty: mssql_types::EncryptedParamType,
186) -> Result<tds_protocol::rpc::TypeInfo> {
187 use mssql_types::EncryptedParamType as E;
188 use tds_protocol::rpc::TypeInfo;
189 Ok(match ty {
190 E::Decimal { precision, scale } => TypeInfo::decimal(precision, scale),
191 E::Time { scale } => TypeInfo::time(scale),
192 E::DateTime2 { scale } => TypeInfo::datetime2(scale),
193 E::DateTimeOffset { scale } => TypeInfo::datetimeoffset(scale),
194 E::DateTime => TypeInfo::datetime(),
195 E::Char { length } => TypeInfo::char(length),
196 E::NChar { length } => TypeInfo::nchar(length),
197 E::Binary { length } => TypeInfo::binary(length),
198 _ => {
199 return Err(Error::Encryption(
200 "unsupported Always Encrypted parameter type".to_string(),
201 ));
202 }
203 })
204}
205
206// Private helper methods available to all connection states
207impl<S: ConnectionState> Client<S> {
208 /// The default per-command deadline from `command_timeout`.
209 ///
210 /// Returns `None` when `command_timeout` is zero, which means "no limit"
211 /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
212 pub(crate) fn command_deadline(&self) -> Option<std::time::Duration> {
213 let t = self.config.command_timeout;
214 if t.is_zero() { None } else { Some(t) }
215 }
216
217 /// Build a cancel handle for the current connection, regardless of
218 /// connection state. The public, documented surface is
219 /// [`Client::<Ready>::cancel_handle`]; both state-specific methods
220 /// delegate here.
221 pub(crate) fn connection_cancel_handle(&self) -> crate::cancel::CancelHandle {
222 let connection = self
223 .connection
224 .as_ref()
225 .expect("connection should be present");
226 match connection {
227 #[cfg(feature = "tls")]
228 ConnectionHandle::Tls(conn) => {
229 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
230 }
231 #[cfg(feature = "tls")]
232 ConnectionHandle::TlsPrelogin(conn) => {
233 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
234 }
235 ConnectionHandle::Plain(conn) => {
236 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
237 }
238 }
239 }
240
241 /// Cancel an in-flight response that was abandoned without being drained —
242 /// e.g. a [`RowStream`](crate::RowStream) dropped or cancelled mid-result.
243 ///
244 /// Sends an Attention and drains to the server's DONE_ATTN acknowledgement so
245 /// the socket is clean and the connection reusable. A no-op when nothing is
246 /// in flight. Bounded by [`ATTENTION_ACK_TIMEOUT`]: if the acknowledgement
247 /// never arrives the connection is left marked in-flight (so the pool
248 /// discards it on return) and an error is returned.
249 pub(crate) async fn cancel_in_flight_response(&mut self) -> Result<()> {
250 if !self.in_flight {
251 return Ok(());
252 }
253 let canceller = self.connection_cancel_handle();
254 let drain = async {
255 canceller.cancel().await?;
256 // With the cancelling flag set, `read_response_message` routes through
257 // the codec's drain-after-cancel path and returns `Err(Cancelled)`
258 // once the DONE_ATTN acknowledgement is consumed (clearing
259 // `in_flight`). Any full messages that arrive before the ack are
260 // discarded.
261 loop {
262 match self.read_response_message().await {
263 Err(Error::Cancelled) => return Ok(()),
264 Ok(_) => continue,
265 Err(e) => return Err(e),
266 }
267 }
268 };
269 match tokio::time::timeout(ATTENTION_ACK_TIMEOUT, drain).await {
270 Ok(result) => result,
271 Err(_) => {
272 tracing::warn!(
273 timeout = ?ATTENTION_ACK_TIMEOUT,
274 "attention acknowledgement not received while cancelling an \
275 abandoned response; connection left dirty"
276 );
277 Err(Error::Cancelled)
278 }
279 }
280 }
281
282 /// Process transaction-related EnvChange tokens.
283 ///
284 /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
285 /// EnvChange tokens, updating the transaction descriptor accordingly.
286 ///
287 /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
288 /// while still having the transaction descriptor tracked correctly.
289 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
290 use tds_protocol::token::EnvChangeValue;
291
292 match env.env_type {
293 EnvChangeType::BeginTransaction => {
294 if let EnvChangeValue::Binary(ref data) = env.new_value {
295 if data.len() >= 8 {
296 let descriptor = u64::from_le_bytes([
297 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
298 ]);
299 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
300 *transaction_descriptor = descriptor;
301 }
302 }
303 }
304 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
305 tracing::debug!(
306 env_type = ?env.env_type,
307 "transaction ended via raw SQL"
308 );
309 *transaction_descriptor = 0;
310 }
311 _ => {}
312 }
313 }
314
315 /// Apply a transaction-related `ENVCHANGE` to this client's descriptor.
316 ///
317 /// Lets the streaming readers (which live in sibling modules) keep the
318 /// transaction descriptor in sync with raw `BEGIN`/`COMMIT`/`ROLLBACK`
319 /// batches seen mid-stream, exactly as the buffered readers do.
320 pub(crate) fn apply_transaction_env_change(&mut self, env: &EnvChange) {
321 Self::process_transaction_env_change(env, &mut self.transaction_descriptor);
322 }
323
324 /// Send a SQL batch to the server.
325 ///
326 /// Uses the client's current transaction descriptor in ALL_HEADERS.
327 /// Per MS-TDS spec, when in an explicit transaction, the descriptor
328 /// returned by BeginTransaction must be included.
329 ///
330 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
331 /// is included in the first packet to reset connection state.
332 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
333 // If a previous streamed response was abandoned (a RowStream dropped
334 // mid-result), drain it before issuing a new request so the next read
335 // does not pick up the old response's bytes.
336 self.cancel_in_flight_response().await?;
337
338 let payload = tds_protocol::__private::encode_sql_batch_with_transaction(
339 sql,
340 self.transaction_descriptor,
341 );
342 let max_packet = self.config.packet_size as usize;
343
344 // Check if we need to reset the connection on this request
345 let reset = self.needs_reset;
346 if reset {
347 self.needs_reset = false; // Clear flag before sending
348 tracing::debug!("sending SQL batch with RESETCONNECTION flag");
349 }
350
351 self.in_flight = true;
352 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
353
354 match connection {
355 #[cfg(feature = "tls")]
356 ConnectionHandle::Tls(conn) => {
357 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
358 .await?;
359 }
360 #[cfg(feature = "tls")]
361 ConnectionHandle::TlsPrelogin(conn) => {
362 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
363 .await?;
364 }
365 ConnectionHandle::Plain(conn) => {
366 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
367 .await?;
368 }
369 }
370
371 Ok(())
372 }
373
374 /// Send an RPC request to the server.
375 ///
376 /// Uses the client's current transaction descriptor in ALL_HEADERS.
377 ///
378 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
379 /// is included in the first packet to reset connection state.
380 pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
381 // Drain an abandoned streamed response (see `send_sql_batch`) before
382 // issuing this request.
383 self.cancel_in_flight_response().await?;
384
385 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
386 let max_packet = self.config.packet_size as usize;
387
388 // Check if we need to reset the connection on this request
389 let reset = self.needs_reset;
390 if reset {
391 self.needs_reset = false; // Clear flag before sending
392 tracing::debug!("sending RPC with RESETCONNECTION flag");
393 }
394
395 self.in_flight = true;
396 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
397
398 match connection {
399 #[cfg(feature = "tls")]
400 ConnectionHandle::Tls(conn) => {
401 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
402 .await?;
403 }
404 #[cfg(feature = "tls")]
405 ConnectionHandle::TlsPrelogin(conn) => {
406 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
407 .await?;
408 }
409 ConnectionHandle::Plain(conn) => {
410 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
411 .await?;
412 }
413 }
414
415 Ok(())
416 }
417
418 /// Start building a stored procedure call with full control over parameters.
419 ///
420 /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
421 /// parameters before executing the call.
422 ///
423 /// The procedure name is validated to prevent SQL injection. It may be
424 /// schema-qualified (e.g., `"dbo.MyProc"`).
425 ///
426 /// # Example
427 ///
428 /// ```rust,no_run
429 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
430 /// let result = client.procedure("dbo.CalculateSum")?
431 /// .input("@a", &10i32)
432 /// .input("@b", &20i32)
433 /// .output_int("@result")
434 /// .execute().await?;
435 ///
436 /// let sum = result.get_output("@result").unwrap();
437 /// # let _ = sum;
438 /// # Ok(())
439 /// # }
440 /// ```
441 pub fn procedure(
442 &mut self,
443 proc_name: &str,
444 ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
445 crate::validation::validate_qualified_identifier(proc_name)?;
446 Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
447 }
448
449 /// Execute a stored procedure with positional input parameters.
450 ///
451 /// This is a convenience method for the common case of calling a procedure
452 /// with input-only parameters. For output parameters or named parameters,
453 /// use [`procedure()`](Client::procedure) instead.
454 ///
455 /// # Example
456 ///
457 /// ```rust,no_run
458 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
459 /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
460 /// assert_eq!(result.return_value, 0);
461 ///
462 /// if let Some(rs) = result.first_result_set() {
463 /// println!("columns: {:?}", rs.columns());
464 /// }
465 /// # Ok(())
466 /// # }
467 /// ```
468 pub async fn call_procedure(
469 &mut self,
470 proc_name: &str,
471 params: &[&(dyn crate::ToSql + Sync)],
472 ) -> Result<crate::stream::ProcedureResult> {
473 crate::validation::validate_qualified_identifier(proc_name)?;
474
475 tracing::debug!(
476 proc_name = proc_name,
477 params_count = params.len(),
478 "executing stored procedure"
479 );
480
481 let rpc_params =
482 Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
483 let mut rpc = RpcRequest::named(proc_name);
484 for param in rpc_params {
485 rpc = rpc.param(param);
486 }
487
488 let deadline = self.command_deadline();
489 let canceller = self.connection_cancel_handle();
490 run_with_deadline(
491 async {
492 self.send_rpc(&rpc).await?;
493 self.read_procedure_result().await
494 },
495 deadline,
496 canceller,
497 )
498 .await
499 }
500
501 /// Ask the server how each parameter of a statement must be encrypted.
502 ///
503 /// Issues the `sp_describe_parameter_encryption` system RPC for the
504 /// parameterized statement `tsql` with the parameter declaration `params`
505 /// (e.g. `"@id int, @name nvarchar(64)"`), and parses the two result sets
506 /// into a [`ParameterEncryptionInfo`](crate::encryption::ParameterEncryptionInfo): the
507 /// CEK table, plus — for each parameter the server reports as encrypted —
508 /// which CEK and whether deterministic or randomized. Parameters the server
509 /// reports as plaintext are omitted.
510 ///
511 /// This is the first step of Always Encrypted parameter encryption; the
512 /// connection must have negotiated it (`Column Encryption Setting=Enabled`).
513 #[cfg(feature = "always-encrypted")]
514 pub(crate) async fn describe_parameter_encryption(
515 &mut self,
516 tsql: &str,
517 params: &str,
518 ) -> Result<crate::encryption::ParameterEncryptionInfo> {
519 let tsql_arg = tsql.to_string();
520 let params_arg = params.to_string();
521 let mut result = self
522 .call_procedure(
523 "sp_describe_parameter_encryption",
524 &[&tsql_arg, ¶ms_arg],
525 )
526 .await?;
527 crate::encryption::ParameterEncryptionInfo::from_describe_result_sets(
528 &mut result.result_sets,
529 )
530 }
531
532 /// Build the `sp_executesql` request for a parameterized statement.
533 ///
534 /// When the connection has Always Encrypted enabled, parameters the server
535 /// reports as encrypted are encrypted client-side first (an extra
536 /// `sp_describe_parameter_encryption` round-trip). Otherwise this is the
537 /// plain parameter conversion.
538 pub(crate) async fn build_parameterized_rpc(
539 &mut self,
540 sql: &str,
541 params: &[&(dyn crate::ToSql + Sync)],
542 ) -> Result<RpcRequest> {
543 #[cfg(feature = "always-encrypted")]
544 if self.encryption_context.is_some() {
545 return self.build_encrypted_sql_rpc(sql, params).await;
546 }
547 let rpc_params =
548 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
549 Ok(RpcRequest::execute_sql(sql, rpc_params))
550 }
551
552 /// Encrypt the Always Encrypted parameters of a statement, then build its
553 /// `sp_executesql` request.
554 ///
555 /// Asks the server which parameters are encrypted
556 /// ([`describe_parameter_encryption`](Self::describe_parameter_encryption)),
557 /// then for each one normalizes the value, resolves its column encryption
558 /// key, encrypts, and emits an encrypted RPC parameter. Parameters the
559 /// server reports as plaintext are sent unchanged.
560 #[cfg(feature = "always-encrypted")]
561 async fn build_encrypted_sql_rpc(
562 &mut self,
563 sql: &str,
564 params: &[&(dyn crate::ToSql + Sync)],
565 ) -> Result<RpcRequest> {
566 use tds_protocol::rpc::RpcParam;
567
568 let Some(ctx) = self.encryption_context.clone() else {
569 let rpc_params =
570 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
571 return Ok(RpcRequest::execute_sql(sql, rpc_params));
572 };
573
574 // Resolve each parameter's value once (AE normalization needs the typed
575 // value, not the wire encoding) and build the plaintext RPC params.
576 let send_unicode = self.send_unicode();
577 let collation = self.server_collation().cloned();
578 let mut values: Vec<mssql_types::SqlValue> = Vec::with_capacity(params.len());
579 let mut plaintext: Vec<RpcParam> = Vec::with_capacity(params.len());
580 let mut hints: Vec<Option<mssql_types::EncryptedParamType>> =
581 Vec::with_capacity(params.len());
582 for (i, p) in params.iter().enumerate() {
583 let name = format!("@p{}", i + 1);
584 let value = p.to_sql()?;
585 let hint = p.encrypted_param_type();
586 // A typed NULL (e.g. `null::<i32>()`) is declared by its SQL type so
587 // describe accepts it against the target encrypted column; an untyped
588 // NULL falls back to the default in `sql_value_to_rpc_param`.
589 let rpc_param = match (&value, null_param_type_info(p.sql_type())) {
590 (mssql_types::SqlValue::Null, Some(type_info)) => RpcParam::null(&name, type_info),
591 _ => {
592 let mut param = Self::sql_value_to_rpc_param(
593 &name,
594 &value,
595 send_unicode,
596 collation.as_ref(),
597 )?;
598 // A typed-parameter wrapper (e.g. `numeric(v, p, s)`,
599 // `datetime2(v, scale)`) declares an explicit SQL type so
600 // describe matches the encrypted column exactly — the value
601 // alone cannot convey precision/scale or the legacy-`datetime`
602 // vs `datetime2` distinction.
603 if let Some(ty) = hint {
604 param.type_info = encrypted_param_type_info(ty)?;
605 }
606 param
607 }
608 };
609 plaintext.push(rpc_param);
610 values.push(value);
611 hints.push(hint);
612 }
613
614 if plaintext.is_empty() {
615 return Ok(RpcRequest::execute_sql(sql, plaintext));
616 }
617
618 // Ask the server which parameters need encryption.
619 let declarations = RpcRequest::build_param_declarations(&plaintext);
620 let info = self
621 .describe_parameter_encryption(sql, &declarations)
622 .await?;
623 if info.parameters.is_empty() {
624 return Ok(RpcRequest::execute_sql(sql, plaintext));
625 }
626
627 // Encrypt the flagged parameters; pass the rest through untouched.
628 let mut final_params: Vec<RpcParam> = Vec::with_capacity(plaintext.len());
629 for ((value, param), hint) in values.into_iter().zip(plaintext).zip(hints) {
630 let Some(crypto) = info.get_parameter(¶m.name) else {
631 final_params.push(param);
632 continue;
633 };
634 let entry = info.cek_table.get(crypto.cek_ordinal).ok_or_else(|| {
635 Error::Protocol(format!(
636 "encrypted parameter {} references missing CEK ordinal {}",
637 param.name, crypto.cek_ordinal
638 ))
639 })?;
640 let metadata = tds_protocol::rpc::EncryptedParamMetadata {
641 base_type_info: param.type_info.clone(),
642 algorithm_id: crypto.algorithm_id,
643 encryption_type: crypto.encryption_type,
644 database_id: entry.database_id,
645 cek_id: entry.cek_id,
646 cek_version: entry.cek_version,
647 cek_md_version: entry.cek_md_version,
648 normalization_rule_version: crypto.normalization_rule_version,
649 };
650 // A NULL value bound to an encrypted column is sent as an encrypted
651 // NULL (the server rejects a plaintext parameter for an encrypted
652 // column); there is nothing to encrypt.
653 if matches!(value, mssql_types::SqlValue::Null) {
654 final_params.push(RpcParam::encrypted_null(param.name, metadata));
655 continue;
656 }
657 let normalized = crate::encryption::normalize_for_encryption(&value, hint)?;
658 let ciphertext = ctx
659 .encrypt_value(&normalized, entry, crypto.encryption_type)
660 .await?;
661 final_params.push(RpcParam::encrypted(
662 param.name,
663 bytes::Bytes::from(ciphertext),
664 metadata,
665 ));
666 }
667
668 Ok(RpcRequest::execute_sql(sql, final_params))
669 }
670
671 /// Start a bulk insert operation for the specified table.
672 ///
673 /// Sends the `INSERT BULK` statement to the server and returns a
674 /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
675 /// a mutable borrow on the client, preventing other operations while
676 /// the bulk insert is in progress.
677 ///
678 /// # Example
679 ///
680 /// ```rust,no_run
681 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
682 /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
683 ///
684 /// let builder = BulkInsertBuilder::new("dbo.Users")
685 /// .with_typed_columns(vec![
686 /// BulkColumn::new("id", "INT", 0)?,
687 /// BulkColumn::new("name", "NVARCHAR(100)", 1)?,
688 /// ]);
689 ///
690 /// let mut writer = client.bulk_insert(&builder).await?;
691 /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
692 /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
693 /// let result = writer.finish().await?;
694 /// println!("Inserted {} rows", result.rows_affected);
695 /// # Ok(())
696 /// # }
697 /// ```
698 pub async fn bulk_insert(
699 &mut self,
700 builder: &crate::bulk::BulkInsertBuilder,
701 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
702 use tds_protocol::token::{ColMetaData, Token};
703
704 tracing::debug!(
705 table = builder.table_name(),
706 columns = builder.columns().len(),
707 "starting bulk insert"
708 );
709
710 // Step 1: Query the server for column metadata.
711 // This gives us the exact type encoding the server expects for BulkLoad,
712 // following the pattern established by Tiberius.
713 let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
714 let deadline = self.command_deadline();
715 let canceller = self.connection_cancel_handle();
716 let message = run_with_deadline(
717 async {
718 self.send_sql_batch(&meta_query).await?;
719 self.read_response_message().await
720 },
721 deadline,
722 canceller,
723 )
724 .await?;
725 self.in_flight = false;
726
727 // Capture both the raw COLMETADATA bytes and parsed column info
728 let raw_payload = message.payload.clone();
729 let mut parser = self.create_parser(message.payload);
730 let mut server_metadata: Option<ColMetaData> = None;
731 let mut meta_start: usize = 0;
732 let mut meta_end: usize = 0;
733
734 loop {
735 let pos_before = raw_payload.len() - parser.remaining();
736 let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
737 let pos_after = raw_payload.len() - parser.remaining();
738 let Some(token) = token else { break };
739
740 match token {
741 Token::ColMetaData(meta) => {
742 meta_start = pos_before;
743 meta_end = pos_after;
744 server_metadata = Some(meta);
745 }
746 Token::Done(_) => break,
747 _ => {}
748 }
749 }
750
751 // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
752 // These types require a legacy TEXTPTR wire format that this driver
753 // does not support — users should migrate the column to VARCHAR(MAX) /
754 // NVARCHAR(MAX) / VARBINARY(MAX).
755 if let Some(ref meta) = server_metadata {
756 use tds_protocol::types::TypeId;
757 for col in meta.columns.iter() {
758 let (rejected, replacement) = match col.type_id {
759 TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
760 TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
761 TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
762 _ => (None, ""),
763 };
764 if let Some(sql_type) = rejected {
765 return Err(Error::from(mssql_types::TypeError::UnsupportedType {
766 sql_type: sql_type.to_string(),
767 reason: format!(
768 "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
769 are not supported. Alter the column to {} instead \
770 (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
771 Server 2005).",
772 col.name,
773 builder.table_name(),
774 sql_type,
775 replacement,
776 ),
777 }));
778 }
779 }
780 }
781
782 // Step 2: Send INSERT BULK statement to put server in bulk load mode
783 let stmt = builder.build_insert_bulk_statement()?;
784 let deadline = self.command_deadline();
785 let canceller = self.connection_cancel_handle();
786 run_with_deadline(
787 async {
788 self.send_sql_batch(&stmt).await?;
789 self.read_execute_result().await
790 },
791 deadline,
792 canceller,
793 )
794 .await?;
795
796 // Step 3: Create bulk writer with server's metadata
797 let raw_meta = if meta_end > meta_start {
798 Some(raw_payload.slice(meta_start..meta_end))
799 } else {
800 None
801 };
802
803 let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
804 let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
805 builder.columns().to_vec(),
806 builder.options().batch_size,
807 raw_meta,
808 server_cols,
809 );
810
811 Ok(crate::bulk::BulkWriter::new(self, bulk))
812 }
813
814 /// Start a bulk insert without querying the server for column metadata.
815 ///
816 /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
817 /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
818 /// column metadata is constructed from the `BulkColumn` types provided
819 /// on the builder. This saves a round-trip when the schema is known.
820 ///
821 /// # Caveats
822 ///
823 /// The caller must ensure `BulkColumn` entries match the target table's
824 /// column definitions exactly. Mismatched types, lengths, precision/scale,
825 /// or column ordering will cause the server to reject the BulkLoad packet.
826 ///
827 /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
828 /// extra round-trip is usually negligible and the server-supplied metadata
829 /// is guaranteed correct.
830 pub async fn bulk_insert_without_schema_discovery(
831 &mut self,
832 builder: &crate::bulk::BulkInsertBuilder,
833 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
834 tracing::debug!(
835 table = builder.table_name(),
836 columns = builder.columns().len(),
837 "starting bulk insert (no schema discovery)"
838 );
839
840 // Send INSERT BULK statement to put server in bulk load mode
841 let stmt = builder.build_insert_bulk_statement()?;
842 let deadline = self.command_deadline();
843 let canceller = self.connection_cancel_handle();
844 run_with_deadline(
845 async {
846 self.send_sql_batch(&stmt).await?;
847 self.read_execute_result().await
848 },
849 deadline,
850 canceller,
851 )
852 .await?;
853
854 // Create bulk writer with hand-crafted metadata
855 let bulk =
856 crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
857
858 Ok(crate::bulk::BulkWriter::new(self, bulk))
859 }
860
861 /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
862 ///
863 /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
864 /// row data after the `INSERT BULK` statement has been acknowledged.
865 pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
866 let max_packet = self.config.packet_size as usize;
867
868 self.in_flight = true;
869 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
870
871 match connection {
872 #[cfg(feature = "tls")]
873 ConnectionHandle::Tls(conn) => {
874 conn.send_message(PacketType::BulkLoad, payload, max_packet)
875 .await?;
876 }
877 #[cfg(feature = "tls")]
878 ConnectionHandle::TlsPrelogin(conn) => {
879 conn.send_message(PacketType::BulkLoad, payload, max_packet)
880 .await?;
881 }
882 ConnectionHandle::Plain(conn) => {
883 conn.send_message(PacketType::BulkLoad, payload, max_packet)
884 .await?;
885 }
886 }
887
888 // Read the server's Done response with row count
889 self.read_execute_result().await
890 }
891
892 /// Execute a query with named parameters and return a streaming result set.
893 ///
894 /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
895 /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
896 /// and the `#[derive(ToParams)]` macro.
897 ///
898 /// # Example
899 ///
900 /// ```rust,no_run
901 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
902 /// use mssql_client::{NamedParam, ToParams};
903 ///
904 /// // With derive macro:
905 /// #[derive(mssql_derive::ToParams)]
906 /// struct UserQuery { name: String }
907 ///
908 /// let q = UserQuery { name: "Alice".into() };
909 /// let rows = client.query_named(
910 /// "SELECT * FROM users WHERE name = @name",
911 /// &q.to_params()?,
912 /// ).await?;
913 ///
914 /// // Or manually:
915 /// let params = vec![NamedParam::from_value("name", &"Alice")?];
916 /// let rows = client.query_named(
917 /// "SELECT * FROM users WHERE name = @name",
918 /// ¶ms,
919 /// ).await?;
920 /// # let _ = rows;
921 /// # Ok(())
922 /// # }
923 /// ```
924 pub async fn query_named<'a>(
925 &'a mut self,
926 sql: &str,
927 params: &[crate::to_params::NamedParam],
928 ) -> Result<QueryStream<'a>> {
929 tracing::debug!(
930 sql = sql,
931 params_count = params.len(),
932 "executing query with named parameters"
933 );
934
935 if params.is_empty() {
936 self.send_sql_batch(sql).await?;
937 } else {
938 let rpc_params =
939 Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
940 let rpc = RpcRequest::execute_sql(sql, rpc_params);
941 self.send_rpc(&rpc).await?;
942 }
943
944 let resp = self.read_query_response().await?;
945 #[cfg(feature = "always-encrypted")]
946 {
947 Ok(QueryStream::from_raw(
948 resp.columns,
949 resp.pending_rows,
950 resp.meta,
951 resp.decryptor,
952 ))
953 }
954 #[cfg(not(feature = "always-encrypted"))]
955 {
956 Ok(QueryStream::from_raw(
957 resp.columns,
958 resp.pending_rows,
959 resp.meta,
960 ))
961 }
962 }
963
964 /// Execute a statement with named parameters.
965 ///
966 /// Returns the number of affected rows. This is the named-parameter
967 /// counterpart of [`execute()`](Client::execute), compatible with the
968 /// [`ToParams`](crate::to_params::ToParams) trait.
969 ///
970 /// # Example
971 ///
972 /// ```rust,no_run
973 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
974 /// use mssql_client::NamedParam;
975 ///
976 /// let params = vec![
977 /// NamedParam::from_value("name", &"Alice")?,
978 /// NamedParam::from_value("email", &"alice@example.com")?,
979 /// ];
980 /// let rows_affected = client.execute_named(
981 /// "INSERT INTO users (name, email) VALUES (@name, @email)",
982 /// ¶ms,
983 /// ).await?;
984 /// # let _ = rows_affected;
985 /// # Ok(())
986 /// # }
987 /// ```
988 pub async fn execute_named(
989 &mut self,
990 sql: &str,
991 params: &[crate::to_params::NamedParam],
992 ) -> Result<u64> {
993 tracing::debug!(
994 sql = sql,
995 params_count = params.len(),
996 "executing statement with named parameters"
997 );
998
999 let deadline = self.command_deadline();
1000 let canceller = self.connection_cancel_handle();
1001 run_with_deadline(
1002 async {
1003 if params.is_empty() {
1004 self.send_sql_batch(sql).await?;
1005 } else {
1006 let rpc_params = Self::convert_named_params(
1007 params,
1008 self.send_unicode(),
1009 self.server_collation(),
1010 )?;
1011 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1012 self.send_rpc(&rpc).await?;
1013 }
1014
1015 self.read_execute_result().await
1016 },
1017 deadline,
1018 canceller,
1019 )
1020 .await
1021 }
1022
1023 /// Whether string parameters are sent as NVARCHAR (Unicode).
1024 pub(crate) fn send_unicode(&self) -> bool {
1025 self.config.send_string_parameters_as_unicode
1026 }
1027
1028 /// Server's default collation, captured from ENVCHANGE during login.
1029 pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
1030 self.server_collation.as_ref()
1031 }
1032
1033 /// Shared implementation behind `query_stream` for both `Ready` and
1034 /// `InTransaction`. Sends the request, then pulls packets until the first
1035 /// result set's `ColMetaData` (resolving columns and any Always Encrypted
1036 /// decryptor up front) before handing back a [`RowStream`].
1037 pub(crate) async fn query_stream_inner<'a>(
1038 &'a mut self,
1039 sql: &str,
1040 params: &[&(dyn crate::ToSql + Sync)],
1041 ) -> Result<crate::row_stream::RowStream<'a, S>> {
1042 use crate::client::response::server_token_to_error;
1043 use crate::row_source::{Pull, RowSource};
1044 use tds_protocol::token::Token;
1045
1046 tracing::debug!(sql = sql, params_count = params.len(), "streaming query");
1047
1048 // Send the request (same wire format as the buffered path).
1049 if params.is_empty() {
1050 self.send_sql_batch(sql).await?;
1051 } else {
1052 let rpc = self.build_parameterized_rpc(sql, params).await?;
1053 self.send_rpc(&rpc).await?;
1054 }
1055 self.in_flight = true;
1056
1057 #[cfg(feature = "always-encrypted")]
1058 let encryption_enabled = self.encryption_context.is_some();
1059 #[cfg(not(feature = "always-encrypted"))]
1060 let encryption_enabled = false;
1061
1062 let mut source = RowSource::new(encryption_enabled);
1063
1064 // Prelude: pull packets until the first result set's ColMetaData (so the
1065 // columns and any Always Encrypted decryptor are resolved up front), or
1066 // until a terminal Done/Error if there is no result set.
1067 loop {
1068 match source.pull()? {
1069 Pull::Token(Token::ColMetaData(meta)) => {
1070 let columns = Self::build_columns(&meta);
1071 #[cfg(feature = "always-encrypted")]
1072 let decryptor = self
1073 .resolve_decryptor(&meta)
1074 .await?
1075 .map(std::sync::Arc::new);
1076 return Ok(crate::row_stream::RowStream::new(
1077 self,
1078 source,
1079 columns,
1080 meta,
1081 #[cfg(feature = "always-encrypted")]
1082 decryptor,
1083 ));
1084 }
1085 Pull::Token(Token::Error(err)) => {
1086 self.in_flight = false;
1087 return Err(server_token_to_error(&err));
1088 }
1089 Pull::Token(Token::Done(done)) => {
1090 if done.status.error {
1091 self.in_flight = false;
1092 return Err(Error::Query(
1093 "query failed (server set error flag in DONE token)".to_string(),
1094 ));
1095 }
1096 if !done.status.more {
1097 // No result set (e.g. an INSERT) — an empty stream.
1098 self.in_flight = false;
1099 return Ok(crate::row_stream::RowStream::empty(self));
1100 }
1101 // More results may follow; keep looking for ColMetaData.
1102 }
1103 Pull::Token(Token::EnvChange(env)) => {
1104 Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
1105 }
1106 Pull::Token(_) => {
1107 // Info / Order / DoneProc / DoneInProc, etc. — keep pulling.
1108 }
1109 Pull::NeedMore => match self.read_response_packet().await? {
1110 Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1111 None => {
1112 self.in_flight = false;
1113 return Err(Error::ConnectionClosed);
1114 }
1115 },
1116 Pull::End => {
1117 self.in_flight = false;
1118 return Ok(crate::row_stream::RowStream::empty(self));
1119 }
1120 }
1121 }
1122 }
1123
1124 /// Shared implementation behind `query_stream_blob` for both `Ready` and
1125 /// `InTransaction`.
1126 pub(crate) async fn query_stream_blob_inner<'a>(
1127 &'a mut self,
1128 sql: &str,
1129 params: &[&(dyn crate::ToSql + Sync)],
1130 ) -> Result<crate::blob_stream::BlobStream<'a, S>> {
1131 use crate::client::response::server_token_to_error;
1132 use crate::row_source::{Pull, RowSource};
1133 use tds_protocol::token::Token;
1134
1135 if params.is_empty() {
1136 self.send_sql_batch(sql).await?;
1137 } else {
1138 let rpc = self.build_parameterized_rpc(sql, params).await?;
1139 self.send_rpc(&rpc).await?;
1140 }
1141 self.in_flight = true;
1142
1143 #[cfg(feature = "always-encrypted")]
1144 let encryption_enabled = self.encryption_context.is_some();
1145 #[cfg(not(feature = "always-encrypted"))]
1146 let encryption_enabled = false;
1147
1148 let mut source = RowSource::new(encryption_enabled);
1149
1150 loop {
1151 match source.pull()? {
1152 Pull::Token(Token::ColMetaData(meta)) => {
1153 let blob_index = Self::validate_blob_result_set(&meta)?;
1154 let (buf, eom) = source.into_parts();
1155 return Ok(crate::blob_stream::BlobStream::new(
1156 self,
1157 buf,
1158 eom,
1159 encryption_enabled,
1160 meta,
1161 blob_index,
1162 ));
1163 }
1164 Pull::Token(Token::Error(err)) => {
1165 self.in_flight = false;
1166 return Err(server_token_to_error(&err));
1167 }
1168 Pull::Token(Token::Done(_)) => {
1169 self.in_flight = false;
1170 return Err(Error::Protocol(
1171 "query_stream_blob: query produced no result set".to_string(),
1172 ));
1173 }
1174 Pull::Token(_) => {}
1175 Pull::NeedMore => match self.read_response_packet().await? {
1176 Some((payload, is_eom)) => source.push_packet(payload, is_eom),
1177 None => {
1178 self.in_flight = false;
1179 return Err(Error::ConnectionClosed);
1180 }
1181 },
1182 Pull::End => {
1183 self.in_flight = false;
1184 return Err(Error::Protocol(
1185 "query_stream_blob: query produced no result set".to_string(),
1186 ));
1187 }
1188 }
1189 }
1190 }
1191
1192 /// Validate that a result set is shaped for [`query_stream_blob`] and return
1193 /// the index of its single trailing MAX column.
1194 fn validate_blob_result_set(meta: &tds_protocol::token::ColMetaData) -> Result<usize> {
1195 if meta.cek_table.is_some() {
1196 return Err(Error::Protocol(
1197 "query_stream_blob does not support Always Encrypted result sets".to_string(),
1198 ));
1199 }
1200 let max_cols: Vec<usize> = meta
1201 .columns
1202 .iter()
1203 .enumerate()
1204 .filter(|(_, c)| crate::blob_stream::is_plp_max(c))
1205 .map(|(i, _)| i)
1206 .collect();
1207 match max_cols.as_slice() {
1208 [] => Err(Error::Protocol(
1209 "query_stream_blob: result set has no MAX column — use query_stream".to_string(),
1210 )),
1211 [idx] if *idx == meta.columns.len() - 1 => Ok(*idx),
1212 [_] => Err(Error::Protocol(
1213 "query_stream_blob: the MAX column must be the last column".to_string(),
1214 )),
1215 _ => Err(Error::Protocol(
1216 "query_stream_blob: result set has more than one MAX column".to_string(),
1217 )),
1218 }
1219 }
1220}
1221
1222impl Client<Ready> {
1223 /// Mark this connection as needing a reset on next use.
1224 ///
1225 /// Called by the connection pool when a connection is returned.
1226 /// The next SQL batch or RPC will include the RESETCONNECTION flag
1227 /// in the TDS packet header, causing SQL Server to reset connection
1228 /// state (temp tables, SET options, transaction isolation level, etc.)
1229 /// before executing the command.
1230 ///
1231 /// This is more efficient than calling `sp_reset_connection` as a
1232 /// separate command because it's handled at the TDS protocol level.
1233 pub fn mark_needs_reset(&mut self) {
1234 self.needs_reset = true;
1235 }
1236
1237 /// Check if this connection needs a reset.
1238 ///
1239 /// Returns true if `mark_needs_reset()` was called and the reset
1240 /// hasn't been performed yet.
1241 #[must_use]
1242 pub fn needs_reset(&self) -> bool {
1243 self.needs_reset
1244 }
1245
1246 /// Execute a query and return a result set with lazy per-row decoding.
1247 ///
1248 /// Per ADR-007 the full response is buffered in memory and each row is
1249 /// *decoded* on demand as you iterate — this is not incremental network
1250 /// streaming, so peak memory tracks the response size. Use
1251 /// `.collect_all()` if you want all rows materialized into a `Vec` up
1252 /// front.
1253 ///
1254 /// # Example
1255 ///
1256 /// ```rust,no_run
1257 /// # use mssql_client::Row;
1258 /// # fn process(_: &Row) {}
1259 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1260 /// // Streaming (synchronous iteration over the result set)
1261 /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
1262 /// for row in stream {
1263 /// let row = row?;
1264 /// process(&row);
1265 /// }
1266 ///
1267 /// // Buffered (loads all into memory)
1268 /// let rows: Vec<Row> = client
1269 /// .query("SELECT * FROM small_table", &[])
1270 /// .await?
1271 /// .collect_all()
1272 /// .await?;
1273 /// # let _ = rows;
1274 /// # Ok(())
1275 /// # }
1276 /// ```
1277 pub async fn query<'a>(
1278 &'a mut self,
1279 sql: &str,
1280 params: &[&(dyn crate::ToSql + Sync)],
1281 ) -> Result<QueryStream<'a>> {
1282 let deadline = self.command_deadline();
1283 self.query_inner(sql, params, deadline).await
1284 }
1285
1286 /// Shared query implementation with an explicit command deadline.
1287 async fn query_inner<'a>(
1288 &'a mut self,
1289 sql: &str,
1290 params: &[&(dyn crate::ToSql + Sync)],
1291 deadline: Option<std::time::Duration>,
1292 ) -> Result<QueryStream<'a>> {
1293 tracing::debug!(sql = sql, params_count = params.len(), "executing query");
1294
1295 #[cfg(feature = "otel")]
1296 let instrumentation = self.instrumentation.clone();
1297 #[cfg(feature = "otel")]
1298 let mut span = instrumentation.query_span(sql);
1299 #[cfg(feature = "otel")]
1300 let timer = crate::instrumentation::OperationTimer::start(
1301 crate::instrumentation::extract_operation(sql),
1302 );
1303
1304 let canceller = self.cancel_handle();
1305 let result = run_with_deadline(
1306 async {
1307 if params.is_empty() {
1308 // Simple query without parameters - use SQL batch
1309 self.send_sql_batch(sql).await?;
1310 } else {
1311 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1312 let rpc = self.build_parameterized_rpc(sql, params).await?;
1313 self.send_rpc(&rpc).await?;
1314 }
1315
1316 // Read complete response including columns and rows
1317 self.read_query_response().await
1318 },
1319 deadline,
1320 canceller,
1321 )
1322 .await;
1323
1324 #[cfg(feature = "otel")]
1325 match &result {
1326 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1327 Err(e) => InstrumentationContext::record_error(&mut span, e),
1328 }
1329 #[cfg(feature = "otel")]
1330 timer.finish(instrumentation.metrics(), result.is_ok());
1331
1332 // Drop the span before returning
1333 #[cfg(feature = "otel")]
1334 drop(span);
1335
1336 let resp = result?;
1337 #[cfg(feature = "always-encrypted")]
1338 {
1339 Ok(QueryStream::from_raw(
1340 resp.columns,
1341 resp.pending_rows,
1342 resp.meta,
1343 resp.decryptor,
1344 ))
1345 }
1346 #[cfg(not(feature = "always-encrypted"))]
1347 {
1348 Ok(QueryStream::from_raw(
1349 resp.columns,
1350 resp.pending_rows,
1351 resp.meta,
1352 ))
1353 }
1354 }
1355
1356 /// Execute a query and stream rows incrementally from the network.
1357 ///
1358 /// Unlike [`query`](Self::query) — which buffers the whole response in
1359 /// memory before returning — this reads TDS packets on demand as rows are
1360 /// pulled, so peak memory is roughly one packet plus one row regardless of
1361 /// result-set size. Use it for large result sets; use [`query`](Self::query)
1362 /// for the common small-result case where the buffered, synchronously
1363 /// iterable [`QueryStream`] is more convenient.
1364 ///
1365 /// The returned [`RowStream`](crate::RowStream) borrows the client for its
1366 /// lifetime, so no other request can run on this connection until the stream
1367 /// is consumed or dropped. Also available on `Client<InTransaction>` to
1368 /// stream within a transaction.
1369 ///
1370 /// # Example
1371 ///
1372 /// ```rust,no_run
1373 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1374 /// let mut stream = client.query_stream("SELECT id FROM big_table", &[]).await?;
1375 /// while let Some(row) = stream.try_next().await? {
1376 /// let id: i32 = row.get_by_name("id")?;
1377 /// let _ = id;
1378 /// }
1379 /// # Ok(())
1380 /// # }
1381 /// ```
1382 pub async fn query_stream<'a>(
1383 &'a mut self,
1384 sql: &str,
1385 params: &[&(dyn crate::ToSql + Sync)],
1386 ) -> Result<crate::row_stream::RowStream<'a, Ready>> {
1387 self.query_stream_inner(sql, params).await
1388 }
1389
1390 /// Execute a query and stream a row's trailing MAX column from the network.
1391 ///
1392 /// For result sets whose last column is a single MAX type
1393 /// (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`, `XML`), this reads
1394 /// that column's bytes incrementally from the socket instead of
1395 /// materializing the cell — so a multi-GB BLOB can be streamed to a sink in
1396 /// bounded memory. The leading (scalar) columns are decoded eagerly into the
1397 /// per-row [`Row`](crate::Row).
1398 ///
1399 /// The MAX column must be the **last** column. The returned
1400 /// [`BlobStream`](crate::BlobStream) yields scalar [`Row`](crate::Row)s via
1401 /// [`next`](crate::BlobStream::next); read each row's blob with
1402 /// [`read_chunk`](crate::BlobStream::read_chunk) /
1403 /// [`copy_blob_to`](crate::BlobStream::copy_blob_to) before advancing. Also
1404 /// available on `Client<InTransaction>`.
1405 ///
1406 /// # Errors
1407 ///
1408 /// Returns an error if the result set has no trailing MAX column, has more
1409 /// than one MAX column, the MAX column is not last, or the result set uses
1410 /// Always Encrypted (not yet supported on this path).
1411 pub async fn query_stream_blob<'a>(
1412 &'a mut self,
1413 sql: &str,
1414 params: &[&(dyn crate::ToSql + Sync)],
1415 ) -> Result<crate::blob_stream::BlobStream<'a, Ready>> {
1416 self.query_stream_blob_inner(sql, params).await
1417 }
1418
1419 /// Execute a query with a specific timeout.
1420 ///
1421 /// This overrides the default `command_timeout` from the connection configuration
1422 /// for this specific query. If the query does not complete within the specified
1423 /// duration, the driver sends an Attention packet to cancel it server-side,
1424 /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
1425 /// connection left usable for the next request.
1426 ///
1427 /// # Arguments
1428 ///
1429 /// * `sql` - The SQL query to execute
1430 /// * `params` - Query parameters
1431 /// * `timeout_duration` - Maximum time to wait for the query to complete
1432 ///
1433 /// # Example
1434 ///
1435 /// ```rust,no_run
1436 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1437 /// use std::time::Duration;
1438 ///
1439 /// // Execute with a 5-second timeout
1440 /// let rows = client
1441 /// .query_with_timeout(
1442 /// "SELECT * FROM large_table",
1443 /// &[],
1444 /// Duration::from_secs(5),
1445 /// )
1446 /// .await?;
1447 /// # let _ = rows;
1448 /// # Ok(())
1449 /// # }
1450 /// ```
1451 pub async fn query_with_timeout<'a>(
1452 &'a mut self,
1453 sql: &str,
1454 params: &[&(dyn crate::ToSql + Sync)],
1455 timeout_duration: std::time::Duration,
1456 ) -> Result<QueryStream<'a>> {
1457 self.query_inner(sql, params, Some(timeout_duration)).await
1458 }
1459
1460 /// Execute a batch that may return multiple result sets.
1461 ///
1462 /// This is useful for stored procedures or SQL batches that contain
1463 /// multiple SELECT statements.
1464 ///
1465 /// # Example
1466 ///
1467 /// ```rust,no_run
1468 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1469 /// // Execute a batch with multiple SELECT statements
1470 /// let mut results = client.query_multiple(
1471 /// "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
1472 /// &[]
1473 /// ).await?;
1474 ///
1475 /// // Process first result set
1476 /// while let Some(row) = results.next_row().await? {
1477 /// println!("Result 1: {:?}", row);
1478 /// }
1479 ///
1480 /// // Move to second result set
1481 /// if results.next_result().await? {
1482 /// while let Some(row) = results.next_row().await? {
1483 /// println!("Result 2: {:?}", row);
1484 /// }
1485 /// }
1486 /// # Ok(())
1487 /// # }
1488 /// ```
1489 pub async fn query_multiple<'a>(
1490 &'a mut self,
1491 sql: &str,
1492 params: &[&(dyn crate::ToSql + Sync)],
1493 ) -> Result<MultiResultStream<'a>> {
1494 tracing::debug!(
1495 sql = sql,
1496 params_count = params.len(),
1497 "executing multi-result query"
1498 );
1499
1500 let deadline = self.command_deadline();
1501 let canceller = self.connection_cancel_handle();
1502 let result_sets = run_with_deadline(
1503 async {
1504 if params.is_empty() {
1505 // Simple batch without parameters - use SQL batch
1506 self.send_sql_batch(sql).await?;
1507 } else {
1508 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1509 let rpc = self.build_parameterized_rpc(sql, params).await?;
1510 self.send_rpc(&rpc).await?;
1511 }
1512
1513 // Read all result sets
1514 self.read_multi_result_response().await
1515 },
1516 deadline,
1517 canceller,
1518 )
1519 .await?;
1520 Ok(MultiResultStream::new(result_sets))
1521 }
1522
1523 /// Execute a query that doesn't return rows.
1524 ///
1525 /// Returns the number of affected rows.
1526 pub async fn execute(
1527 &mut self,
1528 sql: &str,
1529 params: &[&(dyn crate::ToSql + Sync)],
1530 ) -> Result<u64> {
1531 let deadline = self.command_deadline();
1532 self.execute_inner(sql, params, deadline).await
1533 }
1534
1535 /// Shared execute implementation with an explicit command deadline.
1536 async fn execute_inner(
1537 &mut self,
1538 sql: &str,
1539 params: &[&(dyn crate::ToSql + Sync)],
1540 deadline: Option<std::time::Duration>,
1541 ) -> Result<u64> {
1542 tracing::debug!(
1543 sql = sql,
1544 params_count = params.len(),
1545 "executing statement"
1546 );
1547
1548 #[cfg(feature = "otel")]
1549 let instrumentation = self.instrumentation.clone();
1550 #[cfg(feature = "otel")]
1551 let mut span = instrumentation.query_span(sql);
1552 #[cfg(feature = "otel")]
1553 let timer = crate::instrumentation::OperationTimer::start(
1554 crate::instrumentation::extract_operation(sql),
1555 );
1556
1557 let canceller = self.cancel_handle();
1558 let result = run_with_deadline(
1559 async {
1560 if params.is_empty() {
1561 // Simple statement without parameters - use SQL batch
1562 self.send_sql_batch(sql).await?;
1563 } else {
1564 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
1565 let rpc = self.build_parameterized_rpc(sql, params).await?;
1566 self.send_rpc(&rpc).await?;
1567 }
1568
1569 // Read response and get row count
1570 self.read_execute_result().await
1571 },
1572 deadline,
1573 canceller,
1574 )
1575 .await;
1576
1577 #[cfg(feature = "otel")]
1578 match &result {
1579 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1580 Err(e) => InstrumentationContext::record_error(&mut span, e),
1581 }
1582 #[cfg(feature = "otel")]
1583 timer.finish(instrumentation.metrics(), result.is_ok());
1584
1585 // Drop the span before returning
1586 #[cfg(feature = "otel")]
1587 drop(span);
1588
1589 result
1590 }
1591
1592 /// Execute a statement with a specific timeout.
1593 ///
1594 /// This overrides the default `command_timeout` from the connection configuration
1595 /// for this specific statement. If the statement does not complete within the
1596 /// specified duration, the driver sends an Attention packet to cancel it
1597 /// server-side, drains the acknowledgement, and returns
1598 /// [`Error::CommandTimeout`] with the connection left usable.
1599 ///
1600 /// # Arguments
1601 ///
1602 /// * `sql` - The SQL statement to execute
1603 /// * `params` - Statement parameters
1604 /// * `timeout_duration` - Maximum time to wait for the statement to complete
1605 ///
1606 /// # Example
1607 ///
1608 /// ```rust,no_run
1609 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1610 /// use std::time::Duration;
1611 ///
1612 /// // Execute with a 10-second timeout
1613 /// let rows_affected = client
1614 /// .execute_with_timeout(
1615 /// "UPDATE large_table SET status = @p1",
1616 /// &[&"processed"],
1617 /// Duration::from_secs(10),
1618 /// )
1619 /// .await?;
1620 /// # let _ = rows_affected;
1621 /// # Ok(())
1622 /// # }
1623 /// ```
1624 pub async fn execute_with_timeout(
1625 &mut self,
1626 sql: &str,
1627 params: &[&(dyn crate::ToSql + Sync)],
1628 timeout_duration: std::time::Duration,
1629 ) -> Result<u64> {
1630 self.execute_inner(sql, params, Some(timeout_duration))
1631 .await
1632 }
1633
1634 /// Begin a transaction.
1635 ///
1636 /// This transitions the client from `Ready` to `InTransaction` state.
1637 /// Per MS-TDS spec, the server returns a transaction descriptor in the
1638 /// BeginTransaction EnvChange token that must be included in subsequent
1639 /// ALL_HEADERS sections.
1640 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1641 tracing::debug!("beginning transaction");
1642
1643 #[cfg(feature = "otel")]
1644 let instrumentation = self.instrumentation.clone();
1645 #[cfg(feature = "otel")]
1646 let mut span = instrumentation.transaction_span("BEGIN");
1647
1648 // Execute BEGIN TRANSACTION and extract the transaction descriptor
1649 let result = async {
1650 self.send_sql_batch("BEGIN TRANSACTION").await?;
1651 self.read_transaction_begin_result().await
1652 }
1653 .await;
1654
1655 #[cfg(feature = "otel")]
1656 match &result {
1657 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1658 Err(e) => InstrumentationContext::record_error(&mut span, e),
1659 }
1660
1661 // Drop the span before moving instrumentation
1662 #[cfg(feature = "otel")]
1663 drop(span);
1664
1665 let transaction_descriptor = result?;
1666
1667 Ok(Client {
1668 config: self.config,
1669 _state: PhantomData,
1670 connection: self.connection,
1671 server_version: self.server_version,
1672 current_database: self.current_database,
1673 server_collation: self.server_collation,
1674 statement_cache: self.statement_cache,
1675 transaction_descriptor, // Store the descriptor from server
1676 needs_reset: self.needs_reset,
1677 in_flight: self.in_flight,
1678 #[cfg(feature = "otel")]
1679 instrumentation: self.instrumentation,
1680 #[cfg(feature = "always-encrypted")]
1681 encryption_context: self.encryption_context,
1682 })
1683 }
1684
1685 /// Begin a transaction with a specific isolation level.
1686 ///
1687 /// This transitions the client from `Ready` to `InTransaction` state
1688 /// with the specified isolation level.
1689 ///
1690 /// # Example
1691 ///
1692 /// ```rust,no_run
1693 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1694 /// use mssql_client::IsolationLevel;
1695 ///
1696 /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1697 /// // All operations in this transaction use SERIALIZABLE isolation
1698 /// tx.commit().await?;
1699 /// # Ok(())
1700 /// # }
1701 /// ```
1702 pub async fn begin_transaction_with_isolation(
1703 mut self,
1704 isolation_level: crate::transaction::IsolationLevel,
1705 ) -> Result<Client<InTransaction>> {
1706 tracing::debug!(
1707 isolation_level = %isolation_level.name(),
1708 "beginning transaction with isolation level"
1709 );
1710
1711 #[cfg(feature = "otel")]
1712 let instrumentation = self.instrumentation.clone();
1713 #[cfg(feature = "otel")]
1714 let mut span = instrumentation.transaction_span("BEGIN");
1715
1716 // First set the isolation level
1717 let result = async {
1718 self.send_sql_batch(isolation_level.as_sql()).await?;
1719 self.read_execute_result().await?;
1720
1721 // Then begin the transaction
1722 self.send_sql_batch("BEGIN TRANSACTION").await?;
1723 self.read_transaction_begin_result().await
1724 }
1725 .await;
1726
1727 #[cfg(feature = "otel")]
1728 match &result {
1729 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1730 Err(e) => InstrumentationContext::record_error(&mut span, e),
1731 }
1732
1733 #[cfg(feature = "otel")]
1734 drop(span);
1735
1736 let transaction_descriptor = result?;
1737
1738 Ok(Client {
1739 config: self.config,
1740 _state: PhantomData,
1741 connection: self.connection,
1742 server_version: self.server_version,
1743 current_database: self.current_database,
1744 server_collation: self.server_collation,
1745 statement_cache: self.statement_cache,
1746 transaction_descriptor,
1747 needs_reset: self.needs_reset,
1748 in_flight: self.in_flight,
1749 #[cfg(feature = "otel")]
1750 instrumentation: self.instrumentation,
1751 #[cfg(feature = "always-encrypted")]
1752 encryption_context: self.encryption_context,
1753 })
1754 }
1755
1756 /// Execute a simple query without parameters.
1757 ///
1758 /// This is useful for DDL statements and simple queries where you
1759 /// don't need to retrieve the affected row count.
1760 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1761 tracing::debug!(sql = sql, "executing simple query");
1762
1763 // Send SQL batch
1764 self.send_sql_batch(sql).await?;
1765
1766 // Read and discard response
1767 let _ = self.read_execute_result().await?;
1768
1769 Ok(())
1770 }
1771
1772 /// Close the connection gracefully.
1773 pub async fn close(self) -> Result<()> {
1774 tracing::debug!("closing connection");
1775 Ok(())
1776 }
1777
1778 /// Get the current database name.
1779 #[must_use]
1780 pub fn database(&self) -> Option<&str> {
1781 self.config.database.as_deref()
1782 }
1783
1784 /// Get the server host.
1785 #[must_use]
1786 pub fn host(&self) -> &str {
1787 &self.config.host
1788 }
1789
1790 /// Get the server port.
1791 #[must_use]
1792 pub fn port(&self) -> u16 {
1793 self.config.port
1794 }
1795
1796 /// Check if the connection is currently in a transaction.
1797 ///
1798 /// This returns `true` if a transaction was started via raw SQL
1799 /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1800 ///
1801 /// Note: This only tracks transactions started via raw SQL. Transactions
1802 /// started via the type-state API (`begin_transaction()`) result in a
1803 /// `Client<InTransaction>` which is a different type.
1804 ///
1805 /// # Example
1806 ///
1807 /// ```rust,no_run
1808 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1809 /// client.execute("BEGIN TRANSACTION", &[]).await?;
1810 /// assert!(client.is_in_transaction());
1811 ///
1812 /// client.execute("COMMIT", &[]).await?;
1813 /// assert!(!client.is_in_transaction());
1814 /// # Ok(())
1815 /// # }
1816 /// ```
1817 #[must_use]
1818 pub fn is_in_transaction(&self) -> bool {
1819 self.transaction_descriptor != 0
1820 }
1821
1822 /// Check if a request is in-flight (sent but response not fully read).
1823 ///
1824 /// Used by the connection pool to detect dirty connections that were
1825 /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1826 /// A connection with an in-flight request has unread data in the TCP
1827 /// buffer and must be discarded rather than returned to the pool.
1828 #[must_use]
1829 pub fn is_in_flight(&self) -> bool {
1830 self.in_flight
1831 }
1832
1833 /// Report whether an Always Encrypted key-store provider with the given
1834 /// name is currently reachable through this client's encryption context.
1835 ///
1836 /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1837 /// the connection was opened without `column_encryption` configured, or
1838 /// when no matching provider was registered.
1839 #[cfg(feature = "always-encrypted")]
1840 #[must_use]
1841 pub fn has_encryption_provider(&self, name: &str) -> bool {
1842 self.encryption_context
1843 .as_ref()
1844 .is_some_and(|ctx| ctx.has_provider(name))
1845 }
1846
1847 /// Get a handle for cancelling the current query.
1848 ///
1849 /// The cancel handle can be cloned and sent to other tasks, enabling
1850 /// cancellation of long-running queries from a separate async context.
1851 ///
1852 /// # Example
1853 ///
1854 /// ```rust,no_run
1855 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1856 /// use std::time::Duration;
1857 ///
1858 /// let cancel_handle = client.cancel_handle();
1859 ///
1860 /// // Spawn a task to cancel after 10 seconds
1861 /// let handle = tokio::spawn(async move {
1862 /// tokio::time::sleep(Duration::from_secs(10)).await;
1863 /// let _ = cancel_handle.cancel().await;
1864 /// });
1865 ///
1866 /// // This query will be cancelled if it runs longer than 10 seconds
1867 /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1868 /// # let _ = (handle, result);
1869 /// # Ok(())
1870 /// # }
1871 /// ```
1872 #[must_use]
1873 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1874 self.connection_cancel_handle()
1875 }
1876}
1877
1878/// # Drop Behavior
1879///
1880/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1881/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1882/// the transaction remains open on the server until the TCP connection closes
1883/// (at which point SQL Server automatically rolls back).
1884///
1885/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1886/// to send a `ROLLBACK TRANSACTION` command.
1887///
1888/// ## Consequences of dropping without commit/rollback
1889///
1890/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1891/// (potentially 30+ minutes), holding locks on any modified rows.
1892/// - **Pooled connections:** The pool detects the active transaction descriptor
1893/// and discards the connection rather than returning it to the idle pool
1894/// (see `PooledConnection::drop` in `mssql-driver-pool`).
1895///
1896/// ## Best practice
1897///
1898/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1899/// for error paths:
1900///
1901/// ```rust,no_run
1902/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1903/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1904/// let tx = client.begin_transaction().await?;
1905/// match do_work(&tx).await {
1906/// Ok(_) => { tx.commit().await?; }
1907/// Err(e) => { tx.rollback().await?; return Err(e); }
1908/// }
1909/// # Ok(())
1910/// # }
1911/// ```
1912impl Client<InTransaction> {
1913 /// Execute a query within the transaction and return a streaming result set.
1914 ///
1915 /// See [`Client<Ready>::query`] for usage examples.
1916 pub async fn query<'a>(
1917 &'a mut self,
1918 sql: &str,
1919 params: &[&(dyn crate::ToSql + Sync)],
1920 ) -> Result<QueryStream<'a>> {
1921 let deadline = self.command_deadline();
1922 self.query_inner(sql, params, deadline).await
1923 }
1924
1925 /// Shared query implementation with an explicit command deadline.
1926 async fn query_inner<'a>(
1927 &'a mut self,
1928 sql: &str,
1929 params: &[&(dyn crate::ToSql + Sync)],
1930 deadline: Option<std::time::Duration>,
1931 ) -> Result<QueryStream<'a>> {
1932 tracing::debug!(
1933 sql = sql,
1934 params_count = params.len(),
1935 "executing query in transaction"
1936 );
1937
1938 #[cfg(feature = "otel")]
1939 let instrumentation = self.instrumentation.clone();
1940 #[cfg(feature = "otel")]
1941 let mut span = instrumentation.query_span(sql);
1942 #[cfg(feature = "otel")]
1943 let timer = crate::instrumentation::OperationTimer::start(
1944 crate::instrumentation::extract_operation(sql),
1945 );
1946
1947 let canceller = self.cancel_handle();
1948 let result = run_with_deadline(
1949 async {
1950 if params.is_empty() {
1951 // Simple query without parameters - use SQL batch
1952 self.send_sql_batch(sql).await?;
1953 } else {
1954 // Parameterized query - sp_executesql (encrypts Always Encrypted params).
1955 let rpc = self.build_parameterized_rpc(sql, params).await?;
1956 self.send_rpc(&rpc).await?;
1957 }
1958
1959 // Read complete response including columns and rows
1960 self.read_query_response().await
1961 },
1962 deadline,
1963 canceller,
1964 )
1965 .await;
1966
1967 #[cfg(feature = "otel")]
1968 match &result {
1969 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1970 Err(e) => InstrumentationContext::record_error(&mut span, e),
1971 }
1972 #[cfg(feature = "otel")]
1973 timer.finish(instrumentation.metrics(), result.is_ok());
1974
1975 // Drop the span before returning
1976 #[cfg(feature = "otel")]
1977 drop(span);
1978
1979 let resp = result?;
1980 #[cfg(feature = "always-encrypted")]
1981 {
1982 Ok(QueryStream::from_raw(
1983 resp.columns,
1984 resp.pending_rows,
1985 resp.meta,
1986 resp.decryptor,
1987 ))
1988 }
1989 #[cfg(not(feature = "always-encrypted"))]
1990 {
1991 Ok(QueryStream::from_raw(
1992 resp.columns,
1993 resp.pending_rows,
1994 resp.meta,
1995 ))
1996 }
1997 }
1998
1999 /// Stream rows incrementally from the network within the transaction.
2000 ///
2001 /// Identical to [`Client<Ready>::query_stream`] except the query runs inside
2002 /// the open transaction. The returned [`RowStream`](crate::RowStream)
2003 /// borrows the transaction client for its lifetime, so the stream must be
2004 /// consumed or dropped before the transaction can be committed or rolled
2005 /// back.
2006 pub async fn query_stream<'a>(
2007 &'a mut self,
2008 sql: &str,
2009 params: &[&(dyn crate::ToSql + Sync)],
2010 ) -> Result<crate::row_stream::RowStream<'a, InTransaction>> {
2011 self.query_stream_inner(sql, params).await
2012 }
2013
2014 /// Stream a row's trailing MAX column from the network within the
2015 /// transaction.
2016 ///
2017 /// See [`Client<Ready>::query_stream_blob`] for semantics and constraints;
2018 /// the only difference is that the query runs inside the open transaction.
2019 pub async fn query_stream_blob<'a>(
2020 &'a mut self,
2021 sql: &str,
2022 params: &[&(dyn crate::ToSql + Sync)],
2023 ) -> Result<crate::blob_stream::BlobStream<'a, InTransaction>> {
2024 self.query_stream_blob_inner(sql, params).await
2025 }
2026
2027 /// Execute a statement within the transaction.
2028 ///
2029 /// Returns the number of affected rows.
2030 pub async fn execute(
2031 &mut self,
2032 sql: &str,
2033 params: &[&(dyn crate::ToSql + Sync)],
2034 ) -> Result<u64> {
2035 let deadline = self.command_deadline();
2036 self.execute_inner(sql, params, deadline).await
2037 }
2038
2039 /// Shared execute implementation with an explicit command deadline.
2040 async fn execute_inner(
2041 &mut self,
2042 sql: &str,
2043 params: &[&(dyn crate::ToSql + Sync)],
2044 deadline: Option<std::time::Duration>,
2045 ) -> Result<u64> {
2046 tracing::debug!(
2047 sql = sql,
2048 params_count = params.len(),
2049 "executing statement in transaction"
2050 );
2051
2052 #[cfg(feature = "otel")]
2053 let instrumentation = self.instrumentation.clone();
2054 #[cfg(feature = "otel")]
2055 let mut span = instrumentation.query_span(sql);
2056 #[cfg(feature = "otel")]
2057 let timer = crate::instrumentation::OperationTimer::start(
2058 crate::instrumentation::extract_operation(sql),
2059 );
2060
2061 let canceller = self.cancel_handle();
2062 let result = run_with_deadline(
2063 async {
2064 if params.is_empty() {
2065 // Simple statement without parameters - use SQL batch
2066 self.send_sql_batch(sql).await?;
2067 } else {
2068 // Parameterized statement - sp_executesql (encrypts Always Encrypted params).
2069 let rpc = self.build_parameterized_rpc(sql, params).await?;
2070 self.send_rpc(&rpc).await?;
2071 }
2072
2073 // Read response and get row count
2074 self.read_execute_result().await
2075 },
2076 deadline,
2077 canceller,
2078 )
2079 .await;
2080
2081 #[cfg(feature = "otel")]
2082 match &result {
2083 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
2084 Err(e) => InstrumentationContext::record_error(&mut span, e),
2085 }
2086 #[cfg(feature = "otel")]
2087 timer.finish(instrumentation.metrics(), result.is_ok());
2088
2089 // Drop the span before returning
2090 #[cfg(feature = "otel")]
2091 drop(span);
2092
2093 result
2094 }
2095
2096 /// Execute a query within the transaction with a specific timeout.
2097 ///
2098 /// See [`Client<Ready>::query_with_timeout`] for details.
2099 pub async fn query_with_timeout<'a>(
2100 &'a mut self,
2101 sql: &str,
2102 params: &[&(dyn crate::ToSql + Sync)],
2103 timeout_duration: std::time::Duration,
2104 ) -> Result<QueryStream<'a>> {
2105 self.query_inner(sql, params, Some(timeout_duration)).await
2106 }
2107
2108 /// Execute a statement within the transaction with a specific timeout.
2109 ///
2110 /// See [`Client<Ready>::execute_with_timeout`] for details.
2111 pub async fn execute_with_timeout(
2112 &mut self,
2113 sql: &str,
2114 params: &[&(dyn crate::ToSql + Sync)],
2115 timeout_duration: std::time::Duration,
2116 ) -> Result<u64> {
2117 self.execute_inner(sql, params, Some(timeout_duration))
2118 .await
2119 }
2120
2121 /// Open a FILESTREAM BLOB for async reading and/or writing.
2122 ///
2123 /// This method queries the server for the transaction context, then opens
2124 /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
2125 ///
2126 /// # Arguments
2127 ///
2128 /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
2129 /// Query this yourself before calling `open_filestream`:
2130 /// ```sql
2131 /// SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
2132 /// ```
2133 /// * `access` — Read, write, or read/write access mode.
2134 ///
2135 /// # Requirements
2136 ///
2137 /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
2138 /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
2139 /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
2140 ///
2141 /// # Example
2142 ///
2143 /// ```text
2144 /// use mssql_client::FileStreamAccess;
2145 /// use tokio::io::AsyncReadExt;
2146 ///
2147 /// let mut tx = client.begin_transaction().await?;
2148 ///
2149 /// // Get the FILESTREAM path
2150 /// let rows = tx.query(
2151 /// "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
2152 /// &[&doc_id],
2153 /// ).await?;
2154 /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
2155 ///
2156 /// // Open and read the BLOB
2157 /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
2158 /// let mut data = Vec::new();
2159 /// stream.read_to_end(&mut data).await?;
2160 /// drop(stream);
2161 ///
2162 /// tx.commit().await?;
2163 /// ```
2164 #[cfg(all(windows, feature = "filestream"))]
2165 pub async fn open_filestream(
2166 &mut self,
2167 path: &str,
2168 access: crate::filestream::FileStreamAccess,
2169 ) -> Result<crate::filestream::FileStream> {
2170 tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
2171
2172 // Get the transaction context from SQL Server.
2173 // This binds the file access to the current SQL transaction.
2174 let txn_context: Vec<u8> = {
2175 let rows = self
2176 .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
2177 .await?;
2178 let mut ctx = None;
2179 for result in rows {
2180 let row = result?;
2181 ctx = Some(row.get::<Vec<u8>>(0)?);
2182 }
2183 ctx.ok_or_else(|| {
2184 Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
2185 })?
2186 };
2187
2188 crate::filestream::FileStream::open(path, access, &txn_context)
2189 }
2190
2191 /// Commit the transaction.
2192 ///
2193 /// This transitions the client back to `Ready` state.
2194 pub async fn commit(mut self) -> Result<Client<Ready>> {
2195 tracing::debug!("committing transaction");
2196
2197 #[cfg(feature = "otel")]
2198 let instrumentation = self.instrumentation.clone();
2199 #[cfg(feature = "otel")]
2200 let mut span = instrumentation.transaction_span("COMMIT");
2201
2202 // Execute COMMIT TRANSACTION
2203 let result = async {
2204 self.send_sql_batch("COMMIT TRANSACTION").await?;
2205 self.read_execute_result().await
2206 }
2207 .await;
2208
2209 #[cfg(feature = "otel")]
2210 match &result {
2211 Ok(_) => InstrumentationContext::record_success(&mut span, None),
2212 Err(e) => InstrumentationContext::record_error(&mut span, e),
2213 }
2214
2215 // Drop the span before moving instrumentation
2216 #[cfg(feature = "otel")]
2217 drop(span);
2218
2219 result?;
2220
2221 Ok(Client {
2222 config: self.config,
2223 _state: PhantomData,
2224 connection: self.connection,
2225 server_version: self.server_version,
2226 current_database: self.current_database,
2227 server_collation: self.server_collation,
2228 statement_cache: self.statement_cache,
2229 transaction_descriptor: 0, // Reset to auto-commit mode
2230 needs_reset: self.needs_reset,
2231 in_flight: self.in_flight,
2232 #[cfg(feature = "otel")]
2233 instrumentation: self.instrumentation,
2234 #[cfg(feature = "always-encrypted")]
2235 encryption_context: self.encryption_context,
2236 })
2237 }
2238
2239 /// Rollback the transaction.
2240 ///
2241 /// This transitions the client back to `Ready` state.
2242 pub async fn rollback(mut self) -> Result<Client<Ready>> {
2243 tracing::debug!("rolling back transaction");
2244
2245 #[cfg(feature = "otel")]
2246 let instrumentation = self.instrumentation.clone();
2247 #[cfg(feature = "otel")]
2248 let mut span = instrumentation.transaction_span("ROLLBACK");
2249
2250 // Execute ROLLBACK TRANSACTION
2251 let result = async {
2252 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
2253 self.read_execute_result().await
2254 }
2255 .await;
2256
2257 #[cfg(feature = "otel")]
2258 match &result {
2259 Ok(_) => InstrumentationContext::record_success(&mut span, None),
2260 Err(e) => InstrumentationContext::record_error(&mut span, e),
2261 }
2262
2263 // Drop the span before moving instrumentation
2264 #[cfg(feature = "otel")]
2265 drop(span);
2266
2267 result?;
2268
2269 Ok(Client {
2270 config: self.config,
2271 _state: PhantomData,
2272 connection: self.connection,
2273 server_version: self.server_version,
2274 current_database: self.current_database,
2275 server_collation: self.server_collation,
2276 statement_cache: self.statement_cache,
2277 transaction_descriptor: 0, // Reset to auto-commit mode
2278 needs_reset: self.needs_reset,
2279 in_flight: self.in_flight,
2280 #[cfg(feature = "otel")]
2281 instrumentation: self.instrumentation,
2282 #[cfg(feature = "always-encrypted")]
2283 encryption_context: self.encryption_context,
2284 })
2285 }
2286
2287 /// Create a savepoint and return a handle for later rollback.
2288 ///
2289 /// The returned `SavePoint` handle contains the validated savepoint name.
2290 /// Use it with `rollback_to()` to partially undo transaction work.
2291 ///
2292 /// # Example
2293 ///
2294 /// ```rust,no_run
2295 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
2296 /// let mut tx = client.begin_transaction().await?;
2297 /// tx.execute("INSERT INTO orders ...", &[]).await?;
2298 /// let sp = tx.save_point("before_items").await?;
2299 /// tx.execute("INSERT INTO items ...", &[]).await?;
2300 /// // Oops, rollback just the items
2301 /// tx.rollback_to(&sp).await?;
2302 /// tx.commit().await?;
2303 /// # Ok(())
2304 /// # }
2305 /// ```
2306 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
2307 crate::validation::validate_identifier(name)?;
2308 tracing::debug!(name = name, "creating savepoint");
2309
2310 // Execute SAVE TRANSACTION <name>
2311 // Note: name is validated by validate_identifier() to prevent SQL injection
2312 let sql = format!("SAVE TRANSACTION {name}");
2313 self.send_sql_batch(&sql).await?;
2314 self.read_execute_result().await?;
2315
2316 Ok(SavePoint::new(name.to_string()))
2317 }
2318
2319 /// Rollback to a savepoint.
2320 ///
2321 /// This rolls back all changes made after the savepoint was created,
2322 /// but keeps the transaction active. The savepoint remains valid and
2323 /// can be rolled back to again.
2324 ///
2325 /// # Example
2326 ///
2327 /// ```rust,no_run
2328 /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
2329 /// let sp = tx.save_point("checkpoint").await?;
2330 /// // ... do some work ...
2331 /// tx.rollback_to(&sp).await?; // Undo changes since checkpoint
2332 /// // Transaction is still active, savepoint is still valid
2333 /// # Ok(())
2334 /// # }
2335 /// ```
2336 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
2337 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
2338
2339 // Execute ROLLBACK TRANSACTION <name>
2340 // Note: savepoint name was validated during creation
2341 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
2342 self.send_sql_batch(&sql).await?;
2343 self.read_execute_result().await?;
2344
2345 Ok(())
2346 }
2347
2348 /// Release a savepoint (optional cleanup).
2349 ///
2350 /// Note: SQL Server doesn't have explicit savepoint release, but this
2351 /// method is provided for API completeness. The savepoint is automatically
2352 /// released when the transaction commits or rolls back.
2353 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
2354 tracing::debug!(name = savepoint.name(), "releasing savepoint");
2355
2356 // SQL Server doesn't require explicit savepoint release
2357 // The savepoint is implicitly released on commit/rollback
2358 // This method exists for API completeness
2359 drop(savepoint);
2360 Ok(())
2361 }
2362
2363 /// Get a handle for cancelling the current query within the transaction.
2364 ///
2365 /// See [`Client<Ready>::cancel_handle`] for usage examples.
2366 #[must_use]
2367 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
2368 self.connection_cancel_handle()
2369 }
2370}
2371
2372impl<S: ConnectionState> std::fmt::Debug for Client<S> {
2373 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2374 f.debug_struct("Client")
2375 .field("host", &self.config.host)
2376 .field("port", &self.config.port)
2377 .field("database", &self.config.database)
2378 .finish()
2379 }
2380}