mssql_client/blob.rs

//! Streaming large object (LOB) reader for VARBINARY(MAX) and TEXT types.
//!
//! This module provides `BlobReader`, a streaming interface over large binary
//! objects: callers read the data in chunks instead of handling the whole object
//! at once (see the Memory Model section below for how the data is held
//! internally).
//!
//! ## Supported Types
//!
//! - `VARBINARY(MAX)` - Binary data up to 2GB
//! - `VARCHAR(MAX)` / `NVARCHAR(MAX)` - Text data up to 2GB
//! - `TEXT` / `NTEXT` / `IMAGE` - Legacy types (prefer MAX variants)
//! - `XML` - XML documents
//!
//! ## Usage
//!
//! ```rust,ignore
//! use mssql_client::blob::BlobReader;
//! use tokio::io::AsyncReadExt;
//!
//! // Get binary column as a blob reader
//! let data: Bytes = row.get(0)?;
//! let mut reader = BlobReader::from_bytes(data);
//!
//! // Read in chunks
//! let mut buffer = vec![0u8; 8192];
//! loop {
//!     let n = reader.read(&mut buffer).await?;
//!     if n == 0 {
//!         break;
//!     }
//!     process_chunk(&buffer[..n]);
//! }
//!
//! // Or copy directly to a file
//! let mut file = tokio::fs::File::create("output.bin").await?;
//! tokio::io::copy(&mut reader, &mut file).await?;
//! ```
//!
//! ## Memory Model
//!
//! The current implementation buffers the complete LOB internally (received
//! from SQL Server as a single `Bytes` allocation). On top of that buffer, the
//! `BlobReader` API provides:
//!
//! - Chunked processing without additional allocations
//! - Streaming to files or other destinations
//! - A forward-compatible API for a future true-streaming implementation
//!
//! For LOBs up to roughly 100 MB this buffering is usually acceptable. For larger
//! objects, consider application-level chunking via SQL `SUBSTRING` queries, as
//! sketched below.
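//!
//! A minimal sketch of such chunking (illustrative only: the `client.query` call,
//! parameter syntax, and `process_chunk` helper below are assumptions and may not
//! match this crate's actual API):
//!
//! ```rust,ignore
//! let chunk_size: i64 = 8 * 1024 * 1024; // 8 MB per round trip
//! let mut offset: i64 = 0;
//! loop {
//!     // T-SQL SUBSTRING is 1-based, hence `@P1 + 1`.
//!     let row = client
//!         .query(
//!             "SELECT SUBSTRING(payload, @P1 + 1, @P2) FROM documents WHERE id = @P3",
//!             &[&offset, &chunk_size, &doc_id],
//!         )
//!         .await?;
//!     let chunk: Bytes = row.get(0)?;
//!     if chunk.is_empty() {
//!         break; // SUBSTRING past the end of the value returns an empty result
//!     }
//!     process_chunk(&chunk);
//!     offset += chunk.len() as i64;
//! }
//! ```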

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use tokio::io::{AsyncRead, ReadBuf};

/// Streaming reader for large binary objects (LOBs).
///
/// `BlobReader` implements [`AsyncRead`] to provide a streaming interface for
/// reading large objects. Data can be read in chunks without allocating
/// additional buffers for each read operation.
///
/// # Example
///
/// ```rust,ignore
/// use mssql_client::blob::BlobReader;
/// use tokio::io::AsyncReadExt;
///
/// let data: Bytes = row.get("binary_column")?;
/// let mut reader = BlobReader::from_bytes(data);
///
/// // Read up to 4KB at a time
/// let mut buffer = vec![0u8; 4096];
/// let bytes_read = reader.read(&mut buffer).await?;
/// ```
pub struct BlobReader {
    /// The underlying buffer containing LOB data.
    buffer: Bytes,
    /// Current read position within the buffer.
    position: usize,
}

impl BlobReader {
    /// Create a new `BlobReader` from a `Bytes` buffer.
    ///
    /// This is the primary constructor for creating a blob reader from
    /// column data retrieved from a query result.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let data = Bytes::from_static(b"Hello, World!");
    /// let reader = BlobReader::from_bytes(data);
    /// assert_eq!(reader.len(), Some(13));
    /// ```
    #[must_use]
    pub fn from_bytes(buffer: Bytes) -> Self {
        Self {
            buffer,
            position: 0,
        }
    }

    /// Create an empty `BlobReader`.
    ///
    /// Returns a reader with no data that will immediately return EOF.
    #[must_use]
    pub fn empty() -> Self {
        Self {
            buffer: Bytes::new(),
            position: 0,
        }
    }

    /// Create a `BlobReader` from a byte slice.
    ///
    /// This copies the data into an owned `Bytes` buffer.
    #[must_use]
    pub fn from_slice(data: &[u8]) -> Self {
        Self {
            buffer: Bytes::copy_from_slice(data),
            position: 0,
        }
    }

    /// Get the total length of the BLOB in bytes.
    ///
    /// In the current buffered implementation this always returns `Some(length)`.
    /// `None` is reserved for a future true-streaming implementation where the
    /// total length is not known in advance.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"test"));
    /// assert_eq!(reader.len(), Some(4));
    /// ```
    #[must_use]
    pub fn len(&self) -> Option<u64> {
        Some(self.buffer.len() as u64)
    }

    /// Check if the BLOB is empty.
    ///
    /// Returns `true` if the BLOB contains no data.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let empty = BlobReader::empty();
    /// assert!(empty.is_empty());
    ///
    /// let non_empty = BlobReader::from_bytes(Bytes::from_static(b"data"));
    /// assert!(!non_empty.is_empty());
    /// ```
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Get the number of bytes read so far.
    ///
    /// This tracks progress through the BLOB and can be used for
    /// progress reporting.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"Hello"));
    /// assert_eq!(reader.bytes_read(), 0);
    /// ```
    #[must_use]
    pub fn bytes_read(&self) -> u64 {
        self.position as u64
    }

    /// Get the number of bytes remaining to be read.
    ///
    /// Returns `Some(remaining)` if the total length is known.
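    ///
    /// # Example
    ///
    /// A freshly created reader reports the full length as remaining:
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"12345678"));
    /// assert_eq!(reader.remaining(), Some(8));
    /// ```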
    #[must_use]
    pub fn remaining(&self) -> Option<u64> {
        self.len()
            .map(|total| total.saturating_sub(self.position as u64))
    }

    /// Reset the reader position to the beginning.
    ///
    /// After calling this, `bytes_read()` will return 0 and subsequent
    /// reads will start from the beginning of the data.
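    ///
    /// # Example
    ///
    /// Re-reading the same data after an initial pass (sketch; `data` is any
    /// `Bytes` value obtained elsewhere):
    ///
    /// ```rust,ignore
    /// use tokio::io::AsyncReadExt;
    ///
    /// let mut reader = BlobReader::from_bytes(data);
    /// let mut first_pass = Vec::new();
    /// reader.read_to_end(&mut first_pass).await?;
    ///
    /// // Rewind and read again; reads never consume the underlying buffer.
    /// reader.rewind();
    /// let mut second_pass = Vec::new();
    /// reader.read_to_end(&mut second_pass).await?;
    /// assert_eq!(first_pass, second_pass);
    /// ```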
    pub fn rewind(&mut self) {
        self.position = 0;
    }

    /// Consume the reader and return the underlying `Bytes` buffer.
    ///
    /// This returns the complete buffer, including any bytes already read:
    /// reads only advance the position and never consume the underlying data.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let data = Bytes::from_static(b"Hello");
    /// let reader = BlobReader::from_bytes(data.clone());
    /// let recovered = reader.into_bytes();
    /// assert_eq!(recovered, data);
    /// ```
    #[must_use]
    pub fn into_bytes(self) -> Bytes {
        self.buffer
    }

    /// Get a reference to the underlying bytes.
    ///
    /// Returns the complete buffer, not just unread bytes.
    #[must_use]
    pub fn as_bytes(&self) -> &Bytes {
        &self.buffer
    }

    /// Get the unread portion as a slice.
    ///
    /// Returns a slice of the data that hasn't been read yet.
    #[must_use]
    pub fn unread_slice(&self) -> &[u8] {
        &self.buffer[self.position..]
    }

    /// Check if all data has been read.
    ///
    /// Returns `true` once `bytes_read()` has reached the total length.
    #[must_use]
    pub fn is_exhausted(&self) -> bool {
        self.position >= self.buffer.len()
    }
}

impl Default for BlobReader {
    fn default() -> Self {
        Self::empty()
    }
}

impl AsyncRead for BlobReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Calculate how many bytes we can read
        let remaining = self.buffer.len().saturating_sub(self.position);
        if remaining == 0 {
            // EOF - no more data to read
            return Poll::Ready(Ok(()));
        }

        // Read up to the buffer capacity or remaining data, whichever is smaller
        let to_read = remaining.min(buf.remaining());
        let end = self.position + to_read;

        // Copy data to the read buffer
        buf.put_slice(&self.buffer[self.position..end]);

        // Update position
        self.position = end;

        Poll::Ready(Ok(()))
    }
}

impl std::fmt::Debug for BlobReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlobReader")
            .field("total_len", &self.buffer.len())
            .field("position", &self.position)
            .field("remaining", &self.remaining())
            .finish()
    }
}

impl Clone for BlobReader {
    /// Clone the reader.
    ///
    /// The cloned reader shares the underlying `Bytes` buffer (cheap clone)
    /// but has its own position, starting from the beginning.
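    ///
    /// # Example
    ///
    /// A sketch showing that a clone starts at position zero even after the
    /// original has been partially read (`data` is any `Bytes` value):
    ///
    /// ```rust,ignore
    /// use tokio::io::AsyncReadExt;
    ///
    /// let mut original = BlobReader::from_bytes(data);
    /// let mut buf = [0u8; 2];
    /// original.read(&mut buf).await?;
    ///
    /// let copy = original.clone();
    /// assert_eq!(copy.bytes_read(), 0); // fresh position
    /// assert_eq!(copy.as_bytes(), original.as_bytes()); // shared buffer
    /// ```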
    fn clone(&self) -> Self {
        Self {
            buffer: self.buffer.clone(),
            position: 0, // Clone starts from beginning
        }
    }
}

impl From<Bytes> for BlobReader {
    fn from(bytes: Bytes) -> Self {
        Self::from_bytes(bytes)
    }
}

impl From<Vec<u8>> for BlobReader {
    fn from(vec: Vec<u8>) -> Self {
        Self::from_bytes(Bytes::from(vec))
    }
}

impl From<&[u8]> for BlobReader {
    fn from(slice: &[u8]) -> Self {
        Self::from_slice(slice)
    }
}

impl From<&str> for BlobReader {
    fn from(s: &str) -> Self {
        Self::from_slice(s.as_bytes())
    }
}

impl From<String> for BlobReader {
    fn from(s: String) -> Self {
        Self::from_bytes(Bytes::from(s))
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    #[test]
    fn test_blob_reader_creation() {
        let data = Bytes::from_static(b"Hello, World!");
        let reader = BlobReader::from_bytes(data);

        assert_eq!(reader.len(), Some(13));
        assert!(!reader.is_empty());
        assert_eq!(reader.bytes_read(), 0);
        assert_eq!(reader.remaining(), Some(13));
    }

    #[test]
    fn test_blob_reader_empty() {
        let reader = BlobReader::empty();

        assert_eq!(reader.len(), Some(0));
        assert!(reader.is_empty());
        assert!(reader.is_exhausted());
    }

    #[test]
    fn test_blob_reader_from_slice() {
        let reader = BlobReader::from_slice(b"test data");

        assert_eq!(reader.len(), Some(9));
        assert_eq!(reader.as_bytes().as_ref(), b"test data");
    }

    #[tokio::test]
    async fn test_blob_reader_read_all() {
        let data = Bytes::from_static(b"Hello, World!");
        let mut reader = BlobReader::from_bytes(data);

        let mut output = Vec::new();
        reader.read_to_end(&mut output).await.unwrap();

        assert_eq!(output, b"Hello, World!");
        assert_eq!(reader.bytes_read(), 13);
        assert!(reader.is_exhausted());
    }

    #[tokio::test]
    async fn test_blob_reader_read_chunked() {
        let data = Bytes::from_static(b"0123456789ABCDEF");
        let mut reader = BlobReader::from_bytes(data);

        let mut buffer = [0u8; 4];

        // Read first chunk
        let n = reader.read(&mut buffer).await.unwrap();
        assert_eq!(n, 4);
        assert_eq!(&buffer, b"0123");
        assert_eq!(reader.bytes_read(), 4);

        // Read second chunk
        let n = reader.read(&mut buffer).await.unwrap();
        assert_eq!(n, 4);
        assert_eq!(&buffer, b"4567");
        assert_eq!(reader.bytes_read(), 8);

        // Read remaining
        let mut remaining = Vec::new();
        reader.read_to_end(&mut remaining).await.unwrap();
        assert_eq!(remaining, b"89ABCDEF");
        assert!(reader.is_exhausted());
    }

    #[tokio::test]
    async fn test_blob_reader_empty_read() {
        let mut reader = BlobReader::empty();

        let mut buffer = [0u8; 10];
        let n = reader.read(&mut buffer).await.unwrap();

        assert_eq!(n, 0); // EOF immediately
    }

    #[test]
    fn test_blob_reader_rewind() {
        let data = Bytes::from_static(b"test");
        let mut reader = BlobReader::from_bytes(data);

        // Simulate reading
        reader.position = 4;
        assert!(reader.is_exhausted());

        // Rewind
        reader.rewind();
        assert_eq!(reader.bytes_read(), 0);
        assert!(!reader.is_exhausted());
    }

    #[test]
    fn test_blob_reader_into_bytes() {
        let data = Bytes::from_static(b"Hello");
        let reader = BlobReader::from_bytes(data.clone());

        // Consume and get bytes back
        let recovered = reader.into_bytes();
        assert_eq!(recovered, data);
    }

    #[test]
    fn test_blob_reader_unread_slice() {
        let data = Bytes::from_static(b"Hello");
        let mut reader = BlobReader::from_bytes(data);

        assert_eq!(reader.unread_slice(), b"Hello");

        reader.position = 2;
        assert_eq!(reader.unread_slice(), b"llo");
    }

    #[test]
    fn test_blob_reader_clone() {
        let data = Bytes::from_static(b"test");
        let mut original = BlobReader::from_bytes(data);
        original.position = 2;

        let cloned = original.clone();

        // Cloned reader starts from beginning
        assert_eq!(cloned.bytes_read(), 0);
        assert_eq!(original.bytes_read(), 2);

        // Both share the same underlying data
        assert_eq!(cloned.as_bytes(), original.as_bytes());
    }

    #[test]
    fn test_blob_reader_from_conversions() {
        let from_vec: BlobReader = vec![1u8, 2, 3].into();
        assert_eq!(from_vec.len(), Some(3));

        let from_slice: BlobReader = b"hello".as_slice().into();
        assert_eq!(from_slice.len(), Some(5));

        let from_str: BlobReader = "world".into();
        assert_eq!(from_str.len(), Some(5));

        let from_string: BlobReader = String::from("test").into();
        assert_eq!(from_string.len(), Some(4));
    }

    #[test]
    fn test_blob_reader_debug() {
        let reader = BlobReader::from_bytes(Bytes::from_static(b"test"));
        let debug = format!("{:?}", reader);

        assert!(debug.contains("BlobReader"));
        assert!(debug.contains("total_len"));
        assert!(debug.contains("position"));
    }

    #[tokio::test]
    async fn test_blob_reader_large_data() {
        // Test with larger data to ensure no issues with buffer sizes
        let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
        let mut reader = BlobReader::from_bytes(Bytes::from(data.clone()));

        let mut output = Vec::new();
        reader.read_to_end(&mut output).await.unwrap();

        assert_eq!(output, data);
    }
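
    // Supplementary check: `BlobReader` can drive `tokio::io::copy`, mirroring
    // the file-copy example in the module docs (this assumes tokio's
    // `AsyncWrite` impl for `Vec<u8>` as the destination).
    #[tokio::test]
    async fn test_blob_reader_tokio_copy() {
        let data = Bytes::from_static(b"copy me via tokio::io::copy");
        let mut reader = BlobReader::from_bytes(data.clone());

        let mut sink: Vec<u8> = Vec::new();
        let copied = tokio::io::copy(&mut reader, &mut sink).await.unwrap();

        assert_eq!(copied, data.len() as u64);
        assert_eq!(sink.as_slice(), data.as_ref());
    }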
}