1use asynchronous_codec::BytesMut;
2use bytes::BufMut;
3use enumflags2::BitFlags;
4use futures_util::io::{AsyncRead, AsyncWrite};
5#[cfg(feature = "bulk-load-profile")]
6use std::time::{Duration, Instant};
7use tracing::{event, Level};
8
9#[cfg(feature = "bulk-load-profile")]
10use crate::client::{DirectPacketPollWriteSummary, DirectPacketWriteTiming};
11use crate::{
12 client::Connection, sql_read_bytes::SqlReadBytes, BytesMutWithDataColumns, ColumnFlag,
13 ColumnType, ExecuteResult,
14};
15
16use super::{
17 Encode, MetaDataColumn, PacketHeader, PacketStatus, TokenColMetaData, TokenDone, TokenRow,
18 TokenType, TypeInfo, HEADER_BYTES,
19};
20
21#[derive(Debug, Clone)]
23pub struct BulkLoadColumns<'a> {
24 columns: Vec<MetaDataColumn<'a>>,
25}
26
27impl<'a> BulkLoadColumns<'a> {
28 pub(crate) fn new(columns: Vec<MetaDataColumn<'a>>) -> Self {
29 Self { columns }
30 }
31
32 pub(crate) fn into_inner(self) -> Vec<MetaDataColumn<'a>> {
33 self.columns
34 }
35
36 pub fn len(&self) -> usize {
38 self.columns.len()
39 }
40
41 pub fn is_empty(&self) -> bool {
43 self.columns.is_empty()
44 }
45
46 pub fn iter(&self) -> impl ExactSizeIterator<Item = BulkLoadColumn<'_>> {
48 bulk_load_columns(&self.columns)
49 }
50}
51
#[derive(Debug, Clone, PartialEq, Eq)]
/// Describes rows appended through a raw-row encoder callback.
///
/// Holds the byte offsets, relative to the start of the appended bytes, at
/// which each row token begins.
pub struct RawRowsAppend {
    row_token_offsets: Vec<usize>,
}

impl RawRowsAppend {
    /// Creates the descriptor from the list of row token offsets.
    ///
    /// The offsets are stored as given; no validation happens here.
    pub fn new(row_token_offsets: Vec<usize>) -> Self {
        RawRowsAppend { row_token_offsets }
    }

    /// The recorded row token offsets, in the order they were supplied.
    pub fn row_token_offsets(&self) -> &[usize] {
        self.row_token_offsets.as_slice()
    }
}
72
73#[derive(Debug)]
82pub struct RawRowsAppendBuffer<'a> {
83 bytes: &'a mut BytesMut,
84}
85
86impl RawRowsAppendBuffer<'_> {
87 pub fn extend_from_slice(&mut self, slice: &[u8]) {
89 self.bytes.extend_from_slice(slice);
90 }
91
92 pub fn put_u8(&mut self, value: u8) {
94 self.bytes.put_u8(value);
95 }
96
97 pub fn put_u16_le(&mut self, value: u16) {
99 self.bytes.put_u16_le(value);
100 }
101
102 pub fn put_u32_le(&mut self, value: u32) {
104 self.bytes.put_u32_le(value);
105 }
106
107 pub fn put_u64_le(&mut self, value: u64) {
109 self.bytes.put_u64_le(value);
110 }
111}
112
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
/// Packet-level counters gathered while a bulk load request writes data.
///
/// Only available with the `bulk-load-profile` feature. All counters saturate
/// instead of overflowing.
pub struct BulkLoadPacketStats {
    /// Number of recorded write passes over the request buffer.
    pub write_packets_calls: u64,
    /// Number of packets recorded as written.
    pub packets_written: u64,
    /// Sum of the payload sizes of all recorded packets.
    pub packet_payload_bytes: u64,
    /// Largest payload size of any recorded packet.
    pub max_packet_payload_bytes: usize,
    /// Largest buffered byte count seen at the start of a write pass.
    pub max_buffered_bytes_before_write: usize,
    /// Buffered byte count left over by the most recent write pass.
    pub buffered_bytes_after_last_write: usize,
    /// Payload size of the most recently recorded finalized packet.
    pub finalized_packet_payload_bytes: usize,
}

#[cfg(feature = "bulk-load-profile")]
impl BulkLoadPacketStats {
    /// Counts one write pass and tracks the largest pre-write buffer size.
    fn record_write_packets_call(&mut self, buffered_bytes_before_write: usize) {
        self.write_packets_calls = self.write_packets_calls.saturating_add(1);
        if buffered_bytes_before_write > self.max_buffered_bytes_before_write {
            self.max_buffered_bytes_before_write = buffered_bytes_before_write;
        }
    }

    /// Counts one written packet and folds its payload size into the totals.
    fn record_packet_written(&mut self, packet_payload_bytes: usize) {
        let payload = usize_to_u64_saturating(packet_payload_bytes);
        self.packets_written = self.packets_written.saturating_add(1);
        self.packet_payload_bytes = self.packet_payload_bytes.saturating_add(payload);
        if packet_payload_bytes > self.max_packet_payload_bytes {
            self.max_packet_payload_bytes = packet_payload_bytes;
        }
    }

    /// Remembers how many bytes stayed buffered after the latest write pass.
    fn record_buffered_bytes_after_write(&mut self, buffered_bytes_after_write: usize) {
        self.buffered_bytes_after_last_write = buffered_bytes_after_write;
    }

    /// Remembers the payload size of the finalized packet.
    fn record_finalized_packet(&mut self, packet_payload_bytes: usize) {
        self.finalized_packet_payload_bytes = packet_payload_bytes;
    }
}
162
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
/// Timing counters gathered while a bulk load request writes to the wire.
///
/// Only available with the `bulk-load-profile` feature. Both the integer
/// counters and the accumulated durations saturate instead of overflowing, so
/// recording a measurement never panics.
pub struct BulkLoadWriteTimingStats {
    /// Total time recorded for write passes over the request buffer.
    pub write_packets_elapsed: Duration,
    /// Number of recorded wire writes.
    pub write_to_wire_calls: u64,
    /// Total time recorded for wire writes.
    pub write_to_wire_elapsed: Duration,
    /// Total payload bytes across recorded wire writes.
    pub write_to_wire_payload_bytes: u64,
    /// Longest single recorded wire write.
    pub max_write_to_wire_elapsed: Duration,
    /// Largest payload of a single recorded wire write.
    pub max_write_to_wire_payload_bytes: usize,
    /// Number of recorded flushes.
    pub flush_calls: u64,
    /// Total time recorded for flushes.
    pub flush_elapsed: Duration,
    /// Longest single recorded flush.
    pub max_flush_elapsed: Duration,
    /// Total time recorded for finalization as a whole.
    pub finalize_elapsed: Duration,
    /// Time recorded for the wire write performed during finalization.
    pub finalize_write_to_wire_elapsed: Duration,
    /// Time recorded for the flush performed during finalization.
    pub finalize_flush_elapsed: Duration,
    /// Time recorded for obtaining the execution result during finalization.
    pub finalize_result_elapsed: Duration,
    /// Breakdown of writes that went through the connection's framed path.
    pub connection_write: BulkLoadConnectionWriteStats,
    /// Breakdown of writes that went through the direct packet path.
    pub direct_packet_write: BulkLoadDirectPacketWriteStats,
}

#[cfg(feature = "bulk-load-profile")]
impl BulkLoadWriteTimingStats {
    /// Adds the duration of one write pass.
    fn record_write_packets_elapsed(&mut self, elapsed: Duration) {
        self.write_packets_elapsed = self.write_packets_elapsed.saturating_add(elapsed);
    }

    /// Records one wire write: its duration and payload size.
    fn record_write_to_wire(&mut self, elapsed: Duration, payload_bytes: usize) {
        self.write_to_wire_calls = self.write_to_wire_calls.saturating_add(1);
        self.write_to_wire_elapsed = self.write_to_wire_elapsed.saturating_add(elapsed);
        self.write_to_wire_payload_bytes = self
            .write_to_wire_payload_bytes
            .saturating_add(usize_to_u64_saturating(payload_bytes));
        self.max_write_to_wire_elapsed = self.max_write_to_wire_elapsed.max(elapsed);
        self.max_write_to_wire_payload_bytes =
            self.max_write_to_wire_payload_bytes.max(payload_bytes);
    }

    /// Records one flush and its duration.
    fn record_flush(&mut self, elapsed: Duration) {
        self.flush_calls = self.flush_calls.saturating_add(1);
        self.flush_elapsed = self.flush_elapsed.saturating_add(elapsed);
        self.max_flush_elapsed = self.max_flush_elapsed.max(elapsed);
    }

    /// Adds time spent in finalization as a whole.
    fn record_finalize_elapsed(&mut self, elapsed: Duration) {
        self.finalize_elapsed = self.finalize_elapsed.saturating_add(elapsed);
    }

    /// Adds time spent on the wire write issued by finalization.
    fn record_finalize_write_to_wire_elapsed(&mut self, elapsed: Duration) {
        self.finalize_write_to_wire_elapsed =
            self.finalize_write_to_wire_elapsed.saturating_add(elapsed);
    }

    /// Adds time spent on the flush issued by finalization.
    fn record_finalize_flush_elapsed(&mut self, elapsed: Duration) {
        self.finalize_flush_elapsed = self.finalize_flush_elapsed.saturating_add(elapsed);
    }

    /// Adds time spent obtaining the execution result during finalization.
    fn record_finalize_result_elapsed(&mut self, elapsed: Duration) {
        self.finalize_result_elapsed = self.finalize_result_elapsed.saturating_add(elapsed);
    }

    /// Forwards one framed connection write to the `connection_write` breakdown.
    fn record_connection_write(
        &mut self,
        payload_bytes: usize,
        ready_elapsed: Duration,
        encode_elapsed: Duration,
        flush_elapsed: Duration,
    ) {
        self.connection_write
            .record(payload_bytes, ready_elapsed, encode_elapsed, flush_elapsed);
    }

    /// Records a direct packet write that was not the final packet.
    fn record_direct_packet_write(&mut self, timing: DirectPacketWriteTiming) {
        self.direct_packet_write.record_timing(timing, false);
    }

    /// Records the direct packet write of the final packet.
    fn record_direct_final_packet_write(&mut self, timing: DirectPacketWriteTiming) {
        self.direct_packet_write.record_timing(timing, true);
    }
}
262
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
/// Breakdown of writes performed through the connection's framed write path.
///
/// Only available with the `bulk-load-profile` feature. Both the integer
/// counters and the accumulated durations saturate instead of overflowing.
pub struct BulkLoadConnectionWriteStats {
    /// Number of recorded writes.
    pub calls: u64,
    /// Total payload bytes across recorded writes.
    pub payload_bytes: u64,
    /// Total of the reported `ready` phase durations.
    pub ready_elapsed: Duration,
    /// Total of the reported `encode` phase durations.
    pub encode_elapsed: Duration,
    /// Total of the reported `flush` phase durations.
    pub flush_elapsed: Duration,
    /// Longest single `ready` phase.
    pub max_ready_elapsed: Duration,
    /// Longest single `encode` phase.
    pub max_encode_elapsed: Duration,
    /// Longest single `flush` phase.
    pub max_flush_elapsed: Duration,
    /// Largest payload of a single recorded write.
    pub max_payload_bytes: usize,
}

#[cfg(feature = "bulk-load-profile")]
impl BulkLoadConnectionWriteStats {
    /// Records one write: its payload size and the duration of each phase.
    fn record(
        &mut self,
        payload_bytes: usize,
        ready_elapsed: Duration,
        encode_elapsed: Duration,
        flush_elapsed: Duration,
    ) {
        self.calls = self.calls.saturating_add(1);
        self.payload_bytes = self
            .payload_bytes
            .saturating_add(usize_to_u64_saturating(payload_bytes));

        // Durations saturate like the integer counters above; `+=` would
        // panic on overflow.
        self.ready_elapsed = self.ready_elapsed.saturating_add(ready_elapsed);
        self.encode_elapsed = self.encode_elapsed.saturating_add(encode_elapsed);
        self.flush_elapsed = self.flush_elapsed.saturating_add(flush_elapsed);

        self.max_ready_elapsed = self.max_ready_elapsed.max(ready_elapsed);
        self.max_encode_elapsed = self.max_encode_elapsed.max(encode_elapsed);
        self.max_flush_elapsed = self.max_flush_elapsed.max(flush_elapsed);
        self.max_payload_bytes = self.max_payload_bytes.max(payload_bytes);
    }
}
316
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
/// Breakdown of writes performed through the direct packet write path.
///
/// Only available with the `bulk-load-profile` feature. Each recorded packet
/// contributes one `DirectPacketWriteTiming` sample. Both the integer
/// counters and the accumulated durations saturate instead of overflowing.
pub struct BulkLoadDirectPacketWriteStats {
    /// Number of recorded packets.
    pub calls: u64,
    /// Total payload bytes across recorded packets.
    pub payload_bytes: u64,
    /// Total header bytes across recorded packets.
    pub header_bytes: u64,
    /// Largest payload of a single recorded packet.
    pub max_payload_bytes: usize,
    /// Number of recorded packets flagged as final.
    pub final_calls: u64,
    /// Payload bytes of packets flagged as final.
    pub final_payload_bytes: u64,
    /// Header bytes of packets flagged as final.
    pub final_header_bytes: u64,
    /// Number of samples that reported `raw_stream`.
    pub raw_stream_calls: u64,
    /// Number of samples that reported `tls_stream`.
    pub tls_stream_calls: u64,
    /// Sum of the reported write call counts.
    pub write_calls: u64,
    /// Sum of the reported written byte counts.
    pub write_bytes: u64,
    /// Largest reported single-write byte count.
    pub max_write_bytes: usize,
    /// Sum of the reported write durations.
    pub write_elapsed: Duration,
    /// Longest reported single write.
    pub max_write_elapsed: Duration,
    /// Sum of the reported header write call counts.
    pub header_write_calls: u64,
    /// Sum of the reported header bytes written.
    pub header_write_bytes: u64,
    /// Largest reported single header write, in bytes.
    pub header_max_write_bytes: usize,
    /// Sum of the reported header write durations.
    pub header_write_elapsed: Duration,
    /// Longest reported single header write.
    pub header_max_write_elapsed: Duration,
    /// Sum of the reported partial header writes.
    pub header_partial_writes: u64,
    /// Sum of the reported payload write call counts.
    pub payload_write_calls: u64,
    /// Sum of the reported payload bytes written.
    pub payload_write_bytes: u64,
    /// Largest reported single payload write, in bytes.
    pub payload_max_write_bytes: usize,
    /// Sum of the reported payload write durations.
    pub payload_write_elapsed: Duration,
    /// Longest reported single payload write.
    pub payload_max_write_elapsed: Duration,
    /// Sum of the reported partial payload writes.
    pub payload_partial_writes: u64,
    /// Sum of the reported `poll_write` poll counts.
    pub poll_write_polls: u64,
    /// Sum of the reported pending `poll_write` results.
    pub poll_write_pending_count: u64,
    /// Sum of the reported time spent on pending `poll_write` results.
    pub poll_write_pending_elapsed: Duration,
    /// Longest reported pending `poll_write` duration.
    pub poll_write_max_pending_elapsed: Duration,
    /// Sum of the reported ready `poll_write` results.
    pub poll_write_ready_count: u64,
    /// Sum of the reported time spent on ready `poll_write` results.
    pub poll_write_ready_elapsed: Duration,
    /// Longest reported ready `poll_write` duration.
    pub poll_write_max_ready_elapsed: Duration,
    /// Number of recorded flushes (one per sample).
    pub flush_calls: u64,
    /// Sum of the reported flush durations.
    pub flush_elapsed: Duration,
    /// Longest reported flush.
    pub max_flush_elapsed: Duration,
    /// Sum of the reported pending flush results.
    pub flush_pending_count: u64,
    /// Sum of the reported time spent on pending flush results.
    pub flush_pending_elapsed: Duration,
    /// Longest reported pending flush duration.
    pub flush_max_pending_elapsed: Duration,
}

#[cfg(feature = "bulk-load-profile")]
impl BulkLoadDirectPacketWriteStats {
    /// Folds one timing sample into every section of the breakdown.
    ///
    /// `final_packet` marks the sample as belonging to the final packet so it
    /// also counts towards the `final_*` fields.
    fn record_timing(&mut self, timing: DirectPacketWriteTiming, final_packet: bool) {
        self.record_packet(timing.payload_bytes, timing.header_bytes, final_packet);
        self.record_stream_mode(timing.raw_stream, timing.tls_stream);
        self.record_write_summary(
            timing.write_calls,
            timing.write_bytes,
            timing.max_write_bytes,
            timing.write_elapsed,
            timing.max_write_elapsed,
        );
        self.record_header_write_summary(
            timing.header_write_calls,
            timing.header_write_bytes,
            timing.header_max_write_bytes,
            timing.header_write_elapsed,
            timing.header_max_write_elapsed,
            timing.header_partial_writes,
        );
        self.record_payload_write_summary(
            timing.payload_write_calls,
            timing.payload_write_bytes,
            timing.payload_max_write_bytes,
            timing.payload_write_elapsed,
            timing.payload_max_write_elapsed,
            timing.payload_partial_writes,
        );
        self.record_poll_write_summary(timing.poll_write_summary());
        self.record_flush(
            timing.flush_elapsed,
            timing.flush_pending_count,
            timing.flush_pending_elapsed,
            timing.flush_max_pending_elapsed,
        );
    }

    /// Counts one packet and, when `final_packet` is set, also adds it to the
    /// `final_*` totals.
    fn record_packet(&mut self, payload_bytes: usize, header_bytes: usize, final_packet: bool) {
        let payload = usize_to_u64_saturating(payload_bytes);
        let header = usize_to_u64_saturating(header_bytes);

        self.calls = self.calls.saturating_add(1);
        self.payload_bytes = self.payload_bytes.saturating_add(payload);
        self.header_bytes = self.header_bytes.saturating_add(header);
        self.max_payload_bytes = self.max_payload_bytes.max(payload_bytes);

        if final_packet {
            self.final_calls = self.final_calls.saturating_add(1);
            self.final_payload_bytes = self.final_payload_bytes.saturating_add(payload);
            self.final_header_bytes = self.final_header_bytes.saturating_add(header);
        }
    }

    /// Counts which stream mode(s) the sample reported.
    fn record_stream_mode(&mut self, raw_stream: bool, tls_stream: bool) {
        if raw_stream {
            self.raw_stream_calls = self.raw_stream_calls.saturating_add(1);
        }
        if tls_stream {
            self.tls_stream_calls = self.tls_stream_calls.saturating_add(1);
        }
    }

    /// Accumulates the overall write summary of one sample.
    fn record_write_summary(
        &mut self,
        calls: u64,
        bytes: u64,
        max_bytes: usize,
        elapsed: Duration,
        max_elapsed: Duration,
    ) {
        self.write_calls = self.write_calls.saturating_add(calls);
        self.write_bytes = self.write_bytes.saturating_add(bytes);
        self.max_write_bytes = self.max_write_bytes.max(max_bytes);
        self.write_elapsed = self.write_elapsed.saturating_add(elapsed);
        self.max_write_elapsed = self.max_write_elapsed.max(max_elapsed);
    }

    /// Accumulates the header write summary of one sample.
    fn record_header_write_summary(
        &mut self,
        calls: u64,
        bytes: u64,
        max_bytes: usize,
        elapsed: Duration,
        max_elapsed: Duration,
        partial_writes: u64,
    ) {
        self.header_write_calls = self.header_write_calls.saturating_add(calls);
        self.header_write_bytes = self.header_write_bytes.saturating_add(bytes);
        self.header_max_write_bytes = self.header_max_write_bytes.max(max_bytes);
        self.header_write_elapsed = self.header_write_elapsed.saturating_add(elapsed);
        self.header_max_write_elapsed = self.header_max_write_elapsed.max(max_elapsed);
        self.header_partial_writes = self.header_partial_writes.saturating_add(partial_writes);
    }

    /// Accumulates the payload write summary of one sample.
    fn record_payload_write_summary(
        &mut self,
        calls: u64,
        bytes: u64,
        max_bytes: usize,
        elapsed: Duration,
        max_elapsed: Duration,
        partial_writes: u64,
    ) {
        self.payload_write_calls = self.payload_write_calls.saturating_add(calls);
        self.payload_write_bytes = self.payload_write_bytes.saturating_add(bytes);
        self.payload_max_write_bytes = self.payload_max_write_bytes.max(max_bytes);
        self.payload_write_elapsed = self.payload_write_elapsed.saturating_add(elapsed);
        self.payload_max_write_elapsed = self.payload_max_write_elapsed.max(max_elapsed);
        self.payload_partial_writes = self.payload_partial_writes.saturating_add(partial_writes);
    }

    /// Accumulates the `poll_write` summary of one sample.
    fn record_poll_write_summary(&mut self, summary: DirectPacketPollWriteSummary) {
        self.poll_write_polls = self.poll_write_polls.saturating_add(summary.polls);
        self.poll_write_pending_count = self
            .poll_write_pending_count
            .saturating_add(summary.pending_count);
        self.poll_write_pending_elapsed = self
            .poll_write_pending_elapsed
            .saturating_add(summary.pending_elapsed);
        self.poll_write_max_pending_elapsed = self
            .poll_write_max_pending_elapsed
            .max(summary.max_pending_elapsed);
        self.poll_write_ready_count = self
            .poll_write_ready_count
            .saturating_add(summary.ready_count);
        self.poll_write_ready_elapsed = self
            .poll_write_ready_elapsed
            .saturating_add(summary.ready_elapsed);
        self.poll_write_max_ready_elapsed = self
            .poll_write_max_ready_elapsed
            .max(summary.max_ready_elapsed);
    }

    /// Counts one flush and accumulates its pending statistics.
    fn record_flush(
        &mut self,
        elapsed: Duration,
        pending_count: u64,
        pending_elapsed: Duration,
        max_pending_elapsed: Duration,
    ) {
        self.flush_calls = self.flush_calls.saturating_add(1);
        self.flush_elapsed = self.flush_elapsed.saturating_add(elapsed);
        self.max_flush_elapsed = self.max_flush_elapsed.max(elapsed);
        self.flush_pending_count = self.flush_pending_count.saturating_add(pending_count);
        self.flush_pending_elapsed = self.flush_pending_elapsed.saturating_add(pending_elapsed);
        self.flush_max_pending_elapsed = self.flush_max_pending_elapsed.max(max_pending_elapsed);
    }
}
555
#[cfg(feature = "bulk-load-profile")]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
/// Combined profiling snapshot of a bulk load request.
///
/// Only available with the `bulk-load-profile` feature.
pub struct BulkLoadStats {
    /// Packet-level counters.
    pub packet: BulkLoadPacketStats,
    /// Write timing counters.
    pub write_timing: BulkLoadWriteTimingStats,
}
565
566#[derive(Debug)]
568pub struct BulkLoadRequest<'a, S>
569where
570 S: AsyncRead + AsyncWrite + Unpin + Send,
571{
572 connection: &'a mut Connection<S>,
573 packet_id: u8,
574 buf: BytesMut,
575 columns: Vec<MetaDataColumn<'a>>,
576 #[cfg(feature = "bulk-load-profile")]
577 packet_stats: BulkLoadPacketStats,
578 #[cfg(feature = "bulk-load-profile")]
579 write_timing_stats: BulkLoadWriteTimingStats,
580 direct_packet_writes: bool,
581}
582
583fn replace_with_direct_packet_buffer(buf: &mut BytesMut) {
591 let payload = buf.split();
592 let mut packet_buf = BytesMut::with_capacity(HEADER_BYTES.saturating_add(payload.len()));
593 packet_buf.resize(HEADER_BYTES, 0);
594 packet_buf.extend_from_slice(&payload);
595 *buf = packet_buf;
596}
597
598fn direct_packet_payload_len(buf: &BytesMut) -> usize {
599 buf.len().saturating_sub(HEADER_BYTES)
600}
601
602fn encode_direct_packet_header(
603 buf: &mut BytesMut,
604 packet_id: u8,
605 header_start: usize,
606 payload_len: usize,
607 status: PacketStatus,
608) -> crate::Result<()> {
609 let mut header = PacketHeader::bulk_load(packet_id);
613 header.set_status(status);
614 let header_end = header_start.saturating_add(HEADER_BYTES);
615 let mut header_buf = &mut buf[header_start..header_end];
616 header.encode_for_payload(payload_len, &mut header_buf)
617}
618
619impl<'a, S> BulkLoadRequest<'a, S>
620where
621 S: AsyncRead + AsyncWrite + Unpin + Send,
622{
623 pub(crate) fn new(
624 connection: &'a mut Connection<S>,
625 columns: Vec<MetaDataColumn<'a>>,
626 ) -> crate::Result<Self> {
627 let packet_id = connection.context_mut().next_packet_id();
628 let mut buf = BytesMut::new();
629
630 let cmd = TokenColMetaData {
631 columns: columns.clone(),
632 };
633
634 cmd.encode(&mut buf)?;
635
636 let this = Self {
637 connection,
638 packet_id,
639 buf,
640 columns,
641 #[cfg(feature = "bulk-load-profile")]
642 packet_stats: BulkLoadPacketStats::default(),
643 #[cfg(feature = "bulk-load-profile")]
644 write_timing_stats: BulkLoadWriteTimingStats::default(),
645 direct_packet_writes: false,
646 };
647
648 Ok(this)
649 }
650
651 pub fn columns(&self) -> impl ExactSizeIterator<Item = BulkLoadColumn<'_>> {
661 bulk_load_columns(&self.columns)
662 }
663
664 #[cfg(feature = "bulk-load-profile")]
666 pub fn packet_stats(&self) -> BulkLoadPacketStats {
667 self.packet_stats
668 }
669
670 #[cfg(feature = "bulk-load-profile")]
672 pub fn write_timing_stats(&self) -> BulkLoadWriteTimingStats {
673 self.write_timing_stats
674 }
675
676 #[cfg(feature = "bulk-load-profile")]
678 pub fn stats(&self) -> BulkLoadStats {
679 BulkLoadStats {
680 packet: self.packet_stats,
681 write_timing: self.write_timing_stats,
682 }
683 }
684
685 pub fn enable_direct_packet_writes(&mut self) {
699 if !self.direct_packet_writes {
700 replace_with_direct_packet_buffer(&mut self.buf);
701 self.direct_packet_writes = true;
702 }
703 }
704
705 pub fn direct_packet_writes_enabled(&self) -> bool {
707 self.direct_packet_writes
708 }
709
710 fn direct_payload_len(&self) -> usize {
711 direct_packet_payload_len(&self.buf)
712 }
713
714 fn encode_direct_packet_header_at(
715 &mut self,
716 header_start: usize,
717 payload_len: usize,
718 status: PacketStatus,
719 ) -> crate::Result<()> {
720 encode_direct_packet_header(
721 &mut self.buf,
722 self.packet_id,
723 header_start,
724 payload_len,
725 status,
726 )
727 }
728
729 pub async fn send(&mut self, row: TokenRow<'a>) -> crate::Result<()> {
738 let mut buf_with_columns = BytesMutWithDataColumns::new(&mut self.buf, &self.columns);
739
740 row.encode(&mut buf_with_columns)?;
741 self.write_packets().await?;
742
743 Ok(())
744 }
745
746 pub async fn send_raw_row_payload(&mut self, payload: impl AsRef<[u8]>) -> crate::Result<()> {
759 append_raw_row_payload(&mut self.buf, payload.as_ref())?;
760 self.write_packets().await?;
761
762 Ok(())
763 }
764
765 pub async fn send_raw_rows_payload(&mut self, payload: impl AsRef<[u8]>) -> crate::Result<()> {
781 append_raw_rows_payload(&mut self.buf, payload.as_ref())?;
782 self.write_packets().await?;
783
784 Ok(())
785 }
786
787 pub async fn send_raw_rows_payload_checked(
801 &mut self,
802 payload: impl AsRef<[u8]>,
803 row_token_offsets: impl AsRef<[usize]>,
804 ) -> crate::Result<()> {
805 append_raw_rows_payload_checked(
806 &mut self.buf,
807 payload.as_ref(),
808 row_token_offsets.as_ref(),
809 )?;
810 self.write_packets().await?;
811
812 Ok(())
813 }
814
815 pub async fn send_raw_rows_with<F>(&mut self, encode: F) -> crate::Result<()>
830 where
831 F: FnOnce(&mut RawRowsAppendBuffer<'_>) -> crate::Result<RawRowsAppend>,
832 {
833 append_raw_rows_with(&mut self.buf, encode)?;
834 self.write_packets().await?;
835
836 Ok(())
837 }
838
839 #[cfg(feature = "bulk-load-profile")]
845 pub async fn finalize(self) -> crate::Result<ExecuteResult> {
846 let (result, _) = self.finalize_with_stats().await?;
847 Ok(result)
848 }
849
850 #[cfg(not(feature = "bulk-load-profile"))]
856 pub async fn finalize(mut self) -> crate::Result<ExecuteResult> {
857 TokenDone::default().encode(&mut self.buf)?;
858 self.write_packets().await?;
859
860 let data = self.buf.split();
861 let data_len = if self.direct_packet_writes {
862 data.len().saturating_sub(HEADER_BYTES)
863 } else {
864 data.len()
865 };
866
867 event!(
868 Level::TRACE,
869 "Finalizing a bulk insert ({} bytes)",
870 data_len + HEADER_BYTES,
871 );
872
873 if self.direct_packet_writes {
874 let mut data = data;
875 let mut header = PacketHeader::bulk_load(self.packet_id);
876 header.set_status(PacketStatus::EndOfMessage);
877 let mut header_buf = &mut data[..HEADER_BYTES];
878 header.encode_for_payload(data_len, &mut header_buf)?;
879 self.connection.write_direct_packet_buffer(&data).await?;
880 } else {
881 let mut header = PacketHeader::bulk_load(self.packet_id);
882 header.set_status(PacketStatus::EndOfMessage);
883 self.connection.write_to_wire(header, data).await?;
884 }
885
886 self.connection.flush_sink().await?;
887 ExecuteResult::new(self.connection).await
888 }
889
890 #[cfg(feature = "bulk-load-profile")]
898 pub async fn finalize_with_packet_stats(
899 self,
900 ) -> crate::Result<(ExecuteResult, BulkLoadPacketStats)> {
901 let (result, stats) = self.finalize_with_stats().await?;
902 Ok((result, stats.packet))
903 }
904
905 #[cfg(feature = "bulk-load-profile")]
913 pub async fn finalize_with_stats(mut self) -> crate::Result<(ExecuteResult, BulkLoadStats)> {
914 let finalize_start = Instant::now();
915 TokenDone::default().encode(&mut self.buf)?;
916 self.write_packets().await?;
917
918 let data = self.buf.split();
919 let data_len = if self.direct_packet_writes {
920 data.len().saturating_sub(HEADER_BYTES)
921 } else {
922 data.len()
923 };
924 self.packet_stats.record_finalized_packet(data_len);
925
926 event!(
927 Level::TRACE,
928 "Finalizing a bulk insert ({} bytes)",
929 data_len + HEADER_BYTES,
930 );
931
932 let write_start = Instant::now();
933 if self.direct_packet_writes {
934 let mut data = data;
935 let mut header = PacketHeader::bulk_load(self.packet_id);
936 header.set_status(PacketStatus::EndOfMessage);
937 let mut header_buf = &mut data[..HEADER_BYTES];
938 header.encode_for_payload(data_len, &mut header_buf)?;
939 let write_result = self
940 .connection
941 .write_direct_packet_buffer_with_timing(&data)
942 .await;
943 let write_elapsed = write_start.elapsed();
944 self.write_timing_stats
945 .record_write_to_wire(write_elapsed, data_len);
946 match write_result {
947 Ok(direct_timing) => {
948 self.write_timing_stats
949 .record_direct_final_packet_write(direct_timing);
950 self.write_timing_stats
951 .record_finalize_write_to_wire_elapsed(write_elapsed);
952 }
953 Err(err) => return Err(err),
954 }
955 } else {
956 let mut header = PacketHeader::bulk_load(self.packet_id);
957 header.set_status(PacketStatus::EndOfMessage);
958 let write_result = self
959 .connection
960 .write_to_wire_with_timing(header, data)
961 .await;
962 let write_elapsed = write_start.elapsed();
963 self.write_timing_stats
964 .record_write_to_wire(write_elapsed, data_len);
965 match write_result {
966 Ok(connection_timing) => {
967 self.write_timing_stats.record_connection_write(
968 data_len,
969 connection_timing.ready_elapsed,
970 connection_timing.encode_elapsed,
971 connection_timing.flush_elapsed,
972 );
973 self.write_timing_stats
974 .record_finalize_write_to_wire_elapsed(write_elapsed);
975 }
976 Err(err) => return Err(err),
977 }
978 }
979
980 let flush_start = Instant::now();
981 let flush_result = self.connection.flush_sink().await;
982 let flush_elapsed = flush_start.elapsed();
983 self.write_timing_stats.record_flush(flush_elapsed);
984 self.write_timing_stats
985 .record_finalize_flush_elapsed(flush_elapsed);
986 flush_result?;
987
988 let result_start = Instant::now();
989 let result = ExecuteResult::new(self.connection).await?;
990 self.write_timing_stats
991 .record_finalize_result_elapsed(result_start.elapsed());
992 self.write_timing_stats
993 .record_finalize_elapsed(finalize_start.elapsed());
994 let stats = self.stats();
995
996 Ok((result, stats))
997 }
998
999 async fn write_packets(&mut self) -> crate::Result<()> {
1000 #[cfg(feature = "bulk-load-profile")]
1001 {
1002 let write_packets_start = Instant::now();
1003 let result = if self.direct_packet_writes {
1004 self.write_packets_direct_inner().await
1005 } else {
1006 self.write_packets_framed_inner().await
1007 };
1008 self.write_timing_stats
1009 .record_write_packets_elapsed(write_packets_start.elapsed());
1010 result
1011 }
1012
1013 #[cfg(not(feature = "bulk-load-profile"))]
1014 {
1015 if self.direct_packet_writes {
1016 self.write_packets_direct_inner().await
1017 } else {
1018 self.write_packets_framed_inner().await
1019 }
1020 }
1021 }
1022
1023 async fn write_packets_framed_inner(&mut self) -> crate::Result<()> {
1024 let packet_size = (self.connection.context().packet_size() as usize) - HEADER_BYTES;
1025 #[cfg(feature = "bulk-load-profile")]
1026 self.packet_stats.record_write_packets_call(self.buf.len());
1027
1028 while self.buf.len() > packet_size {
1029 let header = PacketHeader::bulk_load(self.packet_id);
1030 let data = self.buf.split_to(packet_size);
1031 #[cfg(feature = "bulk-load-profile")]
1032 self.packet_stats.record_packet_written(data.len());
1033
1034 event!(
1035 Level::TRACE,
1036 "Bulk insert packet ({} bytes)",
1037 data.len() + HEADER_BYTES,
1038 );
1039
1040 #[cfg(feature = "bulk-load-profile")]
1041 {
1042 let data_len = data.len();
1043 let write_start = Instant::now();
1044 let write_result = self
1045 .connection
1046 .write_to_wire_with_timing(header, data)
1047 .await;
1048 self.write_timing_stats
1049 .record_write_to_wire(write_start.elapsed(), data_len);
1050 match write_result {
1051 Ok(connection_timing) => {
1052 self.write_timing_stats.record_connection_write(
1053 data_len,
1054 connection_timing.ready_elapsed,
1055 connection_timing.encode_elapsed,
1056 connection_timing.flush_elapsed,
1057 );
1058 }
1059 Err(err) => return Err(err),
1060 }
1061 }
1062 #[cfg(not(feature = "bulk-load-profile"))]
1063 self.connection.write_to_wire(header, data).await?;
1064 }
1065
1066 #[cfg(feature = "bulk-load-profile")]
1067 self.packet_stats
1068 .record_buffered_bytes_after_write(self.buf.len());
1069
1070 Ok(())
1071 }
1072
1073 async fn write_packets_direct_inner(&mut self) -> crate::Result<()> {
1074 let packet_size = (self.connection.context().packet_size() as usize) - HEADER_BYTES;
1075 #[cfg(feature = "bulk-load-profile")]
1076 self.packet_stats
1077 .record_write_packets_call(self.direct_payload_len());
1078
1079 let mut payload_start = HEADER_BYTES;
1080 let mut payload_remaining = self.direct_payload_len();
1081 let mut sent_packets = false;
1082
1083 while payload_remaining > packet_size {
1084 let header_start = payload_start - HEADER_BYTES;
1090 let packet_end = payload_start + packet_size;
1091 self.encode_direct_packet_header_at(
1092 header_start,
1093 packet_size,
1094 PacketStatus::NormalMessage,
1095 )?;
1096 #[cfg(feature = "bulk-load-profile")]
1097 self.packet_stats.record_packet_written(packet_size);
1098
1099 event!(
1100 Level::TRACE,
1101 "Bulk insert direct packet ({} bytes)",
1102 packet_size + HEADER_BYTES,
1103 );
1104
1105 #[cfg(feature = "bulk-load-profile")]
1106 {
1107 let write_start = Instant::now();
1108 let write_result = self
1109 .connection
1110 .write_direct_packet_buffer_with_timing(&self.buf[header_start..packet_end])
1111 .await;
1112 self.write_timing_stats
1113 .record_write_to_wire(write_start.elapsed(), packet_size);
1114 match write_result {
1115 Ok(direct_timing) => self
1116 .write_timing_stats
1117 .record_direct_packet_write(direct_timing),
1118 Err(err) => return Err(err),
1119 }
1120 }
1121 #[cfg(not(feature = "bulk-load-profile"))]
1122 self.connection
1123 .write_direct_packet_buffer(&self.buf[header_start..packet_end])
1124 .await?;
1125
1126 sent_packets = true;
1127 payload_start += packet_size;
1128 payload_remaining -= packet_size;
1129 }
1130
1131 if sent_packets {
1132 let remaining = self.buf.split_off(payload_start);
1137 self.buf.clear();
1138 self.buf = remaining;
1139 replace_with_direct_packet_buffer(&mut self.buf);
1140 }
1141
1142 #[cfg(feature = "bulk-load-profile")]
1143 self.packet_stats
1144 .record_buffered_bytes_after_write(self.direct_payload_len());
1145
1146 Ok(())
1147 }
1148}
1149
1150fn bulk_load_columns<'a>(
1151 columns: &'a [MetaDataColumn<'a>],
1152) -> impl ExactSizeIterator<Item = BulkLoadColumn<'a>> {
1153 columns
1154 .iter()
1155 .enumerate()
1156 .map(|(ordinal, column)| BulkLoadColumn { ordinal, column })
1157}
1158
1159fn append_raw_row_payload(buf: &mut BytesMut, payload: &[u8]) -> crate::Result<()> {
1160 if payload.is_empty() {
1161 return Err(crate::Error::BulkInput(
1162 "raw bulk row payload cannot be empty".into(),
1163 ));
1164 }
1165
1166 buf.put_u8(TokenType::Row as u8);
1167 buf.extend_from_slice(payload);
1168
1169 Ok(())
1170}
1171
1172fn append_raw_rows_payload(buf: &mut BytesMut, payload: &[u8]) -> crate::Result<()> {
1173 if payload.is_empty() {
1174 return Err(crate::Error::BulkInput(
1175 "raw bulk rows payload cannot be empty".into(),
1176 ));
1177 }
1178
1179 if payload[0] != TokenType::Row as u8 {
1180 return Err(crate::Error::BulkInput(
1181 "raw bulk rows payload must start with a TDS ROW token".into(),
1182 ));
1183 }
1184
1185 buf.extend_from_slice(payload);
1186
1187 Ok(())
1188}
1189
1190fn append_raw_rows_payload_checked(
1191 buf: &mut BytesMut,
1192 payload: &[u8],
1193 row_token_offsets: &[usize],
1194) -> crate::Result<()> {
1195 validate_raw_row_token_offsets(payload, row_token_offsets)?;
1196 buf.extend_from_slice(payload);
1197
1198 Ok(())
1199}
1200
1201fn append_raw_rows_with<F>(buf: &mut BytesMut, encode: F) -> crate::Result<()>
1213where
1214 F: FnOnce(&mut RawRowsAppendBuffer<'_>) -> crate::Result<RawRowsAppend>,
1215{
1216 let start_len = buf.len();
1218 let mut raw_buf = RawRowsAppendBuffer { bytes: buf };
1219
1220 let append = match encode(&mut raw_buf) {
1222 Ok(append) => append,
1223 Err(err) => {
1224 raw_buf.bytes.truncate(start_len);
1225 return Err(err);
1226 }
1227 };
1228
1229 if let Err(err) =
1232 validate_raw_row_token_offsets(&raw_buf.bytes[start_len..], append.row_token_offsets())
1233 {
1234 raw_buf.bytes.truncate(start_len);
1235 return Err(err);
1236 }
1237
1238 Ok(())
1239}
1240
#[cfg(feature = "bulk-load-profile")]
/// Converts `value` to `u64`, clamping to `u64::MAX` if it does not fit.
fn usize_to_u64_saturating(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(converted) => converted,
        Err(_) => u64::MAX,
    }
}
1245
1246fn validate_raw_row_token_offsets(
1247 payload: &[u8],
1248 row_token_offsets: &[usize],
1249) -> crate::Result<()> {
1250 if payload.is_empty() {
1251 return Err(crate::Error::BulkInput(
1252 "raw bulk rows payload cannot be empty".into(),
1253 ));
1254 }
1255
1256 if row_token_offsets.is_empty() {
1257 return Err(crate::Error::BulkInput(
1258 "raw bulk row token offsets cannot be empty".into(),
1259 ));
1260 }
1261
1262 if row_token_offsets[0] != 0 {
1263 return Err(crate::Error::BulkInput(
1264 "raw bulk row token offsets must start at zero".into(),
1265 ));
1266 }
1267
1268 let mut previous = None;
1269
1270 for &offset in row_token_offsets {
1271 if offset >= payload.len() {
1272 return Err(crate::Error::BulkInput(
1273 "raw bulk row token offset is out of bounds".into(),
1274 ));
1275 }
1276
1277 if previous.is_some_and(|previous| offset <= previous) {
1278 return Err(crate::Error::BulkInput(
1279 "raw bulk row token offsets must be strictly increasing".into(),
1280 ));
1281 }
1282
1283 if payload[offset] != TokenType::Row as u8 {
1284 return Err(crate::Error::BulkInput(
1285 "raw bulk row token offset must point to a TDS ROW token".into(),
1286 ));
1287 }
1288
1289 previous = Some(offset);
1290 }
1291
1292 Ok(())
1293}
1294
1295#[derive(Debug, Clone, Copy)]
1297pub struct BulkLoadColumn<'a> {
1298 ordinal: usize,
1299 column: &'a MetaDataColumn<'a>,
1300}
1301
1302impl BulkLoadColumn<'_> {
1303 pub fn ordinal(&self) -> usize {
1305 self.ordinal
1306 }
1307
1308 pub fn name(&self) -> &str {
1310 &self.column.col_name
1311 }
1312
1313 pub fn column_type(&self) -> ColumnType {
1315 ColumnType::from(&self.column.base.ty)
1316 }
1317
1318 pub fn flags(&self) -> BitFlags<ColumnFlag> {
1320 self.column.base.flags
1321 }
1322
1323 pub fn is_nullable(&self) -> bool {
1325 self.flags().contains(ColumnFlag::Nullable)
1326 }
1327
1328 pub fn is_updateable(&self) -> bool {
1330 self.flags().contains(ColumnFlag::Updateable)
1331 }
1332
1333 pub fn type_info(&self) -> &TypeInfo {
1335 &self.column.base.ty
1336 }
1337}
1338
// Unit tests for the bulk-load column view, the direct packet buffer helpers,
// the `bulk-load-profile` statistics types, and the raw-row append helpers.
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use super::*;
    use crate::tds::codec::{BaseMetaDataColumn, Decode, FixedLenType};

    // Every `BulkLoadColumn` accessor should reflect the ordinal and metadata
    // the view was built from.
    #[test]
    fn exposes_bulk_load_column_metadata() {
        let metadata = MetaDataColumn {
            base: BaseMetaDataColumn {
                flags: ColumnFlag::Nullable | ColumnFlag::Updateable,
                ty: TypeInfo::FixedLen(FixedLenType::Int4),
            },
            col_name: Cow::Borrowed("value"),
        };

        let column = BulkLoadColumn {
            ordinal: 2,
            column: &metadata,
        };

        assert_eq!(2, column.ordinal());
        assert_eq!("value", column.name());
        assert_eq!(ColumnType::Int4, column.column_type());
        assert!(column.is_nullable());
        assert!(column.is_updateable());
        assert!(column.flags().contains(ColumnFlag::Nullable));
        assert_eq!(&TypeInfo::FixedLen(FixedLenType::Int4), column.type_info());
    }

    // After `replace_with_direct_packet_buffer`, the buffer is expected to
    // hold `HEADER_BYTES` zero bytes followed by the original payload.
    #[test]
    fn direct_packet_buffer_reserves_header_prefix() {
        let mut buf = BytesMut::from(&b"payload"[..]);

        replace_with_direct_packet_buffer(&mut buf);

        assert_eq!(direct_packet_payload_len(&buf), 7);
        assert_eq!(&buf[..HEADER_BYTES], &[0_u8; HEADER_BYTES]);
        assert_eq!(&buf[HEADER_BYTES..], b"payload");
    }

    // Encoding the header for the second packet is expected to place it at
    // `buf[packet_payload_len..packet_payload_len + HEADER_BYTES]`, i.e. over
    // bytes that belonged to the first packet, leaving the second packet's
    // payload ("bbbb") directly after it.
    #[test]
    fn direct_packet_header_overwrites_already_sent_bytes() {
        let packet_payload_len = 4;
        let mut buf = BytesMut::from(&[0_u8; HEADER_BYTES][..]);
        buf.extend_from_slice(b"aaaabbbbcc");

        encode_direct_packet_header(
            &mut buf,
            7,
            0,
            packet_payload_len,
            PacketStatus::NormalMessage,
        )
        .expect("first packet header should encode");
        encode_direct_packet_header(
            &mut buf,
            7,
            packet_payload_len,
            packet_payload_len,
            PacketStatus::NormalMessage,
        )
        .expect("second packet header should encode over previous bytes");

        let second_header_start = packet_payload_len;
        let second_payload_start = second_header_start + HEADER_BYTES;
        let header = PacketHeader::decode(&mut BytesMut::from(
            &buf[second_header_start..second_payload_start],
        ))
        .expect("overwritten bytes should decode as a packet header");

        // The header length field covers the header plus the packet payload.
        assert_eq!(header.length(), (HEADER_BYTES + packet_payload_len) as u16);
        assert_eq!(header.status(), PacketStatus::NormalMessage);
        assert_eq!(
            &buf[second_payload_start..second_payload_start + packet_payload_len],
            b"bbbb"
        );
    }

    // A default `BulkLoadPacketStats` reports zero for every counter.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_packet_stats_default_to_zero() {
        let stats = BulkLoadPacketStats::default();

        assert_eq!(stats.write_packets_calls, 0);
        assert_eq!(stats.packets_written, 0);
        assert_eq!(stats.packet_payload_bytes, 0);
        assert_eq!(stats.max_packet_payload_bytes, 0);
        assert_eq!(stats.max_buffered_bytes_before_write, 0);
        assert_eq!(stats.buffered_bytes_after_last_write, 0);
        assert_eq!(stats.finalized_packet_payload_bytes, 0);
    }

    // Write-attempt bookkeeping: calls are counted, the largest pre-write
    // buffer size is kept, the post-write size reflects the last call only,
    // and no packets are counted as written.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_packet_stats_record_write_attempts_and_buffer_tail() {
        let mut stats = BulkLoadPacketStats::default();

        stats.record_write_packets_call(128);
        stats.record_buffered_bytes_after_write(17);
        stats.record_write_packets_call(64);
        stats.record_buffered_bytes_after_write(3);

        assert_eq!(stats.write_packets_calls, 2);
        assert_eq!(stats.max_buffered_bytes_before_write, 128);
        assert_eq!(stats.buffered_bytes_after_last_write, 3);
        assert_eq!(stats.packets_written, 0);
    }

    // Three recorded packets (4, 9, 7 bytes) should yield a count of 3, a
    // byte total of 20, and a maximum of 9.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_packet_stats_accumulate_packet_bytes_and_maxima() {
        let mut stats = BulkLoadPacketStats::default();

        stats.record_packet_written(4);
        stats.record_packet_written(9);
        stats.record_packet_written(7);

        assert_eq!(stats.packets_written, 3);
        assert_eq!(stats.packet_payload_bytes, 20);
        assert_eq!(stats.max_packet_payload_bytes, 9);
    }

    // The finalized packet's bytes are tracked in their own field and are not
    // added to the regular packet byte total.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_packet_stats_record_final_packet_separately() {
        let mut stats = BulkLoadPacketStats::default();

        stats.record_packet_written(4096);
        stats.record_finalized_packet(41);

        assert_eq!(stats.packet_payload_bytes, 4096);
        assert_eq!(stats.finalized_packet_payload_bytes, 41);
    }

    // A default `BulkLoadWriteTimingStats` reports zero counts, zero
    // durations, and default nested stats.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_write_timing_stats_default_to_zero() {
        let stats = BulkLoadWriteTimingStats::default();

        assert_eq!(stats.write_packets_elapsed, Duration::ZERO);
        assert_eq!(stats.write_to_wire_calls, 0);
        assert_eq!(stats.write_to_wire_elapsed, Duration::ZERO);
        assert_eq!(stats.write_to_wire_payload_bytes, 0);
        assert_eq!(stats.max_write_to_wire_elapsed, Duration::ZERO);
        assert_eq!(stats.max_write_to_wire_payload_bytes, 0);
        assert_eq!(stats.flush_calls, 0);
        assert_eq!(stats.flush_elapsed, Duration::ZERO);
        assert_eq!(stats.max_flush_elapsed, Duration::ZERO);
        assert_eq!(stats.finalize_elapsed, Duration::ZERO);
        assert_eq!(stats.finalize_write_to_wire_elapsed, Duration::ZERO);
        assert_eq!(stats.finalize_flush_elapsed, Duration::ZERO);
        assert_eq!(stats.finalize_result_elapsed, Duration::ZERO);
        assert_eq!(
            stats.connection_write,
            BulkLoadConnectionWriteStats::default()
        );
        assert_eq!(
            stats.direct_packet_write,
            BulkLoadDirectPacketWriteStats::default()
        );
    }

    // A default `BulkLoadConnectionWriteStats` reports zero everywhere.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_connection_write_stats_default_to_zero() {
        let stats = BulkLoadConnectionWriteStats::default();

        assert_eq!(stats.calls, 0);
        assert_eq!(stats.payload_bytes, 0);
        assert_eq!(stats.ready_elapsed, Duration::ZERO);
        assert_eq!(stats.encode_elapsed, Duration::ZERO);
        assert_eq!(stats.flush_elapsed, Duration::ZERO);
        assert_eq!(stats.max_ready_elapsed, Duration::ZERO);
        assert_eq!(stats.max_encode_elapsed, Duration::ZERO);
        assert_eq!(stats.max_flush_elapsed, Duration::ZERO);
        assert_eq!(stats.max_payload_bytes, 0);
    }

    // Two `record` calls: totals are expected to be the sums of both calls
    // and each `max_*` field the larger of the two values.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_connection_write_stats_accumulate_and_track_maxima() {
        let mut stats = BulkLoadConnectionWriteStats::default();

        stats.record(
            128,
            Duration::from_millis(3),
            Duration::from_millis(5),
            Duration::from_millis(7),
        );
        stats.record(
            256,
            Duration::from_millis(11),
            Duration::from_millis(2),
            Duration::from_millis(13),
        );

        assert_eq!(stats.calls, 2);
        assert_eq!(stats.payload_bytes, 384);
        assert_eq!(stats.ready_elapsed, Duration::from_millis(14));
        assert_eq!(stats.encode_elapsed, Duration::from_millis(7));
        assert_eq!(stats.flush_elapsed, Duration::from_millis(20));
        assert_eq!(stats.max_ready_elapsed, Duration::from_millis(11));
        assert_eq!(stats.max_encode_elapsed, Duration::from_millis(5));
        assert_eq!(stats.max_flush_elapsed, Duration::from_millis(13));
        assert_eq!(stats.max_payload_bytes, 256);
    }

    // A default `BulkLoadDirectPacketWriteStats` reports zero everywhere.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_direct_packet_write_stats_default_to_zero() {
        let stats = BulkLoadDirectPacketWriteStats::default();

        assert_eq!(stats.calls, 0);
        assert_eq!(stats.payload_bytes, 0);
        assert_eq!(stats.header_bytes, 0);
        assert_eq!(stats.max_payload_bytes, 0);
        assert_eq!(stats.final_calls, 0);
        assert_eq!(stats.final_payload_bytes, 0);
        assert_eq!(stats.final_header_bytes, 0);
        assert_eq!(stats.raw_stream_calls, 0);
        assert_eq!(stats.tls_stream_calls, 0);
        assert_eq!(stats.write_calls, 0);
        assert_eq!(stats.write_bytes, 0);
        assert_eq!(stats.max_write_bytes, 0);
        assert_eq!(stats.write_elapsed, Duration::ZERO);
        assert_eq!(stats.max_write_elapsed, Duration::ZERO);
        assert_eq!(stats.header_write_calls, 0);
        assert_eq!(stats.header_write_bytes, 0);
        assert_eq!(stats.header_max_write_bytes, 0);
        assert_eq!(stats.header_write_elapsed, Duration::ZERO);
        assert_eq!(stats.header_max_write_elapsed, Duration::ZERO);
        assert_eq!(stats.header_partial_writes, 0);
        assert_eq!(stats.payload_write_calls, 0);
        assert_eq!(stats.payload_write_bytes, 0);
        assert_eq!(stats.payload_max_write_bytes, 0);
        assert_eq!(stats.payload_write_elapsed, Duration::ZERO);
        assert_eq!(stats.payload_max_write_elapsed, Duration::ZERO);
        assert_eq!(stats.payload_partial_writes, 0);
        assert_eq!(stats.poll_write_polls, 0);
        assert_eq!(stats.poll_write_pending_count, 0);
        assert_eq!(stats.poll_write_pending_elapsed, Duration::ZERO);
        assert_eq!(stats.poll_write_max_pending_elapsed, Duration::ZERO);
        assert_eq!(stats.poll_write_ready_count, 0);
        assert_eq!(stats.poll_write_ready_elapsed, Duration::ZERO);
        assert_eq!(stats.poll_write_max_ready_elapsed, Duration::ZERO);
        assert_eq!(stats.flush_calls, 0);
        assert_eq!(stats.flush_elapsed, Duration::ZERO);
        assert_eq!(stats.max_flush_elapsed, Duration::ZERO);
        assert_eq!(stats.flush_pending_count, 0);
        assert_eq!(stats.flush_pending_elapsed, Duration::ZERO);
        assert_eq!(stats.flush_max_pending_elapsed, Duration::ZERO);
    }

    // Exercises every recorder on `BulkLoadDirectPacketWriteStats` once (or
    // twice for packets, stream mode, and flush) and checks the resulting
    // counters, totals, and maxima.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_direct_packet_write_stats_accumulate_and_track_maxima() {
        let mut stats = BulkLoadDirectPacketWriteStats::default();

        // One regular packet and one final packet.
        stats.record_packet(128, HEADER_BYTES, false);
        stats.record_packet(256, HEADER_BYTES, true);
        // One call per stream mode; each is expected to bump its own counter.
        stats.record_stream_mode(true, false);
        stats.record_stream_mode(false, true);
        stats.record_write_summary(
            2,
            576,
            512,
            Duration::from_millis(14),
            Duration::from_millis(11),
        );
        stats.record_header_write_summary(
            3,
            24,
            HEADER_BYTES,
            Duration::from_millis(17),
            Duration::from_millis(13),
            1,
        );
        stats.record_payload_write_summary(
            5,
            552,
            384,
            Duration::from_millis(19),
            Duration::from_millis(15),
            2,
        );
        stats.record_poll_write_summary(DirectPacketPollWriteSummary {
            polls: 11,
            pending_count: 7,
            pending_elapsed: Duration::from_millis(23),
            max_pending_elapsed: Duration::from_millis(17),
            ready_count: 4,
            ready_elapsed: Duration::from_millis(29),
            max_ready_elapsed: Duration::from_millis(19),
        });
        stats.record_flush(
            Duration::from_millis(5),
            2,
            Duration::from_millis(3),
            Duration::from_millis(2),
        );
        stats.record_flush(
            Duration::from_millis(7),
            3,
            Duration::from_millis(4),
            Duration::from_millis(3),
        );

        // Packet totals cover both packets; the `final_*` fields only the
        // packet recorded with the final flag set.
        assert_eq!(stats.calls, 2);
        assert_eq!(stats.payload_bytes, 384);
        assert_eq!(stats.header_bytes, u64::try_from(HEADER_BYTES * 2).unwrap());
        assert_eq!(stats.max_payload_bytes, 256);
        assert_eq!(stats.final_calls, 1);
        assert_eq!(stats.final_payload_bytes, 256);
        assert_eq!(
            stats.final_header_bytes,
            u64::try_from(HEADER_BYTES).unwrap()
        );
        assert_eq!(stats.raw_stream_calls, 1);
        assert_eq!(stats.tls_stream_calls, 1);
        // Write, header, payload, and poll summaries were each recorded once,
        // so the fields should equal the values passed in.
        assert_eq!(stats.write_calls, 2);
        assert_eq!(stats.write_bytes, 576);
        assert_eq!(stats.max_write_bytes, 512);
        assert_eq!(stats.write_elapsed, Duration::from_millis(14));
        assert_eq!(stats.max_write_elapsed, Duration::from_millis(11));
        assert_eq!(stats.header_write_calls, 3);
        assert_eq!(stats.header_write_bytes, 24);
        assert_eq!(stats.header_max_write_bytes, HEADER_BYTES);
        assert_eq!(stats.header_write_elapsed, Duration::from_millis(17));
        assert_eq!(stats.header_max_write_elapsed, Duration::from_millis(13));
        assert_eq!(stats.header_partial_writes, 1);
        assert_eq!(stats.payload_write_calls, 5);
        assert_eq!(stats.payload_write_bytes, 552);
        assert_eq!(stats.payload_max_write_bytes, 384);
        assert_eq!(stats.payload_write_elapsed, Duration::from_millis(19));
        assert_eq!(stats.payload_max_write_elapsed, Duration::from_millis(15));
        assert_eq!(stats.payload_partial_writes, 2);
        assert_eq!(stats.poll_write_polls, 11);
        assert_eq!(stats.poll_write_pending_count, 7);
        assert_eq!(stats.poll_write_pending_elapsed, Duration::from_millis(23));
        assert_eq!(
            stats.poll_write_max_pending_elapsed,
            Duration::from_millis(17)
        );
        assert_eq!(stats.poll_write_ready_count, 4);
        assert_eq!(stats.poll_write_ready_elapsed, Duration::from_millis(29));
        assert_eq!(
            stats.poll_write_max_ready_elapsed,
            Duration::from_millis(19)
        );
        // Flush was recorded twice: sums for totals, larger value for maxima.
        assert_eq!(stats.flush_calls, 2);
        assert_eq!(stats.flush_elapsed, Duration::from_millis(12));
        assert_eq!(stats.max_flush_elapsed, Duration::from_millis(7));
        assert_eq!(stats.flush_pending_count, 5);
        assert_eq!(stats.flush_pending_elapsed, Duration::from_millis(7));
        assert_eq!(stats.flush_max_pending_elapsed, Duration::from_millis(3));
    }

    // `write_packets_elapsed` should be the sum of the recorded durations.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_write_timing_stats_accumulate_write_packets_elapsed() {
        let mut stats = BulkLoadWriteTimingStats::default();

        stats.record_write_packets_elapsed(Duration::from_millis(7));
        stats.record_write_packets_elapsed(Duration::from_millis(11));

        assert_eq!(stats.write_packets_elapsed, Duration::from_millis(18));
    }

    // Two write-to-wire records: call count, summed duration and bytes, and
    // per-call maxima.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_write_timing_stats_accumulate_write_to_wire() {
        let mut stats = BulkLoadWriteTimingStats::default();

        stats.record_write_to_wire(Duration::from_millis(13), 128);
        stats.record_write_to_wire(Duration::from_millis(17), 256);

        assert_eq!(stats.write_to_wire_calls, 2);
        assert_eq!(stats.write_to_wire_elapsed, Duration::from_millis(30));
        assert_eq!(stats.write_to_wire_payload_bytes, 384);
        assert_eq!(stats.max_write_to_wire_elapsed, Duration::from_millis(17));
        assert_eq!(stats.max_write_to_wire_payload_bytes, 256);
    }

    // Flush and finalize recorders accumulate into their own fields, and
    // `record_connection_write` is reflected in the nested `connection_write`
    // stats.
    #[test]
    #[cfg(feature = "bulk-load-profile")]
    fn bulk_load_write_timing_stats_accumulate_flush_and_finalize() {
        let mut stats = BulkLoadWriteTimingStats::default();

        stats.record_flush(Duration::from_millis(19));
        stats.record_flush(Duration::from_millis(23));
        stats.record_finalize_elapsed(Duration::from_millis(29));
        stats.record_finalize_elapsed(Duration::from_millis(31));
        stats.record_finalize_write_to_wire_elapsed(Duration::from_millis(37));
        stats.record_finalize_flush_elapsed(Duration::from_millis(41));
        stats.record_finalize_result_elapsed(Duration::from_millis(43));
        stats.record_connection_write(
            128,
            Duration::from_millis(47),
            Duration::from_millis(53),
            Duration::from_millis(59),
        );

        assert_eq!(stats.flush_calls, 2);
        assert_eq!(stats.flush_elapsed, Duration::from_millis(42));
        assert_eq!(stats.max_flush_elapsed, Duration::from_millis(23));
        assert_eq!(stats.finalize_elapsed, Duration::from_millis(60));
        assert_eq!(
            stats.finalize_write_to_wire_elapsed,
            Duration::from_millis(37)
        );
        assert_eq!(stats.finalize_flush_elapsed, Duration::from_millis(41));
        assert_eq!(stats.finalize_result_elapsed, Duration::from_millis(43));
        assert_eq!(stats.connection_write.calls, 1);
        assert_eq!(stats.connection_write.payload_bytes, 128);
        assert_eq!(
            stats.connection_write.ready_elapsed,
            Duration::from_millis(47)
        );
        assert_eq!(
            stats.connection_write.encode_elapsed,
            Duration::from_millis(53)
        );
        assert_eq!(
            stats.connection_write.flush_elapsed,
            Duration::from_millis(59)
        );
    }

    // `append_raw_row_payload` is expected to prefix the payload with the
    // ROW token byte.
    #[test]
    fn appends_single_raw_row_payload_with_row_token() {
        let mut buf = BytesMut::new();

        append_raw_row_payload(&mut buf, &[0x01, 0x02, 0x03]).expect("payload should append");

        assert_eq!(&[TokenType::Row as u8, 0x01, 0x02, 0x03], &buf[..]);
    }

    // An empty single-row payload is rejected and nothing is written.
    #[test]
    fn rejects_empty_single_raw_row_payload() {
        let mut buf = BytesMut::new();

        append_raw_row_payload(&mut buf, &[]).expect_err("empty payload should fail");

        assert!(buf.is_empty());
    }

    // A batched payload that already contains its ROW tokens is copied
    // verbatim.
    #[test]
    fn appends_batched_raw_rows_payload_unchanged() {
        let mut buf = BytesMut::new();
        let payload = [TokenType::Row as u8, 0x01, TokenType::Row as u8, 0x02, 0x03];

        append_raw_rows_payload(&mut buf, &payload).expect("payload should append");

        assert_eq!(&payload, &buf[..]);
    }

    // An empty batched payload is rejected and nothing is written.
    #[test]
    fn rejects_empty_batched_raw_rows_payload() {
        let mut buf = BytesMut::new();

        append_raw_rows_payload(&mut buf, &[]).expect_err("empty payload should fail");

        assert!(buf.is_empty());
    }

    // A batched payload whose first byte is not the ROW token is rejected.
    #[test]
    fn rejects_batched_raw_rows_payload_without_row_token() {
        let mut buf = BytesMut::new();

        append_raw_rows_payload(&mut buf, &[0x01, 0x02])
            .expect_err("payload without row token should fail");

        assert!(buf.is_empty());
    }

    // The checked variant accepts a payload whose offsets (0 and 2) both
    // point at ROW tokens, and copies it verbatim.
    #[test]
    fn appends_checked_batched_raw_rows_payload_unchanged() {
        let mut buf = BytesMut::new();
        let payload = [TokenType::Row as u8, 0x01, TokenType::Row as u8, 0x02, 0x03];

        append_raw_rows_payload_checked(&mut buf, &payload, &[0, 2])
            .expect("payload should append");

        assert_eq!(&payload, &buf[..]);
    }

    // Offsets returned by the closure are relative to the bytes it appended,
    // not to the start of the buffer: the buffer already holds "prefix".
    #[test]
    fn appends_raw_rows_with_relative_offsets_after_existing_bytes() {
        let mut buf = BytesMut::from(&b"prefix"[..]);
        let payload = [TokenType::Row as u8, 0x01, TokenType::Row as u8, 0x02, 0x03];

        append_raw_rows_with(&mut buf, |buf| {
            buf.extend_from_slice(&payload);
            Ok(RawRowsAppend::new(vec![0, 2]))
        })
        .expect("payload should append");

        assert_eq!(&b"prefix"[..], &buf[..6]);
        assert_eq!(&payload, &buf[6..]);
    }

    // If the closure appends bytes and then fails, the buffer is restored to
    // its previous contents.
    #[test]
    fn rolls_back_raw_rows_with_closure_error() {
        let mut buf = BytesMut::from(&b"prefix"[..]);

        append_raw_rows_with(&mut buf, |buf| {
            buf.extend_from_slice(&[TokenType::Row as u8, 0x01]);
            Err(crate::Error::BulkInput(Cow::Borrowed(
                "fake append failure",
            )))
        })
        .expect_err("closure error should fail");

        assert_eq!(&b"prefix"[..], &buf[..]);
    }

    // If the closure succeeds but reports offsets that fail validation (here
    // the list starts at 1 instead of 0), the appended bytes are rolled back.
    #[test]
    fn rolls_back_raw_rows_with_validation_error_after_append() {
        let mut buf = BytesMut::from(&b"prefix"[..]);

        append_raw_rows_with(&mut buf, |buf| {
            buf.extend_from_slice(&[TokenType::Row as u8, 0x01]);
            Ok(RawRowsAppend::new(vec![1]))
        })
        .expect_err("validation error should fail");

        assert_eq!(&b"prefix"[..], &buf[..]);
    }

    // A closure that appends nothing is rejected even though it reports an
    // offset.
    #[test]
    fn rejects_empty_raw_rows_with_append() {
        let mut buf = BytesMut::new();

        append_raw_rows_with(&mut buf, |_| Ok(RawRowsAppend::new(vec![0])))
            .expect_err("empty append should fail");

        assert!(buf.is_empty());
    }

    // Table-driven check: each (payload, offsets) pair is invalid, and the
    // buffer must be left holding only "prefix" afterwards.
    #[test]
    fn rejects_invalid_raw_rows_with_offsets_and_rolls_back() {
        let cases: &[(&[u8], &[usize])] = &[
            // No offsets at all.
            (&[TokenType::Row as u8], &[]),
            // First offset is not zero.
            (&[0x00, TokenType::Row as u8], &[1]),
            // Offset 1 is past the end of a one-byte payload.
            (&[TokenType::Row as u8], &[0, 1]),
            // Repeated offset.
            (&[TokenType::Row as u8, 0x01], &[0, 0]),
            // Offset 1 points at 0x01 rather than a ROW token.
            (&[TokenType::Row as u8, 0x01, 0x02], &[0, 1]),
        ];

        for (payload, row_token_offsets) in cases {
            let mut buf = BytesMut::from(&b"prefix"[..]);

            append_raw_rows_with(&mut buf, |buf| {
                buf.extend_from_slice(payload);
                Ok(RawRowsAppend::new(row_token_offsets.to_vec()))
            })
            .expect_err("invalid row offsets should fail");

            assert_eq!(&b"prefix"[..], &buf[..]);
        }
    }

    // Checked append: an empty offset list is rejected.
    #[test]
    fn rejects_checked_batched_raw_rows_payload_with_empty_offsets() {
        let mut buf = BytesMut::new();

        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8], &[])
            .expect_err("empty offsets should fail");

        assert!(buf.is_empty());
    }

    // Checked append: the first offset must be zero.
    #[test]
    fn rejects_checked_batched_raw_rows_payload_with_nonzero_first_offset() {
        let mut buf = BytesMut::new();

        append_raw_rows_payload_checked(&mut buf, &[0x00, TokenType::Row as u8], &[1])
            .expect_err("nonzero first offset should fail");

        assert!(buf.is_empty());
    }

    // Checked append: an offset equal to the payload length is out of bounds.
    #[test]
    fn rejects_checked_batched_raw_rows_payload_with_out_of_bounds_offset() {
        let mut buf = BytesMut::new();

        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8], &[0, 1])
            .expect_err("out of bounds offset should fail");

        assert!(buf.is_empty());
    }

    // Checked append: a repeated offset is rejected.
    #[test]
    fn rejects_checked_batched_raw_rows_payload_with_non_increasing_offsets() {
        let mut buf = BytesMut::new();

        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8, 0x01], &[0, 0])
            .expect_err("repeated offset should fail");

        assert!(buf.is_empty());
    }

    // Checked append: every offset must land on a ROW token byte.
    #[test]
    fn rejects_checked_batched_raw_rows_payload_with_offset_not_on_row_token() {
        let mut buf = BytesMut::new();

        append_raw_rows_payload_checked(&mut buf, &[TokenType::Row as u8, 0x01, 0x02], &[0, 1])
            .expect_err("offset not on row token should fail");

        assert!(buf.is_empty());
    }
}