Skip to main content

tiberius/tds/codec/
bulk_load.rs

1use asynchronous_codec::BytesMut;
2use bytes::BufMut;
3use enumflags2::BitFlags;
4use futures_util::io::{AsyncRead, AsyncWrite};
5#[cfg(feature = "bulk-load-profile")]
6use std::time::{Duration, Instant};
7use tracing::{event, Level};
8
9#[cfg(feature = "bulk-load-profile")]
10use crate::client::{DirectPacketPollWriteSummary, DirectPacketWriteTiming};
11use crate::{
12    client::Connection, sql_read_bytes::SqlReadBytes, BytesMutWithDataColumns, ColumnFlag,
13    ColumnType, ExecuteResult,
14};
15
16use super::{
17    Encode, MetaDataColumn, PacketHeader, PacketStatus, TokenColMetaData, TokenDone, TokenRow,
18    TokenType, TypeInfo, HEADER_BYTES,
19};
20
21/// Owned destination metadata for a bulk-load request.
22#[derive(Debug, Clone)]
23pub struct BulkLoadColumns<'a> {
24    columns: Vec<MetaDataColumn<'a>>,
25}
26
27impl<'a> BulkLoadColumns<'a> {
28    pub(crate) fn new(columns: Vec<MetaDataColumn<'a>>) -> Self {
29        Self { columns }
30    }
31
32    pub(crate) fn into_inner(self) -> Vec<MetaDataColumn<'a>> {
33        self.columns
34    }
35
36    /// The number of destination columns in the bulk row encoding order.
37    pub fn len(&self) -> usize {
38        self.columns.len()
39    }
40
41    /// Whether there are no destination columns.
42    pub fn is_empty(&self) -> bool {
43        self.columns.is_empty()
44    }
45
46    /// Returns the destination columns in bulk row encoding order.
47    pub fn iter(&self) -> impl ExactSizeIterator<Item = BulkLoadColumn<'_>> {
48        bulk_load_columns(&self.columns)
49    }
50}
51
/// Describes rows that were appended directly into a raw bulk-load buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RawRowsAppend {
    row_token_offsets: Vec<usize>,
}

impl RawRowsAppend {
    /// Builds the appended-row metadata from a list of row-token offsets.
    ///
    /// Each offset is measured from the start of the appended byte region,
    /// not from the start of the full bulk-load request buffer.
    pub fn new(row_token_offsets: Vec<usize>) -> Self {
        RawRowsAppend { row_token_offsets }
    }

    /// Borrows the row-token offsets, each relative to the appended byte region.
    pub fn row_token_offsets(&self) -> &[usize] {
        let Self { row_token_offsets } = self;
        row_token_offsets
    }
}
72
73/// Append-only access to a raw bulk-load request buffer.
74///
75/// This is a capability wrapper for [`BulkLoadRequest::send_raw_rows_with`].
76/// It lets callers append encoded row bytes directly into the request buffer
77/// without exposing `BytesMut` operations that could truncate, split, clear, or
78/// otherwise mutate bytes that existed before the append started. That keeps
79/// the method's rollback behavior well-defined when encoding or validation
80/// fails.
81#[derive(Debug)]
82pub struct RawRowsAppendBuffer<'a> {
83    bytes: &'a mut BytesMut,
84}
85
86impl RawRowsAppendBuffer<'_> {
87    /// Appends raw row bytes to the request buffer.
88    pub fn extend_from_slice(&mut self, slice: &[u8]) {
89        self.bytes.extend_from_slice(slice);
90    }
91
92    /// Appends one raw row byte to the request buffer.
93    pub fn put_u8(&mut self, value: u8) {
94        self.bytes.put_u8(value);
95    }
96
97    /// Appends a little-endian 16-bit unsigned integer.
98    pub fn put_u16_le(&mut self, value: u16) {
99        self.bytes.put_u16_le(value);
100    }
101
102    /// Appends a little-endian 32-bit unsigned integer.
103    pub fn put_u32_le(&mut self, value: u32) {
104        self.bytes.put_u32_le(value);
105    }
106
107    /// Appends a little-endian 64-bit unsigned integer.
108    pub fn put_u64_le(&mut self, value: u64) {
109        self.bytes.put_u64_le(value);
110    }
111}
112
/// Counters describing how a bulk-load request wrote its packets.
///
/// They exist for benchmarking and diagnostics only: collecting them does not
/// alter bulk-load behavior. TDS packet header bytes are excluded unless a
/// field name explicitly says otherwise.
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BulkLoadPacketStats {
    /// How many times the request tried to drain complete packets.
    pub write_packets_calls: u64,
    /// Count of complete bulk-load packets written ahead of finalization.
    pub packets_written: u64,
    /// Total payload bytes of complete packets written ahead of finalization.
    pub packet_payload_bytes: u64,
    /// Payload size of the largest complete packet written ahead of finalization.
    pub max_packet_payload_bytes: usize,
    /// Peak number of buffered bytes seen just before a packet drain.
    pub max_buffered_bytes_before_write: usize,
    /// Tail bytes still buffered after the most recent packet drain.
    pub buffered_bytes_after_last_write: usize,
    /// Payload bytes carried by the final `EndOfMessage` packet.
    pub finalized_packet_payload_bytes: usize,
}
136
#[cfg(feature = "bulk-load-profile")]
impl BulkLoadPacketStats {
    /// Counts one packet-drain attempt and tracks the peak pre-drain buffer size.
    fn record_write_packets_call(&mut self, buffered_bytes_before_write: usize) {
        if buffered_bytes_before_write > self.max_buffered_bytes_before_write {
            self.max_buffered_bytes_before_write = buffered_bytes_before_write;
        }
        self.write_packets_calls = self.write_packets_calls.saturating_add(1);
    }

    /// Counts one written packet, adds its payload to the running total, and
    /// tracks the largest payload seen so far.
    fn record_packet_written(&mut self, packet_payload_bytes: usize) {
        let payload_as_u64 = usize_to_u64_saturating(packet_payload_bytes);

        self.packets_written = self.packets_written.saturating_add(1);
        self.packet_payload_bytes = self.packet_payload_bytes.saturating_add(payload_as_u64);
        if packet_payload_bytes > self.max_packet_payload_bytes {
            self.max_packet_payload_bytes = packet_payload_bytes;
        }
    }

    /// Overwrites the "bytes left buffered after the last drain" gauge.
    fn record_buffered_bytes_after_write(&mut self, buffered_bytes_after_write: usize) {
        let Self {
            buffered_bytes_after_last_write,
            ..
        } = self;
        *buffered_bytes_after_last_write = buffered_bytes_after_write;
    }

    /// Overwrites the payload size recorded for the finalizing packet.
    fn record_finalized_packet(&mut self, packet_payload_bytes: usize) {
        let Self {
            finalized_packet_payload_bytes,
            ..
        } = self;
        *finalized_packet_payload_bytes = packet_payload_bytes;
    }
}
162
/// Timing counters for the write side of a bulk-load request.
///
/// They exist for benchmarking and diagnostics only and do not alter
/// bulk-load behavior. The fields keep time spent draining bulk-load packets
/// apart from time spent in lower-level connection writes and flushes.
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BulkLoadWriteTimingStats {
    /// Total time spent inside bulk-load packet drain attempts.
    pub write_packets_elapsed: Duration,
    /// How many times packet draining wrote to the connection.
    pub write_to_wire_calls: u64,
    /// Total time bulk load spent awaiting lower-level connection writes.
    pub write_to_wire_elapsed: Duration,
    /// Total payload bytes bulk load handed to lower-level connection writes.
    pub write_to_wire_payload_bytes: u64,
    /// Longest single lower-level connection write awaited by bulk load.
    pub max_write_to_wire_elapsed: Duration,
    /// Largest payload bulk load handed to a single lower-level connection write.
    pub max_write_to_wire_payload_bytes: usize,
    /// How many flushes bulk load performed.
    pub flush_calls: u64,
    /// Total time spent awaiting bulk-load flushes.
    pub flush_elapsed: Duration,
    /// Longest single explicit flush awaited by bulk load.
    pub max_flush_elapsed: Duration,
    /// Total time spent finalizing the bulk-load request.
    pub finalize_elapsed: Duration,
    /// Time spent awaiting the write of the final `EndOfMessage` packet.
    pub finalize_write_to_wire_elapsed: Duration,
    /// Time spent awaiting the final explicit flush.
    pub finalize_flush_elapsed: Duration,
    /// Time spent waiting for the server result once the final bulk packet
    /// had been flushed.
    pub finalize_result_elapsed: Duration,
    /// Finer-grained breakdown of bulk-load connection writes, underneath the
    /// coarse `write_to_wire` aggregate.
    pub connection_write: BulkLoadConnectionWriteStats,
    /// Statistics for the experimental raw-bulk direct packet writer.
    pub direct_packet_write: BulkLoadDirectPacketWriteStats,
}
203
#[cfg(feature = "bulk-load-profile")]
impl BulkLoadWriteTimingStats {
    // All accumulators below saturate instead of using `+=`: `Duration`
    // addition panics on overflow, and diagnostics must never be able to
    // abort a bulk load. This matches the saturating integer counters.

    /// Adds the elapsed time of one packet-drain attempt to the running total.
    fn record_write_packets_elapsed(&mut self, elapsed: Duration) {
        self.write_packets_elapsed = self.write_packets_elapsed.saturating_add(elapsed);
    }

    /// Records one lower-level connection write: bumps the call count, adds
    /// the elapsed time and payload size to the totals, and updates both
    /// per-write maxima.
    fn record_write_to_wire(&mut self, elapsed: Duration, payload_bytes: usize) {
        self.write_to_wire_calls = self.write_to_wire_calls.saturating_add(1);
        self.write_to_wire_elapsed = self.write_to_wire_elapsed.saturating_add(elapsed);
        self.write_to_wire_payload_bytes = self
            .write_to_wire_payload_bytes
            .saturating_add(usize_to_u64_saturating(payload_bytes));
        self.max_write_to_wire_elapsed = self.max_write_to_wire_elapsed.max(elapsed);
        self.max_write_to_wire_payload_bytes =
            self.max_write_to_wire_payload_bytes.max(payload_bytes);
    }

    /// Records one flush: bumps the call count, adds to the total flush time,
    /// and updates the slowest-flush maximum.
    fn record_flush(&mut self, elapsed: Duration) {
        self.flush_calls = self.flush_calls.saturating_add(1);
        self.flush_elapsed = self.flush_elapsed.saturating_add(elapsed);
        self.max_flush_elapsed = self.max_flush_elapsed.max(elapsed);
    }

    /// Adds to the total time spent finalizing the request.
    fn record_finalize_elapsed(&mut self, elapsed: Duration) {
        self.finalize_elapsed = self.finalize_elapsed.saturating_add(elapsed);
    }

    /// Adds to the time spent awaiting the final packet write.
    fn record_finalize_write_to_wire_elapsed(&mut self, elapsed: Duration) {
        self.finalize_write_to_wire_elapsed =
            self.finalize_write_to_wire_elapsed.saturating_add(elapsed);
    }

    /// Adds to the time spent awaiting the final flush.
    fn record_finalize_flush_elapsed(&mut self, elapsed: Duration) {
        self.finalize_flush_elapsed = self.finalize_flush_elapsed.saturating_add(elapsed);
    }

    /// Adds to the time spent waiting for the server result.
    fn record_finalize_result_elapsed(&mut self, elapsed: Duration) {
        self.finalize_result_elapsed = self.finalize_result_elapsed.saturating_add(elapsed);
    }

    /// Forwards one framed-sink write sample to the `connection_write`
    /// breakdown.
    fn record_connection_write(
        &mut self,
        payload_bytes: usize,
        ready_elapsed: Duration,
        encode_elapsed: Duration,
        flush_elapsed: Duration,
    ) {
        self.connection_write
            .record(payload_bytes, ready_elapsed, encode_elapsed, flush_elapsed);
    }

    /// Records a direct packet write that is not the final packet.
    fn record_direct_packet_write(&mut self, timing: DirectPacketWriteTiming) {
        self.direct_packet_write.record_timing(timing, false);
    }

    /// Records a direct packet write and also counts it as a final packet.
    fn record_direct_final_packet_write(&mut self, timing: DirectPacketWriteTiming) {
        self.direct_packet_write.record_timing(timing, true);
    }
}
262
/// Fine-grained timing of bulk-load writes that go through the framed
/// connection sink.
///
/// This is a diagnostic breakdown beneath
/// [`BulkLoadWriteTimingStats::write_to_wire_elapsed`]. Its purpose is to
/// reveal whether raw bulk writes are dominated by waiting for sink
/// readiness, by packet encoding, or by sink flushing, all without changing
/// the existing write behavior.
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BulkLoadConnectionWriteStats {
    /// How many bulk-load packets entered the connection write path.
    pub calls: u64,
    /// Total payload bytes that entered the connection write path; TDS packet
    /// headers are not counted.
    pub payload_bytes: u64,
    /// Total time spent waiting until the framed sink could take another packet.
    pub ready_elapsed: Duration,
    /// Total time spent encoding packets into the framed sink buffer.
    pub encode_elapsed: Duration,
    /// Total time spent flushing the framed sink once packets were encoded.
    pub flush_elapsed: Duration,
    /// Longest single wait for framed sink readiness.
    pub max_ready_elapsed: Duration,
    /// Longest single packet encode operation.
    pub max_encode_elapsed: Duration,
    /// Longest single framed sink flush.
    pub max_flush_elapsed: Duration,
    /// Largest payload that entered the connection write path.
    pub max_payload_bytes: usize,
}
293
#[cfg(feature = "bulk-load-profile")]
impl BulkLoadConnectionWriteStats {
    /// Records one packet passed through the connection write path.
    ///
    /// Bumps the call count, adds `payload_bytes` and the three phase
    /// durations to their totals, and updates the per-phase and payload
    /// maxima.
    ///
    /// Duration totals saturate instead of using `+=`: `Duration` addition
    /// panics on overflow, and diagnostics must never be able to abort a bulk
    /// load. This matches the saturating integer counters.
    fn record(
        &mut self,
        payload_bytes: usize,
        ready_elapsed: Duration,
        encode_elapsed: Duration,
        flush_elapsed: Duration,
    ) {
        self.calls = self.calls.saturating_add(1);
        self.payload_bytes = self
            .payload_bytes
            .saturating_add(usize_to_u64_saturating(payload_bytes));
        self.ready_elapsed = self.ready_elapsed.saturating_add(ready_elapsed);
        self.encode_elapsed = self.encode_elapsed.saturating_add(encode_elapsed);
        self.flush_elapsed = self.flush_elapsed.saturating_add(flush_elapsed);
        self.max_ready_elapsed = self.max_ready_elapsed.max(ready_elapsed);
        self.max_encode_elapsed = self.max_encode_elapsed.max(encode_elapsed);
        self.max_flush_elapsed = self.max_flush_elapsed.max(flush_elapsed);
        self.max_payload_bytes = self.max_payload_bytes.max(payload_bytes);
    }
}
316
/// Fine-grained timing of the experimental raw-bulk direct packet writer.
///
/// The counters make it possible to compare an experimental bulk-only packet
/// writer with the framed sink path. All of them stay at zero when the framed
/// path is in use.
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BulkLoadDirectPacketWriteStats {
    /// How many TDS packets were handed to the direct packet writer.
    pub calls: u64,
    /// Total payload bytes handed to the direct packet writer, headers excluded.
    pub payload_bytes: u64,
    /// Total header bytes written by the direct packet writer.
    pub header_bytes: u64,
    /// Largest single payload handed to the direct packet writer.
    pub max_payload_bytes: usize,
    /// How many final `EndOfMessage` packets were handed to the direct packet writer.
    pub final_calls: u64,
    /// Total payload bytes carried by final `EndOfMessage` packets.
    pub final_payload_bytes: u64,
    /// Total header bytes carried by final `EndOfMessage` packets.
    pub final_header_bytes: u64,
    /// Direct packet writes that were observed on a raw, non-TLS stream.
    pub raw_stream_calls: u64,
    /// Direct packet writes that were observed on a TLS stream.
    pub tls_stream_calls: u64,
    /// How many lower-level write calls the direct packet writer issued.
    pub write_calls: u64,
    /// Total bytes accepted by lower-level writes, headers and payloads combined.
    pub write_bytes: u64,
    /// Most bytes accepted by any single lower-level write.
    pub max_write_bytes: usize,
    /// Total time spent awaiting lower-level writes.
    pub write_elapsed: Duration,
    /// Longest single lower-level write.
    pub max_write_elapsed: Duration,
    /// How many lower-level writes were used for packet headers.
    pub header_write_calls: u64,
    /// Total header bytes accepted by lower-level writes.
    pub header_write_bytes: u64,
    /// Most header bytes accepted by any single lower-level write.
    pub header_max_write_bytes: usize,
    /// Total time spent awaiting lower-level header writes.
    pub header_write_elapsed: Duration,
    /// Longest single lower-level header write.
    pub header_max_write_elapsed: Duration,
    /// Header writes that accepted fewer bytes than were left in the header slice.
    pub header_partial_writes: u64,
    /// How many lower-level writes were used for packet payloads.
    pub payload_write_calls: u64,
    /// Total payload bytes accepted by lower-level writes.
    pub payload_write_bytes: u64,
    /// Most payload bytes accepted by any single lower-level write.
    pub payload_max_write_bytes: usize,
    /// Total time spent awaiting lower-level payload writes.
    pub payload_write_elapsed: Duration,
    /// Longest single lower-level payload write.
    pub payload_max_write_elapsed: Duration,
    /// Payload writes that accepted fewer bytes than were left in the payload slice.
    pub payload_partial_writes: u64,
    /// How many low-level `poll_write` attempts were made.
    pub poll_write_polls: u64,
    /// How many `poll_write` attempts came back `Pending`.
    pub poll_write_pending_count: u64,
    /// Total time spent waiting after a `poll_write` came back `Pending`.
    pub poll_write_pending_elapsed: Duration,
    /// Longest single wait after a `poll_write` came back `Pending`.
    pub poll_write_max_pending_elapsed: Duration,
    /// How many `poll_write` attempts came back ready with a write result.
    pub poll_write_ready_count: u64,
    /// Total time spent in `poll_write` attempts that came back ready.
    pub poll_write_ready_elapsed: Duration,
    /// Longest single `poll_write` attempt that came back ready.
    pub poll_write_max_ready_elapsed: Duration,
    /// How many explicit flushes the direct packet writer performed.
    pub flush_calls: u64,
    /// Total time spent awaiting explicit direct packet writer flushes.
    pub flush_elapsed: Duration,
    /// Longest single explicit direct packet writer flush.
    pub max_flush_elapsed: Duration,
    /// How many direct packet flush polls came back `Pending`.
    pub flush_pending_count: u64,
    /// Total time spent waiting after direct packet flush polls came back `Pending`.
    pub flush_pending_elapsed: Duration,
    /// Longest single wait after a direct packet flush poll came back `Pending`.
    pub flush_max_pending_elapsed: Duration,
}
405
#[cfg(feature = "bulk-load-profile")]
impl BulkLoadDirectPacketWriteStats {
    // Every duration total in this impl saturates instead of using `+=`:
    // `Duration` addition panics on overflow, and diagnostics must never be
    // able to abort a bulk load. This matches the saturating integer counters.

    /// Folds one direct packet write sample into every counter group.
    ///
    /// `final_packet` additionally attributes the packet to the `final_*`
    /// counters.
    fn record_timing(&mut self, timing: DirectPacketWriteTiming, final_packet: bool) {
        self.record_packet(timing.payload_bytes, timing.header_bytes, final_packet);
        self.record_stream_mode(timing.raw_stream, timing.tls_stream);
        self.record_write_summary(
            timing.write_calls,
            timing.write_bytes,
            timing.max_write_bytes,
            timing.write_elapsed,
            timing.max_write_elapsed,
        );
        self.record_header_write_summary(
            timing.header_write_calls,
            timing.header_write_bytes,
            timing.header_max_write_bytes,
            timing.header_write_elapsed,
            timing.header_max_write_elapsed,
            timing.header_partial_writes,
        );
        self.record_payload_write_summary(
            timing.payload_write_calls,
            timing.payload_write_bytes,
            timing.payload_max_write_bytes,
            timing.payload_write_elapsed,
            timing.payload_max_write_elapsed,
            timing.payload_partial_writes,
        );
        self.record_poll_write_summary(timing.poll_write_summary());
        self.record_flush(
            timing.flush_elapsed,
            timing.flush_pending_count,
            timing.flush_pending_elapsed,
            timing.flush_max_pending_elapsed,
        );
    }

    /// Counts one packet and its payload/header sizes; when `final_packet` is
    /// set, the same sizes are also added to the `final_*` counters.
    fn record_packet(&mut self, payload_bytes: usize, header_bytes: usize, final_packet: bool) {
        self.calls = self.calls.saturating_add(1);
        self.payload_bytes = self
            .payload_bytes
            .saturating_add(usize_to_u64_saturating(payload_bytes));
        self.header_bytes = self
            .header_bytes
            .saturating_add(usize_to_u64_saturating(header_bytes));
        self.max_payload_bytes = self.max_payload_bytes.max(payload_bytes);

        if final_packet {
            self.final_calls = self.final_calls.saturating_add(1);
            self.final_payload_bytes = self
                .final_payload_bytes
                .saturating_add(usize_to_u64_saturating(payload_bytes));
            self.final_header_bytes = self
                .final_header_bytes
                .saturating_add(usize_to_u64_saturating(header_bytes));
        }
    }

    /// Counts the packet against the raw and/or TLS stream counters.
    ///
    /// The two flags are handled independently; neither is assumed to exclude
    /// the other.
    fn record_stream_mode(&mut self, raw_stream: bool, tls_stream: bool) {
        if raw_stream {
            self.raw_stream_calls = self.raw_stream_calls.saturating_add(1);
        }
        if tls_stream {
            self.tls_stream_calls = self.tls_stream_calls.saturating_add(1);
        }
    }

    /// Merges a per-packet summary of all lower-level writes.
    fn record_write_summary(
        &mut self,
        calls: u64,
        bytes: u64,
        max_bytes: usize,
        elapsed: Duration,
        max_elapsed: Duration,
    ) {
        self.write_calls = self.write_calls.saturating_add(calls);
        self.write_bytes = self.write_bytes.saturating_add(bytes);
        self.max_write_bytes = self.max_write_bytes.max(max_bytes);
        self.write_elapsed = self.write_elapsed.saturating_add(elapsed);
        self.max_write_elapsed = self.max_write_elapsed.max(max_elapsed);
    }

    /// Merges a per-packet summary of lower-level header writes.
    fn record_header_write_summary(
        &mut self,
        calls: u64,
        bytes: u64,
        max_bytes: usize,
        elapsed: Duration,
        max_elapsed: Duration,
        partial_writes: u64,
    ) {
        self.header_write_calls = self.header_write_calls.saturating_add(calls);
        self.header_write_bytes = self.header_write_bytes.saturating_add(bytes);
        self.header_max_write_bytes = self.header_max_write_bytes.max(max_bytes);
        self.header_write_elapsed = self.header_write_elapsed.saturating_add(elapsed);
        self.header_max_write_elapsed = self.header_max_write_elapsed.max(max_elapsed);
        self.header_partial_writes = self.header_partial_writes.saturating_add(partial_writes);
    }

    /// Merges a per-packet summary of lower-level payload writes.
    fn record_payload_write_summary(
        &mut self,
        calls: u64,
        bytes: u64,
        max_bytes: usize,
        elapsed: Duration,
        max_elapsed: Duration,
        partial_writes: u64,
    ) {
        self.payload_write_calls = self.payload_write_calls.saturating_add(calls);
        self.payload_write_bytes = self.payload_write_bytes.saturating_add(bytes);
        self.payload_max_write_bytes = self.payload_max_write_bytes.max(max_bytes);
        self.payload_write_elapsed = self.payload_write_elapsed.saturating_add(elapsed);
        self.payload_max_write_elapsed = self.payload_max_write_elapsed.max(max_elapsed);
        self.payload_partial_writes = self.payload_partial_writes.saturating_add(partial_writes);
    }

    /// Merges a per-packet `poll_write` summary (poll, pending and ready
    /// counts with their total and maximum durations).
    fn record_poll_write_summary(&mut self, summary: DirectPacketPollWriteSummary) {
        self.poll_write_polls = self.poll_write_polls.saturating_add(summary.polls);
        self.poll_write_pending_count = self
            .poll_write_pending_count
            .saturating_add(summary.pending_count);
        self.poll_write_pending_elapsed = self
            .poll_write_pending_elapsed
            .saturating_add(summary.pending_elapsed);
        self.poll_write_max_pending_elapsed = self
            .poll_write_max_pending_elapsed
            .max(summary.max_pending_elapsed);
        self.poll_write_ready_count = self
            .poll_write_ready_count
            .saturating_add(summary.ready_count);
        self.poll_write_ready_elapsed = self
            .poll_write_ready_elapsed
            .saturating_add(summary.ready_elapsed);
        self.poll_write_max_ready_elapsed = self
            .poll_write_max_ready_elapsed
            .max(summary.max_ready_elapsed);
    }

    /// Records the flush portion of one direct packet write.
    ///
    /// `flush_calls` is incremented by one on every invocation, so it advances
    /// once per recorded packet.
    fn record_flush(
        &mut self,
        elapsed: Duration,
        pending_count: u64,
        pending_elapsed: Duration,
        max_pending_elapsed: Duration,
    ) {
        self.flush_calls = self.flush_calls.saturating_add(1);
        self.flush_elapsed = self.flush_elapsed.saturating_add(elapsed);
        self.max_flush_elapsed = self.max_flush_elapsed.max(elapsed);
        self.flush_pending_count = self.flush_pending_count.saturating_add(pending_count);
        self.flush_pending_elapsed = self.flush_pending_elapsed.saturating_add(pending_elapsed);
        self.flush_max_pending_elapsed = self.flush_max_pending_elapsed.max(max_pending_elapsed);
    }
}
555
/// The full set of benchmark statistics gathered by one bulk-load request.
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BulkLoadStats {
    /// Packet counters gathered while bulk-load data was being written.
    pub packet: BulkLoadPacketStats,
    /// Timing counters gathered while bulk-load data was being written.
    pub write_timing: BulkLoadWriteTimingStats,
}
565
566/// A handler for a bulk insert data flow.
567#[derive(Debug)]
568pub struct BulkLoadRequest<'a, S>
569where
570    S: AsyncRead + AsyncWrite + Unpin + Send,
571{
572    connection: &'a mut Connection<S>,
573    packet_id: u8,
574    buf: BytesMut,
575    columns: Vec<MetaDataColumn<'a>>,
576    #[cfg(feature = "bulk-load-profile")]
577    packet_stats: BulkLoadPacketStats,
578    #[cfg(feature = "bulk-load-profile")]
579    write_timing_stats: BulkLoadWriteTimingStats,
580    direct_packet_writes: bool,
581}
582
583// Direct packet mode keeps the pending buffer shaped as:
584//
585//   [8-byte header slot][pending payload bytes...]
586//
587// Normal framed mode keeps only payload bytes in the buffer. Switching to
588// direct mode copies the current payload once so later packet writes can patch
589// headers in place.
590fn replace_with_direct_packet_buffer(buf: &mut BytesMut) {
591    let payload = buf.split();
592    let mut packet_buf = BytesMut::with_capacity(HEADER_BYTES.saturating_add(payload.len()));
593    packet_buf.resize(HEADER_BYTES, 0);
594    packet_buf.extend_from_slice(&payload);
595    *buf = packet_buf;
596}
597
598fn direct_packet_payload_len(buf: &BytesMut) -> usize {
599    buf.len().saturating_sub(HEADER_BYTES)
600}
601
602fn encode_direct_packet_header(
603    buf: &mut BytesMut,
604    packet_id: u8,
605    header_start: usize,
606    payload_len: usize,
607    status: PacketStatus,
608) -> crate::Result<()> {
609    // `header_start` may point at bytes from an already-sent packet. That is
610    // intentional: once those bytes are on the wire, the buffer space can be
611    // reused as the header slot for the next contiguous packet write.
612    let mut header = PacketHeader::bulk_load(packet_id);
613    header.set_status(status);
614    let header_end = header_start.saturating_add(HEADER_BYTES);
615    let mut header_buf = &mut buf[header_start..header_end];
616    header.encode_for_payload(payload_len, &mut header_buf)
617}
618
619impl<'a, S> BulkLoadRequest<'a, S>
620where
621    S: AsyncRead + AsyncWrite + Unpin + Send,
622{
623    pub(crate) fn new(
624        connection: &'a mut Connection<S>,
625        columns: Vec<MetaDataColumn<'a>>,
626    ) -> crate::Result<Self> {
627        let packet_id = connection.context_mut().next_packet_id();
628        let mut buf = BytesMut::new();
629
630        let cmd = TokenColMetaData {
631            columns: columns.clone(),
632        };
633
634        cmd.encode(&mut buf)?;
635
636        let this = Self {
637            connection,
638            packet_id,
639            buf,
640            columns,
641            #[cfg(feature = "bulk-load-profile")]
642            packet_stats: BulkLoadPacketStats::default(),
643            #[cfg(feature = "bulk-load-profile")]
644            write_timing_stats: BulkLoadWriteTimingStats::default(),
645            direct_packet_writes: false,
646        };
647
648        Ok(this)
649    }
650
651    /// Returns the destination columns used by this bulk-load request.
652    ///
653    /// The returned columns are ordered exactly as [`send`] expects row values
654    /// to be encoded. The metadata is discovered during
655    /// [`Client::bulk_insert`] and filtered to updateable destination columns
656    /// before the request is created.
657    ///
658    /// [`send`]: Self::send
659    /// [`Client::bulk_insert`]: crate::Client::bulk_insert
660    pub fn columns(&self) -> impl ExactSizeIterator<Item = BulkLoadColumn<'_>> {
661        bulk_load_columns(&self.columns)
662    }
663
664    /// Returns packet-write statistics collected by this bulk-load request.
665    #[cfg(feature = "bulk-load-profile")]
666    pub fn packet_stats(&self) -> BulkLoadPacketStats {
667        self.packet_stats
668    }
669
670    /// Returns write timing statistics collected by this bulk-load request.
671    #[cfg(feature = "bulk-load-profile")]
672    pub fn write_timing_stats(&self) -> BulkLoadWriteTimingStats {
673        self.write_timing_stats
674    }
675
676    /// Returns all benchmark statistics collected by this bulk-load request.
677    #[cfg(feature = "bulk-load-profile")]
678    pub fn stats(&self) -> BulkLoadStats {
679        BulkLoadStats {
680            packet: self.packet_stats,
681            write_timing: self.write_timing_stats,
682        }
683    }
684
685    /// Enables the experimental direct packet write path for this bulk-load
686    /// request.
687    ///
688    /// The default path uses Tiberius' framed packet sink. This opt-in path is
689    /// intended for benchmark comparisons only: it preserves TDS packet framing
690    /// while writing raw bulk packets directly to the underlying transport.
691    ///
692    /// Direct mode changes the internal pending buffer from payload-only bytes
693    /// to a header-prefixed packet buffer. The first 8 bytes are reserved for a
694    /// TDS header and are filled immediately before writing each packet.
695    ///
696    /// Normal query writes and requests that do not call this method continue
697    /// using the framed sink path.
698    pub fn enable_direct_packet_writes(&mut self) {
699        if !self.direct_packet_writes {
700            replace_with_direct_packet_buffer(&mut self.buf);
701            self.direct_packet_writes = true;
702        }
703    }
704
705    /// Returns true if this request uses the experimental direct packet writer.
706    pub fn direct_packet_writes_enabled(&self) -> bool {
707        self.direct_packet_writes
708    }
709
710    fn direct_payload_len(&self) -> usize {
711        direct_packet_payload_len(&self.buf)
712    }
713
714    fn encode_direct_packet_header_at(
715        &mut self,
716        header_start: usize,
717        payload_len: usize,
718        status: PacketStatus,
719    ) -> crate::Result<()> {
720        encode_direct_packet_header(
721            &mut self.buf,
722            self.packet_id,
723            header_start,
724            payload_len,
725            status,
726        )
727    }
728
729    /// Adds a new row to the bulk insert, flushing only when having a full packet of data.
730    ///
731    /// # Warning
732    ///
733    /// After the last row, [`finalize`] must be called to flush the buffered
734    /// data and for the data to actually be available in the table.
735    ///
736    /// [`finalize`]: #method.finalize
737    pub async fn send(&mut self, row: TokenRow<'a>) -> crate::Result<()> {
738        let mut buf_with_columns = BytesMutWithDataColumns::new(&mut self.buf, &self.columns);
739
740        row.encode(&mut buf_with_columns)?;
741        self.write_packets().await?;
742
743        Ok(())
744    }
745
746    /// Adds one already-encoded row value payload to the bulk insert.
747    ///
748    /// The payload must contain only the encoded value bytes for one row. It
749    /// must not include the TDS `ROW` token byte. This method prefixes the
750    /// normal `ROW` token (`0xD1`) and then appends the payload to the same
751    /// packet buffer used by [`send`].
752    ///
753    /// Empty payloads are rejected. After the last row, [`finalize`] must be
754    /// called to flush the buffered data and complete the bulk load.
755    ///
756    /// [`send`]: Self::send
757    /// [`finalize`]: Self::finalize
758    pub async fn send_raw_row_payload(&mut self, payload: impl AsRef<[u8]>) -> crate::Result<()> {
759        append_raw_row_payload(&mut self.buf, payload.as_ref())?;
760        self.write_packets().await?;
761
762        Ok(())
763    }
764
765    /// Adds already-encoded complete TDS rows to the bulk insert.
766    ///
767    /// The payload must contain one or more complete TDS rows. Each row must
768    /// begin with the TDS `ROW` token byte (`0xD1`) followed by that row's
769    /// encoded value payload. This is the batched raw path intended for callers
770    /// that encode many rows, such as one Arrow `RecordBatch`, before handing
771    /// bytes to Tiberius.
772    ///
773    /// Empty payloads are rejected. This method performs only a cheap first-byte
774    /// check; callers are responsible for producing semantically valid row
775    /// bytes for this request's [`columns`]. After the last batch, [`finalize`]
776    /// must be called to flush the buffered data and complete the bulk load.
777    ///
778    /// [`columns`]: Self::columns
779    /// [`finalize`]: Self::finalize
780    pub async fn send_raw_rows_payload(&mut self, payload: impl AsRef<[u8]>) -> crate::Result<()> {
781        append_raw_rows_payload(&mut self.buf, payload.as_ref())?;
782        self.write_packets().await?;
783
784        Ok(())
785    }
786
787    /// Adds already-encoded complete TDS rows with row-token offset checks.
788    ///
789    /// This method has the same byte boundary as [`send_raw_rows_payload`]:
790    /// `payload` must contain one or more complete TDS rows and each row must
791    /// start with the TDS `ROW` token byte (`0xD1`). The `row_token_offsets`
792    /// slice identifies the byte offset of every row token in `payload`.
793    ///
794    /// The offset checks are intended as a cheap validation layer for batched
795    /// encoders. They verify that offsets are non-empty, start at zero, are
796    /// strictly increasing, are in bounds, and point at `ROW` tokens. They do
797    /// not parse or validate the row value payloads.
798    ///
799    /// [`send_raw_rows_payload`]: Self::send_raw_rows_payload
800    pub async fn send_raw_rows_payload_checked(
801        &mut self,
802        payload: impl AsRef<[u8]>,
803        row_token_offsets: impl AsRef<[usize]>,
804    ) -> crate::Result<()> {
805        append_raw_rows_payload_checked(
806            &mut self.buf,
807            payload.as_ref(),
808            row_token_offsets.as_ref(),
809        )?;
810        self.write_packets().await?;
811
812        Ok(())
813    }
814
815    /// Adds raw rows by encoding directly into this request's packet buffer.
816    ///
817    /// The closure receives the same internal buffer used by [`send`] and the
818    /// other raw bulk methods. It must append one or more complete TDS rows,
819    /// where each row starts with the TDS `ROW` token byte (`0xD1`), and then
820    /// return [`RawRowsAppend`] with row-token offsets relative to the appended
821    /// region.
822    ///
823    /// If the closure returns an error, or if the appended region fails
824    /// row-token validation, this method truncates the request buffer back to
825    /// its original length before returning the error. On success, it uses the
826    /// normal bulk-load packet splitting path.
827    ///
828    /// [`send`]: Self::send
829    pub async fn send_raw_rows_with<F>(&mut self, encode: F) -> crate::Result<()>
830    where
831        F: FnOnce(&mut RawRowsAppendBuffer<'_>) -> crate::Result<RawRowsAppend>,
832    {
833        append_raw_rows_with(&mut self.buf, encode)?;
834        self.write_packets().await?;
835
836        Ok(())
837    }
838
/// Ends the bulk load, flushing all pending data to the wire.
///
/// Call this once every row has been sent: it flushes whatever is still
/// buffered and makes the server actually store the rows in the table.
///
/// In profiling builds this delegates to `finalize_with_stats` and drops
/// the collected statistics.
#[cfg(feature = "bulk-load-profile")]
pub async fn finalize(self) -> crate::Result<ExecuteResult> {
    self.finalize_with_stats()
        .await
        .map(|(result, _stats)| result)
}
849
850    /// Ends the bulk load, flushing all pending data to the wire.
851    ///
852    /// This method must be called after sending all the data to flush all
853    /// pending data and to get the server actually to store the rows to the
854    /// table.
855    #[cfg(not(feature = "bulk-load-profile"))]
856    pub async fn finalize(mut self) -> crate::Result<ExecuteResult> {
857        TokenDone::default().encode(&mut self.buf)?;
858        self.write_packets().await?;
859
860        let data = self.buf.split();
861        let data_len = if self.direct_packet_writes {
862            data.len().saturating_sub(HEADER_BYTES)
863        } else {
864            data.len()
865        };
866
867        event!(
868            Level::TRACE,
869            "Finalizing a bulk insert ({} bytes)",
870            data_len + HEADER_BYTES,
871        );
872
873        if self.direct_packet_writes {
874            let mut data = data;
875            let mut header = PacketHeader::bulk_load(self.packet_id);
876            header.set_status(PacketStatus::EndOfMessage);
877            let mut header_buf = &mut data[..HEADER_BYTES];
878            header.encode_for_payload(data_len, &mut header_buf)?;
879            self.connection.write_direct_packet_buffer(&data).await?;
880        } else {
881            let mut header = PacketHeader::bulk_load(self.packet_id);
882            header.set_status(PacketStatus::EndOfMessage);
883            self.connection.write_to_wire(header, data).await?;
884        }
885
886        self.connection.flush_sink().await?;
887        ExecuteResult::new(self.connection).await
888    }
889
/// Ends the bulk load and returns the packet statistics of this request.
///
/// Writes exactly what [`finalize`] writes, and additionally hands back
/// the final packet counters, which could not be read afterwards because
/// finalization consumes the request.
///
/// [`finalize`]: Self::finalize
#[cfg(feature = "bulk-load-profile")]
pub async fn finalize_with_packet_stats(
    self,
) -> crate::Result<(ExecuteResult, BulkLoadPacketStats)> {
    self.finalize_with_stats()
        .await
        .map(|(result, stats)| (result, stats.packet))
}
904
/// Ends the bulk load and returns every benchmark statistic of this request.
///
/// Writes exactly what [`finalize`] writes, and additionally returns the
/// packet counters and write timing counters, since finalization consumes
/// the request.
///
/// [`finalize`]: Self::finalize
#[cfg(feature = "bulk-load-profile")]
pub async fn finalize_with_stats(mut self) -> crate::Result<(ExecuteResult, BulkLoadStats)> {
    let finalize_started = Instant::now();
    TokenDone::default().encode(&mut self.buf)?;
    self.write_packets().await?;

    let mut data = self.buf.split();
    // Direct mode keeps a reserved header slot at the front of the buffer;
    // those bytes are not payload.
    let reserved_prefix = if self.direct_packet_writes {
        HEADER_BYTES
    } else {
        0
    };
    let payload_len = data.len().saturating_sub(reserved_prefix);
    self.packet_stats.record_finalized_packet(payload_len);

    event!(
        Level::TRACE,
        "Finalizing a bulk insert ({} bytes)",
        payload_len + HEADER_BYTES,
    );

    let write_started = Instant::now();
    let mut header = PacketHeader::bulk_load(self.packet_id);
    header.set_status(PacketStatus::EndOfMessage);

    if self.direct_packet_writes {
        let mut header_slot = &mut data[..HEADER_BYTES];
        header.encode_for_payload(payload_len, &mut header_slot)?;
        let write_result = self
            .connection
            .write_direct_packet_buffer_with_timing(&data)
            .await;
        let write_elapsed = write_started.elapsed();
        // The wall-clock write time is recorded even when the write failed.
        self.write_timing_stats
            .record_write_to_wire(write_elapsed, payload_len);

        let direct_timing = write_result?;
        self.write_timing_stats
            .record_direct_final_packet_write(direct_timing);
        self.write_timing_stats
            .record_finalize_write_to_wire_elapsed(write_elapsed);
    } else {
        let write_result = self
            .connection
            .write_to_wire_with_timing(header, data)
            .await;
        let write_elapsed = write_started.elapsed();
        // The wall-clock write time is recorded even when the write failed.
        self.write_timing_stats
            .record_write_to_wire(write_elapsed, payload_len);

        let connection_timing = write_result?;
        self.write_timing_stats.record_connection_write(
            payload_len,
            connection_timing.ready_elapsed,
            connection_timing.encode_elapsed,
            connection_timing.flush_elapsed,
        );
        self.write_timing_stats
            .record_finalize_write_to_wire_elapsed(write_elapsed);
    }

    let flush_started = Instant::now();
    let flush_result = self.connection.flush_sink().await;
    let flush_elapsed = flush_started.elapsed();
    // Flush timing is recorded before a flush error is propagated.
    self.write_timing_stats.record_flush(flush_elapsed);
    self.write_timing_stats
        .record_finalize_flush_elapsed(flush_elapsed);
    flush_result?;

    let result_started = Instant::now();
    let result = ExecuteResult::new(self.connection).await?;
    self.write_timing_stats
        .record_finalize_result_elapsed(result_started.elapsed());
    self.write_timing_stats
        .record_finalize_elapsed(finalize_started.elapsed());

    Ok((result, self.stats()))
}
998
999    async fn write_packets(&mut self) -> crate::Result<()> {
1000        #[cfg(feature = "bulk-load-profile")]
1001        {
1002            let write_packets_start = Instant::now();
1003            let result = if self.direct_packet_writes {
1004                self.write_packets_direct_inner().await
1005            } else {
1006                self.write_packets_framed_inner().await
1007            };
1008            self.write_timing_stats
1009                .record_write_packets_elapsed(write_packets_start.elapsed());
1010            result
1011        }
1012
1013        #[cfg(not(feature = "bulk-load-profile"))]
1014        {
1015            if self.direct_packet_writes {
1016                self.write_packets_direct_inner().await
1017            } else {
1018                self.write_packets_framed_inner().await
1019            }
1020        }
1021    }
1022
1023    async fn write_packets_framed_inner(&mut self) -> crate::Result<()> {
1024        let packet_size = (self.connection.context().packet_size() as usize) - HEADER_BYTES;
1025        #[cfg(feature = "bulk-load-profile")]
1026        self.packet_stats.record_write_packets_call(self.buf.len());
1027
1028        while self.buf.len() > packet_size {
1029            let header = PacketHeader::bulk_load(self.packet_id);
1030            let data = self.buf.split_to(packet_size);
1031            #[cfg(feature = "bulk-load-profile")]
1032            self.packet_stats.record_packet_written(data.len());
1033
1034            event!(
1035                Level::TRACE,
1036                "Bulk insert packet ({} bytes)",
1037                data.len() + HEADER_BYTES,
1038            );
1039
1040            #[cfg(feature = "bulk-load-profile")]
1041            {
1042                let data_len = data.len();
1043                let write_start = Instant::now();
1044                let write_result = self
1045                    .connection
1046                    .write_to_wire_with_timing(header, data)
1047                    .await;
1048                self.write_timing_stats
1049                    .record_write_to_wire(write_start.elapsed(), data_len);
1050                match write_result {
1051                    Ok(connection_timing) => {
1052                        self.write_timing_stats.record_connection_write(
1053                            data_len,
1054                            connection_timing.ready_elapsed,
1055                            connection_timing.encode_elapsed,
1056                            connection_timing.flush_elapsed,
1057                        );
1058                    }
1059                    Err(err) => return Err(err),
1060                }
1061            }
1062            #[cfg(not(feature = "bulk-load-profile"))]
1063            self.connection.write_to_wire(header, data).await?;
1064        }
1065
1066        #[cfg(feature = "bulk-load-profile")]
1067        self.packet_stats
1068            .record_buffered_bytes_after_write(self.buf.len());
1069
1070        Ok(())
1071    }
1072
1073    async fn write_packets_direct_inner(&mut self) -> crate::Result<()> {
1074        let packet_size = (self.connection.context().packet_size() as usize) - HEADER_BYTES;
1075        #[cfg(feature = "bulk-load-profile")]
1076        self.packet_stats
1077            .record_write_packets_call(self.direct_payload_len());
1078
1079        let mut payload_start = HEADER_BYTES;
1080        let mut payload_remaining = self.direct_payload_len();
1081        let mut sent_packets = false;
1082
1083        while payload_remaining > packet_size {
1084            // The first packet uses the reserved header slot at byte 0. Later
1085            // packets put their header in the 8 bytes immediately before the
1086            // next payload chunk. Those bytes belonged to an earlier packet
1087            // that has already been written, so overwriting them is safe and
1088            // avoids building a fresh header-plus-payload buffer per packet.
1089            let header_start = payload_start - HEADER_BYTES;
1090            let packet_end = payload_start + packet_size;
1091            self.encode_direct_packet_header_at(
1092                header_start,
1093                packet_size,
1094                PacketStatus::NormalMessage,
1095            )?;
1096            #[cfg(feature = "bulk-load-profile")]
1097            self.packet_stats.record_packet_written(packet_size);
1098
1099            event!(
1100                Level::TRACE,
1101                "Bulk insert direct packet ({} bytes)",
1102                packet_size + HEADER_BYTES,
1103            );
1104
1105            #[cfg(feature = "bulk-load-profile")]
1106            {
1107                let write_start = Instant::now();
1108                let write_result = self
1109                    .connection
1110                    .write_direct_packet_buffer_with_timing(&self.buf[header_start..packet_end])
1111                    .await;
1112                self.write_timing_stats
1113                    .record_write_to_wire(write_start.elapsed(), packet_size);
1114                match write_result {
1115                    Ok(direct_timing) => self
1116                        .write_timing_stats
1117                        .record_direct_packet_write(direct_timing),
1118                    Err(err) => return Err(err),
1119                }
1120            }
1121            #[cfg(not(feature = "bulk-load-profile"))]
1122            self.connection
1123                .write_direct_packet_buffer(&self.buf[header_start..packet_end])
1124                .await?;
1125
1126            sent_packets = true;
1127            payload_start += packet_size;
1128            payload_remaining -= packet_size;
1129        }
1130
1131        if sent_packets {
1132            // Only the unsent tail remains relevant after the loop. It is at
1133            // most one packet payload, so copying it behind a fresh header slot
1134            // avoids the per-packet copy while preserving the direct-mode
1135            // buffer invariant for the next append or final packet.
1136            let remaining = self.buf.split_off(payload_start);
1137            self.buf.clear();
1138            self.buf = remaining;
1139            replace_with_direct_packet_buffer(&mut self.buf);
1140        }
1141
1142        #[cfg(feature = "bulk-load-profile")]
1143        self.packet_stats
1144            .record_buffered_bytes_after_write(self.direct_payload_len());
1145
1146        Ok(())
1147    }
1148}
1149
1150fn bulk_load_columns<'a>(
1151    columns: &'a [MetaDataColumn<'a>],
1152) -> impl ExactSizeIterator<Item = BulkLoadColumn<'a>> {
1153    columns
1154        .iter()
1155        .enumerate()
1156        .map(|(ordinal, column)| BulkLoadColumn { ordinal, column })
1157}
1158
1159fn append_raw_row_payload(buf: &mut BytesMut, payload: &[u8]) -> crate::Result<()> {
1160    if payload.is_empty() {
1161        return Err(crate::Error::BulkInput(
1162            "raw bulk row payload cannot be empty".into(),
1163        ));
1164    }
1165
1166    buf.put_u8(TokenType::Row as u8);
1167    buf.extend_from_slice(payload);
1168
1169    Ok(())
1170}
1171
1172fn append_raw_rows_payload(buf: &mut BytesMut, payload: &[u8]) -> crate::Result<()> {
1173    if payload.is_empty() {
1174        return Err(crate::Error::BulkInput(
1175            "raw bulk rows payload cannot be empty".into(),
1176        ));
1177    }
1178
1179    if payload[0] != TokenType::Row as u8 {
1180        return Err(crate::Error::BulkInput(
1181            "raw bulk rows payload must start with a TDS ROW token".into(),
1182        ));
1183    }
1184
1185    buf.extend_from_slice(payload);
1186
1187    Ok(())
1188}
1189
1190fn append_raw_rows_payload_checked(
1191    buf: &mut BytesMut,
1192    payload: &[u8],
1193    row_token_offsets: &[usize],
1194) -> crate::Result<()> {
1195    validate_raw_row_token_offsets(payload, row_token_offsets)?;
1196    buf.extend_from_slice(payload);
1197
1198    Ok(())
1199}
1200
1201/// Runs a rollback-safe append transaction against the raw bulk request buffer.
1202///
1203/// `buf` is the existing `BulkLoadRequest` buffer. It may already contain
1204/// column metadata, previously buffered rows, or both. The `encode` closure is
1205/// responsible for appending new complete TDS rows through
1206/// `RawRowsAppendBuffer`, then returning row-token offsets relative to only the
1207/// bytes it appended.
1208///
1209/// If encoding or validation fails, this helper truncates `buf` back to the
1210/// original length so the request can continue to behave as if the attempted
1211/// append never happened.
1212fn append_raw_rows_with<F>(buf: &mut BytesMut, encode: F) -> crate::Result<()>
1213where
1214    F: FnOnce(&mut RawRowsAppendBuffer<'_>) -> crate::Result<RawRowsAppend>,
1215{
1216    // Everything after this byte offset belongs to the attempted append.
1217    let start_len = buf.len();
1218    let mut raw_buf = RawRowsAppendBuffer { bytes: buf };
1219
1220    // The caller writes row bytes into `raw_buf` here.
1221    let append = match encode(&mut raw_buf) {
1222        Ok(append) => append,
1223        Err(err) => {
1224            raw_buf.bytes.truncate(start_len);
1225            return Err(err);
1226        }
1227    };
1228
1229    // Validate only the bytes appended by this call. Returned offsets are
1230    // relative to `raw_buf.bytes[start_len..]`, not the full request buffer.
1231    if let Err(err) =
1232        validate_raw_row_token_offsets(&raw_buf.bytes[start_len..], append.row_token_offsets())
1233    {
1234        raw_buf.bytes.truncate(start_len);
1235        return Err(err);
1236    }
1237
1238    Ok(())
1239}
1240
1241#[cfg(feature = "bulk-load-profile")]
/// Widens `value` to `u64`, clamping to `u64::MAX` if it does not fit.
fn usize_to_u64_saturating(value: usize) -> u64 {
    value.try_into().unwrap_or(u64::MAX)
}
1245
1246fn validate_raw_row_token_offsets(
1247    payload: &[u8],
1248    row_token_offsets: &[usize],
1249) -> crate::Result<()> {
1250    if payload.is_empty() {
1251        return Err(crate::Error::BulkInput(
1252            "raw bulk rows payload cannot be empty".into(),
1253        ));
1254    }
1255
1256    if row_token_offsets.is_empty() {
1257        return Err(crate::Error::BulkInput(
1258            "raw bulk row token offsets cannot be empty".into(),
1259        ));
1260    }
1261
1262    if row_token_offsets[0] != 0 {
1263        return Err(crate::Error::BulkInput(
1264            "raw bulk row token offsets must start at zero".into(),
1265        ));
1266    }
1267
1268    let mut previous = None;
1269
1270    for &offset in row_token_offsets {
1271        if offset >= payload.len() {
1272            return Err(crate::Error::BulkInput(
1273                "raw bulk row token offset is out of bounds".into(),
1274            ));
1275        }
1276
1277        if previous.is_some_and(|previous| offset <= previous) {
1278            return Err(crate::Error::BulkInput(
1279                "raw bulk row token offsets must be strictly increasing".into(),
1280            ));
1281        }
1282
1283        if payload[offset] != TokenType::Row as u8 {
1284            return Err(crate::Error::BulkInput(
1285                "raw bulk row token offset must point to a TDS ROW token".into(),
1286            ));
1287        }
1288
1289        previous = Some(offset);
1290    }
1291
1292    Ok(())
1293}
1294
1295/// Read-only destination metadata for one bulk-load column.
1296#[derive(Debug, Clone, Copy)]
1297pub struct BulkLoadColumn<'a> {
1298    ordinal: usize,
1299    column: &'a MetaDataColumn<'a>,
1300}
1301
1302impl BulkLoadColumn<'_> {
1303    /// The zero-based ordinal in the bulk row encoding order.
1304    pub fn ordinal(&self) -> usize {
1305        self.ordinal
1306    }
1307
1308    /// The destination column name.
1309    pub fn name(&self) -> &str {
1310        &self.column.col_name
1311    }
1312
1313    /// The coarse logical column type.
1314    pub fn column_type(&self) -> ColumnType {
1315        ColumnType::from(&self.column.base.ty)
1316    }
1317
1318    /// The raw TDS column flags reported for this destination column.
1319    pub fn flags(&self) -> BitFlags<ColumnFlag> {
1320        self.column.base.flags
1321    }
1322
1323    /// Whether this destination column accepts null values.
1324    pub fn is_nullable(&self) -> bool {
1325        self.flags().contains(ColumnFlag::Nullable)
1326    }
1327
1328    /// Whether this destination column is updateable by bulk load.
1329    pub fn is_updateable(&self) -> bool {
1330        self.flags().contains(ColumnFlag::Updateable)
1331    }
1332
1333    /// The detailed TDS type metadata for direct bulk-row encoding.
1334    pub fn type_info(&self) -> &TypeInfo {
1335        &self.column.base.ty
1336    }
1337}
1338
1339#[cfg(test)]
1340mod tests {
1341    use std::borrow::Cow;
1342
1343    use super::*;
1344    use crate::tds::codec::{BaseMetaDataColumn, Decode, FixedLenType};
1345
/// Every accessor of `BulkLoadColumn` reflects the wrapped metadata.
#[test]
fn exposes_bulk_load_column_metadata() {
    let base = BaseMetaDataColumn {
        flags: ColumnFlag::Nullable | ColumnFlag::Updateable,
        ty: TypeInfo::FixedLen(FixedLenType::Int4),
    };
    let metadata = MetaDataColumn {
        base,
        col_name: Cow::Borrowed("value"),
    };
    let column = BulkLoadColumn {
        ordinal: 2,
        column: &metadata,
    };
    let expected_type = TypeInfo::FixedLen(FixedLenType::Int4);

    assert_eq!(2, column.ordinal());
    assert_eq!("value", column.name());
    assert_eq!(ColumnType::Int4, column.column_type());
    assert!(column.is_nullable());
    assert!(column.is_updateable());
    assert!(column.flags().contains(ColumnFlag::Nullable));
    assert_eq!(&expected_type, column.type_info());
}
1369
/// Converting a buffer to direct mode puts a zeroed header slot in front
/// of the existing payload.
#[test]
fn direct_packet_buffer_reserves_header_prefix() {
    let payload = b"payload";
    let mut buf = BytesMut::from(&payload[..]);

    replace_with_direct_packet_buffer(&mut buf);

    let (header_slot, rest) = buf.split_at(HEADER_BYTES);
    assert_eq!(direct_packet_payload_len(&buf), 7);
    assert_eq!(header_slot, &[0_u8; HEADER_BYTES]);
    assert_eq!(rest, payload);
}
1380
/// A later packet's header may be encoded over bytes of an earlier packet
/// without disturbing the payload that follows it.
#[test]
fn direct_packet_header_overwrites_already_sent_bytes() {
    let packet_payload_len = 4;
    let mut buf = BytesMut::from(&[0_u8; HEADER_BYTES][..]);
    buf.extend_from_slice(b"aaaabbbbcc");

    // The first header goes into the reserved slot; the second lands on
    // bytes that belonged to the first packet.
    let headers = [
        (0, "first packet header should encode"),
        (
            packet_payload_len,
            "second packet header should encode over previous bytes",
        ),
    ];
    for (header_start, failure) in headers {
        encode_direct_packet_header(
            &mut buf,
            7,
            header_start,
            packet_payload_len,
            PacketStatus::NormalMessage,
        )
        .expect(failure);
    }

    let second_header_start = packet_payload_len;
    let second_payload_start = second_header_start + HEADER_BYTES;
    let mut header_bytes = BytesMut::from(&buf[second_header_start..second_payload_start]);
    let header = PacketHeader::decode(&mut header_bytes)
        .expect("overwritten bytes should decode as a packet header");

    assert_eq!(header.length(), (HEADER_BYTES + packet_payload_len) as u16);
    assert_eq!(header.status(), PacketStatus::NormalMessage);
    assert_eq!(
        &buf[second_payload_start..second_payload_start + packet_payload_len],
        b"bbbb"
    );
}
1418
/// A default `BulkLoadPacketStats` has every counter at zero.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_packet_stats_default_to_zero() {
    let BulkLoadPacketStats {
        write_packets_calls,
        packets_written,
        packet_payload_bytes,
        max_packet_payload_bytes,
        max_buffered_bytes_before_write,
        buffered_bytes_after_last_write,
        finalized_packet_payload_bytes,
        ..
    } = BulkLoadPacketStats::default();

    assert_eq!(write_packets_calls, 0);
    assert_eq!(packets_written, 0);
    assert_eq!(packet_payload_bytes, 0);
    assert_eq!(max_packet_payload_bytes, 0);
    assert_eq!(max_buffered_bytes_before_write, 0);
    assert_eq!(buffered_bytes_after_last_write, 0);
    assert_eq!(finalized_packet_payload_bytes, 0);
}
1432
/// Write attempts are counted, the largest pre-write buffer is kept, and
/// only the most recent post-write tail is remembered.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_packet_stats_record_write_attempts_and_buffer_tail() {
    let mut stats = BulkLoadPacketStats::default();

    // (bytes buffered before the write, bytes left buffered afterwards)
    for (buffered_before, buffered_after) in [(128, 17), (64, 3)] {
        stats.record_write_packets_call(buffered_before);
        stats.record_buffered_bytes_after_write(buffered_after);
    }

    assert_eq!(stats.write_packets_calls, 2);
    assert_eq!(stats.max_buffered_bytes_before_write, 128);
    assert_eq!(stats.buffered_bytes_after_last_write, 3);
    assert_eq!(stats.packets_written, 0);
}
1448
/// Written packets add up in count and bytes, and the largest payload is
/// tracked separately.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_packet_stats_accumulate_packet_bytes_and_maxima() {
    let mut stats = BulkLoadPacketStats::default();

    for payload_bytes in [4, 9, 7] {
        stats.record_packet_written(payload_bytes);
    }

    assert_eq!(stats.packets_written, 3);
    assert_eq!(stats.packet_payload_bytes, 20);
    assert_eq!(stats.max_packet_payload_bytes, 9);
}
1462
/// The finalized packet's payload is tracked apart from regular packets.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_packet_stats_record_final_packet_separately() {
    let mut counters = BulkLoadPacketStats::default();

    counters.record_packet_written(4096);
    counters.record_finalized_packet(41);

    assert_eq!(counters.packet_payload_bytes, 4096);
    assert_eq!(counters.finalized_packet_payload_bytes, 41);
}
1474
/// A default `BulkLoadWriteTimingStats` has zero counters, zero durations
/// and default nested statistics.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_write_timing_stats_default_to_zero() {
    let defaults = BulkLoadWriteTimingStats::default();

    // Packet writing.
    assert_eq!(defaults.write_packets_elapsed, Duration::ZERO);
    assert_eq!(defaults.write_to_wire_calls, 0);
    assert_eq!(defaults.write_to_wire_elapsed, Duration::ZERO);
    assert_eq!(defaults.write_to_wire_payload_bytes, 0);
    assert_eq!(defaults.max_write_to_wire_elapsed, Duration::ZERO);
    assert_eq!(defaults.max_write_to_wire_payload_bytes, 0);

    // Flushing.
    assert_eq!(defaults.flush_calls, 0);
    assert_eq!(defaults.flush_elapsed, Duration::ZERO);
    assert_eq!(defaults.max_flush_elapsed, Duration::ZERO);

    // Finalization.
    assert_eq!(defaults.finalize_elapsed, Duration::ZERO);
    assert_eq!(defaults.finalize_write_to_wire_elapsed, Duration::ZERO);
    assert_eq!(defaults.finalize_flush_elapsed, Duration::ZERO);
    assert_eq!(defaults.finalize_result_elapsed, Duration::ZERO);

    // Nested statistics.
    assert_eq!(
        defaults.connection_write,
        BulkLoadConnectionWriteStats::default()
    );
    assert_eq!(
        defaults.direct_packet_write,
        BulkLoadDirectPacketWriteStats::default()
    );
}
1502
/// A default `BulkLoadConnectionWriteStats` has zero counters and zero
/// durations.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_connection_write_stats_default_to_zero() {
    let defaults = BulkLoadConnectionWriteStats::default();

    // Totals.
    assert_eq!(defaults.calls, 0);
    assert_eq!(defaults.payload_bytes, 0);
    assert_eq!(defaults.ready_elapsed, Duration::ZERO);
    assert_eq!(defaults.encode_elapsed, Duration::ZERO);
    assert_eq!(defaults.flush_elapsed, Duration::ZERO);

    // Maxima.
    assert_eq!(defaults.max_ready_elapsed, Duration::ZERO);
    assert_eq!(defaults.max_encode_elapsed, Duration::ZERO);
    assert_eq!(defaults.max_flush_elapsed, Duration::ZERO);
    assert_eq!(defaults.max_payload_bytes, 0);
}
1518
/// Recorded connection writes are summed, and the per-phase maxima are
/// tracked independently of each other.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_connection_write_stats_accumulate_and_track_maxima() {
    let ms = Duration::from_millis;
    let mut stats = BulkLoadConnectionWriteStats::default();

    // Arguments: payload bytes, ready time, encode time, flush time.
    stats.record(128, ms(3), ms(5), ms(7));
    stats.record(256, ms(11), ms(2), ms(13));

    // Totals.
    assert_eq!(stats.calls, 2);
    assert_eq!(stats.payload_bytes, 384);
    assert_eq!(stats.ready_elapsed, ms(14));
    assert_eq!(stats.encode_elapsed, ms(7));
    assert_eq!(stats.flush_elapsed, ms(20));

    // Maxima.
    assert_eq!(stats.max_ready_elapsed, ms(11));
    assert_eq!(stats.max_encode_elapsed, ms(5));
    assert_eq!(stats.max_flush_elapsed, ms(13));
    assert_eq!(stats.max_payload_bytes, 256);
}
1547
/// A default `BulkLoadDirectPacketWriteStats` has zero counters and zero
/// durations.
#[test]
#[cfg(feature = "bulk-load-profile")]
fn bulk_load_direct_packet_write_stats_default_to_zero() {
    let defaults = BulkLoadDirectPacketWriteStats::default();

    // Per-packet and final-packet totals.
    assert_eq!(defaults.calls, 0);
    assert_eq!(defaults.payload_bytes, 0);
    assert_eq!(defaults.header_bytes, 0);
    assert_eq!(defaults.max_payload_bytes, 0);
    assert_eq!(defaults.final_calls, 0);
    assert_eq!(defaults.final_payload_bytes, 0);
    assert_eq!(defaults.final_header_bytes, 0);

    // Stream kind counters.
    assert_eq!(defaults.raw_stream_calls, 0);
    assert_eq!(defaults.tls_stream_calls, 0);

    // Overall write counters.
    assert_eq!(defaults.write_calls, 0);
    assert_eq!(defaults.write_bytes, 0);
    assert_eq!(defaults.max_write_bytes, 0);
    assert_eq!(defaults.write_elapsed, Duration::ZERO);
    assert_eq!(defaults.max_write_elapsed, Duration::ZERO);

    // Header write counters.
    assert_eq!(defaults.header_write_calls, 0);
    assert_eq!(defaults.header_write_bytes, 0);
    assert_eq!(defaults.header_max_write_bytes, 0);
    assert_eq!(defaults.header_write_elapsed, Duration::ZERO);
    assert_eq!(defaults.header_max_write_elapsed, Duration::ZERO);
    assert_eq!(defaults.header_partial_writes, 0);

    // Payload write counters.
    assert_eq!(defaults.payload_write_calls, 0);
    assert_eq!(defaults.payload_write_bytes, 0);
    assert_eq!(defaults.payload_max_write_bytes, 0);
    assert_eq!(defaults.payload_write_elapsed, Duration::ZERO);
    assert_eq!(defaults.payload_max_write_elapsed, Duration::ZERO);
    assert_eq!(defaults.payload_partial_writes, 0);

    // `poll_write` counters.
    assert_eq!(defaults.poll_write_polls, 0);
    assert_eq!(defaults.poll_write_pending_count, 0);
    assert_eq!(defaults.poll_write_pending_elapsed, Duration::ZERO);
    assert_eq!(defaults.poll_write_max_pending_elapsed, Duration::ZERO);
    assert_eq!(defaults.poll_write_ready_count, 0);
    assert_eq!(defaults.poll_write_ready_elapsed, Duration::ZERO);
    assert_eq!(defaults.poll_write_max_ready_elapsed, Duration::ZERO);

    // Flush counters.
    assert_eq!(defaults.flush_calls, 0);
    assert_eq!(defaults.flush_elapsed, Duration::ZERO);
    assert_eq!(defaults.max_flush_elapsed, Duration::ZERO);
    assert_eq!(defaults.flush_pending_count, 0);
    assert_eq!(defaults.flush_pending_elapsed, Duration::ZERO);
    assert_eq!(defaults.flush_max_pending_elapsed, Duration::ZERO);
}
1593
1594    #[test]
1595    #[cfg(feature = "bulk-load-profile")]
1596    fn bulk_load_direct_packet_write_stats_accumulate_and_track_maxima() {
1597        let mut stats = BulkLoadDirectPacketWriteStats::default();
1598
1599        stats.record_packet(128, HEADER_BYTES, false);
1600        stats.record_packet(256, HEADER_BYTES, true);
1601        stats.record_stream_mode(true, false);
1602        stats.record_stream_mode(false, true);
1603        stats.record_write_summary(
1604            2,
1605            576,
1606            512,
1607            Duration::from_millis(14),
1608            Duration::from_millis(11),
1609        );
1610        stats.record_header_write_summary(
1611            3,
1612            24,
1613            HEADER_BYTES,
1614            Duration::from_millis(17),
1615            Duration::from_millis(13),
1616            1,
1617        );
1618        stats.record_payload_write_summary(
1619            5,
1620            552,
1621            384,
1622            Duration::from_millis(19),
1623            Duration::from_millis(15),
1624            2,
1625        );
1626        stats.record_poll_write_summary(DirectPacketPollWriteSummary {
1627            polls: 11,
1628            pending_count: 7,
1629            pending_elapsed: Duration::from_millis(23),
1630            max_pending_elapsed: Duration::from_millis(17),
1631            ready_count: 4,
1632            ready_elapsed: Duration::from_millis(29),
1633            max_ready_elapsed: Duration::from_millis(19),
1634        });
1635        stats.record_flush(
1636            Duration::from_millis(5),
1637            2,
1638            Duration::from_millis(3),
1639            Duration::from_millis(2),
1640        );
1641        stats.record_flush(
1642            Duration::from_millis(7),
1643            3,
1644            Duration::from_millis(4),
1645            Duration::from_millis(3),
1646        );
1647
1648        assert_eq!(stats.calls, 2);
1649        assert_eq!(stats.payload_bytes, 384);
1650        assert_eq!(stats.header_bytes, u64::try_from(HEADER_BYTES * 2).unwrap());
1651        assert_eq!(stats.max_payload_bytes, 256);
1652        assert_eq!(stats.final_calls, 1);
1653        assert_eq!(stats.final_payload_bytes, 256);
1654        assert_eq!(
1655            stats.final_header_bytes,
1656            u64::try_from(HEADER_BYTES).unwrap()
1657        );
1658        assert_eq!(stats.raw_stream_calls, 1);
1659        assert_eq!(stats.tls_stream_calls, 1);
1660        assert_eq!(stats.write_calls, 2);
1661        assert_eq!(stats.write_bytes, 576);
1662        assert_eq!(stats.max_write_bytes, 512);
1663        assert_eq!(stats.write_elapsed, Duration::from_millis(14));
1664        assert_eq!(stats.max_write_elapsed, Duration::from_millis(11));
1665        assert_eq!(stats.header_write_calls, 3);
1666        assert_eq!(stats.header_write_bytes, 24);
1667        assert_eq!(stats.header_max_write_bytes, HEADER_BYTES);
1668        assert_eq!(stats.header_write_elapsed, Duration::from_millis(17));
1669        assert_eq!(stats.header_max_write_elapsed, Duration::from_millis(13));
1670        assert_eq!(stats.header_partial_writes, 1);
1671        assert_eq!(stats.payload_write_calls, 5);
1672        assert_eq!(stats.payload_write_bytes, 552);
1673        assert_eq!(stats.payload_max_write_bytes, 384);
1674        assert_eq!(stats.payload_write_elapsed, Duration::from_millis(19));
1675        assert_eq!(stats.payload_max_write_elapsed, Duration::from_millis(15));
1676        assert_eq!(stats.payload_partial_writes, 2);
1677        assert_eq!(stats.poll_write_polls, 11);
1678        assert_eq!(stats.poll_write_pending_count, 7);
1679        assert_eq!(stats.poll_write_pending_elapsed, Duration::from_millis(23));
1680        assert_eq!(
1681            stats.poll_write_max_pending_elapsed,
1682            Duration::from_millis(17)
1683        );
1684        assert_eq!(stats.poll_write_ready_count, 4);
1685        assert_eq!(stats.poll_write_ready_elapsed, Duration::from_millis(29));
1686        assert_eq!(
1687            stats.poll_write_max_ready_elapsed,
1688            Duration::from_millis(19)
1689        );
1690        assert_eq!(stats.flush_calls, 2);
1691        assert_eq!(stats.flush_elapsed, Duration::from_millis(12));
1692        assert_eq!(stats.max_flush_elapsed, Duration::from_millis(7));
1693        assert_eq!(stats.flush_pending_count, 5);
1694        assert_eq!(stats.flush_pending_elapsed, Duration::from_millis(7));
1695        assert_eq!(stats.flush_max_pending_elapsed, Duration::from_millis(3));
1696    }
1697
1698    #[test]
1699    #[cfg(feature = "bulk-load-profile")]
1700    fn bulk_load_write_timing_stats_accumulate_write_packets_elapsed() {
1701        let mut stats = BulkLoadWriteTimingStats::default();
1702
1703        stats.record_write_packets_elapsed(Duration::from_millis(7));
1704        stats.record_write_packets_elapsed(Duration::from_millis(11));
1705
1706        assert_eq!(stats.write_packets_elapsed, Duration::from_millis(18));
1707    }
1708
1709    #[test]
1710    #[cfg(feature = "bulk-load-profile")]
1711    fn bulk_load_write_timing_stats_accumulate_write_to_wire() {
1712        let mut stats = BulkLoadWriteTimingStats::default();
1713
1714        stats.record_write_to_wire(Duration::from_millis(13), 128);
1715        stats.record_write_to_wire(Duration::from_millis(17), 256);
1716
1717        assert_eq!(stats.write_to_wire_calls, 2);
1718        assert_eq!(stats.write_to_wire_elapsed, Duration::from_millis(30));
1719        assert_eq!(stats.write_to_wire_payload_bytes, 384);
1720        assert_eq!(stats.max_write_to_wire_elapsed, Duration::from_millis(17));
1721        assert_eq!(stats.max_write_to_wire_payload_bytes, 256);
1722    }
1723
1724    #[test]
1725    #[cfg(feature = "bulk-load-profile")]
1726    fn bulk_load_write_timing_stats_accumulate_flush_and_finalize() {
1727        let mut stats = BulkLoadWriteTimingStats::default();
1728
1729        stats.record_flush(Duration::from_millis(19));
1730        stats.record_flush(Duration::from_millis(23));
1731        stats.record_finalize_elapsed(Duration::from_millis(29));
1732        stats.record_finalize_elapsed(Duration::from_millis(31));
1733        stats.record_finalize_write_to_wire_elapsed(Duration::from_millis(37));
1734        stats.record_finalize_flush_elapsed(Duration::from_millis(41));
1735        stats.record_finalize_result_elapsed(Duration::from_millis(43));
1736        stats.record_connection_write(
1737            128,
1738            Duration::from_millis(47),
1739            Duration::from_millis(53),
1740            Duration::from_millis(59),
1741        );
1742
1743        assert_eq!(stats.flush_calls, 2);
1744        assert_eq!(stats.flush_elapsed, Duration::from_millis(42));
1745        assert_eq!(stats.max_flush_elapsed, Duration::from_millis(23));
1746        assert_eq!(stats.finalize_elapsed, Duration::from_millis(60));
1747        assert_eq!(
1748            stats.finalize_write_to_wire_elapsed,
1749            Duration::from_millis(37)
1750        );
1751        assert_eq!(stats.finalize_flush_elapsed, Duration::from_millis(41));
1752        assert_eq!(stats.finalize_result_elapsed, Duration::from_millis(43));
1753        assert_eq!(stats.connection_write.calls, 1);
1754        assert_eq!(stats.connection_write.payload_bytes, 128);
1755        assert_eq!(
1756            stats.connection_write.ready_elapsed,
1757            Duration::from_millis(47)
1758        );
1759        assert_eq!(
1760            stats.connection_write.encode_elapsed,
1761            Duration::from_millis(53)
1762        );
1763        assert_eq!(
1764            stats.connection_write.flush_elapsed,
1765            Duration::from_millis(59)
1766        );
1767    }
1768
1769    #[test]
1770    fn appends_single_raw_row_payload_with_row_token() {
1771        let mut buf = BytesMut::new();
1772
1773        append_raw_row_payload(&mut buf, &[0x01, 0x02, 0x03]).expect("payload should append");
1774
1775        assert_eq!(&[TokenType::Row as u8, 0x01, 0x02, 0x03], &buf[..]);
1776    }
1777
1778    #[test]
1779    fn rejects_empty_single_raw_row_payload() {
1780        let mut buf = BytesMut::new();
1781
1782        append_raw_row_payload(&mut buf, &[]).expect_err("empty payload should fail");
1783
1784        assert!(buf.is_empty());
1785    }
1786
1787    #[test]
1788    fn appends_batched_raw_rows_payload_unchanged() {
1789        let mut buf = BytesMut::new();
1790        let payload = [TokenType::Row as u8, 0x01, TokenType::Row as u8, 0x02, 0x03];
1791
1792        append_raw_rows_payload(&mut buf, &payload).expect("payload should append");
1793
1794        assert_eq!(&payload, &buf[..]);
1795    }
1796
1797    #[test]
1798    fn rejects_empty_batched_raw_rows_payload() {
1799        let mut buf = BytesMut::new();
1800
1801        append_raw_rows_payload(&mut buf, &[]).expect_err("empty payload should fail");
1802
1803        assert!(buf.is_empty());
1804    }
1805
1806    #[test]
1807    fn rejects_batched_raw_rows_payload_without_row_token() {
1808        let mut buf = BytesMut::new();
1809
1810        append_raw_rows_payload(&mut buf, &[0x01, 0x02])
1811            .expect_err("payload without row token should fail");
1812
1813        assert!(buf.is_empty());
1814    }
1815
1816    #[test]
1817    fn appends_checked_batched_raw_rows_payload_unchanged() {
1818        let mut buf = BytesMut::new();
1819        let payload = [TokenType::Row as u8, 0x01, TokenType::Row as u8, 0x02, 0x03];
1820
1821        append_raw_rows_payload_checked(&mut buf, &payload, &[0, 2])
1822            .expect("payload should append");
1823
1824        assert_eq!(&payload, &buf[..]);
1825    }
1826
1827    #[test]
1828    fn appends_raw_rows_with_relative_offsets_after_existing_bytes() {
1829        let mut buf = BytesMut::from(&b"prefix"[..]);
1830        let payload = [TokenType::Row as u8, 0x01, TokenType::Row as u8, 0x02, 0x03];
1831
1832        append_raw_rows_with(&mut buf, |buf| {
1833            buf.extend_from_slice(&payload);
1834            Ok(RawRowsAppend::new(vec![0, 2]))
1835        })
1836        .expect("payload should append");
1837
1838        assert_eq!(&b"prefix"[..], &buf[..6]);
1839        assert_eq!(&payload, &buf[6..]);
1840    }
1841
1842    #[test]
1843    fn rolls_back_raw_rows_with_closure_error() {
1844        let mut buf = BytesMut::from(&b"prefix"[..]);
1845
1846        append_raw_rows_with(&mut buf, |buf| {
1847            buf.extend_from_slice(&[TokenType::Row as u8, 0x01]);
1848            Err(crate::Error::BulkInput(Cow::Borrowed(
1849                "fake append failure",
1850            )))
1851        })
1852        .expect_err("closure error should fail");
1853
1854        assert_eq!(&b"prefix"[..], &buf[..]);
1855    }
1856
1857    #[test]
1858    fn rolls_back_raw_rows_with_validation_error_after_append() {
1859        let mut buf = BytesMut::from(&b"prefix"[..]);
1860
1861        append_raw_rows_with(&mut buf, |buf| {
1862            buf.extend_from_slice(&[TokenType::Row as u8, 0x01]);
1863            Ok(RawRowsAppend::new(vec![1]))
1864        })
1865        .expect_err("validation error should fail");
1866
1867        assert_eq!(&b"prefix"[..], &buf[..]);
1868    }
1869
1870    #[test]
1871    fn rejects_empty_raw_rows_with_append() {
1872        let mut buf = BytesMut::new();
1873
1874        append_raw_rows_with(&mut buf, |_| Ok(RawRowsAppend::new(vec![0])))
1875            .expect_err("empty append should fail");
1876
1877        assert!(buf.is_empty());
1878    }
1879
1880    #[test]
1881    fn rejects_invalid_raw_rows_with_offsets_and_rolls_back() {
1882        let cases: &[(&[u8], &[usize])] = &[
1883            (&[TokenType::Row as u8], &[]),
1884            (&[0x00, TokenType::Row as u8], &[1]),
1885            (&[TokenType::Row as u8], &[0, 1]),
1886            (&[TokenType::Row as u8, 0x01], &[0, 0]),
1887            (&[TokenType::Row as u8, 0x01, 0x02], &[0, 1]),
1888        ];
1889
1890        for (payload, row_token_offsets) in cases {
1891            let mut buf = BytesMut::from(&b"prefix"[..]);
1892
1893            append_raw_rows_with(&mut buf, |buf| {
1894                buf.extend_from_slice(payload);
1895                Ok(RawRowsAppend::new(row_token_offsets.to_vec()))
1896            })
1897            .expect_err("invalid row offsets should fail");
1898
1899            assert_eq!(&b"prefix"[..], &buf[..]);
1900        }
1901    }
1902
1903    #[test]
1904    fn rejects_checked_batched_raw_rows_payload_with_empty_offsets() {
1905        let mut buf = BytesMut::new();
1906
1907        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8], &[])
1908            .expect_err("empty offsets should fail");
1909
1910        assert!(buf.is_empty());
1911    }
1912
1913    #[test]
1914    fn rejects_checked_batched_raw_rows_payload_with_nonzero_first_offset() {
1915        let mut buf = BytesMut::new();
1916
1917        append_raw_rows_payload_checked(&mut buf, &[0x00, TokenType::Row as u8], &[1])
1918            .expect_err("nonzero first offset should fail");
1919
1920        assert!(buf.is_empty());
1921    }
1922
1923    #[test]
1924    fn rejects_checked_batched_raw_rows_payload_with_out_of_bounds_offset() {
1925        let mut buf = BytesMut::new();
1926
1927        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8], &[0, 1])
1928            .expect_err("out of bounds offset should fail");
1929
1930        assert!(buf.is_empty());
1931    }
1932
1933    #[test]
1934    fn rejects_checked_batched_raw_rows_payload_with_non_increasing_offsets() {
1935        let mut buf = BytesMut::new();
1936
1937        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8, 0x01], &[0, 0])
1938            .expect_err("repeated offset should fail");
1939
1940        assert!(buf.is_empty());
1941    }
1942
1943    #[test]
1944    fn rejects_checked_batched_raw_rows_payload_with_offset_not_on_row_token() {
1945        let mut buf = BytesMut::new();
1946
1947        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8, 0x01, 0x02], &[0, 1])
1948            .expect_err("offset not on row token should fail");
1949
1950        assert!(buf.is_empty());
1951    }
1952}