1use std::borrow::Cow;
4
5use arrow_array::RecordBatch;
6use futures_util::io::{AsyncRead, AsyncWrite};
7
8use crate::{
9 Diagnostic, DiagnosticCode, DiagnosticSet, FieldRef, PlanOptions, Result, SchemaMapping,
10 TableName,
11};
12
13use super::{
14 SchemaCheck,
15 direct::{
16 DirectEncoder, MeasuredDirectBatch, MeasuredRowRange,
17 plan::{DirectColumnEncoding, DirectColumnPlan, DirectEncoderPlan},
18 },
19 profile,
20 record_batch::RecordBatchView,
21 token_row::tiberius_row_owned,
22};
23use crate::conversion::arrow_to_mssql::{
24 fixed_size_binary::FixedSizeBinaryArrowToMssql, primitive::PrimitiveArrowToMssql,
25 temporal::TemporalArrowToMssql, variable_width::VariableWidthArrowToMssql,
26};
27
28const DIRECT_RAW_MAX_PAYLOAD_BYTES: usize = 8 * 1024 * 1024;
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
32pub enum WriteBackend {
33 #[default]
35 Auto,
36 BaselineTokenRow,
38 DirectFramedBulk,
40 DirectRawBulk,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
46pub struct WriteOptions {
47 pub backend: WriteBackend,
49 pub schema_check: SchemaCheck,
51 pub plan_options: PlanOptions,
53}
54
55#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
57pub struct WriteStats {
58 pub rows_written: u64,
60 pub batches_written: u64,
62}
63
64#[derive(Debug)]
65struct WriterState {
66 backend: WriteBackend,
67 direct_encoder: Option<DirectEncoder>,
68 schema_check: SchemaCheck,
69 plan_options: PlanOptions,
70 mappings: Vec<SchemaMapping>,
71 stats: WriteStats,
72}
73
74impl WriterState {
75 fn new(
76 requested_backend: WriteBackend,
77 schema_check: SchemaCheck,
78 plan_options: PlanOptions,
79 mappings: Vec<SchemaMapping>,
80 ) -> Result<Self> {
81 let backend = resolve_backend(requested_backend)?;
82 let direct_encoder = match backend {
83 WriteBackend::DirectFramedBulk | WriteBackend::DirectRawBulk => {
84 Some(DirectEncoder::new_with_options(&mappings, plan_options)?)
85 }
86 WriteBackend::Auto | WriteBackend::BaselineTokenRow => None,
87 };
88
89 Ok(Self {
90 backend,
91 direct_encoder,
92 schema_check,
93 plan_options,
94 mappings,
95 stats: WriteStats::default(),
96 })
97 }
98
99 fn backend(&self) -> WriteBackend {
100 self.backend
101 }
102
103 fn direct_encoder(&self) -> Option<&DirectEncoder> {
104 self.direct_encoder.as_ref()
105 }
106
107 fn mappings(&self) -> &[SchemaMapping] {
108 &self.mappings
109 }
110
111 fn schema_check(&self) -> SchemaCheck {
112 self.schema_check
113 }
114
115 fn plan_options(&self) -> &PlanOptions {
116 &self.plan_options
117 }
118
119 fn stats(&self) -> WriteStats {
120 self.stats
121 }
122
123 fn record_accepted_batch(&mut self, rows: u64) -> WriteStats {
124 self.stats.rows_written = self.stats.rows_written.saturating_add(rows);
125 self.stats.batches_written = self.stats.batches_written.saturating_add(1);
126 self.stats
127 }
128}
129
130#[derive(Debug)]
132pub struct BulkWriter<'client, S>
133where
134 S: AsyncRead + AsyncWrite + Unpin + Send,
135{
136 state: WriterState,
137 request: tiberius::BulkLoadRequest<'client, S>,
138}
139
140impl<'client, S> BulkWriter<'client, S>
141where
142 S: AsyncRead + AsyncWrite + Unpin + Send,
143{
144 pub async fn new(
146 client: &'client mut tiberius::Client<S>,
147 table: TableName,
148 mappings: Vec<SchemaMapping>,
149 options: WriteOptions,
150 ) -> Result<Self> {
151 let state = WriterState::new(
152 options.backend,
153 options.schema_check,
154 options.plan_options,
155 mappings,
156 )?;
157 let mut request = match state.backend() {
158 WriteBackend::BaselineTokenRow
159 | WriteBackend::DirectFramedBulk
160 | WriteBackend::DirectRawBulk => {
161 let table_sql = bulk_insert_table_sql(&table);
162 let columns = client
163 .bulk_insert_columns(&table_sql)
164 .await
165 .map_err(|source| crate::Error::Tiberius { source })?;
166 validate_bulk_target_columns(columns.iter(), state.mappings())?;
167 if matches!(
168 state.backend(),
169 WriteBackend::DirectFramedBulk | WriteBackend::DirectRawBulk
170 ) {
171 let encoder =
172 state
173 .direct_encoder()
174 .ok_or_else(|| crate::Error::BackendUnavailable {
175 backend: state.backend(),
176 reason: "direct bulk encoder is not available for this writer"
177 .to_owned(),
178 })?;
179 validate_direct_bulk_target_column_types(columns.iter(), encoder.plan())?;
180 }
181 client
182 .bulk_insert_with_columns(&table_sql, columns)
183 .await
184 .map_err(|source| crate::Error::Tiberius { source })?
185 }
186 WriteBackend::Auto => {
187 return Err(execution_unavailable(state.backend()));
188 }
189 };
190
191 if state.backend() == WriteBackend::DirectRawBulk {
192 request.enable_direct_packet_writes();
193 }
194
195 Ok(Self { state, request })
196 }
197
198 pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteStats> {
200 match self.state.backend() {
201 WriteBackend::BaselineTokenRow => {
202 write_batch_to_sink(&mut self.state, &mut self.request, batch).await
203 }
204 WriteBackend::DirectFramedBulk | WriteBackend::DirectRawBulk => {
205 write_direct_batch_to_sink(&mut self.state, &mut self.request, batch).await
206 }
207 WriteBackend::Auto => Err(execution_unavailable(WriteBackend::Auto)),
208 }
209 }
210
211 pub async fn finish(self) -> Result<WriteStats> {
213 let Self { state, request } = self;
214 let stats = state.stats();
215
216 #[cfg(feature = "bench-profile")]
217 {
218 let (_result, stats) = request
219 .finalize_with_stats()
220 .await
221 .map_err(|source| crate::Error::Tiberius { source })?;
222 profile::record_bulk_load_stats(stats);
223 }
224
225 #[cfg(not(feature = "bench-profile"))]
226 request
227 .finalize()
228 .await
229 .map_err(|source| crate::Error::Tiberius { source })?;
230
231 Ok(stats)
232 }
233}
234
235fn bulk_insert_table_sql(table: &TableName) -> String {
236 table.quoted_sql()
237}
238
239fn record_batch_view<'a>(
240 batch: &'a RecordBatch,
241 mappings: &'a [SchemaMapping],
242 schema_check: SchemaCheck,
243 plan_options: &PlanOptions,
244) -> Result<RecordBatchView<'a>> {
245 match schema_check {
246 SchemaCheck::Strict => RecordBatchView::new_with_options(batch, mappings, plan_options),
247 }
248}
249
250fn validate_batch_rows(view: &RecordBatchView<'_>) -> Result<()> {
251 for row_index in 0..view.row_count() {
252 let _cells = view.mssql_row(row_index)?;
253 }
254
255 Ok(())
256}
257
258fn validate_bulk_target_columns<Column>(
259 columns: impl ExactSizeIterator<Item = Column>,
260 mappings: &[SchemaMapping],
261) -> Result<()>
262where
263 Column: BulkTargetColumnMetadata,
264{
265 let column_count = columns.len();
266 let mut diagnostics = DiagnosticSet::new();
267
268 if column_count != mappings.len() {
269 diagnostics.push(Diagnostic::error(
270 DiagnosticCode::SchemaMismatch,
271 format!(
272 "bulk target has {column_count} updateable column(s) but mappings contain {} column(s)",
273 mappings.len()
274 ),
275 ));
276 }
277
278 for (position, (column, mapping)) in columns.zip(mappings).enumerate() {
279 validate_bulk_target_column(position, column, mapping, &mut diagnostics);
280 }
281
282 if diagnostics.has_errors() {
283 return Err(crate::Error::ValueConversion { diagnostics });
284 }
285
286 Ok(())
287}
288
289fn validate_bulk_target_column(
290 position: usize,
291 column: impl BulkTargetColumnMetadata,
292 mapping: &SchemaMapping,
293 diagnostics: &mut DiagnosticSet,
294) {
295 if column.ordinal() != position {
296 diagnostics.push(bulk_target_column_diagnostic(
297 mapping,
298 format!(
299 "bulk target column ordinal {} does not match mapping position {position}",
300 column.ordinal()
301 ),
302 ));
303 }
304
305 if column.name() != mapping.mssql().name().as_str() {
306 diagnostics.push(bulk_target_column_diagnostic(
307 mapping,
308 format!(
309 "bulk target column name {} does not match planned MSSQL column name {}",
310 column.name(),
311 mapping.mssql().name().as_str()
312 ),
313 ));
314 }
315
316 if column.is_nullable() != mapping.mssql().nullable() {
317 diagnostics.push(bulk_target_column_diagnostic(
318 mapping,
319 format!(
320 "bulk target column nullability {} does not match planned MSSQL column nullability {}",
321 column.is_nullable(),
322 mapping.mssql().nullable()
323 ),
324 ));
325 }
326}
327
328fn validate_direct_bulk_target_column_types<Column>(
329 columns: impl ExactSizeIterator<Item = Column>,
330 plan: &DirectEncoderPlan,
331) -> Result<()>
332where
333 Column: BulkTargetColumnMetadata,
334{
335 let column_count = columns.len();
336 let mut diagnostics = DiagnosticSet::new();
337
338 if column_count != plan.column_count() {
339 diagnostics.push(Diagnostic::error(
340 DiagnosticCode::SchemaMismatch,
341 format!(
342 "bulk target has {column_count} updateable column(s) but direct plan contains {} column(s)",
343 plan.column_count()
344 ),
345 ));
346 }
347
348 for (column, plan_column) in columns.zip(plan.columns()) {
349 validate_direct_bulk_target_column_type(column, plan_column, &mut diagnostics);
350 }
351
352 if diagnostics.has_errors() {
353 return Err(crate::Error::ValueConversion { diagnostics });
354 }
355
356 Ok(())
357}
358
359fn validate_direct_bulk_target_column_type(
360 column: impl BulkTargetColumnMetadata,
361 plan_column: &DirectColumnPlan,
362 diagnostics: &mut DiagnosticSet,
363) {
364 let Some(expected) = expected_direct_bulk_column_type(plan_column) else {
365 diagnostics.push(
366 Diagnostic::error(
367 DiagnosticCode::DirectEncodingUnsupportedMapping,
368 format!(
369 "direct target type validation is not implemented for {:?}",
370 plan_column.encoding()
371 ),
372 )
373 .with_field(FieldRef::new(
374 plan_column.source_index(),
375 plan_column.source_name(),
376 )),
377 );
378 return;
379 };
380 let actual = column.column_type();
381
382 if actual != expected {
383 diagnostics.push(
384 Diagnostic::error(
385 DiagnosticCode::SchemaMismatch,
386 format!(
387 "bulk target column type {actual:?} does not match direct encoder type {expected:?}"
388 ),
389 )
390 .with_field(FieldRef::new(
391 plan_column.source_index(),
392 plan_column.source_name(),
393 )),
394 );
395 }
396
397 if let Some((expected_precision, expected_scale)) =
398 expected_direct_decimal_precision_scale(plan_column)
399 {
400 match column.decimal_precision_scale() {
401 Some((actual_precision, actual_scale))
402 if actual_precision == expected_precision && actual_scale == expected_scale => {}
403 Some((actual_precision, actual_scale)) => diagnostics.push(
404 Diagnostic::error(
405 DiagnosticCode::SchemaMismatch,
406 format!(
407 "bulk target decimal precision/scale ({actual_precision},{actual_scale}) does not match direct encoder precision/scale ({expected_precision},{expected_scale})"
408 ),
409 )
410 .with_field(FieldRef::new(
411 plan_column.source_index(),
412 plan_column.source_name(),
413 )),
414 ),
415 None => diagnostics.push(
416 Diagnostic::error(
417 DiagnosticCode::SchemaMismatch,
418 "bulk target decimal precision/scale metadata is not available",
419 )
420 .with_field(FieldRef::new(
421 plan_column.source_index(),
422 plan_column.source_name(),
423 )),
424 ),
425 }
426 }
427}
428
429fn expected_direct_bulk_column_type(column: &DirectColumnPlan) -> Option<tiberius::ColumnType> {
430 match column.encoding() {
431 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::BooleanToBit) => {
432 if column.nullable() {
433 Some(tiberius::ColumnType::Bitn)
434 } else {
435 Some(tiberius::ColumnType::Bit)
436 }
437 }
438 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt8ToTinyInt) => {
439 Some(tiberius::ColumnType::Int1)
440 }
441 DirectColumnEncoding::Primitive(
442 PrimitiveArrowToMssql::Int8ToSmallInt | PrimitiveArrowToMssql::Int16ToSmallInt,
443 ) => Some(tiberius::ColumnType::Int2),
444 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::Int32ToInt) => {
445 Some(tiberius::ColumnType::Int4)
446 }
447 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt16ToInt) => {
448 Some(tiberius::ColumnType::Int4)
449 }
450 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::Int64ToBigInt) => {
451 Some(tiberius::ColumnType::Int8)
452 }
453 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt32ToBigInt) => {
454 Some(tiberius::ColumnType::Int8)
455 }
456 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::UInt64ToCheckedBigInt) => {
457 Some(tiberius::ColumnType::Int8)
458 }
459 DirectColumnEncoding::Primitive(
460 PrimitiveArrowToMssql::Float16ToReal | PrimitiveArrowToMssql::Float32ToReal,
461 ) => Some(tiberius::ColumnType::Float4),
462 DirectColumnEncoding::Primitive(PrimitiveArrowToMssql::Float64ToFloat) => {
463 Some(tiberius::ColumnType::Float8)
464 }
465 DirectColumnEncoding::UInt64Decimal20_0 | DirectColumnEncoding::Decimal(_) => {
466 Some(tiberius::ColumnType::Decimaln)
467 }
468 DirectColumnEncoding::VariableWidth(
469 VariableWidthArrowToMssql::Utf8ToNVarChar { .. }
470 | VariableWidthArrowToMssql::LargeUtf8ToNVarChar { .. },
471 ) => Some(tiberius::ColumnType::NVarchar),
472 DirectColumnEncoding::VariableWidth(
473 VariableWidthArrowToMssql::BinaryToVarBinary { .. }
474 | VariableWidthArrowToMssql::LargeBinaryToVarBinary { .. },
475 ) => Some(tiberius::ColumnType::BigVarBin),
476 DirectColumnEncoding::FixedSizeBinary(
477 FixedSizeBinaryArrowToMssql::FixedSizeBinaryToBinary { .. },
478 ) => Some(tiberius::ColumnType::BigBinary),
479 DirectColumnEncoding::Temporal(TemporalArrowToMssql::Date32ToDate) => {
480 Some(tiberius::ColumnType::Daten)
481 }
482 DirectColumnEncoding::Temporal(TemporalArrowToMssql::Date64ToDateTime2) => {
483 Some(tiberius::ColumnType::Datetime2)
484 }
485 DirectColumnEncoding::Temporal(
486 TemporalArrowToMssql::TimestampSecondToDateTime2
487 | TemporalArrowToMssql::TimestampMillisecondToDateTime2
488 | TemporalArrowToMssql::TimestampMicrosecondToDateTime2
489 | TemporalArrowToMssql::TimestampNanosecondToDateTime2
490 | TemporalArrowToMssql::TimestampSecondTzToDateTime2
491 | TemporalArrowToMssql::TimestampMillisecondTzToDateTime2
492 | TemporalArrowToMssql::TimestampMicrosecondTzToDateTime2
493 | TemporalArrowToMssql::TimestampNanosecondTzToDateTime2,
494 ) => Some(tiberius::ColumnType::Datetime2),
495 DirectColumnEncoding::Temporal(
496 TemporalArrowToMssql::Time32SecondToTime
497 | TemporalArrowToMssql::Time32MillisecondToTime
498 | TemporalArrowToMssql::Time64MicrosecondToTime
499 | TemporalArrowToMssql::Time64NanosecondToTime,
500 ) => Some(tiberius::ColumnType::Timen),
501 DirectColumnEncoding::Temporal(
502 TemporalArrowToMssql::TimestampSecondTzToDateTimeOffset
503 | TemporalArrowToMssql::TimestampMillisecondTzToDateTimeOffset
504 | TemporalArrowToMssql::TimestampMicrosecondTzToDateTimeOffset
505 | TemporalArrowToMssql::TimestampNanosecondTzToDateTimeOffset,
506 ) => Some(tiberius::ColumnType::DatetimeOffsetn),
507 }
508}
509
510fn expected_direct_decimal_precision_scale(column: &DirectColumnPlan) -> Option<(u8, u8)> {
511 match column.encoding() {
512 DirectColumnEncoding::UInt64Decimal20_0 => Some((20, 0)),
513 DirectColumnEncoding::Decimal(classification) => Some((
514 classification.target_precision(),
515 classification.target_scale(),
516 )),
517 _ => None,
518 }
519}
520
521fn bulk_target_column_diagnostic(
522 mapping: &SchemaMapping,
523 message: impl Into<String>,
524) -> Diagnostic {
525 Diagnostic::error(DiagnosticCode::SchemaMismatch, message).with_field(FieldRef::new(
526 mapping.arrow().index(),
527 mapping.arrow().name(),
528 ))
529}
530
531trait BulkTargetColumnMetadata {
532 fn ordinal(&self) -> usize;
533
534 fn name(&self) -> &str;
535
536 fn is_nullable(&self) -> bool;
537
538 fn column_type(&self) -> tiberius::ColumnType;
539
540 fn decimal_precision_scale(&self) -> Option<(u8, u8)> {
541 None
542 }
543}
544
545impl BulkTargetColumnMetadata for tiberius::BulkLoadColumn<'_> {
546 fn ordinal(&self) -> usize {
547 self.ordinal()
548 }
549
550 fn name(&self) -> &str {
551 self.name()
552 }
553
554 fn is_nullable(&self) -> bool {
555 self.is_nullable()
556 }
557
558 fn column_type(&self) -> tiberius::ColumnType {
559 self.column_type()
560 }
561
562 fn decimal_precision_scale(&self) -> Option<(u8, u8)> {
563 match self.type_info() {
564 tiberius::TypeInfo::VarLenSizedPrecision {
565 ty: tiberius::VarLenType::Decimaln | tiberius::VarLenType::Numericn,
566 precision,
567 scale,
568 ..
569 } => Some((*precision, *scale)),
570 _ => None,
571 }
572 }
573}
574
575async fn write_batch_to_sink<Sink>(
576 state: &mut WriterState,
577 sink: &mut Sink,
578 batch: &RecordBatch,
579) -> Result<WriteStats>
580where
581 Sink: TokenRowSink,
582{
583 let view = record_batch_view(
584 batch,
585 state.mappings(),
586 state.schema_check(),
587 state.plan_options(),
588 )?;
589 validate_batch_rows(&view)?;
590 let rows_written = usize_to_u64_saturating(view.row_count());
591
592 for row_index in 0..view.row_count() {
593 let row = tiberius_row_owned(&view, row_index)?;
594 sink.send_token_row(row).await?;
595 }
596
597 Ok(state.record_accepted_batch(rows_written))
598}
599
600trait TokenRowSink {
601 async fn send_token_row(&mut self, row: tiberius::TokenRow<'static>) -> Result<()>;
602}
603
604impl<S> TokenRowSink for tiberius::BulkLoadRequest<'_, S>
605where
606 S: AsyncRead + AsyncWrite + Unpin + Send,
607{
608 async fn send_token_row(&mut self, row: tiberius::TokenRow<'static>) -> Result<()> {
609 self.send(row)
610 .await
611 .map_err(|source| crate::Error::Tiberius { source })
612 }
613}
614
615async fn write_direct_batch_to_sink<Sink>(
616 state: &mut WriterState,
617 sink: &mut Sink,
618 batch: &RecordBatch,
619) -> Result<WriteStats>
620where
621 Sink: RawRowsSink,
622{
623 let encoder = state
624 .direct_encoder()
625 .ok_or_else(|| crate::Error::BackendUnavailable {
626 backend: state.backend(),
627 reason: "direct bulk encoder is not available for this writer".to_owned(),
628 })?;
629 let measure_start = std::time::Instant::now();
630 let measured = encoder.measure_batch(batch);
631 let measured = profile::record_elapsed(measure_start, profile::record_measure_batch, measured)?;
632 let rows_written = usize_to_u64_saturating(measured.row_count());
633
634 let split_start = std::time::Instant::now();
635 let ranges = measured.row_ranges(DIRECT_RAW_MAX_PAYLOAD_BYTES);
636 let ranges = profile::record_elapsed(split_start, profile::record_row_range_split, ranges)?;
637
638 for range in ranges {
639 sink.send_measured_raw_rows(encoder, batch, &measured, range)
640 .await?;
641 }
642
643 profile::record_accepted_batch(measured.row_count());
644 Ok(state.record_accepted_batch(rows_written))
645}
646
647trait RawRowsSink {
648 async fn send_measured_raw_rows(
649 &mut self,
650 encoder: &DirectEncoder,
651 batch: &RecordBatch,
652 measured: &MeasuredDirectBatch,
653 range: MeasuredRowRange,
654 ) -> Result<()>;
655}
656
657impl<S> RawRowsSink for tiberius::BulkLoadRequest<'_, S>
658where
659 S: AsyncRead + AsyncWrite + Unpin + Send,
660{
661 async fn send_measured_raw_rows(
662 &mut self,
663 encoder: &DirectEncoder,
664 batch: &RecordBatch,
665 measured: &MeasuredDirectBatch,
666 range: MeasuredRowRange,
667 ) -> Result<()> {
668 let encoded_bytes = measured.range_payload_len(range.start, range.len)?;
669 profile::record_row_range(encoded_bytes);
670
671 if !encoder.has_variable_width_column() {
672 let encode_start = std::time::Instant::now();
673 let payload =
674 encoder.encode_measured_batch_range(batch, measured, range.start, range.len)?;
675 profile::record_append_encode(encode_start.elapsed());
676
677 let send_start = std::time::Instant::now();
678 let send_result = self
679 .send_raw_rows_payload_checked(payload.bytes(), payload.row_token_offsets())
680 .await
681 .map_err(|source| crate::Error::Tiberius { source });
682 profile::record_send_total(send_start.elapsed());
683 return send_result;
684 }
685
686 let mut encode_error = None;
687 let send_start = std::time::Instant::now();
688 let send_result = self
689 .send_raw_rows_with(|buf| {
690 let encode_start = std::time::Instant::now();
691 let encoded = encoder.encode_measured_batch_range_into(
692 batch,
693 measured,
694 range.start,
695 range.len,
696 buf,
697 );
698 profile::record_append_encode(encode_start.elapsed());
699
700 match encoded {
701 Ok(append) => Ok(append),
702 Err(err) => {
703 encode_error = Some(err);
704 Err(tiberius::error::Error::BulkInput(Cow::Borrowed(
705 "direct raw row encoding failed",
706 )))
707 }
708 }
709 })
710 .await;
711 profile::record_send_total(send_start.elapsed());
712
713 if let Some(err) = encode_error {
714 return Err(err);
715 }
716
717 send_result.map_err(|source| crate::Error::Tiberius { source })
718 }
719}
720
721fn usize_to_u64_saturating(value: usize) -> u64 {
722 u64::try_from(value).unwrap_or(u64::MAX)
723}
724
725fn resolve_backend(requested_backend: WriteBackend) -> Result<WriteBackend> {
726 match requested_backend {
727 WriteBackend::Auto | WriteBackend::DirectRawBulk => Ok(WriteBackend::DirectRawBulk),
728 WriteBackend::BaselineTokenRow => Ok(WriteBackend::BaselineTokenRow),
729 WriteBackend::DirectFramedBulk => Ok(WriteBackend::DirectFramedBulk),
730 }
731}
732
733fn execution_unavailable(backend: WriteBackend) -> crate::Error {
734 crate::Error::BackendUnavailable {
735 backend,
736 reason: "bulk writer execution is not implemented yet".to_owned(),
737 }
738}
739
740#[cfg(test)]
741mod tests {
742 use std::{
743 borrow::Cow,
744 future::Future,
745 pin::Pin,
746 sync::Arc,
747 task::{Context, Poll, Wake, Waker},
748 };
749
750 use arrow_array::{BinaryArray, Float64Array, Int32Array, RecordBatch, UInt64Array};
751 use arrow_schema::{DataType, Field, Schema};
752 use futures_util::io::{AsyncRead, AsyncWrite};
753
754 use super::{
755 BulkTargetColumnMetadata, DIRECT_RAW_MAX_PAYLOAD_BYTES, DirectEncoder, MeasuredDirectBatch,
756 MeasuredRowRange, RawRowsSink, TokenRowSink, WriteBackend, WriteOptions, WriteStats,
757 WriterState, bulk_insert_table_sql, record_batch_view, resolve_backend, tiberius_row_owned,
758 validate_batch_rows, validate_bulk_target_columns,
759 validate_direct_bulk_target_column_types, write_batch_to_sink, write_direct_batch_to_sink,
760 };
761 use crate::{
762 ArrowFieldRef, DiagnosticCode, Error, Identifier, MssqlColumn, MssqlType, MssqlTypeLength,
763 PlanOptions, SchemaCheck, SchemaMapping, TableName,
764 };
765
766 #[test]
767 fn write_backend_defaults_to_auto() {
768 assert_eq!(WriteBackend::default(), WriteBackend::Auto);
769 }
770
771 #[test]
772 fn write_options_default_to_auto_backend_and_strict_schema_check() {
773 let options = WriteOptions::default();
774
775 assert_eq!(options.backend, WriteBackend::Auto);
776 assert_eq!(options.schema_check, SchemaCheck::Strict);
777 assert_eq!(options.plan_options, PlanOptions::default());
778 }
779
780 #[test]
781 fn write_options_preserve_explicit_backend_selection() {
782 for backend in [
783 WriteBackend::Auto,
784 WriteBackend::BaselineTokenRow,
785 WriteBackend::DirectFramedBulk,
786 WriteBackend::DirectRawBulk,
787 ] {
788 let options = WriteOptions {
789 backend,
790 schema_check: SchemaCheck::Strict,
791 ..WriteOptions::default()
792 };
793
794 assert_eq!(options.backend, backend);
795 assert_eq!(options.schema_check, SchemaCheck::Strict);
796 }
797 }
798
799 #[test]
800 fn write_stats_default_to_zero() {
801 let stats = WriteStats::default();
802
803 assert_eq!(stats.rows_written, 0);
804 assert_eq!(stats.batches_written, 0);
805 }
806
807 #[test]
808 fn auto_backend_resolves_to_direct_raw_bulk() {
809 assert_eq!(
810 resolve_backend(WriteBackend::Auto).unwrap(),
811 WriteBackend::DirectRawBulk
812 );
813 }
814
815 #[test]
816 fn explicit_backends_resolve_to_requested_backend() {
817 assert_eq!(
818 resolve_backend(WriteBackend::BaselineTokenRow).unwrap(),
819 WriteBackend::BaselineTokenRow
820 );
821 assert_eq!(
822 resolve_backend(WriteBackend::DirectFramedBulk).unwrap(),
823 WriteBackend::DirectFramedBulk
824 );
825 assert_eq!(
826 resolve_backend(WriteBackend::DirectRawBulk).unwrap(),
827 WriteBackend::DirectRawBulk
828 );
829 }
830
831 #[test]
832 fn writer_state_starts_with_resolved_backend_mappings_and_zero_stats() {
833 let mappings = vec![mapping("id")];
834
835 let state = WriterState::new(
836 WriteBackend::Auto,
837 SchemaCheck::Strict,
838 PlanOptions::default(),
839 mappings.clone(),
840 )
841 .unwrap();
842
843 assert_eq!(state.backend(), WriteBackend::DirectRawBulk);
844 assert!(state.direct_encoder().is_some());
845 assert_eq!(state.schema_check(), SchemaCheck::Strict);
846 assert_eq!(state.mappings(), mappings.as_slice());
847 assert_eq!(state.stats(), WriteStats::default());
848 }
849
850 #[test]
851 fn direct_writer_state_builds_encoder_for_supported_mappings() {
852 let mappings = vec![
853 mapping("id32"),
854 SchemaMapping::new(
855 ArrowFieldRef::new(1, "id64".to_owned(), false, DataType::Int64),
856 MssqlColumn::new(Identifier::new("id64").unwrap(), MssqlType::BigInt, false),
857 ),
858 float_mapping_at(2, "score"),
859 SchemaMapping::new(
860 ArrowFieldRef::new(3, "name".to_owned(), true, DataType::Utf8),
861 MssqlColumn::new(
862 Identifier::new("name").unwrap(),
863 MssqlType::NVarChar(crate::MssqlTypeLength::Max),
864 true,
865 ),
866 ),
867 ];
868
869 for backend in [WriteBackend::DirectFramedBulk, WriteBackend::DirectRawBulk] {
870 let state = WriterState::new(
871 backend,
872 SchemaCheck::Strict,
873 PlanOptions::default(),
874 mappings.clone(),
875 )
876 .unwrap();
877
878 assert_eq!(state.backend(), backend);
879 assert!(state.direct_encoder().is_some());
880 }
881 }
882
883 #[test]
884 fn direct_writer_state_rejects_unsupported_mappings() {
885 let mappings = vec![SchemaMapping::new(
886 ArrowFieldRef::new(
887 0,
888 "list_value".to_owned(),
889 true,
890 DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
891 ),
892 MssqlColumn::new(
893 Identifier::new("list_value").unwrap(),
894 MssqlType::NVarChar(MssqlTypeLength::Max),
895 true,
896 ),
897 )];
898
899 let err = WriterState::new(
900 WriteBackend::DirectRawBulk,
901 SchemaCheck::Strict,
902 PlanOptions::default(),
903 mappings,
904 )
905 .unwrap_err();
906
907 let Error::DirectEncoding { diagnostics } = err else {
908 panic!("expected direct encoding error");
909 };
910 assert_eq!(diagnostics.len(), 1);
911 assert_eq!(
912 diagnostics.all()[0].code(),
913 DiagnosticCode::DirectEncodingUnsupportedMapping
914 );
915 }
916
917 #[test]
918 fn writer_state_accumulates_accepted_batch_stats() {
919 let mut state = WriterState::new(
920 WriteBackend::BaselineTokenRow,
921 SchemaCheck::Strict,
922 PlanOptions::default(),
923 Vec::new(),
924 )
925 .unwrap();
926
927 assert_eq!(
928 state.record_accepted_batch(0),
929 WriteStats {
930 rows_written: 0,
931 batches_written: 1
932 }
933 );
934 assert_eq!(
935 state.record_accepted_batch(3),
936 WriteStats {
937 rows_written: 3,
938 batches_written: 2
939 }
940 );
941 assert_eq!(
942 state.record_accepted_batch(5),
943 WriteStats {
944 rows_written: 8,
945 batches_written: 3
946 }
947 );
948 }
949
950 #[test]
951 fn bulk_insert_table_sql_uses_quoted_table_name() {
952 let table = TableName::new("dbo]x", "target.table").unwrap();
953
954 assert_eq!(bulk_insert_table_sql(&table), "[dbo]]x].[target.table]");
955 }
956
957 #[test]
958 fn strict_batch_validation_accepts_supported_rows_without_owning_payloads() {
959 let batch = int32_batch("id", &[1, 2]);
960 let mappings = [mapping("id")];
961 let view = record_batch_view(
962 &batch,
963 &mappings,
964 SchemaCheck::Strict,
965 &PlanOptions::default(),
966 )
967 .unwrap();
968
969 validate_batch_rows(&view).unwrap();
970
971 let row = tiberius_row_owned(&view, 1).unwrap();
972 assert_eq!(row.get(0), Some(&tiberius::ColumnData::I32(Some(2))));
973 }
974
975 #[test]
976 fn strict_batch_view_rejects_runtime_schema_mismatch_before_send() {
977 let batch = int32_batch("renamed_id", &[1]);
978 let err = record_batch_view(
979 &batch,
980 &[mapping("id")],
981 SchemaCheck::Strict,
982 &PlanOptions::default(),
983 )
984 .unwrap_err();
985
986 let Error::ValueConversion { diagnostics } = err else {
987 panic!("expected value conversion error");
988 };
989 assert_eq!(diagnostics.len(), 1);
990 let diagnostic = &diagnostics.all()[0];
991 assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
992 assert_eq!(diagnostic.field().map(|field| field.name()), Some("id"));
993 }
994
995 #[test]
996 fn strict_batch_validation_rejects_bad_later_row_before_any_send() {
997 let schema = Arc::new(Schema::new(vec![Field::new(
998 "amount",
999 DataType::Float64,
1000 false,
1001 )]));
1002 let batch = RecordBatch::try_new(
1003 schema,
1004 vec![Arc::new(Float64Array::from(vec![
1005 Some(1.0),
1006 Some(f64::NAN),
1007 ]))],
1008 )
1009 .unwrap();
1010 let mappings = [SchemaMapping::new(
1011 ArrowFieldRef::new(0, "amount".to_owned(), false, DataType::Float64),
1012 MssqlColumn::new(
1013 Identifier::new("amount").unwrap(),
1014 MssqlType::Float { precision: 53 },
1015 false,
1016 ),
1017 )];
1018
1019 let view = record_batch_view(
1020 &batch,
1021 &mappings,
1022 SchemaCheck::Strict,
1023 &PlanOptions::default(),
1024 )
1025 .unwrap();
1026 let err = validate_batch_rows(&view).unwrap_err();
1027
1028 let Error::ValueConversion { diagnostics } = err else {
1029 panic!("expected value conversion error");
1030 };
1031 assert_eq!(diagnostics.len(), 1);
1032 let diagnostic = &diagnostics.all()[0];
1033 assert_eq!(diagnostic.code(), DiagnosticCode::NonFiniteFloat);
1034 assert_eq!(diagnostic.row(), Some(1));
1035 }
1036
1037 #[test]
1038 fn bulk_target_column_validation_accepts_matching_metadata() {
1039 let mappings = vec![mapping("id")];
1040 let columns = vec![bulk_target_column(0, "id", false)];
1041
1042 validate_bulk_target_columns(columns.into_iter(), &mappings).unwrap();
1043 }
1044
1045 #[test]
1046 fn bulk_target_column_validation_rejects_missing_target_columns() {
1047 let mappings = vec![mapping("id")];
1048 let columns = Vec::<FakeBulkTargetColumn>::new();
1049
1050 let err = validate_bulk_target_columns(columns.into_iter(), &mappings).unwrap_err();
1051
1052 let Error::ValueConversion { diagnostics } = err else {
1053 panic!("expected value conversion error");
1054 };
1055 assert_eq!(diagnostics.len(), 1);
1056 assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::SchemaMismatch);
1057 assert_eq!(
1058 diagnostics.all()[0].message(),
1059 "bulk target has 0 updateable column(s) but mappings contain 1 column(s)"
1060 );
1061 }
1062
1063 #[test]
1064 fn bulk_target_column_validation_rejects_ordinal_name_and_nullability_drift() {
1065 let mappings = vec![mapping("id")];
1066 let columns = vec![bulk_target_column(7, "id]; DROP TABLE target;--", true)];
1067
1068 let err = validate_bulk_target_columns(columns.into_iter(), &mappings).unwrap_err();
1069
1070 let Error::ValueConversion { diagnostics } = err else {
1071 panic!("expected value conversion error");
1072 };
1073 assert_eq!(diagnostics.len(), 3);
1074 assert!(
1075 diagnostics
1076 .all()
1077 .iter()
1078 .all(|diagnostic| diagnostic.code() == DiagnosticCode::SchemaMismatch)
1079 );
1080 assert!(
1081 diagnostics
1082 .all()
1083 .iter()
1084 .all(|diagnostic| diagnostic.field().map(|field| field.name()) == Some("id"))
1085 );
1086 assert!(
1087 diagnostics
1088 .all()
1089 .iter()
1090 .any(|diagnostic| diagnostic.message().contains("ordinal 7"))
1091 );
1092 assert!(
1093 diagnostics
1094 .all()
1095 .iter()
1096 .any(|diagnostic| diagnostic.message().contains("DROP TABLE"))
1097 );
1098 assert!(
1099 diagnostics
1100 .all()
1101 .iter()
1102 .any(|diagnostic| diagnostic.message().contains("nullability true"))
1103 );
1104 }
1105
1106 #[test]
1107 fn direct_bulk_target_type_validation_accepts_matching_primitive_metadata() {
1108 let mappings = vec![mapping("id")];
1109 let state = WriterState::new(
1110 WriteBackend::DirectRawBulk,
1111 SchemaCheck::Strict,
1112 PlanOptions::default(),
1113 mappings,
1114 )
1115 .unwrap();
1116 let columns = vec![bulk_target_column_with_type(
1117 0,
1118 "id",
1119 false,
1120 tiberius::ColumnType::Int4,
1121 )];
1122
1123 validate_direct_bulk_target_column_types(
1124 columns.into_iter(),
1125 state.direct_encoder().unwrap().plan(),
1126 )
1127 .unwrap();
1128 }
1129
1130 #[test]
1131 fn direct_bulk_target_type_validation_accepts_issue_75_integer_metadata() {
1132 let mappings = vec![
1133 schema_mapping_at(0, "tiny", DataType::UInt8, MssqlType::TinyInt, false),
1134 schema_mapping_at(1, "signed_tiny", DataType::Int8, MssqlType::SmallInt, false),
1135 schema_mapping_at(2, "small", DataType::Int16, MssqlType::SmallInt, false),
1136 schema_mapping_at(
1137 3,
1138 "unsigned_medium",
1139 DataType::UInt16,
1140 MssqlType::Int,
1141 false,
1142 ),
1143 schema_mapping_at(
1144 4,
1145 "unsigned_total",
1146 DataType::UInt32,
1147 MssqlType::BigInt,
1148 false,
1149 ),
1150 ];
1151 let state = WriterState::new(
1152 WriteBackend::DirectRawBulk,
1153 SchemaCheck::Strict,
1154 PlanOptions::default(),
1155 mappings,
1156 )
1157 .unwrap();
1158 let columns = vec![
1159 bulk_target_column_with_type(0, "tiny", false, tiberius::ColumnType::Int1),
1160 bulk_target_column_with_type(1, "signed_tiny", false, tiberius::ColumnType::Int2),
1161 bulk_target_column_with_type(2, "small", false, tiberius::ColumnType::Int2),
1162 bulk_target_column_with_type(3, "unsigned_medium", false, tiberius::ColumnType::Int4),
1163 bulk_target_column_with_type(4, "unsigned_total", false, tiberius::ColumnType::Int8),
1164 ];
1165
1166 validate_direct_bulk_target_column_types(
1167 columns.into_iter(),
1168 state.direct_encoder().unwrap().plan(),
1169 )
1170 .unwrap();
1171 }
1172
1173 #[test]
1174 fn direct_bulk_target_type_validation_accepts_issue_75_float32_metadata() {
1175 let mappings = vec![schema_mapping_at(
1176 0,
1177 "real_value",
1178 DataType::Float32,
1179 MssqlType::Real,
1180 false,
1181 )];
1182 let state = WriterState::new(
1183 WriteBackend::DirectRawBulk,
1184 SchemaCheck::Strict,
1185 PlanOptions::default(),
1186 mappings,
1187 )
1188 .unwrap();
1189 let columns = vec![bulk_target_column_with_type(
1190 0,
1191 "real_value",
1192 false,
1193 tiberius::ColumnType::Float4,
1194 )];
1195
1196 validate_direct_bulk_target_column_types(
1197 columns.into_iter(),
1198 state.direct_encoder().unwrap().plan(),
1199 )
1200 .unwrap();
1201 }
1202
1203 #[test]
1204 fn direct_bulk_target_type_validation_accepts_uint64_policy_metadata() {
1205 let mappings = vec![
1206 schema_mapping_at(0, "checked", DataType::UInt64, MssqlType::BigInt, false),
1207 schema_mapping_at(
1208 1,
1209 "decimal",
1210 DataType::UInt64,
1211 MssqlType::Decimal {
1212 precision: 20,
1213 scale: 0,
1214 },
1215 false,
1216 ),
1217 ];
1218 let state = WriterState::new(
1219 WriteBackend::DirectRawBulk,
1220 SchemaCheck::Strict,
1221 PlanOptions::default(),
1222 mappings,
1223 )
1224 .unwrap();
1225 let columns = vec![
1226 bulk_target_column_with_type(0, "checked", false, tiberius::ColumnType::Int8),
1227 bulk_target_decimal_column(1, "decimal", false, 20, 0),
1228 ];
1229
1230 validate_direct_bulk_target_column_types(
1231 columns.into_iter(),
1232 state.direct_encoder().unwrap().plan(),
1233 )
1234 .unwrap();
1235 }
1236
1237 #[test]
1238 fn direct_bulk_target_type_validation_rejects_uint64_decimal_precision_drift() {
1239 let mappings = vec![schema_mapping_at(
1240 0,
1241 "decimal",
1242 DataType::UInt64,
1243 MssqlType::Decimal {
1244 precision: 20,
1245 scale: 0,
1246 },
1247 false,
1248 )];
1249 let state = WriterState::new(
1250 WriteBackend::DirectRawBulk,
1251 SchemaCheck::Strict,
1252 PlanOptions::default(),
1253 mappings,
1254 )
1255 .unwrap();
1256 let columns = vec![bulk_target_decimal_column(0, "decimal", false, 19, 0)];
1257
1258 let err = validate_direct_bulk_target_column_types(
1259 columns.into_iter(),
1260 state.direct_encoder().unwrap().plan(),
1261 )
1262 .unwrap_err();
1263
1264 let Error::ValueConversion { diagnostics } = err else {
1265 panic!("expected value conversion error");
1266 };
1267 assert_eq!(diagnostics.len(), 1);
1268 let diagnostic = &diagnostics.all()[0];
1269 assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
1270 assert!(diagnostic.message().contains("precision/scale (19,0)"));
1271 assert_eq!(
1272 diagnostic
1273 .field()
1274 .map(|field| (field.index(), field.name())),
1275 Some((0, "decimal"))
1276 );
1277 }
1278
1279 #[test]
1280 fn direct_bulk_target_type_validation_accepts_matching_variable_width_metadata() {
1281 let mappings = vec![utf8_mapping_at(0, "name"), binary_mapping_at(1, "payload")];
1282 let state = WriterState::new(
1283 WriteBackend::DirectRawBulk,
1284 SchemaCheck::Strict,
1285 PlanOptions::default(),
1286 mappings,
1287 )
1288 .unwrap();
1289 let columns = vec![
1290 bulk_target_column_with_type(0, "name", false, tiberius::ColumnType::NVarchar),
1291 bulk_target_column_with_type(1, "payload", false, tiberius::ColumnType::BigVarBin),
1292 ];
1293
1294 validate_direct_bulk_target_column_types(
1295 columns.into_iter(),
1296 state.direct_encoder().unwrap().plan(),
1297 )
1298 .unwrap();
1299 }
1300
1301 #[test]
1302 fn direct_bulk_target_type_validation_accepts_matching_large_variable_width_metadata() {
1303 let mappings = vec![
1304 schema_mapping_at(
1305 0,
1306 "large_name",
1307 DataType::LargeUtf8,
1308 MssqlType::NVarChar(MssqlTypeLength::Max),
1309 false,
1310 ),
1311 schema_mapping_at(
1312 1,
1313 "large_payload",
1314 DataType::LargeBinary,
1315 MssqlType::VarBinary(MssqlTypeLength::Max),
1316 false,
1317 ),
1318 ];
1319 let state = WriterState::new(
1320 WriteBackend::DirectRawBulk,
1321 SchemaCheck::Strict,
1322 PlanOptions::default(),
1323 mappings,
1324 )
1325 .unwrap();
1326 let columns = vec![
1327 bulk_target_column_with_type(0, "large_name", false, tiberius::ColumnType::NVarchar),
1328 bulk_target_column_with_type(
1329 1,
1330 "large_payload",
1331 false,
1332 tiberius::ColumnType::BigVarBin,
1333 ),
1334 ];
1335
1336 validate_direct_bulk_target_column_types(
1337 columns.into_iter(),
1338 state.direct_encoder().unwrap().plan(),
1339 )
1340 .unwrap();
1341 }
1342
1343 #[test]
1344 fn direct_bulk_target_type_validation_accepts_fixed_size_binary_metadata() {
1345 let mappings = vec![fixed_size_binary_mapping_at(0, "digest", 32)];
1346 let state = WriterState::new(
1347 WriteBackend::DirectRawBulk,
1348 SchemaCheck::Strict,
1349 PlanOptions::default(),
1350 mappings,
1351 )
1352 .unwrap();
1353 let columns = vec![bulk_target_column_with_type(
1354 0,
1355 "digest",
1356 false,
1357 tiberius::ColumnType::BigBinary,
1358 )];
1359
1360 validate_direct_bulk_target_column_types(
1361 columns.into_iter(),
1362 state.direct_encoder().unwrap().plan(),
1363 )
1364 .unwrap();
1365 }
1366
1367 #[test]
1368 fn direct_bulk_target_type_validation_rejects_fixed_size_binary_as_varbinary() {
1369 let mappings = vec![fixed_size_binary_mapping_at(0, "digest", 32)];
1370 let state = WriterState::new(
1371 WriteBackend::DirectRawBulk,
1372 SchemaCheck::Strict,
1373 PlanOptions::default(),
1374 mappings,
1375 )
1376 .unwrap();
1377 let columns = vec![bulk_target_column_with_type(
1378 0,
1379 "digest",
1380 false,
1381 tiberius::ColumnType::BigVarBin,
1382 )];
1383
1384 let err = validate_direct_bulk_target_column_types(
1385 columns.into_iter(),
1386 state.direct_encoder().unwrap().plan(),
1387 )
1388 .unwrap_err();
1389
1390 let Error::ValueConversion { diagnostics } = err else {
1391 panic!("expected value conversion error");
1392 };
1393 assert_eq!(diagnostics.len(), 1);
1394 let diagnostic = &diagnostics.all()[0];
1395 assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
1396 assert_eq!(diagnostic.field().map(|field| field.name()), Some("digest"));
1397 assert!(diagnostic.message().contains(
1398 "bulk target column type BigVarBin does not match direct encoder type BigBinary"
1399 ));
1400 }
1401
1402 #[test]
1403 fn direct_bulk_target_type_validation_accepts_date_metadata() {
1404 let mappings = vec![
1405 SchemaMapping::new(
1406 ArrowFieldRef::new(0, "created_on".to_owned(), true, DataType::Date32),
1407 MssqlColumn::new(
1408 Identifier::new("created_on").unwrap(),
1409 MssqlType::Date,
1410 true,
1411 ),
1412 ),
1413 SchemaMapping::new(
1414 ArrowFieldRef::new(1, "created_at".to_owned(), true, DataType::Date64),
1415 MssqlColumn::new(
1416 Identifier::new("created_at").unwrap(),
1417 MssqlType::DateTime2 { precision: 3 },
1418 true,
1419 ),
1420 ),
1421 ];
1422 let state = WriterState::new(
1423 WriteBackend::DirectRawBulk,
1424 SchemaCheck::Strict,
1425 PlanOptions::default(),
1426 mappings,
1427 )
1428 .unwrap();
1429 let columns = vec![
1430 bulk_target_column_with_type(0, "created_on", true, tiberius::ColumnType::Daten),
1431 bulk_target_column_with_type(1, "created_at", true, tiberius::ColumnType::Datetime2),
1432 ];
1433
1434 validate_direct_bulk_target_column_types(
1435 columns.into_iter(),
1436 state.direct_encoder().unwrap().plan(),
1437 )
1438 .unwrap();
1439 }
1440
1441 #[test]
1442 fn direct_bulk_target_type_validation_rejects_variable_width_type_swap() {
1443 let mappings = vec![utf8_mapping_at(0, "name"), binary_mapping_at(1, "payload")];
1444 let state = WriterState::new(
1445 WriteBackend::DirectRawBulk,
1446 SchemaCheck::Strict,
1447 PlanOptions::default(),
1448 mappings,
1449 )
1450 .unwrap();
1451 let columns = vec![
1452 bulk_target_column_with_type(0, "name", false, tiberius::ColumnType::BigVarBin),
1453 bulk_target_column_with_type(1, "payload", false, tiberius::ColumnType::NVarchar),
1454 ];
1455
1456 let err = validate_direct_bulk_target_column_types(
1457 columns.into_iter(),
1458 state.direct_encoder().unwrap().plan(),
1459 )
1460 .unwrap_err();
1461
1462 let Error::ValueConversion { diagnostics } = err else {
1463 panic!("expected value conversion error");
1464 };
1465 assert_eq!(diagnostics.len(), 2);
1466 assert!(
1467 diagnostics
1468 .all()
1469 .iter()
1470 .any(|diagnostic| diagnostic.message().contains("NVarchar"))
1471 );
1472 assert!(
1473 diagnostics
1474 .all()
1475 .iter()
1476 .any(|diagnostic| diagnostic.message().contains("BigVarBin"))
1477 );
1478 }
1479
1480 #[test]
1481 fn direct_bulk_target_type_validation_rejects_same_name_with_wrong_type() {
1482 let mappings = vec![mapping("id")];
1483 let state = WriterState::new(
1484 WriteBackend::DirectRawBulk,
1485 SchemaCheck::Strict,
1486 PlanOptions::default(),
1487 mappings,
1488 )
1489 .unwrap();
1490 let columns = vec![bulk_target_column_with_type(
1491 0,
1492 "id",
1493 false,
1494 tiberius::ColumnType::Int8,
1495 )];
1496
1497 let err = validate_direct_bulk_target_column_types(
1498 columns.into_iter(),
1499 state.direct_encoder().unwrap().plan(),
1500 )
1501 .unwrap_err();
1502
1503 let Error::ValueConversion { diagnostics } = err else {
1504 panic!("expected value conversion error");
1505 };
1506 assert_eq!(diagnostics.len(), 1);
1507 let diagnostic = &diagnostics.all()[0];
1508 assert_eq!(diagnostic.code(), DiagnosticCode::SchemaMismatch);
1509 assert_eq!(diagnostic.field().map(|field| field.name()), Some("id"));
1510 assert!(
1511 diagnostic
1512 .message()
1513 .contains("bulk target column type Int8 does not match direct encoder type Int4")
1514 );
1515 }
1516
1517 #[test]
1518 fn write_batch_to_sink_accepts_empty_matching_batch() {
1519 let mappings = vec![mapping("id")];
1520 let mut state = WriterState::new(
1521 WriteBackend::BaselineTokenRow,
1522 SchemaCheck::Strict,
1523 PlanOptions::default(),
1524 mappings,
1525 )
1526 .unwrap();
1527 let mut sink = RecordingSink::default();
1528 let batch = int32_batch("id", &[]);
1529
1530 let stats = poll_ready(write_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1531
1532 assert_eq!(
1533 stats,
1534 WriteStats {
1535 rows_written: 0,
1536 batches_written: 1
1537 }
1538 );
1539 assert!(sink.rows.is_empty());
1540 }
1541
1542 #[test]
1543 fn write_batch_to_sink_accumulates_multi_batch_stats() {
1544 let mappings = vec![mapping("id")];
1545 let mut state = WriterState::new(
1546 WriteBackend::BaselineTokenRow,
1547 SchemaCheck::Strict,
1548 PlanOptions::default(),
1549 mappings,
1550 )
1551 .unwrap();
1552 let mut sink = RecordingSink::default();
1553
1554 let first = poll_ready(write_batch_to_sink(
1555 &mut state,
1556 &mut sink,
1557 &int32_batch("id", &[10, 20]),
1558 ))
1559 .unwrap();
1560 let second = poll_ready(write_batch_to_sink(
1561 &mut state,
1562 &mut sink,
1563 &int32_batch("id", &[30]),
1564 ))
1565 .unwrap();
1566
1567 assert_eq!(
1568 first,
1569 WriteStats {
1570 rows_written: 2,
1571 batches_written: 1
1572 }
1573 );
1574 assert_eq!(
1575 second,
1576 WriteStats {
1577 rows_written: 3,
1578 batches_written: 2
1579 }
1580 );
1581 assert_eq!(sink.rows.len(), 3);
1582 assert_eq!(
1583 sink.rows[2].get(0),
1584 Some(&tiberius::ColumnData::I32(Some(30)))
1585 );
1586 }
1587
1588 #[test]
1589 fn write_batch_to_sink_conversion_failure_sends_nothing_and_keeps_stats() {
1590 let mappings = vec![float_mapping("amount")];
1591 let mut state = WriterState::new(
1592 WriteBackend::BaselineTokenRow,
1593 SchemaCheck::Strict,
1594 PlanOptions::default(),
1595 mappings,
1596 )
1597 .unwrap();
1598 let mut sink = RecordingSink::default();
1599 let batch = float64_batch("amount", &[Some(1.0), Some(f64::NAN)]);
1600
1601 let err = poll_ready(write_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1602
1603 let Error::ValueConversion { diagnostics } = err else {
1604 panic!("expected value conversion error");
1605 };
1606 assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::NonFiniteFloat);
1607 assert_eq!(diagnostics.all()[0].row(), Some(1));
1608 assert!(sink.rows.is_empty());
1609 assert_eq!(state.stats(), WriteStats::default());
1610 }
1611
1612 #[test]
1613 fn write_batch_to_sink_send_failure_preserves_error_and_keeps_stats() {
1614 let mappings = vec![mapping("id")];
1615 let mut state = WriterState::new(
1616 WriteBackend::BaselineTokenRow,
1617 SchemaCheck::Strict,
1618 PlanOptions::default(),
1619 mappings,
1620 )
1621 .unwrap();
1622 let mut sink = RecordingSink {
1623 fail_on_send: Some(1),
1624 rows: Vec::new(),
1625 };
1626 let batch = int32_batch("id", &[1, 2, 3]);
1627
1628 let err = poll_ready(write_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1629
1630 let Error::Tiberius { source } = err else {
1631 panic!("expected tiberius error");
1632 };
1633 assert_eq!(
1634 source.to_string(),
1635 "BULK UPLOAD input failure: fake send failure"
1636 );
1637 assert_eq!(sink.rows.len(), 1);
1638 assert_eq!(state.stats(), WriteStats::default());
1639 }
1640
1641 #[test]
1642 fn write_direct_batch_to_sink_sends_one_checked_payload_per_batch() {
1643 let mappings = vec![mapping("id")];
1644 let mut state = WriterState::new(
1645 WriteBackend::DirectRawBulk,
1646 SchemaCheck::Strict,
1647 PlanOptions::default(),
1648 mappings,
1649 )
1650 .unwrap();
1651 let mut sink = RecordingRawSink::default();
1652 let batch = int32_batch("id", &[10, 20]);
1653
1654 let stats = poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1655
1656 assert_eq!(
1657 stats,
1658 WriteStats {
1659 rows_written: 2,
1660 batches_written: 1
1661 }
1662 );
1663 assert_eq!(sink.payloads.len(), 1);
1664 assert_eq!(sink.payloads[0].row_token_offsets, vec![0, 5]);
1665 assert_eq!(
1666 sink.payloads[0].bytes,
1667 vec![0xD1, 10, 0, 0, 0, 0xD1, 20, 0, 0, 0]
1668 );
1669 }
1670
1671 #[test]
1672 fn write_direct_batch_to_sink_accumulates_multi_batch_stats() {
1673 let mappings = vec![mapping("id")];
1674 let mut state = WriterState::new(
1675 WriteBackend::DirectRawBulk,
1676 SchemaCheck::Strict,
1677 PlanOptions::default(),
1678 mappings,
1679 )
1680 .unwrap();
1681 let mut sink = RecordingRawSink::default();
1682
1683 let first = poll_ready(write_direct_batch_to_sink(
1684 &mut state,
1685 &mut sink,
1686 &int32_batch("id", &[10, 20]),
1687 ))
1688 .unwrap();
1689 let second = poll_ready(write_direct_batch_to_sink(
1690 &mut state,
1691 &mut sink,
1692 &int32_batch("id", &[30]),
1693 ))
1694 .unwrap();
1695
1696 assert_eq!(
1697 first,
1698 WriteStats {
1699 rows_written: 2,
1700 batches_written: 1
1701 }
1702 );
1703 assert_eq!(
1704 second,
1705 WriteStats {
1706 rows_written: 3,
1707 batches_written: 2
1708 }
1709 );
1710 assert_eq!(sink.payloads.len(), 2);
1711 assert_eq!(sink.payloads[1].bytes, vec![0xD1, 30, 0, 0, 0]);
1712 }
1713
1714 #[test]
1715 fn write_direct_batch_to_sink_chunks_measured_payloads_by_byte_limit() {
1716 let mappings = vec![binary_mapping_at(0, "payload")];
1717 let mut state = WriterState::new(
1718 WriteBackend::DirectRawBulk,
1719 SchemaCheck::Strict,
1720 PlanOptions::default(),
1721 mappings,
1722 )
1723 .unwrap();
1724 let mut sink = RecordingRawSink::default();
1725 let row_bytes = vec![0x5a; DIRECT_RAW_MAX_PAYLOAD_BYTES / 2 + 1];
1726 let batch = binary_batch("payload", &[row_bytes.as_slice(), row_bytes.as_slice()]);
1727
1728 let stats = poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1729
1730 assert_eq!(
1731 stats,
1732 WriteStats {
1733 rows_written: 2,
1734 batches_written: 1
1735 }
1736 );
1737 assert_eq!(sink.payloads.len(), 2);
1738 assert_eq!(sink.payloads[0].row_token_offsets, [0]);
1739 assert_eq!(sink.payloads[1].row_token_offsets, [0]);
1740 }
1741
1742 #[test]
1743 fn write_direct_batch_to_sink_skips_send_for_empty_batch_but_records_stats() {
1744 let mappings = vec![mapping("id")];
1745 let mut state = WriterState::new(
1746 WriteBackend::DirectRawBulk,
1747 SchemaCheck::Strict,
1748 PlanOptions::default(),
1749 mappings,
1750 )
1751 .unwrap();
1752 let mut sink = RecordingRawSink::default();
1753 let batch = int32_batch("id", &[]);
1754
1755 let stats = poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap();
1756
1757 assert_eq!(
1758 stats,
1759 WriteStats {
1760 rows_written: 0,
1761 batches_written: 1
1762 }
1763 );
1764 assert!(sink.payloads.is_empty());
1765 }
1766
1767 #[test]
1768 fn write_direct_batch_to_sink_rejects_bad_later_row_before_send() {
1769 let mappings = vec![float_mapping("amount")];
1770 let mut state = WriterState::new(
1771 WriteBackend::DirectRawBulk,
1772 SchemaCheck::Strict,
1773 PlanOptions::default(),
1774 mappings,
1775 )
1776 .unwrap();
1777 let mut sink = RecordingRawSink::default();
1778 let batch = float64_batch("amount", &[Some(1.0), Some(f64::NAN)]);
1779
1780 let err =
1781 poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1782
1783 let Error::ValueConversion { diagnostics } = err else {
1784 panic!("expected value conversion error");
1785 };
1786 assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::NonFiniteFloat);
1787 assert_eq!(diagnostics.all()[0].row(), Some(1));
1788 assert!(sink.payloads.is_empty());
1789 assert_eq!(state.stats(), WriteStats::default());
1790 }
1791
1792 #[test]
1793 fn write_direct_batch_to_sink_rejects_uint64_bigint_overflow_before_any_range_send() {
1794 let mappings = vec![schema_mapping_at(
1795 0,
1796 "u64_value",
1797 DataType::UInt64,
1798 MssqlType::BigInt,
1799 false,
1800 )];
1801 let mut state = WriterState::new(
1802 WriteBackend::DirectRawBulk,
1803 SchemaCheck::Strict,
1804 PlanOptions::default(),
1805 mappings,
1806 )
1807 .unwrap();
1808 let mut sink = RecordingRawSink::default();
1809 let row_count = DIRECT_RAW_MAX_PAYLOAD_BYTES / 9 + 2;
1810 let mut values = vec![1_u64; row_count];
1811 values[row_count - 1] = i64::MAX as u64 + 1;
1812 let batch = uint64_batch("u64_value", &values);
1813
1814 let err =
1815 poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1816
1817 let Error::ValueConversion { diagnostics } = err else {
1818 panic!("expected value conversion error");
1819 };
1820 assert_eq!(
1821 diagnostics.all()[0].code(),
1822 DiagnosticCode::IntegerOutOfRange
1823 );
1824 assert_eq!(diagnostics.all()[0].row(), Some(row_count - 1));
1825 assert!(sink.payloads.is_empty());
1826 assert_eq!(state.stats(), WriteStats::default());
1827 }
1828
1829 #[test]
1830 fn write_direct_batch_to_sink_rejects_runtime_type_mismatch_before_send() {
1831 let mappings = vec![mapping("id")];
1832 let mut state = WriterState::new(
1833 WriteBackend::DirectRawBulk,
1834 SchemaCheck::Strict,
1835 PlanOptions::default(),
1836 mappings,
1837 )
1838 .unwrap();
1839 let mut sink = RecordingRawSink::default();
1840 let batch = RecordBatch::try_new(
1841 Arc::new(Schema::new(vec![Field::new(
1842 "id",
1843 DataType::Float64,
1844 false,
1845 )])),
1846 vec![Arc::new(Float64Array::from(vec![1.0]))],
1847 )
1848 .unwrap();
1849
1850 let err =
1851 poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1852
1853 let Error::ValueConversion { diagnostics } = err else {
1854 panic!("expected value conversion error");
1855 };
1856 assert_eq!(diagnostics.all()[0].code(), DiagnosticCode::SchemaMismatch);
1857 assert!(
1858 diagnostics.all()[0]
1859 .message()
1860 .contains("runtime Arrow type Float64")
1861 );
1862 assert!(sink.payloads.is_empty());
1863 assert_eq!(state.stats(), WriteStats::default());
1864 }
1865
1866 #[test]
1867 fn write_direct_batch_to_sink_send_failure_preserves_error_and_keeps_stats() {
1868 let mappings = vec![mapping("id")];
1869 let mut state = WriterState::new(
1870 WriteBackend::DirectRawBulk,
1871 SchemaCheck::Strict,
1872 PlanOptions::default(),
1873 mappings,
1874 )
1875 .unwrap();
1876 let mut sink = RecordingRawSink {
1877 fail_on_send: true,
1878 payloads: Vec::new(),
1879 };
1880 let batch = int32_batch("id", &[1, 2, 3]);
1881
1882 let err =
1883 poll_ready(write_direct_batch_to_sink(&mut state, &mut sink, &batch)).unwrap_err();
1884
1885 let Error::Tiberius { source } = err else {
1886 panic!("expected tiberius error");
1887 };
1888 assert_eq!(
1889 source.to_string(),
1890 "BULK UPLOAD input failure: fake raw send failure"
1891 );
1892 assert!(sink.payloads.is_empty());
1893 assert_eq!(state.stats(), WriteStats::default());
1894 }
1895
1896 #[test]
1897 fn writer_types_are_exported_from_crate_root() {
1898 assert_eq!(crate::WriteBackend::default(), WriteBackend::Auto);
1899 assert_eq!(crate::WriteOptions::default(), WriteOptions::default());
1900 assert_eq!(crate::WriteStats::default(), WriteStats::default());
1901 let _ = std::any::type_name::<crate::BulkWriter<'static, DummyStream>>();
1902 }
1903
1904 #[test]
1905 fn tiberius_alias_exposes_client_type() {
1906 let name = std::any::type_name::<tiberius::Client<DummyStream>>();
1907
1908 assert!(name.contains("tiberius"));
1909 }
1910
1911 fn mapping(name: &str) -> SchemaMapping {
1912 SchemaMapping::new(
1913 ArrowFieldRef::new(0, name.to_owned(), false, DataType::Int32),
1914 MssqlColumn::new(Identifier::new(name).unwrap(), MssqlType::Int, false),
1915 )
1916 }
1917
1918 fn schema_mapping_at(
1919 index: usize,
1920 name: &str,
1921 arrow_type: DataType,
1922 mssql_type: MssqlType,
1923 nullable: bool,
1924 ) -> SchemaMapping {
1925 SchemaMapping::new(
1926 ArrowFieldRef::new(index, name.to_owned(), nullable, arrow_type),
1927 MssqlColumn::new(Identifier::new(name).unwrap(), mssql_type, nullable),
1928 )
1929 }
1930
1931 fn float_mapping(name: &str) -> SchemaMapping {
1932 float_mapping_at(0, name)
1933 }
1934
1935 fn float_mapping_at(index: usize, name: &str) -> SchemaMapping {
1936 SchemaMapping::new(
1937 ArrowFieldRef::new(index, name.to_owned(), false, DataType::Float64),
1938 MssqlColumn::new(
1939 Identifier::new(name).unwrap(),
1940 MssqlType::Float { precision: 53 },
1941 false,
1942 ),
1943 )
1944 }
1945
1946 fn utf8_mapping_at(index: usize, name: &str) -> SchemaMapping {
1947 SchemaMapping::new(
1948 ArrowFieldRef::new(index, name.to_owned(), false, DataType::Utf8),
1949 MssqlColumn::new(
1950 Identifier::new(name).unwrap(),
1951 MssqlType::NVarChar(MssqlTypeLength::Max),
1952 false,
1953 ),
1954 )
1955 }
1956
1957 fn binary_mapping_at(index: usize, name: &str) -> SchemaMapping {
1958 SchemaMapping::new(
1959 ArrowFieldRef::new(index, name.to_owned(), false, DataType::Binary),
1960 MssqlColumn::new(
1961 Identifier::new(name).unwrap(),
1962 MssqlType::VarBinary(MssqlTypeLength::Max),
1963 false,
1964 ),
1965 )
1966 }
1967
1968 fn fixed_size_binary_mapping_at(index: usize, name: &str, length: usize) -> SchemaMapping {
1969 SchemaMapping::new(
1970 ArrowFieldRef::new(
1971 index,
1972 name.to_owned(),
1973 false,
1974 DataType::FixedSizeBinary(i32::try_from(length).unwrap()),
1975 ),
1976 MssqlColumn::new(
1977 Identifier::new(name).unwrap(),
1978 MssqlType::Binary(length),
1979 false,
1980 ),
1981 )
1982 }
1983
1984 fn int32_batch(name: &str, values: &[i32]) -> RecordBatch {
1985 let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, false)]));
1986 let array = Arc::new(Int32Array::from(values.to_vec()));
1987
1988 RecordBatch::try_new(schema, vec![array]).unwrap()
1989 }
1990
1991 fn uint64_batch(name: &str, values: &[u64]) -> RecordBatch {
1992 let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::UInt64, false)]));
1993 let array = Arc::new(UInt64Array::from(values.to_vec()));
1994
1995 RecordBatch::try_new(schema, vec![array]).unwrap()
1996 }
1997
1998 fn binary_batch(name: &str, values: &[&[u8]]) -> RecordBatch {
1999 let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Binary, false)]));
2000 let array = Arc::new(BinaryArray::from_iter_values(values.iter().copied()));
2001
2002 RecordBatch::try_new(schema, vec![array]).unwrap()
2003 }
2004
2005 fn bulk_target_column(ordinal: usize, name: &str, nullable: bool) -> FakeBulkTargetColumn {
2006 bulk_target_column_with_type(ordinal, name, nullable, tiberius::ColumnType::Int4)
2007 }
2008
2009 fn bulk_target_column_with_type(
2010 ordinal: usize,
2011 name: &str,
2012 nullable: bool,
2013 column_type: tiberius::ColumnType,
2014 ) -> FakeBulkTargetColumn {
2015 FakeBulkTargetColumn {
2016 ordinal,
2017 name: name.to_owned(),
2018 nullable,
2019 column_type,
2020 decimal_precision_scale: None,
2021 }
2022 }
2023
2024 fn bulk_target_decimal_column(
2025 ordinal: usize,
2026 name: &str,
2027 nullable: bool,
2028 precision: u8,
2029 scale: u8,
2030 ) -> FakeBulkTargetColumn {
2031 FakeBulkTargetColumn {
2032 ordinal,
2033 name: name.to_owned(),
2034 nullable,
2035 column_type: tiberius::ColumnType::Decimaln,
2036 decimal_precision_scale: Some((precision, scale)),
2037 }
2038 }
2039
2040 fn float64_batch(name: &str, values: &[Option<f64>]) -> RecordBatch {
2041 let schema = Arc::new(Schema::new(vec![Field::new(
2042 name,
2043 DataType::Float64,
2044 false,
2045 )]));
2046 let array = Arc::new(Float64Array::from(values.to_vec()));
2047
2048 RecordBatch::try_new(schema, vec![array]).unwrap()
2049 }
2050
2051 fn poll_ready<F>(future: F) -> F::Output
2052 where
2053 F: Future,
2054 {
2055 let waker = Waker::from(Arc::new(NoopWake));
2056 let mut context = Context::from_waker(&waker);
2057 let mut future = Box::pin(future);
2058
2059 match future.as_mut().poll(&mut context) {
2060 Poll::Ready(output) => output,
2061 Poll::Pending => panic!("future unexpectedly returned pending"),
2062 }
2063 }
2064
2065 #[derive(Debug, Default)]
2066 struct RecordingSink {
2067 fail_on_send: Option<usize>,
2068 rows: Vec<tiberius::TokenRow<'static>>,
2069 }
2070
2071 #[derive(Debug, Default)]
2072 struct RecordingRawSink {
2073 fail_on_send: bool,
2074 payloads: Vec<RecordedRawPayload>,
2075 }
2076
2077 #[derive(Debug, PartialEq, Eq)]
2078 struct RecordedRawPayload {
2079 bytes: Vec<u8>,
2080 row_token_offsets: Vec<usize>,
2081 }
2082
2083 impl RawRowsSink for RecordingRawSink {
2084 async fn send_measured_raw_rows(
2085 &mut self,
2086 encoder: &DirectEncoder,
2087 batch: &RecordBatch,
2088 measured: &MeasuredDirectBatch,
2089 range: MeasuredRowRange,
2090 ) -> crate::Result<()> {
2091 let payload =
2092 encoder.encode_measured_batch_range(batch, measured, range.start, range.len)?;
2093
2094 if self.fail_on_send {
2095 return Err(Error::Tiberius {
2096 source: tiberius::error::Error::BulkInput(Cow::Borrowed(
2097 "fake raw send failure",
2098 )),
2099 });
2100 }
2101
2102 self.payloads.push(RecordedRawPayload {
2103 bytes: payload.bytes().to_vec(),
2104 row_token_offsets: payload.row_token_offsets().to_vec(),
2105 });
2106 Ok(())
2107 }
2108 }
2109
2110 impl TokenRowSink for RecordingSink {
2111 async fn send_token_row(&mut self, row: tiberius::TokenRow<'static>) -> crate::Result<()> {
2112 if self.fail_on_send == Some(self.rows.len()) {
2113 return Err(Error::Tiberius {
2114 source: tiberius::error::Error::BulkInput(Cow::Borrowed("fake send failure")),
2115 });
2116 }
2117
2118 self.rows.push(row);
2119 Ok(())
2120 }
2121 }
2122
2123 #[derive(Debug)]
2124 struct FakeBulkTargetColumn {
2125 ordinal: usize,
2126 name: String,
2127 nullable: bool,
2128 column_type: tiberius::ColumnType,
2129 decimal_precision_scale: Option<(u8, u8)>,
2130 }
2131
2132 impl BulkTargetColumnMetadata for FakeBulkTargetColumn {
2133 fn ordinal(&self) -> usize {
2134 self.ordinal
2135 }
2136
2137 fn name(&self) -> &str {
2138 &self.name
2139 }
2140
2141 fn is_nullable(&self) -> bool {
2142 self.nullable
2143 }
2144
2145 fn column_type(&self) -> tiberius::ColumnType {
2146 self.column_type
2147 }
2148
2149 fn decimal_precision_scale(&self) -> Option<(u8, u8)> {
2150 self.decimal_precision_scale
2151 }
2152 }
2153
2154 struct NoopWake;
2155
2156 impl Wake for NoopWake {
2157 fn wake(self: Arc<Self>) {}
2158 }
2159
2160 #[derive(Debug)]
2161 struct DummyStream;
2162
2163 impl AsyncRead for DummyStream {
2164 fn poll_read(
2165 self: Pin<&mut Self>,
2166 _cx: &mut Context<'_>,
2167 _buf: &mut [u8],
2168 ) -> Poll<std::io::Result<usize>> {
2169 Poll::Ready(Ok(0))
2170 }
2171 }
2172
2173 impl AsyncWrite for DummyStream {
2174 fn poll_write(
2175 self: Pin<&mut Self>,
2176 _cx: &mut Context<'_>,
2177 buf: &[u8],
2178 ) -> Poll<std::io::Result<usize>> {
2179 Poll::Ready(Ok(buf.len()))
2180 }
2181
2182 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
2183 Poll::Ready(Ok(()))
2184 }
2185
2186 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
2187 Poll::Ready(Ok(()))
2188 }
2189 }
2190}