mssql_client/client.rs
1//! SQL Server client implementation.
2//!
3//! ## DDL and statement routing
4//!
5//! [`Client::execute`] routes automatically by parameter count: with no
6//! parameters it sends a SQL batch (which permits DDL such as `CREATE` / `ALTER`
7//! / `DROP`); with parameters it uses `sp_executesql`, whose procedure context
8//! SQL Server forbids DDL in. Run DDL with an empty parameter slice:
9//!
10//! ```rust,no_run
11//! # async fn create_table(config: mssql_client::Config) -> Result<(), mssql_client::Error> {
12//! # let mut client = mssql_client::Client::connect(config).await?;
13//! client.execute("CREATE TABLE dbo.t (id INT)", &[]).await?;
14//! # Ok(())
15//! # }
16//! ```
17//!
18//! Use [`Client::simple_query`] for fire-and-forget batches (including
19//! multi-statement, `;`-separated DDL) when you don't need the affected-row count.
20
21// Allow unwrap/expect for chrono date construction with known-valid constant dates
22// and for regex patterns that are compile-time constants
23#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
24
25mod connect;
26mod params;
27mod response;
28
29use std::marker::PhantomData;
30
31use mssql_codec::connection::Connection;
32#[cfg(feature = "tls")]
33use mssql_tls::TlsStream;
34use tds_protocol::packet::PacketType;
35use tds_protocol::rpc::RpcRequest;
36use tds_protocol::token::{EnvChange, EnvChangeType};
37use tokio::net::TcpStream;
38
39use crate::config::Config;
40use crate::error::{Error, Result};
41#[cfg(feature = "otel")]
42use crate::instrumentation::InstrumentationContext;
43use crate::state::{ConnectionState, InTransaction, Ready};
44use crate::statement_cache::StatementCache;
45use crate::stream::{MultiResultStream, QueryStream};
46use crate::transaction::SavePoint;
47
48/// Run a network future under an optional command deadline.
49///
50/// On timeout this sends an Attention packet via `canceller` and then awaits
51/// the future so its own read loop drains the server's DONE_ATTN
52/// acknowledgement, leaving the connection clean before returning
53/// [`Error::CommandTimeout`]. This is the cancel-safe alternative to dropping
54/// the future (e.g. via `tokio::time::timeout`), which would leave unconsumed
55/// TDS data in the connection buffer and desync the next request.
56async fn run_with_deadline<F, T>(
57 fut: F,
58 deadline: Option<std::time::Duration>,
59 canceller: crate::cancel::CancelHandle,
60) -> Result<T>
61where
62 F: std::future::Future<Output = Result<T>>,
63{
64 let Some(d) = deadline else {
65 return fut.await;
66 };
67 tokio::pin!(fut);
68 tokio::select! {
69 biased;
70 res = &mut fut => res,
71 () = tokio::time::sleep(d) => {
72 // Signal cancellation, then let the in-flight read consume the
73 // server's attention acknowledgement so the connection stays usable.
74 let _ = canceller.cancel().await;
75 let _ = fut.await;
76 Err(Error::CommandTimeout)
77 }
78 }
79}
80
81/// SQL Server client with type-state connection management.
82///
83/// The generic parameter `S` represents the current connection state,
84/// ensuring at compile time that certain operations are only available
85/// in appropriate states.
86pub struct Client<S: ConnectionState> {
87 config: Config,
88 _state: PhantomData<S>,
89 /// The underlying connection (present only when connected)
90 connection: Option<ConnectionHandle>,
91 /// Server version from LoginAck (raw u32 TDS version)
92 server_version: Option<u32>,
93 /// Current database from EnvChange
94 current_database: Option<String>,
95 /// Server's default collation from SqlCollation EnvChange during login.
96 /// Used when `SendStringParametersAsUnicode=false` to encode VARCHAR
97 /// parameters with the correct character encoding and collation bytes.
98 server_collation: Option<tds_protocol::token::Collation>,
99 /// Prepared statement cache for query optimization
100 statement_cache: StatementCache,
101 /// Transaction descriptor from BeginTransaction EnvChange.
102 /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
103 /// requests within an explicit transaction. 0 indicates auto-commit mode.
104 transaction_descriptor: u64,
105 /// Whether a request has been sent and the response has not yet been fully read.
106 /// Used by the connection pool to detect dirty connections after cancel/timeout.
107 in_flight: bool,
108 /// Whether this connection needs a reset on next use.
109 /// Set by connection pool on checkin, cleared after first query/execute.
110 /// When true, the RESETCONNECTION flag is set on the first TDS packet.
111 needs_reset: bool,
112 /// OpenTelemetry instrumentation context (when otel feature is enabled)
113 #[cfg(feature = "otel")]
114 instrumentation: InstrumentationContext,
115 /// Always Encrypted context for column decryption (when always-encrypted feature is enabled)
116 #[cfg(feature = "always-encrypted")]
117 pub(crate) encryption_context: Option<std::sync::Arc<crate::encryption::EncryptionContext>>,
118}
119
120/// Internal connection handle wrapping the actual connection.
121///
122/// This is an enum to support different connection types:
123/// - TLS (TDS 8.0 strict mode) - requires `tls` feature
124/// - TLS with PreLogin wrapping (TDS 7.x style) - requires `tls` feature
125/// - Plain TCP (for internal networks or when `tls` feature is disabled)
126#[allow(dead_code)] // Connection will be used once query execution is implemented
127enum ConnectionHandle {
128 /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
129 #[cfg(feature = "tls")]
130 Tls(Connection<TlsStream<TcpStream>>),
131 /// TLS connection with PreLogin wrapping (TDS 7.x style)
132 #[cfg(feature = "tls")]
133 TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
134 /// Plain TCP connection (for internal networks or when `tls` feature is disabled)
135 Plain(Connection<TcpStream>),
136}
137
138// Private helper methods available to all connection states
139impl<S: ConnectionState> Client<S> {
140 /// The default per-command deadline from `command_timeout`.
141 ///
142 /// Returns `None` when `command_timeout` is zero, which means "no limit"
143 /// (matching ADO.NET's `SqlCommand.CommandTimeout = 0`).
144 fn command_deadline(&self) -> Option<std::time::Duration> {
145 let t = self.config.command_timeout;
146 if t.is_zero() { None } else { Some(t) }
147 }
148
149 /// Process transaction-related EnvChange tokens.
150 ///
151 /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
152 /// EnvChange tokens, updating the transaction descriptor accordingly.
153 ///
154 /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
155 /// while still having the transaction descriptor tracked correctly.
156 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
157 use tds_protocol::token::EnvChangeValue;
158
159 match env.env_type {
160 EnvChangeType::BeginTransaction => {
161 if let EnvChangeValue::Binary(ref data) = env.new_value {
162 if data.len() >= 8 {
163 let descriptor = u64::from_le_bytes([
164 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
165 ]);
166 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
167 *transaction_descriptor = descriptor;
168 }
169 }
170 }
171 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
172 tracing::debug!(
173 env_type = ?env.env_type,
174 "transaction ended via raw SQL"
175 );
176 *transaction_descriptor = 0;
177 }
178 _ => {}
179 }
180 }
181
182 /// Send a SQL batch to the server.
183 ///
184 /// Uses the client's current transaction descriptor in ALL_HEADERS.
185 /// Per MS-TDS spec, when in an explicit transaction, the descriptor
186 /// returned by BeginTransaction must be included.
187 ///
188 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
189 /// is included in the first packet to reset connection state.
190 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
191 let payload =
192 tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
193 let max_packet = self.config.packet_size as usize;
194
195 // Check if we need to reset the connection on this request
196 let reset = self.needs_reset;
197 if reset {
198 self.needs_reset = false; // Clear flag before sending
199 tracing::debug!("sending SQL batch with RESETCONNECTION flag");
200 }
201
202 self.in_flight = true;
203 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
204
205 match connection {
206 #[cfg(feature = "tls")]
207 ConnectionHandle::Tls(conn) => {
208 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
209 .await?;
210 }
211 #[cfg(feature = "tls")]
212 ConnectionHandle::TlsPrelogin(conn) => {
213 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
214 .await?;
215 }
216 ConnectionHandle::Plain(conn) => {
217 conn.send_message_with_reset(PacketType::SqlBatch, payload, max_packet, reset)
218 .await?;
219 }
220 }
221
222 Ok(())
223 }
224
225 /// Send an RPC request to the server.
226 ///
227 /// Uses the client's current transaction descriptor in ALL_HEADERS.
228 ///
229 /// If `needs_reset` is set (from pool return), the RESETCONNECTION flag
230 /// is included in the first packet to reset connection state.
231 pub(crate) async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
232 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
233 let max_packet = self.config.packet_size as usize;
234
235 // Check if we need to reset the connection on this request
236 let reset = self.needs_reset;
237 if reset {
238 self.needs_reset = false; // Clear flag before sending
239 tracing::debug!("sending RPC with RESETCONNECTION flag");
240 }
241
242 self.in_flight = true;
243 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
244
245 match connection {
246 #[cfg(feature = "tls")]
247 ConnectionHandle::Tls(conn) => {
248 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
249 .await?;
250 }
251 #[cfg(feature = "tls")]
252 ConnectionHandle::TlsPrelogin(conn) => {
253 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
254 .await?;
255 }
256 ConnectionHandle::Plain(conn) => {
257 conn.send_message_with_reset(PacketType::Rpc, payload, max_packet, reset)
258 .await?;
259 }
260 }
261
262 Ok(())
263 }
264
265 /// Start building a stored procedure call with full control over parameters.
266 ///
267 /// Returns a [`crate::procedure::ProcedureBuilder`] that allows adding named input and output
268 /// parameters before executing the call.
269 ///
270 /// The procedure name is validated to prevent SQL injection. It may be
271 /// schema-qualified (e.g., `"dbo.MyProc"`).
272 ///
273 /// # Example
274 ///
275 /// ```rust,no_run
276 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
277 /// let result = client.procedure("dbo.CalculateSum")?
278 /// .input("@a", &10i32)
279 /// .input("@b", &20i32)
280 /// .output_int("@result")
281 /// .execute().await?;
282 ///
283 /// let sum = result.get_output("@result").unwrap();
284 /// # let _ = sum;
285 /// # Ok(())
286 /// # }
287 /// ```
288 pub fn procedure(
289 &mut self,
290 proc_name: &str,
291 ) -> Result<crate::procedure::ProcedureBuilder<'_, S>> {
292 crate::validation::validate_qualified_identifier(proc_name)?;
293 Ok(crate::procedure::ProcedureBuilder::new(self, proc_name))
294 }
295
296 /// Execute a stored procedure with positional input parameters.
297 ///
298 /// This is a convenience method for the common case of calling a procedure
299 /// with input-only parameters. For output parameters or named parameters,
300 /// use [`procedure()`](Client::procedure) instead.
301 ///
302 /// # Example
303 ///
304 /// ```rust,no_run
305 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
306 /// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
307 /// assert_eq!(result.return_value, 0);
308 ///
309 /// if let Some(rs) = result.first_result_set() {
310 /// println!("columns: {:?}", rs.columns());
311 /// }
312 /// # Ok(())
313 /// # }
314 /// ```
315 pub async fn call_procedure(
316 &mut self,
317 proc_name: &str,
318 params: &[&(dyn crate::ToSql + Sync)],
319 ) -> Result<crate::stream::ProcedureResult> {
320 crate::validation::validate_qualified_identifier(proc_name)?;
321
322 tracing::debug!(
323 proc_name = proc_name,
324 params_count = params.len(),
325 "executing stored procedure"
326 );
327
328 let rpc_params =
329 Self::convert_params_positional(params, self.send_unicode(), self.server_collation())?;
330 let mut rpc = RpcRequest::named(proc_name);
331 for param in rpc_params {
332 rpc = rpc.param(param);
333 }
334
335 self.send_rpc(&rpc).await?;
336 self.read_procedure_result().await
337 }
338
339 /// Start a bulk insert operation for the specified table.
340 ///
341 /// Sends the `INSERT BULK` statement to the server and returns a
342 /// [`crate::bulk::BulkWriter`] for streaming rows. The writer holds
343 /// a mutable borrow on the client, preventing other operations while
344 /// the bulk insert is in progress.
345 ///
346 /// # Example
347 ///
348 /// ```rust,no_run
349 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
350 /// use mssql_client::{BulkInsertBuilder, BulkColumn, SqlValue};
351 ///
352 /// let builder = BulkInsertBuilder::new("dbo.Users")
353 /// .with_typed_columns(vec![
354 /// BulkColumn::new("id", "INT", 0)?,
355 /// BulkColumn::new("name", "NVARCHAR(100)", 1)?,
356 /// ]);
357 ///
358 /// let mut writer = client.bulk_insert(&builder).await?;
359 /// writer.send_row_values(&[SqlValue::Int(1), SqlValue::String("Alice".into())])?;
360 /// writer.send_row_values(&[SqlValue::Int(2), SqlValue::String("Bob".into())])?;
361 /// let result = writer.finish().await?;
362 /// println!("Inserted {} rows", result.rows_affected);
363 /// # Ok(())
364 /// # }
365 /// ```
366 pub async fn bulk_insert(
367 &mut self,
368 builder: &crate::bulk::BulkInsertBuilder,
369 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
370 use tds_protocol::token::{ColMetaData, Token};
371
372 tracing::debug!(
373 table = builder.table_name(),
374 columns = builder.columns().len(),
375 "starting bulk insert"
376 );
377
378 // Step 1: Query the server for column metadata.
379 // This gives us the exact type encoding the server expects for BulkLoad,
380 // following the pattern established by Tiberius.
381 let meta_query = format!("SELECT TOP 0 * FROM {}", builder.table_name());
382 self.send_sql_batch(&meta_query).await?;
383
384 let message = self.read_response_message().await?;
385 self.in_flight = false;
386
387 // Capture both the raw COLMETADATA bytes and parsed column info
388 let raw_payload = message.payload.clone();
389 let mut parser = self.create_parser(message.payload);
390 let mut server_metadata: Option<ColMetaData> = None;
391 let mut meta_start: usize = 0;
392 let mut meta_end: usize = 0;
393
394 loop {
395 let pos_before = raw_payload.len() - parser.remaining();
396 let token = parser.next_token_with_metadata(server_metadata.as_ref())?;
397 let pos_after = raw_payload.len() - parser.remaining();
398 let Some(token) = token else { break };
399
400 match token {
401 Token::ColMetaData(meta) => {
402 meta_start = pos_before;
403 meta_end = pos_after;
404 server_metadata = Some(meta);
405 }
406 Token::Done(_) => break,
407 _ => {}
408 }
409 }
410
411 // Reject deprecated TEXT/NTEXT/IMAGE columns reported by the server.
412 // These types require a legacy TEXTPTR wire format that this driver
413 // does not support — users should migrate the column to VARCHAR(MAX) /
414 // NVARCHAR(MAX) / VARBINARY(MAX).
415 if let Some(ref meta) = server_metadata {
416 use tds_protocol::types::TypeId;
417 for col in meta.columns.iter() {
418 let (rejected, replacement) = match col.type_id {
419 TypeId::Text => (Some("TEXT"), "VARCHAR(MAX)"),
420 TypeId::NText => (Some("NTEXT"), "NVARCHAR(MAX)"),
421 TypeId::Image => (Some("IMAGE"), "VARBINARY(MAX)"),
422 _ => (None, ""),
423 };
424 if let Some(sql_type) = rejected {
425 return Err(Error::from(mssql_types::TypeError::UnsupportedType {
426 sql_type: sql_type.to_string(),
427 reason: format!(
428 "column `{}` in table `{}` is {} — TEXT/NTEXT/IMAGE \
429 are not supported. Alter the column to {} instead \
430 (Microsoft deprecated TEXT/NTEXT/IMAGE in SQL \
431 Server 2005).",
432 col.name,
433 builder.table_name(),
434 sql_type,
435 replacement,
436 ),
437 }));
438 }
439 }
440 }
441
442 // Step 2: Send INSERT BULK statement to put server in bulk load mode
443 let stmt = builder.build_insert_bulk_statement()?;
444 self.send_sql_batch(&stmt).await?;
445 self.read_execute_result().await?;
446
447 // Step 3: Create bulk writer with server's metadata
448 let raw_meta = if meta_end > meta_start {
449 Some(raw_payload.slice(meta_start..meta_end))
450 } else {
451 None
452 };
453
454 let server_cols = server_metadata.as_ref().map(|m| m.columns.as_slice());
455 let bulk = crate::bulk::BulkInsert::new_with_server_metadata(
456 builder.columns().to_vec(),
457 builder.options().batch_size,
458 raw_meta,
459 server_cols,
460 );
461
462 Ok(crate::bulk::BulkWriter::new(self, bulk))
463 }
464
465 /// Start a bulk insert without querying the server for column metadata.
466 ///
467 /// Unlike [`bulk_insert()`](Self::bulk_insert), this method does not send
468 /// `SELECT TOP 0 * FROM table` to discover column types. Instead, the
469 /// column metadata is constructed from the `BulkColumn` types provided
470 /// on the builder. This saves a round-trip when the schema is known.
471 ///
472 /// # Caveats
473 ///
474 /// The caller must ensure `BulkColumn` entries match the target table's
475 /// column definitions exactly. Mismatched types, lengths, precision/scale,
476 /// or column ordering will cause the server to reject the BulkLoad packet.
477 ///
478 /// For most use cases, prefer [`bulk_insert()`](Self::bulk_insert) — the
479 /// extra round-trip is usually negligible and the server-supplied metadata
480 /// is guaranteed correct.
481 pub async fn bulk_insert_without_schema_discovery(
482 &mut self,
483 builder: &crate::bulk::BulkInsertBuilder,
484 ) -> Result<crate::bulk::BulkWriter<'_, S>> {
485 tracing::debug!(
486 table = builder.table_name(),
487 columns = builder.columns().len(),
488 "starting bulk insert (no schema discovery)"
489 );
490
491 // Send INSERT BULK statement to put server in bulk load mode
492 let stmt = builder.build_insert_bulk_statement()?;
493 self.send_sql_batch(&stmt).await?;
494 self.read_execute_result().await?;
495
496 // Create bulk writer with hand-crafted metadata
497 let bulk =
498 crate::bulk::BulkInsert::new(builder.columns().to_vec(), builder.options().batch_size);
499
500 Ok(crate::bulk::BulkWriter::new(self, bulk))
501 }
502
503 /// Send bulk load data as a BulkLoad (0x07) message and read the server response.
504 ///
505 /// Used internally by [`crate::bulk::BulkWriter::finish()`] to transmit accumulated
506 /// row data after the `INSERT BULK` statement has been acknowledged.
507 pub(crate) async fn send_and_read_bulk_load(&mut self, payload: bytes::Bytes) -> Result<u64> {
508 let max_packet = self.config.packet_size as usize;
509
510 self.in_flight = true;
511 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
512
513 match connection {
514 #[cfg(feature = "tls")]
515 ConnectionHandle::Tls(conn) => {
516 conn.send_message(PacketType::BulkLoad, payload, max_packet)
517 .await?;
518 }
519 #[cfg(feature = "tls")]
520 ConnectionHandle::TlsPrelogin(conn) => {
521 conn.send_message(PacketType::BulkLoad, payload, max_packet)
522 .await?;
523 }
524 ConnectionHandle::Plain(conn) => {
525 conn.send_message(PacketType::BulkLoad, payload, max_packet)
526 .await?;
527 }
528 }
529
530 // Read the server's Done response with row count
531 self.read_execute_result().await
532 }
533
534 /// Execute a query with named parameters and return a streaming result set.
535 ///
536 /// This method accepts [`NamedParam`](crate::to_params::NamedParam) values,
537 /// making it compatible with the [`ToParams`](crate::to_params::ToParams) trait
538 /// and the `#[derive(ToParams)]` macro.
539 ///
540 /// # Example
541 ///
542 /// ```rust,no_run
543 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
544 /// use mssql_client::{NamedParam, ToParams};
545 ///
546 /// // With derive macro:
547 /// #[derive(mssql_derive::ToParams)]
548 /// struct UserQuery { name: String }
549 ///
550 /// let q = UserQuery { name: "Alice".into() };
551 /// let rows = client.query_named(
552 /// "SELECT * FROM users WHERE name = @name",
553 /// &q.to_params()?,
554 /// ).await?;
555 ///
556 /// // Or manually:
557 /// let params = vec![NamedParam::from_value("name", &"Alice")?];
558 /// let rows = client.query_named(
559 /// "SELECT * FROM users WHERE name = @name",
560 /// ¶ms,
561 /// ).await?;
562 /// # let _ = rows;
563 /// # Ok(())
564 /// # }
565 /// ```
566 pub async fn query_named<'a>(
567 &'a mut self,
568 sql: &str,
569 params: &[crate::to_params::NamedParam],
570 ) -> Result<QueryStream<'a>> {
571 tracing::debug!(
572 sql = sql,
573 params_count = params.len(),
574 "executing query with named parameters"
575 );
576
577 if params.is_empty() {
578 self.send_sql_batch(sql).await?;
579 } else {
580 let rpc_params =
581 Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
582 let rpc = RpcRequest::execute_sql(sql, rpc_params);
583 self.send_rpc(&rpc).await?;
584 }
585
586 let resp = self.read_query_response().await?;
587 #[cfg(feature = "always-encrypted")]
588 {
589 Ok(QueryStream::from_raw(
590 resp.columns,
591 resp.pending_rows,
592 resp.meta,
593 resp.decryptor,
594 ))
595 }
596 #[cfg(not(feature = "always-encrypted"))]
597 {
598 Ok(QueryStream::from_raw(
599 resp.columns,
600 resp.pending_rows,
601 resp.meta,
602 ))
603 }
604 }
605
606 /// Execute a statement with named parameters.
607 ///
608 /// Returns the number of affected rows. This is the named-parameter
609 /// counterpart of [`execute()`](Client::execute), compatible with the
610 /// [`ToParams`](crate::to_params::ToParams) trait.
611 ///
612 /// # Example
613 ///
614 /// ```rust,no_run
615 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
616 /// use mssql_client::NamedParam;
617 ///
618 /// let params = vec![
619 /// NamedParam::from_value("name", &"Alice")?,
620 /// NamedParam::from_value("email", &"alice@example.com")?,
621 /// ];
622 /// let rows_affected = client.execute_named(
623 /// "INSERT INTO users (name, email) VALUES (@name, @email)",
624 /// ¶ms,
625 /// ).await?;
626 /// # let _ = rows_affected;
627 /// # Ok(())
628 /// # }
629 /// ```
630 pub async fn execute_named(
631 &mut self,
632 sql: &str,
633 params: &[crate::to_params::NamedParam],
634 ) -> Result<u64> {
635 tracing::debug!(
636 sql = sql,
637 params_count = params.len(),
638 "executing statement with named parameters"
639 );
640
641 if params.is_empty() {
642 self.send_sql_batch(sql).await?;
643 } else {
644 let rpc_params =
645 Self::convert_named_params(params, self.send_unicode(), self.server_collation())?;
646 let rpc = RpcRequest::execute_sql(sql, rpc_params);
647 self.send_rpc(&rpc).await?;
648 }
649
650 self.read_execute_result().await
651 }
652
653 /// Whether string parameters are sent as NVARCHAR (Unicode).
654 pub(crate) fn send_unicode(&self) -> bool {
655 self.config.send_string_parameters_as_unicode
656 }
657
658 /// Server's default collation, captured from ENVCHANGE during login.
659 pub(crate) fn server_collation(&self) -> Option<&tds_protocol::token::Collation> {
660 self.server_collation.as_ref()
661 }
662}
663
664impl Client<Ready> {
665 /// Mark this connection as needing a reset on next use.
666 ///
667 /// Called by the connection pool when a connection is returned.
668 /// The next SQL batch or RPC will include the RESETCONNECTION flag
669 /// in the TDS packet header, causing SQL Server to reset connection
670 /// state (temp tables, SET options, transaction isolation level, etc.)
671 /// before executing the command.
672 ///
673 /// This is more efficient than calling `sp_reset_connection` as a
674 /// separate command because it's handled at the TDS protocol level.
675 pub fn mark_needs_reset(&mut self) {
676 self.needs_reset = true;
677 }
678
679 /// Check if this connection needs a reset.
680 ///
681 /// Returns true if `mark_needs_reset()` was called and the reset
682 /// hasn't been performed yet.
683 #[must_use]
684 pub fn needs_reset(&self) -> bool {
685 self.needs_reset
686 }
687
688 /// Execute a query and return a result set with lazy per-row decoding.
689 ///
690 /// Per ADR-007 the full response is buffered in memory and each row is
691 /// *decoded* on demand as you iterate — this is not incremental network
692 /// streaming, so peak memory tracks the response size. Use
693 /// `.collect_all()` if you want all rows materialized into a `Vec` up
694 /// front.
695 ///
696 /// # Example
697 ///
698 /// ```rust,no_run
699 /// # use mssql_client::Row;
700 /// # fn process(_: &Row) {}
701 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
702 /// // Streaming (synchronous iteration over the result set)
703 /// let stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
704 /// for row in stream {
705 /// let row = row?;
706 /// process(&row);
707 /// }
708 ///
709 /// // Buffered (loads all into memory)
710 /// let rows: Vec<Row> = client
711 /// .query("SELECT * FROM small_table", &[])
712 /// .await?
713 /// .collect_all()
714 /// .await?;
715 /// # let _ = rows;
716 /// # Ok(())
717 /// # }
718 /// ```
719 pub async fn query<'a>(
720 &'a mut self,
721 sql: &str,
722 params: &[&(dyn crate::ToSql + Sync)],
723 ) -> Result<QueryStream<'a>> {
724 let deadline = self.command_deadline();
725 self.query_inner(sql, params, deadline).await
726 }
727
728 /// Shared query implementation with an explicit command deadline.
729 async fn query_inner<'a>(
730 &'a mut self,
731 sql: &str,
732 params: &[&(dyn crate::ToSql + Sync)],
733 deadline: Option<std::time::Duration>,
734 ) -> Result<QueryStream<'a>> {
735 tracing::debug!(sql = sql, params_count = params.len(), "executing query");
736
737 #[cfg(feature = "otel")]
738 let instrumentation = self.instrumentation.clone();
739 #[cfg(feature = "otel")]
740 let mut span = instrumentation.query_span(sql);
741 #[cfg(feature = "otel")]
742 let timer = crate::instrumentation::OperationTimer::start(
743 crate::instrumentation::extract_operation(sql),
744 );
745
746 let canceller = self.cancel_handle();
747 let result = run_with_deadline(
748 async {
749 if params.is_empty() {
750 // Simple query without parameters - use SQL batch
751 self.send_sql_batch(sql).await?;
752 } else {
753 // Parameterized query - use sp_executesql via RPC
754 let rpc_params =
755 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
756 let rpc = RpcRequest::execute_sql(sql, rpc_params);
757 self.send_rpc(&rpc).await?;
758 }
759
760 // Read complete response including columns and rows
761 self.read_query_response().await
762 },
763 deadline,
764 canceller,
765 )
766 .await;
767
768 #[cfg(feature = "otel")]
769 match &result {
770 Ok(_) => InstrumentationContext::record_success(&mut span, None),
771 Err(e) => InstrumentationContext::record_error(&mut span, e),
772 }
773 #[cfg(feature = "otel")]
774 timer.finish(instrumentation.metrics(), result.is_ok());
775
776 // Drop the span before returning
777 #[cfg(feature = "otel")]
778 drop(span);
779
780 let resp = result?;
781 #[cfg(feature = "always-encrypted")]
782 {
783 Ok(QueryStream::from_raw(
784 resp.columns,
785 resp.pending_rows,
786 resp.meta,
787 resp.decryptor,
788 ))
789 }
790 #[cfg(not(feature = "always-encrypted"))]
791 {
792 Ok(QueryStream::from_raw(
793 resp.columns,
794 resp.pending_rows,
795 resp.meta,
796 ))
797 }
798 }
799
800 /// Execute a query with a specific timeout.
801 ///
802 /// This overrides the default `command_timeout` from the connection configuration
803 /// for this specific query. If the query does not complete within the specified
804 /// duration, the driver sends an Attention packet to cancel it server-side,
805 /// drains the acknowledgement, and returns [`Error::CommandTimeout`] with the
806 /// connection left usable for the next request.
807 ///
808 /// # Arguments
809 ///
810 /// * `sql` - The SQL query to execute
811 /// * `params` - Query parameters
812 /// * `timeout_duration` - Maximum time to wait for the query to complete
813 ///
814 /// # Example
815 ///
816 /// ```rust,no_run
817 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
818 /// use std::time::Duration;
819 ///
820 /// // Execute with a 5-second timeout
821 /// let rows = client
822 /// .query_with_timeout(
823 /// "SELECT * FROM large_table",
824 /// &[],
825 /// Duration::from_secs(5),
826 /// )
827 /// .await?;
828 /// # let _ = rows;
829 /// # Ok(())
830 /// # }
831 /// ```
832 pub async fn query_with_timeout<'a>(
833 &'a mut self,
834 sql: &str,
835 params: &[&(dyn crate::ToSql + Sync)],
836 timeout_duration: std::time::Duration,
837 ) -> Result<QueryStream<'a>> {
838 self.query_inner(sql, params, Some(timeout_duration)).await
839 }
840
841 /// Execute a batch that may return multiple result sets.
842 ///
843 /// This is useful for stored procedures or SQL batches that contain
844 /// multiple SELECT statements.
845 ///
846 /// # Example
847 ///
848 /// ```rust,no_run
849 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
850 /// // Execute a batch with multiple SELECT statements
851 /// let mut results = client.query_multiple(
852 /// "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
853 /// &[]
854 /// ).await?;
855 ///
856 /// // Process first result set
857 /// while let Some(row) = results.next_row().await? {
858 /// println!("Result 1: {:?}", row);
859 /// }
860 ///
861 /// // Move to second result set
862 /// if results.next_result().await? {
863 /// while let Some(row) = results.next_row().await? {
864 /// println!("Result 2: {:?}", row);
865 /// }
866 /// }
867 /// # Ok(())
868 /// # }
869 /// ```
870 pub async fn query_multiple<'a>(
871 &'a mut self,
872 sql: &str,
873 params: &[&(dyn crate::ToSql + Sync)],
874 ) -> Result<MultiResultStream<'a>> {
875 tracing::debug!(
876 sql = sql,
877 params_count = params.len(),
878 "executing multi-result query"
879 );
880
881 if params.is_empty() {
882 // Simple batch without parameters - use SQL batch
883 self.send_sql_batch(sql).await?;
884 } else {
885 // Parameterized query - use sp_executesql via RPC
886 let rpc_params =
887 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
888 let rpc = RpcRequest::execute_sql(sql, rpc_params);
889 self.send_rpc(&rpc).await?;
890 }
891
892 // Read all result sets
893 let result_sets = self.read_multi_result_response().await?;
894 Ok(MultiResultStream::new(result_sets))
895 }
896
897 /// Execute a query that doesn't return rows.
898 ///
899 /// Returns the number of affected rows.
900 pub async fn execute(
901 &mut self,
902 sql: &str,
903 params: &[&(dyn crate::ToSql + Sync)],
904 ) -> Result<u64> {
905 let deadline = self.command_deadline();
906 self.execute_inner(sql, params, deadline).await
907 }
908
909 /// Shared execute implementation with an explicit command deadline.
910 async fn execute_inner(
911 &mut self,
912 sql: &str,
913 params: &[&(dyn crate::ToSql + Sync)],
914 deadline: Option<std::time::Duration>,
915 ) -> Result<u64> {
916 tracing::debug!(
917 sql = sql,
918 params_count = params.len(),
919 "executing statement"
920 );
921
922 #[cfg(feature = "otel")]
923 let instrumentation = self.instrumentation.clone();
924 #[cfg(feature = "otel")]
925 let mut span = instrumentation.query_span(sql);
926 #[cfg(feature = "otel")]
927 let timer = crate::instrumentation::OperationTimer::start(
928 crate::instrumentation::extract_operation(sql),
929 );
930
931 let canceller = self.cancel_handle();
932 let result = run_with_deadline(
933 async {
934 if params.is_empty() {
935 // Simple statement without parameters - use SQL batch
936 self.send_sql_batch(sql).await?;
937 } else {
938 // Parameterized statement - use sp_executesql via RPC
939 let rpc_params =
940 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
941 let rpc = RpcRequest::execute_sql(sql, rpc_params);
942 self.send_rpc(&rpc).await?;
943 }
944
945 // Read response and get row count
946 self.read_execute_result().await
947 },
948 deadline,
949 canceller,
950 )
951 .await;
952
953 #[cfg(feature = "otel")]
954 match &result {
955 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
956 Err(e) => InstrumentationContext::record_error(&mut span, e),
957 }
958 #[cfg(feature = "otel")]
959 timer.finish(instrumentation.metrics(), result.is_ok());
960
961 // Drop the span before returning
962 #[cfg(feature = "otel")]
963 drop(span);
964
965 result
966 }
967
968 /// Execute a statement with a specific timeout.
969 ///
970 /// This overrides the default `command_timeout` from the connection configuration
971 /// for this specific statement. If the statement does not complete within the
972 /// specified duration, the driver sends an Attention packet to cancel it
973 /// server-side, drains the acknowledgement, and returns
974 /// [`Error::CommandTimeout`] with the connection left usable.
975 ///
976 /// # Arguments
977 ///
978 /// * `sql` - The SQL statement to execute
979 /// * `params` - Statement parameters
980 /// * `timeout_duration` - Maximum time to wait for the statement to complete
981 ///
982 /// # Example
983 ///
984 /// ```rust,no_run
985 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
986 /// use std::time::Duration;
987 ///
988 /// // Execute with a 10-second timeout
989 /// let rows_affected = client
990 /// .execute_with_timeout(
991 /// "UPDATE large_table SET status = @p1",
992 /// &[&"processed"],
993 /// Duration::from_secs(10),
994 /// )
995 /// .await?;
996 /// # let _ = rows_affected;
997 /// # Ok(())
998 /// # }
999 /// ```
1000 pub async fn execute_with_timeout(
1001 &mut self,
1002 sql: &str,
1003 params: &[&(dyn crate::ToSql + Sync)],
1004 timeout_duration: std::time::Duration,
1005 ) -> Result<u64> {
1006 self.execute_inner(sql, params, Some(timeout_duration))
1007 .await
1008 }
1009
1010 /// Begin a transaction.
1011 ///
1012 /// This transitions the client from `Ready` to `InTransaction` state.
1013 /// Per MS-TDS spec, the server returns a transaction descriptor in the
1014 /// BeginTransaction EnvChange token that must be included in subsequent
1015 /// ALL_HEADERS sections.
1016 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
1017 tracing::debug!("beginning transaction");
1018
1019 #[cfg(feature = "otel")]
1020 let instrumentation = self.instrumentation.clone();
1021 #[cfg(feature = "otel")]
1022 let mut span = instrumentation.transaction_span("BEGIN");
1023
1024 // Execute BEGIN TRANSACTION and extract the transaction descriptor
1025 let result = async {
1026 self.send_sql_batch("BEGIN TRANSACTION").await?;
1027 self.read_transaction_begin_result().await
1028 }
1029 .await;
1030
1031 #[cfg(feature = "otel")]
1032 match &result {
1033 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1034 Err(e) => InstrumentationContext::record_error(&mut span, e),
1035 }
1036
1037 // Drop the span before moving instrumentation
1038 #[cfg(feature = "otel")]
1039 drop(span);
1040
1041 let transaction_descriptor = result?;
1042
1043 Ok(Client {
1044 config: self.config,
1045 _state: PhantomData,
1046 connection: self.connection,
1047 server_version: self.server_version,
1048 current_database: self.current_database,
1049 server_collation: self.server_collation,
1050 statement_cache: self.statement_cache,
1051 transaction_descriptor, // Store the descriptor from server
1052 needs_reset: self.needs_reset,
1053 in_flight: self.in_flight,
1054 #[cfg(feature = "otel")]
1055 instrumentation: self.instrumentation,
1056 #[cfg(feature = "always-encrypted")]
1057 encryption_context: self.encryption_context,
1058 })
1059 }
1060
1061 /// Begin a transaction with a specific isolation level.
1062 ///
1063 /// This transitions the client from `Ready` to `InTransaction` state
1064 /// with the specified isolation level.
1065 ///
1066 /// # Example
1067 ///
1068 /// ```rust,no_run
1069 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1070 /// use mssql_client::IsolationLevel;
1071 ///
1072 /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
1073 /// // All operations in this transaction use SERIALIZABLE isolation
1074 /// tx.commit().await?;
1075 /// # Ok(())
1076 /// # }
1077 /// ```
1078 pub async fn begin_transaction_with_isolation(
1079 mut self,
1080 isolation_level: crate::transaction::IsolationLevel,
1081 ) -> Result<Client<InTransaction>> {
1082 tracing::debug!(
1083 isolation_level = %isolation_level.name(),
1084 "beginning transaction with isolation level"
1085 );
1086
1087 #[cfg(feature = "otel")]
1088 let instrumentation = self.instrumentation.clone();
1089 #[cfg(feature = "otel")]
1090 let mut span = instrumentation.transaction_span("BEGIN");
1091
1092 // First set the isolation level
1093 let result = async {
1094 self.send_sql_batch(isolation_level.as_sql()).await?;
1095 self.read_execute_result().await?;
1096
1097 // Then begin the transaction
1098 self.send_sql_batch("BEGIN TRANSACTION").await?;
1099 self.read_transaction_begin_result().await
1100 }
1101 .await;
1102
1103 #[cfg(feature = "otel")]
1104 match &result {
1105 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1106 Err(e) => InstrumentationContext::record_error(&mut span, e),
1107 }
1108
1109 #[cfg(feature = "otel")]
1110 drop(span);
1111
1112 let transaction_descriptor = result?;
1113
1114 Ok(Client {
1115 config: self.config,
1116 _state: PhantomData,
1117 connection: self.connection,
1118 server_version: self.server_version,
1119 current_database: self.current_database,
1120 server_collation: self.server_collation,
1121 statement_cache: self.statement_cache,
1122 transaction_descriptor,
1123 needs_reset: self.needs_reset,
1124 in_flight: self.in_flight,
1125 #[cfg(feature = "otel")]
1126 instrumentation: self.instrumentation,
1127 #[cfg(feature = "always-encrypted")]
1128 encryption_context: self.encryption_context,
1129 })
1130 }
1131
1132 /// Execute a simple query without parameters.
1133 ///
1134 /// This is useful for DDL statements and simple queries where you
1135 /// don't need to retrieve the affected row count.
1136 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
1137 tracing::debug!(sql = sql, "executing simple query");
1138
1139 // Send SQL batch
1140 self.send_sql_batch(sql).await?;
1141
1142 // Read and discard response
1143 let _ = self.read_execute_result().await?;
1144
1145 Ok(())
1146 }
1147
1148 /// Close the connection gracefully.
1149 pub async fn close(self) -> Result<()> {
1150 tracing::debug!("closing connection");
1151 Ok(())
1152 }
1153
1154 /// Get the current database name.
1155 #[must_use]
1156 pub fn database(&self) -> Option<&str> {
1157 self.config.database.as_deref()
1158 }
1159
1160 /// Get the server host.
1161 #[must_use]
1162 pub fn host(&self) -> &str {
1163 &self.config.host
1164 }
1165
1166 /// Get the server port.
1167 #[must_use]
1168 pub fn port(&self) -> u16 {
1169 self.config.port
1170 }
1171
1172 /// Check if the connection is currently in a transaction.
1173 ///
1174 /// This returns `true` if a transaction was started via raw SQL
1175 /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
1176 ///
1177 /// Note: This only tracks transactions started via raw SQL. Transactions
1178 /// started via the type-state API (`begin_transaction()`) result in a
1179 /// `Client<InTransaction>` which is a different type.
1180 ///
1181 /// # Example
1182 ///
1183 /// ```rust,no_run
1184 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1185 /// client.execute("BEGIN TRANSACTION", &[]).await?;
1186 /// assert!(client.is_in_transaction());
1187 ///
1188 /// client.execute("COMMIT", &[]).await?;
1189 /// assert!(!client.is_in_transaction());
1190 /// # Ok(())
1191 /// # }
1192 /// ```
1193 #[must_use]
1194 pub fn is_in_transaction(&self) -> bool {
1195 self.transaction_descriptor != 0
1196 }
1197
1198 /// Check if a request is in-flight (sent but response not fully read).
1199 ///
1200 /// Used by the connection pool to detect dirty connections that were
1201 /// interrupted mid-query (e.g., by `tokio::select!` or a timeout).
1202 /// A connection with an in-flight request has unread data in the TCP
1203 /// buffer and must be discarded rather than returned to the pool.
1204 #[must_use]
1205 pub fn is_in_flight(&self) -> bool {
1206 self.in_flight
1207 }
1208
1209 /// Report whether an Always Encrypted key-store provider with the given
1210 /// name is currently reachable through this client's encryption context.
1211 ///
1212 /// Returns `false` when the `always-encrypted` feature isn't enabled, when
1213 /// the connection was opened without `column_encryption` configured, or
1214 /// when no matching provider was registered.
1215 #[cfg(feature = "always-encrypted")]
1216 #[must_use]
1217 pub fn has_encryption_provider(&self, name: &str) -> bool {
1218 self.encryption_context
1219 .as_ref()
1220 .is_some_and(|ctx| ctx.has_provider(name))
1221 }
1222
1223 /// Get a handle for cancelling the current query.
1224 ///
1225 /// The cancel handle can be cloned and sent to other tasks, enabling
1226 /// cancellation of long-running queries from a separate async context.
1227 ///
1228 /// # Example
1229 ///
1230 /// ```rust,no_run
1231 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1232 /// use std::time::Duration;
1233 ///
1234 /// let cancel_handle = client.cancel_handle();
1235 ///
1236 /// // Spawn a task to cancel after 10 seconds
1237 /// let handle = tokio::spawn(async move {
1238 /// tokio::time::sleep(Duration::from_secs(10)).await;
1239 /// let _ = cancel_handle.cancel().await;
1240 /// });
1241 ///
1242 /// // This query will be cancelled if it runs longer than 10 seconds
1243 /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
1244 /// # let _ = (handle, result);
1245 /// # Ok(())
1246 /// # }
1247 /// ```
1248 #[must_use]
1249 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1250 let connection = self
1251 .connection
1252 .as_ref()
1253 .expect("connection should be present");
1254 match connection {
1255 #[cfg(feature = "tls")]
1256 ConnectionHandle::Tls(conn) => {
1257 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
1258 }
1259 #[cfg(feature = "tls")]
1260 ConnectionHandle::TlsPrelogin(conn) => {
1261 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
1262 }
1263 ConnectionHandle::Plain(conn) => {
1264 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
1265 }
1266 }
1267 }
1268}
1269
1270/// # Drop Behavior
1271///
1272/// **`Client<InTransaction>` has no automatic rollback on drop.** If the client is
1273/// dropped without calling [`commit()`](Client::commit) or [`rollback()`](Client::rollback),
1274/// the transaction remains open on the server until the TCP connection closes
1275/// (at which point SQL Server automatically rolls back).
1276///
1277/// This is because `Drop` is synchronous and cannot perform the async I/O needed
1278/// to send a `ROLLBACK TRANSACTION` command.
1279///
1280/// ## Consequences of dropping without commit/rollback
1281///
1282/// - **Direct connections:** The transaction leaks until the OS TCP timeout
1283/// (potentially 30+ minutes), holding locks on any modified rows.
1284/// - **Pooled connections:** The pool detects the active transaction descriptor
1285/// and discards the connection rather than returning it to the idle pool
1286/// (see `PooledConnection::drop` in `mssql-driver-pool`).
1287///
1288/// ## Best practice
1289///
1290/// Always ensure `commit()` or `rollback()` is called. Use helper patterns
1291/// for error paths:
1292///
1293/// ```rust,no_run
1294/// # async fn do_work(_: &mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> { Ok(()) }
1295/// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1296/// let tx = client.begin_transaction().await?;
1297/// match do_work(&tx).await {
1298/// Ok(_) => { tx.commit().await?; }
1299/// Err(e) => { tx.rollback().await?; return Err(e); }
1300/// }
1301/// # Ok(())
1302/// # }
1303/// ```
1304impl Client<InTransaction> {
1305 /// Execute a query within the transaction and return a streaming result set.
1306 ///
1307 /// See [`Client<Ready>::query`] for usage examples.
1308 pub async fn query<'a>(
1309 &'a mut self,
1310 sql: &str,
1311 params: &[&(dyn crate::ToSql + Sync)],
1312 ) -> Result<QueryStream<'a>> {
1313 let deadline = self.command_deadline();
1314 self.query_inner(sql, params, deadline).await
1315 }
1316
1317 /// Shared query implementation with an explicit command deadline.
1318 async fn query_inner<'a>(
1319 &'a mut self,
1320 sql: &str,
1321 params: &[&(dyn crate::ToSql + Sync)],
1322 deadline: Option<std::time::Duration>,
1323 ) -> Result<QueryStream<'a>> {
1324 tracing::debug!(
1325 sql = sql,
1326 params_count = params.len(),
1327 "executing query in transaction"
1328 );
1329
1330 #[cfg(feature = "otel")]
1331 let instrumentation = self.instrumentation.clone();
1332 #[cfg(feature = "otel")]
1333 let mut span = instrumentation.query_span(sql);
1334 #[cfg(feature = "otel")]
1335 let timer = crate::instrumentation::OperationTimer::start(
1336 crate::instrumentation::extract_operation(sql),
1337 );
1338
1339 let canceller = self.cancel_handle();
1340 let result = run_with_deadline(
1341 async {
1342 if params.is_empty() {
1343 // Simple query without parameters - use SQL batch
1344 self.send_sql_batch(sql).await?;
1345 } else {
1346 // Parameterized query - use sp_executesql via RPC
1347 let rpc_params =
1348 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1349 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1350 self.send_rpc(&rpc).await?;
1351 }
1352
1353 // Read complete response including columns and rows
1354 self.read_query_response().await
1355 },
1356 deadline,
1357 canceller,
1358 )
1359 .await;
1360
1361 #[cfg(feature = "otel")]
1362 match &result {
1363 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1364 Err(e) => InstrumentationContext::record_error(&mut span, e),
1365 }
1366 #[cfg(feature = "otel")]
1367 timer.finish(instrumentation.metrics(), result.is_ok());
1368
1369 // Drop the span before returning
1370 #[cfg(feature = "otel")]
1371 drop(span);
1372
1373 let resp = result?;
1374 #[cfg(feature = "always-encrypted")]
1375 {
1376 Ok(QueryStream::from_raw(
1377 resp.columns,
1378 resp.pending_rows,
1379 resp.meta,
1380 resp.decryptor,
1381 ))
1382 }
1383 #[cfg(not(feature = "always-encrypted"))]
1384 {
1385 Ok(QueryStream::from_raw(
1386 resp.columns,
1387 resp.pending_rows,
1388 resp.meta,
1389 ))
1390 }
1391 }
1392
1393 /// Execute a statement within the transaction.
1394 ///
1395 /// Returns the number of affected rows.
1396 pub async fn execute(
1397 &mut self,
1398 sql: &str,
1399 params: &[&(dyn crate::ToSql + Sync)],
1400 ) -> Result<u64> {
1401 let deadline = self.command_deadline();
1402 self.execute_inner(sql, params, deadline).await
1403 }
1404
1405 /// Shared execute implementation with an explicit command deadline.
1406 async fn execute_inner(
1407 &mut self,
1408 sql: &str,
1409 params: &[&(dyn crate::ToSql + Sync)],
1410 deadline: Option<std::time::Duration>,
1411 ) -> Result<u64> {
1412 tracing::debug!(
1413 sql = sql,
1414 params_count = params.len(),
1415 "executing statement in transaction"
1416 );
1417
1418 #[cfg(feature = "otel")]
1419 let instrumentation = self.instrumentation.clone();
1420 #[cfg(feature = "otel")]
1421 let mut span = instrumentation.query_span(sql);
1422 #[cfg(feature = "otel")]
1423 let timer = crate::instrumentation::OperationTimer::start(
1424 crate::instrumentation::extract_operation(sql),
1425 );
1426
1427 let canceller = self.cancel_handle();
1428 let result = run_with_deadline(
1429 async {
1430 if params.is_empty() {
1431 // Simple statement without parameters - use SQL batch
1432 self.send_sql_batch(sql).await?;
1433 } else {
1434 // Parameterized statement - use sp_executesql via RPC
1435 let rpc_params =
1436 Self::convert_params(params, self.send_unicode(), self.server_collation())?;
1437 let rpc = RpcRequest::execute_sql(sql, rpc_params);
1438 self.send_rpc(&rpc).await?;
1439 }
1440
1441 // Read response and get row count
1442 self.read_execute_result().await
1443 },
1444 deadline,
1445 canceller,
1446 )
1447 .await;
1448
1449 #[cfg(feature = "otel")]
1450 match &result {
1451 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
1452 Err(e) => InstrumentationContext::record_error(&mut span, e),
1453 }
1454 #[cfg(feature = "otel")]
1455 timer.finish(instrumentation.metrics(), result.is_ok());
1456
1457 // Drop the span before returning
1458 #[cfg(feature = "otel")]
1459 drop(span);
1460
1461 result
1462 }
1463
1464 /// Execute a query within the transaction with a specific timeout.
1465 ///
1466 /// See [`Client<Ready>::query_with_timeout`] for details.
1467 pub async fn query_with_timeout<'a>(
1468 &'a mut self,
1469 sql: &str,
1470 params: &[&(dyn crate::ToSql + Sync)],
1471 timeout_duration: std::time::Duration,
1472 ) -> Result<QueryStream<'a>> {
1473 self.query_inner(sql, params, Some(timeout_duration)).await
1474 }
1475
1476 /// Execute a statement within the transaction with a specific timeout.
1477 ///
1478 /// See [`Client<Ready>::execute_with_timeout`] for details.
1479 pub async fn execute_with_timeout(
1480 &mut self,
1481 sql: &str,
1482 params: &[&(dyn crate::ToSql + Sync)],
1483 timeout_duration: std::time::Duration,
1484 ) -> Result<u64> {
1485 self.execute_inner(sql, params, Some(timeout_duration))
1486 .await
1487 }
1488
1489 /// Open a FILESTREAM BLOB for async reading and/or writing.
1490 ///
1491 /// This method queries the server for the transaction context, then opens
1492 /// the FILESTREAM handle using the native Win32 `OpenSqlFilestream` API.
1493 ///
1494 /// # Arguments
1495 ///
1496 /// * `path` — The UNC path obtained from the T-SQL `column.PathName()` function.
1497 /// Query this yourself before calling `open_filestream`:
1498 /// ```sql
1499 /// SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1
1500 /// ```
1501 /// * `access` — Read, write, or read/write access mode.
1502 ///
1503 /// # Requirements
1504 ///
1505 /// - SQL Server must have FILESTREAM enabled (`sp_configure 'filestream access level', 2`)
1506 /// - The Microsoft OLE DB Driver for SQL Server must be installed on the client
1507 /// - The `FileStream` must be dropped before calling [`commit`] or [`rollback`]
1508 ///
1509 /// # Example
1510 ///
1511 /// ```text
1512 /// use mssql_client::FileStreamAccess;
1513 /// use tokio::io::AsyncReadExt;
1514 ///
1515 /// let mut tx = client.begin_transaction().await?;
1516 ///
1517 /// // Get the FILESTREAM path
1518 /// let rows = tx.query(
1519 /// "SELECT Content.PathName() FROM dbo.Documents WHERE Id = @p1",
1520 /// &[&doc_id],
1521 /// ).await?;
1522 /// let path: String = rows.into_iter().next().unwrap()?.get(0)?;
1523 ///
1524 /// // Open and read the BLOB
1525 /// let mut stream = tx.open_filestream(&path, FileStreamAccess::Read).await?;
1526 /// let mut data = Vec::new();
1527 /// stream.read_to_end(&mut data).await?;
1528 /// drop(stream);
1529 ///
1530 /// tx.commit().await?;
1531 /// ```
1532 #[cfg(all(windows, feature = "filestream"))]
1533 pub async fn open_filestream(
1534 &mut self,
1535 path: &str,
1536 access: crate::filestream::FileStreamAccess,
1537 ) -> Result<crate::filestream::FileStream> {
1538 tracing::debug!(path = path, ?access, "opening FILESTREAM BLOB");
1539
1540 // Get the transaction context from SQL Server.
1541 // This binds the file access to the current SQL transaction.
1542 let txn_context: Vec<u8> = {
1543 let rows = self
1544 .query("SELECT GET_FILESTREAM_TRANSACTION_CONTEXT()", &[])
1545 .await?;
1546 let mut ctx = None;
1547 for result in rows {
1548 let row = result?;
1549 ctx = Some(row.get::<Vec<u8>>(0)?);
1550 }
1551 ctx.ok_or_else(|| {
1552 Error::FileStream("GET_FILESTREAM_TRANSACTION_CONTEXT() returned no rows".into())
1553 })?
1554 };
1555
1556 crate::filestream::FileStream::open(path, access, &txn_context)
1557 }
1558
1559 /// Commit the transaction.
1560 ///
1561 /// This transitions the client back to `Ready` state.
1562 pub async fn commit(mut self) -> Result<Client<Ready>> {
1563 tracing::debug!("committing transaction");
1564
1565 #[cfg(feature = "otel")]
1566 let instrumentation = self.instrumentation.clone();
1567 #[cfg(feature = "otel")]
1568 let mut span = instrumentation.transaction_span("COMMIT");
1569
1570 // Execute COMMIT TRANSACTION
1571 let result = async {
1572 self.send_sql_batch("COMMIT TRANSACTION").await?;
1573 self.read_execute_result().await
1574 }
1575 .await;
1576
1577 #[cfg(feature = "otel")]
1578 match &result {
1579 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1580 Err(e) => InstrumentationContext::record_error(&mut span, e),
1581 }
1582
1583 // Drop the span before moving instrumentation
1584 #[cfg(feature = "otel")]
1585 drop(span);
1586
1587 result?;
1588
1589 Ok(Client {
1590 config: self.config,
1591 _state: PhantomData,
1592 connection: self.connection,
1593 server_version: self.server_version,
1594 current_database: self.current_database,
1595 server_collation: self.server_collation,
1596 statement_cache: self.statement_cache,
1597 transaction_descriptor: 0, // Reset to auto-commit mode
1598 needs_reset: self.needs_reset,
1599 in_flight: self.in_flight,
1600 #[cfg(feature = "otel")]
1601 instrumentation: self.instrumentation,
1602 #[cfg(feature = "always-encrypted")]
1603 encryption_context: self.encryption_context,
1604 })
1605 }
1606
1607 /// Rollback the transaction.
1608 ///
1609 /// This transitions the client back to `Ready` state.
1610 pub async fn rollback(mut self) -> Result<Client<Ready>> {
1611 tracing::debug!("rolling back transaction");
1612
1613 #[cfg(feature = "otel")]
1614 let instrumentation = self.instrumentation.clone();
1615 #[cfg(feature = "otel")]
1616 let mut span = instrumentation.transaction_span("ROLLBACK");
1617
1618 // Execute ROLLBACK TRANSACTION
1619 let result = async {
1620 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
1621 self.read_execute_result().await
1622 }
1623 .await;
1624
1625 #[cfg(feature = "otel")]
1626 match &result {
1627 Ok(_) => InstrumentationContext::record_success(&mut span, None),
1628 Err(e) => InstrumentationContext::record_error(&mut span, e),
1629 }
1630
1631 // Drop the span before moving instrumentation
1632 #[cfg(feature = "otel")]
1633 drop(span);
1634
1635 result?;
1636
1637 Ok(Client {
1638 config: self.config,
1639 _state: PhantomData,
1640 connection: self.connection,
1641 server_version: self.server_version,
1642 current_database: self.current_database,
1643 server_collation: self.server_collation,
1644 statement_cache: self.statement_cache,
1645 transaction_descriptor: 0, // Reset to auto-commit mode
1646 needs_reset: self.needs_reset,
1647 in_flight: self.in_flight,
1648 #[cfg(feature = "otel")]
1649 instrumentation: self.instrumentation,
1650 #[cfg(feature = "always-encrypted")]
1651 encryption_context: self.encryption_context,
1652 })
1653 }
1654
1655 /// Create a savepoint and return a handle for later rollback.
1656 ///
1657 /// The returned `SavePoint` handle contains the validated savepoint name.
1658 /// Use it with `rollback_to()` to partially undo transaction work.
1659 ///
1660 /// # Example
1661 ///
1662 /// ```rust,no_run
1663 /// # async fn ex(client: mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
1664 /// let mut tx = client.begin_transaction().await?;
1665 /// tx.execute("INSERT INTO orders ...", &[]).await?;
1666 /// let sp = tx.save_point("before_items").await?;
1667 /// tx.execute("INSERT INTO items ...", &[]).await?;
1668 /// // Oops, rollback just the items
1669 /// tx.rollback_to(&sp).await?;
1670 /// tx.commit().await?;
1671 /// # Ok(())
1672 /// # }
1673 /// ```
1674 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
1675 crate::validation::validate_identifier(name)?;
1676 tracing::debug!(name = name, "creating savepoint");
1677
1678 // Execute SAVE TRANSACTION <name>
1679 // Note: name is validated by validate_identifier() to prevent SQL injection
1680 let sql = format!("SAVE TRANSACTION {name}");
1681 self.send_sql_batch(&sql).await?;
1682 self.read_execute_result().await?;
1683
1684 Ok(SavePoint::new(name.to_string()))
1685 }
1686
1687 /// Rollback to a savepoint.
1688 ///
1689 /// This rolls back all changes made after the savepoint was created,
1690 /// but keeps the transaction active. The savepoint remains valid and
1691 /// can be rolled back to again.
1692 ///
1693 /// # Example
1694 ///
1695 /// ```rust,no_run
1696 /// # async fn ex(mut tx: mssql_client::Client<mssql_client::InTransaction>) -> Result<(), mssql_client::Error> {
1697 /// let sp = tx.save_point("checkpoint").await?;
1698 /// // ... do some work ...
1699 /// tx.rollback_to(&sp).await?; // Undo changes since checkpoint
1700 /// // Transaction is still active, savepoint is still valid
1701 /// # Ok(())
1702 /// # }
1703 /// ```
1704 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
1705 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
1706
1707 // Execute ROLLBACK TRANSACTION <name>
1708 // Note: savepoint name was validated during creation
1709 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
1710 self.send_sql_batch(&sql).await?;
1711 self.read_execute_result().await?;
1712
1713 Ok(())
1714 }
1715
1716 /// Release a savepoint (optional cleanup).
1717 ///
1718 /// Note: SQL Server doesn't have explicit savepoint release, but this
1719 /// method is provided for API completeness. The savepoint is automatically
1720 /// released when the transaction commits or rolls back.
1721 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
1722 tracing::debug!(name = savepoint.name(), "releasing savepoint");
1723
1724 // SQL Server doesn't require explicit savepoint release
1725 // The savepoint is implicitly released on commit/rollback
1726 // This method exists for API completeness
1727 drop(savepoint);
1728 Ok(())
1729 }
1730
1731 /// Get a handle for cancelling the current query within the transaction.
1732 ///
1733 /// See [`Client<Ready>::cancel_handle`] for usage examples.
1734 #[must_use]
1735 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
1736 let connection = self
1737 .connection
1738 .as_ref()
1739 .expect("connection should be present");
1740 match connection {
1741 #[cfg(feature = "tls")]
1742 ConnectionHandle::Tls(conn) => {
1743 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
1744 }
1745 #[cfg(feature = "tls")]
1746 ConnectionHandle::TlsPrelogin(conn) => {
1747 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
1748 }
1749 ConnectionHandle::Plain(conn) => {
1750 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
1751 }
1752 }
1753 }
1754}
1755
1756impl<S: ConnectionState> std::fmt::Debug for Client<S> {
1757 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1758 f.debug_struct("Client")
1759 .field("host", &self.config.host)
1760 .field("port", &self.config.port)
1761 .field("database", &self.config.database)
1762 .finish()
1763 }
1764}