Skip to main content

oracledb_protocol/
dpl.rs

1//! Direct Path Load (DPL) protocol support.
2//!
3//! Implements the three TTC functions used by python-oracledb's
4//! `Connection.direct_path_load()`:
5//!
6//! * function 128 — direct path prepare (send table/column names, receive
7//!   server-side column metadata and a direct path cursor id),
8//! * function 129 — direct path load stream (column-array piece stream),
9//! * function 130 — direct path op (FINISH commits, ABORT discards).
10//!
11//! The builders/parsers mirror `impl/thin/messages/direct_path_*.pyx` of the
12//! python-oracledb v4.0.1 reference and are validated against golden wire
13//! captures in `tests/golden/`. The batch state machine mirrors
14//! `impl/base/batch_load_manager.pyx`.
15
16use crate::thin::{
17    encode_binary_double, encode_binary_float, encode_number_text, encode_oracle_date,
18    encode_oracle_timestamp, encode_oracle_timestamp_tz, parse_column_metadata,
19    parse_server_error_info, skip_server_side_piggyback, ClientCapabilities, ColumnMetadata,
20    CS_FORM_IMPLICIT, CS_FORM_NCHAR, ORA_TYPE_NUM_BINARY_DOUBLE, ORA_TYPE_NUM_BINARY_FLOAT,
21    ORA_TYPE_NUM_BINARY_INTEGER, ORA_TYPE_NUM_BLOB, ORA_TYPE_NUM_BOOLEAN, ORA_TYPE_NUM_CHAR,
22    ORA_TYPE_NUM_CLOB, ORA_TYPE_NUM_DATE, ORA_TYPE_NUM_LONG, ORA_TYPE_NUM_LONG_RAW,
23    ORA_TYPE_NUM_NUMBER, ORA_TYPE_NUM_RAW, ORA_TYPE_NUM_TIMESTAMP, ORA_TYPE_NUM_TIMESTAMP_LTZ,
24    ORA_TYPE_NUM_TIMESTAMP_TZ, ORA_TYPE_NUM_VARCHAR, TNS_MSG_TYPE_END_OF_RESPONSE,
25    TNS_MSG_TYPE_ERROR, TNS_MSG_TYPE_PARAMETER, TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK,
26    TNS_MSG_TYPE_STATUS,
27};
28use crate::wire::{BoundedReader, TtcReader, TtcWriter};
29use crate::{ProtocolError, Result};
30
/// TTC function code for direct path prepare.
pub const TNS_FUNC_DIRECT_PATH_PREPARE: u8 = 128;
/// TTC function code for the direct path load stream.
pub const TNS_FUNC_DIRECT_PATH_LOAD_STREAM: u8 = 129;
/// TTC function code for the direct path op (finish/abort).
pub const TNS_FUNC_DIRECT_PATH_OP: u8 = 130;

/// Interface version placed in the prepare request's input value array.
pub const TNS_DP_INTERFACE_VERSION: u32 = 400;
/// Stream version placed in the prepare request's input value array and in
/// every load stream request.
pub const TNS_DP_STREAM_VERSION: u32 = 400;

/// Operation code written by the prepare request.
pub const TNS_DPP_OP_CODE_LOAD: u32 = 1;

/// Direct path op code that aborts (discards) the load.
pub const TNS_DP_OP_ABORT: u32 = 1;
/// Direct path op code that finishes (commits) the load.
pub const TNS_DP_OP_FINISH: u32 = 2;

// Positions within the prepare request's input value array.
const TNS_DPP_IN_INDEX_INTERFACE_VERSION: usize = 0;
const TNS_DPP_IN_INDEX_STREAM_VERSION: usize = 1;
const TNS_DPP_IN_INDEX_LOCK_WAIT: usize = 14;
// Keyword indexes written after each keyword parameter of the prepare
// request (table name, schema name, one per column name).
const TNS_DPP_KW_INDEX_OBJECT_NAME: u16 = 1;
const TNS_DPP_KW_INDEX_SCHEMA_NAME: u16 = 3;
const TNS_DPP_KW_INDEX_COLUMN_NAME: u16 = 4;
// NOTE(review): named like a keyword index, but the prepare builder uses it
// as a position in the input value array (seeded with 0xffff) — confirm
// against the reference.
const TNS_DPP_KW_INDEX_NFOBJ_OID_POS: usize = 11;
// Position of the direct path cursor id within the prepare response's
// output values.
const TNS_DPP_OUT_INDEX_CURSOR: usize = 3;
// The reference sizes the input array at TNS_DPP_IN_MAX_PARAMS (36) but only
// transmits the first 15 entries: `_initialize_hook` seeds indices 16/17 with
// 0xffff *without* updating `in_values_length`, so they are never sent.
const TNS_DPP_IN_VALUES_SENT: usize = TNS_DPP_IN_INDEX_LOCK_WAIT + 1;

/// Row header flag set together with [`TNS_DPLS_ROW_HEADER_FAST_ROW`] on a
/// piece that holds a complete row made up of fast column types only.
pub const TNS_DPLS_ROW_HEADER_FAST_PIECE: u8 = 0x10;
/// Row header flag marking a fast row; pieces carrying it are written with an
/// additional 16-bit big-endian length in their header.
pub const TNS_DPLS_ROW_HEADER_FAST_ROW: u8 = 0x20;
/// Row header flag set on the first piece of a row.
pub const TNS_DPLS_ROW_HEADER_FIRST: u8 = 0x08;
/// Row header flag set on the last piece of a row.
pub const TNS_DPLS_ROW_HEADER_LAST: u8 = 0x04;
/// Row header flag set on a piece that starts with the continuation of a
/// value begun in the previous piece.
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV: u8 = 0x02;
/// Row header flag set on a piece whose final value continues in the next
/// piece.
pub const TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT: u8 = 0x01;

/// Upper bound on the accumulated piece length (data plus piece headers) of a
/// single piece stream.
pub const TNS_DPLS_MAX_MESSAGE_SIZE: u64 = 1_073_728_895;
/// Longest value that is written with a single length byte.
pub const TNS_DPLS_MAX_SHORT_LENGTH: usize = 0xfa;
/// Maximum number of data bytes held by one piece.
pub const TNS_DPLS_MAX_PIECE_SIZE: usize = 0xfff0;

// Marker byte that introduces a long-form chunk (followed by a u16 BE length).
const TNS_DPLS_LONG_LENGTH_INDICATOR: u8 = 0xfe;
// Length byte written in place of a value for NULL columns.
const TNS_NULL_LENGTH_INDICATOR: u8 = 0xff;
69
70/// Builds the payload for TTC function 128 (direct path prepare).
71///
72/// Mirrors `DirectPathPrepareMessage._write_message`.
73pub fn build_direct_path_prepare_payload(
74    schema_name: &str,
75    table_name: &str,
76    column_names: &[String],
77    seq_num: u8,
78) -> Result<Vec<u8>> {
79    let keyword_parameters_length =
80        u32::try_from(column_names.len() + 2).map_err(|_| ProtocolError::InvalidPacketLength {
81            length: column_names.len(),
82            minimum: 0,
83        })?;
84
85    let mut in_values = [0u32; TNS_DPP_IN_VALUES_SENT];
86    in_values[TNS_DPP_IN_INDEX_INTERFACE_VERSION] = TNS_DP_INTERFACE_VERSION;
87    in_values[TNS_DPP_IN_INDEX_STREAM_VERSION] = TNS_DP_STREAM_VERSION;
88    in_values[TNS_DPP_KW_INDEX_NFOBJ_OID_POS] = 0xffff;
89    in_values[TNS_DPP_IN_INDEX_LOCK_WAIT] = 1;
90
91    let mut writer = TtcWriter::new();
92    writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_PREPARE, seq_num);
93    writer.write_ub8(0); // token number
94    writer.write_ub4(TNS_DPP_OP_CODE_LOAD);
95    writer.write_u8(1); // keyword parameters (pointer)
96    writer.write_ub4(keyword_parameters_length);
97    writer.write_u8(1); // input array (pointer)
98    writer.write_ub2(TNS_DPP_IN_VALUES_SENT as u16);
99    writer.write_u8(1); // metadata (pointer)
100    writer.write_u8(1); // metadata length (pointer)
101    writer.write_u8(1); // parameters (pointer)
102    writer.write_u8(1); // parameters length (pointer)
103    writer.write_u8(1); // output array (pointer)
104    writer.write_u8(1); // output array length (pointer)
105    write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_SCHEMA_NAME, schema_name)?;
106    write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_OBJECT_NAME, table_name)?;
107    for name in column_names {
108        write_keyword_param(&mut writer, TNS_DPP_KW_INDEX_COLUMN_NAME, name)?;
109    }
110    for value in in_values {
111        writer.write_ub4(value);
112    }
113    Ok(writer.into_bytes())
114}
115
116fn write_keyword_param(writer: &mut TtcWriter, index: u16, value: &str) -> Result<()> {
117    let bytes = value.as_bytes();
118    let len = u16::try_from(bytes.len()).map_err(|_| ProtocolError::InvalidPacketLength {
119        length: bytes.len(),
120        minimum: 0,
121    })?;
122    writer.write_ub2(0); // text length
123    writer.write_ub2(len);
124    writer.write_bytes_with_length(bytes)?;
125    writer.write_ub2(index);
126    Ok(())
127}
128
/// Decoded contents of a direct path prepare (TTC function 128) response.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DirectPathPrepareResult {
    /// Column metadata records in wire order, with the direct path CLOB/BLOB
    /// type overrides already applied.
    pub column_metadata: Vec<ColumnMetadata>,
    /// Direct path cursor id, taken from `out_values` at the cursor index.
    pub cursor_id: u16,
    /// All output values returned by the server, in wire order.
    pub out_values: Vec<u32>,
}
135
136/// Parses the response to TTC function 128 (direct path prepare).
137///
138/// `capabilities.charset_id` drives the CLOB metadata override (charset ids
139/// of 800 and above are multi-byte, in which case implicit-charset CLOBs
140/// switch to the NCHAR form). Mirrors the reference's
141/// `DirectPathPrepareMessage._process_metadata`/`_process_return_parameters`.
142pub fn parse_direct_path_prepare_response(
143    payload: &[u8],
144    capabilities: ClientCapabilities,
145) -> Result<DirectPathPrepareResult> {
146    let mut reader = TtcReader::new(payload);
147    let mut result: Option<DirectPathPrepareResult> = None;
148    while reader.remaining() > 0 {
149        let message_type = reader.read_u8()?;
150        match message_type {
151            0 => {}
152            TNS_MSG_TYPE_PARAMETER => {
153                result = Some(parse_prepare_return_parameters(&mut reader, capabilities)?);
154            }
155            TNS_MSG_TYPE_STATUS => {
156                let _call_status = reader.read_ub4()?;
157                let _seq = reader.read_ub2()?;
158            }
159            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
160                let _ = skip_server_side_piggyback(&mut reader)?;
161            }
162            TNS_MSG_TYPE_END_OF_RESPONSE => break,
163            TNS_MSG_TYPE_ERROR => {
164                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
165                if info.number != 0 {
166                    return Err(ProtocolError::ServerError(info.message));
167                }
168            }
169            _ => {
170                return Err(ProtocolError::UnknownMessageType {
171                    message_type,
172                    position: reader.position().saturating_sub(1),
173                })
174            }
175        }
176    }
177    result.ok_or(ProtocolError::TtcDecode(
178        "direct path prepare response did not contain return parameters",
179    ))
180}
181
182fn parse_prepare_return_parameters(
183    reader: &mut TtcReader<'_>,
184    capabilities: ClientCapabilities,
185) -> Result<DirectPathPrepareResult> {
186    let num_columns = reader.read_ub4()?;
187    // Each column reads a multi-field metadata record (>=1 byte), so bound the
188    // reservation by the buffer (BoundedReader) instead of an arbitrary cap;
189    // parse_column_metadata still fails closed on truncation.
190    let mut column_metadata: Vec<ColumnMetadata> =
191        reader.with_capacity_bounded(num_columns as usize, 1);
192    for _ in 0..num_columns {
193        let mut metadata = parse_column_metadata(reader, capabilities)?;
194        apply_direct_path_metadata_overrides(&mut metadata, capabilities.charset_id);
195        column_metadata.push(metadata);
196    }
197    let num_params = reader.read_ub2()?;
198    if num_params != 0 {
199        return Err(ProtocolError::TtcDecode(
200            "unexpected parameters in direct path prepare response",
201        ));
202    }
203    let out_values_length = reader.read_ub2()?;
204    // Each out value is a ub4 (>=1 byte on the wire); bound by the buffer.
205    let mut out_values: Vec<u32> = reader.with_capacity_bounded(usize::from(out_values_length), 1);
206    for _ in 0..out_values_length {
207        out_values.push(reader.read_ub4()?);
208    }
209    let cursor_id =
210        out_values
211            .get(TNS_DPP_OUT_INDEX_CURSOR)
212            .copied()
213            .ok_or(ProtocolError::TtcDecode(
214                "direct path prepare response missing cursor id",
215            ))?;
216    let cursor_id = u16::try_from(cursor_id)
217        .map_err(|_| ProtocolError::TtcDecode("direct path cursor id out of range"))?;
218    Ok(DirectPathPrepareResult {
219        column_metadata,
220        cursor_id,
221        out_values,
222    })
223}
224
225/// CLOB/NCLOB and BLOB columns are always streamed as LONG/LONG RAW during a
226/// direct path load. Implicit-charset CLOBs switch to the NCHAR form when the
227/// database charset is multi-byte (charset ids >= 800).
228fn apply_direct_path_metadata_overrides(metadata: &mut ColumnMetadata, charset_id: u16) {
229    if metadata.ora_type_num == ORA_TYPE_NUM_CLOB {
230        if metadata.csfrm == CS_FORM_IMPLICIT && charset_id >= 800 {
231            metadata.csfrm = CS_FORM_NCHAR;
232        }
233        metadata.ora_type_num = ORA_TYPE_NUM_LONG;
234    } else if metadata.ora_type_num == ORA_TYPE_NUM_BLOB {
235        metadata.ora_type_num = ORA_TYPE_NUM_LONG_RAW;
236        metadata.csfrm = 0;
237    }
238}
239
240/// Builds the payload for TTC function 130 (direct path op).
241///
242/// Mirrors `DirectPathOpMessage._write_message`. `op_code` is
243/// [`TNS_DP_OP_FINISH`] (commits the load) or [`TNS_DP_OP_ABORT`].
244pub fn build_direct_path_op_payload(cursor_id: u16, op_code: u32, seq_num: u8) -> Vec<u8> {
245    let mut writer = TtcWriter::new();
246    writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_OP, seq_num);
247    writer.write_ub8(0); // token number
248    writer.write_ub4(op_code);
249    writer.write_ub2(cursor_id);
250    writer.write_u8(0); // pointer (input values)
251    writer.write_ub4(0); // number of input values
252    writer.write_u8(1); // pointer (output values)
253    writer.write_u8(1); // pointer (output values length)
254    writer.into_bytes()
255}
256
257/// Parses the response to TTC functions 129 and 130 (both return the same
258/// shape: a ub2 count of out values that are each skipped).
259pub fn parse_direct_path_simple_response(
260    payload: &[u8],
261    capabilities: ClientCapabilities,
262) -> Result<()> {
263    let mut reader = TtcReader::new(payload);
264    while reader.remaining() > 0 {
265        let message_type = reader.read_u8()?;
266        match message_type {
267            0 => {}
268            TNS_MSG_TYPE_PARAMETER => {
269                let num_out_values = reader.read_ub2()?;
270                for _ in 0..num_out_values {
271                    let _value = reader.read_ub4()?;
272                }
273            }
274            TNS_MSG_TYPE_STATUS => {
275                let _call_status = reader.read_ub4()?;
276                let _seq = reader.read_ub2()?;
277            }
278            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
279                let _ = skip_server_side_piggyback(&mut reader)?;
280            }
281            TNS_MSG_TYPE_END_OF_RESPONSE => break,
282            TNS_MSG_TYPE_ERROR => {
283                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
284                if info.number != 0 {
285                    return Err(ProtocolError::ServerError(info.message));
286                }
287            }
288            _ => {
289                return Err(ProtocolError::UnknownMessageType {
290                    message_type,
291                    position: reader.position().saturating_sub(1),
292                })
293            }
294        }
295    }
296    Ok(())
297}
298
// Function-specific names for the shared parser: load stream (function 129)
// and direct path op (function 130) responses have the same shape.
pub use parse_direct_path_simple_response as parse_direct_path_load_stream_response;
pub use parse_direct_path_simple_response as parse_direct_path_op_response;
301
/// One column value of a direct path load row, already converted to the
/// Oracle-facing intermediate form (mirrors the reference's `OracleData`).
///
/// `Bytes` carries the on-the-wire byte payload for VARCHAR/CHAR/LONG (text
/// already encoded per the column's charset form) and RAW/LONG RAW columns.
///
/// Each variant is only accepted for matching column types; the piece buffer
/// rejects mismatches when the value is added.
#[derive(Clone, Debug, PartialEq)]
pub enum DirectPathColumnValue {
    /// SQL NULL; rejected for columns whose metadata disallows nulls.
    Null,
    /// Pre-encoded bytes for VARCHAR, CHAR, LONG, RAW and LONG RAW columns.
    Bytes(Vec<u8>),
    /// Number in text form for NUMBER and BINARY_INTEGER columns; converted
    /// with `encode_number_text` when the value is added.
    Number(String),
    /// Value for BINARY_DOUBLE columns.
    BinaryDouble(f64),
    /// Value for BINARY_FLOAT columns.
    BinaryFloat(f32),
    /// Calendar fields for DATE, TIMESTAMP, TIMESTAMP WITH LOCAL TIME ZONE
    /// and TIMESTAMP WITH TIME ZONE columns. DATE columns require
    /// `nanosecond` to be zero.
    DateTime {
        year: i32,
        month: u8,
        day: u8,
        hour: u8,
        minute: u8,
        second: u8,
        nanosecond: u32,
    },
    /// Value for BOOLEAN columns.
    Boolean(bool),
}
325
/// A finalized direct path piece, ready to be written to a load stream
/// message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DirectPathPiece {
    // Row header flag byte (combination of the TNS_DPLS_ROW_HEADER_* bits).
    flags: u8,
    // Number of value segments stored in `data`.
    num_segments: u8,
    // Encoded segments: length prefixes and value bytes, without the header.
    data: Vec<u8>,
}
334
335impl DirectPathPiece {
336    pub fn flags(&self) -> u8 {
337        self.flags
338    }
339
340    pub fn num_segments(&self) -> u8 {
341        self.num_segments
342    }
343
344    pub fn data(&self) -> &[u8] {
345        &self.data
346    }
347
348    fn is_fast_row(&self) -> bool {
349        self.flags & TNS_DPLS_ROW_HEADER_FAST_ROW != 0
350    }
351
352    fn header_length(&self) -> u64 {
353        if self.is_fast_row() {
354            4
355        } else {
356            2
357        }
358    }
359
360    fn write_to(&self, writer: &mut TtcWriter) -> Result<()> {
361        writer.write_u8(self.flags);
362        if self.is_fast_row() {
363            let total = self.data.len() as u64 + self.header_length();
364            let total = u16::try_from(total).map_err(|_| {
365                ProtocolError::TtcDecode("direct path fast piece exceeds 16-bit length")
366            })?;
367            writer.write_u16be(total);
368        }
369        writer.write_u8(self.num_segments);
370        writer.write_raw(&self.data);
371        Ok(())
372    }
373}
374
// Bookkeeping for the piece currently being filled; converted into the
// header flags and segment count when the piece is finalized.
#[derive(Clone, Copy, Debug, Default)]
struct PieceState {
    // Piece opens a row.
    is_first: bool,
    // Piece closes a row.
    is_last: bool,
    // Piece starts with the continuation of a value from the previous piece.
    is_split_with_prev: bool,
    // The final value of the piece continues in the next piece.
    is_split_with_next: bool,
    // Only fast column types have been added to the piece so far.
    is_fast: bool,
    // Segments written so far; a piece is rolled over before exceeding 255.
    num_segments: u16,
}
384
/// Streaming encoder for the direct path column-array piece format.
///
/// Port of the reference `PieceBuffer` (direct_path_load_stream.pyx). Usage:
/// `start_row()` / `add_column_value(..)` per column / `finish_row()`, then
/// [`DirectPathPieceBuffer::finish`].
#[derive(Debug, Default)]
pub struct DirectPathPieceBuffer {
    // Pieces finalized so far, in stream order.
    pieces: Vec<DirectPathPiece>,
    // Running total of piece data plus piece headers for `pieces`.
    total_piece_length: u64,
    // Segment bytes of the piece currently being filled.
    data: Vec<u8>,
    // State of the piece being filled; `None` when no row is open.
    current: Option<PieceState>,
}
397
398impl DirectPathPieceBuffer {
399    pub fn new() -> Self {
400        Self::default()
401    }
402
403    pub fn start_row(&mut self) -> Result<()> {
404        if self.current.is_some() {
405            return Err(ProtocolError::TtcDecode(
406                "direct path row started before previous row was finished",
407            ));
408        }
409        self.current = Some(PieceState {
410            is_first: true,
411            is_fast: true,
412            ..PieceState::default()
413        });
414        Ok(())
415    }
416
417    pub fn finish_row(&mut self) -> Result<()> {
418        let Some(state) = self.current.as_mut() else {
419            return Err(ProtocolError::TtcDecode(
420                "direct path row finished without being started",
421            ));
422        };
423        state.is_last = true;
424        self.finalize_piece()?;
425        self.current = None;
426        Ok(())
427    }
428
429    pub fn add_column_value(
430        &mut self,
431        metadata: &ColumnMetadata,
432        value: &DirectPathColumnValue,
433        row_num: u64,
434    ) -> Result<()> {
435        let Some(state) = self.current.as_mut() else {
436            return Err(ProtocolError::TtcDecode(
437                "direct path column value added outside of a row",
438            ));
439        };
440
441        // at most 255 segments per piece
442        if state.num_segments == 255 {
443            self.finalize_piece()?;
444            self.current = Some(PieceState::default());
445        }
446
447        if !is_fast_dbtype(metadata) {
448            if let Some(state) = self.current.as_mut() {
449                state.is_fast = false;
450            }
451        }
452
453        match value {
454            DirectPathColumnValue::Null => {
455                if !metadata.nulls_allowed {
456                    return Err(ProtocolError::NullsNotAllowed {
457                        column_name: metadata.name.clone(),
458                        row_num,
459                    });
460                }
461                self.write_u8_in_piece(TNS_NULL_LENGTH_INDICATOR)?;
462                self.bump_segments();
463                Ok(())
464            }
465            DirectPathColumnValue::Bytes(bytes) => {
466                if !matches!(
467                    metadata.ora_type_num,
468                    ORA_TYPE_NUM_VARCHAR
469                        | ORA_TYPE_NUM_CHAR
470                        | ORA_TYPE_NUM_LONG
471                        | ORA_TYPE_NUM_RAW
472                        | ORA_TYPE_NUM_LONG_RAW
473                ) {
474                    return Err(ProtocolError::TtcDecode(
475                        "direct path byte value sent for non-character column",
476                    ));
477                }
478                if metadata.max_size > 0 && bytes.len() as u64 > u64::from(metadata.max_size) {
479                    return Err(ProtocolError::ValueTooLarge {
480                        actual_size: bytes.len(),
481                        max_size: metadata.max_size,
482                        column_name: metadata.name.clone(),
483                        row_num,
484                    });
485                }
486                self.write_raw_bytes_and_length(bytes)
487            }
488            DirectPathColumnValue::Number(text) => {
489                if !matches!(
490                    metadata.ora_type_num,
491                    ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER
492                ) {
493                    return Err(ProtocolError::TtcDecode(
494                        "direct path number value sent for non-number column",
495                    ));
496                }
497                let encoded = encode_number_text(text)?;
498                self.write_raw_bytes_and_length(&encoded)
499            }
500            DirectPathColumnValue::BinaryDouble(value) => {
501                if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_DOUBLE {
502                    return Err(ProtocolError::TtcDecode(
503                        "direct path binary double sent for other column type",
504                    ));
505                }
506                let encoded = encode_binary_double(*value);
507                self.write_raw_bytes_and_length(&encoded)
508            }
509            DirectPathColumnValue::BinaryFloat(value) => {
510                if metadata.ora_type_num != ORA_TYPE_NUM_BINARY_FLOAT {
511                    return Err(ProtocolError::TtcDecode(
512                        "direct path binary float sent for other column type",
513                    ));
514                }
515                let encoded = encode_binary_float(*value);
516                self.write_raw_bytes_and_length(&encoded)
517            }
518            DirectPathColumnValue::DateTime {
519                year,
520                month,
521                day,
522                hour,
523                minute,
524                second,
525                nanosecond,
526            } => {
527                let encoded = match metadata.ora_type_num {
528                    ORA_TYPE_NUM_DATE => {
529                        if *nanosecond != 0 {
530                            return Err(ProtocolError::TtcDecode(
531                                "direct path DATE value has fractional seconds",
532                            ));
533                        }
534                        encode_oracle_date(*year, *month, *day, *hour, *minute, *second)?.to_vec()
535                    }
536                    // the protocol requires a timestamp with zero fractional
537                    // seconds to be transmitted as a 7-byte date
538                    ORA_TYPE_NUM_TIMESTAMP | ORA_TYPE_NUM_TIMESTAMP_LTZ => encode_oracle_timestamp(
539                        *year,
540                        *month,
541                        *day,
542                        *hour,
543                        *minute,
544                        *second,
545                        *nanosecond,
546                    )?,
547                    ORA_TYPE_NUM_TIMESTAMP_TZ => encode_oracle_timestamp_tz(
548                        *year,
549                        *month,
550                        *day,
551                        *hour,
552                        *minute,
553                        *second,
554                        *nanosecond,
555                    )?,
556                    _ => {
557                        return Err(ProtocolError::TtcDecode(
558                            "direct path datetime sent for non-datetime column",
559                        ))
560                    }
561                };
562                self.write_raw_bytes_and_length(&encoded)
563            }
564            DirectPathColumnValue::Boolean(value) => {
565                if metadata.ora_type_num != ORA_TYPE_NUM_BOOLEAN {
566                    return Err(ProtocolError::TtcDecode(
567                        "direct path boolean sent for non-boolean column",
568                    ));
569                }
570                let encoded: &[u8] = if *value { &[1, 1] } else { &[0] };
571                self.write_raw_bytes_and_length(encoded)
572            }
573        }
574    }
575
576    /// Finalizes the stream and returns the pieces plus the total piece
577    /// length (piece data plus piece headers) for the load stream message.
578    pub fn finish(self) -> Result<(Vec<DirectPathPiece>, u32)> {
579        if self.current.is_some() {
580            return Err(ProtocolError::TtcDecode(
581                "direct path stream finished mid-row",
582            ));
583        }
584        let total = u32::try_from(self.total_piece_length)
585            .map_err(|_| ProtocolError::DirectPathLoadTooMuchData)?;
586        Ok((self.pieces, total))
587    }
588
589    fn bump_segments(&mut self) {
590        if let Some(state) = self.current.as_mut() {
591            state.num_segments = state.num_segments.saturating_add(1);
592        }
593    }
594
595    fn space_left(&self) -> usize {
596        TNS_DPLS_MAX_PIECE_SIZE.saturating_sub(self.data.len())
597    }
598
599    fn write_u8_in_piece(&mut self, value: u8) -> Result<()> {
600        if self.space_left() < 1 {
601            self.finalize_piece()?;
602            self.current = Some(PieceState::default());
603        }
604        self.data.push(value);
605        Ok(())
606    }
607
    /// Mirrors `PieceBuffer._write_raw_bytes_and_length`: short values
    /// (<= 0xfa bytes) are written as `u8 length + data`; longer values are
    /// written as one or more `0xfe + u16be length + data` chunks that may
    /// split across pieces with the SPLIT_WITH_PREV/NEXT flags.
    ///
    /// Every chunk written counts as one segment of the piece it lands in.
    /// Errors from finalizing a piece are propagated.
    fn write_raw_bytes_and_length(&mut self, bytes: &[u8]) -> Result<()> {
        // Short form: never split; move to a fresh piece if the value plus
        // its length byte does not fit in the current one.
        if bytes.len() <= TNS_DPLS_MAX_SHORT_LENGTH {
            if bytes.len() + 1 > self.space_left() {
                self.finalize_piece()?;
                self.current = Some(PieceState::default());
            }
            // cast is lossless: the length was checked against 0xfa above
            self.data.push(bytes.len() as u8);
            self.data.extend_from_slice(bytes);
            self.bump_segments();
            return Ok(());
        }

        // Long form: each chunk needs 3 header bytes (indicator + u16 length).
        // Loop while the rest of the value does not fit in the current piece.
        let mut remaining = bytes;
        while remaining.len() + 3 > self.space_left() {
            // Fail-closed divergence from the reference: if fewer than four
            // bytes remain in the piece the reference would emit a corrupt
            // zero/negative-length chunk; start a fresh piece instead.
            if self.space_left() < 4 {
                self.finalize_piece()?;
                self.current = Some(PieceState::default());
                continue;
            }
            // Fill the piece completely: everything but the chunk header.
            let chunk_len = self.space_left() - 3;
            let (chunk, rest) = remaining.split_at(chunk_len.min(remaining.len()));
            self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
            // chunk length is bounded by the piece size, so it fits in a u16
            self.data
                .extend_from_slice(&(chunk.len() as u16).to_be_bytes());
            self.data.extend_from_slice(chunk);
            remaining = rest;
            // The value continues in the next piece.
            if let Some(state) = self.current.as_mut() {
                state.is_split_with_next = true;
            }
            self.bump_segments();
            self.finalize_piece()?;
            // The new piece begins with the continuation of this value.
            self.current = Some(PieceState {
                is_split_with_prev: !remaining.is_empty(),
                ..PieceState::default()
            });
        }
        // Final (or only) chunk: what is left now fits in the current piece.
        if !remaining.is_empty() {
            self.bump_segments();
            self.data.push(TNS_DPLS_LONG_LENGTH_INDICATOR);
            self.data
                .extend_from_slice(&(remaining.len() as u16).to_be_bytes());
            self.data.extend_from_slice(remaining);
        }
        Ok(())
    }
660
661    fn finalize_piece(&mut self) -> Result<()> {
662        let Some(state) = self.current.take() else {
663            return Err(ProtocolError::TtcDecode(
664                "direct path piece finalized without an active piece",
665            ));
666        };
667        let mut flags = 0u8;
668        if state.is_first {
669            flags |= TNS_DPLS_ROW_HEADER_FIRST;
670        } else if state.is_split_with_prev {
671            flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV;
672        }
673        if state.is_last {
674            flags |= TNS_DPLS_ROW_HEADER_LAST;
675        } else if state.is_split_with_next {
676            flags |= TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT;
677        }
678        let is_fast_row = state.is_first && state.is_last && state.is_fast;
679        if is_fast_row {
680            flags |= TNS_DPLS_ROW_HEADER_FAST_ROW | TNS_DPLS_ROW_HEADER_FAST_PIECE;
681        }
682        let num_segments = u8::try_from(state.num_segments)
683            .map_err(|_| ProtocolError::TtcDecode("direct path piece segment count overflow"))?;
684        let piece = DirectPathPiece {
685            flags,
686            num_segments,
687            data: std::mem::take(&mut self.data),
688        };
689        let new_length = self.total_piece_length + piece.data.len() as u64 + piece.header_length();
690        if new_length > TNS_DPLS_MAX_MESSAGE_SIZE {
691            return Err(ProtocolError::DirectPathLoadTooMuchData);
692        }
693        self.total_piece_length = new_length;
694        self.pieces.push(piece);
695        // callers decide what the next piece (if any) looks like
696        Ok(())
697    }
698}
699
700/// Fast direct path types per the reference `DbType._is_fast` flags. LONG and
701/// LONG RAW (and thus inlined CLOB/BLOB) are not fast.
702fn is_fast_dbtype(metadata: &ColumnMetadata) -> bool {
703    matches!(
704        metadata.ora_type_num,
705        ORA_TYPE_NUM_VARCHAR
706            | ORA_TYPE_NUM_NUMBER
707            | ORA_TYPE_NUM_BINARY_INTEGER
708            | ORA_TYPE_NUM_CHAR
709            | ORA_TYPE_NUM_DATE
710            | ORA_TYPE_NUM_RAW
711            | ORA_TYPE_NUM_BINARY_FLOAT
712            | ORA_TYPE_NUM_BINARY_DOUBLE
713            | ORA_TYPE_NUM_BOOLEAN
714            | ORA_TYPE_NUM_TIMESTAMP
715            | ORA_TYPE_NUM_TIMESTAMP_TZ
716            | ORA_TYPE_NUM_TIMESTAMP_LTZ
717    )
718}
719
/// Result of encoding one batch of rows into the piece stream format.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DirectPathStream {
    /// Finalized pieces in the order they must be written.
    pub pieces: Vec<DirectPathPiece>,
    /// Sum of all piece data and piece headers; sent in the load stream
    /// message ahead of the pieces.
    pub total_piece_length: u32,
}
726
727/// Encodes a batch of rows into direct path pieces.
728///
729/// `first_row_num` is the 1-based number of the first row in this batch for
730/// error reporting; the reference keeps a running row counter across batches
731/// of a single `direct_path_load` call.
732pub fn encode_direct_path_rows(
733    column_metadata: &[ColumnMetadata],
734    rows: &[Vec<DirectPathColumnValue>],
735    first_row_num: u64,
736) -> Result<DirectPathStream> {
737    let mut buffer = DirectPathPieceBuffer::new();
738    for (row_index, row) in rows.iter().enumerate() {
739        if row.len() != column_metadata.len() {
740            return Err(ProtocolError::TtcDecode(
741                "direct path row width does not match column metadata",
742            ));
743        }
744        let row_num = first_row_num + row_index as u64;
745        buffer.start_row()?;
746        for (metadata, value) in column_metadata.iter().zip(row) {
747            buffer.add_column_value(metadata, value, row_num)?;
748        }
749        buffer.finish_row()?;
750    }
751    let (pieces, total_piece_length) = buffer.finish()?;
752    Ok(DirectPathStream {
753        pieces,
754        total_piece_length,
755    })
756}
757
758/// Builds the payload for TTC function 129 (direct path load stream).
759///
760/// Mirrors `DirectPathLoadStreamMessage._write_message`.
761pub fn build_direct_path_load_stream_payload(
762    cursor_id: u16,
763    stream: &DirectPathStream,
764    seq_num: u8,
765) -> Result<Vec<u8>> {
766    let mut writer = TtcWriter::new();
767    writer.write_function_code_with_seq(TNS_FUNC_DIRECT_PATH_LOAD_STREAM, seq_num);
768    writer.write_ub8(0); // token number
769    writer.write_ub2(cursor_id);
770    writer.write_u8(1); // pointer (buffer)
771    writer.write_ub4(stream.total_piece_length);
772    writer.write_ub4(TNS_DP_STREAM_VERSION);
773    writer.write_u8(0); // pointer (input values)
774    writer.write_ub4(0); // number of input values
775    writer.write_u8(1); // pointer (output values)
776    writer.write_u8(1); // pointer (output values length)
777    for piece in &stream.pieces {
778        piece.write_to(&mut writer)?;
779    }
780    Ok(writer.into_bytes())
781}
782
/// Batch/chunk state machine shared by `executemany` ingestion and direct
/// path load. Port of `BatchLoadManager`/`DataFrameBatchLoadManager`
/// (impl/base/batch_load_manager.pyx).
///
/// The data source is modelled as a list of chunks (an Arrow chunked array
/// has one entry per chunk; a plain list of rows is a single chunk). Batches
/// never span chunk boundaries; `message_offset` is the row offset *within
/// the current chunk* that must accompany the execute/load message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BatchLoadState {
    // Row count of each chunk of the data source, in order.
    chunk_lengths: Vec<u64>,
    // Requested batch size; validated to be non-zero by the constructor.
    batch_size: u32,
    // Index of the chunk the current batch draws from.
    chunk_index: usize,
    // Row offset of the current batch within the current chunk.
    offset: u64,
    // Offset to send with the execute/load message for the current batch.
    message_offset: u64,
    // Rows in the current batch; zero means the load is complete.
    num_rows: u32,
}
800
801impl BatchLoadState {
802    pub fn new(chunk_lengths: Vec<u64>, batch_size: u32) -> Result<Self> {
803        if batch_size == 0 {
804            return Err(ProtocolError::TtcDecode(
805                "batch_size must be a positive integer",
806            ));
807        }
808        let mut state = Self {
809            chunk_lengths,
810            batch_size,
811            chunk_index: 0,
812            offset: 0,
813            message_offset: 0,
814            num_rows: 0,
815        };
816        state.advance_batch();
817        Ok(state)
818    }
819
820    /// Creates the state machine for a single-chunk source of `total_rows`
821    /// rows (a plain list of rows).
822    pub fn for_rows(total_rows: u64, batch_size: u32) -> Result<Self> {
823        Self::new(vec![total_rows], batch_size)
824    }
825
826    /// Number of rows in the current batch; zero means the load is complete.
827    pub fn num_rows(&self) -> u32 {
828        self.num_rows
829    }
830
831    /// Row offset of the current batch within the current chunk.
832    pub fn offset(&self) -> u64 {
833        self.offset
834    }
835
836    /// Offset to send with the execute/load message (row offset within the
837    /// current chunk at the time the batch was formed).
838    pub fn message_offset(&self) -> u64 {
839        self.message_offset
840    }
841
842    /// Index of the chunk the current batch draws from.
843    pub fn chunk_index(&self) -> usize {
844        self.chunk_index
845    }
846
847    pub fn is_done(&self) -> bool {
848        self.num_rows == 0
849    }
850
851    /// Advances to the next batch (mirrors `BatchLoadManager.next_batch`).
852    pub fn next_batch(&mut self) {
853        self.offset += u64::from(self.num_rows);
854        self.advance_batch();
855    }
856
857    fn rows_in_current_chunk(&self) -> u64 {
858        self.chunk_lengths
859            .get(self.chunk_index)
860            .copied()
861            .unwrap_or(0)
862    }
863
864    fn calculate_num_rows_in_batch(&mut self) {
865        let remaining = self.rows_in_current_chunk().saturating_sub(self.offset);
866        self.num_rows = u32::try_from(remaining.min(u64::from(self.batch_size))).unwrap_or(0);
867    }
868
869    fn advance_batch(&mut self) {
870        self.message_offset = self.offset;
871        self.calculate_num_rows_in_batch();
872        if self.num_rows == 0 {
873            self.advance_chunk();
874        }
875    }
876
877    fn advance_chunk(&mut self) {
878        while self.chunk_index + 1 < self.chunk_lengths.len() {
879            self.offset = 0;
880            self.message_offset = 0;
881            self.chunk_index += 1;
882            self.calculate_num_rows_in_batch();
883            if self.num_rows > 0 {
884                break;
885            }
886        }
887    }
888}
889
// Unit tests for the direct path load builders, the prepare-response parser,
// the row/piece encoder and the `BatchLoadState` state machine. Expected byte
// sequences are written out literally so a layout regression shows up as a
// plain slice mismatch.
#[cfg(test)]
mod tests {
    use super::*;

    // BoundedReader invariant (l2p), direct-path columns family: a PARAMETER
    // message declaring a huge num_columns (ub4 ~620M) with no column-metadata
    // bytes following must fail closed via with_capacity_bounded + the per-
    // column parse, not reserve one ColumnMetadata per declared column. (This
    // replaces the old arbitrary `.min(1024)` cap with a buffer-anchored bound.)
    #[test]
    fn direct_path_oversized_column_count_fails_closed_not_oom() {
        // type=8 PARAMETER; num_columns ub4 (len byte 4) = 0x25000000, then EOF.
        let payload = [TNS_MSG_TYPE_PARAMETER, 4, 0x25, 0x00, 0x00, 0x00];
        let err = parse_direct_path_prepare_response(&payload, ClientCapabilities::default())
            .expect_err("oversized direct-path column count must fail closed");
        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
    }

    /// Builds a minimal `ColumnMetadata` for the encoder tests.
    ///
    /// VARCHAR, CHAR and LONG columns get `CS_FORM_IMPLICIT`; every other type
    /// gets a charset form of 0. `max_size` is used for both `buffer_size` and
    /// `max_size`.
    fn column(name: &str, ora_type_num: u8, max_size: u32, nulls_allowed: bool) -> ColumnMetadata {
        ColumnMetadata {
            name: name.to_string(),
            ora_type_num,
            csfrm: if matches!(
                ora_type_num,
                ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG
            ) {
                CS_FORM_IMPLICIT
            } else {
                0
            },
            precision: 0,
            scale: 0,
            buffer_size: max_size,
            max_size,
            nulls_allowed,
            is_json: false,
            is_oson: false,
            object_schema: None,
            object_type_name: None,
            is_array: false,
            vector_dimensions: None,
            vector_format: 0,
            vector_flags: 0,
            ..Default::default()
        }
    }

    // The prepare (function 128) payload must match the expected layout byte
    // for byte: header, fixed fields, schema/table/column keywords, in values.
    #[test]
    fn prepare_payload_matches_reference_layout() {
        let payload = build_direct_path_prepare_payload(
            "pythontest",
            "dpl_golden",
            &["id".to_string(), "name".to_string()],
            10,
        )
        .expect("payload should build");
        // header: msg type, function code, seq, token
        assert_eq!(&payload[..4], &[3, 128, 10, 0]);
        let mut expected = vec![
            1, 1, // ub4 op code LOAD
            1, // kw pointer
            1, 4, // ub4 kw length = 2 columns + 2
            1, // input array pointer
            1, 15, // ub2 in values length
            1, 1, 1, 1, 1, 1, // six pointers
        ];
        // schema name
        expected.extend_from_slice(&[0, 1, 10]);
        expected.extend_from_slice(&[10]);
        expected.extend_from_slice(b"pythontest");
        expected.extend_from_slice(&[1, 3]);
        // table name
        expected.extend_from_slice(&[0, 1, 10]);
        expected.extend_from_slice(&[10]);
        expected.extend_from_slice(b"dpl_golden");
        expected.extend_from_slice(&[1, 1]);
        // column names
        expected.extend_from_slice(&[0, 1, 2, 2]);
        expected.extend_from_slice(b"id");
        expected.extend_from_slice(&[1, 4]);
        expected.extend_from_slice(&[0, 1, 4, 4]);
        expected.extend_from_slice(b"name");
        expected.extend_from_slice(&[1, 4]);
        // in values: 400, 400, 9 zeros, 0xffff, 0, 0, 1
        expected.extend_from_slice(&[2, 0x01, 0x90, 2, 0x01, 0x90]);
        expected.extend_from_slice(&[0; 9]);
        expected.extend_from_slice(&[2, 0xff, 0xff, 0, 0, 1, 1]);
        assert_eq!(&payload[4..], expected.as_slice());
    }

    // The op (function 130) payload for FINISH on cursor 1 is fully fixed.
    #[test]
    fn op_payload_matches_reference_layout() {
        let payload = build_direct_path_op_payload(1, TNS_DP_OP_FINISH, 12);
        assert_eq!(
            payload,
            vec![3, 130, 12, 0, 1, 2, 1, 1, 0, 0, 1, 1],
            "fn code, seq, token, ub4 op, ub2 cursor, ptr 0, ub4 0, ptr 1, ptr 1"
        );
    }

    // A short NUMBER + VARCHAR row fits one piece carrying all four
    // FIRST/LAST/FAST_ROW/FAST_PIECE flags.
    #[test]
    fn single_fast_row_produces_one_fast_piece() {
        let columns = vec![
            column("ID", ORA_TYPE_NUM_NUMBER, 0, false),
            column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false),
        ];
        let rows = vec![vec![
            DirectPathColumnValue::Number("1".into()),
            DirectPathColumnValue::Bytes(b"alpha".to_vec()),
        ]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        let piece = &stream.pieces[0];
        assert_eq!(
            piece.flags(),
            TNS_DPLS_ROW_HEADER_FIRST
                | TNS_DPLS_ROW_HEADER_LAST
                | TNS_DPLS_ROW_HEADER_FAST_ROW
                | TNS_DPLS_ROW_HEADER_FAST_PIECE
        );
        assert_eq!(piece.num_segments(), 2);
        // number 1 encodes as c1 02; "alpha" as length + bytes
        assert_eq!(
            piece.data(),
            &[2, 0xc1, 0x02, 5, b'a', b'l', b'p', b'h', b'a']
        );
        // total = data + 4-byte fast header
        assert_eq!(stream.total_piece_length, piece.data().len() as u32 + 4);
    }

    // A LONG column yields a piece with only FIRST | LAST (no fast flags).
    #[test]
    fn long_column_clears_fast_flag() {
        let columns = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
        let rows = vec![vec![DirectPathColumnValue::Bytes(vec![b'x'; 10])]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        assert_eq!(
            stream.pieces[0].flags(),
            TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_LAST
        );
        // 1 length byte + 10 data bytes + 2-byte slow header
        assert_eq!(stream.total_piece_length, 11 + 2);
    }

    // NULL in a nullable column is a single 0xff segment.
    #[test]
    fn null_values_encode_as_null_indicator() {
        let columns = vec![column("SALARY", ORA_TYPE_NUM_NUMBER, 0, true)];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces[0].data(), &[0xff]);
        assert_eq!(stream.pieces[0].num_segments(), 1);
    }

    // NULL in a NOT NULL column is rejected with DPY-8001 naming the column
    // and the row.
    #[test]
    fn null_into_not_null_column_raises_dpy_8001() {
        let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 100, false)];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        let err = encode_direct_path_rows(&columns, &rows, 1).expect_err("nulls must be rejected");
        // Render the error once and reuse it for every check.
        let message = err.to_string();
        assert!(
            message.starts_with("DPY-8001:"),
            "unexpected error: {message}"
        );
        assert!(message.contains("\"NAME\""), "{message}");
        assert!(message.contains("row 1"), "{message}");
    }

    // A value longer than the column's max size is rejected with DPY-8000 and
    // the reported row number honours `first_row_num`.
    #[test]
    fn oversized_value_raises_dpy_8000() {
        let columns = vec![column("NAME", ORA_TYPE_NUM_VARCHAR, 4, false)];
        let rows = vec![vec![DirectPathColumnValue::Bytes(b"toolong".to_vec())]];
        let err = encode_direct_path_rows(&columns, &rows, 3).expect_err("size must be enforced");
        // Render the error once and reuse it for every check.
        let message = err.to_string();
        assert!(
            message.starts_with("DPY-8000:"),
            "unexpected error: {message}"
        );
        assert!(message.contains("row 3"), "{message}");
    }

    #[test]
    fn long_values_use_fe_chunked_segments() {
        // 600 bytes > 0xfa, must use the 0xfe + u16be length form
        let columns = vec![column("WIDE", ORA_TYPE_NUM_VARCHAR, 1000, false)];
        let value = vec![b'q'; 600];
        let rows = vec![vec![DirectPathColumnValue::Bytes(value.clone())]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 1);
        let piece = &stream.pieces[0];
        assert_eq!(piece.num_segments(), 1);
        let mut expected = vec![0xfe, 0x02, 0x58];
        expected.extend_from_slice(&value);
        assert_eq!(piece.data(), expected.as_slice());
    }

    // A value 100 bytes larger than one piece is split over two pieces linked
    // by SPLIT_WITH_NEXT / SPLIT_WITH_PREV.
    #[test]
    fn values_larger_than_piece_split_across_pieces_with_split_flags() {
        let columns = vec![column("WIDE", ORA_TYPE_NUM_LONG, 0, false)];
        let total = TNS_DPLS_MAX_PIECE_SIZE + 100;
        let rows = vec![vec![DirectPathColumnValue::Bytes(vec![b'z'; total])]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 2);
        let first = &stream.pieces[0];
        let second = &stream.pieces[1];
        assert_eq!(
            first.flags(),
            TNS_DPLS_ROW_HEADER_FIRST | TNS_DPLS_ROW_HEADER_SPLIT_WITH_NEXT
        );
        assert_eq!(
            second.flags(),
            TNS_DPLS_ROW_HEADER_SPLIT_WITH_PREV | TNS_DPLS_ROW_HEADER_LAST
        );
        // first piece is filled to the brim: 3-byte chunk header + payload
        assert_eq!(first.data().len(), TNS_DPLS_MAX_PIECE_SIZE);
        assert_eq!(first.data()[0], 0xfe);
        let first_chunk = usize::from(u16::from_be_bytes([first.data()[1], first.data()[2]]));
        assert_eq!(first_chunk, TNS_DPLS_MAX_PIECE_SIZE - 3);
        let second_chunk = usize::from(u16::from_be_bytes([second.data()[1], second.data()[2]]));
        assert_eq!(first_chunk + second_chunk, total);
        // each piece contributes its data plus a 2-byte header
        assert_eq!(
            stream.total_piece_length as usize,
            first.data().len() + second.data().len() + 2 + 2
        );
    }

    // 300 NULL columns in one row: the first piece stops at 255 segments and
    // the remaining 45 go to a second piece.
    #[test]
    fn segment_count_caps_at_255_per_piece() {
        let columns: Vec<ColumnMetadata> = (0..300)
            .map(|i| column(&format!("C{i}"), ORA_TYPE_NUM_NUMBER, 0, true))
            .collect();
        let row: Vec<DirectPathColumnValue> =
            (0..300).map(|_| DirectPathColumnValue::Null).collect();
        let stream = encode_direct_path_rows(&columns, &[row], 1).expect("stream should encode");
        assert_eq!(stream.pieces.len(), 2);
        assert_eq!(stream.pieces[0].num_segments(), 255);
        assert_eq!(stream.pieces[1].num_segments(), 45);
        // continuation piece created by the segment cap carries neither FIRST
        // nor SPLIT_WITH_PREV (mirrors the reference)
        assert_eq!(stream.pieces[0].flags(), TNS_DPLS_ROW_HEADER_FIRST);
        assert_eq!(stream.pieces[1].flags(), TNS_DPLS_ROW_HEADER_LAST);
    }

    #[test]
    fn timestamp_with_zero_fraction_collapses_to_seven_bytes() {
        let columns = vec![column("TS", ORA_TYPE_NUM_TIMESTAMP, 0, true)];
        let rows = vec![vec![DirectPathColumnValue::DateTime {
            year: 2024,
            month: 1,
            day: 2,
            hour: 3,
            minute: 4,
            second: 5,
            nanosecond: 0,
        }]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(
            stream.pieces[0].data(),
            &[7, 120, 124, 1, 2, 4, 5, 6],
            "7-byte date form expected when fractional seconds are zero"
        );
    }

    // `true` encodes as [2, 1, 1] and `false` as [1, 0]; each row gets its
    // own piece.
    #[test]
    fn boolean_values_encode_per_reference() {
        let columns = vec![column("FLAG", ORA_TYPE_NUM_BOOLEAN, 0, true)];
        let rows = vec![
            vec![DirectPathColumnValue::Boolean(true)],
            vec![DirectPathColumnValue::Boolean(false)],
        ];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        assert_eq!(stream.pieces[0].data(), &[2, 1, 1]);
        assert_eq!(stream.pieces[1].data(), &[1, 0]);
    }

    // A row with fewer values than there are columns is an error.
    #[test]
    fn row_width_mismatch_is_rejected() {
        let columns = vec![
            column("A", ORA_TYPE_NUM_NUMBER, 0, true),
            column("B", ORA_TYPE_NUM_NUMBER, 0, true),
        ];
        let rows = vec![vec![DirectPathColumnValue::Null]];
        assert!(encode_direct_path_rows(&columns, &rows, 1).is_err());
    }

    // CLOB columns are loaded as LONG and BLOB columns as LONG RAW; the CLOB
    // charset form depends on the charset id passed to the override.
    #[test]
    fn metadata_overrides_inline_lobs() {
        let mut clob = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        clob.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut clob, 873);
        assert_eq!(clob.ora_type_num, ORA_TYPE_NUM_LONG);
        assert_eq!(clob.csfrm, CS_FORM_NCHAR, "multi-byte charset uses NCHAR");

        let mut clob = column("DOC", ORA_TYPE_NUM_CLOB, 0, true);
        clob.csfrm = CS_FORM_IMPLICIT;
        apply_direct_path_metadata_overrides(&mut clob, 178);
        assert_eq!(
            clob.csfrm, CS_FORM_IMPLICIT,
            "single-byte charset keeps form"
        );

        let mut blob = column("BIN", ORA_TYPE_NUM_BLOB, 0, true);
        apply_direct_path_metadata_overrides(&mut blob, 873);
        assert_eq!(blob.ora_type_num, ORA_TYPE_NUM_LONG_RAW);
        assert_eq!(blob.csfrm, 0);
    }

    // 5 rows with batch size 2 produce batches of 2, 2 and 1 rows.
    #[test]
    fn batch_state_single_chunk_splits_by_batch_size() {
        let mut state = BatchLoadState::for_rows(5, 2).expect("state should build");
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (2, 0, 0)
        );
        state.next_batch();
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (2, 2, 2)
        );
        state.next_batch();
        assert_eq!(
            (state.num_rows(), state.offset(), state.message_offset()),
            (1, 4, 4)
        );
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_never_spans_chunks() {
        // chunks of 3 and 2 rows with batch size 2: batches are 2, 1, 2
        let mut state = BatchLoadState::new(vec![3, 2], 2).expect("state should build");
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (0, 2, 0)
        );
        state.next_batch();
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (0, 1, 2)
        );
        state.next_batch();
        assert_eq!(
            (
                state.chunk_index(),
                state.num_rows(),
                state.message_offset()
            ),
            (1, 2, 0)
        );
        state.next_batch();
        assert!(state.is_done());
    }

    // Leading empty chunks are skipped at construction time.
    #[test]
    fn batch_state_skips_empty_chunks() {
        let mut state = BatchLoadState::new(vec![0, 0, 3], 10).expect("state should build");
        assert_eq!((state.chunk_index(), state.num_rows()), (2, 3));
        state.next_batch();
        assert!(state.is_done());
    }

    #[test]
    fn batch_state_rejects_zero_batch_size() {
        assert!(BatchLoadState::for_rows(5, 0).is_err());
    }

    #[test]
    fn batch_state_empty_source_is_done_immediately() {
        let state = BatchLoadState::for_rows(0, 10).expect("state should build");
        assert!(state.is_done());
    }

    // The load stream (function 129) payload for a single one-column row must
    // match the expected layout byte for byte.
    #[test]
    fn load_stream_payload_header_matches_reference_layout() {
        let columns = vec![column("ID", ORA_TYPE_NUM_NUMBER, 0, false)];
        let rows = vec![vec![DirectPathColumnValue::Number("1".into())]];
        let stream = encode_direct_path_rows(&columns, &rows, 1).expect("stream should encode");
        let payload =
            build_direct_path_load_stream_payload(1, &stream, 11).expect("payload should build");
        let expected = vec![
            3, 129, 11, // msg type, fn code, seq
            0,  // token
            1, 1, // ub2 cursor id
            1, // buffer pointer
            1, 7, // ub4 total piece length (3 data + 4 header)
            2, 0x01, 0x90, // ub4 stream version 400
            0,    // input values pointer
            0,    // ub4 input values count
            1, 1, // output pointers
            0x3c, 0, 7, 1, // piece: flags, u16be total, num segments
            2, 0xc1, 0x02, // number 1
        ];
        assert_eq!(payload, expected);
    }
}
1290}