Skip to main content

arrow_tiberius/write/
writer.rs

1//! Baseline bulk writer public API skeleton.
2
3use std::borrow::Cow;
4
5use arrow_array::RecordBatch;
6use futures_util::io::{AsyncRead, AsyncWrite};
7
8use crate::{
9    Diagnostic, DiagnosticCode, DiagnosticSet, FieldRef, PlanOptions, Result, SchemaMapping,
10    TableName,
11};
12
13use super::{
14    SchemaCheck,
15    direct::{
16        DirectEncoder, MeasuredDirectBatch, MeasuredRowRange,
17        plan::{DirectColumnEncoding, DirectColumnPlan, DirectEncoderPlan},
18    },
19    profile,
20    record_batch::RecordBatchView,
21    token_row::tiberius_row_owned,
22};
23use crate::conversion::arrow_to_mssql::{
24    fixed_size_binary::FixedSizeBinaryArrowToMssql, primitive::PrimitiveArrowToMssql,
25    temporal::TemporalArrowToMssql, variable_width::VariableWidthArrowToMssql,
26};
27
28const DIRECT_RAW_MAX_PAYLOAD_BYTES: usize = 8 * 1024 * 1024;
29
30/// Write backend selection.
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
32pub enum WriteBackend {
33    /// Select the best available backend for the current crate build and plan.
34    #[default]
35    Auto,
36    /// Use Tiberius' row-oriented `TokenRow` bulk-load path.
37    BaselineTokenRow,
38    /// Use direct bulk-row payload encoding through Tiberius' framed sink.
39    DirectFramedBulk,
40    /// Use the raw bulk-row payload path exposed by the Tiberius fork.
41    DirectRawBulk,
42}
43
44/// Execution-time write options.
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
46pub struct WriteOptions {
47    /// Requested write backend.
48    pub backend: WriteBackend,
49    /// Batch schema validation policy.
50    pub schema_check: SchemaCheck,
51    /// Planning/runtime conversion policies used by policy-dependent write conversions.
52    pub plan_options: PlanOptions,
53}
54
55/// Cumulative write statistics.
56#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
57pub struct WriteStats {
58    /// Number of rows accepted by the writer.
59    pub rows_written: u64,
60    /// Number of batches accepted by the writer.
61    pub batches_written: u64,
62}
63
64#[derive(Debug)]
65struct WriterState {
66    backend: WriteBackend,
67    direct_encoder: Option<DirectEncoder>,
68    schema_check: SchemaCheck,
69    plan_options: PlanOptions,
70    mappings: Vec<SchemaMapping>,
71    stats: WriteStats,
72}
73
74impl WriterState {
75    fn new(
76        requested_backend: WriteBackend,
77        schema_check: SchemaCheck,
78        plan_options: PlanOptions,
79        mappings: Vec<SchemaMapping>,
80    ) -> Result<Self> {
81        let backend = resolve_backend(requested_backend)?;
82        let direct_encoder = match backend {
83            WriteBackend::DirectFramedBulk | WriteBackend::DirectRawBulk => {
84                Some(DirectEncoder::new_with_options(&mappings, plan_options)?)
85            }
86            WriteBackend::Auto | WriteBackend::BaselineTokenRow => None,
87        };
88
89        Ok(Self {
90            backend,
91            direct_encoder,
92            schema_check,
93            plan_options,
94            mappings,
95            stats: WriteStats::default(),
96        })
97    }
98
99    fn backend(&self) -> WriteBackend {
100        self.backend
101    }
102
103    fn direct_encoder(&self) -> Option<&DirectEncoder> {
104        self.direct_encoder.as_ref()
105    }
106
107    fn mappings(&self) -> &[SchemaMapping] {
108        &self.mappings
109    }
110
111    fn schema_check(&self) -> SchemaCheck {
112        self.schema_check
113    }
114
115    fn plan_options(&self) -> &PlanOptions {
116        &self.plan_options
117    }
118
119    fn stats(&self) -> WriteStats {
120        self.stats
121    }
122
123    fn record_accepted_batch(&mut self, rows: u64) -> WriteStats {
124        self.stats.rows_written = self.stats.rows_written.saturating_add(rows);
125        self.stats.batches_written = self.stats.batches_written.saturating_add(1);
126        self.stats
127    }
128}
129
130/// SQL Server bulk writer for Arrow record batches.
131#[derive(Debug)]
132pub struct BulkWriter<'client, S>
133where
134    S: AsyncRead + AsyncWrite + Unpin + Send,
135{
136    state: WriterState,
137    request: tiberius::BulkLoadRequest<'client, S>,
138}
139
140impl<'client, S> BulkWriter<'client, S>
141where
142    S: AsyncRead + AsyncWrite + Unpin + Send,
143{
144    /// Starts a bulk writer for a planned SQL Server table target.
145    pub async fn new(
146        client: &'client mut tiberius::Client<S>,
147        table: TableName,
148        mappings: Vec<SchemaMapping>,
149        options: WriteOptions,
150    ) -> Result<Self> {
151        let state = WriterState::new(
152            options.backend,
153            options.schema_check,
154            options.plan_options,
155            mappings,
156        )?;
157        let mut request = match state.backend() {
158            WriteBackend::BaselineTokenRow
159            | WriteBackend::DirectFramedBulk
160            | WriteBackend::DirectRawBulk => {
161                let table_sql = bulk_insert_table_sql(&table);
162                let columns = client
163                    .bulk_insert_columns(&table_sql)
164                    .await
165                    .map_err(|source| crate::Error::Tiberius { source })?;
166                validate_bulk_target_columns(columns.iter(), state.mappings())?;
167                if matches!(
168                    state.backend(),
169                    WriteBackend::DirectFramedBulk | WriteBackend::DirectRawBulk
170                ) {
171                    let encoder =
172                        state
173                            .direct_encoder()
174                            .ok_or_else(|| crate::Error::BackendUnavailable {
175                                backend: state.backend(),
176                                reason: "direct bulk encoder is not available for this writer"
177                                    .to_owned(),
178                            })?;
179                    validate_direct_bulk_target_column_types(columns.iter(), encoder.plan())?;
180                }
181                client
182                    .bulk_insert_with_columns(&table_sql, columns)
183                    .await
184                    .map_err(|source| crate::Error::Tiberius { source })?
185            }
186            WriteBackend::Auto => {
187                return Err(execution_unavailable(state.backend()));
188            }
189        };
190
191        if state.backend() == WriteBackend::DirectRawBulk {
192            request.enable_direct_packet_writes();
193        }
194
195        Ok(Self { state, request })
196    }
197
198    /// Writes one Arrow record batch.
199    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteStats> {
200        match self.state.backend() {
201            WriteBackend::BaselineTokenRow => {
202                write_batch_to_sink(&mut self.state, &mut self.request, batch).await
203            }
204            WriteBackend::DirectFramedBulk | WriteBackend::DirectRawBulk => {
205                write_direct_batch_to_sink(&mut self.state, &mut self.request, batch).await
206            }
207            WriteBackend::Auto => Err(execution_unavailable(WriteBackend::Auto)),
208        }
209    }
210
211    /// Finalizes the bulk writer and returns cumulative write statistics.
212    pub async fn finish(self) -> Result<WriteStats> {
213        let Self { state, request } = self;
214        let stats = state.stats();
215
216        #[cfg(feature = "bench-profile")]
217        {
218            let (_result, stats) = request
219                .finalize_with_stats()
220                .await
221                .map_err(|source| crate::Error::Tiberius { source })?;
222            profile::record_bulk_load_stats(stats);
223        }
224
225        #[cfg(not(feature = "bench-profile"))]
226        request
227            .finalize()
228            .await
229            .map_err(|source| crate::Error::Tiberius { source })?;
230
231        Ok(stats)
232    }
233}
234
235fn bulk_insert_table_sql(table: &TableName) -> String {
236    table.quoted_sql()
237}
238
239fn record_batch_view<'a>(
240    batch: &'a RecordBatch,
241    mappings: &'a [SchemaMapping],
242    schema_check: SchemaCheck,
243    plan_options: &PlanOptions,
244) -> Result<RecordBatchView<'a>> {
245    match schema_check {
246        SchemaCheck::Strict => RecordBatchView::new_with_options(batch, mappings, plan_options),
247    }
248}
249
250fn validate_batch_rows(view: &RecordBatchView<'_>) -> Result<()> {
251    for row_index in 0..view.row_count() {
252        let _cells = view.mssql_row(row_index)?;
253    }
254
255    Ok(())
256}
257
258fn validate_bulk_target_columns<Column>(
259    columns: impl ExactSizeIterator<Item = Column>,
260    mappings: &[SchemaMapping],
261) -> Result<()>
262where
263    Column: BulkTargetColumnMetadata,
264{
265    let column_count = columns.len();
266    let mut diagnostics = DiagnosticSet::new();
267
268    if column_count != mappings.len() {
269        diagnostics.push(Diagnostic::error(
270            DiagnosticCode::SchemaMismatch,
271            format!(
272                "bulk target has {column_count} updateable column(s) but mappings contain {} column(s)",
273                mappings.len()
274            ),
275        ));
276    }
277
278    for (position, (column, mapping)) in columns.zip(mappings).enumerate() {
279        validate_bulk_target_column(position, column, mapping, &mut diagnostics);
280    }
281
282    if diagnostics.has_errors() {
283        return Err(crate::Error::ValueConversion { diagnostics });
284    }
285
286    Ok(())
287}
288
289fn validate_bulk_target_column(
290    position: usize,
291    column: impl BulkTargetColumnMetadata,
292    mapping: &SchemaMapping,
293    diagnostics: &mut DiagnosticSet,
294) {
295    if column.ordinal() != position {
296        diagnostics.push(bulk_target_column_diagnostic(
297            mapping,
298            format!(
299                "bulk target column ordinal {} does not match mapping position {position}",
300                column.ordinal()
301            ),
302        ));
303    }
304
305    if column.name() != mapping.mssql().name().as_str() {
306        diagnostics.push(bulk_target_column_diagnostic(
307            mapping,
308            format!(
309                "bulk target column name {} does not match planned MSSQL column name {}",
310                column.name(),
311                mapping.mssql().name().as_str()
312            ),
313        ));
314    }
315
316    if column.is_nullable() != mapping.mssql().nullable() {
317        diagnostics.push(bulk_target_column_diagnostic(
318            mapping,
319            format!(
320                "bulk target column nullability {} does not match planned MSSQL column nullability {}",
321                column.is_nullable(),
322                mapping.mssql().nullable()
323            ),
324        ));
325    }
326}
327
328fn validate_direct_bulk_target_column_types<Column>(
329    columns: impl ExactSizeIterator<Item = Column>,
330    plan: &DirectEncoderPlan,
331) -> Result<()>
332where
333    Column: BulkTargetColumnMetadata,
334{
335    let column_count = columns.len();
336    let mut diagnostics = DiagnosticSet::new();
337
338    if column_count != plan.column_count() {
339        diagnostics.push(Diagnostic::error(
340            DiagnosticCode::SchemaMismatch,
341            format!(
342                "bulk target has {column_count} updateable column(s) but direct plan contains {} column(s)",
343                plan.column_count()
344            ),
345        ));
346    }
347
348    for (column, plan_column) in columns.zip(plan.columns()) {
349        validate_direct_bulk_target_column_type(column, plan_column, &mut diagnostics);
350    }
351
352    if diagnostics.has_errors() {
353        return Err(crate::Error::ValueConversion { diagnostics });
354    }
355
356    Ok(())
357}
358
359fn validate_direct_bulk_target_column_type(
360    column: impl BulkTargetColumnMetadata,
361    plan_column: &DirectColumnPlan,
362    diagnostics: &mut DiagnosticSet,
363) {
364    let Some(expected) = expected_direct_bulk_column_type(plan_column) else {
365        diagnostics.push(
366            Diagnostic::error(
367                DiagnosticCode::DirectEncodingUnsupportedMapping,
368                format!(
369                    "direct target type validation is not implemented for {:?}",
370                    plan_column.encoding()
371                ),
372            )
373            .with_field(FieldRef::new(
374                plan_column.source_index(),
375                plan_column.source_name(),
376            )),
377        );
378        return;
379    };
380    let actual = column.column_type();
381
382    if actual != expected {
383        diagnostics.push(
384            Diagnostic::error(
385                DiagnosticCode::SchemaMismatch,
386                format!(
387                    "bulk target column type {actual:?} does not match direct encoder type {expected:?}"
388                ),
389            )
390            .with_field(FieldRef::new(
391                plan_column.source_index(),
392                plan_column.source_name(),
393            )),
394        );
395    }
396
397    if let Some((expected_precision, expected_scale)) =
398        expected_direct_decimal_precision_scale(plan_column)
399    {
400        match column.decimal_precision_scale() {
401            Some((actual_precision, actual_scale))
402                if actual_precision == expected_precision && actual_scale == expected_scale => {}
403            Some((actual_precision, actual_scale)) => diagnostics.push(
404                Diagnostic::error(
405                    DiagnosticCode::SchemaMismatch,
406                    format!(
407                        "bulk target decimal precision/scale ({actual_precision},{actual_scale}) does not match direct encoder precision/scale ({expected_precision},{expected_scale})"
408                    ),
409                )
410                .with_field(FieldRef::new(
411                    plan_column.source_index(),
412                    plan_column.source_name(),
413                )),
414            ),
415            None => diagnostics.push(
416                Diagnostic::error(
417                    DiagnosticCode::SchemaMismatch,
418                    "bulk target decimal precision/scale metadata is not available",
419                )
420                .with_field(FieldRef::new(
421                    plan_column.source_index(),
422                    plan_column.source_name(),
423                )),
424            ),
425        }
426    }
427}
428
429fn expected_direct_bulk_column_type(column: &DirectColumnPlan) -> Option<tiberius::ColumnType> {
430    match column.encoding() {
431        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::BooleanToBit) => {
432            if column.nullable() {
433                Some(tiberius::ColumnType::Bitn)
434            } else {
435                Some(tiberius::ColumnType::Bit)
436            }
437        }
438        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt8ToTinyInt) => {
439            Some(tiberius::ColumnType::Int1)
440        }
441        DirectColumnEncoding::Primitive(
442            PrimitiveArrowToMssql::Int8ToSmallInt | PrimitiveArrowToMssql::Int16ToSmallInt,
443        ) => Some(tiberius::ColumnType::Int2),
444        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::Int32ToInt) => {
445            Some(tiberius::ColumnType::Int4)
446        }
447        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt16ToInt) => {
448            Some(tiberius::ColumnType::Int4)
449        }
450        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::Int64ToBigInt) => {
451            Some(tiberius::ColumnType::Int8)
452        }
453        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt32ToBigInt) => {
454            Some(tiberius::ColumnType::Int8)
455        }
456        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt64ToCheckedBigInt) => {
457            Some(tiberius::ColumnType::Int8)
458        }
459        DirectColumnEncoding::Primitive(
460            PrimitiveArrowToMssql::Float16ToReal | PrimitiveArrowToMssql::Float32ToReal,
461        ) => Some(tiberius::ColumnType::Float4),
462        DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::Float64ToFloat) => {
463            Some(tiberius::ColumnType::Float8)
464        }
465        DirectColumnEncoding::UInt64Decimal20_0 | DirectColumnEncoding::Decimal(_) => {
466            Some(tiberius::ColumnType::Decimaln)
467        }
468        DirectColumnEncoding::VariableWidth(
469            VariableWidthArrowToMssql::Utf8ToNVarChar { .. }
470            | VariableWidthArrowToMssql::LargeUtf8ToNVarChar { .. },
471        ) => Some(tiberius::ColumnType::NVarchar),
472        DirectColumnEncoding::VariableWidth(
473            VariableWidthArrowToMssql::BinaryToVarBinary { .. }
474            | VariableWidthArrowToMssql::LargeBinaryToVarBinary { .. },
475        ) => Some(tiberius::ColumnType::BigVarBin),
476        DirectColumnEncoding::FixedSizeBinary(
477            FixedSizeBinaryArrowToMssql::FixedSizeBinaryToBinary { .. },
478        ) => Some(tiberius::ColumnType::BigBinary),
479        DirectColumnEncoding::Temporal(TemporalArrowToMssql::Date32ToDate) => {
480            Some(tiberius::ColumnType::Daten)
481        }
482        DirectColumnEncoding::Temporal(TemporalArrowToMssql::Date64ToDateTime2) => {
483            Some(tiberius::ColumnType::Datetime2)
484        }
485        DirectColumnEncoding::Temporal(
486            TemporalArrowToMssql::TimestampSecondToDateTime2
487            | TemporalArrowToMssql::TimestampMillisecondToDateTime2
488            | TemporalArrowToMssql::TimestampMicrosecondToDateTime2
489            | TemporalArrowToMssql::TimestampNanosecondToDateTime2
490            | TemporalArrowToMssql::TimestampSecondTzToDateTime2
491            | TemporalArrowToMssql::TimestampMillisecondTzToDateTime2
492            | TemporalArrowToMssql::TimestampMicrosecondTzToDateTime2
493            | TemporalArrowToMssql::TimestampNanosecondTzToDateTime2,
494        ) => Some(tiberius::ColumnType::Datetime2),
495        DirectColumnEncoding::Temporal(
496            TemporalArrowToMssql::Time32SecondToTime
497            | TemporalArrowToMssql::Time32MillisecondToTime
498            | TemporalArrowToMssql::Time64MicrosecondToTime
499            | TemporalArrowToMssql::Time64NanosecondToTime,
500        ) => Some(tiberius::ColumnType::Timen),
501        DirectColumnEncoding::Temporal(
502            TemporalArrowToMssql::TimestampSecondTzToDateTimeOffset
503            | TemporalArrowToMssql::TimestampMillisecondTzToDateTimeOffset
504            | TemporalArrowToMssql::TimestampMicrosecondTzToDateTimeOffset
505            | TemporalArrowToMssql::TimestampNanosecondTzToDateTimeOffset,
506        ) => Some(tiberius::ColumnType::DatetimeOffsetn),
507    }
508}
509
510fn expected_direct_decimal_precision_scale(column: &DirectColumnPlan) -> Option<(u8, u8)> {
511    match column.encoding() {
512        DirectColumnEncoding::UInt64Decimal20_0 => Some((20, 0)),
513        DirectColumnEncoding::Decimal(classification) => Some((
514            classification.target_precision(),
515            classification.target_scale(),
516        )),
517        _ => None,
518    }
519}
520
521fn bulk_target_column_diagnostic(
522    mapping: &SchemaMapping,
523    message: impl Into<String>,
524) -> Diagnostic {
525    Diagnostic::error(DiagnosticCode::SchemaMismatch, message).with_field(FieldRef::new(
526        mapping.arrow().index(),
527        mapping.arrow().name(),
528    ))
529}
530
531trait BulkTargetColumnMetadata {
532    fn ordinal(&self) -> usize;
533
534    fn name(&self) -> &str;
535
536    fn is_nullable(&self) -> bool;
537
538    fn column_type(&self) -> tiberius::ColumnType;
539
540    fn decimal_precision_scale(&self) -> Option<(u8, u8)> {
541        None
542    }
543}
544
545impl BulkTargetColumnMetadata for tiberius::BulkLoadColumn<'_> {
546    fn ordinal(&self) -> usize {
547        self.ordinal()
548    }
549
550    fn name(&self) -> &str {
551        self.name()
552    }
553
554    fn is_nullable(&self) -> bool {
555        self.is_nullable()
556    }
557
558    fn column_type(&self) -> tiberius::ColumnType {
559        self.column_type()
560    }
561
562    fn decimal_precision_scale(&self) -> Option<(u8, u8)> {
563        match self.type_info() {
564            tiberius::TypeInfo::VarLenSizedPrecision {
565                ty: tiberius::VarLenType::Decimaln | tiberius::VarLenType::Numericn,
566                precision,
567                scale,
568                ..
569            } => Some((*precision, *scale)),
570            _ => None,
571        }
572    }
573}
574
575async fn write_batch_to_sink<Sink>(
576    state: &mut WriterState,
577    sink: &mut Sink,
578    batch: &RecordBatch,
579) -> Result<WriteStats>
580where
581    Sink: TokenRowSink,
582{
583    let view = record_batch_view(
584        batch,
585        state.mappings(),
586        state.schema_check(),
587        state.plan_options(),
588    )?;
589    validate_batch_rows(&view)?;
590    let rows_written = usize_to_u64_saturating(view.row_count());
591
592    for row_index in 0..view.row_count() {
593        let row = tiberius_row_owned(&view, row_index)?;
594        sink.send_token_row(row).await?;
595    }
596
597    Ok(state.record_accepted_batch(rows_written))
598}
599
600trait TokenRowSink {
601    async fn send_token_row(&mut self, row: tiberius::TokenRow<'static>) -> Result<()>;
602}
603
604impl<S> TokenRowSink for tiberius::BulkLoadRequest<'_, S>
605where
606    S: AsyncRead + AsyncWrite + Unpin + Send,
607{
608    async fn send_token_row(&mut self, row: tiberius::TokenRow<'static>) -> Result<()> {
609        self.send(row)
610            .await
611            .map_err(|source| crate::Error::Tiberius { source })
612    }
613}
614
615async fn write_direct_batch_to_sink<Sink>(
616    state: &mut WriterState,
617    sink: &mut Sink,
618    batch: &RecordBatch,
619) -> Result<WriteStats>
620where
621    Sink: RawRowsSink,
622{
623    let encoder = state
624        .direct_encoder()
625        .ok_or_else(|| crate::Error::BackendUnavailable {
626            backend: state.backend(),
627            reason: "direct bulk encoder is not available for this writer".to_owned(),
628        })?;
629    let measure_start = std::time::Instant::now();
630    let measured = encoder.measure_batch(batch);
631    let measured = profile::record_elapsed(measure_start, profile::record_measure_batch, measured)?;
632    let rows_written = usize_to_u64_saturating(measured.row_count());
633
634    let split_start = std::time::Instant::now();
635    let ranges = measured.row_ranges(DIRECT_RAW_MAX_PAYLOAD_BYTES);
636    let ranges = profile::record_elapsed(split_start, profile::record_row_range_split, ranges)?;
637
638    for range in ranges {
639        sink.send_measured_raw_rows(encoder, batch, &measured, range)
640            .await?;
641    }
642
643    profile::record_accepted_batch(measured.row_count());
644    Ok(state.record_accepted_batch(rows_written))
645}
646
647trait RawRowsSink {
648    async fn send_measured_raw_rows(
649        &mut self,
650        encoder: &DirectEncoder,
651        batch: &RecordBatch,
652        measured: &MeasuredDirectBatch,
653        range: MeasuredRowRange,
654    ) -> Result<()>;
655}
656
657impl<S> RawRowsSink for tiberius::BulkLoadRequest<'_, S>
658where
659    S: AsyncRead + AsyncWrite + Unpin + Send,
660{
661    async fn send_measured_raw_rows(
662        &mut self,
663        encoder: &DirectEncoder,
664        batch: &RecordBatch,
665        measured: &MeasuredDirectBatch,
666        range: MeasuredRowRange,
667    ) -> Result<()> {
668        let encoded_bytes = measured.range_payload_len(range.start, range.len)?;
669        profile::record_row_range(encoded_bytes);
670
671        if !encoder.has_variable_width_column() {
672            let encode_start = std::time::Instant::now();
673            let payload =
674                encoder.encode_measured_batch_range(batch, measured, range.start, range.len)?;
675            profile::record_append_encode(encode_start.elapsed());
676
677            let send_start = std::time::Instant::now();
678            let send_result = self
679                .send_raw_rows_payload_checked(payload.bytes(), payload.row_token_offsets())
680                .await
681                .map_err(|source| crate::Error::Tiberius { source });
682            profile::record_send_total(send_start.elapsed());
683            return send_result;
684        }
685
686        let mut encode_error = None;
687        let send_start = std::time::Instant::now();
688        let send_result = self
689            .send_raw_rows_with(|buf| {
690                let encode_start = std::time::Instant::now();
691                let encoded = encoder.encode_measured_batch_range_into(
692                    batch,
693                    measured,
694                    range.start,
695                    range.len,
696                    buf,
697                );
698                profile::record_append_encode(encode_start.elapsed());
699
700                match encoded {
701                    Ok(append) => Ok(append),
702                    Err(err) => {
703                        encode_error = Some(err);
704                        Err(tiberius::error::Error::BulkInput(Cow::Borrowed(
705                            "direct raw row encoding failed",
706                        )))
707                    }
708                }
709            })
710            .await;
711        profile::record_send_total(send_start.elapsed());
712
713        if let Some(err) = encode_error {
714            return Err(err);
715        }
716
717        send_result.map_err(|source| crate::Error::Tiberius { source })
718    }
719}
720
721fn usize_to_u64_saturating(value: usize) -> u64 {
722    u64::try_from(value).unwrap_or(u64::MAX)
723}
724
725fn resolve_backend(requested_backend: WriteBackend) -> Result<WriteBackend> {
726    match requested_backend {
727        WriteBackend::Auto | WriteBackend::DirectRawBulk => Ok(WriteBackend::DirectRawBulk),
728        WriteBackend::BaselineTokenRow => Ok(WriteBackend::BaselineTokenRow),
729        WriteBackend::DirectFramedBulk => Ok(WriteBackend::DirectFramedBulk),
730    }
731}
732
733fn execution_unavailable(backend: WriteBackend) -> crate::Error {
734    crate::Error::BackendUnavailable {
735        backend,
736        reason: "bulk writer execution is not implemented yet".to_owned(),
737    }
738}
739
740#[cfg(test)]
741mod tests {
742    use std::{
743        borrow::Cow,
744        future::Future,
745        pin::Pin,
746        sync::Arc,
747        task::{Context, Poll, Wake, Waker},
748    };
749
750    use arrow_array::{BinaryArray, Float64Array, Int32Array, RecordBatch, UInt64Array};
751    use arrow_schema::{DataType, Field, Schema};
752    use futures_util::io::{AsyncRead, AsyncWrite};
753
754    use super::{
755        BulkTargetColumnMetadata, DIRECT_RAW_MAX_PAYLOAD_BYTES, DirectEncoder, MeasuredDirectBatch,
756        MeasuredRowRange, RawRowsSink, TokenRowSink, WriteBackend, WriteOptions, WriteStats,
757        WriterState, bulk_insert_table_sql, record_batch_view, resolve_backend, tiberius_row_owned,
758        validate_batch_rows, validate_bulk_target_columns,
759        validate_direct_bulk_target_column_types, write_batch_to_sink, write_direct_batch_to_sink,
760    };
761    use crate::{
762        ArrowFieldRef, DiagnosticCode, Error, Identifier, MssqlColumn, MssqlType, MssqlTypeLength,
763        PlanOptions, SchemaCheck, SchemaMapping, TableName,
764    };
765
766    #[test]
767    fn write_backend_defaults_to_auto() {
768        assert_eq!(WriteBackend::default(), WriteBackend::Auto);
769    }
770
771    #[test]
772    fn write_options_default_to_auto_backend_and_strict_schema_check() {
773        let options = WriteOptions::default();
774
775        assert_eq!(options.backend, WriteBackend::Auto);
776        assert_eq!(options.schema_check, SchemaCheck::Strict);
777        assert_eq!(options.plan_options, PlanOptions::default());
778    }
779
780    #[test]
781    fn write_options_preserve_explicit_backend_selection() {
782        for backend in [
783            WriteBackend::Auto,
784            WriteBackend::BaselineTokenRow,
785            WriteBackend::DirectFramedBulk,
786            WriteBackend::DirectRawBulk,
787        ] {
788            let options = WriteOptions {
789                backend,
790                schema_check: SchemaCheck::Strict,
791                ..WriteOptions::default()
792            };
793
794            assert_eq!(options.backend, backend);
795            assert_eq!(options.schema_check, SchemaCheck::Strict);
796        }
797    }
798
799    #[test]
800    fn write_stats_default_to_zero() {
801        let stats = WriteStats::default();
802
803        assert_eq!(stats.rows_written, 0);
804        assert_eq!(stats.batches_written, 0);
805    }
806
807    #[test]
808    fn auto_backend_resolves_to_direct_raw_bulk() {
809        assert_eq!(
810            resolve_backend(WriteBackend::Auto).unwrap(),
811            WriteBackend::DirectRawBulk
812        );
813    }
814
815    #[test]
816    fn explicit_backends_resolve_to_requested_backend() {
817        assert_eq!(
818            resolve_backend(WriteBackend::BaselineTokenRow).unwrap(),
819            WriteBackend::BaselineTokenRow
820        );
821        assert_eq!(
822            resolve_backend(WriteBackend::DirectFramedBulk).unwrap(),
823            WriteBackend::DirectFramedBulk
824        );
825        assert_eq!(
826            resolve_backend(WriteBackend::DirectRawBulk).unwrap(),
827            WriteBackend::DirectRawBulk
828        );
829    }
830
831    #[test]
832    fn writer_state_starts_with_resolved_backend_mappings_and_zero_stats() {
833        let mappings = vec![mapping("id")];
834
835        let state = WriterState::new(
836            WriteBackend::Auto,
837            SchemaCheck::Strict,
838            PlanOptions::default(),
839            mappings.clone(),
840        )
841        .unwrap();
842
843        assert_eq!(state.backend(), WriteBackend::DirectRawBulk);
844        assert!(state.direct_encoder().is_some());
845        assert_eq!(state.schema_check(), SchemaCheck::Strict);
846        assert_eq!(state.mappings(), mappings.as_slice());
847        assert_eq!(state.stats(), WriteStats::default());
848    }
849
850    #[test]
851    fn direct_writer_state_builds_encoder_for_supported_mappings() {
852        let mappings = vec![
853            mapping("id32"),
854            SchemaMapping::new(
855                ArrowFieldRef::new(1, "id64".to_owned(), false, DataType::Int64),
856                MssqlColumn::new(Identifier::new("id64").unwrap(), MssqlType::BigInt, false),
857            ),
858            float_mapping_at(2, "score"),
859            SchemaMapping::new(
860                ArrowFieldRef::new(3, "name".to_owned(), true, DataType::Utf8),
861                MssqlColumn::new(
862                    Identifier::new("name").unwrap(),
863                    MssqlType::NVarChar(crate::MssqlTypeLength::Max),
864                    true,
865                ),
866            ),
867        ];
868
869        for backend in [WriteBackend::DirectFramedBulk, WriteBackend::DirectRawBulk] {
870            let state = WriterState::new(
871                backend,
872                SchemaCheck::Strict,
873                PlanOptions::default(),
874                mappings.clone(),
875            )
876            .unwrap();
877
878            assert_eq!(state.backend(), backend);
879            assert!(state.direct_encoder().is_some());
880        }
881    }
882
883    #[test]
884    fn direct_writer_state_rejects_unsupported_mappings() {
885        let mappings = vec![SchemaMapping::new(
886            ArrowFieldRef::new(
887                0,
888                "list_value".to_owned(),
889                true,
890                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
891            ),
892            MssqlColumn::new(
893                Identifier::new("list_value").unwrap(),
894                MssqlType::NVarChar(MssqlTypeLength::Max),
895                true,
896            ),
897        )];
898
899        let err = WriterState::new(
900            WriteBackend::DirectRawBulk,
901            SchemaCheck::Strict,
902            PlanOptions::default(),
903            mappings,
904        )
905        .unwrap_err();
906
907        let Error::DirectEncoding { diagnostics } = err else {
908            panic!("expected direct encoding error");
909        };
910        assert_eq!(diagnostics.len(), 1);
911        assert_eq!(
912            diagnostics.all()[0].code(),
913            DiagnosticCode::DirectEncodingUnsupportedMapping
914        );
915    }
916
917    #[test]
918    fn writer_state_accumulates_accepted_batch_stats() {
919        let mut state = WriterState::new(
920            WriteBackend::BaselineTokenRow,
921            SchemaCheck::Strict,
922            PlanOptions::default(),
923            Vec::new(),
924        )
925        .unwrap();
926
927        assert_eq!(
928            state.record_accepted_batch(0),
929            WriteStats {
930                rows_written: 0,
931                batches_written: 1
932            }
933        );
934        assert_eq!(
935            state.record_accepted_batch(3),
936            WriteStats {
937                rows_written: 3,
938                batches_written: 2
939            }
940        );
941        assert_eq!(
942            state.record_accepted_batch(5),
943            WriteStats {
944                rows_written: 8,
945                batches_written: 3
946            }
947        );
948    }
949
950    #[test]
951    fn bulk_insert_table_sql_uses_quoted_table_name() {
952        let table = TableName::new("dbo]x", "target.table").unwrap();
953
954        assert_eq!(bulk_insert_table_sql(&table), "[dbo]]x].[target.table]");
955    }
956
957    #[test]
958    fn strict_batch_validation_accepts_supported_rows_without_owning_payloads() {
959        let batch = int32_batch("id", &[1, 2]);
960        let mappings = [mapping("id")];
961        let view = record_batch_view(
962            &batch,
963            &mappings,
964            SchemaCheck::Strict,
965            &PlanOptions::default(),
966        )
967        .unwrap();
968
969        validate_batch_rows(&view).unwrap();
970
971        let row = tiberius_row_owned(&view, 1).unwrap();
972        assert_eq!(row.get(0), Some(&tiberius::ColumnData::I32(Some(2))));
973    }
974
975    #[test]
976    fn strict_batch_view_rejects_runtime_schema_mismatch_before_send() {
977        let batch = int32_batch("renamed_id", &[1]);
978        let err = record_batch_view(
979            &batch,
980            &[mapping("id")],
981            SchemaCheck::Strict,
982            &PlanOptions::default(),
983        )
984        .unwrap_err();
985
986        let Error::ValueConversion { diagnostics } = err else {
987            panic!("expected value conversion error");
988        };
989        assert_eq!(diagnostics.len(), 1);
990        let diagnostic = &diagnostics.all()[0];
991        assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
992        assert_eq!(diagnostic.field().map(|field| field.name()), Some("id"));
993    }
994
995    #[test]
996    fn strict_batch_validation_rejects_bad_later_row_before_any_send() {
997        let schema = Arc::new(Schema::new(vec![Field::new(
998            "amount",
999            DataType::Float64,
1000            false,
1001        )]));
1002        let batch = RecordBatch::try_new(
1003            schema,
1004            vec![Arc::new(Float64Array::from(vec![
1005                Some(1.0),
1006                Some(f64::NAN),
1007            ]))],
1008        )
1009        .unwrap();
1010        let mappings = [SchemaMapping::new(
1011            ArrowFieldRef::new(0, "amount".to_owned(), false, DataType::Float64),
1012            MssqlColumn::new(
1013                Identifier::new("amount").unwrap(),
1014                MssqlType::Float { precision: 53 },
1015                false,
1016            ),
1017        )];
1018
1019        let view = record_batch_view(
1020            &batch,
1021            &mappings,
1022            SchemaCheck::Strict,
1023            &PlanOptions::default(),
1024        )
1025        .unwrap();
1026        let err = validate_batch_rows(&view).unwrap_err();
1027
1028        let Error::ValueConversion { diagnostics } = err else {
1029            panic!("expected value conversion error");
1030        };
1031        assert_eq!(diagnostics.len(), 1);
1032        let diagnostic = &diagnostics.all()[0];
1033        assert_eq!(diagnostic.code(), DiagnosticCode::NonFiniteFloat);
1034        assert_eq!(diagnostic.row(), Some(1));
1035    }
1036
1037    #[test]
1038    fn bulk_target_column_validation_accepts_matching_metadata() {
1039        let mappings = vec![mapping("id")];
1040        let columns = vec![bulk_target_column(0, "id", false)];
1041
1042        validate_bulk_target_columns(columns.into_iter(), &mappings).unwrap();
1043    }
1044
1045    #[test]
1046    fn bulk_target_column_validation_rejects_missing_target_columns() {
1047        let mappings = vec![mapping("id")];
1048        let columns = Vec::<FakeBulkTargetColumn>::new();
1049
1050        let err = validate_bulk_target_columns(columns.into_iter(), &mappings).unwrap_err();
1051
1052        let Error::ValueConversion { diagnostics } = err else {
1053            panic!("expected value conversion error");
1054        };
1055        assert_eq!(diagnostics.len(), 1);
1056        assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::SchemaMismatch);
1057        assert_eq!(
1058            diagnostics.all()[0].message(),
1059            "bulk target has 0 updateable column(s) but mappings contain 1 column(s)"
1060        );
1061    }
1062
1063    #[test]
1064    fn bulk_target_column_validation_rejects_ordinal_name_and_nullability_drift() {
1065        let mappings = vec![mapping("id")];
1066        let columns = vec![bulk_target_column(7, "id]; DROP TABLE target;--", true)];
1067
1068        let err = validate_bulk_target_columns(columns.into_iter(), &mappings).unwrap_err();
1069
1070        let Error::ValueConversion { diagnostics } = err else {
1071            panic!("expected value conversion error");
1072        };
1073        assert_eq!(diagnostics.len(), 3);
1074        assert!(
1075            diagnostics
1076                .all()
1077                .iter()
1078                .all(|diagnostic| diagnostic.code() == DiagnosticCode::SchemaMismatch)
1079        );
1080        assert!(
1081            diagnostics
1082                .all()
1083                .iter()
1084                .all(|diagnostic| diagnostic.field().map(|field| field.name()) == Some("id"))
1085        );
1086        assert!(
1087            diagnostics
1088                .all()
1089                .iter()
1090                .any(|diagnostic| diagnostic.message().contains("ordinal 7"))
1091        );
1092        assert!(
1093            diagnostics
1094                .all()
1095                .iter()
1096                .any(|diagnostic| diagnostic.message().contains("DROP TABLE"))
1097        );
1098        assert!(
1099            diagnostics
1100                .all()
1101                .iter()
1102                .any(|diagnostic| diagnostic.message().contains("nullability true"))
1103        );
1104    }
1105
1106    #[test]
1107    fn direct_bulk_target_type_validation_accepts_matching_primitive_metadata() {
1108        let mappings = vec![mapping("id")];
1109        let state = WriterState::new(
1110            WriteBackend::DirectRawBulk,
1111            SchemaCheck::Strict,
1112            PlanOptions::default(),
1113            mappings,
1114        )
1115        .unwrap();
1116        let columns = vec![bulk_target_column_with_type(
1117            0,
1118            "id",
1119            false,
1120            tiberius::ColumnType::Int4,
1121        )];
1122
1123        validate_direct_bulk_target_column_types(
1124            columns.into_iter(),
1125            state.direct_encoder().unwrap().plan(),
1126        )
1127        .unwrap();
1128    }
1129
1130    #[test]
1131    fn direct_bulk_target_type_validation_accepts_issue_75_integer_metadata() {
1132        let mappings = vec![
1133            schema_mapping_at(0, "tiny", DataType::UInt8, MssqlType::TinyInt, false),
1134            schema_mapping_at(1, "signed_tiny", DataType::Int8, MssqlType::SmallInt, false),
1135            schema_mapping_at(2, "small", DataType::Int16, MssqlType::SmallInt, false),
1136            schema_mapping_at(
1137                3,
1138                "unsigned_medium",
1139                DataType::UInt16,
1140                MssqlType::Int,
1141                false,
1142            ),
1143            schema_mapping_at(
1144                4,
1145                "unsigned_total",
1146                DataType::UInt32,
1147                MssqlType::BigInt,
1148                false,
1149            ),
1150        ];
1151        let state = WriterState::new(
1152            WriteBackend::DirectRawBulk,
1153            SchemaCheck::Strict,
1154            PlanOptions::default(),
1155            mappings,
1156        )
1157        .unwrap();
1158        let columns = vec![
1159            bulk_target_column_with_type(0, "tiny", false, tiberius::ColumnType::Int1),
1160            bulk_target_column_with_type(1, "signed_tiny", false, tiberius::ColumnType::Int2),
1161            bulk_target_column_with_type(2, "small", false, tiberius::ColumnType::Int2),
1162            bulk_target_column_with_type(3, "unsigned_medium", false, tiberius::ColumnType::Int4),
1163            bulk_target_column_with_type(4, "unsigned_total", false, tiberius::ColumnType::Int8),
1164        ];
1165
1166        validate_direct_bulk_target_column_types(
1167            columns.into_iter(),
1168            state.direct_encoder().unwrap().plan(),
1169        )
1170        .unwrap();
1171    }
1172
1173    #[test]
1174    fn direct_bulk_target_type_validation_accepts_issue_75_float32_metadata() {
1175        let mappings = vec![schema_mapping_at(
1176            0,
1177            "real_value",
1178            DataType::Float32,
1179            MssqlType::Real,
1180            false,
1181        )];
1182        let state = WriterState::new(
1183            WriteBackend::DirectRawBulk,
1184            SchemaCheck::Strict,
1185            PlanOptions::default(),
1186            mappings,
1187        )
1188        .unwrap();
1189        let columns = vec![bulk_target_column_with_type(
1190            0,
1191            "real_value",
1192            false,
1193            tiberius::ColumnType::Float4,
1194        )];
1195
1196        validate_direct_bulk_target_column_types(
1197            columns.into_iter(),
1198            state.direct_encoder().unwrap().plan(),
1199        )
1200        .unwrap();
1201    }
1202
1203    #[test]
1204    fn direct_bulk_target_type_validation_accepts_uint64_policy_metadata() {
1205        let mappings = vec![
1206            schema_mapping_at(0, "checked", DataType::UInt64, MssqlType::BigInt, false),
1207            schema_mapping_at(
1208                1,
1209                "decimal",
1210                DataType::UInt64,
1211                MssqlType::Decimal {
1212                    precision: 20,
1213                    scale: 0,
1214                },
1215                false,
1216            ),
1217        ];
1218        let state = WriterState::new(
1219            WriteBackend::DirectRawBulk,
1220            SchemaCheck::Strict,
1221            PlanOptions::default(),
1222            mappings,
1223        )
1224        .unwrap();
1225        let columns = vec![
1226            bulk_target_column_with_type(0, "checked", false, tiberius::ColumnType::Int8),
1227            bulk_target_decimal_column(1, "decimal", false, 20, 0),
1228        ];
1229
1230        validate_direct_bulk_target_column_types(
1231            columns.into_iter(),
1232            state.direct_encoder().unwrap().plan(),
1233        )
1234        .unwrap();
1235    }
1236
1237    #[test]
1238    fn direct_bulk_target_type_validation_rejects_uint64_decimal_precision_drift() {
1239        let mappings = vec![schema_mapping_at(
1240            0,
1241            "decimal",
1242            DataType::UInt64,
1243            MssqlType::Decimal {
1244                precision: 20,
1245                scale: 0,
1246            },
1247            false,
1248        )];
1249        let state = WriterState::new(
1250            WriteBackend::DirectRawBulk,
1251            SchemaCheck::Strict,
1252            PlanOptions::default(),
1253            mappings,
1254        )
1255        .unwrap();
1256        let columns = vec![bulk_target_decimal_column(0, "decimal", false, 19, 0)];
1257
1258        let err = validate_direct_bulk_target_column_types(
1259            columns.into_iter(),
1260            state.direct_encoder().unwrap().plan(),
1261        )
1262        .unwrap_err();
1263
1264        let Error::ValueConversion { diagnostics } = err else {
1265            panic!("expected value conversion error");
1266        };
1267        assert_eq!(diagnostics.len(), 1);
1268        let diagnostic = &diagnostics.all()[0];
1269        assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
1270        assert!(diagnostic.message().contains("precision/scale (19,0)"));
1271        assert_eq!(
1272            diagnostic
1273                .field()
1274                .map(|field| (field.index(), field.name())),
1275            Some((0, "decimal"))
1276        );
1277    }
1278
1279    #[test]
1280    fn direct_bulk_target_type_validation_accepts_matching_variable_width_metadata() {
1281        let mappings = vec![utf8_mapping_at(0, "name"), binary_mapping_at(1, "payload")];
1282        let state = WriterState::new(
1283            WriteBackend::DirectRawBulk,
1284            SchemaCheck::Strict,
1285            PlanOptions::default(),
1286            mappings,
1287        )
1288        .unwrap();
1289        let columns = vec![
1290            bulk_target_column_with_type(0, "name", false, tiberius::ColumnType::NVarchar),
1291            bulk_target_column_with_type(1, "payload", false, tiberius::ColumnType::BigVarBin),
1292        ];
1293
1294        validate_direct_bulk_target_column_types(
1295            columns.into_iter(),
1296            state.direct_encoder().unwrap().plan(),
1297        )
1298        .unwrap();
1299    }
1300
1301    #[test]
1302    fn direct_bulk_target_type_validation_accepts_matching_large_variable_width_metadata() {
1303        let mappings = vec![
1304            schema_mapping_at(
1305                0,
1306                "large_name",
1307                DataType::LargeUtf8,
1308                MssqlType::NVarChar(MssqlTypeLength::Max),
1309                false,
1310            ),
1311            schema_mapping_at(
1312                1,
1313                "large_payload",
1314                DataType::LargeBinary,
1315                MssqlType::VarBinary(MssqlTypeLength::Max),
1316                false,
1317            ),
1318        ];
1319        let state = WriterState::new(
1320            WriteBackend::DirectRawBulk,
1321            SchemaCheck::Strict,
1322            PlanOptions::default(),
1323            mappings,
1324        )
1325        .unwrap();
1326        let columns = vec![
1327            bulk_target_column_with_type(0, "large_name", false, tiberius::ColumnType::NVarchar),
1328            bulk_target_column_with_type(
1329                1,
1330                "large_payload",
1331                false,
1332                tiberius::ColumnType::BigVarBin,
1333            ),
1334        ];
1335
1336        validate_direct_bulk_target_column_types(
1337            columns.into_iter(),
1338            state.direct_encoder().unwrap().plan(),
1339        )
1340        .unwrap();
1341    }
1342
1343    #[test]
1344    fn direct_bulk_target_type_validation_accepts_fixed_size_binary_metadata() {
1345        let mappings = vec![fixed_size_binary_mapping_at(0, "digest", 32)];
1346        let state = WriterState::new(
1347            WriteBackend::DirectRawBulk,
1348            SchemaCheck::Strict,
1349            PlanOptions::default(),
1350            mappings,
1351        )
1352        .unwrap();
1353        let columns = vec![bulk_target_column_with_type(
1354            0,
1355            "digest",
1356            false,
1357            tiberius::ColumnType::BigBinary,
1358        )];
1359
1360        validate_direct_bulk_target_column_types(
1361            columns.into_iter(),
1362            state.direct_encoder().unwrap().plan(),
1363        )
1364        .unwrap();
1365    }
1366
1367    #[test]
1368    fn direct_bulk_target_type_validation_rejects_fixed_size_binary_as_varbinary() {
1369        let mappings = vec![fixed_size_binary_mapping_at(0, "digest", 32)];
1370        let state = WriterState::new(
1371            WriteBackend::DirectRawBulk,
1372            SchemaCheck::Strict,
1373            PlanOptions::default(),
1374            mappings,
1375        )
1376        .unwrap();
1377        let columns = vec![bulk_target_column_with_type(
1378            0,
1379            "digest",
1380            false,
1381            tiberius::ColumnType::BigVarBin,
1382        )];
1383
1384        let err = validate_direct_bulk_target_column_types(
1385            columns.into_iter(),
1386            state.direct_encoder().unwrap().plan(),
1387        )
1388        .unwrap_err();
1389
1390        let Error::ValueConversion { diagnostics } = err else {
1391            panic!("expected value conversion error");
1392        };
1393        assert_eq!(diagnostics.len(), 1);
1394        let diagnostic = &diagnostics.all()[0];
1395        assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
1396        assert_eq!(diagnostic.field().map(|field| field.name()), Some("digest"));
1397        assert!(diagnostic.message().contains(
1398            "bulk target column type BigVarBin does not match direct encoder type BigBinary"
1399        ));
1400    }
1401
1402    #[test]
1403    fn direct_bulk_target_type_validation_accepts_date_metadata() {
1404        let mappings = vec![
1405            SchemaMapping::new(
1406                ArrowFieldRef::new(0, "created_on".to_owned(), true, DataType::Date32),
1407                MssqlColumn::new(
1408                    Identifier::new("created_on").unwrap(),
1409                    MssqlType::Date,
1410                    true,
1411                ),
1412            ),
1413            SchemaMapping::new(
1414                ArrowFieldRef::new(1, "created_at".to_owned(), true, DataType::Date64),
1415                MssqlColumn::new(
1416                    Identifier::new("created_at").unwrap(),
1417                    MssqlType::DateTime2 { precision: 3 },
1418                    true,
1419                ),
1420            ),
1421        ];
1422        let state = WriterState::new(
1423            WriteBackend::DirectRawBulk,
1424            SchemaCheck::Strict,
1425            PlanOptions::default(),
1426            mappings,
1427        )
1428        .unwrap();
1429        let columns = vec![
1430            bulk_target_column_with_type(0, "created_on", true, tiberius::ColumnType::Daten),
1431            bulk_target_column_with_type(1, "created_at", true, tiberius::ColumnType::Datetime2),
1432        ];
1433
1434        validate_direct_bulk_target_column_types(
1435            columns.into_iter(),
1436            state.direct_encoder().unwrap().plan(),
1437        )
1438        .unwrap();
1439    }
1440
1441    #[test]
1442    fn direct_bulk_target_type_validation_rejects_variable_width_type_swap() {
1443        let mappings = vec![utf8_mapping_at(0, "name"), binary_mapping_at(1, "payload")];
1444        let state = WriterState::new(
1445            WriteBackend::DirectRawBulk,
1446            SchemaCheck::Strict,
1447            PlanOptions::default(),
1448            mappings,
1449        )
1450        .unwrap();
1451        let columns = vec![
1452            bulk_target_column_with_type(0, "name", false, tiberius::ColumnType::BigVarBin),
1453            bulk_target_column_with_type(1, "payload", false, tiberius::ColumnType::NVarchar),
1454        ];
1455
1456        let err = validate_direct_bulk_target_column_types(
1457            columns.into_iter(),
1458            state.direct_encoder().unwrap().plan(),
1459        )
1460        .unwrap_err();
1461
1462        let Error::ValueConversion { diagnostics } = err else {
1463            panic!("expected value conversion error");
1464        };
1465        assert_eq!(diagnostics.len(), 2);
1466        assert!(
1467            diagnostics
1468                .all()
1469                .iter()
1470                .any(|diagnostic| diagnostic.message().contains("NVarchar"))
1471        );
1472        assert!(
1473            diagnostics
1474                .all()
1475                .iter()
1476                .any(|diagnostic| diagnostic.message().contains("BigVarBin"))
1477        );
1478    }
1479
1480    #[test]
1481    fn direct_bulk_target_type_validation_rejects_same_name_with_wrong_type() {
1482        let mappings = vec![mapping("id")];
1483        let state = WriterState::new(
1484            WriteBackend::DirectRawBulk,
1485            SchemaCheck::Strict,
1486            PlanOptions::default(),
1487            mappings,
1488        )
1489        .unwrap();
1490        let columns = vec![bulk_target_column_with_type(
1491            0,
1492            "id",
1493            false,
1494            tiberius::ColumnType::Int8,
1495        )];
1496
1497        let err = validate_direct_bulk_target_column_types(
1498            columns.into_iter(),
1499            state.direct_encoder().unwrap().plan(),
1500        )
1501        .unwrap_err();
1502
1503        let Error::ValueConversion { diagnostics } = err else {
1504            panic!("expected value conversion error");
1505        };
1506        assert_eq!(diagnostics.len(), 1);
1507        let diagnostic = &diagnostics.all()[0];
1508        assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
1509        assert_eq!(diagnostic.field().map(|field| field.name()), Some("id"));
1510        assert!(
1511            diagnostic
1512                .message()
1513                .contains("bulk target column type Int8 does not match direct encoder type Int4")
1514        );
1515    }
1516
1517    #[test]
1518    fn write_batch_to_sink_accepts_empty_matching_batch() {
1519        let mappings = vec![mapping("id")];
1520        let mut state = WriterState::new(
1521            WriteBackend::BaselineTokenRow,
1522            SchemaCheck::Strict,
1523            PlanOptions::default(),
1524            mappings,
1525        )
1526        .unwrap();
1527        let mut sink = RecordingSink::default();
1528        let batch = int32_batch("id", &[]);
1529
1530        let stats = poll_ready(write_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1531
1532        assert_eq!(
1533            stats,
1534            WriteStats {
1535                rows_written: 0,
1536                batches_written: 1
1537            }
1538        );
1539        assert!(sink.rows.is_empty());
1540    }
1541
1542    #[test]
1543    fn write_batch_to_sink_accumulates_multi_batch_stats() {
1544        let mappings = vec![mapping("id")];
1545        let mut state = WriterState::new(
1546            WriteBackend::BaselineTokenRow,
1547            SchemaCheck::Strict,
1548            PlanOptions::default(),
1549            mappings,
1550        )
1551        .unwrap();
1552        let mut sink = RecordingSink::default();
1553
1554        let first = poll_ready(write_batch_to_sink(
1555            &mut state,
1556            &mut sink,
1557            &int32_batch("id", &[10, 20]),
1558        ))
1559        .unwrap();
1560        let second = poll_ready(write_batch_to_sink(
1561            &mut state,
1562            &mut sink,
1563            &int32_batch("id", &[30]),
1564        ))
1565        .unwrap();
1566
1567        assert_eq!(
1568            first,
1569            WriteStats {
1570                rows_written: 2,
1571                batches_written: 1
1572            }
1573        );
1574        assert_eq!(
1575            second,
1576            WriteStats {
1577                rows_written: 3,
1578                batches_written: 2
1579            }
1580        );
1581        assert_eq!(sink.rows.len(), 3);
1582        assert_eq!(
1583            sink.rows[2].get(0),
1584            Some(&tiberius::ColumnData::I32(Some(30)))
1585        );
1586    }
1587
1588    #[test]
1589    fn write_batch_to_sink_conversion_failure_sends_nothing_and_keeps_stats() {
1590        let mappings = vec![float_mapping("amount")];
1591        let mut state = WriterState::new(
1592            WriteBackend::BaselineTokenRow,
1593            SchemaCheck::Strict,
1594            PlanOptions::default(),
1595            mappings,
1596        )
1597        .unwrap();
1598        let mut sink = RecordingSink::default();
1599        let batch = float64_batch("amount", &[Some(1.0), Some(f64::NAN)]);
1600
1601        let err = poll_ready(write_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1602
1603        let Error::ValueConversion { diagnostics } = err else {
1604            panic!("expected value conversion error");
1605        };
1606        assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::NonFiniteFloat);
1607        assert_eq!(diagnostics.all()[0].row(), Some(1));
1608        assert!(sink.rows.is_empty());
1609        assert_eq!(state.stats(), WriteStats::default());
1610    }
1611
1612    #[test]
1613    fn write_batch_to_sink_send_failure_preserves_error_and_keeps_stats() {
1614        let mappings = vec![mapping("id")];
1615        let mut state = WriterState::new(
1616            WriteBackend::BaselineTokenRow,
1617            SchemaCheck::Strict,
1618            PlanOptions::default(),
1619            mappings,
1620        )
1621        .unwrap();
1622        let mut sink = RecordingSink {
1623            fail_on_send: Some(1),
1624            rows: Vec::new(),
1625        };
1626        let batch = int32_batch("id", &[1, 2, 3]);
1627
1628        let err = poll_ready(write_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1629
1630        let Error::Tiberius { source } = err else {
1631            panic!("expected tiberius error");
1632        };
1633        assert_eq!(
1634            source.to_string(),
1635            "BULK UPLOAD input failure: fake send failure"
1636        );
1637        assert_eq!(sink.rows.len(), 1);
1638        assert_eq!(state.stats(), WriteStats::default());
1639    }
1640
1641    #[test]
1642    fn write_direct_batch_to_sink_sends_one_checked_payload_per_batch() {
1643        let mappings = vec![mapping("id")];
1644        let mut state = WriterState::new(
1645            WriteBackend::DirectRawBulk,
1646            SchemaCheck::Strict,
1647            PlanOptions::default(),
1648            mappings,
1649        )
1650        .unwrap();
1651        let mut sink = RecordingRawSink::default();
1652        let batch = int32_batch("id", &[10, 20]);
1653
1654        let stats = poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1655
1656        assert_eq!(
1657            stats,
1658            WriteStats {
1659                rows_written: 2,
1660                batches_written: 1
1661            }
1662        );
1663        assert_eq!(sink.payloads.len(), 1);
1664        assert_eq!(sink.payloads[0].row_token_offsets, vec![0, 5]);
1665        assert_eq!(
1666            sink.payloads[0].bytes,
1667            vec![0xD1, 10, 0, 0, 0, 0xD1, 20, 0, 0, 0]
1668        );
1669    }
1670
1671    #[test]
1672    fn write_direct_batch_to_sink_accumulates_multi_batch_stats() {
1673        let mappings = vec![mapping("id")];
1674        let mut state = WriterState::new(
1675            WriteBackend::DirectRawBulk,
1676            SchemaCheck::Strict,
1677            PlanOptions::default(),
1678            mappings,
1679        )
1680        .unwrap();
1681        let mut sink = RecordingRawSink::default();
1682
1683        let first = poll_ready(write_direct_batch_to_sink(
1684            &mut state,
1685            &mut sink,
1686            &int32_batch("id", &[10, 20]),
1687        ))
1688        .unwrap();
1689        let second = poll_ready(write_direct_batch_to_sink(
1690            &mut state,
1691            &mut sink,
1692            &int32_batch("id", &[30]),
1693        ))
1694        .unwrap();
1695
1696        assert_eq!(
1697            first,
1698            WriteStats {
1699                rows_written: 2,
1700                batches_written: 1
1701            }
1702        );
1703        assert_eq!(
1704            second,
1705            WriteStats {
1706                rows_written: 3,
1707                batches_written: 2
1708            }
1709        );
1710        assert_eq!(sink.payloads.len(), 2);
1711        assert_eq!(sink.payloads[1].bytes, vec![0xD1, 30, 0, 0, 0]);
1712    }
1713
1714    #[test]
1715    fn write_direct_batch_to_sink_chunks_measured_payloads_by_byte_limit() {
1716        let mappings = vec![binary_mapping_at(0, "payload")];
1717        let mut state = WriterState::new(
1718            WriteBackend::DirectRawBulk,
1719            SchemaCheck::Strict,
1720            PlanOptions::default(),
1721            mappings,
1722        )
1723        .unwrap();
1724        let mut sink = RecordingRawSink::default();
1725        let row_bytes = vec![0x5a; DIRECT_RAW_MAX_PAYLOAD_BYTES / 2 + 1];
1726        let batch = binary_batch("payload", &[row_bytes.as_slice(), row_bytes.as_slice()]);
1727
1728        let stats = poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1729
1730        assert_eq!(
1731            stats,
1732            WriteStats {
1733                rows_written: 2,
1734                batches_written: 1
1735            }
1736        );
1737        assert_eq!(sink.payloads.len(), 2);
1738        assert_eq!(sink.payloads[0].row_token_offsets, [0]);
1739        assert_eq!(sink.payloads[1].row_token_offsets, [0]);
1740    }
1741
1742    #[test]
1743    fn write_direct_batch_to_sink_skips_send_for_empty_batch_but_records_stats() {
1744        let mappings = vec![mapping("id")];
1745        let mut state = WriterState::new(
1746            WriteBackend::DirectRawBulk,
1747            SchemaCheck::Strict,
1748            PlanOptions::default(),
1749            mappings,
1750        )
1751        .unwrap();
1752        let mut sink = RecordingRawSink::default();
1753        let batch = int32_batch("id", &[]);
1754
1755        let stats = poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1756
1757        assert_eq!(
1758            stats,
1759            WriteStats {
1760                rows_written: 0,
1761                batches_written: 1
1762            }
1763        );
1764        assert!(sink.payloads.is_empty());
1765    }
1766
1767    #[test]
1768    fn write_direct_batch_to_sink_rejects_bad_later_row_before_send() {
1769        let mappings = vec![float_mapping("amount")];
1770        let mut state = WriterState::new(
1771            WriteBackend::DirectRawBulk,
1772            SchemaCheck::Strict,
1773            PlanOptions::default(),
1774            mappings,
1775        )
1776        .unwrap();
1777        let mut sink = RecordingRawSink::default();
1778        let batch = float64_batch("amount", &[Some(1.0), Some(f64::NAN)]);
1779
1780        let err =
1781            poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1782
1783        let Error::ValueConversion { diagnostics } = err else {
1784            panic!("expected value conversion error");
1785        };
1786        assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::NonFiniteFloat);
1787        assert_eq!(diagnostics.all()[0].row(), Some(1));
1788        assert!(sink.payloads.is_empty());
1789        assert_eq!(state.stats(), WriteStats::default());
1790    }
1791
1792    #[test]
1793    fn write_direct_batch_to_sink_rejects_uint64_bigint_overflow_before_any_range_send() {
1794        let mappings = vec![schema_mapping_at(
1795            0,
1796            "u64_value",
1797            DataType::UInt64,
1798            MssqlType::BigInt,
1799            false,
1800        )];
1801        let mut state = WriterState::new(
1802            WriteBackend::DirectRawBulk,
1803            SchemaCheck::Strict,
1804            PlanOptions::default(),
1805            mappings,
1806        )
1807        .unwrap();
1808        let mut sink = RecordingRawSink::default();
1809        let row_count = DIRECT_RAW_MAX_PAYLOAD_BYTES / 9 + 2;
1810        let mut values = vec![1_u64; row_count];
1811        values[row_count - 1] = i64::MAX as u64 + 1;
1812        let batch = uint64_batch("u64_value", &values);
1813
1814        let err =
1815            poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1816
1817        let Error::ValueConversion { diagnostics } = err else {
1818            panic!("expected value conversion error");
1819        };
1820        assert_eq!(
1821            diagnostics.all()[0].code(),
1822            DiagnosticCode::IntegerOutOfRange
1823        );
1824        assert_eq!(diagnostics.all()[0].row(), Some(row_count - 1));
1825        assert!(sink.payloads.is_empty());
1826        assert_eq!(state.stats(), WriteStats::default());
1827    }
1828
1829    #[test]
1830    fn write_direct_batch_to_sink_rejects_runtime_type_mismatch_before_send() {
1831        let mappings = vec![mapping("id")];
1832        let mut state = WriterState::new(
1833            WriteBackend::DirectRawBulk,
1834            SchemaCheck::Strict,
1835            PlanOptions::default(),
1836            mappings,
1837        )
1838        .unwrap();
1839        let mut sink = RecordingRawSink::default();
1840        let batch = RecordBatch::try_new(
1841            Arc::new(Schema::new(vec![Field::new(
1842                "id",
1843                DataType::Float64,
1844                false,
1845            )])),
1846            vec![Arc::new(Float64Array::from(vec![1.0]))],
1847        )
1848        .unwrap();
1849
1850        let err =
1851            poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1852
1853        let Error::ValueConversion { diagnostics } = err else {
1854            panic!("expected value conversion error");
1855        };
1856        assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::SchemaMismatch);
1857        assert!(
1858            diagnostics.all()[0]
1859                .message()
1860                .contains("runtime Arrow type Float64")
1861        );
1862        assert!(sink.payloads.is_empty());
1863        assert_eq!(state.stats(), WriteStats::default());
1864    }
1865
1866    #[test]
1867    fn write_direct_batch_to_sink_send_failure_preserves_error_and_keeps_stats() {
1868        let mappings = vec![mapping("id")];
1869        let mut state = WriterState::new(
1870            WriteBackend::DirectRawBulk,
1871            SchemaCheck::Strict,
1872            PlanOptions::default(),
1873            mappings,
1874        )
1875        .unwrap();
1876        let mut sink = RecordingRawSink {
1877            fail_on_send: true,
1878            payloads: Vec::new(),
1879        };
1880        let batch = int32_batch("id", &[1, 2, 3]);
1881
1882        let err =
1883            poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1884
1885        let Error::Tiberius { source } = err else {
1886            panic!("expected tiberius error");
1887        };
1888        assert_eq!(
1889            source.to_string(),
1890            "BULK UPLOAD input failure: fake raw send failure"
1891        );
1892        assert!(sink.payloads.is_empty());
1893        assert_eq!(state.stats(), WriteStats::default());
1894    }
1895
1896    #[test]
1897    fn writer_types_are_exported_from_crate_root() {
1898        assert_eq!(crate::WriteBackend::default(), WriteBackend::Auto);
1899        assert_eq!(crate::WriteOptions::default(), WriteOptions::default());
1900        assert_eq!(crate::WriteStats::default(), WriteStats::default());
1901        let _ = std::any::type_name::<crate::BulkWriter<'static, DummyStream>>();
1902    }
1903
1904    #[test]
1905    fn tiberius_alias_exposes_client_type() {
1906        let name = std::any::type_name::<tiberius::Client<DummyStream>>();
1907
1908        assert!(name.contains("tiberius"));
1909    }
1910
1911    fn mapping(name: &str) -> SchemaMapping {
1912        SchemaMapping::new(
1913            ArrowFieldRef::new(0, name.to_owned(), false, DataType::Int32),
1914            MssqlColumn::new(Identifier::new(name).unwrap(), MssqlType::Int, false),
1915        )
1916    }
1917
1918    fn schema_mapping_at(
1919        index: usize,
1920        name: &str,
1921        arrow_type: DataType,
1922        mssql_type: MssqlType,
1923        nullable: bool,
1924    ) -> SchemaMapping {
1925        SchemaMapping::new(
1926            ArrowFieldRef::new(index, name.to_owned(), nullable, arrow_type),
1927            MssqlColumn::new(Identifier::new(name).unwrap(), mssql_type, nullable),
1928        )
1929    }
1930
1931    fn float_mapping(name: &str) -> SchemaMapping {
1932        float_mapping_at(0, name)
1933    }
1934
1935    fn float_mapping_at(index: usize, name: &str) -> SchemaMapping {
1936        SchemaMapping::new(
1937            ArrowFieldRef::new(index, name.to_owned(), false, DataType::Float64),
1938            MssqlColumn::new(
1939                Identifier::new(name).unwrap(),
1940                MssqlType::Float { precision: 53 },
1941                false,
1942            ),
1943        )
1944    }
1945
1946    fn utf8_mapping_at(index: usize, name: &str) -> SchemaMapping {
1947        SchemaMapping::new(
1948            ArrowFieldRef::new(index, name.to_owned(), false, DataType::Utf8),
1949            MssqlColumn::new(
1950                Identifier::new(name).unwrap(),
1951                MssqlType::NVarChar(MssqlTypeLength::Max),
1952                false,
1953            ),
1954        )
1955    }
1956
1957    fn binary_mapping_at(index: usize, name: &str) -> SchemaMapping {
1958        SchemaMapping::new(
1959            ArrowFieldRef::new(index, name.to_owned(), false, DataType::Binary),
1960            MssqlColumn::new(
1961                Identifier::new(name).unwrap(),
1962                MssqlType::VarBinary(MssqlTypeLength::Max),
1963                false,
1964            ),
1965        )
1966    }
1967
1968    fn fixed_size_binary_mapping_at(index: usize, name: &str, length: usize) -> SchemaMapping {
1969        SchemaMapping::new(
1970            ArrowFieldRef::new(
1971                index,
1972                name.to_owned(),
1973                false,
1974                DataType::FixedSizeBinary(i32::try_from(length).unwrap()),
1975            ),
1976            MssqlColumn::new(
1977                Identifier::new(name).unwrap(),
1978                MssqlType::Binary(length),
1979                false,
1980            ),
1981        )
1982    }
1983
1984    fn int32_batch(name: &str, values: &[i32]) -> RecordBatch {
1985        let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, false)]));
1986        let array = Arc::new(Int32Array::from(values.to_vec()));
1987
1988        RecordBatch::try_new(schema, vec![array]).unwrap()
1989    }
1990
1991    fn uint64_batch(name: &str, values: &[u64]) -> RecordBatch {
1992        let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::UInt64, false)]));
1993        let array = Arc::new(UInt64Array::from(values.to_vec()));
1994
1995        RecordBatch::try_new(schema, vec![array]).unwrap()
1996    }
1997
1998    fn binary_batch(name: &str, values: &[&[u8]]) -> RecordBatch {
1999        let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Binary, false)]));
2000        let array = Arc::new(BinaryArray::from_iter_values(values.iter().copied()));
2001
2002        RecordBatch::try_new(schema, vec![array]).unwrap()
2003    }
2004
2005    fn bulk_target_column(ordinal: usize, name: &str, nullable: bool) -> FakeBulkTargetColumn {
2006        bulk_target_column_with_type(ordinal, name, nullable, tiberius::ColumnType::Int4)
2007    }
2008
2009    fn bulk_target_column_with_type(
2010        ordinal: usize,
2011        name: &str,
2012        nullable: bool,
2013        column_type: tiberius::ColumnType,
2014    ) -> FakeBulkTargetColumn {
2015        FakeBulkTargetColumn {
2016            ordinal,
2017            name: name.to_owned(),
2018            nullable,
2019            column_type,
2020            decimal_precision_scale: None,
2021        }
2022    }
2023
2024    fn bulk_target_decimal_column(
2025        ordinal: usize,
2026        name: &str,
2027        nullable: bool,
2028        precision: u8,
2029        scale: u8,
2030    ) -> FakeBulkTargetColumn {
2031        FakeBulkTargetColumn {
2032            ordinal,
2033            name: name.to_owned(),
2034            nullable,
2035            column_type: tiberius::ColumnType::Decimaln,
2036            decimal_precision_scale: Some((precision, scale)),
2037        }
2038    }
2039
2040    fn float64_batch(name: &str, values: &[Option<f64>]) -> RecordBatch {
2041        let schema = Arc::new(Schema::new(vec![Field::new(
2042            name,
2043            DataType::Float64,
2044            false,
2045        )]));
2046        let array = Arc::new(Float64Array::from(values.to_vec()));
2047
2048        RecordBatch::try_new(schema, vec![array]).unwrap()
2049    }
2050
2051    fn poll_ready<F>(future: F) -> F::Output
2052    where
2053        F: Future,
2054    {
2055        let waker = Waker::from(Arc::new(NoopWake));
2056        let mut context = Context::from_waker(&waker);
2057        let mut future = Box::pin(future);
2058
2059        match future.as_mut().poll(&mut context) {
2060            Poll::Ready(output) => output,
2061            Poll::Pending => panic!("future unexpectedly returned pending"),
2062        }
2063    }
2064
2065    #[derive(Debug, Default)]
2066    struct RecordingSink {
2067        fail_on_send: Option<usize>,
2068        rows: Vec<tiberius::TokenRow<'static>>,
2069    }
2070
2071    #[derive(Debug, Default)]
2072    struct RecordingRawSink {
2073        fail_on_send: bool,
2074        payloads: Vec<RecordedRawPayload>,
2075    }
2076
2077    #[derive(Debug, PartialEq, Eq)]
2078    struct RecordedRawPayload {
2079        bytes: Vec<u8>,
2080        row_token_offsets: Vec<usize>,
2081    }
2082
2083    impl RawRowsSink for RecordingRawSink {
2084        async fn send_measured_raw_rows(
2085            &mut self,
2086            encoder: &DirectEncoder,
2087            batch: &RecordBatch,
2088            measured: &MeasuredDirectBatch,
2089            range: MeasuredRowRange,
2090        ) -> crate::Result<()> {
2091            let payload =
2092                encoder.encode_measured_batch_range(batch, measured, range.start, range.len)?;
2093
2094            if self.fail_on_send {
2095                return Err(Error::Tiberius {
2096                    source: tiberius::error::Error::BulkInput(Cow::Borrowed(
2097                        "fake raw send failure",
2098                    )),
2099                });
2100            }
2101
2102            self.payloads.push(RecordedRawPayload {
2103                bytes: payload.bytes().to_vec(),
2104                row_token_offsets: payload.row_token_offsets().to_vec(),
2105            });
2106            Ok(())
2107        }
2108    }
2109
2110    impl TokenRowSink for RecordingSink {
2111        async fn send_token_row(&mut self, row: tiberius::TokenRow<'static>) -> crate::Result<()> {
2112            if self.fail_on_send == Some(self.rows.len()) {
2113                return Err(Error::Tiberius {
2114                    source: tiberius::error::Error::BulkInput(Cow::Borrowed("fake send failure")),
2115                });
2116            }
2117
2118            self.rows.push(row);
2119            Ok(())
2120        }
2121    }
2122
2123    #[derive(Debug)]
2124    struct FakeBulkTargetColumn {
2125        ordinal: usize,
2126        name: String,
2127        nullable: bool,
2128        column_type: tiberius::ColumnType,
2129        decimal_precision_scale: Option<(u8, u8)>,
2130    }
2131
2132    impl BulkTargetColumnMetadata for FakeBulkTargetColumn {
2133        fn ordinal(&self) -> usize {
2134            self.ordinal
2135        }
2136
2137        fn name(&self) -> &str {
2138            &self.name
2139        }
2140
2141        fn is_nullable(&self) -> bool {
2142            self.nullable
2143        }
2144
2145        fn column_type(&self) -> tiberius::ColumnType {
2146            self.column_type
2147        }
2148
2149        fn decimal_precision_scale(&self) -> Option<(u8, u8)> {
2150            self.decimal_precision_scale
2151        }
2152    }
2153
2154    struct NoopWake;
2155
2156    impl Wake for NoopWake {
2157        fn wake(self: Arc<Self>) {}
2158    }
2159
2160    #[derive(Debug)]
2161    struct DummyStream;
2162
2163    impl AsyncRead for DummyStream {
2164        fn poll_read(
2165            self: Pin<&mut Self>,
2166            _cx: &mut Context<'_>,
2167            _buf: &mut [u8],
2168        ) -> Poll<std::io::Result<usize>> {
2169            Poll::Ready(Ok(0))
2170        }
2171    }
2172
2173    impl AsyncWrite for DummyStream {
2174        fn poll_write(
2175            self: Pin<&mut Self>,
2176            _cx: &mut Context<'_>,
2177            buf: &[u8],
2178        ) -> Poll<std::io::Result<usize>> {
2179            Poll::Ready(Ok(buf.len()))
2180        }
2181
2182        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
2183            Poll::Ready(Ok(()))
2184        }
2185
2186        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
2187            Poll::Ready(Ok(()))
2188        }
2189    }
2190}