Skip to main content

oracledb_protocol/
dpl.rs

1//! Direct Path Load (DPL) protocol support.
2//!
3//! Implements the three TTC functions used by python-oracledb's
4//! `Connection.direct_path_load()`:
5//!
6//! * function 128 — direct path prepare (send table/column names, receive
7//!   server-side column metadata and a direct path cursor id),
8//! * function 129 — direct path load stream (column-array piece stream),
9//! * function 130 — direct path op (FINISH commits, ABORT discards).
10//!
11//! The builders/parsers mirror `impl/thin/messages/direct_path_*.pyx` of the
12//! python-oracledb v4.0.1 reference and are validated against golden wire
13//! captures in `tests/golden/`. The batch state machine mirrors
14//! `impl/base/batch_load_manager.pyx`.
15
16use crate::thin::{
17    encode_binary_double, encode_binary_float, encode_number_text, encode_oracle_date,
18    encode_oracle_timestamp, encode_oracle_timestamp_tz, parse_column_metadata,
19    parse_server_error_info, skip_server_side_piggyback, ClientCapabilities, ColumnMetadata,
20    CS_FORM_IMPLICIT, CS_FORM_NCHAR, ORA_TYPE_NUM_BINARY_DOUBLE, ORA_TYPE_NUM_BINARY_FLOAT,
21    ORA_TYPE_NUM_BINARY_INTEGER, ORA_TYPE_NUM_BLOB, ORA_TYPE_NUM_BOOLEAN, ORA_TYPE_NUM_CHAR,
22    ORA_TYPE_NUM_CLOB, ORA_TYPE_NUM_DATE, ORA_TYPE_NUM_LONG, ORA_TYPE_NUM_LONG_RAW,
23    ORA_TYPE_NUM_NUMBER, ORA_TYPE_NUM_RAW, ORA_TYPE_NUM_TIMESTAMP, ORA_TYPE_NUM_TIMESTAMP_LTZ,
24    ORA_TYPE_NUM_TIMESTAMP_TZ, ORA_TYPE_NUM_VARCHAR, TNS_MSG_TYPE_END_OF_RESPONSE,
25    TNS_MSG_TYPE_ERROR, TNS_MSG_TYPE_PARAMETER, TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK,
26    TNS_MSG_TYPE_STATUS,
27};
28use crate::wire::{BoundedReader, ProtocolLimits, TtcReader, TtcWriter};
29use crate::{ProtocolError, Result};
30
/// TTC function code: direct path prepare.
pub const TNS_FUNC_DIRECT_PATH_PREPARE: u8 = 128;
/// TTC function code: direct path load stream (piece stream).
pub const TNS_FUNC_DIRECT_PATH_LOAD_STREAM: u8 = 129;
/// TTC function code: direct path op (FINISH / ABORT).
pub const TNS_FUNC_DIRECT_PATH_OP: u8 = 130;

/// Interface version sent in the prepare input array.
pub const TNS_DP_INTERFACE_VERSION: u32 = 400;
/// Stream version sent in the prepare input array and in the load stream header.
pub const TNS_DP_STREAM_VERSION: u32 = 400;

/// Prepare operation code requesting a load.
pub const TNS_DPP_OP_CODE_LOAD: u32 = 1;

/// Direct path op code that discards the load.
pub const TNS_DP_OP_ABORT: u32 = 1;
/// Direct path op code that commits the load.
pub const TNS_DP_OP_FINISH: u32 = 2;

// Positions within the prepare input array.
const TNS_DPP_IN_INDEX_INTERFACE_VERSION: usize = 0;
const TNS_DPP_IN_INDEX_STREAM_VERSION: usize = 1;
const TNS_DPP_IN_INDEX_LOCK_WAIT: usize = 14;
// Keyword indices written after each prepare keyword parameter value.
const TNS_DPP_KW_INDEX_OBJECT_NAME: u16 = 1;
const TNS_DPP_KW_INDEX_SCHEMA_NAME: u16 = 3;
const TNS_DPP_KW_INDEX_COLUMN_NAME: u16 = 4;
// NOTE(review): despite the KW_ prefix this is used as a position in the
// prepare *input array* (seeded with 0xffff); confirm the name against the
// reference.
const TNS_DPP_KW_INDEX_NFOBJ_OID_POS: usize = 11;
// Position of the direct path cursor id within the prepare out values.
const TNS_DPP_OUT_INDEX_CURSOR: usize = 3;
// The reference sizes the input array at TNS_DPP_IN_MAX_PARAMS (36) but only
// transmits the first 15 entries: `_initialize_hook` seeds indices 16/17 with
// 0xffff *without* updating `in_values_length`, so they are never sent.
const TNS_DPP_IN_VALUES_SENT: usize = TNS_DPP_IN_INDEX_LOCK_WAIT + 1;

/// Row header flag set (together with FAST_ROW) on a single-piece row made up
/// only of fast column types.
pub const TNS_DPLS_ROW_HEADER_FAST_PIECE: u8 = 0x10;
/// Row header flag for a fast row; its presence makes the piece header carry a
/// big-endian u16 total length after the flag byte.
pub const TNS_DPLS_ROW_HEADER_FAST_ROW: u8 = 0x20;
/// Row header flag: first piece of a row.
pub const TNS_DPLS_ROW_HEADER_FIRST: u8 = 0x08;
/// Row header flag: last piece of a row.
pub const TNS_DPLS_ROW_HEADER_LAST: u8 = 0x04;
/// Row header flag: a non-first piece that continues a value split from the
/// previous piece.
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV: u8 = 0x02;
/// Row header flag: a non-last piece whose final value continues in the next
/// piece.
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT: u8 = 0x01;

/// Upper bound on the total length (data plus headers) of all pieces in one
/// load stream.
pub const TNS_DPLS_MAX_MESSAGE_SIZE: u64 = 1_073_728_895;
/// Longest value written with a one-byte length prefix.
pub const TNS_DPLS_MAX_SHORT_LENGTH: usize = 0xfa;
/// Maximum number of data bytes held by a single piece.
pub const TNS_DPLS_MAX_PIECE_SIZE: usize = 0xfff0;

// Marks a value chunk prefixed by a big-endian u16 length.
const TNS_DPLS_LONG_LENGTH_INDICATOR: u8 = 0xfe;
// Length byte written in place of a NULL value.
const TNS_NULL_LENGTH_INDICATOR: u8 = 0xff;
69
70/// Builds the payload for TTC function 128 (direct path prepare).
71///
72/// Mirrors `DirectPathPrepareMessage._write_message`.
73pub fn build_direct_path_prepare_payload(
74    schema_name: &str,
75    table_name: &str,
76    column_names: &[String],
77    seq_num: u8,
78) -> Result<Vec<u8>> {
79    let keyword_parameters_length =
80        u32::try_from(column_names.len() + 2).map_err(|_| ProtocolError::InvalidPacketLength {
81            length: column_names.len(),
82            minimum: 0,
83        })?;
84
85    let mut in_values = [0u32; TNS_DPP_IN_VALUES_SENT];
86    in_values[TNS_DPP_IN_INDEX_INTERFACE_VERSION] = TNS_DP_INTERFACE_VERSION;
87    in_values[TNS_DPP_IN_INDEX_STREAM_VERSION] = TNS_DP_STREAM_VERSION;
88    in_values[TNS_DPP_KW_INDEX_NFOBJ_OID_POS] = 0xffff;
89    in_values[TNS_DPP_IN_INDEX_LOCK_WAIT] = 1;
90
91    let mut writer = TtcWriter::new();
92    writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_PREPARE, seq_num);
93    writer.write_ub8(0); // token number
94    writer.write_ub4(TNS_DPP_OP_CODE_LOAD);
95    writer.write_u8(1); // keyword parameters (pointer)
96    writer.write_ub4(keyword_parameters_length);
97    writer.write_u8(1); // input array (pointer)
98    writer.write_ub2(TNS_DPP_IN_VALUES_SENT as u16);
99    writer.write_u8(1); // metadata (pointer)
100    writer.write_u8(1); // metadata length (pointer)
101    writer.write_u8(1); // parameters (pointer)
102    writer.write_u8(1); // parameters length (pointer)
103    writer.write_u8(1); // output array (pointer)
104    writer.write_u8(1); // output array length (pointer)
105    write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_SCHEMA_NAME, schema_name)?;
106    write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_OBJECT_NAME, table_name)?;
107    for name in column_names {
108        write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_COLUMN_NAME, name)?;
109    }
110    for value in in_values {
111        writer.write_ub4(value);
112    }
113    Ok(writer.into_bytes())
114}
115
116fn write_keyword_param(writer: &mut TtcWriter, index: u16, value: &str) -> Result<()> {
117    let bytes = value.as_bytes();
118    let len = u16::try_from(bytes.len()).map_err(|_| ProtocolError::InvalidPacketLength {
119        length: bytes.len(),
120        minimum: 0,
121    })?;
122    writer.write_ub2(0); // text length
123    writer.write_ub2(len);
124    writer.write_bytes_with_length(bytes)?;
125    writer.write_ub2(index);
126    Ok(())
127}
128
/// Decoded return parameters of a direct path prepare (TTC function 128)
/// response.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DirectPathPrepareResult {
    /// Server column metadata, with the direct path CLOB/BLOB overrides
    /// already applied.
    pub column_metadata: Vec<ColumnMetadata>,
    /// Direct path cursor id (the out value at `TNS_DPP_OUT_INDEX_CURSOR`),
    /// passed to the load stream and op request builders.
    pub cursor_id: u16,
    /// Every out value returned by the server, in wire order.
    pub out_values: Vec<u32>,
}
135
136/// Parses the response to TTC function 128 (direct path prepare).
137///
138/// `capabilities.charset_id` drives the CLOB metadata override (charset ids
139/// of 800 and above are multi-byte, in which case implicit-charset CLOBs
140/// switch to the NCHAR form). Mirrors the reference's
141/// `DirectPathPrepareMessage._process_metadata`/`_process_return_parameters`.
142pub fn parse_direct_path_prepare_response(
143    payload: &[u8],
144    capabilities: ClientCapabilities,
145) -> Result<DirectPathPrepareResult> {
146    parse_direct_path_prepare_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
147}
148
149pub fn parse_direct_path_prepare_response_with_limits(
150    payload: &[u8],
151    capabilities: ClientCapabilities,
152    limits: ProtocolLimits,
153) -> Result<DirectPathPrepareResult> {
154    let mut reader = TtcReader::with_limits(payload, limits)?;
155    let mut result: Option<DirectPathPrepareResult> = None;
156    while reader.remaining() > 0 {
157        let message_type = reader.read_u8()?;
158        match message_type {
159            0 => {}
160            TNS_MSG_TYPE_PARAMETER => {
161                result = Some(parse_prepare_return_parameters(&mut reader, capabilities)?);
162            }
163            TNS_MSG_TYPE_STATUS => {
164                let _call_status = reader.read_ub4()?;
165                let _seq = reader.read_ub2()?;
166            }
167            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
168                let _ = skip_server_side_piggyback(&mut reader)?;
169            }
170            TNS_MSG_TYPE_END_OF_RESPONSE => break,
171            TNS_MSG_TYPE_ERROR => {
172                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
173                if info.number != 0 {
174                    return Err(ProtocolError::ServerError(info.message));
175                }
176            }
177            _ => {
178                return Err(ProtocolError::UnknownMessageType {
179                    message_type,
180                    position: reader.position().saturating_sub(1),
181                })
182            }
183        }
184    }
185    result.ok_or(ProtocolError::TtcDecode(
186        "direct path prepare response did not contain return parameters",
187    ))
188}
189
/// Decodes the parameter message of a direct path prepare response.
///
/// Wire order:
///
/// 1. a ub4 column count;
/// 2. one column metadata record per column;
/// 3. a ub2 parameter count, which must be zero;
/// 4. a ub2 out value count, followed by that many ub4 out values.
///
/// The direct path cursor id is taken from the out value at
/// `TNS_DPP_OUT_INDEX_CURSOR`.
///
/// # Errors
///
/// Fails when a count exceeds the reader's limits, when parameters are
/// present, when the cursor id is missing or does not fit in a `u16`, or on
/// any decode error (including truncation).
fn parse_prepare_return_parameters(
    reader: &mut TtcReader<'_>,
    capabilities: ClientCapabilities,
) -> Result<DirectPathPrepareResult> {
    let num_columns = reader.read_ub4()?;
    reader.limits().check_columns(num_columns as usize)?;
    // Each column reads a multi-field metadata record (>=1 byte), so bound the
    // reservation by the buffer (BoundedReader) instead of an arbitrary cap;
    // parse_column_metadata still fails closed on truncation.
    let mut column_metadata: Vec<ColumnMetadata> =
        reader.with_capacity_limited(num_columns as usize, 1, ProtocolLimits::check_columns)?;
    for _ in 0..num_columns {
        let mut metadata = parse_column_metadata(reader, capabilities)?;
        // CLOB/BLOB columns are re-typed for inline direct path streaming.
        apply_direct_path_metadata_overrides(&mut metadata, capabilities.charset_id);
        column_metadata.push(metadata);
    }
    // Keyword parameters are not expected in a prepare response; reject
    // rather than guess at their encoding.
    let num_params = reader.read_ub2()?;
    if num_params != 0 {
        return Err(ProtocolError::TtcDecode(
            "unexpected parameters in direct path prepare response",
        ));
    }
    let out_values_length = reader.read_ub2()?;
    reader
        .limits()
        .check_length_prefixed_elements(usize::from(out_values_length))?;
    // Each out value is a ub4 (>=1 byte on the wire); bound by the buffer.
    let mut out_values: Vec<u32> = reader.with_capacity_limited(
        usize::from(out_values_length),
        1,
        ProtocolLimits::check_length_prefixed_elements,
    )?;
    for _ in 0..out_values_length {
        out_values.push(reader.read_ub4()?);
    }
    let cursor_id =
        out_values
            .get(TNS_DPP_OUT_INDEX_CURSOR)
            .copied()
            .ok_or(ProtocolError::TtcDecode(
                "direct path prepare response missing cursor id",
            ))?;
    // The cursor id travels as a ub2 in the load stream / op requests.
    let cursor_id = u16::try_from(cursor_id)
        .map_err(|_| ProtocolError::TtcDecode("direct path cursor id out of range"))?;
    Ok(DirectPathPrepareResult {
        column_metadata,
        cursor_id,
        out_values,
    })
}
240
241/// CLOB/NCLOB and BLOB columns are always streamed as LONG/LONG RAW during a
242/// direct path load. Implicit-charset CLOBs switch to the NCHAR form when the
243/// database charset is multi-byte (charset ids >= 800).
244fn apply_direct_path_metadata_overrides(metadata: &mut ColumnMetadata, charset_id: u16) {
245    if metadata.ora_type_num == ORA_TYPE_NUM_CLOB {
246        if metadata.csfrm == CS_FORM_IMPLICIT && charset_id >= 800 {
247            metadata.csfrm = CS_FORM_NCHAR;
248        }
249        metadata.ora_type_num = ORA_TYPE_NUM_LONG;
250    } else if metadata.ora_type_num == ORA_TYPE_NUM_BLOB {
251        metadata.ora_type_num = ORA_TYPE_NUM_LONG_RAW;
252        metadata.csfrm = 0;
253    }
254}
255
256/// Builds the payload for TTC function 130 (direct path op).
257///
258/// Mirrors `DirectPathOpMessage._write_message`. `op_code` is
259/// [`TNS_DP_OP_FINISH`] (commits the load) or [`TNS_DP_OP_ABORT`].
260pub fn build_direct_path_op_payload(cursor_id: u16, op_code: u32, seq_num: u8) -> Vec<u8> {
261    let mut writer = TtcWriter::new();
262    writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_OP, seq_num);
263    writer.write_ub8(0); // token number
264    writer.write_ub4(op_code);
265    writer.write_ub2(cursor_id);
266    writer.write_u8(0); // pointer (input values)
267    writer.write_ub4(0); // number of input values
268    writer.write_u8(1); // pointer (output values)
269    writer.write_u8(1); // pointer (output values length)
270    writer.into_bytes()
271}
272
273/// Parses the response to TTC functions 129 and 130 (both return the same
274/// shape: a ub2 count of out values that are each skipped).
275pub fn parse_direct_path_simple_response(
276    payload: &[u8],
277    capabilities: ClientCapabilities,
278) -> Result<()> {
279    parse_direct_path_simple_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
280}
281
282pub fn parse_direct_path_simple_response_with_limits(
283    payload: &[u8],
284    capabilities: ClientCapabilities,
285    limits: ProtocolLimits,
286) -> Result<()> {
287    let mut reader = TtcReader::with_limits(payload, limits)?;
288    while reader.remaining() > 0 {
289        let message_type = reader.read_u8()?;
290        match message_type {
291            0 => {}
292            TNS_MSG_TYPE_PARAMETER => {
293                let num_out_values = reader.read_ub2()?;
294                for _ in 0..num_out_values {
295                    let _value = reader.read_ub4()?;
296                }
297            }
298            TNS_MSG_TYPE_STATUS => {
299                let _call_status = reader.read_ub4()?;
300                let _seq = reader.read_ub2()?;
301            }
302            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
303                let _ = skip_server_side_piggyback(&mut reader)?;
304            }
305            TNS_MSG_TYPE_END_OF_RESPONSE => break,
306            TNS_MSG_TYPE_ERROR => {
307                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
308                if info.number != 0 {
309                    return Err(ProtocolError::ServerError(info.message));
310                }
311            }
312            _ => {
313                return Err(ProtocolError::UnknownMessageType {
314                    message_type,
315                    position: reader.position().saturating_sub(1),
316                })
317            }
318        }
319    }
320    Ok(())
321}
322
// Load stream (function 129) and op (function 130) responses share one shape,
// so the function-specific names all resolve to the same parser.
pub use parse_direct_path_simple_response as parse_direct_path_load_stream_response;
pub use parse_direct_path_simple_response as parse_direct_path_op_response;
pub use parse_direct_path_simple_response_with_limits as parse_direct_path_load_stream_response_with_limits;
pub use parse_direct_path_simple_response_with_limits as parse_direct_path_op_response_with_limits;
327
/// One column value of a direct path load row, already converted to the
/// Oracle-facing intermediate form (mirrors the reference's `OracleData`).
///
/// `Bytes` carries the on-the-wire byte payload for VARCHAR/CHAR/LONG (text
/// already encoded per the column's charset form) and RAW/LONG RAW columns.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectPathColumnValue {
    /// SQL NULL; rejected for columns whose metadata disallows nulls.
    Null,
    /// Raw bytes for VARCHAR, CHAR, LONG, RAW and LONG RAW columns; checked
    /// against the column's `max_size` when that is non-zero.
    Bytes(Vec<u8>),
    /// Textual number for NUMBER / BINARY_INTEGER columns, encoded with
    /// `encode_number_text`.
    Number(String),
    /// Value for BINARY_DOUBLE columns.
    BinaryDouble(f64),
    /// Value for BINARY_FLOAT columns.
    BinaryFloat(f32),
    /// Date/time components for DATE, TIMESTAMP, TIMESTAMP WITH LOCAL TIME
    /// ZONE and TIMESTAMP WITH TIME ZONE columns. DATE columns require
    /// `nanosecond == 0`.
    DateTime {
        year: i32,
        month: u8,
        day: u8,
        hour: u8,
        minute: u8,
        second: u8,
        nanosecond: u32,
    },
    /// Value for BOOLEAN columns.
    Boolean(bool),
}
351
352/// A finalized direct path piece, ready to be written to a load stream
353/// message.
354#[derive(Clone, Debug, Eq, PartialEq)]
355pub struct DirectPathPiece {
356    flags: u8,
357    num_segments: u8,
358    data: Vec<u8>,
359}
360
361impl DirectPathPiece {
362    pub fn flags(&self) -> u8 {
363        self.flags
364    }
365
366    pub fn num_segments(&self) -> u8 {
367        self.num_segments
368    }
369
370    pub fn data(&self) -> &[u8] {
371        &self.data
372    }
373
374    fn is_fast_row(&self) -> bool {
375        self.flags & TNS_DPLS_ROW_HEADER_FAST_ROW != 0
376    }
377
378    fn header_length(&self) -> u64 {
379        if self.is_fast_row() {
380            4
381        } else {
382            2
383        }
384    }
385
386    fn write_to(&self, writer: &mut TtcWriter) -> Result<()> {
387        writer.write_u8(self.flags);
388        if self.is_fast_row() {
389            let total = self.data.len() as u64 + self.header_length();
390            let total = u16::try_from(total).map_err(|_| {
391                ProtocolError::TtcDecode("direct path fast piece exceeds 16-bit length")
392            })?;
393            writer.write_u16be(total);
394        }
395        writer.write_u8(self.num_segments);
396        writer.write_raw(&self.data);
397        Ok(())
398    }
399}
400
/// Bookkeeping for the piece that `DirectPathPieceBuffer` is currently filling.
#[derive(Clone, Copy, Debug, Default)]
struct PieceState {
    // First piece of a row (emits TNS_DPLS_ROW_HEADER_FIRST).
    is_first: bool,
    // Last piece of a row (emits TNS_DPLS_ROW_HEADER_LAST).
    is_last: bool,
    // Piece starts with the continuation of a value split off the previous piece.
    is_split_with_prev: bool,
    // Piece ends with a value that continues in the next piece.
    is_split_with_next: bool,
    // Every column written to the piece so far has a fast type.
    is_fast: bool,
    // Number of column segments written to the piece (at most 255 in practice).
    num_segments: u16,
}
410
/// Streaming encoder for the direct path column-array piece format.
///
/// Port of the reference `PieceBuffer` (direct_path_load_stream.pyx). Usage:
/// `start_row()` / `add_column_value(..)` per column / `finish_row()`, then
/// [`DirectPathPieceBuffer::finish`].
///
/// Every row begins in a fresh piece. A piece is closed ("finalized") in any
/// of these cases:
///
/// * the row ends;
/// * the piece already holds 255 segments;
/// * the next value does not fit in the remaining `TNS_DPLS_MAX_PIECE_SIZE`
///   bytes.
///
/// Values longer than `TNS_DPLS_MAX_SHORT_LENGTH` may be split across pieces.
#[derive(Debug, Default)]
pub(crate) struct DirectPathPieceBuffer {
    // Pieces finalized so far, in stream order.
    pieces: Vec<DirectPathPiece>,
    // Sum of data and header bytes of all finalized pieces.
    total_piece_length: u64,
    // Data bytes of the piece currently being filled.
    data: Vec<u8>,
    // State of the piece being filled; `None` between rows.
    current: Option<PieceState>,
}

impl DirectPathPieceBuffer {
    /// Creates an empty buffer with no row in progress.
    pub fn new() -> Self {
        Self::default()
    }

    /// Starts a new row by opening its first piece, initially marked fast.
    ///
    /// Fails if the previous row has not been finished.
    pub fn start_row(&mut self) -> Result<()> {
        if self.current.is_some() {
            return Err(ProtocolError::TtcDecode(
                "direct path row started before previous row was finished",
            ));
        }
        self.current = Some(PieceState {
            is_first: true,
            is_fast: true,
            ..PieceState::default()
        });
        Ok(())
    }

    /// Finishes the current row: marks the open piece as the row's last piece
    /// and finalizes it.
    ///
    /// Fails if no row is in progress, or if finalizing the piece would
    /// exceed `TNS_DPLS_MAX_MESSAGE_SIZE`.
    pub fn finish_row(&mut self) -> Result<()> {
        let Some(state) = self.current.as_mut() else {
            return Err(ProtocolError::TtcDecode(
                "direct path row finished without being started",
            ));
        };
        state.is_last = true;
        self.finalize_piece()?;
        // finalize_piece already took the state; this keeps the invariant explicit
        self.current = None;
        Ok(())
    }

    /// Appends one column value of the current row.
    ///
    /// Checks that `value` suits the column's Oracle type. Byte values are
    /// also checked against the column's `max_size` (when non-zero), and NULL
    /// against `nulls_allowed`. The value is then encoded into the open
    /// piece. `row_num` is only used in error reports.
    ///
    /// Fails if no row is in progress, on a type/size/nullability mismatch,
    /// on an encoding error, or if the stream grows too large.
    pub fn add_column_value(
        &mut self,
        metadata: &ColumnMetadata,
        value: &DirectPathColumnValue,
        row_num: u64,
    ) -> Result<()> {
        let Some(state) = self.current.as_mut() else {
            return Err(ProtocolError::TtcDecode(
                "direct path column value added outside of a row",
            ));
        };

        // at most 255 segments per piece
        if state.num_segments == 255 {
            self.finalize_piece()?;
            self.current = Some(PieceState::default());
        }

        // A non-fast column clears the fast flag of the open piece; the fast
        // row format additionally requires the whole row to fit in one piece
        // (see finalize_piece).
        if !is_fast_dbtype(metadata) {
            if let Some(state) = self.current.as_mut() {
                state.is_fast = false;
            }
        }

        match value {
            DirectPathColumnValue::Null => {
                if !metadata.nulls_allowed {
                    return Err(ProtocolError::NullsNotAllowed {
                        column_name: metadata.name.clone(),
                        row_num,
                    });
                }
                self.write_u8_in_piece(TNS_NULL_LENGTH_INDICATOR)?;
                self.bump_segments();
                Ok(())
            }
            DirectPathColumnValue::Bytes(bytes) => {
                if !matches!(
                    metadata.ora_type_num,
                    ORA_TYPE_NUM_VARCHAR
                        | ORA_TYPE_NUM_CHAR
                        | ORA_TYPE_NUM_LONG
                        | ORA_TYPE_NUM_RAW
                        | ORA_TYPE_NUM_LONG_RAW
                ) {
                    return Err(ProtocolError::TtcDecode(
                        "direct path byte value sent for non-character column",
                    ));
                }
                // a max_size of 0 means the size is not checked here
                if metadata.max_size > 0 && bytes.len() as u64 > u64::from(metadata.max_size) {
                    return Err(ProtocolError::ValueTooLarge {
                        actual_size: bytes.len(),
                        max_size: metadata.max_size,
                        column_name: metadata.name.clone(),
                        row_num,
                    });
                }
                self.write_raw_bytes_and_length(bytes)
            }
            DirectPathColumnValue::Number(text) => {
                if !matches!(
                    metadata.ora_type_num,
                    ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER
                ) {
                    return Err(ProtocolError::TtcDecode(
                        "direct path number value sent for non-number column",
                    ));
                }
                let encoded = encode_number_text(text)?;
                self.write_raw_bytes_and_length(&encoded)
            }
            DirectPathColumnValue::BinaryDouble(value) => {
                if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_DOUBLE {
                    return Err(ProtocolError::TtcDecode(
                        "direct path binary double sent for other column type",
                    ));
                }
                let encoded = encode_binary_double(*value);
                self.write_raw_bytes_and_length(&encoded)
            }
            DirectPathColumnValue::BinaryFloat(value) => {
                if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_FLOAT {
                    return Err(ProtocolError::TtcDecode(
                        "direct path binary float sent for other column type",
                    ));
                }
                let encoded = encode_binary_float(*value);
                self.write_raw_bytes_and_length(&encoded)
            }
            DirectPathColumnValue::DateTime {
                year,
                month,
                day,
                hour,
                minute,
                second,
                nanosecond,
            } => {
                let encoded = match metadata.ora_type_num {
                    ORA_TYPE_NUM_DATE => {
                        // DATE has no fractional seconds; refuse to truncate
                        if *nanosecond != 0 {
                            return Err(ProtocolError::TtcDecode(
                                "direct path DATE value has fractional seconds",
                            ));
                        }
                        encode_oracle_date(*year, *month, *day, *hour, *minute, *second)?.to_vec()
                    }
                    // the protocol requires a timestamp with zero fractional
                    // seconds to be transmitted as a 7-byte date
                    // NOTE(review): not enforced in this block — presumably
                    // handled inside encode_oracle_timestamp; confirm.
                    ORA_TYPE_NUM_TIMESTAMP | ORA_TYPE_NUM_TIMESTAMP_LTZ => encode_oracle_timestamp(
                        *year,
                        *month,
                        *day,
                        *hour,
                        *minute,
                        *second,
                        *nanosecond,
                    )?,
                    ORA_TYPE_NUM_TIMESTAMP_TZ => encode_oracle_timestamp_tz(
                        *year,
                        *month,
                        *day,
                        *hour,
                        *minute,
                        *second,
                        *nanosecond,
                    )?,
                    _ => {
                        return Err(ProtocolError::TtcDecode(
                            "direct path datetime sent for non-datetime column",
                        ))
                    }
                };
                self.write_raw_bytes_and_length(&encoded)
            }
            DirectPathColumnValue::Boolean(value) => {
                if metadata.ora_type_num != ORA_TYPE_NUM_BOOLEAN {
                    return Err(ProtocolError::TtcDecode(
                        "direct path boolean sent for non-boolean column",
                    ));
                }
                // true is the two bytes 01 01, false the single byte 00
                let encoded: &[u8] = if *value { &[1, 1] } else { &[0] };
                self.write_raw_bytes_and_length(encoded)
            }
        }
    }

    /// Finalizes the stream and returns the pieces plus the total piece
    /// length (piece data plus piece headers) for the load stream message.
    ///
    /// Fails if a row is still in progress or the total does not fit in a
    /// `u32`.
    pub fn finish(self) -> Result<(Vec<DirectPathPiece>, u32)> {
        if self.current.is_some() {
            return Err(ProtocolError::TtcDecode(
                "direct path stream finished mid-row",
            ));
        }
        let total = u32::try_from(self.total_piece_length)
            .map_err(|_| ProtocolError::DirectPathLoadTooMuchData)?;
        Ok((self.pieces, total))
    }

    /// Counts one more segment in the open piece (saturating); does nothing
    /// when no piece is open.
    fn bump_segments(&mut self) {
        if let Some(state) = self.current.as_mut() {
            state.num_segments = state.num_segments.saturating_add(1);
        }
    }

    /// Number of data bytes still available in the open piece.
    fn space_left(&self) -> usize {
        TNS_DPLS_MAX_PIECE_SIZE.saturating_sub(self.data.len())
    }

    /// Writes a single byte. If the open piece is full, it is finalized first
    /// and a fresh (non-first) piece is started.
    fn write_u8_in_piece(&mut self, value: u8) -> Result<()> {
        if self.space_left() < 1 {
            self.finalize_piece()?;
            self.current = Some(PieceState::default());
        }
        self.data.push(value);
        Ok(())
    }

    /// Mirrors `PieceBuffer._write_raw_bytes_and_length`: short values
    /// (<= 0xfa bytes) are written as `u8 length + data`; longer values are
    /// written as one or more `0xfe + u16be length + data` chunks that may
    /// split across pieces with the SPLIT_WITH_PREV/NEXT flags.
    fn write_raw_bytes_and_length(&mut self, bytes: &[u8]) -> Result<()> {
        if bytes.len() <= TNS_DPLS_MAX_SHORT_LENGTH {
            // short values are never split: move to a fresh piece if needed
            if bytes.len() + 1 > self.space_left() {
                self.finalize_piece()?;
                self.current = Some(PieceState::default());
            }
            self.data.push(bytes.len() as u8);
            self.data.extend_from_slice(bytes);
            self.bump_segments();
            return Ok(());
        }

        let mut remaining = bytes;
        // Each iteration fills the open piece with one chunk (3-byte header +
        // data) and closes it; loop until the rest fits in the current piece.
        while remaining.len() + 3 > self.space_left() {
            // Fail-closed divergence from the reference: if fewer than four
            // bytes remain in the piece the reference would emit a corrupt
            // zero/negative-length chunk; start a fresh piece instead.
            if self.space_left() < 4 {
                self.finalize_piece()?;
                self.current = Some(PieceState::default());
                continue;
            }
            let chunk_len = self.space_left() - 3;
            let (chunk, rest) = remaining.split_at(chunk_len.min(remaining.len()));
            self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
            // chunk_len < TNS_DPLS_MAX_PIECE_SIZE, so the cast cannot truncate
            self.data
                .extend_from_slice(&(chunk.len() as u16).to_be_bytes());
            self.data.extend_from_slice(chunk);
            remaining = rest;
            if let Some(state) = self.current.as_mut() {
                state.is_split_with_next = true;
            }
            self.bump_segments();
            self.finalize_piece()?;
            self.current = Some(PieceState {
                is_split_with_prev: !remaining.is_empty(),
                ..PieceState::default()
            });
        }
        // the tail now fits in the open piece (loop condition)
        if !remaining.is_empty() {
            self.bump_segments();
            self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
            self.data
                .extend_from_slice(&(remaining.len() as u16).to_be_bytes());
            self.data.extend_from_slice(remaining);
        }
        Ok(())
    }

    /// Closes the open piece.
    ///
    /// Derives the piece's header flags from its state, moves the buffered
    /// data into a new [`DirectPathPiece`] and adds the piece's size (data
    /// plus header) to the running total. `current` is left as `None`.
    ///
    /// Fails in any of these cases:
    ///
    /// * no piece is open;
    /// * the segment count exceeds 255;
    /// * the total would exceed `TNS_DPLS_MAX_MESSAGE_SIZE`.
    fn finalize_piece(&mut self) -> Result<()> {
        let Some(state) = self.current.take() else {
            return Err(ProtocolError::TtcDecode(
                "direct path piece finalized without an active piece",
            ));
        };
        let mut flags = 0u8;
        // FIRST takes precedence over SPLIT_WITH_PREV, LAST over SPLIT_WITH_NEXT
        if state.is_first {
            flags |= TNS_DPLS_ROW_HEADER_FIRST;
        } else if state.is_split_with_prev {
            flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV;
        }
        if state.is_last {
            flags |= TNS_DPLS_ROW_HEADER_LAST;
        } else if state.is_split_with_next {
            flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT;
        }
        // fast format only for a whole row in one piece with only fast types
        let is_fast_row = state.is_first && state.is_last && state.is_fast;
        if is_fast_row {
            flags |= TNS_DPLS_ROW_HEADER_FAST_ROW | TNS_DPLS_ROW_HEADER_FAST_PIECE;
        }
        let num_segments = u8::try_from(state.num_segments)
            .map_err(|_| ProtocolError::TtcDecode("direct path piece segment count overflow"))?;
        let piece = DirectPathPiece {
            flags,
            num_segments,
            data: std::mem::take(&mut self.data),
        };
        let new_length = self.total_piece_length + piece.data.len() as u64 + piece.header_length();
        if new_length > TNS_DPLS_MAX_MESSAGE_SIZE {
            return Err(ProtocolError::DirectPathLoadTooMuchData);
        }
        self.total_piece_length = new_length;
        self.pieces.push(piece);
        // callers decide what the next piece (if any) looks like
        Ok(())
    }
}
725
726/// Fast direct path types per the reference `DbType._is_fast` flags. LONG and
727/// LONG RAW (and thus inlined CLOB/BLOB) are not fast.
728fn is_fast_dbtype(metadata: &ColumnMetadata) -> bool {
729    matches!(
730        metadata.ora_type_num,
731        ORA_TYPE_NUM_VARCHAR
732            | ORA_TYPE_NUM_NUMBER
733            | ORA_TYPE_NUM_BINARY_INTEGER
734            | ORA_TYPE_NUM_CHAR
735            | ORA_TYPE_NUM_DATE
736            | ORA_TYPE_NUM_RAW
737            | ORA_TYPE_NUM_BINARY_FLOAT
738            | ORA_TYPE_NUM_BINARY_DOUBLE
739            | ORA_TYPE_NUM_BOOLEAN
740            | ORA_TYPE_NUM_TIMESTAMP
741            | ORA_TYPE_NUM_TIMESTAMP_TZ
742            | ORA_TYPE_NUM_TIMESTAMP_LTZ
743    )
744}
745
/// Result of encoding one batch of rows into the piece stream format.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DirectPathStream {
    // Encoded pieces, in the order they are written to the load stream message.
    pub(crate) pieces: Vec<DirectPathPiece>,
    // Sum of piece data and piece header bytes; sent in the load stream header.
    pub(crate) total_piece_length: u32,
}
752
753/// Encodes a batch of rows into direct path pieces.
754///
755/// `first_row_num` is the 1-based number of the first row in this batch for
756/// error reporting; the reference keeps a running row counter across batches
757/// of a single `direct_path_load` call.
758pub fn encode_direct_path_rows(
759    column_metadata: &[ColumnMetadata],
760    rows: &[Vec<DirectPathColumnValue>],
761    first_row_num: u64,
762) -> Result<DirectPathStream> {
763    let mut buffer = DirectPathPieceBuffer::new();
764    for (row_index, row) in rows.iter().enumerate() {
765        if row.len() != column_metadata.len() {
766            return Err(ProtocolError::TtcDecode(
767                "direct path row width does not match column metadata",
768            ));
769        }
770        let row_num = first_row_num + row_index as u64;
771        buffer.start_row()?;
772        for (metadata, value) in column_metadata.iter().zip(row) {
773            buffer.add_column_value(metadata, value, row_num)?;
774        }
775        buffer.finish_row()?;
776    }
777    let (pieces, total_piece_length) = buffer.finish()?;
778    Ok(DirectPathStream {
779        pieces,
780        total_piece_length,
781    })
782}
783
784/// Builds the payload for TTC function 129 (direct path load stream).
785///
786/// Mirrors `DirectPathLoadStreamMessage._write_message`.
787pub fn build_direct_path_load_stream_payload(
788    cursor_id: u16,
789    stream: &DirectPathStream,
790    seq_num: u8,
791) -> Result<Vec<u8>> {
792    let mut writer = TtcWriter::new();
793    writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_LOAD_STREAM, seq_num);
794    writer.write_ub8(0); // token number
795    writer.write_ub2(cursor_id);
796    writer.write_u8(1); // pointer (buffer)
797    writer.write_ub4(stream.total_piece_length);
798    writer.write_ub4(TNS_DP_STREAM_VERSION);
799    writer.write_u8(0); // pointer (input values)
800    writer.write_ub4(0); // number of input values
801    writer.write_u8(1); // pointer (output values)
802    writer.write_u8(1); // pointer (output values length)
803    for piece in &stream.pieces {
804        piece.write_to(&mut writer)?;
805    }
806    Ok(writer.into_bytes())
807}
808
809/// Batch/chunk state machine shared by `executemany` ingestion and direct
810/// path load. Port of `BatchLoadManager`/`DataFrameBatchLoadManager`
811/// (impl/base/batch_load_manager.pyx).
812///
813/// The data source is modelled as a list of chunks (an Arrow chunked array
814/// has one entry per chunk; a plain list of rows is a single chunk). Batches
815/// never span chunk boundaries; `message_offset` is the row offset *within
816/// the current chunk* that must accompany the execute/load message.
817#[derive(Clone, Debug, Eq, PartialEq)]
818pub struct BatchLoadState {
819    chunk_lengths: Vec<u64>,
820    batch_size: u32,
821    chunk_index: usize,
822    offset: u64,
823    message_offset: u64,
824    num_rows: u32,
825}
826
827impl BatchLoadState {
828    pub fn new(chunk_lengths: Vec<u64>, batch_size: u32) -> Result<Self> {
829        if batch_size == 0 {
830            return Err(ProtocolError::TtcDecode(
831                "batch_size must be a positive integer",
832            ));
833        }
834        let mut state = Self {
835            chunk_lengths,
836            batch_size,
837            chunk_index: 0,
838            offset: 0,
839            message_offset: 0,
840            num_rows: 0,
841        };
842        state.advance_batch();
843        Ok(state)
844    }
845
846    /// Creates the state machine for a single-chunk source of `total_rows`
847    /// rows (a plain list of rows).
848    pub fn for_rows(total_rows: u64, batch_size: u32) -> Result<Self> {
849        Self::new(vec![total_rows], batch_size)
850    }
851
852    /// Number of rows in the current batch; zero means the load is complete.
853    pub fn num_rows(&self) -> u32 {
854        self.num_rows
855    }
856
857    /// Row offset of the current batch within the current chunk.
858    pub fn offset(&self) -> u64 {
859        self.offset
860    }
861
862    /// Offset to send with the execute/load message (row offset within the
863    /// current chunk at the time the batch was formed).
864    pub fn message_offset(&self) -> u64 {
865        self.message_offset
866    }
867
868    /// Index of the chunk the current batch draws from.
869    pub fn chunk_index(&self) -> usize {
870        self.chunk_index
871    }
872
873    pub fn is_done(&self) -> bool {
874        self.num_rows == 0
875    }
876
877    /// Advances to the next batch (mirrors `BatchLoadManager.next_batch`).
878    pub fn next_batch(&mut self) {
879        self.offset += u64::from(self.num_rows);
880        self.advance_batch();
881    }
882
883    fn rows_in_current_chunk(&self) -> u64 {
884        self.chunk_lengths
885            .get(self.chunk_index)
886            .copied()
887            .unwrap_or(0)
888    }
889
890    fn calculate_num_rows_in_batch(&mut self) {
891        let remaining = self.rows_in_current_chunk().saturating_sub(self.offset);
892        self.num_rows = u32::try_from(remaining.min(u64::from(self.batch_size))).unwrap_or(0);
893    }
894
895    fn advance_batch(&mut self) {
896        self.message_offset = self.offset;
897        self.calculate_num_rows_in_batch();
898        if self.num_rows == 0 {
899            self.advance_chunk();
900        }
901    }
902
903    fn advance_chunk(&mut self) {
904        while self.chunk_index + 1 < self.chunk_lengths.len() {
905            self.offset = 0;
906            self.message_offset = 0;
907            self.chunk_index += 1;
908            self.calculate_num_rows_in_batch();
909            if self.num_rows > 0 {
910                break;
911            }
912        }
913    }
914}
915
#[cfg(test)]
mod tests {
    // Unit tests for the direct path builders, the row/piece encoder and the
    // batch state machine. Byte layouts are compared against the expected
    // reference encodings documented inline.
    use super::*;

    // BoundedReader invariant (l2p), direct-path columns family: a PARAMETER
    // message declaring a huge num_columns (ub4 ~620M) with no column-metadata
    // bytes following must fail closed via with_capacity_bounded + the per-
    // column parse, not reserve one ColumnMetadata per declared column. (This
    // replaces the old arbitrary `.min(1024)` cap with a buffer-anchored bound.)
    #[test]
    fn direct_path_oversized_column_count_fails_closed_not_oom() {
        // type=8 PARAMETER; num_columns ub4 (len byte 4) = 0x25000000, then EOF.
        let payload = [TNS_MSG_TYPE_PARAMETER, 4, 0x25, 0x00, 0x00, 0x00];
        let err = parse_direct_path_prepare_response(&payload, ClientCapabilities::default())
            .expect_err("oversized direct-path column count must fail closed");
        assert!(
            matches!(
                err,
                ProtocolError::TtcDecode(_) | ProtocolError::ResourceLimit { .. }
            ),
            "got {err:?}"
        );
    }

    #[test]
    fn direct_path_prepare_respects_protocol_column_limit() {
        let payload = [TNS_MSG_TYPE_PARAMETER, 1, 2];
        let limits = ProtocolLimits {
            max_columns: 1,
            ..ProtocolLimits::DEFAULT
        };
        let err = parse_direct_path_prepare_response_with_limits(
            &payload,
            ClientCapabilities::default(),
            limits,
        )
        .expect_err("column count above policy must fail");
        assert!(
            matches!(
                err,
                ProtocolError::ResourceLimit {
                    limit: "columns",
                    observed: 2,
                    maximum: 1,
                }
            ),
            "got {err:?}"
        );
    }

    /// Builds minimal column metadata for encoder tests; character types get
    /// the implicit character set form, everything else form 0.
    fn column(name: &str, ora_type_num: u8, max_size: u32, nulls_allowed: bool) -> ColumnMetadata {
        ColumnMetadata {
            name: name.to_string(),
            ora_type_num,
            csfrm: if matches!(
                ora_type_num,
                ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG
            ) {
                CS_FORM_IMPLICIT
            } else {
                0
            },
            precision: 0,
            scale: 0,
            buffer_size: max_size,
            max_size,
            nulls_allowed,
            is_json: false,
            is_oson: false,
            object_schema: None,
            object_type_name: None,
            is_array: false,
            vector_dimensions: None,
            vector_format: 0,
            vector_flags: 0,
            ..Default::default()
        }
    }

    #[test]
    fn prepare_payload_matches_reference_layout() {
        let payload = build_direct_path_prepare_payload(
            "pythontest",
            "dpl_golden",
            &["id".to_string(), "name".to_string()],
            10,
        )
        .expect("payload should build");
        // header: msg type, function code, seq, token
        assert_eq!(&payload[..4], &[3, 128, 10, 0]);
        let mut expected = vec![
            1, 1, // ub4 op code LOAD
            1, // kw pointer
            1, 4, // ub4 kw length = 2 columns + 2
            1, // input array pointer
            1, 15, // ub2 in values length
            1, 1, 1, 1, 1, 1, // six pointers
        ];
        // schema name
        expected.extend_from_slice(&[0, 1, 10]);
        expected.extend_from_slice(&[10]);
        expected.extend_from_slice(b"pythontest");
        expected.extend_from_slice(&[1, 3]);
        // table name
        expected.extend_from_slice(&[0, 1, 10]);
        expected.extend_from_slice(&[10]);
        expected.extend_from_slice(b"dpl_golden");
        expected.extend_from_slice(&[1, 1]);
        // column names
        expected.extend_from_slice(&[0, 1, 2, 2]);
        expected.extend_from_slice(b"id");
        expected.extend_from_slice(&[1, 4]);
        expected.extend_from_slice(&[0, 1, 4, 4]);
        expected.extend_from_slice(b"name");
        expected.extend_from_slice(&[1, 4]);
        // in values: 400, 400, 9 zeros, 0xffff, 0, 0, 1
        expected.extend_from_slice(&[2, 0x01, 0x90, 2, 0x01, 0x90]);
        expected.extend_from_slice(&[0; 9]);
        expected.extend_from_slice(&[2, 0xff, 0xff, 0, 0, 1, 1]);
        assert_eq!(&payload[4..], expected.as_slice());
    }

    #[test]
    fn op_payload_matches_reference_layout() {
        let payload = build_direct_path_op_payload(1, TNS_DP_OP_FINISH, 12);
        assert_eq!(
            payload,
            vec![3, 130, 12, 0, 1, 2, 1, 1, 0, 0, 1, 1],
            "msg type, fn code, seq, token, ub4 op, ub2 cursor, ptr 0, ub4 0, ptr 1, ptr 1"
        );
    }

    #[test]
    fn single_fast_row_produces_one_fast_piece() {
        let columns = vec![
            column("ID", ORA_TYPE_NUM_NUMBER, 0, false),
            column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false),
        ];
        let rows = vec![vec![
            DirectPathColumnValue::Number("1".into()),
            DirectPathColumnValue::Bytes(b"alpha".to_vec()),
        ]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        let piece = &stream.pieces[0];
        assert_eq!(
            piece.flags(),
            TNS_DPLS_ROW_HEADER_FIRST
                | TNS_DPLS_ROW_HEADER_LAST
                | TNS_DPLS_ROW_HEADER_FAST_ROW
                | TNS_DPLS_ROW_HEADER_FAST_PIECE
        );
        assert_eq!(piece.num_segments(), 2);
        // number 1 encodes as c1 02; "alpha" as length + bytes
        assert_eq!(
            piece.data(),
            &[2, 0xc1, 0x02, 5, b'a', b'l', b'p', b'h', b'a']
        );
        // total = data + 4-byte fast header
        assert_eq!(stream.total_piece_length, piece.data().len() as u32 + 4);
    }

    #[test]
    fn long_column_clears_fast_flag() {
        let columns = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
        let rows = vec![vec![DirectPathColumnValue::Bytes(vec![b'x'; 10])]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        assert_eq!(
            stream.pieces[0].flags(),
            TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_LAST
        );
        // 1 length byte + 10 data bytes + 2-byte slow header
        assert_eq!(stream.total_piece_length, 11 + 2);
    }

    #[test]
    fn null_values_encode_as_null_indicator() {
        let columns = vec![column("SALARY", ORA_TYPE_NUM_NUMBER, 0, true)];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces[0].data(), &[0xff]);
        assert_eq!(stream.pieces[0].num_segments(), 1);
    }

    #[test]
    fn null_into_not_null_column_raises_dpy_8001() {
        let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false)];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        let err = encode_direct_path_rows(&columns, &rows, 1).expect_err("nulls must be rejected");
        assert!(
            err.to_string().starts_with("DPY-8001:"),
            "unexpected error: {err}"
        );
        assert!(err.to_string().contains("\"NAME\""), "{err}");
        assert!(err.to_string().contains("row 1"), "{err}");
    }

    #[test]
    fn error_row_number_counts_up_from_first_row_num() {
        // The second row of a batch starting at row 5 must be reported as row 6.
        let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false)];
        let rows = vec![
            vec![DirectPathColumnValue::Bytes(b"ok".to_vec())],
            vec![DirectPathColumnValue::Null],
        ];
        let err = encode_direct_path_rows(&columns, &rows, 5).expect_err("nulls must be rejected");
        assert!(
            err.to_string().starts_with("DPY-8001:"),
            "unexpected error: {err}"
        );
        assert!(err.to_string().contains("row 6"), "{err}");
    }

    #[test]
    fn oversized_value_raises_dpy_8000() {
        let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 4, false)];
        let rows = vec![vec![DirectPathColumnValue::Bytes(b"toolong".to_vec())]];
        let err = encode_direct_path_rows(&columns, &rows, 3).expect_err("size must be enforced");
        assert!(
            err.to_string().starts_with("DPY-8000:"),
            "unexpected error: {err}"
        );
        assert!(err.to_string().contains("row 3"), "{err}");
    }

    #[test]
    fn long_values_use_fe_chunked_segments() {
        // 600 bytes > 0xfa, must use the 0xfe + u16be length form
        let columns = vec![column("WIDE", ORA_TYPE_NUM_VARCHAR, 1000, false)];
        let value = vec![b'q'; 600];
        let rows = vec![vec![DirectPathColumnValue::Bytes(value.clone())]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        let piece = &stream.pieces[0];
        assert_eq!(piece.num_segments(), 1);
        let mut expected = vec![0xfe, 0x02, 0x58];
        expected.extend_from_slice(&value);
        assert_eq!(piece.data(), expected.as_slice());
    }

    #[test]
    fn values_larger_than_piece_split_across_pieces_with_split_flags() {
        let columns = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
        let total = TNS_DPLS_MAX_PIECE_SIZE + 100;
        let rows = vec![vec![DirectPathColumnValue::Bytes(vec![b'z'; total])]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 2);
        let first = &stream.pieces[0];
        let second = &stream.pieces[1];
        assert_eq!(
            first.flags(),
            TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT
        );
        assert_eq!(
            second.flags(),
            TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV | TNS_DPLS_ROW_HEADER_LAST
        );
        // first piece is filled to the brim: 3-byte chunk header + payload
        assert_eq!(first.data().len(), TNS_DPLS_MAX_PIECE_SIZE);
        assert_eq!(first.data()[0], 0xfe);
        let first_chunk = usize::from(u16::from_be_bytes([first.data()[1], first.data()[2]]));
        assert_eq!(first_chunk, TNS_DPLS_MAX_PIECE_SIZE - 3);
        let second_chunk = usize::from(u16::from_be_bytes([second.data()[1], second.data()[2]]));
        assert_eq!(first_chunk + second_chunk, total);
        assert_eq!(
            stream.total_piece_length as usize,
            first.data().len() + second.data().len() + 2 + 2
        );
    }

    #[test]
    fn segment_count_caps_at_255_per_piece() {
        let columns: Vec<ColumnMetadata> = (0..300)
            .map(|i| column(&format!("C{i}"), ORA_TYPE_NUM_NUMBER, 0, true))
            .collect();
        let row: Vec<DirectPathColumnValue> =
            (0..300).map(|_| DirectPathColumnValue::Null).collect();
        let stream = encode_direct_path_rows(&columns, &[row], 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 2);
        assert_eq!(stream.pieces[0].num_segments(), 255);
        assert_eq!(stream.pieces[1].num_segments(), 45);
        // continuation piece created by the segment cap carries neither FIRST
        // nor SPLIT_WITH_PREV (mirrors the reference)
        assert_eq!(stream.pieces[0].flags(), TNS_DPLS_ROW_HEADER_FIRST);
        assert_eq!(stream.pieces[1].flags(), TNS_DPLS_ROW_HEADER_LAST);
    }

    #[test]
    fn timestamp_with_zero_fraction_collapses_to_seven_bytes() {
        let columns = vec![column("TS", ORA_TYPE_NUM_TIMESTAMP, 0, true)];
        let rows = vec![vec![DirectPathColumnValue::DateTime {
            year: 2024,
            month: 1,
            day: 2,
            hour: 3,
            minute: 4,
            second: 5,
            nanosecond: 0,
        }]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(
            stream.pieces[0].data(),
            &[7, 120, 124, 1, 2, 4, 5, 6],
            "7-byte date form expected when fractional seconds are zero"
        );
    }

    #[test]
    fn boolean_values_encode_per_reference() {
        let columns = vec![column("FLAG", ORA_TYPE_NUM_BOOLEAN, 0, true)];
        let rows = vec![
            vec![DirectPathColumnValue::Boolean(true)],
            vec![DirectPathColumnValue::Boolean(false)],
        ];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces[0].data(), &[2, 1, 1]);
        assert_eq!(stream.pieces[1].data(), &[1, 0]);
    }

    #[test]
    fn row_width_mismatch_is_rejected() {
        let columns = vec![
            column("A", ORA_TYPE_NUM_NUMBER, 0, true),
            column("B", ORA_TYPE_NUM_NUMBER, 0, true),
        ];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        assert!(encode_direct_path_rows(&columns, &rows, 1).is_err());
    }

    #[test]
    fn metadata_overrides_inline_lobs() {
        let mut clob = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        clob.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut clob, 873);
        assert_eq!(clob.ora_type_num, ORA_TYPE_NUM_LONG);
        assert_eq!(clob.csfrm, CS_FORM_NCHAR, "multi-byte charset uses NCHAR");

        let mut clob = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        clob.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut clob, 178);
        assert_eq!(
            clob.csfrm, CS_FORM_IMPLICIT,
            "single-byte charset keeps form"
        );

        let mut blob = column("BIN", ORA_TYPE_NUM_BLOB, 0, true);
        apply_direct_path_metadata_overrides(&mut blob, 873);
        assert_eq!(blob.ora_type_num, ORA_TYPE_NUM_LONG_RAW);
        assert_eq!(blob.csfrm, 0);
    }

    #[test]
    fn batch_state_single_chunk_splits_by_batch_size() {
        let mut state = BatchLoadState::for_rows(5, 2).expect("state should build");
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (2, 0, 0)
        );
        state.next_batch();
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (2, 2, 2)
        );
        state.next_batch();
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (1, 4, 4)
        );
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_never_spans_chunks() {
        // chunks of 3 and 2 rows with batch size 2: batches are 2, 1, 2
        let mut state = BatchLoadState::new(vec![3, 2], 2).expect("state should build");
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (0, 2, 0)
        );
        state.next_batch();
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (0, 1, 2)
        );
        state.next_batch();
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (1, 2, 0)
        );
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_skips_empty_chunks() {
        let mut state = BatchLoadState::new(vec![0, 0, 3], 10).expect("state should build");
        assert_eq!((state.chunk_index(), state.num_rows()), (2, 3));
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_rejects_zero_batch_size() {
        assert!(BatchLoadState::for_rows(5, 0).is_err());
    }

    #[test]
    fn batch_state_empty_source_is_done_immediately() {
        let state = BatchLoadState::for_rows(0, 10).expect("state should build");
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_without_chunks_is_done_immediately() {
        let state = BatchLoadState::new(Vec::new(), 10).expect("state should build");
        assert!(state.is_done());
        assert_eq!(state.chunk_index(), 0);
    }

    #[test]
    fn load_stream_payload_header_matches_reference_layout() {
        let columns = vec![column("ID", ORA_TYPE_NUM_NUMBER, 0, false)];
        let rows = vec![vec![DirectPathColumnValue::Number("1".into())]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        let payload =
            build_direct_path_load_stream_payload(1, &stream, 11).expect("payload should build");
        let expected = vec![
            3, 129, 11, // msg type, fn code, seq
            0,  // token
            1, 1, // ub2 cursor id
            1, // buffer pointer
            1, 7, // ub4 total piece length (3 data + 4 header)
            2, 0x01, 0x90, // ub4 stream version 400
            0,    // input values pointer
            0,    // ub4 input values count
            1, 1, // output pointers
            0x3c, 0, 7, 1, // piece: flags, u16be total, num segments
            2, 0xc1, 0x02, // number 1
        ];
        assert_eq!(payload, expected);
    }
}
1344            2, 0xc1, 0x02, // number 1
1345        ];
1346        assert_eq!(payload, std::mem::take(&mut expected));
1347    }
1348}