Skip to main content

mssql_client/
blob_stream.rs

1//! Streaming a row's trailing MAX column directly from the socket.
2//!
3//! [`Client::query_stream_blob`](crate::Client::query_stream_blob) targets the
4//! one OOM case the row streaming of [`RowStream`](crate::RowStream) does not
5//! cover: a single MAX cell (`VARBINARY(MAX)`, `NVARCHAR(MAX)`, `VARCHAR(MAX)`,
//! `XML`) larger than memory. Row streaming bounds peak memory to about one
//! row, but a row holding a 4 GB BLOB still needs ~4 GB because the whole PLP
//! value is decoded into the row.
9//!
10//! [`BlobStream`] decodes each row's **leading scalar columns** into a [`Row`]
11//! (they are small), then exposes the **trailing MAX column(s)** as chunk
12//! streams pulled from the connection on demand via the `PlpDecoder`. Peak
13//! memory is one packet plus one PLP chunk.
14//!
//! Because TDS sends columns inline and sequentially, the MAX column(s) must be
//! **trailing** — every scalar column must precede them (a scalar column after
//! a BLOB could not be decoded until that BLOB was consumed).
18//! [`Client::query_stream_blob`](crate::Client::query_stream_blob) targets the
19//! single-trailing-MAX case; [`Client::query_stream_rows`](crate::Client::query_stream_rows)
20//! generalizes it to a run of trailing MAX columns, iterated with
21//! [`BlobStream::next_blob`]. The blob(s) of one row must be consumed before the
22//! next row; calling [`BlobStream::next`] auto-drains any unconsumed blob so the
23//! wire stays aligned.
24//!
25//! ```no_run
26//! # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
27//! # let mut sink: Vec<u8> = Vec::new();
28//! let mut stream = client
29//!     .query_stream_blob("SELECT id, document FROM files", &[])
30//!     .await?;
31//! while let Some(row) = stream.next().await? {
32//!     let id: i32 = row.get_by_name("id")?;
33//!     let _ = id;
34//!     stream.copy_blob_to(&mut sink).await?; // streamed, never fully buffered
35//! }
36//! # Ok(())
37//! # }
38//! ```
39
40use bytes::{Buf, Bytes, BytesMut};
41use tds_protocol::ProtocolError;
42use tds_protocol::token::{ColMetaData, ColumnData, NbcRow, RawRow, Token, TokenParser};
43use tds_protocol::types::TypeId;
44
45use crate::Client;
46use crate::client::response::server_token_to_error;
47use crate::error::{Error, Result};
48use crate::plp::{PlpDecoder, PlpEvent};
49use crate::row::{Column, Row};
50use crate::state::{ConnectionState, Ready};
51
52/// Whether a column is a PLP-encoded MAX type that this path can sub-stream.
53pub(crate) fn is_plp_max(col: &ColumnData) -> bool {
54    match col.type_id {
55        TypeId::BigVarChar | TypeId::BigVarBinary | TypeId::NVarChar => {
56            col.type_info.max_length == Some(0xFFFF)
57        }
58        TypeId::Xml => true,
59        _ => false,
60    }
61}
62
/// A stream of rows whose trailing MAX column(s) are read incrementally from
/// the socket. See the [module docs](self). Obtain one from
/// [`Client::query_stream_blob`](crate::Client::query_stream_blob) (one
/// trailing MAX column) or
/// [`Client::query_stream_rows`](crate::Client::query_stream_rows) (a run of
/// trailing MAX columns).
///
/// The stream borrows the client mutably for `'a`, so the connection cannot be
/// used for anything else while the stream is alive.
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct BlobStream<'a, S: ConnectionState = Ready> {
    /// Connection the response is read from (exclusively borrowed).
    client: &'a mut Client<S>,
    /// Unconsumed wire bytes (post-metadata), shared by the column decode and
    /// the PLP chunk streaming.
    buf: Bytes,
    /// END_OF_MESSAGE seen — no more packets will arrive. Also set when the
    /// client yields no further packet.
    eom: bool,
    /// Passed to the token parser when decoding non-row (control) tokens.
    encryption_enabled: bool,
    /// Full protocol-level result-set metadata (all columns, including the
    /// trailing MAX ones); used to decode row prefixes and control tokens.
    meta: ColMetaData,
    /// Protocol metadata for just the leading scalar columns; used to convert
    /// a decoded row prefix into a [`Row`].
    prefix_meta: ColMetaData,
    /// `Column`s for the leading scalar columns.
    scalar_row_meta: std::sync::Arc<crate::row::ColMetaData>,
    /// `Column`s for the trailing MAX columns, in wire order.
    blob_row_meta: std::sync::Arc<crate::row::ColMetaData>,
    /// Index of the first trailing MAX column; the scalar prefix is `0..first_blob`.
    first_blob: usize,
    /// Number of trailing MAX columns (1 for `query_stream_blob`).
    blob_count: usize,
    /// Index (within the trailing run, `0..blob_count`) of the next blob to
    /// position; reset to 0 at the start of each row.
    next_blob_idx: usize,
    /// The blob currently positioned for reading (within the trailing run), or
    /// `None` before the first is positioned / after the row is exhausted.
    current_blob_idx: Option<usize>,
    /// When the current row arrived as an NBCROW, its null bitmap (so each
    /// trailing blob's nullness can be looked up); `None` for a plain ROW.
    current_nbc: Option<NbcRow>,
    /// Whether `next` should auto-position the first trailing blob. True for the
    /// single-blob `query_stream_blob` path (preserves its API: the blob is
    /// ready to read after `next`); false for the multi-blob `query_stream_rows`
    /// path (the caller drives blobs explicitly via [`next_blob`](Self::next_blob)).
    auto_position_first: bool,
    /// PLP decoder for the current blob; `Some` while a non-NULL blob is being read.
    plp: Option<PlpDecoder>,
    /// The current blob is NULL (an NBCROW omitted its value).
    blob_null: bool,
    /// The response has been fully consumed; `next` and `next_blob` return
    /// immediately once this is set.
    finished: bool,
}
107
108impl<'a, S: ConnectionState> BlobStream<'a, S> {
109    #[allow(clippy::too_many_arguments)]
110    pub(crate) fn new(
111        client: &'a mut Client<S>,
112        buf: Bytes,
113        eom: bool,
114        encryption_enabled: bool,
115        meta: ColMetaData,
116        first_blob: usize,
117        blob_count: usize,
118        auto_position_first: bool,
119    ) -> Self {
120        let prefix_meta = ColMetaData {
121            columns: meta.columns.iter().take(first_blob).cloned().collect(),
122            cek_table: meta.cek_table.clone(),
123        };
124        let blob_meta = ColMetaData {
125            columns: meta
126                .columns
127                .iter()
128                .skip(first_blob)
129                .take(blob_count)
130                .cloned()
131                .collect(),
132            cek_table: meta.cek_table.clone(),
133        };
134        let scalar_columns = Client::<S>::build_columns(&prefix_meta);
135        let blob_columns = Client::<S>::build_columns(&blob_meta);
136        Self {
137            client,
138            buf,
139            eom,
140            encryption_enabled,
141            meta,
142            prefix_meta,
143            scalar_row_meta: std::sync::Arc::new(crate::row::ColMetaData::new(scalar_columns)),
144            blob_row_meta: std::sync::Arc::new(crate::row::ColMetaData::new(blob_columns)),
145            first_blob,
146            blob_count,
147            // Start as if the previous row were fully consumed, so the first
148            // `next` does not try to drain a nonexistent prior row.
149            next_blob_idx: blob_count,
150            current_blob_idx: None,
151            current_nbc: None,
152            auto_position_first,
153            plp: None,
154            blob_null: false,
155            finished: true, // set false once construction succeeds below
156        }
157        .started()
158    }
159
160    fn started(mut self) -> Self {
161        self.finished = false;
162        self
163    }
164
165    /// The leading (scalar) columns of the result set — everything before the
166    /// trailing MAX column(s).
167    #[must_use]
168    pub fn columns(&self) -> &[Column] {
169        &self.scalar_row_meta.columns
170    }
171
172    /// The trailing MAX (blob) columns of the result set, in wire order.
173    ///
174    /// For a [`query_stream_blob`](crate::Client::query_stream_blob) stream this
175    /// is a single column; for
176    /// [`query_stream_rows`](crate::Client::query_stream_rows) it is the run of
177    /// trailing MAX columns, in the order [`next_blob`](Self::next_blob) visits
178    /// them.
179    #[must_use]
180    pub fn blob_columns(&self) -> &[Column] {
181        &self.blob_row_meta.columns
182    }
183
184    /// The column metadata of the currently positioned blob, or `None` before
185    /// the first blob of a row is positioned (or after the row is exhausted).
186    #[must_use]
187    pub fn current_blob_column(&self) -> Option<&Column> {
188        self.current_blob_idx
189            .and_then(|j| self.blob_row_meta.columns.get(j))
190    }
191
192    /// Advance to the next row, returning its scalar columns.
193    ///
194    /// Auto-drains any unread trailing blob(s) of the previous row, so the wire
195    /// stays aligned. Returns `Ok(None)` at end of stream (connection clean).
196    pub async fn next(&mut self) -> Result<Option<Row>> {
197        if self.finished {
198            return Ok(None);
199        }
200        self.drain_to_row_end().await?;
201
202        loop {
203            if self.buf.is_empty() {
204                if !self.pull_packet().await? {
205                    self.finish();
206                    return Ok(None);
207                }
208                continue;
209            }
210            match self.buf[0] {
211                0xD1 => return Ok(Some(self.decode_row().await?)),
212                0xD2 => return Ok(Some(self.decode_nbc_row().await?)),
213                _ => match self.parse_control_token().await? {
214                    Control::Finished => {
215                        self.finish();
216                        return Ok(None);
217                    }
218                    Control::Continue => continue,
219                },
220            }
221        }
222    }
223
224    /// Advance to the next trailing MAX column of the current row, returning
225    /// `true` while one is positioned and `false` once the row's blobs are
226    /// exhausted.
227    ///
228    /// Auto-drains the previously positioned blob (if not fully read) before
229    /// advancing, so the wire stays aligned. After this returns `true`, read the
230    /// blob with [`read_chunk`](Self::read_chunk) / [`copy_blob_to`](Self::copy_blob_to)
231    /// and inspect it with [`current_blob_column`](Self::current_blob_column) /
232    /// [`blob_is_null`](Self::blob_is_null) / [`blob_len`](Self::blob_len).
233    ///
234    /// This is the iteration primitive for
235    /// [`query_stream_rows`](crate::Client::query_stream_rows):
236    ///
237    /// ```no_run
238    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
239    /// # let mut sink: Vec<u8> = Vec::new();
240    /// let mut stream = client
241    ///     .query_stream_rows("SELECT id, doc1, doc2 FROM files", &[])
242    ///     .await?;
243    /// while let Some(row) = stream.next().await? {
244    ///     let id: i32 = row.get_by_name("id")?;
245    ///     let _ = id;
246    ///     while stream.next_blob().await? {
247    ///         stream.copy_blob_to(&mut sink).await?; // each trailing MAX column
248    ///     }
249    /// }
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub async fn next_blob(&mut self) -> Result<bool> {
254        if self.finished {
255            return Ok(false);
256        }
257        self.drain_current_blob().await?;
258        if self.next_blob_idx >= self.blob_count {
259            self.current_blob_idx = None;
260            return Ok(false);
261        }
262        self.position_next_blob();
263        Ok(true)
264    }
265
266    /// Read the next chunk of the current blob, or `None` when it is fully
267    /// read (or NULL). Reads more packets from the socket as needed.
268    ///
269    /// Chunks are **raw bytes**, not decoded text. For an `NVARCHAR(MAX)` /
270    /// `XML` column the bytes are little-endian UCS-2, and a chunk boundary can
271    /// fall in the middle of a two-byte code unit (or a surrogate pair) — so do
272    /// not decode each chunk to `str` independently. Concatenate the chunks
273    /// first (or stream to a byte sink), then decode the whole value.
274    pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
275        loop {
276            let event = match self.plp.as_mut() {
277                Some(plp) if !plp.is_done() => plp.pull(&mut self.buf)?,
278                _ => return Ok(None),
279            };
280            match event {
281                PlpEvent::Data(d) => return Ok(Some(d)),
282                PlpEvent::End => return Ok(None),
283                PlpEvent::NeedMore => {
284                    if !self.pull_packet().await? {
285                        return Err(Error::ConnectionClosed);
286                    }
287                }
288            }
289        }
290    }
291
292    /// Stream the current row's blob to an async writer, returning bytes written.
293    pub async fn copy_blob_to<W>(&mut self, w: &mut W) -> Result<u64>
294    where
295        W: tokio::io::AsyncWrite + Unpin,
296    {
297        use tokio::io::AsyncWriteExt;
298        let mut total = 0u64;
299        while let Some(chunk) = self.read_chunk().await? {
300            w.write_all(&chunk).await.map_err(Error::from)?;
301            total += chunk.len() as u64;
302        }
303        Ok(total)
304    }
305
306    /// The current row's blob length in bytes, once known (after the first
307    /// chunk is read). `None` before that, for a NULL blob, or for an
308    /// unknown-length value.
309    #[must_use]
310    pub fn blob_len(&self) -> Option<u64> {
311        if self.blob_null {
312            return None;
313        }
314        self.plp.as_ref().and_then(PlpDecoder::total_len)
315    }
316
317    /// Whether the current row's blob is NULL.
318    #[must_use]
319    pub fn blob_is_null(&self) -> bool {
320        self.blob_null
321    }
322
323    fn finish(&mut self) {
324        self.finished = true;
325        self.client.note_response_drained();
326    }
327
328    async fn decode_row(&mut self) -> Result<Row> {
329        loop {
330            let mut view: &[u8] = &self.buf[..];
331            let before = view.len();
332            view.advance(1); // ROW token byte
333            match RawRow::decode_prefix(&mut view, &self.meta, self.first_blob) {
334                Ok(raw) => {
335                    let consumed = before - view.len();
336                    self.buf.advance(consumed);
337                    let row = crate::column_parser::convert_raw_row(
338                        &raw,
339                        &self.prefix_meta,
340                        &self.scalar_row_meta,
341                    )?;
342                    self.begin_row(None);
343                    return Ok(row);
344                }
345                Err(ProtocolError::UnexpectedEof) if !self.eom => {
346                    self.pull_packet().await?;
347                }
348                Err(e) => return Err(e.into()),
349            }
350        }
351    }
352
353    async fn decode_nbc_row(&mut self) -> Result<Row> {
354        loop {
355            let mut view: &[u8] = &self.buf[..];
356            let before = view.len();
357            view.advance(1); // NBCROW token byte
358            match NbcRow::decode_prefix(&mut view, &self.meta, self.first_blob) {
359                Ok(nbc) => {
360                    let consumed = before - view.len();
361                    self.buf.advance(consumed);
362                    let row = crate::column_parser::convert_nbc_row(
363                        &nbc,
364                        &self.prefix_meta,
365                        &self.scalar_row_meta,
366                    )?;
367                    self.begin_row(Some(nbc));
368                    return Ok(row);
369                }
370                Err(ProtocolError::UnexpectedEof) if !self.eom => {
371                    self.pull_packet().await?;
372                }
373                Err(e) => return Err(e.into()),
374            }
375        }
376    }
377
378    /// Parse a single non-row token (Done / Error / Info / …) and decide whether
379    /// the stream continues or has finished.
380    async fn parse_control_token(&mut self) -> Result<Control> {
381        loop {
382            let mut parser =
383                TokenParser::new(self.buf.clone()).with_encryption(self.encryption_enabled);
384            match parser.next_token_with_metadata(Some(&self.meta)) {
385                Ok(Some(token)) => {
386                    let consumed = self.buf.len() - parser.remaining();
387                    self.buf.advance(consumed);
388                    return self.classify(token);
389                }
390                Ok(None) => {
391                    if self.eom {
392                        return Ok(Control::Finished);
393                    }
394                    self.pull_packet().await?;
395                }
396                Err(ProtocolError::UnexpectedEof | ProtocolError::IncompletePacket { .. })
397                    if !self.eom =>
398                {
399                    self.pull_packet().await?;
400                }
401                Err(e) => return Err(e.into()),
402            }
403        }
404    }
405
406    fn classify(&mut self, token: Token) -> Result<Control> {
407        match token {
408            Token::Done(d) => {
409                if d.status.error {
410                    return Err(Error::Query(
411                        "query failed (server set error flag in DONE token)".to_string(),
412                    ));
413                }
414                Ok(if d.status.more {
415                    Control::Continue
416                } else {
417                    Control::Finished
418                })
419            }
420            Token::Error(e) => Err(server_token_to_error(&e)),
421            Token::ColMetaData(_) => Err(Error::Protocol(
422                "blob streaming does not support multiple result sets".to_string(),
423            )),
424            Token::EnvChange(ref e) => {
425                // Keep the transaction descriptor in sync with raw
426                // BEGIN/COMMIT/ROLLBACK seen mid-stream, as the buffered
427                // readers do.
428                self.client.apply_transaction_env_change(e);
429                Ok(Control::Continue)
430            }
431            // DoneProc / DoneInProc / Info / Order / etc.
432            _ => Ok(Control::Continue),
433        }
434    }
435
436    /// Reset per-row blob state after decoding a row's scalar prefix, optionally
437    /// auto-positioning the first trailing blob (for the single-blob path).
438    fn begin_row(&mut self, nbc: Option<NbcRow>) {
439        self.current_nbc = nbc;
440        self.next_blob_idx = 0;
441        self.current_blob_idx = None;
442        self.plp = None;
443        self.blob_null = false;
444        if self.auto_position_first {
445            self.position_next_blob();
446        }
447    }
448
449    /// Position the next trailing blob (sync; no socket IO). Sets up its PLP
450    /// decoder, or marks it NULL when an NBCROW omitted its value.
451    fn position_next_blob(&mut self) {
452        let j = self.next_blob_idx;
453        self.next_blob_idx += 1;
454        self.current_blob_idx = Some(j);
455        let col_idx = self.first_blob + j;
456        let is_null = self
457            .current_nbc
458            .as_ref()
459            .is_some_and(|n| n.is_null(col_idx));
460        self.blob_null = is_null;
461        self.plp = if is_null {
462            None
463        } else {
464            Some(PlpDecoder::new())
465        };
466    }
467
468    /// Drain the currently positioned blob off the wire (if not fully read).
469    async fn drain_current_blob(&mut self) -> Result<()> {
470        if self.plp.is_some() && !self.blob_null {
471            while self.read_chunk().await?.is_some() {}
472        }
473        self.plp = None;
474        Ok(())
475    }
476
477    /// Consume every remaining trailing blob of the current row off the wire, so
478    /// the next ROW/NBCROW token is reachable. Drains the positioned blob, then
479    /// positions and drains each not-yet-visited trailing blob in turn.
480    async fn drain_to_row_end(&mut self) -> Result<()> {
481        loop {
482            self.drain_current_blob().await?;
483            if self.next_blob_idx >= self.blob_count {
484                self.current_blob_idx = None;
485                return Ok(());
486            }
487            self.position_next_blob();
488        }
489    }
490
491    /// Pull one packet onto the rolling buffer. Returns `false` at EOF.
492    async fn pull_packet(&mut self) -> Result<bool> {
493        match self.client.read_response_packet().await? {
494            Some((payload, is_eom)) => {
495                if self.buf.is_empty() {
496                    self.buf = payload;
497                } else {
498                    let mut joined = BytesMut::with_capacity(self.buf.len() + payload.len());
499                    joined.extend_from_slice(&self.buf);
500                    joined.extend_from_slice(&payload);
501                    self.buf = joined.freeze();
502                }
503                self.eom |= is_eom;
504                Ok(true)
505            }
506            None => {
507                self.eom = true;
508                Ok(false)
509            }
510        }
511    }
512}
513
/// Outcome of parsing a non-row control token.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Control {
    /// The response continues; keep reading tokens.
    Continue,
    /// A final DONE (no "more" flag) was seen, or the message ended with no
    /// further tokens; the stream is over.
    Finished,
}