mssql_client/blob_stream.rs
1//! Streaming a row's trailing MAX column directly from the socket.
2//!
3//! [`Client::query_stream_blob`](crate::Client::query_stream_blob) targets the
4//! one OOM case the row streaming of [`RowStream`](crate::RowStream) does not
5//! cover: a single MAX cell (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`,
//! `XML`) larger than memory. Row streaming bounds peak memory to ~one row, but
//! a row holding a 4 GB BLOB is still ~4 GB because the whole PLP value is
//! decoded into the row.
9//!
10//! [`BlobStream`] decodes each row's **leading scalar columns** into a [`Row`]
11//! (they are small), then exposes the **trailing MAX column(s)** as chunk
12//! streams pulled from the connection on demand via the `PlpDecoder`. Peak
13//! memory is one packet plus one PLP chunk.
14//!
//! Because TDS sends columns inline and sequentially, the MAX column(s) must be
//! **trailing** — every scalar column must precede them (a scalar column after
//! a BLOB could not be decoded until that BLOB was consumed).
18//! [`Client::query_stream_blob`](crate::Client::query_stream_blob) targets the
19//! single-trailing-MAX case; [`Client::query_stream_rows`](crate::Client::query_stream_rows)
20//! generalizes it to a run of trailing MAX columns, iterated with
21//! [`BlobStream::next_blob`]. The blob(s) of one row must be consumed before the
22//! next row; calling [`BlobStream::next`] auto-drains any unconsumed blob so the
23//! wire stays aligned.
24//!
25//! ```no_run
26//! # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
27//! # let mut sink: Vec<u8> = Vec::new();
28//! let mut stream = client
29//! .query_stream_blob("SELECT id, document FROM files", &[])
30//! .await?;
31//! while let Some(row) = stream.next().await? {
32//! let id: i32 = row.get_by_name("id")?;
33//! let _ = id;
34//! stream.copy_blob_to(&mut sink).await?; // streamed, never fully buffered
35//! }
36//! # Ok(())
37//! # }
38//! ```
39
40use bytes::{Buf, Bytes, BytesMut};
41use tds_protocol::ProtocolError;
42use tds_protocol::token::{ColMetaData, ColumnData, NbcRow, RawRow, Token, TokenParser};
43use tds_protocol::types::TypeId;
44
45use crate::Client;
46use crate::client::response::server_token_to_error;
47use crate::error::{Error, Result};
48use crate::plp::{PlpDecoder, PlpEvent};
49use crate::row::{Column, Row};
50use crate::state::{ConnectionState, Ready};
51
52/// Whether a column is a PLP-encoded MAX type that this path can sub-stream.
53pub(crate) fn is_plp_max(col: &ColumnData) -> bool {
54 match col.type_id {
55 TypeId::BigVarChar | TypeId::BigVarBinary | TypeId::NVarChar => {
56 col.type_info.max_length == Some(0xFFFF)
57 }
58 TypeId::Xml => true,
59 _ => false,
60 }
61}
62
/// A stream of rows whose trailing MAX column(s) are read incrementally from
/// the socket instead of being decoded into the row. See the
/// [module docs](self). Obtain one from
/// [`Client::query_stream_blob`](crate::Client::query_stream_blob).
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct BlobStream<'a, S: ConnectionState = Ready> {
    /// The connection the response packets are pulled from; borrowed mutably
    /// for the lifetime of the stream.
    client: &'a mut Client<S>,
    /// Unconsumed wire bytes (post-metadata), shared by the column decode and
    /// the PLP chunk streaming. Newly pulled packets are appended to it.
    buf: Bytes,
    /// END_OF_MESSAGE seen — no more packets will arrive. Set when a pulled
    /// packet carries the end-of-message flag or when the client reports that
    /// there is no further packet.
    eom: bool,
    /// Passed to the `TokenParser` (via `with_encryption`) when control tokens
    /// are parsed.
    encryption_enabled: bool,
    /// Full result-set metadata (all columns, including the trailing MAX ones).
    meta: ColMetaData,
    /// Metadata for just the leading scalar columns (for row decoding).
    prefix_meta: ColMetaData,
    /// `Column`s for the leading scalar columns.
    scalar_row_meta: std::sync::Arc<crate::row::ColMetaData>,
    /// `Column`s for the trailing MAX columns, in wire order.
    blob_row_meta: std::sync::Arc<crate::row::ColMetaData>,
    /// Index of the first trailing MAX column; the scalar prefix is `0..first_blob`.
    first_blob: usize,
    /// Number of trailing MAX columns (1 for `query_stream_blob`).
    blob_count: usize,
    /// Index (within the trailing run, `0..blob_count`) of the next blob to
    /// position; reset to 0 at the start of each row. Equal to `blob_count`
    /// when every blob of the row has been positioned (and before the first row).
    next_blob_idx: usize,
    /// The blob currently positioned for reading (within the trailing run), or
    /// `None` before the first is positioned / after the row is exhausted.
    current_blob_idx: Option<usize>,
    /// When the current row arrived as an NBCROW, its null bitmap (so each
    /// trailing blob's nullness can be looked up); `None` for a plain ROW.
    current_nbc: Option<NbcRow>,
    /// Whether `next` should auto-position the first trailing blob. True for the
    /// single-blob `query_stream_blob` path (preserves its API: the blob is
    /// ready to read after `next`); false for the multi-blob `query_stream_rows`
    /// path (the caller drives blobs explicitly via [`next_blob`](Self::next_blob)).
    auto_position_first: bool,
    /// PLP decoder for the current blob; `Some` while a non-NULL blob is being
    /// read, `None` for a NULL blob or once the blob has been drained.
    plp: Option<PlpDecoder>,
    /// The current blob is NULL (an NBCROW omitted its value).
    blob_null: bool,
    /// Set once the response has been fully consumed; `next` and `next_blob`
    /// return immediately afterwards.
    finished: bool,
}
107
108impl<'a, S: ConnectionState> BlobStream<'a, S> {
109 #[allow(clippy::too_many_arguments)]
110 pub(crate) fn new(
111 client: &'a mut Client<S>,
112 buf: Bytes,
113 eom: bool,
114 encryption_enabled: bool,
115 meta: ColMetaData,
116 first_blob: usize,
117 blob_count: usize,
118 auto_position_first: bool,
119 ) -> Self {
120 let prefix_meta = ColMetaData {
121 columns: meta.columns.iter().take(first_blob).cloned().collect(),
122 cek_table: meta.cek_table.clone(),
123 };
124 let blob_meta = ColMetaData {
125 columns: meta
126 .columns
127 .iter()
128 .skip(first_blob)
129 .take(blob_count)
130 .cloned()
131 .collect(),
132 cek_table: meta.cek_table.clone(),
133 };
134 let scalar_columns = Client::<S>::build_columns(&prefix_meta);
135 let blob_columns = Client::<S>::build_columns(&blob_meta);
136 Self {
137 client,
138 buf,
139 eom,
140 encryption_enabled,
141 meta,
142 prefix_meta,
143 scalar_row_meta: std::sync::Arc::new(crate::row::ColMetaData::new(scalar_columns)),
144 blob_row_meta: std::sync::Arc::new(crate::row::ColMetaData::new(blob_columns)),
145 first_blob,
146 blob_count,
147 // Start as if the previous row were fully consumed, so the first
148 // `next` does not try to drain a nonexistent prior row.
149 next_blob_idx: blob_count,
150 current_blob_idx: None,
151 current_nbc: None,
152 auto_position_first,
153 plp: None,
154 blob_null: false,
155 finished: true, // set false once construction succeeds below
156 }
157 .started()
158 }
159
160 fn started(mut self) -> Self {
161 self.finished = false;
162 self
163 }
164
165 /// The leading (scalar) columns of the result set — everything before the
166 /// trailing MAX column(s).
167 #[must_use]
168 pub fn columns(&self) -> &[Column] {
169 &self.scalar_row_meta.columns
170 }
171
172 /// The trailing MAX (blob) columns of the result set, in wire order.
173 ///
174 /// For a [`query_stream_blob`](crate::Client::query_stream_blob) stream this
175 /// is a single column; for
176 /// [`query_stream_rows`](crate::Client::query_stream_rows) it is the run of
177 /// trailing MAX columns, in the order [`next_blob`](Self::next_blob) visits
178 /// them.
179 #[must_use]
180 pub fn blob_columns(&self) -> &[Column] {
181 &self.blob_row_meta.columns
182 }
183
184 /// The column metadata of the currently positioned blob, or `None` before
185 /// the first blob of a row is positioned (or after the row is exhausted).
186 #[must_use]
187 pub fn current_blob_column(&self) -> Option<&Column> {
188 self.current_blob_idx
189 .and_then(|j| self.blob_row_meta.columns.get(j))
190 }
191
192 /// Advance to the next row, returning its scalar columns.
193 ///
194 /// Auto-drains any unread trailing blob(s) of the previous row, so the wire
195 /// stays aligned. Returns `Ok(None)` at end of stream (connection clean).
196 pub async fn next(&mut self) -> Result<Option<Row>> {
197 if self.finished {
198 return Ok(None);
199 }
200 self.drain_to_row_end().await?;
201
202 loop {
203 if self.buf.is_empty() {
204 if !self.pull_packet().await? {
205 self.finish();
206 return Ok(None);
207 }
208 continue;
209 }
210 match self.buf[0] {
211 0xD1 => return Ok(Some(self.decode_row().await?)),
212 0xD2 => return Ok(Some(self.decode_nbc_row().await?)),
213 _ => match self.parse_control_token().await? {
214 Control::Finished => {
215 self.finish();
216 return Ok(None);
217 }
218 Control::Continue => continue,
219 },
220 }
221 }
222 }
223
224 /// Advance to the next trailing MAX column of the current row, returning
225 /// `true` while one is positioned and `false` once the row's blobs are
226 /// exhausted.
227 ///
228 /// Auto-drains the previously positioned blob (if not fully read) before
229 /// advancing, so the wire stays aligned. After this returns `true`, read the
230 /// blob with [`read_chunk`](Self::read_chunk) / [`copy_blob_to`](Self::copy_blob_to)
231 /// and inspect it with [`current_blob_column`](Self::current_blob_column) /
232 /// [`blob_is_null`](Self::blob_is_null) / [`blob_len`](Self::blob_len).
233 ///
234 /// This is the iteration primitive for
235 /// [`query_stream_rows`](crate::Client::query_stream_rows):
236 ///
237 /// ```no_run
238 /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
239 /// # let mut sink: Vec<u8> = Vec::new();
240 /// let mut stream = client
241 /// .query_stream_rows("SELECT id, doc1, doc2 FROM files", &[])
242 /// .await?;
243 /// while let Some(row) = stream.next().await? {
244 /// let id: i32 = row.get_by_name("id")?;
245 /// let _ = id;
246 /// while stream.next_blob().await? {
247 /// stream.copy_blob_to(&mut sink).await?; // each trailing MAX column
248 /// }
249 /// }
250 /// # Ok(())
251 /// # }
252 /// ```
253 pub async fn next_blob(&mut self) -> Result<bool> {
254 if self.finished {
255 return Ok(false);
256 }
257 self.drain_current_blob().await?;
258 if self.next_blob_idx >= self.blob_count {
259 self.current_blob_idx = None;
260 return Ok(false);
261 }
262 self.position_next_blob();
263 Ok(true)
264 }
265
266 /// Read the next chunk of the current blob, or `None` when it is fully
267 /// read (or NULL). Reads more packets from the socket as needed.
268 ///
269 /// Chunks are **raw bytes**, not decoded text. For an `NVARCHAR(MAX)` /
270 /// `XML` column the bytes are little-endian UCS-2, and a chunk boundary can
271 /// fall in the middle of a two-byte code unit (or a surrogate pair) — so do
272 /// not decode each chunk to `str` independently. Concatenate the chunks
273 /// first (or stream to a byte sink), then decode the whole value.
274 pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
275 loop {
276 let event = match self.plp.as_mut() {
277 Some(plp) if !plp.is_done() => plp.pull(&mut self.buf)?,
278 _ => return Ok(None),
279 };
280 match event {
281 PlpEvent::Data(d) => return Ok(Some(d)),
282 PlpEvent::End => return Ok(None),
283 PlpEvent::NeedMore => {
284 if !self.pull_packet().await? {
285 return Err(Error::ConnectionClosed);
286 }
287 }
288 }
289 }
290 }
291
292 /// Stream the current row's blob to an async writer, returning bytes written.
293 pub async fn copy_blob_to<W>(&mut self, w: &mut W) -> Result<u64>
294 where
295 W: tokio::io::AsyncWrite + Unpin,
296 {
297 use tokio::io::AsyncWriteExt;
298 let mut total = 0u64;
299 while let Some(chunk) = self.read_chunk().await? {
300 w.write_all(&chunk).await.map_err(Error::from)?;
301 total += chunk.len() as u64;
302 }
303 Ok(total)
304 }
305
306 /// The current row's blob length in bytes, once known (after the first
307 /// chunk is read). `None` before that, for a NULL blob, or for an
308 /// unknown-length value.
309 #[must_use]
310 pub fn blob_len(&self) -> Option<u64> {
311 if self.blob_null {
312 return None;
313 }
314 self.plp.as_ref().and_then(PlpDecoder::total_len)
315 }
316
317 /// Whether the current row's blob is NULL.
318 #[must_use]
319 pub fn blob_is_null(&self) -> bool {
320 self.blob_null
321 }
322
323 fn finish(&mut self) {
324 self.finished = true;
325 self.client.note_response_drained();
326 }
327
328 async fn decode_row(&mut self) -> Result<Row> {
329 loop {
330 let mut view: &[u8] = &self.buf[..];
331 let before = view.len();
332 view.advance(1); // ROW token byte
333 match RawRow::decode_prefix(&mut view, &self.meta, self.first_blob) {
334 Ok(raw) => {
335 let consumed = before - view.len();
336 self.buf.advance(consumed);
337 let row = crate::column_parser::convert_raw_row(
338 &raw,
339 &self.prefix_meta,
340 &self.scalar_row_meta,
341 )?;
342 self.begin_row(None);
343 return Ok(row);
344 }
345 Err(ProtocolError::UnexpectedEof) if !self.eom => {
346 self.pull_packet().await?;
347 }
348 Err(e) => return Err(e.into()),
349 }
350 }
351 }
352
353 async fn decode_nbc_row(&mut self) -> Result<Row> {
354 loop {
355 let mut view: &[u8] = &self.buf[..];
356 let before = view.len();
357 view.advance(1); // NBCROW token byte
358 match NbcRow::decode_prefix(&mut view, &self.meta, self.first_blob) {
359 Ok(nbc) => {
360 let consumed = before - view.len();
361 self.buf.advance(consumed);
362 let row = crate::column_parser::convert_nbc_row(
363 &nbc,
364 &self.prefix_meta,
365 &self.scalar_row_meta,
366 )?;
367 self.begin_row(Some(nbc));
368 return Ok(row);
369 }
370 Err(ProtocolError::UnexpectedEof) if !self.eom => {
371 self.pull_packet().await?;
372 }
373 Err(e) => return Err(e.into()),
374 }
375 }
376 }
377
378 /// Parse a single non-row token (Done / Error / Info / …) and decide whether
379 /// the stream continues or has finished.
380 async fn parse_control_token(&mut self) -> Result<Control> {
381 loop {
382 let mut parser =
383 TokenParser::new(self.buf.clone()).with_encryption(self.encryption_enabled);
384 match parser.next_token_with_metadata(Some(&self.meta)) {
385 Ok(Some(token)) => {
386 let consumed = self.buf.len() - parser.remaining();
387 self.buf.advance(consumed);
388 return self.classify(token);
389 }
390 Ok(None) => {
391 if self.eom {
392 return Ok(Control::Finished);
393 }
394 self.pull_packet().await?;
395 }
396 Err(ProtocolError::UnexpectedEof | ProtocolError::IncompletePacket { .. })
397 if !self.eom =>
398 {
399 self.pull_packet().await?;
400 }
401 Err(e) => return Err(e.into()),
402 }
403 }
404 }
405
406 fn classify(&mut self, token: Token) -> Result<Control> {
407 match token {
408 Token::Done(d) => {
409 if d.status.error {
410 return Err(Error::Query(
411 "query failed (server set error flag in DONE token)".to_string(),
412 ));
413 }
414 Ok(if d.status.more {
415 Control::Continue
416 } else {
417 Control::Finished
418 })
419 }
420 Token::Error(e) => Err(server_token_to_error(&e)),
421 Token::ColMetaData(_) => Err(Error::Protocol(
422 "blob streaming does not support multiple result sets".to_string(),
423 )),
424 Token::EnvChange(ref e) => {
425 // Keep the transaction descriptor in sync with raw
426 // BEGIN/COMMIT/ROLLBACK seen mid-stream, as the buffered
427 // readers do.
428 self.client.apply_transaction_env_change(e);
429 Ok(Control::Continue)
430 }
431 // DoneProc / DoneInProc / Info / Order / etc.
432 _ => Ok(Control::Continue),
433 }
434 }
435
436 /// Reset per-row blob state after decoding a row's scalar prefix, optionally
437 /// auto-positioning the first trailing blob (for the single-blob path).
438 fn begin_row(&mut self, nbc: Option<NbcRow>) {
439 self.current_nbc = nbc;
440 self.next_blob_idx = 0;
441 self.current_blob_idx = None;
442 self.plp = None;
443 self.blob_null = false;
444 if self.auto_position_first {
445 self.position_next_blob();
446 }
447 }
448
449 /// Position the next trailing blob (sync; no socket IO). Sets up its PLP
450 /// decoder, or marks it NULL when an NBCROW omitted its value.
451 fn position_next_blob(&mut self) {
452 let j = self.next_blob_idx;
453 self.next_blob_idx += 1;
454 self.current_blob_idx = Some(j);
455 let col_idx = self.first_blob + j;
456 let is_null = self
457 .current_nbc
458 .as_ref()
459 .is_some_and(|n| n.is_null(col_idx));
460 self.blob_null = is_null;
461 self.plp = if is_null {
462 None
463 } else {
464 Some(PlpDecoder::new())
465 };
466 }
467
468 /// Drain the currently positioned blob off the wire (if not fully read).
469 async fn drain_current_blob(&mut self) -> Result<()> {
470 if self.plp.is_some() && !self.blob_null {
471 while self.read_chunk().await?.is_some() {}
472 }
473 self.plp = None;
474 Ok(())
475 }
476
477 /// Consume every remaining trailing blob of the current row off the wire, so
478 /// the next ROW/NBCROW token is reachable. Drains the positioned blob, then
479 /// positions and drains each not-yet-visited trailing blob in turn.
480 async fn drain_to_row_end(&mut self) -> Result<()> {
481 loop {
482 self.drain_current_blob().await?;
483 if self.next_blob_idx >= self.blob_count {
484 self.current_blob_idx = None;
485 return Ok(());
486 }
487 self.position_next_blob();
488 }
489 }
490
491 /// Pull one packet onto the rolling buffer. Returns `false` at EOF.
492 async fn pull_packet(&mut self) -> Result<bool> {
493 match self.client.read_response_packet().await? {
494 Some((payload, is_eom)) => {
495 if self.buf.is_empty() {
496 self.buf = payload;
497 } else {
498 let mut joined = BytesMut::with_capacity(self.buf.len() + payload.len());
499 joined.extend_from_slice(&self.buf);
500 joined.extend_from_slice(&payload);
501 self.buf = joined.freeze();
502 }
503 self.eom |= is_eom;
504 Ok(true)
505 }
506 None => {
507 self.eom = true;
508 Ok(false)
509 }
510 }
511 }
512}
513
/// Outcome of parsing a non-row control token.
enum Control {
    /// Keep reading: further tokens may follow in this response.
    Continue,
    /// The response is complete; the caller marks the stream finished.
    Finished,
}