mssql_client/blob.rs
//! Streaming large object (LOB) reader for VARBINARY(MAX) and TEXT types.
//!
//! This module provides `BlobReader` for processing large binary objects in
//! chunks, so callers never need to allocate a second full-size copy of the
//! object. The data itself is still buffered internally; see the Memory Model
//! section below.
//!
//! ## Supported Types
//!
//! - `VARBINARY(MAX)` - Binary data up to 2GB
//! - `VARCHAR(MAX)` / `NVARCHAR(MAX)` - Text data up to 2GB
//! - `TEXT` / `NTEXT` / `IMAGE` - Legacy types (prefer the MAX variants)
//! - `XML` - XML documents
//!
//! ## Usage
//!
//! ```rust,ignore
//! use bytes::Bytes;
//! use mssql_client::blob::BlobReader;
//! use tokio::io::AsyncReadExt;
//!
//! // Get a binary column as a blob reader
//! let data: Bytes = row.get(0)?;
//! let mut reader = BlobReader::from_bytes(data);
//!
//! // Read in chunks
//! let mut buffer = vec![0u8; 8192];
//! loop {
//!     let n = reader.read(&mut buffer).await?;
//!     if n == 0 {
//!         break;
//!     }
//!     process_chunk(&buffer[..n]);
//! }
//!
//! // Or copy directly to a file
//! let mut file = tokio::fs::File::create("output.bin").await?;
//! tokio::io::copy(&mut reader, &mut file).await?;
//! ```
//!
//! ## Memory Model
//!
//! The current implementation buffers the complete LOB internally, as the single
//! `Bytes` allocation received from SQL Server. The `BlobReader` API still provides:
//!
//! - Chunked processing without additional allocations
//! - Streaming to files or other `AsyncWrite` destinations
//! - An API that stays compatible with a future true-streaming implementation
//!
//! This buffering is generally acceptable for LOBs up to roughly 100 MB. For larger
//! objects, consider application-level chunking via SQL `SUBSTRING` queries, as
//! sketched below.
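//!
//! ### Chunking with `SUBSTRING` (sketch)
//!
//! The outline below is a rough, hypothetical sketch of application-level chunking.
//! The `client.query` call, the parameter-binding style, and the `documents` /
//! `payload` / `doc_id` names are assumptions for illustration, not part of this
//! crate's documented API.
//!
//! ```rust,ignore
//! const CHUNK: i64 = 8 * 1024 * 1024; // fetch 8 MB slices
//! let mut offset: i64 = 1; // SUBSTRING offsets are 1-based
//! loop {
//!     // Fetch the next slice; the exact query/binding API depends on the client.
//!     let row = client
//!         .query(
//!             "SELECT SUBSTRING(payload, @P1, @P2) FROM documents WHERE id = @P3",
//!             &[&offset, &CHUNK, &doc_id],
//!         )
//!         .await?;
//!     let chunk: Bytes = row.get(0)?;
//!     if chunk.is_empty() {
//!         break; // past the end of the LOB
//!     }
//!     process_chunk(&chunk);
//!     offset += CHUNK;
//! }
//! ```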

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use tokio::io::{AsyncRead, ReadBuf};

/// Streaming reader for large binary objects (LOBs).
///
/// `BlobReader` implements [`AsyncRead`] to provide a streaming interface for
/// reading large objects. Data can be read in chunks without allocating
/// additional buffers for each read operation.
///
/// # Example
///
/// ```rust,ignore
/// use mssql_client::blob::BlobReader;
/// use tokio::io::AsyncReadExt;
///
/// let data: Bytes = row.get("binary_column")?;
/// let mut reader = BlobReader::from_bytes(data);
///
/// // Read up to 4KB at a time
/// let mut buffer = vec![0u8; 4096];
/// let bytes_read = reader.read(&mut buffer).await?;
/// ```
pub struct BlobReader {
    /// The underlying buffer containing LOB data.
    buffer: Bytes,
    /// Current read position within the buffer.
    position: usize,
}

impl BlobReader {
    /// Create a new `BlobReader` from a `Bytes` buffer.
    ///
    /// This is the primary constructor for creating a blob reader from
    /// column data retrieved from a query result.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let data = Bytes::from_static(b"Hello, World!");
    /// let reader = BlobReader::from_bytes(data);
    /// assert_eq!(reader.len(), Some(13));
    /// ```
    #[must_use]
    pub fn from_bytes(buffer: Bytes) -> Self {
        Self {
            buffer,
            position: 0,
        }
    }

    /// Create an empty `BlobReader`.
    ///
    /// Returns a reader with no data that will immediately return EOF.
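    ///
    /// # Example
    ///
    /// A minimal check of the empty-reader behavior:
    ///
    /// ```rust
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::empty();
    /// assert!(reader.is_empty());
    /// assert_eq!(reader.len(), Some(0));
    /// ```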
    #[must_use]
    pub fn empty() -> Self {
        Self {
            buffer: Bytes::new(),
            position: 0,
        }
    }

    /// Create a `BlobReader` from a byte slice.
    ///
    /// This copies the data into an owned `Bytes` buffer.
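    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_slice(b"test data");
    /// assert_eq!(reader.len(), Some(9));
    /// ```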
    #[must_use]
    pub fn from_slice(data: &[u8]) -> Self {
        Self {
            buffer: Bytes::copy_from_slice(data),
            position: 0,
        }
    }

    /// Get the total length of the BLOB in bytes.
    ///
    /// Returns `Some(length)` with the total size of the data. The current
    /// implementation always knows the length; `None` is reserved for future
    /// true-streaming implementations where the total length is not known
    /// in advance.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"test"));
    /// assert_eq!(reader.len(), Some(4));
    /// ```
    #[must_use]
    pub fn len(&self) -> Option<u64> {
        Some(self.buffer.len() as u64)
    }

    /// Check if the BLOB is empty.
    ///
    /// Returns `true` if the BLOB contains no data.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let empty = BlobReader::empty();
    /// assert!(empty.is_empty());
    ///
    /// let non_empty = BlobReader::from_bytes(Bytes::from_static(b"data"));
    /// assert!(!non_empty.is_empty());
    /// ```
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Get the number of bytes read so far.
    ///
    /// This tracks progress through the BLOB and can be used for
    /// progress reporting.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"Hello"));
    /// assert_eq!(reader.bytes_read(), 0);
    /// ```
    #[must_use]
    pub fn bytes_read(&self) -> u64 {
        self.position as u64
    }

    /// Get the number of bytes remaining to be read.
    ///
    /// Returns `Some(remaining)` if the total length is known.
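    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"Hello"));
    /// assert_eq!(reader.remaining(), Some(5));
    /// ```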
    #[must_use]
    pub fn remaining(&self) -> Option<u64> {
        self.len()
            .map(|total| total.saturating_sub(self.position as u64))
    }

    /// Reset the reader position to the beginning.
    ///
    /// After calling this, `bytes_read()` will return 0 and subsequent
    /// reads will start from the beginning of the data.
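    ///
    /// # Example
    ///
    /// Re-reading the same data after an initial pass (requires an async context):
    ///
    /// ```rust,ignore
    /// use tokio::io::AsyncReadExt;
    ///
    /// let mut first = Vec::new();
    /// reader.read_to_end(&mut first).await?;
    ///
    /// reader.rewind();
    /// assert_eq!(reader.bytes_read(), 0);
    ///
    /// let mut second = Vec::new();
    /// reader.read_to_end(&mut second).await?;
    /// assert_eq!(first, second);
    /// ```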
    pub fn rewind(&mut self) {
        self.position = 0;
    }

    /// Consume the reader and return the underlying `Bytes` buffer.
    ///
    /// This returns the complete data, including any bytes already read:
    /// reading only advances the position and never consumes the
    /// underlying buffer.
    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let data = Bytes::from_static(b"Hello");
    /// let reader = BlobReader::from_bytes(data.clone());
    /// let recovered = reader.into_bytes();
    /// assert_eq!(recovered, data);
    /// ```
    #[must_use]
    pub fn into_bytes(self) -> Bytes {
        self.buffer
    }

    /// Get a reference to the underlying bytes.
    ///
    /// Returns the complete buffer, not just the unread portion.
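    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"Hello"));
    /// assert_eq!(reader.as_bytes().as_ref(), b"Hello");
    /// ```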
    #[must_use]
    pub fn as_bytes(&self) -> &Bytes {
        &self.buffer
    }

    /// Get the unread portion as a slice.
    ///
    /// Returns a slice of the data that hasn't been read yet.
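    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"Hello"));
    /// assert_eq!(reader.unread_slice(), b"Hello");
    /// ```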
    #[must_use]
    pub fn unread_slice(&self) -> &[u8] {
        &self.buffer[self.position..]
    }

    /// Check if all data has been read.
    ///
    /// Returns `true` once `bytes_read()` has reached the total length.
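    ///
    /// # Example
    ///
    /// ```rust
    /// use bytes::Bytes;
    /// use mssql_client::blob::BlobReader;
    ///
    /// let reader = BlobReader::from_bytes(Bytes::from_static(b"data"));
    /// assert!(!reader.is_exhausted());
    ///
    /// // An empty reader has nothing left to read.
    /// assert!(BlobReader::empty().is_exhausted());
    /// ```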
    #[must_use]
    pub fn is_exhausted(&self) -> bool {
        self.position >= self.buffer.len()
    }
}

impl Default for BlobReader {
    fn default() -> Self {
        Self::empty()
    }
}

/// All data is already buffered in memory, so reads complete immediately and
/// this implementation never returns `Poll::Pending`.
impl AsyncRead for BlobReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Calculate how many bytes we can read
        let remaining = self.buffer.len().saturating_sub(self.position);
        if remaining == 0 {
            // EOF - no more data to read
            return Poll::Ready(Ok(()));
        }

        // Read up to the buffer capacity or remaining data, whichever is smaller
        let to_read = remaining.min(buf.remaining());
        let end = self.position + to_read;

        // Copy data to the read buffer
        buf.put_slice(&self.buffer[self.position..end]);

        // Update position
        self.position = end;

        Poll::Ready(Ok(()))
    }
}

impl std::fmt::Debug for BlobReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlobReader")
            .field("total_len", &self.buffer.len())
            .field("position", &self.position)
            .field("remaining", &self.remaining())
            .finish()
    }
}

impl Clone for BlobReader {
    /// Clone the reader.
    ///
    /// The cloned reader shares the underlying `Bytes` buffer (a cheap,
    /// reference-counted clone) but has its own position, starting from
    /// the beginning.
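    ///
    /// # Example
    ///
    /// Cloning after a partial read (requires an async context):
    ///
    /// ```rust,ignore
    /// use tokio::io::AsyncReadExt;
    ///
    /// let mut buffer = [0u8; 2];
    /// reader.read_exact(&mut buffer).await?;
    ///
    /// let fresh = reader.clone();
    /// assert_eq!(reader.bytes_read(), 2);
    /// assert_eq!(fresh.bytes_read(), 0); // the clone starts from the beginning
    /// ```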
    fn clone(&self) -> Self {
        Self {
            buffer: self.buffer.clone(),
            position: 0, // Clone starts from beginning
        }
    }
}

/// Wrap column data that is already available as `Bytes` (no copy).
impl From<Bytes> for BlobReader {
    fn from(bytes: Bytes) -> Self {
        Self::from_bytes(bytes)
    }
}

/// Take ownership of a `Vec<u8>` without copying.
impl From<Vec<u8>> for BlobReader {
    fn from(vec: Vec<u8>) -> Self {
        Self::from_bytes(Bytes::from(vec))
    }
}

/// Copy a borrowed byte slice into an owned buffer.
impl From<&[u8]> for BlobReader {
    fn from(slice: &[u8]) -> Self {
        Self::from_slice(slice)
    }
}

/// Copy a borrowed string's UTF-8 bytes into an owned buffer.
impl From<&str> for BlobReader {
    fn from(s: &str) -> Self {
        Self::from_slice(s.as_bytes())
    }
}

/// Take ownership of a `String`'s UTF-8 bytes without copying.
impl From<String> for BlobReader {
    fn from(s: String) -> Self {
        Self::from_bytes(Bytes::from(s))
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    #[test]
    fn test_blob_reader_creation() {
        let data = Bytes::from_static(b"Hello, World!");
        let reader = BlobReader::from_bytes(data);

        assert_eq!(reader.len(), Some(13));
        assert!(!reader.is_empty());
        assert_eq!(reader.bytes_read(), 0);
        assert_eq!(reader.remaining(), Some(13));
    }

    #[test]
    fn test_blob_reader_empty() {
        let reader = BlobReader::empty();

        assert_eq!(reader.len(), Some(0));
        assert!(reader.is_empty());
        assert!(reader.is_exhausted());
    }

    #[test]
    fn test_blob_reader_from_slice() {
        let reader = BlobReader::from_slice(b"test data");

        assert_eq!(reader.len(), Some(9));
        assert_eq!(reader.as_bytes().as_ref(), b"test data");
    }

    #[tokio::test]
    async fn test_blob_reader_read_all() {
        let data = Bytes::from_static(b"Hello, World!");
        let mut reader = BlobReader::from_bytes(data);

        let mut output = Vec::new();
        reader.read_to_end(&mut output).await.unwrap();

        assert_eq!(output, b"Hello, World!");
        assert_eq!(reader.bytes_read(), 13);
        assert!(reader.is_exhausted());
    }

    #[tokio::test]
    async fn test_blob_reader_read_chunked() {
        let data = Bytes::from_static(b"0123456789ABCDEF");
        let mut reader = BlobReader::from_bytes(data);

        let mut buffer = [0u8; 4];

        // Read first chunk
        let n = reader.read(&mut buffer).await.unwrap();
        assert_eq!(n, 4);
        assert_eq!(&buffer, b"0123");
        assert_eq!(reader.bytes_read(), 4);

        // Read second chunk
        let n = reader.read(&mut buffer).await.unwrap();
        assert_eq!(n, 4);
        assert_eq!(&buffer, b"4567");
        assert_eq!(reader.bytes_read(), 8);

        // Read remaining
        let mut remaining = Vec::new();
        reader.read_to_end(&mut remaining).await.unwrap();
        assert_eq!(remaining, b"89ABCDEF");
        assert!(reader.is_exhausted());
    }

    #[tokio::test]
    async fn test_blob_reader_empty_read() {
        let mut reader = BlobReader::empty();

        let mut buffer = [0u8; 10];
        let n = reader.read(&mut buffer).await.unwrap();

        assert_eq!(n, 0); // EOF immediately
    }

    #[test]
    fn test_blob_reader_rewind() {
        let data = Bytes::from_static(b"test");
        let mut reader = BlobReader::from_bytes(data);

        // Simulate reading
        reader.position = 4;
        assert!(reader.is_exhausted());

        // Rewind
        reader.rewind();
        assert_eq!(reader.bytes_read(), 0);
        assert!(!reader.is_exhausted());
    }

    #[test]
    fn test_blob_reader_into_bytes() {
        let data = Bytes::from_static(b"Hello");
        let reader = BlobReader::from_bytes(data.clone());

        // Consume and get bytes back
        let recovered = reader.into_bytes();
        assert_eq!(recovered, data);
    }

    #[test]
    fn test_blob_reader_unread_slice() {
        let data = Bytes::from_static(b"Hello");
        let mut reader = BlobReader::from_bytes(data);

        assert_eq!(reader.unread_slice(), b"Hello");

        reader.position = 2;
        assert_eq!(reader.unread_slice(), b"llo");
    }

    #[test]
    fn test_blob_reader_clone() {
        let data = Bytes::from_static(b"test");
        let mut original = BlobReader::from_bytes(data);
        original.position = 2;

        let cloned = original.clone();

        // Cloned reader starts from beginning
        assert_eq!(cloned.bytes_read(), 0);
        assert_eq!(original.bytes_read(), 2);

        // Both share the same underlying data
        assert_eq!(cloned.as_bytes(), original.as_bytes());
    }

    #[test]
    fn test_blob_reader_from_conversions() {
        let from_vec: BlobReader = vec![1u8, 2, 3].into();
        assert_eq!(from_vec.len(), Some(3));

        let from_slice: BlobReader = b"hello".as_slice().into();
        assert_eq!(from_slice.len(), Some(5));

        let from_str: BlobReader = "world".into();
        assert_eq!(from_str.len(), Some(5));

        let from_string: BlobReader = String::from("test").into();
        assert_eq!(from_string.len(), Some(4));
    }

    #[test]
    fn test_blob_reader_debug() {
        let reader = BlobReader::from_bytes(Bytes::from_static(b"test"));
        let debug = format!("{:?}", reader);

        assert!(debug.contains("BlobReader"));
        assert!(debug.contains("total_len"));
        assert!(debug.contains("position"));
    }

    #[tokio::test]
    async fn test_blob_reader_large_data() {
        // Test with larger data to ensure no issues with buffer sizes
        let data: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
        let mut reader = BlobReader::from_bytes(Bytes::from(data.clone()));

        let mut output = Vec::new();
        reader.read_to_end(&mut output).await.unwrap();

        assert_eq!(output, data);
    }
509}