Skip to main content

mssql_client/
bulk.rs

1//! Bulk Copy Protocol (BCP) support.
2//!
3//! This module provides first-class support for bulk insert operations using
4//! the TDS Bulk Load protocol (packet type 0x07). BCP is significantly more
5//! efficient than individual INSERT statements for loading large amounts of data.
6//!
7//! ## Performance Benefits
8//!
9//! - Minimal logging (when using simple recovery model)
10//! - Batch commits reduce lock contention
11//! - Direct data streaming without SQL parsing overhead
12//! - Optional table lock for maximum throughput
13//!
14//! ## Usage
15//!
16//! ```rust,ignore
17//! use mssql_client::{BulkInsertBuilder, BulkColumn, BulkOptions};
18//!
19//! let builder = BulkInsertBuilder::new("dbo.Users")
20//!     .with_typed_columns(vec![
21//!         BulkColumn::new("id", "INT", 0)?,
22//!         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
23//!         BulkColumn::new("email", "NVARCHAR(200)", 2)?,
24//!     ])
25//!     .with_options(BulkOptions {
26//!         batch_size: 1000,
27//!         check_constraints: true,
28//!         fire_triggers: false,
29//!         keep_nulls: true,
30//!         table_lock: true,
31//!         order_hint: None,
32//!     });
33//!
34//! let mut writer = client.bulk_insert(&builder).await?;
35//!
36//! // Send rows — buffered in memory, sent on finish()
37//! for user in users {
38//!     writer.send_row(&[&user.id, &user.name, &user.email])?;
39//! }
40//!
41//! let result = writer.finish().await?;
42//! println!("Inserted {} rows", result.rows_affected);
43//! ```
44//!
45//! ## Implementation Notes
46//!
47//! The bulk load protocol uses:
48//! - Packet type 0x07 (BulkLoad)
49//! - COLMETADATA token describing column structure
50//! - ROW tokens containing actual data
51//! - DONE token signaling completion
52//!
53//! Per MS-TDS specification, the row data format matches the server output format
54//! (same as SELECT results) rather than storage format.
55
56use bytes::{BufMut, BytesMut};
57use once_cell::sync::Lazy;
58use regex::Regex;
59use std::sync::Arc;
60
61use mssql_types::{SqlValue, ToSql, TypeError};
62use tds_protocol::packet::{PacketHeader, PacketStatus, PacketType};
63use tds_protocol::token::{Collation, DoneStatus, TokenType};
64
65use crate::error::Error;
66
/// Tuning knobs for a bulk insert.
///
/// Each field is rendered by
/// `BulkInsertBuilder::build_insert_bulk_statement` as a hint inside the
/// `WITH (...)` clause of the generated `INSERT BULK` statement, trading
/// throughput against logging, locking and constraint enforcement.
#[derive(Debug, Clone)]
pub struct BulkOptions {
    /// Rows per batch.
    ///
    /// Emitted as `ROWS_PER_BATCH = n` when greater than zero. Small batches
    /// need less memory at the cost of more overhead; large batches are more
    /// efficient but keep locks for longer.
    /// Default: 0 (the whole operation is a single batch; no hint is emitted).
    pub batch_size: usize,

    /// Enforce table constraints while inserting (`CHECK_CONSTRAINTS`).
    ///
    /// Default: true
    pub check_constraints: bool,

    /// Run the table's INSERT triggers (`FIRE_TRIGGERS`).
    ///
    /// Default: false (better performance)
    pub fire_triggers: bool,

    /// Store NULLs as NULL rather than substituting column defaults
    /// (`KEEP_NULLS`).
    ///
    /// Default: true
    pub keep_nulls: bool,

    /// Take a table-level lock for the whole bulk operation (`TABLOCK`).
    ///
    /// Avoids lock-escalation overhead and can be considerably faster, but
    /// nothing else can access the table while the load runs.
    /// Default: false
    pub table_lock: bool,

    /// Columns the incoming data is already sorted by (`ORDER(...)`).
    ///
    /// When the rows arrive in clustered-index order, listing those columns
    /// lets the server skip its own sort.
    /// Default: None
    pub order_hint: Option<Vec<String>>,
}

impl Default for BulkOptions {
    /// Defaults: one batch, constraints checked, NULLs kept, no triggers,
    /// no table lock, no order hint.
    fn default() -> Self {
        const ON: bool = true;
        const OFF: bool = false;

        BulkOptions {
            order_hint: None,
            table_lock: OFF,
            keep_nulls: ON,
            fire_triggers: OFF,
            check_constraints: ON,
            batch_size: 0,
        }
    }
}
122
/// Column definition for bulk insert.
///
/// Pairs the caller-facing description of a target column (name, T-SQL type
/// string, nullability, ordinal) with the TDS wire metadata that
/// `parse_sql_type` derives from the type string.
#[derive(Debug, Clone)]
pub struct BulkColumn {
    /// Column name.
    pub name: String,
    /// SQL Server type (e.g., "INT", "NVARCHAR(100)").
    pub sql_type: String,
    /// Whether the column allows NULL values.
    ///
    /// [`BulkColumn::new`] initialises this to `true`; change it with
    /// [`with_nullable`](Self::with_nullable).
    pub nullable: bool,
    /// Column ordinal (0-based).
    pub ordinal: usize,
    /// TDS type ID.
    ///
    /// As produced by `parse_sql_type` this is the nullable variant
    /// (e.g. 0x26 INTN for `INT`).
    type_id: u8,
    /// Maximum length for variable-length types.
    ///
    /// `parse_sql_type` stores NVARCHAR/NCHAR lengths doubled (UTF-16 bytes)
    /// and uses `0xFFFF` as the marker for `MAX` types.
    max_length: Option<u32>,
    /// Precision for decimal types.
    precision: Option<u8>,
    /// Scale for decimal types.
    ///
    /// Also carries the fractional-seconds scale of TIME / DATETIME2 /
    /// DATETIMEOFFSET columns.
    scale: Option<u8>,
    /// Collation for VARCHAR/CHAR columns.
    ///
    /// Populated automatically from the server's COLMETADATA when
    /// [`Client::bulk_insert`](crate::Client::bulk_insert) is used. Can be set
    /// manually via [`with_collation`](Self::with_collation) for the
    /// schema-discovery-free path. When `None`, VARCHAR values fall back to
    /// the default Latin1_General_CI_AS collation (Windows-1252).
    collation: Option<Collation>,
}
151
152impl BulkColumn {
153    /// Create a new bulk column definition.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`TypeError::UnsupportedType`] when `sql_type` names a deprecated
158    /// large object type (`TEXT`, `NTEXT`, `IMAGE`). Use `VARCHAR(MAX)` /
159    /// `NVARCHAR(MAX)` / `VARBINARY(MAX)` instead — Microsoft deprecated
160    /// `TEXT` / `NTEXT` / `IMAGE` in SQL Server 2005 and recommends the `MAX`
161    /// types for all new development.
162    pub fn new<S: Into<String>>(name: S, sql_type: S, ordinal: usize) -> Result<Self, TypeError> {
163        let sql_type_str: String = sql_type.into();
164        reject_unsupported_bulk_type(&sql_type_str)?;
165        let (type_id, max_length, precision, scale) = parse_sql_type(&sql_type_str);
166
167        Ok(Self {
168            name: name.into(),
169            sql_type: sql_type_str,
170            nullable: true,
171            ordinal,
172            type_id,
173            max_length,
174            precision,
175            scale,
176            collation: None,
177        })
178    }
179
180    /// Set whether this column allows NULL values.
181    #[must_use]
182    pub fn with_nullable(mut self, nullable: bool) -> Self {
183        self.nullable = nullable;
184        self
185    }
186
187    /// Set the collation used for VARCHAR/CHAR columns.
188    ///
189    /// Required when [`Client::bulk_insert_without_schema_discovery`](crate::Client::bulk_insert_without_schema_discovery)
190    /// targets VARCHAR columns on a server whose default collation is not
191    /// Latin1_General_CI_AS and the target column uses a different code page.
192    /// Ignored for NVARCHAR/NCHAR columns (always UTF-16).
193    #[must_use]
194    pub fn with_collation(mut self, collation: Collation) -> Self {
195        self.collation = Some(collation);
196        self
197    }
198}
199
/// Parse SQL type string into TDS type information.
///
/// Returns `(type_id, max_length, precision, scale)`. The type ID is always
/// the nullable variant (e.g. 0x26 INTN for `INT`); callers that need the
/// fixed-width variant for NOT NULL columns convert it later.
///
/// Matching is case-insensitive and tolerant of whitespace around the base
/// type and inside the parentheses, so `" int "`, `"DECIMAL (10, 2)"` and
/// `"NVARCHAR( max )"` resolve like their compact spellings instead of
/// silently degrading to the NVARCHAR fallback.
///
/// Type parameters (e.g., the "100" in `VARCHAR(100)`) are parsed leniently:
/// a malformed parameter — including an NVARCHAR/NCHAR character count whose
/// UTF-16 byte length would overflow `u32` — falls through to the type's SQL
/// Server default length (8000 bytes for VARCHAR, 4000 characters for
/// NVARCHAR). This is intentional: bulk-insert column definitions come from
/// user code, and defaulting to max length is safer than rejecting the
/// operation when the base type is valid.
///
/// Unrecognised base types are treated as NVARCHAR(4000).
fn parse_sql_type(sql_type: &str) -> (u8, Option<u32>, Option<u8>, Option<u8>) {
    let upper = sql_type.trim().to_uppercase();

    // Split "BASE(params)" into its base name and the raw parameter text.
    let (base, params) = match upper.split_once('(') {
        Some((base, rest)) => (
            base.trim_end(),
            Some(rest.trim_end_matches(')').trim()),
        ),
        None => (upper.as_str(), None),
    };

    // Fractional-seconds scale for TIME / DATETIME2 / DATETIMEOFFSET.
    let time_scale = || params.and_then(|p| p.parse::<u8>().ok()).unwrap_or(7);

    // Byte length for VARCHAR / VARBINARY style types (0xFFFF marks MAX).
    let byte_len = || match params {
        Some("MAX") => 0xFFFF_u32,
        Some(p) => p.parse().unwrap_or(8000),
        None => 8000,
    };

    // This returns the nullable type variant ID. `write_colmetadata` switches
    // to the fixed-width variant (e.g. 0x26 INTN → 0x38 Int4) when the target
    // column is NOT NULL, since SQL Server's BulkLoad rejects nullable type IDs
    // for NOT NULL columns with error 4816.
    match base {
        "BIT" => (0x68, Some(1), None, None),      // BITN
        "TINYINT" => (0x26, Some(1), None, None),  // INTN(1)
        "SMALLINT" => (0x26, Some(2), None, None), // INTN(2)
        "INT" => (0x26, Some(4), None, None),      // INTN(4)
        "BIGINT" => (0x26, Some(8), None, None),   // INTN(8)
        "REAL" => (0x6D, Some(4), None, None),     // FLTN(4)
        "FLOAT" => (0x6D, Some(8), None, None),    // FLTN(8)
        "DATE" => (0x28, None, None, None),
        "TIME" => (0x29, None, None, Some(time_scale())),
        "DATETIME" => (0x6F, Some(8), None, None), // DATETIMEN(8)
        "DATETIME2" => (0x2A, None, None, Some(time_scale())),
        "DATETIMEOFFSET" => (0x2B, None, None, Some(time_scale())),
        "SMALLDATETIME" => (0x6F, Some(4), None, None), // DATETIMEN(4)
        "UNIQUEIDENTIFIER" => (0x24, Some(16), None, None),
        "VARCHAR" | "CHAR" => (0xA7, Some(byte_len()), None, None),
        "NVARCHAR" | "NCHAR" => {
            let wire_len = match params {
                // MAX types use 0xFFFF marker (not doubled)
                Some("MAX") => 0xFFFF,
                // Normal lengths are in characters; double for the UTF-16 byte
                // length. A count that cannot be doubled without overflowing
                // is treated like any other malformed parameter.
                _ => params
                    .and_then(|p| p.parse::<u32>().ok())
                    .and_then(|chars| chars.checked_mul(2))
                    .unwrap_or(8000),
            };
            (0xE7, Some(wire_len), None, None)
        }
        "VARBINARY" | "BINARY" => (0xA5, Some(byte_len()), None, None),
        "DECIMAL" | "NUMERIC" => {
            let mut parts = params.unwrap_or("").split(',').map(str::trim);
            let precision = parts.next().and_then(|s| s.parse().ok()).unwrap_or(18);
            let scale = parts.next().and_then(|s| s.parse().ok()).unwrap_or(0);
            (0x6C, None, Some(precision), Some(scale))
        }
        "MONEY" => (0x6E, Some(8), None, None), // MONEYN(8)
        "SMALLMONEY" => (0x6E, Some(4), None, None), // MONEYN(4)
        "XML" => (0xF1, Some(0xFFFF), None, None),
        _ => (0xE7, Some(8000), None, None), // Default to NVARCHAR(4000)
    }
}
301
302/// Reject deprecated large object types that this driver does not support in
303/// bulk insert. `TEXT` / `NTEXT` / `IMAGE` have been deprecated since SQL
304/// Server 2005 and use a legacy TEXTPTR wire format. Users should use
305/// `VARCHAR(MAX)` / `NVARCHAR(MAX)` / `VARBINARY(MAX)` which the driver
306/// supports end-to-end.
307fn reject_unsupported_bulk_type(sql_type: &str) -> Result<(), TypeError> {
308    let base = sql_type
309        .split('(')
310        .next()
311        .unwrap_or("")
312        .trim()
313        .to_uppercase();
314    match base.as_str() {
315        "TEXT" | "NTEXT" => Err(TypeError::UnsupportedType {
316            sql_type: base,
317            reason: "TEXT/NTEXT are not supported. Use VARCHAR(MAX) / \
318                     NVARCHAR(MAX) instead (Microsoft deprecated TEXT/NTEXT in \
319                     SQL Server 2005)."
320                .to_string(),
321        }),
322        "IMAGE" => Err(TypeError::UnsupportedType {
323            sql_type: base,
324            reason: "IMAGE is not supported. Use VARBINARY(MAX) instead \
325                     (Microsoft deprecated IMAGE in SQL Server 2005)."
326                .to_string(),
327        }),
328        _ => Ok(()),
329    }
330}
331
/// Result of a bulk insert operation.
///
/// The module-level example obtains this value from `writer.finish()`.
#[derive(Debug, Clone)]
pub struct BulkInsertResult {
    /// Total number of rows inserted.
    pub rows_affected: u64,
    /// Number of batches committed.
    pub batches_committed: u32,
    /// Whether any errors were encountered.
    // NOTE(review): the code that sets this flag is outside this section —
    // confirm which error conditions it covers.
    pub has_errors: bool,
}
342
/// Builder for configuring a bulk insert operation.
///
/// Collects the target table, the column definitions and the [`BulkOptions`];
/// `build_insert_bulk_statement` renders them into an `INSERT BULK` statement.
#[derive(Debug)]
pub struct BulkInsertBuilder {
    /// Target table name; may be schema-qualified (e.g. `dbo.Users`).
    table_name: String,
    /// Column definitions; empty until `with_columns` or `with_typed_columns`
    /// is called.
    columns: Vec<BulkColumn>,
    /// Options rendered as hints in the statement's `WITH (...)` clause.
    options: BulkOptions,
}
350
351impl BulkInsertBuilder {
352    /// Create a new bulk insert builder for the specified table.
353    pub fn new<S: Into<String>>(table_name: S) -> Self {
354        Self {
355            table_name: table_name.into(),
356            columns: Vec::new(),
357            options: BulkOptions::default(),
358        }
359    }
360
361    /// Specify the columns to insert.
362    ///
363    /// Columns will be queried from the server if not specified,
364    /// but providing them explicitly is more efficient.
365    #[must_use]
366    #[allow(clippy::expect_used)] // NVARCHAR(MAX) is always a supported bulk type
367    pub fn with_columns(mut self, column_names: &[&str]) -> Self {
368        self.columns = column_names
369            .iter()
370            .enumerate()
371            .map(|(i, name)| {
372                BulkColumn::new(*name, "NVARCHAR(MAX)", i)
373                    .expect("NVARCHAR(MAX) is always a supported type")
374            })
375            .collect();
376        self
377    }
378
379    /// Specify columns with full type information.
380    #[must_use]
381    pub fn with_typed_columns(mut self, columns: Vec<BulkColumn>) -> Self {
382        self.columns = columns;
383        self
384    }
385
386    /// Set bulk insert options.
387    #[must_use]
388    pub fn with_options(mut self, options: BulkOptions) -> Self {
389        self.options = options;
390        self
391    }
392
393    /// Set the batch size.
394    #[must_use]
395    pub fn batch_size(mut self, size: usize) -> Self {
396        self.options.batch_size = size;
397        self
398    }
399
400    /// Enable or disable table lock.
401    #[must_use]
402    pub fn table_lock(mut self, enabled: bool) -> Self {
403        self.options.table_lock = enabled;
404        self
405    }
406
407    /// Enable or disable trigger firing.
408    #[must_use]
409    pub fn fire_triggers(mut self, enabled: bool) -> Self {
410        self.options.fire_triggers = enabled;
411        self
412    }
413
414    /// Get the table name.
415    pub fn table_name(&self) -> &str {
416        &self.table_name
417    }
418
419    /// Get the columns.
420    pub fn columns(&self) -> &[BulkColumn] {
421        &self.columns
422    }
423
424    /// Get the options.
425    pub fn options(&self) -> &BulkOptions {
426        &self.options
427    }
428
429    /// Build the INSERT BULK SQL statement.
430    ///
431    /// # Errors
432    ///
433    /// Returns an error if the table name or any column name fails identifier
434    /// validation, preventing SQL injection.
435    pub fn build_insert_bulk_statement(&self) -> Result<String, Error> {
436        // Validate table name (may be schema-qualified: dbo.Users, catalog.schema.table)
437        crate::validation::validate_qualified_identifier(&self.table_name)?;
438
439        // Validate column names
440        for col in &self.columns {
441            crate::validation::validate_identifier(&col.name)?;
442        }
443
444        let mut sql = format!("INSERT BULK {}", self.table_name);
445
446        // Add column definitions
447        if !self.columns.is_empty() {
448            sql.push_str(" (");
449            let cols: Vec<String> = self
450                .columns
451                .iter()
452                .map(|c| {
453                    // Validate sql_type to prevent SQL injection: only allow
454                    // alphanumerics, parentheses (for length/precision), commas,
455                    // spaces, and the MAX keyword — which covers all valid T-SQL
456                    // type specifiers like "NVARCHAR(100)", "DECIMAL(18, 2)",
457                    // "VARBINARY(MAX)", etc.
458                    validate_sql_type(&c.sql_type)?;
459                    Ok(format!("{} {}", c.name, c.sql_type))
460                })
461                .collect::<Result<Vec<_>, Error>>()?;
462            sql.push_str(&cols.join(", "));
463            sql.push(')');
464        }
465
466        // Add WITH clause for options
467        let mut hints: Vec<String> = Vec::new();
468
469        if self.options.check_constraints {
470            hints.push("CHECK_CONSTRAINTS".to_string());
471        }
472        if self.options.fire_triggers {
473            hints.push("FIRE_TRIGGERS".to_string());
474        }
475        if self.options.keep_nulls {
476            hints.push("KEEP_NULLS".to_string());
477        }
478        if self.options.table_lock {
479            hints.push("TABLOCK".to_string());
480        }
481        if self.options.batch_size > 0 {
482            hints.push(format!("ROWS_PER_BATCH = {}", self.options.batch_size));
483        }
484
485        if let Some(ref order) = self.options.order_hint {
486            // Validate order hint column names
487            for col_name in order {
488                crate::validation::validate_identifier(col_name)?;
489            }
490            hints.push(format!("ORDER({})", order.join(", ")));
491        }
492
493        if !hints.is_empty() {
494            sql.push_str(" WITH (");
495            sql.push_str(&hints.join(", "));
496            sql.push(')');
497        }
498
499        Ok(sql)
500    }
501}
502
503/// Validate a SQL type specifier to prevent SQL injection.
504///
505/// Allows only characters that can appear in valid T-SQL type declarations:
506/// letters, digits, parentheses, commas, spaces, and periods.
507/// Examples: "INT", "NVARCHAR(100)", "DECIMAL(18, 2)", "VARBINARY(MAX)".
508fn validate_sql_type(type_str: &str) -> Result<(), Error> {
509    #[allow(clippy::expect_used)] // Static regex compilation with known-valid pattern
510    static SQL_TYPE_RE: Lazy<Regex> =
511        Lazy::new(|| Regex::new(r"^[a-zA-Z][a-zA-Z0-9_ ()\.,]{0,127}$").expect("valid regex"));
512
513    if type_str.is_empty() {
514        return Err(Error::Config("SQL type cannot be empty".into()));
515    }
516
517    if !SQL_TYPE_RE.is_match(type_str) {
518        return Err(Error::Config(format!(
519            "invalid SQL type '{type_str}': contains disallowed characters"
520        )));
521    }
522
523    Ok(())
524}
525
/// Active bulk insert operation.
///
/// This struct manages the streaming of row data to the server.
/// Call `send_row()` for each row, then `finish()` to complete.
///
/// Rows are encoded into an in-memory buffer that starts with the
/// COLMETADATA token written at construction time.
pub struct BulkInsert {
    /// Column metadata.
    columns: Arc<[BulkColumn]>,
    /// Whether each column uses a fixed-length type on the wire.
    /// When true, row values for that column are written without a length prefix.
    fixed_len: Arc<[bool]>,
    /// Buffer for accumulating rows.
    ///
    /// Allocated with 64 KiB of capacity; holds COLMETADATA followed by one
    /// ROW token per accepted row.
    buffer: BytesMut,
    /// Rows in current batch.
    rows_in_batch: usize,
    /// Total rows sent.
    ///
    /// Incremented once per row accepted by `send_row` / `send_row_values`.
    total_rows: u64,
    /// Batch size (0 = single batch).
    batch_size: usize,
    /// Number of batches committed.
    batches_committed: u32,
    /// Packet ID counter.
    ///
    /// Initialised to 1 by the constructor.
    packet_id: u8,
}
549
550impl BulkInsert {
551    /// Create a new bulk insert operation.
552    pub fn new(columns: Vec<BulkColumn>, batch_size: usize) -> Self {
553        Self::new_with_server_metadata(columns, batch_size, None, None)
554    }
555
556    /// Create a new bulk insert operation using server metadata.
557    ///
558    /// When `raw_colmetadata` is provided, it is written directly into the
559    /// BulkLoad buffer, ensuring the COLMETADATA matches the server's exact
560    /// encoding. `server_columns` provides per-column type info so row values
561    /// are encoded correctly (fixed-length types have no length prefix).
562    ///
563    /// This follows the pattern used by Tiberius: the server's own metadata
564    /// from `SELECT TOP 0` is echoed back rather than constructing it from
565    /// user-specified types.
566    pub fn new_with_server_metadata(
567        mut columns: Vec<BulkColumn>,
568        batch_size: usize,
569        raw_colmetadata: Option<bytes::Bytes>,
570        server_columns: Option<&[tds_protocol::token::ColumnData]>,
571    ) -> Self {
572        // Determine which columns use fixed-length types on the wire.
573        // Fixed-length types omit the per-row length prefix.
574        let fixed_len: Vec<bool> = if let Some(srv_cols) = server_columns {
575            // Propagate collation from server metadata for VARCHAR/CHAR columns.
576            // The user's BulkColumn is constructed from type strings alone and
577            // has no collation until we see the server's COLMETADATA — falling
578            // back to the default Latin1 on NON-Latin servers would silently
579            // corrupt extended characters.
580            for (col, srv) in columns.iter_mut().zip(srv_cols.iter()) {
581                if col.collation.is_none() {
582                    col.collation = srv.type_info.collation;
583                }
584            }
585            srv_cols
586                .iter()
587                .map(|c| c.type_id.is_fixed_length())
588                .collect()
589        } else {
590            // Without server metadata, NOT NULL columns of fixed-width types
591            // must use the fixed type ID variant (e.g. INT NOT NULL uses 0x38
592            // Int4, not 0x26 INTN). SQL Server rejects nullable type IDs for
593            // NOT NULL target columns with error 4816.
594            columns
595                .iter()
596                .map(|c| !c.nullable && nullable_to_fixed_type(c.type_id, c.max_length).is_some())
597                .collect()
598        };
599
600        let mut bulk = Self {
601            columns: columns.into(),
602            fixed_len: fixed_len.into(),
603            buffer: BytesMut::with_capacity(64 * 1024),
604            rows_in_batch: 0,
605            total_rows: 0,
606            batch_size,
607            batches_committed: 0,
608            packet_id: 1,
609        };
610
611        if let Some(raw) = raw_colmetadata {
612            bulk.buffer.extend_from_slice(&raw);
613        } else {
614            bulk.write_colmetadata();
615        }
616
617        bulk
618    }
619
620    /// Write the COLMETADATA token to the buffer.
621    fn write_colmetadata(&mut self) {
622        let buf = &mut self.buffer;
623
624        // Token type
625        buf.put_u8(TokenType::ColMetaData as u8);
626
627        // Column count
628        buf.put_u16_le(self.columns.len() as u16);
629
630        for col in self.columns.iter() {
631            // User type (always 0 for basic types)
632            buf.put_u32_le(0);
633
634            // For NOT NULL columns with a fixed-width type, use the fixed type ID
635            // variant (e.g. INT NOT NULL → 0x38 Int4 instead of 0x26 INTN).
636            // SQL Server's BCP rejects nullable type IDs for NOT NULL columns.
637            let effective_type_id = if !col.nullable {
638                nullable_to_fixed_type(col.type_id, col.max_length).unwrap_or(col.type_id)
639            } else {
640                col.type_id
641            };
642            let is_fixed_variant = effective_type_id != col.type_id;
643
644            // Flags: Nullable (bit 0) | Updateable (bit 3)
645            // BulkLoad columns must have Updateable set to indicate they accept writes.
646            let mut flags: u16 = 0x0008; // Updateable
647            if col.nullable {
648                flags |= 0x0001; // Nullable
649            }
650            buf.put_u16_le(flags);
651
652            // Type info
653            buf.put_u8(effective_type_id);
654
655            // Fixed-width types have no additional TYPE_INFO bytes — skip straight
656            // to the column name.
657            if is_fixed_variant {
658                let name_utf16: Vec<u16> = col.name.encode_utf16().collect();
659                buf.put_u8(name_utf16.len() as u8);
660                for code_unit in name_utf16 {
661                    buf.put_u16_le(code_unit);
662                }
663                continue;
664            }
665
666            // Type-specific length/precision/scale
667            match col.type_id {
668                // Nullable fixed-length types — 1-byte max-length follows type ID
669                // INTN(0x26), BITN(0x68), FLTN(0x6D), MONEYN(0x6E), DATETIMEN(0x6F)
670                0x26 | 0x68 | 0x6D | 0x6E | 0x6F => {
671                    buf.put_u8(col.max_length.unwrap_or(4) as u8);
672                }
673
674                // DATE has no length byte (fixed 3-byte value)
675                0x28 => {}
676
677                // Variable-length string/binary types
678                0xE7 | 0xA7 | 0xA5 | 0xAD => {
679                    // Max length (2 bytes for normal, 4 bytes for MAX)
680                    let max_len = col.max_length.unwrap_or(8000);
681                    if max_len == 0xFFFF {
682                        buf.put_u16_le(0xFFFF);
683                    } else {
684                        buf.put_u16_le(max_len as u16);
685                    }
686
687                    // Collation for string types (5 bytes). Use the caller-
688                    // supplied collation when present (via `with_collation()`),
689                    // otherwise fall back to Latin1_General_CI_AS.
690                    if col.type_id == 0xE7 || col.type_id == 0xA7 {
691                        if let Some(coll) = col.collation.as_ref() {
692                            buf.put_slice(&coll.to_bytes());
693                        } else {
694                            // Default collation: Latin1_General_CI_AS
695                            // Bytes: LCID(0x0409) + flags(0xD000) + SortId(0x34)
696                            buf.put_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]);
697                        }
698                    }
699                }
700
701                // Decimal/Numeric
702                0x6C | 0x6A => {
703                    // Length (calculated from precision)
704                    let precision = col.precision.unwrap_or(18);
705                    let len = decimal_byte_length(precision);
706                    buf.put_u8(len);
707                    buf.put_u8(precision);
708                    buf.put_u8(col.scale.unwrap_or(0));
709                }
710
711                // Time-based with scale
712                0x29..=0x2B => {
713                    buf.put_u8(col.scale.unwrap_or(7));
714                }
715
716                // GUID
717                0x24 => {
718                    buf.put_u8(16);
719                }
720
721                // Other types - write max length if present
722                _ => {
723                    if let Some(len) = col.max_length {
724                        if len <= 0xFFFF {
725                            buf.put_u16_le(len as u16);
726                        }
727                    }
728                }
729            }
730
731            // Column name (B_VARCHAR format: 1-byte length prefix)
732            let name_utf16: Vec<u16> = col.name.encode_utf16().collect();
733            buf.put_u8(name_utf16.len() as u8);
734            for code_unit in name_utf16 {
735                buf.put_u16_le(code_unit);
736            }
737        }
738    }
739
740    /// Send a row of data.
741    ///
742    /// The values must match the column order and types specified
743    /// when creating the bulk insert.
744    ///
745    /// # Errors
746    ///
747    /// Returns an error if:
748    /// - Wrong number of values provided
749    /// - A value cannot be converted to the expected type
750    pub fn send_row<T: ToSql>(&mut self, values: &[T]) -> Result<(), Error> {
751        if values.len() != self.columns.len() {
752            return Err(Error::Config(format!(
753                "expected {} values, got {}",
754                self.columns.len(),
755                values.len()
756            )));
757        }
758
759        // Convert all values to SqlValue
760        let sql_values: Result<Vec<SqlValue>, TypeError> =
761            values.iter().map(|v| v.to_sql()).collect();
762        let sql_values = sql_values.map_err(Error::from)?;
763
764        self.write_row(&sql_values)?;
765
766        self.rows_in_batch += 1;
767        self.total_rows += 1;
768
769        Ok(())
770    }
771
772    /// Send a row of pre-converted SQL values.
773    pub fn send_row_values(&mut self, values: &[SqlValue]) -> Result<(), Error> {
774        if values.len() != self.columns.len() {
775            return Err(Error::Config(format!(
776                "expected {} values, got {}",
777                self.columns.len(),
778                values.len()
779            )));
780        }
781
782        self.write_row(values)?;
783
784        self.rows_in_batch += 1;
785        self.total_rows += 1;
786
787        Ok(())
788    }
789
790    /// Write a ROW token to the buffer.
791    fn write_row(&mut self, values: &[SqlValue]) -> Result<(), Error> {
792        // ROW token type
793        self.buffer.put_u8(TokenType::Row as u8);
794
795        // Collect column info needed for encoding to avoid borrow conflict
796        let columns: Vec<_> = self.columns.iter().cloned().collect();
797        let fixed_len = self.fixed_len.clone();
798
799        // Write each column value
800        for (i, (col, value)) in columns.iter().zip(values.iter()).enumerate() {
801            let is_fixed = *fixed_len.get(i).unwrap_or(&false);
802            self.encode_column_value(col, value, is_fixed)
803                .map_err(|e| Error::Config(format!("failed to encode column {i}: {e}")))?;
804        }
805
806        Ok(())
807    }
808
809    /// Encode a column value according to its type.
810    ///
811    /// When `is_fixed` is true, the column uses a fixed-length type on the wire
812    /// and values are written without a length prefix. When false, values include
813    /// a length prefix (1 byte for numeric nullable types, 2 bytes for strings).
814    fn encode_column_value(
815        &mut self,
816        col: &BulkColumn,
817        value: &SqlValue,
818        is_fixed: bool,
819    ) -> Result<(), TypeError> {
820        let buf = &mut self.buffer;
821
822        // Check if this column uses PLP (Partially Length-Prefixed) encoding
823        // MAX types (max_length == 0xFFFF) use PLP format
824        let is_plp_type =
825            col.max_length == Some(0xFFFF) && matches!(col.type_id, 0xE7 | 0xA7 | 0xA5 | 0xAD);
826
827        match value {
828            SqlValue::Null => {
829                // NULL encoding depends on type
830                match col.type_id {
831                    // Variable-length types
832                    0xE7 | 0xA7 | 0xA5 | 0xAD => {
833                        if is_plp_type {
834                            // PLP NULL: 0xFFFFFFFFFFFFFFFF
835                            buf.put_u64_le(0xFFFF_FFFF_FFFF_FFFF);
836                        } else {
837                            // Standard NULL: 0xFFFF length marker
838                            buf.put_u16_le(0xFFFF);
839                        }
840                    }
841                    // Nullable fixed types use 0 length
842                    // INTN, BITN, FLTN, MONEYN, DATETIMEN, Decimal, GUID, temporal
843                    0x26 | 0x68 | 0x6D | 0x6E | 0x6F | 0x6C | 0x6A | 0x24 | 0x28 | 0x29 | 0x2A
844                    | 0x2B => {
845                        buf.put_u8(0);
846                    }
847                    // Fixed types without nullable variant
848                    _ => {
849                        if col.nullable {
850                            buf.put_u8(0);
851                        } else {
852                            return Err(TypeError::UnexpectedNull);
853                        }
854                    }
855                }
856            }
857
858            SqlValue::Bool(v) => {
859                if !is_fixed {
860                    buf.put_u8(1);
861                }
862                buf.put_u8(if *v { 1 } else { 0 });
863            }
864
865            SqlValue::TinyInt(v) => {
866                if !is_fixed {
867                    buf.put_u8(1);
868                }
869                buf.put_u8(*v);
870            }
871
872            SqlValue::SmallInt(v) => {
873                if !is_fixed {
874                    buf.put_u8(2);
875                }
876                buf.put_i16_le(*v);
877            }
878
879            SqlValue::Int(v) => {
880                if !is_fixed {
881                    buf.put_u8(4);
882                }
883                buf.put_i32_le(*v);
884            }
885
886            SqlValue::BigInt(v) => {
887                if !is_fixed {
888                    buf.put_u8(8);
889                }
890                buf.put_i64_le(*v);
891            }
892
893            SqlValue::Float(v) => {
894                if !is_fixed {
895                    buf.put_u8(4);
896                }
897                buf.put_f32_le(*v);
898            }
899
900            SqlValue::Double(v) => {
901                if !is_fixed {
902                    buf.put_u8(8);
903                }
904                buf.put_f64_le(*v);
905            }
906
907            SqlValue::String(s) => {
908                // NVARCHAR/NCHAR columns (0xE7/0xEF) use UTF-16LE on the wire.
909                // VARCHAR/CHAR/BIGCHAR columns (0xA7/0x2F/0xAF) use the
910                // collation's code page for single-byte encoding — writing UTF-16
911                // into a VARCHAR column lands each surrogate half in its own
912                // single-byte slot and silently corrupts the data.
913                let is_varchar = matches!(col.type_id, 0xA7 | 0x2F | 0xAF);
914
915                if is_varchar {
916                    let encoded = encode_varchar_for_collation(s, col.collation.as_ref());
917                    let byte_len = encoded.len();
918
919                    if is_plp_type {
920                        encode_plp_binary(&encoded, buf);
921                    } else if byte_len > 0xFFFF {
922                        return Err(TypeError::BufferTooSmall {
923                            needed: byte_len,
924                            available: 0xFFFF,
925                        });
926                    } else {
927                        buf.put_u16_le(byte_len as u16);
928                        buf.put_slice(&encoded);
929                    }
930                } else {
931                    // UTF-16LE encoding for NVARCHAR
932                    let utf16: Vec<u16> = s.encode_utf16().collect();
933                    let byte_len = utf16.len() * 2;
934
935                    if is_plp_type {
936                        // PLP format for MAX types - supports unlimited size
937                        // Send as a single chunk for simplicity
938                        encode_plp_string(&utf16, buf);
939                    } else if byte_len > 0xFFFF {
940                        // Non-MAX column can't hold this much data
941                        return Err(TypeError::BufferTooSmall {
942                            needed: byte_len,
943                            available: 0xFFFF,
944                        });
945                    } else {
946                        // Standard encoding with 2-byte length prefix
947                        buf.put_u16_le(byte_len as u16);
948                        for code_unit in utf16 {
949                            buf.put_u16_le(code_unit);
950                        }
951                    }
952                }
953            }
954
955            SqlValue::Binary(b) => {
956                if is_plp_type {
957                    // PLP format for MAX types - supports unlimited size
958                    encode_plp_binary(b, buf);
959                } else if b.len() > 0xFFFF {
960                    // Non-MAX column can't hold this much data
961                    return Err(TypeError::BufferTooSmall {
962                        needed: b.len(),
963                        available: 0xFFFF,
964                    });
965                } else {
966                    // Standard encoding with 2-byte length prefix
967                    buf.put_u16_le(b.len() as u16);
968                    buf.put_slice(b);
969                }
970            }
971
972            // Feature-gated types - use mssql_types::encode module
973            #[cfg(feature = "decimal")]
974            SqlValue::Decimal(d) => {
975                if col.type_id == 0x6E {
976                    // MONEY / SMALLMONEY — fixed-point scaled by 10_000, not DECIMAL format.
977                    encode_money_value(*d, col, buf, is_fixed)?;
978                } else {
979                    let precision = col.precision.unwrap_or(18);
980                    let len = decimal_byte_length(precision);
981                    buf.put_u8(len);
982
983                    // Sign: 0 = negative, 1 = positive
984                    buf.put_u8(if d.is_sign_negative() { 0 } else { 1 });
985
986                    // Mantissa as unsigned 128-bit integer
987                    let mantissa = d.mantissa().unsigned_abs();
988                    let mantissa_bytes = mantissa.to_le_bytes();
989                    buf.put_slice(&mantissa_bytes[..((len - 1) as usize)]);
990                }
991            }
992
993            #[cfg(feature = "uuid")]
994            SqlValue::Uuid(u) => {
995                buf.put_u8(16); // Length
996                // Use mssql_types encode function
997                mssql_types::encode::encode_uuid(*u, buf);
998            }
999
1000            #[cfg(feature = "chrono")]
1001            SqlValue::Date(d) => {
1002                buf.put_u8(3); // Length
1003                mssql_types::encode::encode_date(*d, buf);
1004            }
1005
1006            #[cfg(feature = "chrono")]
1007            SqlValue::Time(t) => {
1008                let scale = col.scale.unwrap_or(7);
1009                let len = time_byte_length(scale);
1010                buf.put_u8(len);
1011                // Encode time with proper scale handling
1012                encode_time_with_scale(*t, scale, buf);
1013            }
1014
1015            #[cfg(feature = "chrono")]
1016            SqlValue::DateTime(dt) => {
1017                // Type 0x6F is DATETIMEN — legacy DATETIME (8 bytes) or
1018                // SMALLDATETIME (4 bytes) format selected by max_length. The
1019                // wire format differs from DATETIME2 (type 0x2A), which uses a
1020                // scale-aware time-then-date layout.
1021                if col.type_id == 0x6F {
1022                    let total_len = col.max_length.unwrap_or(8) as u8;
1023                    if !is_fixed {
1024                        buf.put_u8(total_len);
1025                    }
1026                    match total_len {
1027                        8 => mssql_types::encode::encode_datetime_legacy(*dt, buf),
1028                        4 => mssql_types::encode::encode_smalldatetime(*dt, buf)?,
1029                        _ => {
1030                            return Err(TypeError::InvalidDateTime(format!(
1031                                "DATETIMEN max_length must be 4 or 8, got {total_len}"
1032                            )));
1033                        }
1034                    }
1035                } else {
1036                    let scale = col.scale.unwrap_or(7);
1037                    let time_len = time_byte_length(scale);
1038                    let total_len = time_len + 3;
1039                    buf.put_u8(total_len);
1040                    // Encode time then date
1041                    encode_time_with_scale(dt.time(), scale, buf);
1042                    mssql_types::encode::encode_date(dt.date(), buf);
1043                }
1044            }
1045            #[cfg(feature = "chrono")]
1046            SqlValue::SmallDateTime(dt) => {
1047                // Explicit SMALLDATETIME variant — always 4-byte days+minutes,
1048                // regardless of column metadata.
1049                if !is_fixed {
1050                    buf.put_u8(4);
1051                }
1052                mssql_types::encode::encode_smalldatetime(*dt, buf)?;
1053            }
1054            #[cfg(feature = "decimal")]
1055            SqlValue::Money(d) => {
1056                // Force 8-byte MONEY encoding regardless of column metadata.
1057                if !is_fixed {
1058                    buf.put_u8(8);
1059                }
1060                mssql_types::encode::encode_money(*d, buf)?;
1061            }
1062            #[cfg(feature = "decimal")]
1063            SqlValue::SmallMoney(d) => {
1064                if !is_fixed {
1065                    buf.put_u8(4);
1066                }
1067                mssql_types::encode::encode_smallmoney(*d, buf)?;
1068            }
1069
1070            #[cfg(feature = "chrono")]
1071            SqlValue::DateTimeOffset(dto) => {
1072                let scale = col.scale.unwrap_or(7);
1073                let time_len = time_byte_length(scale);
1074                let total_len = time_len + 3 + 2;
1075                buf.put_u8(total_len);
1076                // Use mssql_types encode
1077                encode_time_with_scale(dto.time(), scale, buf);
1078                mssql_types::encode::encode_date(dto.date_naive(), buf);
1079                // Timezone offset in minutes
1080                use chrono::Offset;
1081                let offset_minutes = (dto.offset().fix().local_minus_utc() / 60) as i16;
1082                buf.put_i16_le(offset_minutes);
1083            }
1084
1085            #[cfg(feature = "json")]
1086            SqlValue::Json(j) => {
1087                let s = j.to_string();
1088                encode_nvarchar_value(&s, buf)?;
1089            }
1090
1091            SqlValue::Xml(x) => {
1092                encode_nvarchar_value(x, buf)?;
1093            }
1094
1095            SqlValue::Tvp(_) => {
1096                // TVPs are not valid in bulk copy operations - they're for RPC parameters only
1097                return Err(TypeError::UnsupportedConversion {
1098                    from: "TVP".to_string(),
1099                    to: "bulk copy value",
1100                });
1101            }
1102            // Handle future SqlValue variants
1103            _ => {
1104                return Err(TypeError::UnsupportedConversion {
1105                    from: value.type_name().to_string(),
1106                    to: "bulk copy value",
1107                });
1108            }
1109        }
1110
1111        Ok(())
1112    }
1113}
1114
/// Encode a MONEY or SMALLMONEY column value with the appropriate length prefix.
///
/// The payload width comes from `col.max_length` (defaulting to 8): 4 selects
/// the SMALLMONEY encoder, 8 the MONEY encoder.
///
/// When `is_fixed` is true (fixed type ID 0x3C or 0x7A), no length byte
/// precedes the payload. Otherwise a 1-byte length prefix is written
/// (matching the MONEYN nullable variant).
///
/// # Errors
///
/// Returns [`TypeError::InvalidDecimal`] when the declared length is neither
/// 4 nor 8, or whatever error the underlying encoder reports. On any error
/// `buf` is left exactly as it was on entry.
#[cfg(feature = "decimal")]
fn encode_money_value(
    value: rust_decimal::Decimal,
    col: &BulkColumn,
    buf: &mut BytesMut,
    is_fixed: bool,
) -> Result<(), TypeError> {
    // Validate the full-width length before narrowing: an `as u8` cast would
    // turn e.g. 260 into 4 and silently pick the SMALLMONEY encoder.
    let money_bytes: u8 = match col.max_length.unwrap_or(8) {
        4 => 4,
        8 => 8,
        other => {
            return Err(TypeError::InvalidDecimal(format!(
                "MONEY column has invalid max_length: {other}"
            )));
        }
    };

    let start = buf.len();
    if !is_fixed {
        buf.put_u8(money_bytes);
    }

    let encoded = if money_bytes == 4 {
        mssql_types::encode::encode_smallmoney(value, buf)
    } else {
        mssql_types::encode::encode_money(value, buf)
    };

    // Do not leave a length prefix (or partial payload) behind a failed encode.
    if encoded.is_err() {
        buf.truncate(start);
    }
    encoded
}
1139
1140/// Encode a string as NVARCHAR with length prefix.
1141fn encode_nvarchar_value(s: &str, buf: &mut BytesMut) -> Result<(), TypeError> {
1142    let utf16: Vec<u16> = s.encode_utf16().collect();
1143    let byte_len = utf16.len() * 2;
1144
1145    if byte_len > 0xFFFF {
1146        return Err(TypeError::BufferTooSmall {
1147            needed: byte_len,
1148            available: 0xFFFF,
1149        });
1150    }
1151
1152    buf.put_u16_le(byte_len as u16);
1153    for code_unit in utf16 {
1154        buf.put_u16_le(code_unit);
1155    }
1156    Ok(())
1157}
1158
/// Sentinel for "total length unknown" in a PLP value (MS-TDS 2.2.5.2.3).
///
/// Written in place of the 8-byte ULONGLONGLEN when the sender does not
/// declare the total up front; the receiver then relies on the chunk framing
/// and the 4-byte zero terminator to find the end of the value.
const PLP_UNKNOWN_LEN: u64 = u64::MAX - 1;
1164
1165/// Encode a UTF-16 string using PLP (Partially Length-Prefixed) format.
1166///
1167/// PLP format (per MS-TDS 2.2.5.2.3):
1168/// - 8 bytes: ULONGLONGLEN — PLP_UNKNOWN_LEN or actual total byte count
1169/// - One or more chunks: 4-byte chunk length + chunk bytes
1170/// - Terminator: 4-byte zero
1171///
1172/// We emit `PLP_UNKNOWN_LEN` for compatibility with SQL Server's BulkLoad
1173/// parser. Empirically, some server versions reject a concrete total length
1174/// in the BulkLoad (0x07) path even though the token-stream spec allows it
1175/// ("premature end-of-message" errors for NVARCHAR(MAX) bulk inserts).
1176/// Tiberius uses the same approach.
1177fn encode_plp_string(utf16: &[u16], buf: &mut BytesMut) {
1178    let byte_len = utf16.len() * 2;
1179
1180    buf.put_u64_le(PLP_UNKNOWN_LEN);
1181
1182    if byte_len > 0 {
1183        buf.put_u32_le(byte_len as u32);
1184        for code_unit in utf16 {
1185            buf.put_u16_le(*code_unit);
1186        }
1187    }
1188
1189    buf.put_u32_le(0);
1190}
1191
1192/// Encode binary data using PLP (Partially Length-Prefixed) format.
1193/// See [`encode_plp_string`] for the format specification.
1194fn encode_plp_binary(data: &[u8], buf: &mut BytesMut) {
1195    buf.put_u64_le(PLP_UNKNOWN_LEN);
1196
1197    if !data.is_empty() {
1198        buf.put_u32_le(data.len() as u32);
1199        buf.put_slice(data);
1200    }
1201
1202    buf.put_u32_le(0);
1203}
1204
1205/// Encode a Rust string into single-byte VARCHAR bytes using the column's collation.
1206///
1207/// Delegates to [`tds_protocol::collation::encode_str_for_collation`] so the
1208/// RPC parameter path and the bulk insert path share one implementation.
1209fn encode_varchar_for_collation(value: &str, collation: Option<&Collation>) -> Vec<u8> {
1210    tds_protocol::collation::encode_str_for_collation(value, collation)
1211}
1212
/// Encode a time of day at the given fractional-second scale (for bulk copy).
///
/// The time is converted to a count of `10^-scale` second intervals since
/// midnight and written little-endian in `time_byte_length(scale)` bytes.
///
/// chrono represents a leap second as a `nanosecond()` value of
/// 1_000_000_000 or more. The sub-second part is clamped to 999_999_999 so
/// that such a value stays inside its own second instead of spilling into the
/// next one (or past the end of the day at 23:59:59).
#[cfg(feature = "chrono")]
fn encode_time_with_scale(time: chrono::NaiveTime, scale: u8, buf: &mut BytesMut) {
    use chrono::Timelike;

    let sub_second_nanos = u64::from(time.nanosecond().min(999_999_999));
    let nanos = u64::from(time.num_seconds_from_midnight()) * 1_000_000_000 + sub_second_nanos;
    let intervals = nanos / time_scale_divisor(scale);

    // At most 5 of the 8 little-endian bytes are ever needed.
    let len = usize::from(time_byte_length(scale));
    buf.put_slice(&intervals.to_le_bytes()[..len]);
}
1226
1227impl BulkInsert {
1228    /// Write the DONE token signaling completion.
1229    fn write_done(&mut self) {
1230        let buf = &mut self.buffer;
1231
1232        buf.put_u8(TokenType::Done as u8);
1233
1234        // Status: FINAL (0x00) | COUNT (0x10)
1235        let status = DoneStatus::from_bits(0x0010); // DONE_COUNT
1236        buf.put_u16_le(status.to_bits());
1237
1238        // Current command (0 for bulk load)
1239        buf.put_u16_le(0);
1240
1241        // Row count
1242        buf.put_u64_le(self.total_rows);
1243    }
1244
1245    /// Get the buffered data as packets ready to send.
1246    ///
1247    /// Returns a vector of complete TDS packets with BulkLoad packet type (0x07).
1248    pub fn take_packets(&mut self) -> Vec<BytesMut> {
1249        const MAX_PACKET_SIZE: usize = 4096;
1250        const HEADER_SIZE: usize = 8;
1251        const MAX_PAYLOAD: usize = MAX_PACKET_SIZE - HEADER_SIZE;
1252
1253        let data = self.buffer.split();
1254        let mut packets = Vec::new();
1255        let mut offset = 0;
1256
1257        while offset < data.len() {
1258            let remaining = data.len() - offset;
1259            let payload_size = remaining.min(MAX_PAYLOAD);
1260            let is_last = offset + payload_size >= data.len();
1261
1262            let mut packet = BytesMut::with_capacity(MAX_PACKET_SIZE);
1263
1264            // Write packet header
1265            let header = PacketHeader {
1266                packet_type: PacketType::BulkLoad,
1267                status: if is_last {
1268                    PacketStatus::END_OF_MESSAGE
1269                } else {
1270                    PacketStatus::NORMAL
1271                },
1272                length: (HEADER_SIZE + payload_size) as u16,
1273                spid: 0,
1274                packet_id: self.packet_id,
1275                window: 0,
1276            };
1277
1278            header.encode(&mut packet);
1279
1280            // Write payload
1281            packet.put_slice(&data[offset..offset + payload_size]);
1282
1283            packets.push(packet);
1284            offset += payload_size;
1285            self.packet_id = self.packet_id.wrapping_add(1);
1286        }
1287
1288        packets
1289    }
1290
1291    /// Get total rows sent so far.
1292    pub fn total_rows(&self) -> u64 {
1293        self.total_rows
1294    }
1295
1296    /// Get rows in current batch.
1297    pub fn rows_in_batch(&self) -> usize {
1298        self.rows_in_batch
1299    }
1300
1301    /// Check if a batch flush is needed.
1302    pub fn should_flush(&self) -> bool {
1303        self.batch_size > 0 && self.rows_in_batch >= self.batch_size
1304    }
1305
1306    /// Prepare for finishing the bulk operation.
1307    /// Writes the DONE token and returns final packets.
1308    pub fn finish_packets(&mut self) -> Vec<BytesMut> {
1309        self.write_done();
1310        self.take_packets()
1311    }
1312
1313    /// Create a result from the current state.
1314    pub fn result(&self) -> BulkInsertResult {
1315        BulkInsertResult {
1316            rows_affected: self.total_rows,
1317            batches_committed: self.batches_committed,
1318            has_errors: false,
1319        }
1320    }
1321}
1322
1323/// Active streaming writer for bulk insert operations.
1324///
1325/// Created via [`crate::client::Client::bulk_insert()`]. Rows are buffered in
1326/// memory as they are added with [`send_row()`](BulkWriter::send_row), then
1327/// transmitted to the server when [`finish()`](BulkWriter::finish) is called.
1328///
1329/// The writer holds a mutable reference to the [`crate::Client`], preventing
1330/// other operations on the connection while the bulk insert is in progress.
1331///
1332/// # Example
1333///
1334/// ```rust,ignore
1335/// let builder = BulkInsertBuilder::new("dbo.Users")
1336///     .with_typed_columns(vec![
1337///         BulkColumn::new("id", "INT", 0)?,
1338///         BulkColumn::new("name", "NVARCHAR(100)", 1)?,
1339///     ]);
1340///
1341/// let mut writer = client.bulk_insert(&builder).await?;
1342/// writer.send_row(&[&1i32, &"Alice"])?;
1343/// writer.send_row(&[&2i32, &"Bob"])?;
1344/// let result = writer.finish().await?;
1345/// ```
1346pub struct BulkWriter<'a, S: crate::state::ConnectionState> {
1347    client: &'a mut crate::client::Client<S>,
1348    bulk: BulkInsert,
1349}
1350
1351impl<'a, S: crate::state::ConnectionState> BulkWriter<'a, S> {
1352    /// Create a new bulk writer.
1353    pub(crate) fn new(client: &'a mut crate::client::Client<S>, bulk: BulkInsert) -> Self {
1354        Self { client, bulk }
1355    }
1356
1357    /// Add a row to the bulk insert buffer.
1358    ///
1359    /// Values are encoded immediately but not sent to the server until
1360    /// [`finish()`](BulkWriter::finish) is called. The number of values must
1361    /// match the number of columns defined for this bulk insert.
1362    pub fn send_row<T: ToSql>(&mut self, values: &[T]) -> Result<(), Error> {
1363        self.bulk.send_row(values)
1364    }
1365
1366    /// Add a row of pre-converted SQL values to the buffer.
1367    pub fn send_row_values(&mut self, values: &[SqlValue]) -> Result<(), Error> {
1368        self.bulk.send_row_values(values)
1369    }
1370
1371    /// Get the number of rows buffered so far.
1372    pub fn total_rows(&self) -> u64 {
1373        self.bulk.total_rows()
1374    }
1375
1376    /// Finish the bulk insert operation and send all buffered data to the server.
1377    ///
1378    /// Writes the DONE token, sends the accumulated row data as a BulkLoad
1379    /// (0x07) message, and reads the server's response.
1380    pub async fn finish(mut self) -> Result<BulkInsertResult, Error> {
1381        let total_rows = self.bulk.total_rows();
1382        tracing::debug!(total_rows = total_rows, "finishing bulk insert");
1383
1384        // Write DONE token and freeze the payload
1385        self.bulk.write_done();
1386        let payload = self.bulk.buffer.split().freeze();
1387
1388        // Send BulkLoad data and read server response
1389        let rows_affected = self.client.send_and_read_bulk_load(payload).await?;
1390
1391        Ok(BulkInsertResult {
1392            rows_affected,
1393            batches_committed: 1,
1394            has_errors: false,
1395        })
1396    }
1397}
1398
/// Translate a nullable wire type ID into its fixed-width equivalent.
///
/// SQL Server's BulkLoad protocol rejects nullable type IDs (INTN, BITN, etc.)
/// when the target column is NOT NULL. Such columns must be described with the
/// fixed type ID instead — with no max_length and no per-row length prefix.
///
/// Returns `None` when there is no fixed-width counterpart for the given
/// ID/length pair (e.g. NVARCHAR, VARBINARY, DECIMAL, temporal types other
/// than DATETIME/SMALLDATETIME, or an unexpected length).
fn nullable_to_fixed_type(type_id: u8, max_length: Option<u32>) -> Option<u8> {
    // BITN has a single width, so its declared length is irrelevant.
    if type_id == 0x68 {
        return Some(0x32); // BITN → Bit
    }

    // Every other mapping is selected by the declared byte width.
    let width = max_length?;
    let fixed = match type_id {
        // INTN → Int1 (TINYINT) / Int2 (SMALLINT) / Int4 (INT) / Int8 (BIGINT)
        0x26 => match width {
            1 => 0x30,
            2 => 0x34,
            4 => 0x38,
            8 => 0x7F,
            _ => return None,
        },
        // FLTN → Float4 (REAL) / Float8 (FLOAT)
        0x6D => match width {
            4 => 0x3B,
            8 => 0x3E,
            _ => return None,
        },
        // MONEYN → Money4 (SMALLMONEY) / Money (MONEY)
        0x6E => match width {
            4 => 0x7A,
            8 => 0x3C,
            _ => return None,
        },
        // DATETIMEN → DateTime4 (SMALLDATETIME) / DateTime (DATETIME)
        0x6F => match width {
            4 => 0x3A,
            8 => 0x3D,
            _ => return None,
        },
        _ => return None,
    };

    Some(fixed)
}
1424
/// Wire length in bytes of a DECIMAL value for the given precision.
///
/// The result counts one sign byte plus a 4-, 8-, 12- or 16-byte mantissa.
/// Precisions outside 1..=28 (including 0 and anything above 38) map to the
/// largest size, 17.
fn decimal_byte_length(precision: u8) -> u8 {
    if precision == 0 || precision > 28 {
        17
    } else if precision <= 9 {
        5
    } else if precision <= 19 {
        9
    } else {
        13
    }
}
1435
/// Number of bytes used for the time portion at the given fractional scale.
///
/// Scales 0–2 take 3 bytes, 3–4 take 4 bytes, and everything else (5–7 as
/// well as out-of-range scales) takes 5 bytes.
#[cfg(feature = "chrono")]
fn time_byte_length(scale: u8) -> u8 {
    if scale <= 2 {
        3
    } else if scale <= 4 {
        4
    } else {
        5
    }
}
1446
/// Nanoseconds per time interval at the given fractional scale.
///
/// For scales 0..=7 this is `10^(9 - scale)` (one second down to 100 ns).
/// Larger scales are treated like scale 7 and yield 100.
#[cfg(feature = "chrono")]
fn time_scale_divisor(scale: u8) -> u64 {
    let effective_scale = u32::from(scale.min(7));
    10u64.pow(9 - effective_scale)
}
1462
1463#[cfg(test)]
1464#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1465mod tests {
1466    use super::*;
1467
1468    #[test]
1469    fn test_bulk_options_default() {
1470        let opts = BulkOptions::default();
1471        assert_eq!(opts.batch_size, 0);
1472        assert!(opts.check_constraints);
1473        assert!(!opts.fire_triggers);
1474        assert!(opts.keep_nulls);
1475        assert!(!opts.table_lock);
1476    }
1477
1478    #[test]
1479    fn test_bulk_column_creation() {
1480        let col = BulkColumn::new("id", "INT", 0).unwrap();
1481        assert_eq!(col.name, "id");
1482        assert_eq!(col.type_id, 0x26); // INTN
1483        assert_eq!(col.max_length, Some(4));
1484        assert!(col.nullable);
1485    }
1486
1487    #[test]
1488    fn test_bulk_column_rejects_text() {
1489        let err = BulkColumn::new("body", "TEXT", 0).unwrap_err();
1490        match err {
1491            TypeError::UnsupportedType { sql_type, reason } => {
1492                assert_eq!(sql_type, "TEXT");
1493                assert!(
1494                    reason.contains("VARCHAR(MAX)"),
1495                    "error should redirect to VARCHAR(MAX), got: {reason}"
1496                );
1497                assert!(
1498                    reason.contains("deprecated"),
1499                    "error should mention deprecation, got: {reason}"
1500                );
1501            }
1502            other => panic!("expected UnsupportedType, got {other:?}"),
1503        }
1504    }
1505
1506    #[test]
1507    fn test_bulk_column_rejects_ntext() {
1508        let err = BulkColumn::new("body", "NTEXT", 0).unwrap_err();
1509        match err {
1510            TypeError::UnsupportedType { sql_type, reason } => {
1511                assert_eq!(sql_type, "NTEXT");
1512                assert!(
1513                    reason.contains("NVARCHAR(MAX)"),
1514                    "error should redirect to NVARCHAR(MAX), got: {reason}"
1515                );
1516                assert!(
1517                    reason.contains("deprecated"),
1518                    "error should mention deprecation, got: {reason}"
1519                );
1520            }
1521            other => panic!("expected UnsupportedType, got {other:?}"),
1522        }
1523    }
1524
1525    #[test]
1526    fn test_bulk_column_rejects_text_case_insensitive() {
1527        assert!(matches!(
1528            BulkColumn::new("body", "text", 0),
1529            Err(TypeError::UnsupportedType { .. })
1530        ));
1531        assert!(matches!(
1532            BulkColumn::new("body", "Ntext", 0),
1533            Err(TypeError::UnsupportedType { .. })
1534        ));
1535    }
1536
1537    #[test]
1538    fn test_bulk_column_rejects_image() {
1539        let err = BulkColumn::new("blob", "IMAGE", 0).unwrap_err();
1540        match err {
1541            TypeError::UnsupportedType { sql_type, reason } => {
1542                assert_eq!(sql_type, "IMAGE");
1543                assert!(
1544                    reason.contains("VARBINARY(MAX)"),
1545                    "error should redirect to VARBINARY(MAX), got: {reason}"
1546                );
1547                assert!(
1548                    reason.contains("deprecated"),
1549                    "error should mention deprecation, got: {reason}"
1550                );
1551            }
1552            other => panic!("expected UnsupportedType, got {other:?}"),
1553        }
1554    }
1555
1556    #[test]
1557    fn test_bulk_column_rejects_image_case_insensitive() {
1558        assert!(matches!(
1559            BulkColumn::new("blob", "image", 0),
1560            Err(TypeError::UnsupportedType { .. })
1561        ));
1562        assert!(matches!(
1563            BulkColumn::new("blob", "Image", 0),
1564            Err(TypeError::UnsupportedType { .. })
1565        ));
1566    }
1567
1568    #[test]
1569    fn test_parse_sql_type() {
1570        // Integer types → INTN (0x26) with appropriate length
1571        let (type_id, len, _prec, _scale) = parse_sql_type("INT");
1572        assert_eq!(type_id, 0x26);
1573        assert_eq!(len, Some(4));
1574
1575        let (type_id, len, _, _) = parse_sql_type("NVARCHAR(100)");
1576        assert_eq!(type_id, 0xE7);
1577        assert_eq!(len, Some(200)); // UTF-16 doubles
1578
1579        let (type_id, _, prec, scale) = parse_sql_type("DECIMAL(10,2)");
1580        assert_eq!(type_id, 0x6C);
1581        assert_eq!(prec, Some(10));
1582        assert_eq!(scale, Some(2));
1583
1584        // SMALLDATETIME/DATETIME → DATETIMEN (0x6F)
1585        let (type_id, len, _, _) = parse_sql_type("SMALLDATETIME");
1586        assert_eq!(type_id, 0x6F);
1587        assert_eq!(len, Some(4));
1588
1589        let (type_id, len, _, _) = parse_sql_type("DATETIME");
1590        assert_eq!(type_id, 0x6F);
1591        assert_eq!(len, Some(8));
1592    }
1593
1594    #[test]
1595    fn test_insert_bulk_statement() {
1596        let builder = BulkInsertBuilder::new("dbo.Users")
1597            .with_typed_columns(vec![
1598                BulkColumn::new("id", "INT", 0).unwrap(),
1599                BulkColumn::new("name", "NVARCHAR(100)", 1).unwrap(),
1600            ])
1601            .table_lock(true);
1602
1603        let sql = builder.build_insert_bulk_statement().unwrap();
1604        assert!(sql.contains("INSERT BULK dbo.Users"));
1605        assert!(sql.contains("TABLOCK"));
1606    }
1607
1608    #[test]
1609    fn test_bulk_insert_rejects_injection() {
1610        let builder = BulkInsertBuilder::new("table;DROP TABLE users")
1611            .with_typed_columns(vec![BulkColumn::new("id", "INT", 0).unwrap()]);
1612
1613        assert!(builder.build_insert_bulk_statement().is_err());
1614    }
1615
1616    #[test]
1617    fn test_bulk_insert_validates_column_names() {
1618        let builder = BulkInsertBuilder::new("Users")
1619            .with_typed_columns(vec![BulkColumn::new("col;DROP TABLE x", "INT", 0).unwrap()]);
1620
1621        assert!(builder.build_insert_bulk_statement().is_err());
1622    }
1623
1624    #[test]
1625    fn test_bulk_insert_accepts_qualified_names() {
1626        let builder = BulkInsertBuilder::new("catalog.dbo.Users")
1627            .with_typed_columns(vec![BulkColumn::new("id", "INT", 0).unwrap()]);
1628
1629        assert!(builder.build_insert_bulk_statement().is_ok());
1630    }
1631
1632    #[test]
1633    fn test_bulk_insert_creation() {
1634        let columns = vec![
1635            BulkColumn::new("id", "INT", 0).unwrap(),
1636            BulkColumn::new("name", "NVARCHAR(100)", 1).unwrap(),
1637        ];
1638
1639        let bulk = BulkInsert::new(columns, 1000);
1640        assert_eq!(bulk.total_rows(), 0);
1641        assert_eq!(bulk.rows_in_batch(), 0);
1642        assert!(!bulk.should_flush());
1643    }
1644
1645    #[test]
1646    fn test_decimal_byte_length() {
1647        assert_eq!(decimal_byte_length(5), 5);
1648        assert_eq!(decimal_byte_length(15), 9);
1649        assert_eq!(decimal_byte_length(25), 13);
1650        assert_eq!(decimal_byte_length(35), 17);
1651    }
1652
1653    #[test]
1654    #[cfg(feature = "chrono")]
1655    fn test_time_byte_length() {
1656        assert_eq!(time_byte_length(0), 3);
1657        assert_eq!(time_byte_length(3), 4);
1658        assert_eq!(time_byte_length(7), 5);
1659    }
1660
1661    #[test]
1662    fn test_plp_string_encoding() {
1663        let mut buf = BytesMut::new();
1664        let text = "Hello";
1665        let utf16: Vec<u16> = text.encode_utf16().collect();
1666
1667        encode_plp_string(&utf16, &mut buf);
1668
1669        // Verify structure:
1670        // - 8 bytes PLP_UNKNOWN_LEN marker
1671        // - 4 bytes chunk length
1672        // - data (5 chars * 2 bytes = 10 bytes)
1673        // - 4 bytes terminator (0)
1674        assert_eq!(buf.len(), 8 + 4 + 10 + 4);
1675
1676        // Check total length marker (PLP_UNKNOWN_LEN)
1677        assert_eq!(&buf[0..8], &PLP_UNKNOWN_LEN.to_le_bytes());
1678
1679        // Check chunk length
1680        assert_eq!(&buf[8..12], &10u32.to_le_bytes());
1681
1682        // Check terminator
1683        assert_eq!(&buf[22..26], &0u32.to_le_bytes());
1684    }
1685
1686    #[test]
1687    fn test_plp_binary_encoding() {
1688        let mut buf = BytesMut::new();
1689        let data = b"test binary data";
1690
1691        encode_plp_binary(data, &mut buf);
1692
1693        // Verify structure:
1694        // - 8 bytes PLP_UNKNOWN_LEN marker
1695        // - 4 bytes chunk length
1696        // - data (16 bytes)
1697        // - 4 bytes terminator (0)
1698        assert_eq!(buf.len(), 8 + 4 + 16 + 4);
1699
1700        // Check total length marker
1701        assert_eq!(&buf[0..8], &PLP_UNKNOWN_LEN.to_le_bytes());
1702
1703        // Check chunk length
1704        assert_eq!(&buf[8..12], &16u32.to_le_bytes());
1705
1706        // Check data
1707        assert_eq!(&buf[12..28], data);
1708
1709        // Check terminator
1710        assert_eq!(&buf[28..32], &0u32.to_le_bytes());
1711    }
1712
1713    #[test]
1714    fn test_plp_empty_string() {
1715        let mut buf = BytesMut::new();
1716        let utf16: Vec<u16> = "".encode_utf16().collect();
1717
1718        encode_plp_string(&utf16, &mut buf);
1719
1720        // Empty string: PLP_UNKNOWN_LEN (8) + terminator (4)
1721        assert_eq!(buf.len(), 8 + 4);
1722
1723        // Check total length marker
1724        assert_eq!(&buf[0..8], &PLP_UNKNOWN_LEN.to_le_bytes());
1725
1726        // Check terminator
1727        assert_eq!(&buf[8..12], &0u32.to_le_bytes());
1728    }
1729
1730    #[test]
1731    fn test_plp_empty_binary() {
1732        let mut buf = BytesMut::new();
1733
1734        encode_plp_binary(&[], &mut buf);
1735
1736        // Empty binary: PLP_UNKNOWN_LEN (8) + terminator (4)
1737        assert_eq!(buf.len(), 8 + 4);
1738
1739        // Check total length marker
1740        assert_eq!(&buf[0..8], &PLP_UNKNOWN_LEN.to_le_bytes());
1741
1742        // Check terminator
1743        assert_eq!(&buf[8..12], &0u32.to_le_bytes());
1744    }
1745
1746    /// Verify that write_colmetadata() produces bytes that the TDS parser can
1747    /// decode correctly for all supported column types (nullable variants).
1748    #[test]
1749    fn test_write_colmetadata_roundtrip() {
1750        use tds_protocol::token::ColMetaData;
1751
1752        let columns = vec![
1753            BulkColumn::new("id", "INT", 0).unwrap(),
1754            BulkColumn::new("tiny", "TINYINT", 1).unwrap(),
1755            BulkColumn::new("small", "SMALLINT", 2).unwrap(),
1756            BulkColumn::new("big", "BIGINT", 3).unwrap(),
1757            BulkColumn::new("flag", "BIT", 4).unwrap(),
1758            BulkColumn::new("r", "REAL", 5).unwrap(),
1759            BulkColumn::new("f", "FLOAT", 6).unwrap(),
1760            BulkColumn::new("name", "NVARCHAR(100)", 7).unwrap(),
1761            BulkColumn::new("code", "VARCHAR(50)", 8).unwrap(),
1762            BulkColumn::new("data", "VARBINARY(200)", 9).unwrap(),
1763            BulkColumn::new("d", "DATE", 10).unwrap(),
1764            BulkColumn::new("t", "TIME(3)", 11).unwrap(),
1765            BulkColumn::new("dt", "DATETIME", 12).unwrap(),
1766            BulkColumn::new("dt2", "DATETIME2(7)", 13).unwrap(),
1767            BulkColumn::new("dto", "DATETIMEOFFSET(7)", 14).unwrap(),
1768            BulkColumn::new("sdt", "SMALLDATETIME", 15).unwrap(),
1769            BulkColumn::new("uid", "UNIQUEIDENTIFIER", 16).unwrap(),
1770            BulkColumn::new("amt", "DECIMAL(18,2)", 17).unwrap(),
1771            BulkColumn::new("price", "MONEY", 18).unwrap(),
1772            BulkColumn::new("smoney", "SMALLMONEY", 19).unwrap(),
1773            BulkColumn::new("nmax", "NVARCHAR(MAX)", 20).unwrap(),
1774            BulkColumn::new("vmax", "VARCHAR(MAX)", 21).unwrap(),
1775            BulkColumn::new("bmax", "VARBINARY(MAX)", 22).unwrap(),
1776        ];
1777
1778        let bulk = BulkInsert::new(columns.clone(), 0);
1779
1780        // Extract COLMETADATA bytes (skip the 0x81 token type byte)
1781        let buf = &bulk.buffer[1..];
1782        let mut cursor = bytes::Bytes::copy_from_slice(buf);
1783        let meta = ColMetaData::decode(&mut cursor)
1784            .expect("write_colmetadata output should be parseable by TDS decoder");
1785
1786        assert_eq!(meta.columns.len(), columns.len());
1787
1788        // Verify each column parsed correctly
1789        for (i, (parsed, original)) in meta.columns.iter().zip(columns.iter()).enumerate() {
1790            assert_eq!(parsed.name, original.name, "column {i} name mismatch");
1791            assert_eq!(
1792                parsed.col_type, original.type_id,
1793                "column {i} ({}) type mismatch",
1794                original.name
1795            );
1796
1797            // Verify type-specific metadata
1798            match original.type_id {
1799                // INTN — max_length should match
1800                0x26 => {
1801                    assert_eq!(
1802                        parsed.type_info.max_length, original.max_length,
1803                        "column {i} ({}) INTN max_length",
1804                        original.name
1805                    );
1806                }
1807                // BITN
1808                0x68 => {
1809                    assert_eq!(parsed.type_info.max_length, Some(1));
1810                }
1811                // FLTN
1812                0x6D => {
1813                    assert_eq!(
1814                        parsed.type_info.max_length, original.max_length,
1815                        "column {i} ({}) FLTN max_length",
1816                        original.name
1817                    );
1818                }
1819                // MONEYN
1820                0x6E => {
1821                    assert_eq!(
1822                        parsed.type_info.max_length, original.max_length,
1823                        "column {i} ({}) MONEYN max_length",
1824                        original.name
1825                    );
1826                }
1827                // DATETIMEN
1828                0x6F => {
1829                    assert_eq!(
1830                        parsed.type_info.max_length, original.max_length,
1831                        "column {i} ({}) DATETIMEN max_length",
1832                        original.name
1833                    );
1834                }
1835                // GUID
1836                0x24 => {
1837                    assert_eq!(parsed.type_info.max_length, Some(16));
1838                }
1839                // DATE — no extra metadata
1840                0x28 => {}
1841                // TIME/DATETIME2/DATETIMEOFFSET — scale
1842                0x29..=0x2B => {
1843                    assert_eq!(
1844                        parsed.type_info.scale, original.scale,
1845                        "column {i} ({}) scale",
1846                        original.name
1847                    );
1848                }
1849                // NVARCHAR/VARCHAR — max_length + collation
1850                0xE7 | 0xA7 => {
1851                    assert_eq!(
1852                        parsed.type_info.max_length, original.max_length,
1853                        "column {i} ({}) string max_length",
1854                        original.name
1855                    );
1856                    assert!(
1857                        parsed.type_info.collation.is_some(),
1858                        "column {i} ({}) should have collation",
1859                        original.name
1860                    );
1861                }
1862                // VARBINARY — max_length, no collation
1863                0xA5 => {
1864                    assert_eq!(
1865                        parsed.type_info.max_length, original.max_length,
1866                        "column {i} ({}) binary max_length",
1867                        original.name
1868                    );
1869                    assert!(
1870                        parsed.type_info.collation.is_none(),
1871                        "column {i} ({}) should not have collation",
1872                        original.name
1873                    );
1874                }
1875                // DECIMAL
1876                0x6C => {
1877                    assert_eq!(
1878                        parsed.type_info.precision, original.precision,
1879                        "column {i} ({}) precision",
1880                        original.name
1881                    );
1882                    assert_eq!(
1883                        parsed.type_info.scale, original.scale,
1884                        "column {i} ({}) scale",
1885                        original.name
1886                    );
1887                }
1888                _ => {}
1889            }
1890        }
1891    }
1892
1893    /// Verify that NOT NULL columns use fixed-width type IDs (0x38 Int4,
1894    /// 0x32 Bit, etc.) rather than nullable type IDs (0x26 INTN, 0x68 BITN).
1895    /// SQL Server's BulkLoad rejects nullable IDs for NOT NULL columns.
1896    #[test]
1897    fn test_write_colmetadata_not_null_uses_fixed_types() {
1898        use tds_protocol::token::ColMetaData;
1899        use tds_protocol::types::TypeId;
1900
1901        let columns = vec![
1902            BulkColumn::new("id", "INT", 0)
1903                .unwrap()
1904                .with_nullable(false),
1905            BulkColumn::new("tiny", "TINYINT", 1)
1906                .unwrap()
1907                .with_nullable(false),
1908            BulkColumn::new("small", "SMALLINT", 2)
1909                .unwrap()
1910                .with_nullable(false),
1911            BulkColumn::new("big", "BIGINT", 3)
1912                .unwrap()
1913                .with_nullable(false),
1914            BulkColumn::new("flag", "BIT", 4)
1915                .unwrap()
1916                .with_nullable(false),
1917            BulkColumn::new("r", "REAL", 5)
1918                .unwrap()
1919                .with_nullable(false),
1920            BulkColumn::new("f", "FLOAT", 6)
1921                .unwrap()
1922                .with_nullable(false),
1923            BulkColumn::new("dt", "DATETIME", 7)
1924                .unwrap()
1925                .with_nullable(false),
1926            BulkColumn::new("sdt", "SMALLDATETIME", 8)
1927                .unwrap()
1928                .with_nullable(false),
1929            BulkColumn::new("mny", "MONEY", 9)
1930                .unwrap()
1931                .with_nullable(false),
1932            BulkColumn::new("smny", "SMALLMONEY", 10)
1933                .unwrap()
1934                .with_nullable(false),
1935        ];
1936
1937        let bulk = BulkInsert::new(columns.clone(), 0);
1938
1939        // Every NOT NULL fixed-width column should have fixed_len=true
1940        for (i, fixed) in bulk.fixed_len.iter().enumerate() {
1941            assert!(
1942                *fixed,
1943                "column {i} ({}) should be fixed_len",
1944                columns[i].name
1945            );
1946        }
1947
1948        // Parse the generated COLMETADATA
1949        let buf = &bulk.buffer[1..]; // skip token type byte
1950        let mut cursor = bytes::Bytes::copy_from_slice(buf);
1951        let meta = ColMetaData::decode(&mut cursor).expect("parseable");
1952
1953        // Verify each column has the expected fixed type ID and no Nullable flag
1954        let expected: &[(&str, TypeId)] = &[
1955            ("id", TypeId::Int4),
1956            ("tiny", TypeId::Int1),
1957            ("small", TypeId::Int2),
1958            ("big", TypeId::Int8),
1959            ("flag", TypeId::Bit),
1960            ("r", TypeId::Float4),
1961            ("f", TypeId::Float8),
1962            ("dt", TypeId::DateTime),
1963            ("sdt", TypeId::DateTime4),
1964            ("mny", TypeId::Money),
1965            ("smny", TypeId::Money4),
1966        ];
1967
1968        for (i, (name, ty)) in expected.iter().enumerate() {
1969            assert_eq!(meta.columns[i].name, *name, "column {i} name");
1970            assert_eq!(meta.columns[i].type_id, *ty, "column {i} ({name}) type");
1971            assert_eq!(
1972                meta.columns[i].flags & 0x0001,
1973                0,
1974                "column {i} ({name}) should not have Nullable flag set"
1975            );
1976        }
1977    }
1978
1979    /// Verify that `with_collation()` on a VARCHAR column propagates into
1980    /// the COLMETADATA token — the hand-crafted path previously hardcoded
1981    /// Latin1_General_CI_AS regardless of the caller-supplied collation.
1982    #[test]
1983    fn test_write_colmetadata_uses_caller_collation() {
1984        use tds_protocol::token::{ColMetaData, Collation};
1985
1986        // Chinese_PRC_CI_AS: LCID 0x0804, sort_id 0x52 (just a non-default pair)
1987        let chinese = Collation {
1988            lcid: 0x0804,
1989            sort_id: 0x52,
1990        };
1991
1992        let columns = vec![
1993            BulkColumn::new("s", "VARCHAR(50)", 0)
1994                .unwrap()
1995                .with_collation(chinese),
1996            // NVARCHAR also writes 5 collation bytes — should honor caller too
1997            BulkColumn::new("n", "NVARCHAR(50)", 1)
1998                .unwrap()
1999                .with_collation(chinese),
2000            // VARCHAR without with_collation should keep the Latin1 default
2001            BulkColumn::new("d", "VARCHAR(10)", 2).unwrap(),
2002        ];
2003        let bulk = BulkInsert::new(columns, 0);
2004
2005        let buf = &bulk.buffer[1..];
2006        let mut cursor = bytes::Bytes::copy_from_slice(buf);
2007        let meta = ColMetaData::decode(&mut cursor).expect("parseable");
2008
2009        let c0 = meta.columns[0]
2010            .type_info
2011            .collation
2012            .as_ref()
2013            .expect("VARCHAR has collation");
2014        assert_eq!(c0.lcid, chinese.lcid, "VARCHAR caller LCID");
2015        assert_eq!(c0.sort_id, chinese.sort_id, "VARCHAR caller sort_id");
2016
2017        let c1 = meta.columns[1]
2018            .type_info
2019            .collation
2020            .as_ref()
2021            .expect("NVARCHAR has collation");
2022        assert_eq!(c1.lcid, chinese.lcid, "NVARCHAR caller LCID");
2023        assert_eq!(c1.sort_id, chinese.sort_id, "NVARCHAR caller sort_id");
2024
2025        // Default collation: Latin1_General_CI_AS wire bytes
2026        // [0x09, 0x04, 0xD0, 0x00, 0x34] → lcid u32 LE = 0x00D0_0409, sort_id = 0x34
2027        let default = meta.columns[2]
2028            .type_info
2029            .collation
2030            .as_ref()
2031            .expect("VARCHAR has default collation");
2032        assert_eq!(default.to_bytes(), [0x09, 0x04, 0xD0, 0x00, 0x34]);
2033    }
2034
2035    #[test]
2036    fn test_parse_sql_type_max() {
2037        // Test NVARCHAR(MAX) parsing - uses 0xFFFF marker (not doubled for MAX)
2038        let (type_id, len, _, _) = parse_sql_type("NVARCHAR(MAX)");
2039        assert_eq!(type_id, 0xE7);
2040        assert_eq!(len, Some(0xFFFF)); // MAX marker is 0xFFFF
2041
2042        // Test VARBINARY(MAX) parsing
2043        let (type_id, len, _, _) = parse_sql_type("VARBINARY(MAX)");
2044        assert_eq!(type_id, 0xA5);
2045        assert_eq!(len, Some(0xFFFF));
2046
2047        // Test VARCHAR(MAX) parsing
2048        let (type_id, len, _, _) = parse_sql_type("VARCHAR(MAX)");
2049        assert_eq!(type_id, 0xA7);
2050        assert_eq!(len, Some(0xFFFF));
2051
2052        // Verify normal NVARCHAR does double the length
2053        let (type_id, len, _, _) = parse_sql_type("NVARCHAR(100)");
2054        assert_eq!(type_id, 0xE7);
2055        assert_eq!(len, Some(200)); // 100 * 2 for UTF-16
2056    }
2057}