1use crate::thin::{
17 encode_binary_double, encode_binary_float, encode_number_text, encode_oracle_date,
18 encode_oracle_timestamp, encode_oracle_timestamp_tz, parse_column_metadata,
19 parse_server_error_info, skip_server_side_piggyback, ClientCapabilities, ColumnMetadata,
20 CS_FORM_IMPLICIT, CS_FORM_NCHAR, ORA_TYPE_NUM_BINARY_DOUBLE, ORA_TYPE_NUM_BINARY_FLOAT,
21 ORA_TYPE_NUM_BINARY_INTEGER, ORA_TYPE_NUM_BLOB, ORA_TYPE_NUM_BOOLEAN, ORA_TYPE_NUM_CHAR,
22 ORA_TYPE_NUM_CLOB, ORA_TYPE_NUM_DATE, ORA_TYPE_NUM_LONG, ORA_TYPE_NUM_LONG_RAW,
23 ORA_TYPE_NUM_NUMBER, ORA_TYPE_NUM_RAW, ORA_TYPE_NUM_TIMESTAMP, ORA_TYPE_NUM_TIMESTAMP_LTZ,
24 ORA_TYPE_NUM_TIMESTAMP_TZ, ORA_TYPE_NUM_VARCHAR, TNS_MSG_TYPE_END_OF_RESPONSE,
25 TNS_MSG_TYPE_ERROR, TNS_MSG_TYPE_PARAMETER, TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK,
26 TNS_MSG_TYPE_STATUS,
27};
28use crate::wire::{BoundedReader, ProtocolLimits, TtcReader, TtcWriter};
29use crate::{ProtocolError, Result};
30
/// Function code written at the start of a direct path prepare request.
pub const TNS_FUNC_DIRECT_PATH_PREPARE: u8 = 0x80;
/// Function code written at the start of a direct path load stream request.
pub const TNS_FUNC_DIRECT_PATH_LOAD_STREAM: u8 = 0x81;
/// Function code written at the start of a direct path op request.
pub const TNS_FUNC_DIRECT_PATH_OP: u8 = 0x82;

/// Interface version placed in the prepare request's input values.
pub const TNS_DP_INTERFACE_VERSION: u32 = 400;
/// Stream version placed in the prepare input values and in every load
/// stream request.
pub const TNS_DP_STREAM_VERSION: u32 = 400;

/// Operation code written by the prepare request builder.
pub const TNS_DPP_OP_CODE_LOAD: u32 = 1;

/// Op code for `build_direct_path_op_payload`; presumably aborts the load
/// (meaning inferred from the name — confirm against the protocol reference).
pub const TNS_DP_OP_ABORT: u32 = 1;
/// Op code for `build_direct_path_op_payload`; presumably finishes the load
/// (meaning inferred from the name — confirm against the protocol reference).
pub const TNS_DP_OP_FINISH: u32 = 2;

// Slots in the prepare request's input-value array.
const TNS_DPP_IN_INDEX_INTERFACE_VERSION: usize = 0;
const TNS_DPP_IN_INDEX_STREAM_VERSION: usize = 1;
const TNS_DPP_IN_INDEX_LOCK_WAIT: usize = 14;
// Keyword indexes written after each keyword parameter's text.
const TNS_DPP_KW_INDEX_OBJECT_NAME: u16 = 1;
const TNS_DPP_KW_INDEX_SCHEMA_NAME: u16 = 3;
const TNS_DPP_KW_INDEX_COLUMN_NAME: u16 = 4;
// Despite the `KW` prefix this is used as a slot in the input-value array.
const TNS_DPP_KW_INDEX_NFOBJ_OID_POS: usize = 11;
// Slot of the cursor id in the prepare response's output values.
const TNS_DPP_OUT_INDEX_CURSOR: usize = 3;
// Number of input values sent: every slot up to and including LOCK_WAIT.
const TNS_DPP_IN_VALUES_SENT: usize = TNS_DPP_IN_INDEX_LOCK_WAIT + 1;

// Bit flags stored in the first byte of every stream piece.
/// Set together with `TNS_DPLS_ROW_HEADER_FAST_ROW` on fast single-piece rows.
pub const TNS_DPLS_ROW_HEADER_FAST_PIECE: u8 = 1 << 4;
/// Marks a piece whose header also carries a 16-bit total length.
pub const TNS_DPLS_ROW_HEADER_FAST_ROW: u8 = 1 << 5;
/// The piece is the first piece of a row.
pub const TNS_DPLS_ROW_HEADER_FIRST: u8 = 1 << 3;
/// The piece is the last piece of a row.
pub const TNS_DPLS_ROW_HEADER_LAST: u8 = 1 << 2;
/// The piece starts with the continuation of a value split from the previous piece.
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV: u8 = 1 << 1;
/// The piece ends with a value that continues in the next piece.
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT: u8 = 1;

/// Upper bound on the accumulated length (data plus headers) of all pieces.
pub const TNS_DPLS_MAX_MESSAGE_SIZE: u64 = 1_073_728_895;
/// Longest value (0xfa bytes) that is written with a single length byte.
pub const TNS_DPLS_MAX_SHORT_LENGTH: usize = 250;
/// Maximum number of data bytes (0xfff0) held by one piece.
pub const TNS_DPLS_MAX_PIECE_SIZE: usize = 65_520;

// Prefix byte announcing a 16-bit big-endian length for long values.
const TNS_DPLS_LONG_LENGTH_INDICATOR: u8 = 0xFE;
// Single byte written in place of a NULL column value.
const TNS_NULL_LENGTH_INDICATOR: u8 = 0xFF;
69
70pub fn build_direct_path_prepare_payload(
74 schema_name: &str,
75 table_name: &str,
76 column_names: &[String],
77 seq_num: u8,
78) -> Result<Vec<u8>> {
79 let keyword_parameters_length =
80 u32::try_from(column_names.len() + 2).map_err(|_| ProtocolError::InvalidPacketLength {
81 length: column_names.len(),
82 minimum: 0,
83 })?;
84
85 let mut in_values = [0u32; TNS_DPP_IN_VALUES_SENT];
86 in_values[TNS_DPP_IN_INDEX_INTERFACE_VERSION] = TNS_DP_INTERFACE_VERSION;
87 in_values[TNS_DPP_IN_INDEX_STREAM_VERSION] = TNS_DP_STREAM_VERSION;
88 in_values[TNS_DPP_KW_INDEX_NFOBJ_OID_POS] = 0xffff;
89 in_values[TNS_DPP_IN_INDEX_LOCK_WAIT] = 1;
90
91 let mut writer = TtcWriter::new();
92 writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_PREPARE, seq_num);
93 writer.write_ub8(0); writer.write_ub4(TNS_DPP_OP_CODE_LOAD);
95 writer.write_u8(1); writer.write_ub4(keyword_parameters_length);
97 writer.write_u8(1); writer.write_ub2(TNS_DPP_IN_VALUES_SENT as u16);
99 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_SCHEMA_NAME, schema_name)?;
106 write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_OBJECT_NAME, table_name)?;
107 for name in column_names {
108 write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_COLUMN_NAME, name)?;
109 }
110 for value in in_values {
111 writer.write_ub4(value);
112 }
113 Ok(writer.into_bytes())
114}
115
116fn write_keyword_param(writer: &mut TtcWriter, index: u16, value: &str) -> Result<()> {
117 let bytes = value.as_bytes();
118 let len = u16::try_from(bytes.len()).map_err(|_| ProtocolError::InvalidPacketLength {
119 length: bytes.len(),
120 minimum: 0,
121 })?;
122 writer.write_ub2(0); writer.write_ub2(len);
124 writer.write_bytes_with_length(bytes)?;
125 writer.write_ub2(index);
126 Ok(())
127}
128
129#[derive(Clone, Debug, Default, Eq, PartialEq)]
130pub struct DirectPathPrepareResult {
131 pub column_metadata: Vec<ColumnMetadata>,
132 pub cursor_id: u16,
133 pub out_values: Vec<u32>,
134}
135
136pub fn parse_direct_path_prepare_response(
143 payload: &[u8],
144 capabilities: ClientCapabilities,
145) -> Result<DirectPathPrepareResult> {
146 parse_direct_path_prepare_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
147}
148
149pub fn parse_direct_path_prepare_response_with_limits(
150 payload: &[u8],
151 capabilities: ClientCapabilities,
152 limits: ProtocolLimits,
153) -> Result<DirectPathPrepareResult> {
154 let mut reader = TtcReader::with_limits(payload, limits)?;
155 let mut result: Option<DirectPathPrepareResult> = None;
156 while reader.remaining() > 0 {
157 let message_type = reader.read_u8()?;
158 match message_type {
159 0 => {}
160 TNS_MSG_TYPE_PARAMETER => {
161 result = Some(parse_prepare_return_parameters(&mut reader, capabilities)?);
162 }
163 TNS_MSG_TYPE_STATUS => {
164 let _call_status = reader.read_ub4()?;
165 let _seq = reader.read_ub2()?;
166 }
167 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
168 let _ = skip_server_side_piggyback(&mut reader)?;
169 }
170 TNS_MSG_TYPE_END_OF_RESPONSE => break,
171 TNS_MSG_TYPE_ERROR => {
172 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
173 if info.number != 0 {
174 return Err(ProtocolError::ServerError(info.message));
175 }
176 }
177 _ => {
178 return Err(ProtocolError::UnknownMessageType {
179 message_type,
180 position: reader.position().saturating_sub(1),
181 })
182 }
183 }
184 }
185 result.ok_or(ProtocolError::TtcDecode(
186 "direct path prepare response did not contain return parameters",
187 ))
188}
189
190fn parse_prepare_return_parameters(
191 reader: &mut TtcReader<'_>,
192 capabilities: ClientCapabilities,
193) -> Result<DirectPathPrepareResult> {
194 let num_columns = reader.read_ub4()?;
195 reader.limits().check_columns(num_columns as usize)?;
196 let mut column_metadata: Vec<ColumnMetadata> =
200 reader.with_capacity_limited(num_columns as usize, 1, ProtocolLimits::check_columns)?;
201 for _ in 0..num_columns {
202 let mut metadata = parse_column_metadata(reader, capabilities)?;
203 apply_direct_path_metadata_overrides(&mut metadata, capabilities.charset_id);
204 column_metadata.push(metadata);
205 }
206 let num_params = reader.read_ub2()?;
207 if num_params != 0 {
208 return Err(ProtocolError::TtcDecode(
209 "unexpected parameters in direct path prepare response",
210 ));
211 }
212 let out_values_length = reader.read_ub2()?;
213 reader
214 .limits()
215 .check_length_prefixed_elements(usize::from(out_values_length))?;
216 let mut out_values: Vec<u32> = reader.with_capacity_limited(
218 usize::from(out_values_length),
219 1,
220 ProtocolLimits::check_length_prefixed_elements,
221 )?;
222 for _ in 0..out_values_length {
223 out_values.push(reader.read_ub4()?);
224 }
225 let cursor_id =
226 out_values
227 .get(TNS_DPP_OUT_INDEX_CURSOR)
228 .copied()
229 .ok_or(ProtocolError::TtcDecode(
230 "direct path prepare response missing cursor id",
231 ))?;
232 let cursor_id = u16::try_from(cursor_id)
233 .map_err(|_| ProtocolError::TtcDecode("direct path cursor id out of range"))?;
234 Ok(DirectPathPrepareResult {
235 column_metadata,
236 cursor_id,
237 out_values,
238 })
239}
240
241fn apply_direct_path_metadata_overrides(metadata: &mut ColumnMetadata, charset_id: u16) {
245 if metadata.ora_type_num == ORA_TYPE_NUM_CLOB {
246 if metadata.csfrm == CS_FORM_IMPLICIT && charset_id >= 800 {
247 metadata.csfrm = CS_FORM_NCHAR;
248 }
249 metadata.ora_type_num = ORA_TYPE_NUM_LONG;
250 } else if metadata.ora_type_num == ORA_TYPE_NUM_BLOB {
251 metadata.ora_type_num = ORA_TYPE_NUM_LONG_RAW;
252 metadata.csfrm = 0;
253 }
254}
255
256pub fn build_direct_path_op_payload(cursor_id: u16, op_code: u32, seq_num: u8) -> Vec<u8> {
261 let mut writer = TtcWriter::new();
262 writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_OP, seq_num);
263 writer.write_ub8(0); writer.write_ub4(op_code);
265 writer.write_ub2(cursor_id);
266 writer.write_u8(0); writer.write_ub4(0); writer.write_u8(1); writer.write_u8(1); writer.into_bytes()
271}
272
273pub fn parse_direct_path_simple_response(
276 payload: &[u8],
277 capabilities: ClientCapabilities,
278) -> Result<()> {
279 parse_direct_path_simple_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
280}
281
282pub fn parse_direct_path_simple_response_with_limits(
283 payload: &[u8],
284 capabilities: ClientCapabilities,
285 limits: ProtocolLimits,
286) -> Result<()> {
287 let mut reader = TtcReader::with_limits(payload, limits)?;
288 while reader.remaining() > 0 {
289 let message_type = reader.read_u8()?;
290 match message_type {
291 0 => {}
292 TNS_MSG_TYPE_PARAMETER => {
293 let num_out_values = reader.read_ub2()?;
294 for _ in 0..num_out_values {
295 let _value = reader.read_ub4()?;
296 }
297 }
298 TNS_MSG_TYPE_STATUS => {
299 let _call_status = reader.read_ub4()?;
300 let _seq = reader.read_ub2()?;
301 }
302 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
303 let _ = skip_server_side_piggyback(&mut reader)?;
304 }
305 TNS_MSG_TYPE_END_OF_RESPONSE => break,
306 TNS_MSG_TYPE_ERROR => {
307 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
308 if info.number != 0 {
309 return Err(ProtocolError::ServerError(info.message));
310 }
311 }
312 _ => {
313 return Err(ProtocolError::UnknownMessageType {
314 message_type,
315 position: reader.position().saturating_sub(1),
316 })
317 }
318 }
319 }
320 Ok(())
321}
322
323pub use parse_direct_path_simple_response as parse_direct_path_load_stream_response;
324pub use parse_direct_path_simple_response as parse_direct_path_op_response;
325pub use parse_direct_path_simple_response_with_limits as parse_direct_path_load_stream_response_with_limits;
326pub use parse_direct_path_simple_response_with_limits as parse_direct_path_op_response_with_limits;
327
/// A single column value handed to the direct path row encoder.
///
/// Each variant is only accepted for matching column types; the encoder
/// rejects mismatches with a decode error.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectPathColumnValue {
    /// SQL NULL; rejected for columns that do not allow nulls.
    Null,
    /// Raw bytes for VARCHAR, CHAR, LONG, RAW and LONG RAW columns.
    Bytes(Vec<u8>),
    /// Textual number for NUMBER and BINARY_INTEGER columns.
    Number(String),
    /// Value for BINARY_DOUBLE columns.
    BinaryDouble(f64),
    /// Value for BINARY_FLOAT columns.
    BinaryFloat(f32),
    /// Date/time components for DATE and TIMESTAMP family columns.
    DateTime {
        year: i32,
        month: u8,
        day: u8,
        hour: u8,
        minute: u8,
        second: u8,
        /// Must be zero when the target column is a DATE.
        nanosecond: u32,
    },
    /// Value for BOOLEAN columns.
    Boolean(bool),
}
351
352#[derive(Clone, Debug, Eq, PartialEq)]
355pub struct DirectPathPiece {
356 flags: u8,
357 num_segments: u8,
358 data: Vec<u8>,
359}
360
361impl DirectPathPiece {
362 pub fn flags(&self) -> u8 {
363 self.flags
364 }
365
366 pub fn num_segments(&self) -> u8 {
367 self.num_segments
368 }
369
370 pub fn data(&self) -> &[u8] {
371 &self.data
372 }
373
374 fn is_fast_row(&self) -> bool {
375 self.flags & TNS_DPLS_ROW_HEADER_FAST_ROW != 0
376 }
377
378 fn header_length(&self) -> u64 {
379 if self.is_fast_row() {
380 4
381 } else {
382 2
383 }
384 }
385
386 fn write_to(&self, writer: &mut TtcWriter) -> Result<()> {
387 writer.write_u8(self.flags);
388 if self.is_fast_row() {
389 let total = self.data.len() as u64 + self.header_length();
390 let total = u16::try_from(total).map_err(|_| {
391 ProtocolError::TtcDecode("direct path fast piece exceeds 16-bit length")
392 })?;
393 writer.write_u16be(total);
394 }
395 writer.write_u8(self.num_segments);
396 writer.write_raw(&self.data);
397 Ok(())
398 }
399}
400
/// Bookkeeping for the piece that is currently being filled.
#[derive(Clone, Copy, Debug, Default)]
struct PieceState {
    /// The piece is the first piece of its row.
    is_first: bool,
    /// The piece is the last piece of its row.
    is_last: bool,
    /// The piece begins with the remainder of a value split from the previous piece.
    is_split_with_prev: bool,
    /// The piece ends with a value that continues in the next piece.
    is_split_with_next: bool,
    /// Every column added so far had a type eligible for the fast-row header.
    is_fast: bool,
    /// Number of column segments written into the piece so far.
    num_segments: u16,
}
410
411#[derive(Debug, Default)]
417pub(crate) struct DirectPathPieceBuffer {
418 pieces: Vec<DirectPathPiece>,
419 total_piece_length: u64,
420 data: Vec<u8>,
421 current: Option<PieceState>,
422}
423
424impl DirectPathPieceBuffer {
425 pub fn new() -> Self {
426 Self::default()
427 }
428
429 pub fn start_row(&mut self) -> Result<()> {
430 if self.current.is_some() {
431 return Err(ProtocolError::TtcDecode(
432 "direct path row started before previous row was finished",
433 ));
434 }
435 self.current = Some(PieceState {
436 is_first: true,
437 is_fast: true,
438 ..PieceState::default()
439 });
440 Ok(())
441 }
442
443 pub fn finish_row(&mut self) -> Result<()> {
444 let Some(state) = self.current.as_mut() else {
445 return Err(ProtocolError::TtcDecode(
446 "direct path row finished without being started",
447 ));
448 };
449 state.is_last = true;
450 self.finalize_piece()?;
451 self.current = None;
452 Ok(())
453 }
454
455 pub fn add_column_value(
456 &mut self,
457 metadata: &ColumnMetadata,
458 value: &DirectPathColumnValue,
459 row_num: u64,
460 ) -> Result<()> {
461 let Some(state) = self.current.as_mut() else {
462 return Err(ProtocolError::TtcDecode(
463 "direct path column value added outside of a row",
464 ));
465 };
466
467 if state.num_segments == 255 {
469 self.finalize_piece()?;
470 self.current = Some(PieceState::default());
471 }
472
473 if !is_fast_dbtype(metadata) {
474 if let Some(state) = self.current.as_mut() {
475 state.is_fast = false;
476 }
477 }
478
479 match value {
480 DirectPathColumnValue::Null => {
481 if !metadata.nulls_allowed {
482 return Err(ProtocolError::NullsNotAllowed {
483 column_name: metadata.name.clone(),
484 row_num,
485 });
486 }
487 self.write_u8_in_piece(TNS_NULL_LENGTH_INDICATOR)?;
488 self.bump_segments();
489 Ok(())
490 }
491 DirectPathColumnValue::Bytes(bytes) => {
492 if !matches!(
493 metadata.ora_type_num,
494 ORA_TYPE_NUM_VARCHAR
495 | ORA_TYPE_NUM_CHAR
496 | ORA_TYPE_NUM_LONG
497 | ORA_TYPE_NUM_RAW
498 | ORA_TYPE_NUM_LONG_RAW
499 ) {
500 return Err(ProtocolError::TtcDecode(
501 "direct path byte value sent for non-character column",
502 ));
503 }
504 if metadata.max_size > 0 && bytes.len() as u64 > u64::from(metadata.max_size) {
505 return Err(ProtocolError::ValueTooLarge {
506 actual_size: bytes.len(),
507 max_size: metadata.max_size,
508 column_name: metadata.name.clone(),
509 row_num,
510 });
511 }
512 self.write_raw_bytes_and_length(bytes)
513 }
514 DirectPathColumnValue::Number(text) => {
515 if !matches!(
516 metadata.ora_type_num,
517 ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER
518 ) {
519 return Err(ProtocolError::TtcDecode(
520 "direct path number value sent for non-number column",
521 ));
522 }
523 let encoded = encode_number_text(text)?;
524 self.write_raw_bytes_and_length(&encoded)
525 }
526 DirectPathColumnValue::BinaryDouble(value) => {
527 if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_DOUBLE {
528 return Err(ProtocolError::TtcDecode(
529 "direct path binary double sent for other column type",
530 ));
531 }
532 let encoded = encode_binary_double(*value);
533 self.write_raw_bytes_and_length(&encoded)
534 }
535 DirectPathColumnValue::BinaryFloat(value) => {
536 if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_FLOAT {
537 return Err(ProtocolError::TtcDecode(
538 "direct path binary float sent for other column type",
539 ));
540 }
541 let encoded = encode_binary_float(*value);
542 self.write_raw_bytes_and_length(&encoded)
543 }
544 DirectPathColumnValue::DateTime {
545 year,
546 month,
547 day,
548 hour,
549 minute,
550 second,
551 nanosecond,
552 } => {
553 let encoded = match metadata.ora_type_num {
554 ORA_TYPE_NUM_DATE => {
555 if *nanosecond != 0 {
556 return Err(ProtocolError::TtcDecode(
557 "direct path DATE value has fractional seconds",
558 ));
559 }
560 encode_oracle_date(*year, *month, *day, *hour, *minute, *second)?.to_vec()
561 }
562 ORA_TYPE_NUM_TIMESTAMP | ORA_TYPE_NUM_TIMESTAMP_LTZ => encode_oracle_timestamp(
565 *year,
566 *month,
567 *day,
568 *hour,
569 *minute,
570 *second,
571 *nanosecond,
572 )?,
573 ORA_TYPE_NUM_TIMESTAMP_TZ => encode_oracle_timestamp_tz(
574 *year,
575 *month,
576 *day,
577 *hour,
578 *minute,
579 *second,
580 *nanosecond,
581 )?,
582 _ => {
583 return Err(ProtocolError::TtcDecode(
584 "direct path datetime sent for non-datetime column",
585 ))
586 }
587 };
588 self.write_raw_bytes_and_length(&encoded)
589 }
590 DirectPathColumnValue::Boolean(value) => {
591 if metadata.ora_type_num != ORA_TYPE_NUM_BOOLEAN {
592 return Err(ProtocolError::TtcDecode(
593 "direct path boolean sent for non-boolean column",
594 ));
595 }
596 let encoded: &[u8] = if *value { &[1, 1] } else { &[0] };
597 self.write_raw_bytes_and_length(encoded)
598 }
599 }
600 }
601
602 pub fn finish(self) -> Result<(Vec<DirectPathPiece>, u32)> {
605 if self.current.is_some() {
606 return Err(ProtocolError::TtcDecode(
607 "direct path stream finished mid-row",
608 ));
609 }
610 let total = u32::try_from(self.total_piece_length)
611 .map_err(|_| ProtocolError::DirectPathLoadTooMuchData)?;
612 Ok((self.pieces, total))
613 }
614
615 fn bump_segments(&mut self) {
616 if let Some(state) = self.current.as_mut() {
617 state.num_segments = state.num_segments.saturating_add(1);
618 }
619 }
620
621 fn space_left(&self) -> usize {
622 TNS_DPLS_MAX_PIECE_SIZE.saturating_sub(self.data.len())
623 }
624
625 fn write_u8_in_piece(&mut self, value: u8) -> Result<()> {
626 if self.space_left() < 1 {
627 self.finalize_piece()?;
628 self.current = Some(PieceState::default());
629 }
630 self.data.push(value);
631 Ok(())
632 }
633
634 fn write_raw_bytes_and_length(&mut self, bytes: &[u8]) -> Result<()> {
639 if bytes.len() <= TNS_DPLS_MAX_SHORT_LENGTH {
640 if bytes.len() + 1 > self.space_left() {
641 self.finalize_piece()?;
642 self.current = Some(PieceState::default());
643 }
644 self.data.push(bytes.len() as u8);
645 self.data.extend_from_slice(bytes);
646 self.bump_segments();
647 return Ok(());
648 }
649
650 let mut remaining = bytes;
651 while remaining.len() + 3 > self.space_left() {
652 if self.space_left() < 4 {
656 self.finalize_piece()?;
657 self.current = Some(PieceState::default());
658 continue;
659 }
660 let chunk_len = self.space_left() - 3;
661 let (chunk, rest) = remaining.split_at(chunk_len.min(remaining.len()));
662 self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
663 self.data
664 .extend_from_slice(&(chunk.len() as u16).to_be_bytes());
665 self.data.extend_from_slice(chunk);
666 remaining = rest;
667 if let Some(state) = self.current.as_mut() {
668 state.is_split_with_next = true;
669 }
670 self.bump_segments();
671 self.finalize_piece()?;
672 self.current = Some(PieceState {
673 is_split_with_prev: !remaining.is_empty(),
674 ..PieceState::default()
675 });
676 }
677 if !remaining.is_empty() {
678 self.bump_segments();
679 self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
680 self.data
681 .extend_from_slice(&(remaining.len() as u16).to_be_bytes());
682 self.data.extend_from_slice(remaining);
683 }
684 Ok(())
685 }
686
687 fn finalize_piece(&mut self) -> Result<()> {
688 let Some(state) = self.current.take() else {
689 return Err(ProtocolError::TtcDecode(
690 "direct path piece finalized without an active piece",
691 ));
692 };
693 let mut flags = 0u8;
694 if state.is_first {
695 flags |= TNS_DPLS_ROW_HEADER_FIRST;
696 } else if state.is_split_with_prev {
697 flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV;
698 }
699 if state.is_last {
700 flags |= TNS_DPLS_ROW_HEADER_LAST;
701 } else if state.is_split_with_next {
702 flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT;
703 }
704 let is_fast_row = state.is_first && state.is_last && state.is_fast;
705 if is_fast_row {
706 flags |= TNS_DPLS_ROW_HEADER_FAST_ROW | TNS_DPLS_ROW_HEADER_FAST_PIECE;
707 }
708 let num_segments = u8::try_from(state.num_segments)
709 .map_err(|_| ProtocolError::TtcDecode("direct path piece segment count overflow"))?;
710 let piece = DirectPathPiece {
711 flags,
712 num_segments,
713 data: std::mem::take(&mut self.data),
714 };
715 let new_length = self.total_piece_length + piece.data.len() as u64 + piece.header_length();
716 if new_length > TNS_DPLS_MAX_MESSAGE_SIZE {
717 return Err(ProtocolError::DirectPathLoadTooMuchData);
718 }
719 self.total_piece_length = new_length;
720 self.pieces.push(piece);
721 Ok(())
723 }
724}
725
726fn is_fast_dbtype(metadata: &ColumnMetadata) -> bool {
729 matches!(
730 metadata.ora_type_num,
731 ORA_TYPE_NUM_VARCHAR
732 | ORA_TYPE_NUM_NUMBER
733 | ORA_TYPE_NUM_BINARY_INTEGER
734 | ORA_TYPE_NUM_CHAR
735 | ORA_TYPE_NUM_DATE
736 | ORA_TYPE_NUM_RAW
737 | ORA_TYPE_NUM_BINARY_FLOAT
738 | ORA_TYPE_NUM_BINARY_DOUBLE
739 | ORA_TYPE_NUM_BOOLEAN
740 | ORA_TYPE_NUM_TIMESTAMP
741 | ORA_TYPE_NUM_TIMESTAMP_TZ
742 | ORA_TYPE_NUM_TIMESTAMP_LTZ
743 )
744}
745
746#[derive(Clone, Debug, Eq, PartialEq)]
748pub struct DirectPathStream {
749 pub(crate) pieces: Vec<DirectPathPiece>,
750 pub(crate) total_piece_length: u32,
751}
752
753pub fn encode_direct_path_rows(
759 column_metadata: &[ColumnMetadata],
760 rows: &[Vec<DirectPathColumnValue>],
761 first_row_num: u64,
762) -> Result<DirectPathStream> {
763 let mut buffer = DirectPathPieceBuffer::new();
764 for (row_index, row) in rows.iter().enumerate() {
765 if row.len() != column_metadata.len() {
766 return Err(ProtocolError::TtcDecode(
767 "direct path row width does not match column metadata",
768 ));
769 }
770 let row_num = first_row_num + row_index as u64;
771 buffer.start_row()?;
772 for (metadata, value) in column_metadata.iter().zip(row) {
773 buffer.add_column_value(metadata, value, row_num)?;
774 }
775 buffer.finish_row()?;
776 }
777 let (pieces, total_piece_length) = buffer.finish()?;
778 Ok(DirectPathStream {
779 pieces,
780 total_piece_length,
781 })
782}
783
784pub fn build_direct_path_load_stream_payload(
788 cursor_id: u16,
789 stream: &DirectPathStream,
790 seq_num: u8,
791) -> Result<Vec<u8>> {
792 let mut writer = TtcWriter::new();
793 writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_LOAD_STREAM, seq_num);
794 writer.write_ub8(0); writer.write_ub2(cursor_id);
796 writer.write_u8(1); writer.write_ub4(stream.total_piece_length);
798 writer.write_ub4(TNS_DP_STREAM_VERSION);
799 writer.write_u8(0); writer.write_ub4(0); writer.write_u8(1); writer.write_u8(1); for piece in &stream.pieces {
804 piece.write_to(&mut writer)?;
805 }
806 Ok(writer.into_bytes())
807}
808
809#[derive(Clone, Debug, Eq, PartialEq)]
818pub struct BatchLoadState {
819 chunk_lengths: Vec<u64>,
820 batch_size: u32,
821 chunk_index: usize,
822 offset: u64,
823 message_offset: u64,
824 num_rows: u32,
825}
826
827impl BatchLoadState {
828 pub fn new(chunk_lengths: Vec<u64>, batch_size: u32) -> Result<Self> {
829 if batch_size == 0 {
830 return Err(ProtocolError::TtcDecode(
831 "batch_size must be a positive integer",
832 ));
833 }
834 let mut state = Self {
835 chunk_lengths,
836 batch_size,
837 chunk_index: 0,
838 offset: 0,
839 message_offset: 0,
840 num_rows: 0,
841 };
842 state.advance_batch();
843 Ok(state)
844 }
845
846 pub fn for_rows(total_rows: u64, batch_size: u32) -> Result<Self> {
849 Self::new(vec![total_rows], batch_size)
850 }
851
852 pub fn num_rows(&self) -> u32 {
854 self.num_rows
855 }
856
857 pub fn offset(&self) -> u64 {
859 self.offset
860 }
861
862 pub fn message_offset(&self) -> u64 {
865 self.message_offset
866 }
867
868 pub fn chunk_index(&self) -> usize {
870 self.chunk_index
871 }
872
873 pub fn is_done(&self) -> bool {
874 self.num_rows == 0
875 }
876
877 pub fn next_batch(&mut self) {
879 self.offset += u64::from(self.num_rows);
880 self.advance_batch();
881 }
882
883 fn rows_in_current_chunk(&self) -> u64 {
884 self.chunk_lengths
885 .get(self.chunk_index)
886 .copied()
887 .unwrap_or(0)
888 }
889
890 fn calculate_num_rows_in_batch(&mut self) {
891 let remaining = self.rows_in_current_chunk().saturating_sub(self.offset);
892 self.num_rows = u32::try_from(remaining.min(u64::from(self.batch_size))).unwrap_or(0);
893 }
894
895 fn advance_batch(&mut self) {
896 self.message_offset = self.offset;
897 self.calculate_num_rows_in_batch();
898 if self.num_rows == 0 {
899 self.advance_chunk();
900 }
901 }
902
903 fn advance_chunk(&mut self) {
904 while self.chunk_index + 1 < self.chunk_lengths.len() {
905 self.offset = 0;
906 self.message_offset = 0;
907 self.chunk_index += 1;
908 self.calculate_num_rows_in_batch();
909 if self.num_rows > 0 {
910 break;
911 }
912 }
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use super::*;
919
920 #[test]
926 fn direct_path_oversized_column_count_fails_closed_not_oom() {
927 let payload = [TNS_MSG_TYPE_PARAMETER, 4, 0x25, 0x00, 0x00, 0x00];
929 let err = parse_direct_path_prepare_response(&payload, ClientCapabilities::default())
930 .expect_err("oversized direct-path column count must fail closed");
931 assert!(
932 matches!(
933 err,
934 ProtocolError::TtcDecode(_) | ProtocolError::ResourceLimit { .. }
935 ),
936 "got {err:?}"
937 );
938 }
939
940 #[test]
941 fn direct_path_prepare_respects_protocol_column_limit() {
942 let payload = [TNS_MSG_TYPE_PARAMETER, 1, 2];
943 let limits = ProtocolLimits {
944 max_columns: 1,
945 ..ProtocolLimits::DEFAULT
946 };
947 let err = parse_direct_path_prepare_response_with_limits(
948 &payload,
949 ClientCapabilities::default(),
950 limits,
951 )
952 .expect_err("column count above policy must fail");
953 assert!(
954 matches!(
955 err,
956 ProtocolError::ResourceLimit {
957 limit: "columns",
958 observed: 2,
959 maximum: 1,
960 }
961 ),
962 "got {err:?}"
963 );
964 }
965
966 fn column(name: &str, ora_type_num: u8, max_size: u32, nulls_allowed: bool) -> ColumnMetadata {
967 ColumnMetadata {
968 name: name.to_string(),
969 ora_type_num,
970 csfrm: if matches!(
971 ora_type_num,
972 ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG
973 ) {
974 CS_FORM_IMPLICIT
975 } else {
976 0
977 },
978 precision: 0,
979 scale: 0,
980 buffer_size: max_size,
981 max_size,
982 nulls_allowed,
983 is_json: false,
984 is_oson: false,
985 object_schema: None,
986 object_type_name: None,
987 is_array: false,
988 vector_dimensions: None,
989 vector_format: 0,
990 vector_flags: 0,
991 ..Default::default()
992 }
993 }
994
995 #[test]
996 fn prepare_payload_matches_reference_layout() {
997 let payload = build_direct_path_prepare_payload(
998 "pythontest",
999 "dpl_golden",
1000 &["id".to_string(), "name".to_string()],
1001 10,
1002 )
1003 .expect("payload should build");
1004 assert_eq!(&payload[..4], &[3, 128, 10, 0]);
1006 let mut expected = vec![
1007 1, 1, 1, 1, 4, 1, 1, 15, 1, 1, 1, 1, 1, 1, ];
1014 expected.extend_from_slice(&[0, 1, 10]);
1016 expected.extend_from_slice(&[10]);
1017 expected.extend_from_slice(b"pythontest");
1018 expected.extend_from_slice(&[1, 3]);
1019 expected.extend_from_slice(&[0, 1, 10]);
1021 expected.extend_from_slice(&[10]);
1022 expected.extend_from_slice(b"dpl_golden");
1023 expected.extend_from_slice(&[1, 1]);
1024 expected.extend_from_slice(&[0, 1, 2, 2]);
1026 expected.extend_from_slice(b"id");
1027 expected.extend_from_slice(&[1, 4]);
1028 expected.extend_from_slice(&[0, 1, 4, 4]);
1029 expected.extend_from_slice(b"name");
1030 expected.extend_from_slice(&[1, 4]);
1031 expected.extend_from_slice(&[2, 0x01, 0x90, 2, 0x01, 0x90]);
1033 expected.extend_from_slice(&[0; 9]);
1034 expected.extend_from_slice(&[2, 0xff, 0xff, 0, 0, 1, 1]);
1035 assert_eq!(&payload[4..], expected.as_slice());
1036 }
1037
1038 #[test]
1039 fn op_payload_matches_reference_layout() {
1040 let payload = build_direct_path_op_payload(1, TNS_DP_OP_FINISH, 12);
1041 assert_eq!(
1042 payload,
1043 vec![3, 130, 12, 0, 1, 2, 1, 1, 0, 0, 1, 1],
1044 "fn code, seq, token, ub4 op, ub2 cursor, ptr 0, ub4 0, ptr 1, ptr 1"
1045 );
1046 }
1047
1048 #[test]
1049 fn single_fast_row_produces_one_fast_piece() {
1050 let columns = vec![
1051 column("ID", ORA_TYPE_NUM_NUMBER, 0, false),
1052 column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false),
1053 ];
1054 let rows = vec![vec![
1055 DirectPathColumnValue::Number("1".into()),
1056 DirectPathColumnValue::Bytes(b"alpha".to_vec()),
1057 ]];
1058 let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
1059 assert_eq!(stream.pieces.len(), 1);
1060 let piece = &stream.pieces[0];
1061 assert_eq!(
1062 piece.flags(),
1063 TNS_DPLS_ROW_HEADER_FIRST
1064 | TNS_DPLS_ROW_HEADER_LAST
1065 | TNS_DPLS_ROW_HEADER_FAST_ROW
1066 | TNS_DPLS_ROW_HEADER_FAST_PIECE
1067 );
1068 assert_eq!(piece.num_segments(), 2);
1069 assert_eq!(
1071 piece.data(),
1072 &[2, 0xc1, 0x02, 5, b'a', b'l', b'p', b'h', b'a']
1073 );
1074 assert_eq!(stream.total_piece_length, piece.data().len() as u32 + 4);
1076 }
1077
1078 #[test]
1079 fn long_column_clears_fast_flag() {
1080 let columns = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
1081 let rows = vec![vec![DirectPathColumnValue::Bytes(vec![b'x'; 10])]];
1082 let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
1083 assert_eq!(stream.pieces.len(), 1);
1084 assert_eq!(
1085 stream.pieces[0].flags(),
1086 TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_LAST
1087 );
1088 assert_eq!(stream.total_piece_length, 11 + 2);
1090 }
1091
1092 #[test]
1093 fn null_values_encode_as_null_indicator() {
1094 let columns = vec![column("SALARY", ORA_TYPE_NUM_NUMBER, 0, true)];
1095 let rows = vec![vec![DirectPathColumnValue::Null]];
1096 let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
1097 assert_eq!(stream.pieces[0].data(), &[0xff]);
1098 assert_eq!(stream.pieces[0].num_segments(), 1);
1099 }
1100
1101 #[test]
1102 fn null_into_not_null_column_raises_dpy_8001() {
1103 let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false)];
1104 let rows = vec![vec![DirectPathColumnValue::Null]];
1105 let err = encode_direct_path_rows(&columns, &rows, 1).expect_err("nulls must be rejected");
1106 assert!(
1107 err.to_string().starts_with("DPY-8001:"),
1108 "unexpected error: {err}"
1109 );
1110 assert!(err.to_string().contains("\"NAME\""), "{err}");
1111 assert!(err.to_string().contains("row 1"), "{err}");
1112 }
1113
1114 #[test]
1115 fn oversized_value_raises_dpy_8000() {
1116 let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 4, false)];
1117 let rows = vec![vec![DirectPathColumnValue::Bytes(b"toolong".to_vec())]];
1118 let err = encode_direct_path_rows(&columns, &rows, 3).expect_err("size must be enforced");
1119 assert!(
1120 err.to_string().starts_with("DPY-8000:"),
1121 "unexpected error: {err}"
1122 );
1123 assert!(err.to_string().contains("row 3"), "{err}");
1124 }
1125
1126 #[test]
1127 fn long_values_use_fe_chunked_segments() {
1128 let columns = vec![column("WIDE", ORA_TYPE_NUM_VARCHAR, 1000, false)];
1130 let value = vec![b'q'; 600];
1131 let rows = vec![vec![DirectPathColumnValue::Bytes(value.clone())]];
1132 let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
1133 assert_eq!(stream.pieces.len(), 1);
1134 let piece = &stream.pieces[0];
1135 assert_eq!(piece.num_segments(), 1);
1136 let mut expected = vec![0xfe, 0x02, 0x58];
1137 expected.extend_from_slice(&value);
1138 assert_eq!(piece.data(), expected.as_slice());
1139 }
1140
/// Encodes one value that is 100 bytes longer than `TNS_DPLS_MAX_PIECE_SIZE`
/// and checks that it is spread over two pieces: the first flagged
/// FIRST | SPLIT_WITH_NEXT, the second SPLIT_WITH_PREV | LAST.
#[test]
fn values_larger_than_piece_split_across_pieces_with_split_flags() {
    let oversized_len = TNS_DPLS_MAX_PIECE_SIZE + 100;
    let metadata = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
    let input = vec![vec![DirectPathColumnValue::Bytes(vec![b'z'; oversized_len])]];
    let encoded = encode_direct_path_rows(&metadata, &input, 1).expect("stream should encode");

    assert_eq!(encoded.pieces.len(), 2);
    let head = &encoded.pieces[0];
    let tail = &encoded.pieces[1];

    let expected_head_flags = TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT;
    let expected_tail_flags = TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV | TNS_DPLS_ROW_HEADER_LAST;
    assert_eq!(head.flags(), expected_head_flags);
    assert_eq!(tail.flags(), expected_tail_flags);

    // The first piece is filled completely and opens with the 0xfe marker.
    let head_bytes = head.data();
    let tail_bytes = tail.data();
    assert_eq!(head_bytes.len(), TNS_DPLS_MAX_PIECE_SIZE);
    assert_eq!(head_bytes[0], 0xfe);

    // Bytes 1..=2 of each piece are read as a big-endian u16 chunk length; the
    // first chunk fills the piece minus its three leading bytes, and the two
    // chunk lengths together account for the whole value.
    let head_chunk = usize::from(u16::from_be_bytes([head_bytes[1], head_bytes[2]]));
    assert_eq!(head_chunk, TNS_DPLS_MAX_PIECE_SIZE - 3);
    let tail_chunk = usize::from(u16::from_be_bytes([tail_bytes[1], tail_bytes[2]]));
    assert_eq!(head_chunk + tail_chunk, oversized_len);

    // NOTE(review): the extra 2 + 2 presumably covers a two-byte header per
    // piece -- confirm against the encoder.
    let expected_total = head_bytes.len() + tail_bytes.len() + 2 + 2;
    assert_eq!(stream_total_len(&encoded), expected_total);

    /// Reads the stream's recorded total piece length as a `usize`.
    fn stream_total_len(stream: &DirectPathStream) -> usize {
        stream.total_piece_length as usize
    }
}
1170
/// Encodes one row of 300 NULL columns and expects it to be divided into two
/// pieces of 255 and 45 segments, flagged FIRST and LAST respectively.
#[test]
fn segment_count_caps_at_255_per_piece() {
    let mut metadata: Vec<ColumnMetadata> = Vec::new();
    for index in 0..300 {
        metadata.push(column(&format!("C{index}"), ORA_TYPE_NUM_NUMBER, 0, true));
    }
    let all_nulls: Vec<DirectPathColumnValue> =
        std::iter::repeat_with(|| DirectPathColumnValue::Null)
            .take(300)
            .collect();

    let encoded =
        encode_direct_path_rows(&metadata, &[all_nulls], 1).expect("stream should encode");

    assert_eq!(encoded.pieces.len(), 2);
    let head = &encoded.pieces[0];
    let tail = &encoded.pieces[1];

    // 255 + 45 == 300: every column lands in exactly one of the two pieces.
    assert_eq!(head.num_segments(), 255);
    assert_eq!(tail.num_segments(), 45);

    // Neither piece is expected to carry a SPLIT_WITH_* bit here.
    assert_eq!(head.flags(), TNS_DPLS_ROW_HEADER_FIRST);
    assert_eq!(tail.flags(), TNS_DPLS_ROW_HEADER_LAST);
}
1187
/// Encodes 2024-01-02 03:04:05 with a zero nanosecond field into a TIMESTAMP
/// column and expects a length byte of 7 followed by seven value bytes, which
/// the assertion message calls the 7-byte date form.
#[test]
fn timestamp_with_zero_fraction_collapses_to_seven_bytes() {
    let whole_second = DirectPathColumnValue::DateTime {
        year: 2024,
        month: 1,
        day: 2,
        hour: 3,
        minute: 4,
        second: 5,
        nanosecond: 0,
    };
    let metadata = vec![column("TS", ORA_TYPE_NUM_TIMESTAMP, 0, true)];
    let input = vec![vec![whole_second]];

    let encoded = encode_direct_path_rows(&metadata, &input, 1).expect("stream should encode");
    let first_piece = &encoded.pieces[0];

    // Expected bytes: length 7, then 120/124 for the year 2024, month and day
    // unchanged, and hour/minute/second each one higher than the input.
    assert_eq!(
        first_piece.data(),
        &[7, 120, 124, 1, 2, 4, 5, 6],
        "7-byte date form expected when fractional seconds are zero"
    );
}
1207
/// Encodes a `true` row and a `false` row into a BOOLEAN column and expects
/// the piece data `[2, 1, 1]` for true and `[1, 0]` for false.
#[test]
fn boolean_values_encode_per_reference() {
    let metadata = vec![column("FLAG", ORA_TYPE_NUM_BOOLEAN, 0, true)];
    let truthy_row = vec![DirectPathColumnValue::Boolean(true)];
    let falsy_row = vec![DirectPathColumnValue::Boolean(false)];
    let input = vec![truthy_row, falsy_row];

    let encoded = encode_direct_path_rows(&metadata, &input, 1).expect("stream should encode");

    // One piece per input row, in row order.
    let truthy_piece = &encoded.pieces[0];
    let falsy_piece = &encoded.pieces[1];
    assert_eq!(truthy_piece.data(), &[2, 1, 1]);
    assert_eq!(falsy_piece.data(), &[1, 0]);
}
1219
/// A row supplying one value for a two-column layout must make
/// `encode_direct_path_rows` return an error.
#[test]
fn row_width_mismatch_is_rejected() {
    let metadata = vec![
        column("A", ORA_TYPE_NUM_NUMBER, 0, true),
        column("B", ORA_TYPE_NUM_NUMBER, 0, true),
    ];
    // Only one value for two declared columns.
    let short_row = vec![DirectPathColumnValue::Null];
    let input = vec![short_row];

    let outcome = encode_direct_path_rows(&metadata, &input, 1);
    assert!(outcome.is_err());
}
1229
/// Checks how `apply_direct_path_metadata_overrides` rewrites LOB columns:
/// a CLOB becomes LONG, its charset form depends on the charset id passed in,
/// and a BLOB becomes LONG RAW with charset form 0.
#[test]
fn metadata_overrides_inline_lobs() {
    // Charset id 873: the assertion message treats it as multi-byte.
    {
        let mut doc = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        doc.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut doc, 873);
        assert_eq!(doc.ora_type_num, ORA_TYPE_NUM_LONG);
        assert_eq!(doc.csfrm, CS_FORM_NCHAR, "multi-byte charset uses NCHAR");
    }

    // Charset id 178: the assertion message treats it as single-byte.
    {
        let mut doc = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        doc.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut doc, 178);
        assert_eq!(
            doc.csfrm, CS_FORM_IMPLICIT,
            "single-byte charset keeps form"
        );
    }

    // Binary LOBs map to LONG RAW and end up with charset form 0.
    {
        let mut bin = column("BIN", ORA_TYPE_NUM_BLOB, 0, true);
        apply_direct_path_metadata_overrides(&mut bin, 873);
        assert_eq!(bin.ora_type_num, ORA_TYPE_NUM_LONG_RAW);
        assert_eq!(bin.csfrm, 0);
    }
}
1251
/// Five rows with a batch size of two must be served as batches of 2, 2 and 1
/// rows, with `offset` and `message_offset` advancing together, after which
/// the state reports completion.
#[test]
fn batch_state_single_chunk_splits_by_batch_size() {
    let mut state = BatchLoadState::for_rows(5, 2).expect("state should build");

    // (num_rows, offset, message_offset) expected before each advance.
    let expected_batches = [(2, 0, 0), (2, 2, 2), (1, 4, 4)];
    for expected in expected_batches.iter() {
        let observed = (state.num_rows(), state.offset(), state.message_offset());
        assert_eq!(observed, *expected);
        state.next_batch();
    }

    assert!(state.is_done());
}
1272
/// With chunks of 3 and 2 rows and a batch size of two, the first chunk must
/// yield batches of 2 and 1 rows before the second chunk starts; no batch
/// mixes rows from both chunks, and `message_offset` restarts at 0 for the
/// second chunk.
#[test]
fn batch_state_never_spans_chunks() {
    let mut state = BatchLoadState::new(vec![3, 2], 2).expect("state should build");

    // (chunk_index, num_rows, message_offset) expected before each advance.
    let expected_batches = [(0, 2, 0), (0, 1, 2), (1, 2, 0)];
    for expected in expected_batches.iter() {
        let observed = (
            state.chunk_index(),
            state.num_rows(),
            state.message_offset(),
        );
        assert_eq!(observed, *expected);
        state.next_batch();
    }

    assert!(state.is_done());
}
1306
/// Leading zero-row chunks must be passed over: the state starts on chunk
/// index 2 with all three rows in a single batch, then finishes.
#[test]
fn batch_state_skips_empty_chunks() {
    let chunk_sizes = vec![0, 0, 3];
    let mut state = BatchLoadState::new(chunk_sizes, 10).expect("state should build");

    let observed = (state.chunk_index(), state.num_rows());
    assert_eq!(observed, (2, 3));

    state.next_batch();
    assert!(state.is_done());
}
1314
/// Building a batch state with a batch size of zero must fail.
#[test]
fn batch_state_rejects_zero_batch_size() {
    let outcome = BatchLoadState::for_rows(5, 0);
    assert!(outcome.is_err());
}
1319
/// A source with zero rows must build successfully and report completion
/// without any call to `next_batch`.
#[test]
fn batch_state_empty_source_is_done_immediately() {
    let empty = BatchLoadState::for_rows(0, 10).expect("state should build");
    assert!(empty.is_done());
}
1325
/// Builds the load-stream payload for a single one-column row (NUMBER `1`)
/// and compares it byte-for-byte with the expected wire layout.
#[test]
fn load_stream_payload_header_matches_reference_layout() {
    let columns = vec![column("ID", ORA_TYPE_NUM_NUMBER, 0, false)];
    let rows = vec![vec![DirectPathColumnValue::Number("1".into())]];
    let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
    let payload =
        build_direct_path_load_stream_payload(1, &stream, 11).expect("payload should build");

    // Landmarks in the expected bytes:
    //   129         == TNS_FUNC_DIRECT_PATH_LOAD_STREAM
    //   11          == the last argument passed to the payload builder above
    //   0x01, 0x90  == 400 == TNS_DP_STREAM_VERSION
    //   0x3c        == FAST_ROW | FAST_PIECE | FIRST | LAST row-header bits
    // NOTE(review): the meaning of the remaining bytes is not visible from this
    // test -- confirm against the payload builder before editing them.
    //
    // The binding is immutable and compared directly; nothing needs to be moved
    // out of it, so no `mut` / `std::mem::take` is required.
    let expected = vec![
        3, 129, 11, 0, 1, 1, 1, 1, 7, 2, 0x01, 0x90, 0, 0, 1, 1, 0x3c, 0, 7, 1, 2, 0xc1, 0x02,
    ];
    assert_eq!(payload, expected);
}
1348}