1use crate::thin::{
17 encode_binary_double, encode_binary_float, encode_number_text, encode_oracle_date,
18 encode_oracle_timestamp, encode_oracle_timestamp_tz, parse_column_metadata,
19 parse_server_error_info, skip_server_side_piggyback, ClientCapabilities, ColumnMetadata,
20 CS_FORM_IMPLICIT, CS_FORM_NCHAR, ORA_TYPE_NUM_BINARY_DOUBLE, ORA_TYPE_NUM_BINARY_FLOAT,
21 ORA_TYPE_NUM_BINARY_INTEGER, ORA_TYPE_NUM_BLOB, ORA_TYPE_NUM_BOOLEAN, ORA_TYPE_NUM_CHAR,
22 ORA_TYPE_NUM_CLOB, ORA_TYPE_NUM_DATE, ORA_TYPE_NUM_LONG, ORA_TYPE_NUM_LONG_RAW,
23 ORA_TYPE_NUM_NUMBER, ORA_TYPE_NUM_RAW, ORA_TYPE_NUM_TIMESTAMP, ORA_TYPE_NUM_TIMESTAMP_LTZ,
24 ORA_TYPE_NUM_TIMESTAMP_TZ, ORA_TYPE_NUM_VARCHAR, TNS_MSG_TYPE_END_OF_RESPONSE,
25 TNS_MSG_TYPE_ERROR, TNS_MSG_TYPE_PARAMETER, TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK,
26 TNS_MSG_TYPE_STATUS,
27};
28use crate::wire::{BoundedReader, TtcReader, TtcWriter};
29use crate::{ProtocolError, Result};
30
/// TTC function code written at the start of a direct path prepare request.
pub const TNS_FUNC_DIRECT_PATH_PREPARE: u8 = 128;
/// TTC function code written at the start of a direct path load stream request.
pub const TNS_FUNC_DIRECT_PATH_LOAD_STREAM: u8 = 129;
/// TTC function code written at the start of a direct path operation request.
pub const TNS_FUNC_DIRECT_PATH_OP: u8 = 130;

/// Interface version placed in the prepare request's input value array.
pub const TNS_DP_INTERFACE_VERSION: u32 = 400;
/// Stream version placed in the prepare input values and in the load stream header.
pub const TNS_DP_STREAM_VERSION: u32 = 400;

/// Operation code written by the prepare request builder.
pub const TNS_DPP_OP_CODE_LOAD: u32 = 1;

/// Operation code accepted by `build_direct_path_op_payload`
/// (presumably aborts the load; confirm against the protocol reference).
pub const TNS_DP_OP_ABORT: u32 = 1;
/// Operation code accepted by `build_direct_path_op_payload`
/// (presumably completes the load; confirm against the protocol reference).
pub const TNS_DP_OP_FINISH: u32 = 2;

// Slots of the input value array sent with the prepare request.
const TNS_DPP_IN_INDEX_INTERFACE_VERSION: usize = 0;
const TNS_DPP_IN_INDEX_STREAM_VERSION: usize = 1;
const TNS_DPP_IN_INDEX_LOCK_WAIT: usize = 14;
// Keyword indexes written after each keyword parameter's text.
const TNS_DPP_KW_INDEX_OBJECT_NAME: u16 = 1;
const TNS_DPP_KW_INDEX_SCHEMA_NAME: u16 = 3;
const TNS_DPP_KW_INDEX_COLUMN_NAME: u16 = 4;
// NOTE(review): despite the KW_INDEX prefix, this is used as a slot of the
// input value array (set to 0xffff by the prepare builder), not as a keyword index.
const TNS_DPP_KW_INDEX_NFOBJ_OID_POS: usize = 11;
// Slot of the prepare response's output values that carries the cursor id.
const TNS_DPP_OUT_INDEX_CURSOR: usize = 3;
// Number of input values sent: every slot up to and including the lock-wait slot.
const TNS_DPP_IN_VALUES_SENT: usize = TNS_DPP_IN_INDEX_LOCK_WAIT + 1;

// Bit flags combined into the first header byte of every row piece.
pub const TNS_DPLS_ROW_HEADER_FAST_PIECE: u8 = 0x10;
pub const TNS_DPLS_ROW_HEADER_FAST_ROW: u8 = 0x20;
pub const TNS_DPLS_ROW_HEADER_FIRST: u8 = 0x08;
pub const TNS_DPLS_ROW_HEADER_LAST: u8 = 0x04;
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV: u8 = 0x02;
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT: u8 = 0x01;

/// Upper bound on the accumulated length (headers plus data) of all pieces in one stream.
pub const TNS_DPLS_MAX_MESSAGE_SIZE: u64 = 1_073_728_895;
/// Longest value that is written with a single-byte length prefix.
pub const TNS_DPLS_MAX_SHORT_LENGTH: usize = 0xfa;
/// Maximum number of data bytes held by a single piece.
pub const TNS_DPLS_MAX_PIECE_SIZE: usize = 0xfff0;

// Prefix byte announcing that a 16-bit big-endian length follows.
const TNS_DPLS_LONG_LENGTH_INDICATOR: u8 = 0xfe;
// Single byte written in place of a length for a NULL column value.
const TNS_NULL_LENGTH_INDICATOR: u8 = 0xff;
69
70pub fn build_direct_path_prepare_payload(
74 schema_name: &str,
75 table_name: &str,
76 column_names: &[String],
77 seq_num: u8,
78) -> Result<Vec<u8>> {
79 let keyword_parameters_length =
80 u32::try_from(column_names.len() + 2).map_err(|_| ProtocolError::InvalidPacketLength {
81 length: column_names.len(),
82 minimum: 0,
83 })?;
84
85 let mut in_values = [0u32; TNS_DPP_IN_VALUES_SENT];
86 in_values[TNS_DPP_IN_INDEX_INTERFACE_VERSION] = TNS_DP_INTERFACE_VERSION;
87 in_values[TNS_DPP_IN_INDEX_STREAM_VERSION] = TNS_DP_STREAM_VERSION;
88 in_values[TNS_DPP_KW_INDEX_NFOBJ_OID_POS] = 0xffff;
89 in_values[TNS_DPP_IN_INDEX_LOCK_WAIT] = 1;
90
91 let mut writer = TtcWriter::new();
92 writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_PREPARE, seq_num);
93 writer.write_ub8(0); writer.write_ub4(TNS_DPP_OP_CODE_LOAD);
95 writer.write_u8(1); writer.write_ub4(keyword_parameters_length);
97 writer.write_u8(1); writer.write_ub2(TNS_DPP_IN_VALUES_SENT as u16);
99 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_SCHEMA_NAME, schema_name)?;
106 write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_OBJECT_NAME, table_name)?;
107 for name in column_names {
108 write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_COLUMN_NAME, name)?;
109 }
110 for value in in_values {
111 writer.write_ub4(value);
112 }
113 Ok(writer.into_bytes())
114}
115
116fn write_keyword_param(writer: &mut TtcWriter, index: u16, value: &str) -> Result<()> {
117 let bytes = value.as_bytes();
118 let len = u16::try_from(bytes.len()).map_err(|_| ProtocolError::InvalidPacketLength {
119 length: bytes.len(),
120 minimum: 0,
121 })?;
122 writer.write_ub2(0); writer.write_ub2(len);
124 writer.write_bytes_with_length(bytes)?;
125 writer.write_ub2(index);
126 Ok(())
127}
128
/// Values extracted from the return parameters of a direct path prepare response.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DirectPathPrepareResult {
    /// Metadata of each column, after the direct path LOB overrides were applied.
    pub column_metadata: Vec<ColumnMetadata>,
    /// Cursor id taken from the output values (slot `TNS_DPP_OUT_INDEX_CURSOR`).
    pub cursor_id: u16,
    /// All output values of the response, in wire order.
    pub out_values: Vec<u32>,
}
135
136pub fn parse_direct_path_prepare_response(
143 payload: &[u8],
144 capabilities: ClientCapabilities,
145) -> Result<DirectPathPrepareResult> {
146 let mut reader = TtcReader::new(payload);
147 let mut result: Option<DirectPathPrepareResult> = None;
148 while reader.remaining() > 0 {
149 let message_type = reader.read_u8()?;
150 match message_type {
151 0 => {}
152 TNS_MSG_TYPE_PARAMETER => {
153 result = Some(parse_prepare_return_parameters(&mut reader, capabilities)?);
154 }
155 TNS_MSG_TYPE_STATUS => {
156 let _call_status = reader.read_ub4()?;
157 let _seq = reader.read_ub2()?;
158 }
159 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
160 let _ = skip_server_side_piggyback(&mut reader)?;
161 }
162 TNS_MSG_TYPE_END_OF_RESPONSE => break,
163 TNS_MSG_TYPE_ERROR => {
164 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
165 if info.number != 0 {
166 return Err(ProtocolError::ServerError(info.message));
167 }
168 }
169 _ => {
170 return Err(ProtocolError::UnknownMessageType {
171 message_type,
172 position: reader.position().saturating_sub(1),
173 })
174 }
175 }
176 }
177 result.ok_or(ProtocolError::TtcDecode(
178 "direct path prepare response did not contain return parameters",
179 ))
180}
181
182fn parse_prepare_return_parameters(
183 reader: &mut TtcReader<'_>,
184 capabilities: ClientCapabilities,
185) -> Result<DirectPathPrepareResult> {
186 let num_columns = reader.read_ub4()?;
187 let mut column_metadata: Vec<ColumnMetadata> =
191 reader.with_capacity_bounded(num_columns as usize, 1);
192 for _ in 0..num_columns {
193 let mut metadata = parse_column_metadata(reader, capabilities)?;
194 apply_direct_path_metadata_overrides(&mut metadata, capabilities.charset_id);
195 column_metadata.push(metadata);
196 }
197 let num_params = reader.read_ub2()?;
198 if num_params != 0 {
199 return Err(ProtocolError::TtcDecode(
200 "unexpected parameters in direct path prepare response",
201 ));
202 }
203 let out_values_length = reader.read_ub2()?;
204 let mut out_values: Vec<u32> = reader.with_capacity_bounded(usize::from(out_values_length), 1);
206 for _ in 0..out_values_length {
207 out_values.push(reader.read_ub4()?);
208 }
209 let cursor_id =
210 out_values
211 .get(TNS_DPP_OUT_INDEX_CURSOR)
212 .copied()
213 .ok_or(ProtocolError::TtcDecode(
214 "direct path prepare response missing cursor id",
215 ))?;
216 let cursor_id = u16::try_from(cursor_id)
217 .map_err(|_| ProtocolError::TtcDecode("direct path cursor id out of range"))?;
218 Ok(DirectPathPrepareResult {
219 column_metadata,
220 cursor_id,
221 out_values,
222 })
223}
224
225fn apply_direct_path_metadata_overrides(metadata: &mut ColumnMetadata, charset_id: u16) {
229 if metadata.ora_type_num == ORA_TYPE_NUM_CLOB {
230 if metadata.csfrm == CS_FORM_IMPLICIT && charset_id >= 800 {
231 metadata.csfrm = CS_FORM_NCHAR;
232 }
233 metadata.ora_type_num = ORA_TYPE_NUM_LONG;
234 } else if metadata.ora_type_num == ORA_TYPE_NUM_BLOB {
235 metadata.ora_type_num = ORA_TYPE_NUM_LONG_RAW;
236 metadata.csfrm = 0;
237 }
238}
239
240pub fn build_direct_path_op_payload(cursor_id: u16, op_code: u32, seq_num: u8) -> Vec<u8> {
245 let mut writer = TtcWriter::new();
246 writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_OP, seq_num);
247 writer.write_ub8(0); writer.write_ub4(op_code);
249 writer.write_ub2(cursor_id);
250 writer.write_u8(0); writer.write_ub4(0); writer.write_u8(1); writer.write_u8(1); writer.into_bytes()
255}
256
257pub fn parse_direct_path_simple_response(
260 payload: &[u8],
261 capabilities: ClientCapabilities,
262) -> Result<()> {
263 let mut reader = TtcReader::new(payload);
264 while reader.remaining() > 0 {
265 let message_type = reader.read_u8()?;
266 match message_type {
267 0 => {}
268 TNS_MSG_TYPE_PARAMETER => {
269 let num_out_values = reader.read_ub2()?;
270 for _ in 0..num_out_values {
271 let _value = reader.read_ub4()?;
272 }
273 }
274 TNS_MSG_TYPE_STATUS => {
275 let _call_status = reader.read_ub4()?;
276 let _seq = reader.read_ub2()?;
277 }
278 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
279 let _ = skip_server_side_piggyback(&mut reader)?;
280 }
281 TNS_MSG_TYPE_END_OF_RESPONSE => break,
282 TNS_MSG_TYPE_ERROR => {
283 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
284 if info.number != 0 {
285 return Err(ProtocolError::ServerError(info.message));
286 }
287 }
288 _ => {
289 return Err(ProtocolError::UnknownMessageType {
290 message_type,
291 position: reader.position().saturating_sub(1),
292 })
293 }
294 }
295 }
296 Ok(())
297}
298
// Load stream and operation responses are parsed by the same function as the
// simple response; these aliases give each call site a descriptive name.
pub use parse_direct_path_simple_response as parse_direct_path_load_stream_response;
pub use parse_direct_path_simple_response as parse_direct_path_op_response;
301
/// A single column value handed to the direct path row encoder.
///
/// The encoder checks each variant against the column's Oracle type and
/// rejects mismatches.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectPathColumnValue {
    /// SQL NULL; rejected for columns that do not allow nulls.
    Null,
    /// Raw bytes for VARCHAR, CHAR, LONG, RAW and LONG RAW columns.
    Bytes(Vec<u8>),
    /// Number given as text; converted with `encode_number_text`.
    Number(String),
    /// Value for BINARY_DOUBLE columns.
    BinaryDouble(f64),
    /// Value for BINARY_FLOAT columns.
    BinaryFloat(f32),
    /// Date/time components for DATE and TIMESTAMP column types.
    /// `nanosecond` must be zero for DATE columns.
    DateTime {
        year: i32,
        month: u8,
        day: u8,
        hour: u8,
        minute: u8,
        second: u8,
        nanosecond: u32,
    },
    /// Value for BOOLEAN columns.
    Boolean(bool),
}
325
326#[derive(Clone, Debug, Eq, PartialEq)]
329pub struct DirectPathPiece {
330 flags: u8,
331 num_segments: u8,
332 data: Vec<u8>,
333}
334
335impl DirectPathPiece {
336 pub fn flags(&self) -> u8 {
337 self.flags
338 }
339
340 pub fn num_segments(&self) -> u8 {
341 self.num_segments
342 }
343
344 pub fn data(&self) -> &[u8] {
345 &self.data
346 }
347
348 fn is_fast_row(&self) -> bool {
349 self.flags & TNS_DPLS_ROW_HEADER_FAST_ROW != 0
350 }
351
352 fn header_length(&self) -> u64 {
353 if self.is_fast_row() {
354 4
355 } else {
356 2
357 }
358 }
359
360 fn write_to(&self, writer: &mut TtcWriter) -> Result<()> {
361 writer.write_u8(self.flags);
362 if self.is_fast_row() {
363 let total = self.data.len() as u64 + self.header_length();
364 let total = u16::try_from(total).map_err(|_| {
365 ProtocolError::TtcDecode("direct path fast piece exceeds 16-bit length")
366 })?;
367 writer.write_u16be(total);
368 }
369 writer.write_u8(self.num_segments);
370 writer.write_raw(&self.data);
371 Ok(())
372 }
373}
374
/// Bookkeeping for the piece that is currently being filled.
#[derive(Clone, Copy, Debug, Default)]
struct PieceState {
    /// The piece opens a row.
    is_first: bool,
    /// The piece closes a row.
    is_last: bool,
    /// The piece starts with the continuation of a value split from the previous piece.
    is_split_with_prev: bool,
    /// The piece ends with a value that continues in the next piece.
    is_split_with_next: bool,
    /// No column of a non-fast type has been added since the row was started.
    is_fast: bool,
    /// Number of segments written so far; narrowed to `u8` when the piece is finalized.
    num_segments: u16,
}
384
385#[derive(Debug, Default)]
391pub struct DirectPathPieceBuffer {
392 pieces: Vec<DirectPathPiece>,
393 total_piece_length: u64,
394 data: Vec<u8>,
395 current: Option<PieceState>,
396}
397
398impl DirectPathPieceBuffer {
399 pub fn new() -> Self {
400 Self::default()
401 }
402
403 pub fn start_row(&mut self) -> Result<()> {
404 if self.current.is_some() {
405 return Err(ProtocolError::TtcDecode(
406 "direct path row started before previous row was finished",
407 ));
408 }
409 self.current = Some(PieceState {
410 is_first: true,
411 is_fast: true,
412 ..PieceState::default()
413 });
414 Ok(())
415 }
416
417 pub fn finish_row(&mut self) -> Result<()> {
418 let Some(state) = self.current.as_mut() else {
419 return Err(ProtocolError::TtcDecode(
420 "direct path row finished without being started",
421 ));
422 };
423 state.is_last = true;
424 self.finalize_piece()?;
425 self.current = None;
426 Ok(())
427 }
428
429 pub fn add_column_value(
430 &mut self,
431 metadata: &ColumnMetadata,
432 value: &DirectPathColumnValue,
433 row_num: u64,
434 ) -> Result<()> {
435 let Some(state) = self.current.as_mut() else {
436 return Err(ProtocolError::TtcDecode(
437 "direct path column value added outside of a row",
438 ));
439 };
440
441 if state.num_segments == 255 {
443 self.finalize_piece()?;
444 self.current = Some(PieceState::default());
445 }
446
447 if !is_fast_dbtype(metadata) {
448 if let Some(state) = self.current.as_mut() {
449 state.is_fast = false;
450 }
451 }
452
453 match value {
454 DirectPathColumnValue::Null => {
455 if !metadata.nulls_allowed {
456 return Err(ProtocolError::NullsNotAllowed {
457 column_name: metadata.name.clone(),
458 row_num,
459 });
460 }
461 self.write_u8_in_piece(TNS_NULL_LENGTH_INDICATOR)?;
462 self.bump_segments();
463 Ok(())
464 }
465 DirectPathColumnValue::Bytes(bytes) => {
466 if !matches!(
467 metadata.ora_type_num,
468 ORA_TYPE_NUM_VARCHAR
469 | ORA_TYPE_NUM_CHAR
470 | ORA_TYPE_NUM_LONG
471 | ORA_TYPE_NUM_RAW
472 | ORA_TYPE_NUM_LONG_RAW
473 ) {
474 return Err(ProtocolError::TtcDecode(
475 "direct path byte value sent for non-character column",
476 ));
477 }
478 if metadata.max_size > 0 && bytes.len() as u64 > u64::from(metadata.max_size) {
479 return Err(ProtocolError::ValueTooLarge {
480 actual_size: bytes.len(),
481 max_size: metadata.max_size,
482 column_name: metadata.name.clone(),
483 row_num,
484 });
485 }
486 self.write_raw_bytes_and_length(bytes)
487 }
488 DirectPathColumnValue::Number(text) => {
489 if !matches!(
490 metadata.ora_type_num,
491 ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER
492 ) {
493 return Err(ProtocolError::TtcDecode(
494 "direct path number value sent for non-number column",
495 ));
496 }
497 let encoded = encode_number_text(text)?;
498 self.write_raw_bytes_and_length(&encoded)
499 }
500 DirectPathColumnValue::BinaryDouble(value) => {
501 if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_DOUBLE {
502 return Err(ProtocolError::TtcDecode(
503 "direct path binary double sent for other column type",
504 ));
505 }
506 let encoded = encode_binary_double(*value);
507 self.write_raw_bytes_and_length(&encoded)
508 }
509 DirectPathColumnValue::BinaryFloat(value) => {
510 if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_FLOAT {
511 return Err(ProtocolError::TtcDecode(
512 "direct path binary float sent for other column type",
513 ));
514 }
515 let encoded = encode_binary_float(*value);
516 self.write_raw_bytes_and_length(&encoded)
517 }
518 DirectPathColumnValue::DateTime {
519 year,
520 month,
521 day,
522 hour,
523 minute,
524 second,
525 nanosecond,
526 } => {
527 let encoded = match metadata.ora_type_num {
528 ORA_TYPE_NUM_DATE => {
529 if *nanosecond != 0 {
530 return Err(ProtocolError::TtcDecode(
531 "direct path DATE value has fractional seconds",
532 ));
533 }
534 encode_oracle_date(*year, *month, *day, *hour, *minute, *second)?.to_vec()
535 }
536 ORA_TYPE_NUM_TIMESTAMP | ORA_TYPE_NUM_TIMESTAMP_LTZ => encode_oracle_timestamp(
539 *year,
540 *month,
541 *day,
542 *hour,
543 *minute,
544 *second,
545 *nanosecond,
546 )?,
547 ORA_TYPE_NUM_TIMESTAMP_TZ => encode_oracle_timestamp_tz(
548 *year,
549 *month,
550 *day,
551 *hour,
552 *minute,
553 *second,
554 *nanosecond,
555 )?,
556 _ => {
557 return Err(ProtocolError::TtcDecode(
558 "direct path datetime sent for non-datetime column",
559 ))
560 }
561 };
562 self.write_raw_bytes_and_length(&encoded)
563 }
564 DirectPathColumnValue::Boolean(value) => {
565 if metadata.ora_type_num != ORA_TYPE_NUM_BOOLEAN {
566 return Err(ProtocolError::TtcDecode(
567 "direct path boolean sent for non-boolean column",
568 ));
569 }
570 let encoded: &[u8] = if *value { &[1, 1] } else { &[0] };
571 self.write_raw_bytes_and_length(encoded)
572 }
573 }
574 }
575
576 pub fn finish(self) -> Result<(Vec<DirectPathPiece>, u32)> {
579 if self.current.is_some() {
580 return Err(ProtocolError::TtcDecode(
581 "direct path stream finished mid-row",
582 ));
583 }
584 let total = u32::try_from(self.total_piece_length)
585 .map_err(|_| ProtocolError::DirectPathLoadTooMuchData)?;
586 Ok((self.pieces, total))
587 }
588
589 fn bump_segments(&mut self) {
590 if let Some(state) = self.current.as_mut() {
591 state.num_segments = state.num_segments.saturating_add(1);
592 }
593 }
594
595 fn space_left(&self) -> usize {
596 TNS_DPLS_MAX_PIECE_SIZE.saturating_sub(self.data.len())
597 }
598
599 fn write_u8_in_piece(&mut self, value: u8) -> Result<()> {
600 if self.space_left() < 1 {
601 self.finalize_piece()?;
602 self.current = Some(PieceState::default());
603 }
604 self.data.push(value);
605 Ok(())
606 }
607
608 fn write_raw_bytes_and_length(&mut self, bytes: &[u8]) -> Result<()> {
613 if bytes.len() <= TNS_DPLS_MAX_SHORT_LENGTH {
614 if bytes.len() + 1 > self.space_left() {
615 self.finalize_piece()?;
616 self.current = Some(PieceState::default());
617 }
618 self.data.push(bytes.len() as u8);
619 self.data.extend_from_slice(bytes);
620 self.bump_segments();
621 return Ok(());
622 }
623
624 let mut remaining = bytes;
625 while remaining.len() + 3 > self.space_left() {
626 if self.space_left() < 4 {
630 self.finalize_piece()?;
631 self.current = Some(PieceState::default());
632 continue;
633 }
634 let chunk_len = self.space_left() - 3;
635 let (chunk, rest) = remaining.split_at(chunk_len.min(remaining.len()));
636 self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
637 self.data
638 .extend_from_slice(&(chunk.len() as u16).to_be_bytes());
639 self.data.extend_from_slice(chunk);
640 remaining = rest;
641 if let Some(state) = self.current.as_mut() {
642 state.is_split_with_next = true;
643 }
644 self.bump_segments();
645 self.finalize_piece()?;
646 self.current = Some(PieceState {
647 is_split_with_prev: !remaining.is_empty(),
648 ..PieceState::default()
649 });
650 }
651 if !remaining.is_empty() {
652 self.bump_segments();
653 self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
654 self.data
655 .extend_from_slice(&(remaining.len() as u16).to_be_bytes());
656 self.data.extend_from_slice(remaining);
657 }
658 Ok(())
659 }
660
661 fn finalize_piece(&mut self) -> Result<()> {
662 let Some(state) = self.current.take() else {
663 return Err(ProtocolError::TtcDecode(
664 "direct path piece finalized without an active piece",
665 ));
666 };
667 let mut flags = 0u8;
668 if state.is_first {
669 flags |= TNS_DPLS_ROW_HEADER_FIRST;
670 } else if state.is_split_with_prev {
671 flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV;
672 }
673 if state.is_last {
674 flags |= TNS_DPLS_ROW_HEADER_LAST;
675 } else if state.is_split_with_next {
676 flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT;
677 }
678 let is_fast_row = state.is_first && state.is_last && state.is_fast;
679 if is_fast_row {
680 flags |= TNS_DPLS_ROW_HEADER_FAST_ROW | TNS_DPLS_ROW_HEADER_FAST_PIECE;
681 }
682 let num_segments = u8::try_from(state.num_segments)
683 .map_err(|_| ProtocolError::TtcDecode("direct path piece segment count overflow"))?;
684 let piece = DirectPathPiece {
685 flags,
686 num_segments,
687 data: std::mem::take(&mut self.data),
688 };
689 let new_length = self.total_piece_length + piece.data.len() as u64 + piece.header_length();
690 if new_length > TNS_DPLS_MAX_MESSAGE_SIZE {
691 return Err(ProtocolError::DirectPathLoadTooMuchData);
692 }
693 self.total_piece_length = new_length;
694 self.pieces.push(piece);
695 Ok(())
697 }
698}
699
700fn is_fast_dbtype(metadata: &ColumnMetadata) -> bool {
703 matches!(
704 metadata.ora_type_num,
705 ORA_TYPE_NUM_VARCHAR
706 | ORA_TYPE_NUM_NUMBER
707 | ORA_TYPE_NUM_BINARY_INTEGER
708 | ORA_TYPE_NUM_CHAR
709 | ORA_TYPE_NUM_DATE
710 | ORA_TYPE_NUM_RAW
711 | ORA_TYPE_NUM_BINARY_FLOAT
712 | ORA_TYPE_NUM_BINARY_DOUBLE
713 | ORA_TYPE_NUM_BOOLEAN
714 | ORA_TYPE_NUM_TIMESTAMP
715 | ORA_TYPE_NUM_TIMESTAMP_TZ
716 | ORA_TYPE_NUM_TIMESTAMP_LTZ
717 )
718}
719
/// The encoded rows of one load stream request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DirectPathStream {
    /// Row pieces in the order in which they are written to the wire.
    pub pieces: Vec<DirectPathPiece>,
    /// Combined length of all pieces, headers included.
    pub total_piece_length: u32,
}
726
727pub fn encode_direct_path_rows(
733 column_metadata: &[ColumnMetadata],
734 rows: &[Vec<DirectPathColumnValue>],
735 first_row_num: u64,
736) -> Result<DirectPathStream> {
737 let mut buffer = DirectPathPieceBuffer::new();
738 for (row_index, row) in rows.iter().enumerate() {
739 if row.len() != column_metadata.len() {
740 return Err(ProtocolError::TtcDecode(
741 "direct path row width does not match column metadata",
742 ));
743 }
744 let row_num = first_row_num + row_index as u64;
745 buffer.start_row()?;
746 for (metadata, value) in column_metadata.iter().zip(row) {
747 buffer.add_column_value(metadata, value, row_num)?;
748 }
749 buffer.finish_row()?;
750 }
751 let (pieces, total_piece_length) = buffer.finish()?;
752 Ok(DirectPathStream {
753 pieces,
754 total_piece_length,
755 })
756}
757
758pub fn build_direct_path_load_stream_payload(
762 cursor_id: u16,
763 stream: &DirectPathStream,
764 seq_num: u8,
765) -> Result<Vec<u8>> {
766 let mut writer = TtcWriter::new();
767 writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_LOAD_STREAM, seq_num);
768 writer.write_ub8(0); writer.write_ub2(cursor_id);
770 writer.write_u8(1); writer.write_ub4(stream.total_piece_length);
772 writer.write_ub4(TNS_DP_STREAM_VERSION);
773 writer.write_u8(0); writer.write_ub4(0); writer.write_u8(1); writer.write_u8(1); for piece in &stream.pieces {
778 piece.write_to(&mut writer)?;
779 }
780 Ok(writer.into_bytes())
781}
782
783#[derive(Clone, Debug, Eq, PartialEq)]
792pub struct BatchLoadState {
793 chunk_lengths: Vec<u64>,
794 batch_size: u32,
795 chunk_index: usize,
796 offset: u64,
797 message_offset: u64,
798 num_rows: u32,
799}
800
801impl BatchLoadState {
802 pub fn new(chunk_lengths: Vec<u64>, batch_size: u32) -> Result<Self> {
803 if batch_size == 0 {
804 return Err(ProtocolError::TtcDecode(
805 "batch_size must be a positive integer",
806 ));
807 }
808 let mut state = Self {
809 chunk_lengths,
810 batch_size,
811 chunk_index: 0,
812 offset: 0,
813 message_offset: 0,
814 num_rows: 0,
815 };
816 state.advance_batch();
817 Ok(state)
818 }
819
820 pub fn for_rows(total_rows: u64, batch_size: u32) -> Result<Self> {
823 Self::new(vec![total_rows], batch_size)
824 }
825
826 pub fn num_rows(&self) -> u32 {
828 self.num_rows
829 }
830
831 pub fn offset(&self) -> u64 {
833 self.offset
834 }
835
836 pub fn message_offset(&self) -> u64 {
839 self.message_offset
840 }
841
842 pub fn chunk_index(&self) -> usize {
844 self.chunk_index
845 }
846
847 pub fn is_done(&self) -> bool {
848 self.num_rows == 0
849 }
850
851 pub fn next_batch(&mut self) {
853 self.offset += u64::from(self.num_rows);
854 self.advance_batch();
855 }
856
857 fn rows_in_current_chunk(&self) -> u64 {
858 self.chunk_lengths
859 .get(self.chunk_index)
860 .copied()
861 .unwrap_or(0)
862 }
863
864 fn calculate_num_rows_in_batch(&mut self) {
865 let remaining = self.rows_in_current_chunk().saturating_sub(self.offset);
866 self.num_rows = u32::try_from(remaining.min(u64::from(self.batch_size))).unwrap_or(0);
867 }
868
869 fn advance_batch(&mut self) {
870 self.message_offset = self.offset;
871 self.calculate_num_rows_in_batch();
872 if self.num_rows == 0 {
873 self.advance_chunk();
874 }
875 }
876
877 fn advance_chunk(&mut self) {
878 while self.chunk_index + 1 < self.chunk_lengths.len() {
879 self.offset = 0;
880 self.message_offset = 0;
881 self.chunk_index += 1;
882 self.calculate_num_rows_in_batch();
883 if self.num_rows > 0 {
884 break;
885 }
886 }
887 }
888}
889
#[cfg(test)]
mod tests {
    //! Unit tests for the direct path payload builders, the response
    //! parsers, the row piece encoder and the batch state.

    use super::*;

    /// A parameter message announcing a huge column count with no metadata
    /// behind it must produce a decode error.
    #[test]
    fn direct_path_oversized_column_count_fails_closed_not_oom() {
        // Message type, then a 4-byte-long ub4 column count (0x25000000).
        let payload = [TNS_MSG_TYPE_PARAMETER, 4, 0x25, 0x00, 0x00, 0x00];
        let err = parse_direct_path_prepare_response(&payload, ClientCapabilities::default())
            .expect_err("oversized direct-path column count must fail closed");
        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
    }

    /// Builds column metadata for the encoder tests. Character types get the
    /// implicit character set form; every other type gets form 0.
    fn column(name: &str, ora_type_num: u8, max_size: u32, nulls_allowed: bool) -> ColumnMetadata {
        ColumnMetadata {
            name: name.to_string(),
            ora_type_num,
            csfrm: if matches!(
                ora_type_num,
                ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG
            ) {
                CS_FORM_IMPLICIT
            } else {
                0
            },
            precision: 0,
            scale: 0,
            buffer_size: max_size,
            max_size,
            nulls_allowed,
            is_json: false,
            is_oson: false,
            object_schema: None,
            object_type_name: None,
            is_array: false,
            vector_dimensions: None,
            vector_format: 0,
            vector_flags: 0,
            ..Default::default()
        }
    }

    #[test]
    fn prepare_payload_matches_reference_layout() {
        let payload = build_direct_path_prepare_payload(
            "pythontest",
            "dpl_golden",
            &["id".to_string(), "name".to_string()],
            10,
        )
        .expect("payload should build");
        // Header written by the function code helper, followed by the zero ub8.
        assert_eq!(&payload[..4], &[3, 128, 10, 0]);
        // ub4 op code, marker byte + ub4 keyword count (4), marker byte +
        // ub2 input value count (15), then six marker bytes.
        let mut expected = vec![1, 1, 1, 1, 4, 1, 1, 15, 1, 1, 1, 1, 1, 1];
        // Schema keyword: zero ub2, ub2 length, length-prefixed text, ub2 index 3.
        expected.extend_from_slice(&[0, 1, 10]);
        expected.extend_from_slice(&[10]);
        expected.extend_from_slice(b"pythontest");
        expected.extend_from_slice(&[1, 3]);
        // Table keyword, index 1.
        expected.extend_from_slice(&[0, 1, 10]);
        expected.extend_from_slice(&[10]);
        expected.extend_from_slice(b"dpl_golden");
        expected.extend_from_slice(&[1, 1]);
        // Column keywords, index 4 each.
        expected.extend_from_slice(&[0, 1, 2, 2]);
        expected.extend_from_slice(b"id");
        expected.extend_from_slice(&[1, 4]);
        expected.extend_from_slice(&[0, 1, 4, 4]);
        expected.extend_from_slice(b"name");
        expected.extend_from_slice(&[1, 4]);
        // Input values: interface version and stream version (400 each),
        // nine zero slots, 0xffff in slot 11, two zero slots, 1 in slot 14.
        expected.extend_from_slice(&[2, 0x01, 0x90, 2, 0x01, 0x90]);
        expected.extend_from_slice(&[0; 9]);
        expected.extend_from_slice(&[2, 0xff, 0xff, 0, 0, 1, 1]);
        assert_eq!(&payload[4..], expected.as_slice());
    }

    #[test]
    fn op_payload_matches_reference_layout() {
        let payload = build_direct_path_op_payload(1, TNS_DP_OP_FINISH, 12);
        assert_eq!(
            payload,
            vec![3, 130, 12, 0, 1, 2, 1, 1, 0, 0, 1, 1],
            "fn code, seq, token, ub4 op, ub2 cursor, ptr 0, ub4 0, ptr 1, ptr 1"
        );
    }

    #[test]
    fn single_fast_row_produces_one_fast_piece() {
        let columns = vec![
            column("ID", ORA_TYPE_NUM_NUMBER, 0, false),
            column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false),
        ];
        let rows = vec![vec![
            DirectPathColumnValue::Number("1".into()),
            DirectPathColumnValue::Bytes(b"alpha".to_vec()),
        ]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        let piece = &stream.pieces[0];
        assert_eq!(
            piece.flags(),
            TNS_DPLS_ROW_HEADER_FIRST
                | TNS_DPLS_ROW_HEADER_LAST
                | TNS_DPLS_ROW_HEADER_FAST_ROW
                | TNS_DPLS_ROW_HEADER_FAST_PIECE
        );
        assert_eq!(piece.num_segments(), 2);
        // Length-prefixed encoded number, then the length-prefixed text.
        assert_eq!(
            piece.data(),
            &[2, 0xc1, 0x02, 5, b'a', b'l', b'p', b'h', b'a']
        );
        // Fast rows have a four-byte header.
        assert_eq!(stream.total_piece_length, piece.data().len() as u32 + 4);
    }

    #[test]
    fn long_column_clears_fast_flag() {
        let columns = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
        let rows = vec![vec![DirectPathColumnValue::Bytes(vec![b'x'; 10])]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        assert_eq!(
            stream.pieces[0].flags(),
            TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_LAST
        );
        // Eleven data bytes plus the two-byte header of a non-fast piece.
        assert_eq!(stream.total_piece_length, 11 + 2);
    }

    #[test]
    fn null_values_encode_as_null_indicator() {
        let columns = vec![column("SALARY", ORA_TYPE_NUM_NUMBER, 0, true)];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces[0].data(), &[0xff]);
        assert_eq!(stream.pieces[0].num_segments(), 1);
    }

    #[test]
    fn null_into_not_null_column_raises_dpy_8001() {
        let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false)];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        let err = encode_direct_path_rows(&columns, &rows, 1).expect_err("nulls must be rejected");
        assert!(
            err.to_string().starts_with("DPY-8001:"),
            "unexpected error: {err}"
        );
        assert!(err.to_string().contains("\"NAME\""), "{err}");
        assert!(err.to_string().contains("row 1"), "{err}");
    }

    #[test]
    fn oversized_value_raises_dpy_8000() {
        let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 4, false)];
        let rows = vec![vec![DirectPathColumnValue::Bytes(b"toolong".to_vec())]];
        let err = encode_direct_path_rows(&columns, &rows, 3).expect_err("size must be enforced");
        assert!(
            err.to_string().starts_with("DPY-8000:"),
            "unexpected error: {err}"
        );
        assert!(err.to_string().contains("row 3"), "{err}");
    }

    #[test]
    fn long_values_use_fe_chunked_segments() {
        let columns = vec![column("WIDE", ORA_TYPE_NUM_VARCHAR, 1000, false)];
        // Longer than the short-length limit, shorter than a piece.
        let value = vec![b'q'; 600];
        let rows = vec![vec![DirectPathColumnValue::Bytes(value.clone())]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        let piece = &stream.pieces[0];
        assert_eq!(piece.num_segments(), 1);
        // Long-length indicator followed by 600 as a big-endian u16.
        let mut expected = vec![0xfe, 0x02, 0x58];
        expected.extend_from_slice(&value);
        assert_eq!(piece.data(), expected.as_slice());
    }

    #[test]
    fn values_larger_than_piece_split_across_pieces_with_split_flags() {
        let columns = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
        let total = TNS_DPLS_MAX_PIECE_SIZE + 100;
        let rows = vec![vec![DirectPathColumnValue::Bytes(vec![b'z'; total])]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 2);
        let first = &stream.pieces[0];
        let second = &stream.pieces[1];
        assert_eq!(
            first.flags(),
            TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT
        );
        assert_eq!(
            second.flags(),
            TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV | TNS_DPLS_ROW_HEADER_LAST
        );
        // The first piece is filled completely.
        assert_eq!(first.data().len(), TNS_DPLS_MAX_PIECE_SIZE);
        assert_eq!(first.data()[0], 0xfe);
        let first_chunk = usize::from(u16::from_be_bytes([first.data()[1], first.data()[2]]));
        assert_eq!(first_chunk, TNS_DPLS_MAX_PIECE_SIZE - 3);
        let second_chunk = usize::from(u16::from_be_bytes([second.data()[1], second.data()[2]]));
        assert_eq!(first_chunk + second_chunk, total);
        assert_eq!(
            stream.total_piece_length as usize,
            first.data().len() + second.data().len() + 2 + 2
        );
    }

    #[test]
    fn segment_count_caps_at_255_per_piece() {
        let columns: Vec<ColumnMetadata> = (0..300)
            .map(|i| column(&format!("C{i}"), ORA_TYPE_NUM_NUMBER, 0, true))
            .collect();
        let row: Vec<DirectPathColumnValue> =
            (0..300).map(|_| DirectPathColumnValue::Null).collect();
        let stream = encode_direct_path_rows(&columns, &[row], 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 2);
        assert_eq!(stream.pieces[0].num_segments(), 255);
        assert_eq!(stream.pieces[1].num_segments(), 45);
        // A piece boundary between two values sets no split flags.
        assert_eq!(stream.pieces[0].flags(), TNS_DPLS_ROW_HEADER_FIRST);
        assert_eq!(stream.pieces[1].flags(), TNS_DPLS_ROW_HEADER_LAST);
    }

    #[test]
    fn timestamp_with_zero_fraction_collapses_to_seven_bytes() {
        let columns = vec![column("TS", ORA_TYPE_NUM_TIMESTAMP, 0, true)];
        let rows = vec![vec![DirectPathColumnValue::DateTime {
            year: 2024,
            month: 1,
            day: 2,
            hour: 3,
            minute: 4,
            second: 5,
            nanosecond: 0,
        }]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(
            stream.pieces[0].data(),
            &[7, 120, 124, 1, 2, 4, 5, 6],
            "7-byte date form expected when fractional seconds are zero"
        );
    }

    #[test]
    fn boolean_values_encode_per_reference() {
        let columns = vec![column("FLAG", ORA_TYPE_NUM_BOOLEAN, 0, true)];
        let rows = vec![
            vec![DirectPathColumnValue::Boolean(true)],
            vec![DirectPathColumnValue::Boolean(false)],
        ];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces[0].data(), &[2, 1, 1]);
        assert_eq!(stream.pieces[1].data(), &[1, 0]);
    }

    #[test]
    fn row_width_mismatch_is_rejected() {
        let columns = vec![
            column("A", ORA_TYPE_NUM_NUMBER, 0, true),
            column("B", ORA_TYPE_NUM_NUMBER, 0, true),
        ];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        assert!(encode_direct_path_rows(&columns, &rows, 1).is_err());
    }

    #[test]
    fn metadata_overrides_inline_lobs() {
        let mut clob = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        clob.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut clob, 873);
        assert_eq!(clob.ora_type_num, ORA_TYPE_NUM_LONG);
        assert_eq!(clob.csfrm, CS_FORM_NCHAR, "multi-byte charset uses NCHAR");

        let mut clob = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        clob.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut clob, 178);
        assert_eq!(
            clob.csfrm, CS_FORM_IMPLICIT,
            "single-byte charset keeps form"
        );

        let mut blob = column("BIN", ORA_TYPE_NUM_BLOB, 0, true);
        apply_direct_path_metadata_overrides(&mut blob, 873);
        assert_eq!(blob.ora_type_num, ORA_TYPE_NUM_LONG_RAW);
        assert_eq!(blob.csfrm, 0);
    }

    #[test]
    fn batch_state_single_chunk_splits_by_batch_size() {
        let mut state = BatchLoadState::for_rows(5, 2).expect("state should build");
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (2, 0, 0)
        );
        state.next_batch();
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (2, 2, 2)
        );
        state.next_batch();
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (1, 4, 4)
        );
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_never_spans_chunks() {
        let mut state = BatchLoadState::new(vec![3, 2], 2).expect("state should build");
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (0, 2, 0)
        );
        state.next_batch();
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (0, 1, 2)
        );
        state.next_batch();
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (1, 2, 0)
        );
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_skips_empty_chunks() {
        let mut state = BatchLoadState::new(vec![0, 0, 3], 10).expect("state should build");
        assert_eq!((state.chunk_index(), state.num_rows()), (2, 3));
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_rejects_zero_batch_size() {
        assert!(BatchLoadState::for_rows(5, 0).is_err());
    }

    #[test]
    fn batch_state_empty_source_is_done_immediately() {
        let state = BatchLoadState::for_rows(0, 10).expect("state should build");
        assert!(state.is_done());
    }

    #[test]
    fn load_stream_payload_header_matches_reference_layout() {
        let columns = vec![column("ID", ORA_TYPE_NUM_NUMBER, 0, false)];
        let rows = vec![vec![DirectPathColumnValue::Number("1".into())]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        let payload =
            build_direct_path_load_stream_payload(1, &stream, 11).expect("payload should build");
        // Header, zero ub8, ub2 cursor, marker byte, ub4 total length (7),
        // ub4 stream version (400), zero byte, zero ub4, two marker bytes,
        // then the single fast piece: flags 0x3c, u16 length 7, one segment
        // and the length-prefixed number.
        let expected: Vec<u8> = vec![
            3, 129, 11, 0, 1, 1, 1, 1, 7, 2, 0x01, 0x90, 0, 0, 1, 1, 0x3c, 0, 7, 1, 2, 0xc1, 0x02,
        ];
        assert_eq!(payload, expected);
    }
}