Skip to main content

oracledb_protocol/thin/
fetch.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4use crate::wire::ProtocolLimits;
5
/// Checks that `slice` is well-formed UTF-8 on the hot borrowed-text decode
/// path. Success yields the borrowed `&str`; any rejection collapses to `()`
/// (the caller then falls back to the owned `TextRaw` carrier, so the outcome
/// does not depend on which validator ran).
///
/// When the `simd-decode` feature is on, validation goes through
/// `simdutf8::basic::from_utf8`. Its accept/reject verdict is byte-for-byte
/// the same as `core::str::from_utf8` — it checks the identical UTF-8 grammar
/// and merely skips computing the error *position*, which is never consulted
/// here. The crate remains `#![forbid(unsafe_code)]`-clean because the SIMD
/// `unsafe` lives inside `simdutf8` and only its safe API is called.
#[inline]
fn validate_utf8(slice: &[u8]) -> core::result::Result<&str, ()> {
    // Exactly one of the two bindings below survives conditional compilation.
    #[cfg(feature = "simd-decode")]
    let checked = simdutf8::basic::from_utf8(slice).ok();
    #[cfg(not(feature = "simd-decode"))]
    let checked = core::str::from_utf8(slice).ok();
    checked.ok_or(())
}
27
/// Mode flag threaded through row parsing down to
/// `parse_column_value_with_lob_mode`.
///
/// In this file the plain fetch-response parser passes `PlainLocator`, while
/// the execute-response and define-fetch-response parsers pass
/// `DefineMetadata`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LobDecodeMode {
    // NOTE(review): presumably LOB values arrive as a bare locator — confirm
    // against the column-value decoder.
    PlainLocator,
    // NOTE(review): presumably LOB values follow the layout requested by the
    // define metadata — confirm against the column-value decoder.
    DefineMetadata,
}
33
34pub fn build_fetch_payload(cursor_id: u32, arraysize: u32) -> Vec<u8> {
35    build_fetch_payload_with_seq(cursor_id, arraysize, 1)
36}
37
38pub fn build_fetch_payload_with_seq(cursor_id: u32, arraysize: u32, seq_num: u8) -> Vec<u8> {
39    // Fixed tiny payload (function code + ub8 + two ub4 ≈ <=20 bytes). Prealloc
40    // so the small pushes do not grow the Vec through doublings; built every
41    // fetch page, so this matters on multi-page fetches. Bytes unchanged.
42    let mut writer = TtcWriter::with_capacity(32);
43    writer.write_function_code_with_seq(TNS_FUNC_FETCH, seq_num);
44    writer.write_ub8(0);
45    writer.write_ub4(cursor_id);
46    writer.write_ub4(arraysize);
47    writer.into_bytes()
48}
49
/// Builds a `TNS_FUNC_EXECUTE` request carrying the `DEFINE` and `NOT_PLSQL`
/// execute options for `cursor_id`, followed by one define-metadata record per
/// entry of `define_columns`.
///
/// `arraysize` is written twice and `TNS_MAX_LONG_LENGTH` once; every other
/// field is a fixed constant. The exact sequence of writes below IS the wire
/// layout, so statements must not be reordered or merged.
///
/// # Errors
///
/// Returns `ProtocolError::InvalidPacketLength` when `define_columns.len()`
/// does not fit in a `u32`.
///
/// NOTE(review): the meaning of the individual constant fields is not visible
/// in this file; they presumably mirror the reference driver's execute
/// message — confirm against python-oracledb before changing any of them.
50pub fn build_define_fetch_payload_with_seq(
51    cursor_id: u32,
52    arraysize: u32,
53    seq_num: u8,
54    define_columns: &[ColumnMetadata],
55) -> Result<Vec<u8>> {
    // The column count goes on the wire as a ub4, so it must fit in a u32.
56    let define_count =
57        u32::try_from(define_columns.len()).map_err(|_| ProtocolError::InvalidPacketLength {
58            length: define_columns.len(),
59            minimum: 0,
60        })?;
61    let mut writer = TtcWriter::new();
62    writer.write_function_code_with_seq(TNS_FUNC_EXECUTE, seq_num);
63    writer.write_ub8(0);
    // Execute options, then the cursor the defines apply to.
64    writer.write_ub4(TNS_EXEC_OPTION_DEFINE | TNS_EXEC_OPTION_NOT_PLSQL);
65    writer.write_ub4(cursor_id);
66    writer.write_u8(0);
67    writer.write_ub4(0);
68    writer.write_u8(1);
    // NOTE(review): 13 presumably is the element count of the ub4 array
    // written last (the final 13 `write_ub4` calls before the column loop) —
    // confirm against the reference execute message.
69    writer.write_ub4(13);
70    writer.write_u8(0);
71    writer.write_u8(0);
72    writer.write_ub4(0);
73    writer.write_ub4(arraysize);
74    writer.write_ub4(TNS_MAX_LONG_LENGTH);
75    writer.write_u8(0);
76    writer.write_ub4(0);
77    writer.write_u8(0);
78    writer.write_u8(0);
79    writer.write_u8(0);
80    writer.write_u8(0);
81    writer.write_u8(0);
    // A set flag byte followed by the number of define records that trail
    // the fixed part of the message.
82    writer.write_u8(1);
83    writer.write_ub4(define_count);
84    writer.write_ub4(0);
85    writer.write_u8(0);
86    writer.write_u8(1);
87    writer.write_u8(0);
88    writer.write_ub4(0);
89    writer.write_u8(0);
90    writer.write_ub4(0);
91    writer.write_ub4(0);
92    writer.write_u8(0);
93    writer.write_ub4(0);
94    writer.write_u8(0);
95    writer.write_u8(0);
96    writer.write_ub4(0);
97    writer.write_ub4(0);
98    writer.write_ub4(0);
99    writer.write_ub4(0);
100    writer.write_ub4(0);
101    writer.write_ub4(0);
102    writer.write_ub4(0);
    // Second occurrence of `arraysize` in the message.
103    writer.write_ub4(arraysize);
104    writer.write_ub4(0);
105    writer.write_ub4(0);
106    writer.write_ub4(0);
107    writer.write_ub4(0);
108    writer.write_ub4(0);
109    writer.write_ub4(1);
110    writer.write_ub4(0);
111    writer.write_ub4(0);
112    writer.write_ub4(0);
113    writer.write_ub4(0);
114    writer.write_ub4(0);
    // Variable tail: one define-metadata record per requested column, in order.
115    for metadata in define_columns {
116        write_define_column_metadata(&mut writer, metadata);
117    }
118    Ok(writer.into_bytes())
119}
120
121pub(crate) fn write_define_column_metadata(writer: &mut TtcWriter, metadata: &ColumnMetadata) {
122    // reference base.pyx: VECTOR (and JSON) columns advertise a LOB-prefetch
123    // buffer so the server streams the image inline rather than returning a
124    // bare temp-LOB locator
125    let (mut buffer_size, cont_flags, lob_prefetch_length) = match metadata.ora_type_num {
126        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB => (metadata.buffer_size, TNS_LOB_PREFETCH_FLAG, 0),
127        ORA_TYPE_NUM_VECTOR => (
128            TNS_VECTOR_MAX_LENGTH,
129            TNS_LOB_PREFETCH_FLAG,
130            TNS_VECTOR_MAX_LENGTH,
131        ),
132        ORA_TYPE_NUM_JSON => (
133            TNS_JSON_MAX_LENGTH,
134            TNS_LOB_PREFETCH_FLAG,
135            TNS_JSON_MAX_LENGTH,
136        ),
137        _ => (metadata.buffer_size, 0, 0),
138    };
139    buffer_size = buffer_size.max(1);
140    writer.write_u8(metadata.ora_type_num);
141    writer.write_u8(TNS_BIND_USE_INDICATORS);
142    writer.write_u8(0);
143    writer.write_u8(0);
144    writer.write_ub4(buffer_size);
145    writer.write_ub4(0);
146    writer.write_ub8(cont_flags);
147    writer.write_ub4(0);
148    writer.write_ub2(0);
149    if metadata.csfrm != 0 {
150        writer.write_ub2(TNS_CHARSET_UTF8);
151    } else {
152        writer.write_ub2(0);
153    }
154    writer.write_u8(metadata.csfrm);
155    writer.write_ub4(lob_prefetch_length);
156    writer.write_ub4(0);
157}
158
159pub fn parse_query_response(
160    payload: &[u8],
161    capabilities: ClientCapabilities,
162) -> Result<QueryResult> {
163    parse_query_response_with_previous(payload, capabilities, None)
164}
165
166pub fn parse_query_response_with_limits(
167    payload: &[u8],
168    capabilities: ClientCapabilities,
169    limits: ProtocolLimits,
170) -> Result<QueryResult> {
171    parse_query_response_with_context_binds_options_and_limits(
172        payload,
173        capabilities,
174        &[],
175        None,
176        &[],
177        &[],
178        false,
179        ExecuteOptions::default(),
180        limits,
181    )
182}
183
184pub fn parse_query_response_with_binds(
185    payload: &[u8],
186    capabilities: ClientCapabilities,
187    binds: &[BindValue],
188) -> Result<QueryResult> {
189    parse_query_response_with_binds_and_options(
190        payload,
191        capabilities,
192        binds,
193        ExecuteOptions::default(),
194    )
195}
196
197pub fn parse_query_response_with_binds_and_options(
198    payload: &[u8],
199    capabilities: ClientCapabilities,
200    binds: &[BindValue],
201    exec_options: ExecuteOptions,
202) -> Result<QueryResult> {
203    parse_query_response_with_binds_options_and_columns(
204        payload,
205        capabilities,
206        binds,
207        exec_options,
208        &[],
209    )
210}
211
212/// `known_columns` carries the fetch metadata of a re-executed statement
213/// whose response does not repeat the describe information (reference keeps
214/// the statement's fetch vars across executions).
215pub fn parse_query_response_with_binds_options_and_columns(
216    payload: &[u8],
217    capabilities: ClientCapabilities,
218    binds: &[BindValue],
219    exec_options: ExecuteOptions,
220    known_columns: &[ColumnMetadata],
221) -> Result<QueryResult> {
222    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
223    let output_bind_indexes = binds
224        .iter()
225        .enumerate()
226        .filter_map(|(index, value)| value.is_return_output().then_some(index))
227        .collect::<Vec<_>>();
228    parse_query_response_with_context_binds_and_options(
229        payload,
230        capabilities,
231        known_columns,
232        None,
233        &bind_columns,
234        &output_bind_indexes,
235        false,
236        exec_options,
237    )
238}
239
240pub fn parse_query_response_with_binds_options_columns_and_limits(
241    payload: &[u8],
242    capabilities: ClientCapabilities,
243    binds: &[BindValue],
244    exec_options: ExecuteOptions,
245    known_columns: &[ColumnMetadata],
246    limits: ProtocolLimits,
247) -> Result<QueryResult> {
248    limits.check_binds(binds.len())?;
249    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
250    let output_bind_indexes = binds
251        .iter()
252        .enumerate()
253        .filter_map(|(index, value)| value.is_return_output().then_some(index))
254        .collect::<Vec<_>>();
255    parse_query_response_with_context_binds_options_and_limits(
256        payload,
257        capabilities,
258        known_columns,
259        None,
260        &bind_columns,
261        &output_bind_indexes,
262        false,
263        exec_options,
264        limits,
265    )
266}
267
268pub fn parse_query_response_with_previous(
269    payload: &[u8],
270    capabilities: ClientCapabilities,
271    previous_row: Option<&[Option<QueryValue>]>,
272) -> Result<QueryResult> {
273    parse_query_response_with_context(payload, capabilities, &[], previous_row)
274}
275
276pub fn parse_query_response_with_context(
277    payload: &[u8],
278    capabilities: ClientCapabilities,
279    previous_columns: &[ColumnMetadata],
280    previous_row: Option<&[Option<QueryValue>]>,
281) -> Result<QueryResult> {
282    parse_query_response_with_context_and_binds(
283        payload,
284        capabilities,
285        previous_columns,
286        previous_row,
287        &[],
288        &[],
289        false,
290        ProtocolLimits::DEFAULT,
291    )
292}
293
294pub fn parse_fetch_response_with_context(
295    payload: &[u8],
296    capabilities: ClientCapabilities,
297    previous_columns: &[ColumnMetadata],
298    previous_row: Option<&[Option<QueryValue>]>,
299) -> Result<QueryResult> {
300    parse_fetch_response_with_context_and_limits(
301        payload,
302        capabilities,
303        previous_columns,
304        previous_row,
305        ProtocolLimits::DEFAULT,
306    )
307}
308
309pub fn parse_fetch_response_with_context_and_limits(
310    payload: &[u8],
311    capabilities: ClientCapabilities,
312    previous_columns: &[ColumnMetadata],
313    previous_row: Option<&[Option<QueryValue>]>,
314    limits: ProtocolLimits,
315) -> Result<QueryResult> {
316    parse_query_response_with_context_binds_options_lob_mode_and_limits(
317        payload,
318        capabilities,
319        previous_columns,
320        previous_row,
321        &[],
322        &[],
323        true,
324        ExecuteOptions::default(),
325        LobDecodeMode::PlainLocator,
326        limits,
327    )
328}
329
330pub fn parse_define_fetch_response_with_context_and_limits(
331    payload: &[u8],
332    capabilities: ClientCapabilities,
333    previous_columns: &[ColumnMetadata],
334    previous_row: Option<&[Option<QueryValue>]>,
335    limits: ProtocolLimits,
336) -> Result<QueryResult> {
337    parse_query_response_with_context_binds_options_lob_mode_and_limits(
338        payload,
339        capabilities,
340        previous_columns,
341        previous_row,
342        &[],
343        &[],
344        true,
345        ExecuteOptions::default(),
346        LobDecodeMode::DefineMetadata,
347        limits,
348    )
349}
350
351#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
352pub(crate) fn parse_query_response_with_context_and_binds(
353    payload: &[u8],
354    capabilities: ClientCapabilities,
355    previous_columns: &[ColumnMetadata],
356    previous_row: Option<&[Option<QueryValue>]>,
357    bind_columns: &[ColumnMetadata],
358    output_bind_indexes: &[usize],
359    fetch_long_status: bool,
360    limits: ProtocolLimits,
361) -> Result<QueryResult> {
362    parse_query_response_with_context_binds_options_and_limits(
363        payload,
364        capabilities,
365        previous_columns,
366        previous_row,
367        bind_columns,
368        output_bind_indexes,
369        fetch_long_status,
370        ExecuteOptions::default(),
371        limits,
372    )
373}
374
375#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
376pub(crate) fn parse_query_response_with_context_binds_and_options(
377    payload: &[u8],
378    capabilities: ClientCapabilities,
379    previous_columns: &[ColumnMetadata],
380    previous_row: Option<&[Option<QueryValue>]>,
381    bind_columns: &[ColumnMetadata],
382    output_bind_indexes: &[usize],
383    fetch_long_status: bool,
384    exec_options: ExecuteOptions,
385) -> Result<QueryResult> {
386    parse_query_response_with_context_binds_options_and_limits(
387        payload,
388        capabilities,
389        previous_columns,
390        previous_row,
391        bind_columns,
392        output_bind_indexes,
393        fetch_long_status,
394        exec_options,
395        ProtocolLimits::DEFAULT,
396    )
397}
398
399#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
400pub(crate) fn parse_query_response_with_context_binds_options_and_limits(
401    payload: &[u8],
402    capabilities: ClientCapabilities,
403    previous_columns: &[ColumnMetadata],
404    previous_row: Option<&[Option<QueryValue>]>,
405    bind_columns: &[ColumnMetadata],
406    output_bind_indexes: &[usize],
407    fetch_long_status: bool,
408    exec_options: ExecuteOptions,
409    limits: ProtocolLimits,
410) -> Result<QueryResult> {
411    parse_query_response_with_context_binds_options_lob_mode_and_limits(
412        payload,
413        capabilities,
414        previous_columns,
415        previous_row,
416        bind_columns,
417        output_bind_indexes,
418        fetch_long_status,
419        exec_options,
420        LobDecodeMode::DefineMetadata,
421        limits,
422    )
423}
424
/// Message-dispatch loop that the `parse_*_response*` functions in this file
/// ultimately delegate to.
///
/// `payload` is read as a sequence of messages, each introduced by a one-byte
/// message type, and folded into a single `QueryResult`:
///
/// * the result starts with a copy of `previous_columns` and `more_rows = true`;
/// * `previous_row` is forwarded to `parse_row_data`;
/// * `bind_columns` / `output_bind_indexes` describe the statement's binds and
///   which of them are return-outputs; they route row data to the out-bind or
///   RETURNING parsers when the result has no columns;
/// * `fetch_long_status` is forwarded to `parse_row_data` and is promoted to
///   `true` when a mid-response describe triggers `adjust_refetch_metadata`;
/// * `exec_options.arraydmlrowcounts` controls whether array DML row counts
///   are parsed and recorded;
/// * `lob_decode_mode` is forwarded to `parse_row_data`;
/// * `limits` bound the reader.
///
/// Parsing stops at a flush-out-binds or end-of-response message, or when the
/// payload is exhausted.
///
/// # Errors
///
/// * any reader/sub-parser failure is propagated;
/// * `ProtocolError::ServerErrorInfo` when an error message carries a non-zero
///   number, except "no data found" on a result that has columns (which only
///   clears `more_rows`) and "array DML errors" (which only records the batch
///   errors);
/// * `ProtocolError::ServerError` or `ProtocolError::UnknownMessageType` for
///   an unrecognized message type.
425#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
426fn parse_query_response_with_context_binds_options_lob_mode_and_limits(
427    payload: &[u8],
428    capabilities: ClientCapabilities,
429    previous_columns: &[ColumnMetadata],
430    previous_row: Option<&[Option<QueryValue>]>,
431    bind_columns: &[ColumnMetadata],
432    output_bind_indexes: &[usize],
433    fetch_long_status: bool,
434    exec_options: ExecuteOptions,
435    lob_decode_mode: LobDecodeMode,
436    limits: ProtocolLimits,
437) -> Result<QueryResult> {
438    let mut reader = TtcReader::with_limits(payload, limits)?;
439    let mut result = QueryResult {
440        columns: previous_columns.to_vec(),
441        more_rows: true,
442        ..QueryResult::default()
443    };
444    // A re-executed cursor whose column type changed to CLOB/BLOB but was
445    // previously fetched as CHAR/VARCHAR/RAW streams the value in LONG/LONG RAW
446    // form (see `adjust_refetch_metadata`), which carries the LONG status
447    // trailer (null indicator + return code) after each value — even on the
448    // execute path that otherwise passes `fetch_long_status = false`. Promote
449    // the flag when such an adjustment fires so `parse_row_data` consumes that
450    // trailer instead of mis-framing the next message (bead rust-oracledb-f0ad).
451    let mut fetch_long_status = fetch_long_status;
    // Bit vector pending for the NEXT row-data message; cleared after each row.
452    let mut bit_vector: Option<Vec<u8>> = None;
    // Bind positions reported by the most recent I/O vector message, excluding
    // the caller's return-output binds.
453    let mut out_bind_indexes: Vec<usize> = Vec::new();
454    while reader.remaining() > 0 {
455        let message_type = reader.read_u8()?;
456        match message_type {
            // A zero type byte is skipped without consuming anything else.
457            0 => {}
458            TNS_MSG_TYPE_DESCRIBE_INFO => {
459                let _describe_name = reader.read_bytes()?;
                // Keep the pre-describe columns so each new column can be
                // compared against the one previously at the same index.
460                let previous = std::mem::take(&mut result.columns);
461                parse_describe_info(&mut reader, capabilities, &mut result)?;
462                // re-executing an open cursor whose underlying types changed:
463                // the server re-describes mid-response but still streams the
464                // row data in the adjusted (LONG/LONG RAW) form expected by
465                // the previous fetch metadata (reference `_adjust_metadata`,
466                // impl/thin/messages/base.pyx:820-845, applied during
467                // `_process_describe_info`).
468                for (index, column) in result.columns.iter_mut().enumerate() {
469                    if let Some(prev) = previous.get(index) {
470                        if adjust_refetch_metadata(prev, column) {
471                            // the adjusted column (now LONG / LONG RAW) is
472                            // streamed with the LONG status trailer.
473                            fetch_long_status = true;
474                        }
475                    }
476                }
477            }
478            TNS_MSG_TYPE_ROW_HEADER => {
                // The header's bit vector (possibly `None`) replaces any
                // pending one.
479                bit_vector = parse_row_header(&mut reader)?;
480            }
481            TNS_MSG_TYPE_ROW_DATA => {
                // With no result columns the row carries bind values: first
                // choice is the binds announced by an I/O vector, second the
                // caller-declared return-output binds. Otherwise it is an
                // ordinary fetched row.
482                if result.columns.is_empty() && !out_bind_indexes.is_empty() {
483                    parse_out_bind_row_data(
484                        &mut reader,
485                        &mut result,
486                        bind_columns,
487                        &out_bind_indexes,
488                    )?;
489                } else if result.columns.is_empty() && !output_bind_indexes.is_empty() {
490                    parse_returning_row_data(
491                        &mut reader,
492                        &mut result,
493                        bind_columns,
494                        output_bind_indexes,
495                    )?;
496                } else {
497                    parse_row_data(
498                        &mut reader,
499                        &mut result,
500                        bit_vector.as_deref(),
501                        previous_row,
502                        fetch_long_status,
503                        lob_decode_mode,
504                    )?;
505                }
                // A bit vector applies to a single row only.
506                bit_vector = None;
507            }
508            TNS_MSG_TYPE_BIT_VECTOR => {
                // Sized from the current column count.
509                bit_vector = Some(parse_bit_vector(&mut reader, result.columns.len())?);
510            }
511            TNS_MSG_TYPE_PARAMETER => {
512                let params =
513                    parse_query_return_parameters(&mut reader, exec_options.arraydmlrowcounts)?;
                // When row counts were requested the field is always set,
                // falling back to the default value if none were returned.
514                if exec_options.arraydmlrowcounts {
515                    result.array_dml_row_counts = Some(params.row_counts.unwrap_or_default());
516                }
                // Never overwrite an earlier query id with `None`.
517                if params.query_id.is_some() {
518                    result.query_id = params.query_id;
519                }
520            }
521            TNS_MSG_TYPE_STATUS => {
522                let call_status = reader.read_ub4()?;
523                let _seq = reader.read_ub2()?;
524                result.txn_in_progress = Some(call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
525            }
526            TNS_MSG_TYPE_IO_VECTOR => {
                // Return-output binds are handled by the RETURNING path, so
                // they are filtered out of the out-bind list here.
527                out_bind_indexes = parse_io_vector(&mut reader, bind_columns.len())?
528                    .into_iter()
529                    .filter(|index| !output_bind_indexes.contains(index))
530                    .collect();
531            }
            // Stops parsing; any bytes after this message are left unread.
532            TNS_MSG_TYPE_FLUSH_OUT_BINDS => break,
533            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
534                if let Some(update) = skip_server_side_piggyback(&mut reader)? {
535                    result.sessionless_txn_state = Some(update);
536                }
537            }
538            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
539                // reference messages/base.pyx `_process_implicit_result`
540                let num_results = reader.read_ub4()?;
541                // `num_results` is read straight off the wire (a ub4, up to
542                // ~4e9); each resultset consumes at least one byte, so cap the
543                // reservation by the bytes left in the payload (BoundedReader).
544                // Without this a hostile server forces a multi-gigabyte
545                // allocation (OOM) before the truncated read in the loop body
546                // fails closed.
547                let mut resultsets: Vec<QueryValue> = reader.with_capacity_limited(
548                    num_results as usize,
549                    1,
550                    ProtocolLimits::check_length_prefixed_elements,
551                )?;
552                for _ in 0..num_results {
                    // Each entry: a length-prefixed blob that is skipped, the
                    // child's describe info, then the child cursor id.
553                    let num_bytes = reader.read_u8()?;
554                    reader.skip(usize::from(num_bytes))?;
555                    let mut child = QueryResult::default();
556                    parse_describe_info(&mut reader, capabilities, &mut child)?;
557                    let child_cursor_id = u32::from(reader.read_ub2()?);
558                    resultsets.push(QueryValue::Cursor(Box::new(CursorValue {
559                        columns: child.columns,
560                        cursor_id: child_cursor_id,
561                    })));
562                }
563                result.implicit_resultsets = Some(resultsets);
564            }
            // Stops parsing; any bytes after this message are left unread.
565            TNS_MSG_TYPE_END_OF_RESPONSE => break,
566            // pipeline responses open with the token of the operation they
567            // answer (messages/base.pyx:288-293); callers compare it against
568            // the expected token (mismatch -> DPY-2052 at the driver layer)
569            TNS_MSG_TYPE_TOKEN => {
570                result.token_num = Some(reader.read_ub8()?);
571            }
572            TNS_MSG_TYPE_ERROR => {
573                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
574                // The end-of-call ERROR message (number 0 on success) carries
575                // the end-of-call status; sample the transaction-in-progress bit
576                // (reference protocol.pyx `_process_call_status`).
577                result.txn_in_progress =
578                    Some(info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
                // A zero cursor id leaves the current `result.cursor_id` untouched.
579                if info.cursor_id != 0 {
580                    result.cursor_id = u32::from(info.cursor_id);
581                }
582                result.row_count = info.row_count;
583                result.compilation_error_warning |= info.compilation_error_warning;
584                result.last_rowid = info.rowid.clone();
                // "No data found" only means end-of-fetch when the result has
                // columns; without columns it falls through to the generic
                // non-zero branch below and is returned as an error.
585                if info.number == TNS_ERR_NO_DATA_FOUND && !result.columns.is_empty() {
586                    result.more_rows = false;
587                } else if info.number == TNS_ERR_ARRAY_DML_ERRORS {
588                    // executemany(batcherrors=True): errors are reported via
589                    // the batch error arrays instead of raising ORA-24381
590                    // (reference messages/base.pyx `_process_error_info`).
591                    result.batch_errors = info.batch_errors;
592                } else if info.number != 0 {
                    // Row counts gathered so far travel with the error.
593                    let mut details = info.into_details();
594                    details.array_dml_row_counts = result.array_dml_row_counts.take();
595                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
596                }
597            }
598            _ => {
                // `position` is the offset of the unrecognized type byte itself
                // (the reader has already advanced past it).
599                let position = reader.position().saturating_sub(1);
                // Prefer a server error message found near that offset over
                // the generic unknown-message-type error.
600                if let Some(message) =
601                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
602                {
603                    return Err(ProtocolError::ServerError(message));
604                }
605                return Err(ProtocolError::UnknownMessageType {
606                    message_type,
607                    position,
608                });
609            }
610        }
611    }
612    Ok(result)
613}
614
615pub(crate) fn bind_column_metadata(value: &BindValue) -> ColumnMetadata {
616    let (ora_type_num, csfrm, buffer_size) = bind_metadata(value);
617    let object_schema = match value {
618        BindValue::ObjectOutput { schema, .. } | BindValue::ObjectInput { schema, .. } => {
619            Some(schema.clone())
620        }
621        _ => None,
622    };
623    let object_type_name = match value {
624        BindValue::ObjectOutput { type_name, .. } | BindValue::ObjectInput { type_name, .. } => {
625            Some(type_name.clone())
626        }
627        _ => None,
628    };
629    ColumnMetadata {
630        name: String::new(),
631        ora_type_num,
632        csfrm,
633        precision: 0,
634        scale: 0,
635        buffer_size,
636        max_size: buffer_size,
637        nulls_allowed: true,
638        is_json: false,
639        is_oson: false,
640        object_schema,
641        object_type_name,
642        is_array: matches!(value, BindValue::Array { .. }),
643        vector_dimensions: None,
644        vector_format: 0,
645        vector_flags: 0,
646        domain_schema: None,
647        domain_name: None,
648        annotations: None,
649    }
650}
651
652pub(crate) fn parse_io_vector(reader: &mut TtcReader<'_>, bind_count: usize) -> Result<Vec<usize>> {
653    let _flags = reader.read_u8()?;
654    let temp16 = reader.read_ub2()?;
655    let temp32 = reader.read_ub4()?;
656    let num_binds = usize::try_from(temp32)
657        .map_err(|_| ProtocolError::InvalidPacketLength {
658            length: usize::MAX,
659            minimum: 0,
660        })?
661        .checked_mul(256)
662        .and_then(|value| value.checked_add(usize::from(temp16)))
663        .ok_or(ProtocolError::InvalidPacketLength {
664            length: usize::MAX,
665            minimum: 0,
666        })?;
667    let _num_iters_this_time = reader.read_ub4()?;
668    let _uac_buffer_length = reader.read_ub2()?;
669    let fast_fetch_len = reader.read_ub2()?;
670    if fast_fetch_len > 0 {
671        reader.skip(usize::from(fast_fetch_len))?;
672    }
673    let rowid_len = reader.read_ub2()?;
674    if rowid_len > 0 {
675        reader.skip(usize::from(rowid_len))?;
676    }
677    let mut out_indexes = Vec::new();
678    reader.limits().check_binds(num_binds)?;
679    for index in 0..num_binds {
680        let direction = reader.read_u8()?;
681        if index < bind_count && direction != TNS_BIND_DIR_INPUT {
682            out_indexes.push(index);
683        }
684    }
685    Ok(out_indexes)
686}
687
688pub(crate) fn find_embedded_server_error(
689    payload: &[u8],
690    ttc_field_version: u8,
691    position: usize,
692) -> Option<String> {
693    let start = position.saturating_sub(64);
694    for candidate in start..=position {
695        if !matches!(payload.get(candidate).copied(), Some(TNS_MSG_TYPE_ERROR)) {
696            continue;
697        }
698        let mut reader = TtcReader::new(payload.get(candidate + 1..)?);
699        let info = parse_server_error_info(&mut reader, ttc_field_version).ok()?;
700        if info.number != 0 && info.message.starts_with("ORA-") {
701            return Some(info.message);
702        }
703    }
704    None
705}
706
707pub(crate) fn parse_describe_info(
708    reader: &mut TtcReader<'_>,
709    capabilities: ClientCapabilities,
710    result: &mut QueryResult,
711) -> Result<()> {
712    let _max_row_size = reader.read_ub4()?;
713    let num_columns = reader.read_ub4()?;
714    reader.limits().check_columns(num_columns as usize)?;
715    result.columns.clear();
716    if num_columns > 0 {
717        reader.skip(1)?;
718    }
719    for _ in 0..num_columns {
720        result
721            .columns
722            .push(parse_column_metadata(reader, capabilities)?);
723    }
724    let _current_date = reader.read_bytes_with_length()?;
725    let _dcbflag = reader.read_ub4()?;
726    let _dcbmdbz = reader.read_ub4()?;
727    let _dcbmnpr = reader.read_ub4()?;
728    let _dcbmxpr = reader.read_ub4()?;
729    let _dcbqcky = reader.read_bytes_with_length()?;
730    Ok(())
731}
732
/// Decodes the metadata of a single column from `reader`.
///
/// Fields are consumed strictly in wire order; several of them exist only
/// when `capabilities.ttc_field_version` is at or above a given protocol
/// level (12.2, 23.1, 23.1 ext 3, 23.4), so the reads below must not be
/// reordered. Values bound to `_`-prefixed names are read only to advance the
/// reader.
///
/// The returned metadata always has `is_array = false`.
///
/// # Errors
///
/// Propagates any reader failure and any limit rejection (annotation count,
/// vector dimensions).
733pub(crate) fn parse_column_metadata(
734    reader: &mut TtcReader<'_>,
735    capabilities: ClientCapabilities,
736) -> Result<ColumnMetadata> {
737    let ora_type_num = reader.read_u8()?;
738    reader.skip(1)?;
739    let precision = reader.read_i8()?;
740    let scale = reader.read_i8()?;
741    let buffer_size = reader.read_ub4()?;
742    let _max_array_elements = reader.read_ub4()?;
743    let _cont_flags = reader.read_ub8()?;
744    let _oid = reader.read_bytes_with_length()?;
745    let _version = reader.read_ub2()?;
746    let _charset_id = reader.read_ub2()?;
747    let csfrm = reader.read_u8()?;
748    let mut max_size = reader.read_ub4()?;
    // For RAW columns the buffer size is reported as the max size instead of
    // the value read from the wire.
749    if ora_type_num == ORA_TYPE_NUM_RAW {
750        max_size = buffer_size;
751    }
    // Extra ub4 present from protocol level 12.2 onwards.
752    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_12_2 {
753        let _oaccolid = reader.read_ub4()?;
754    }
755    let nulls_allowed = reader.read_u8()? != 0;
756    reader.skip(1)?;
    // A missing column name becomes the empty string.
757    let name = reader.read_string_with_length()?.unwrap_or_default();
758    let object_schema = reader.read_string_with_length()?;
759    let object_type_name = reader.read_string_with_length()?;
760    let _column_position = reader.read_ub2()?;
761    let uds_flags = reader.read_ub4()?;
762    let mut domain_schema = None;
763    let mut domain_name = None;
764    let mut annotations: Option<Vec<(String, String)>> = None;
    // Domain schema/name are on the wire from protocol level 23.1 onwards.
765    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1 {
766        domain_schema = reader.read_string_with_length()?;
767        domain_name = reader.read_string_with_length()?;
768    }
    // Annotations are on the wire from protocol level 23.1 ext 3 onwards.
769    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_3 {
770        let num_annotations = reader.read_ub4()?;
771        if num_annotations > 0 {
            // The count is sent a second time, framed by single skipped
            // bytes; the second value is the one that drives the loop.
772            reader.skip(1)?;
773            let num_annotations = reader.read_ub4()?;
774            reader.skip(1)?;
775            // Bound by remaining bytes (BoundedReader): each annotation reads
776            // at least a length-prefixed key/value, so a ub4 count larger than
777            // the payload is a lie that must not pre-allocate gigabytes.
778            let mut collected: Vec<(String, String)> = reader.with_capacity_limited(
779                num_annotations as usize,
780                1,
781                ProtocolLimits::check_object_elements,
782            )?;
783            for _ in 0..num_annotations {
784                let key = reader.read_string_with_length()?.unwrap_or_default();
785                // A null annotation value is normalized to "" by the reference
786                // driver (python-oracledb base.pyx _process_metadata).
787                let value = reader.read_string_with_length()?.unwrap_or_default();
788                let _flags = reader.read_ub4()?;
789                collected.push((key, value));
790            }
791            let _flags = reader.read_ub4()?;
792            annotations = Some(collected);
793        }
794    }
795    let mut vector_dimensions = None;
796    let mut vector_format = 0u8;
797    let mut vector_flags = 0u8;
    // Vector fields are on the wire for EVERY column from protocol level 23.4
    // onwards; format and flags are kept as read, but the dimension count is
    // only retained for VECTOR columns.
798    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_4 {
799        // reference metadata.pyx: ub4 dimensions, ub1 format, ub1 flags
800        let dims = reader.read_ub4()?;
801        reader.limits().check_vector_dimensions(dims as usize)?;
802        vector_format = reader.read_u8()?;
803        vector_flags = reader.read_u8()?;
804        if ora_type_num == ORA_TYPE_NUM_VECTOR {
805            vector_dimensions = Some(dims);
806        }
807    }
808
809    Ok(ColumnMetadata {
810        name,
811        ora_type_num,
812        csfrm,
813        precision,
814        scale,
815        buffer_size,
816        max_size,
817        nulls_allowed,
        // JSON / OSON markers are individual bits of the UDS flags word.
818        is_json: uds_flags & TNS_UDS_FLAGS_IS_JSON != 0,
819        is_oson: uds_flags & TNS_UDS_FLAGS_IS_OSON != 0,
820        object_schema,
821        object_type_name,
822        is_array: false,
823        vector_dimensions,
824        vector_format,
825        vector_flags,
826        domain_schema,
827        domain_name,
828        annotations,
829    })
830}
831
832pub(crate) fn parse_row_header(reader: &mut TtcReader<'_>) -> Result<Option<Vec<u8>>> {
833    reader.skip(1)?;
834    let _num_requests = reader.read_ub2()?;
835    let _iteration_number = reader.read_ub4()?;
836    let _num_iters = reader.read_ub4()?;
837    let _buffer_length = reader.read_ub2()?;
838    let num_bytes = reader.read_ub4()?;
839    let bit_vector = if num_bytes > 0 {
840        reader.skip(1)?;
841        Some(reader.read_raw(num_bytes as usize)?.to_vec())
842    } else {
843        None
844    };
845    let _rxhrid = reader.read_bytes_with_length()?;
846    Ok(bit_vector)
847}
848
849pub(crate) fn parse_bit_vector(reader: &mut TtcReader<'_>, num_columns: usize) -> Result<Vec<u8>> {
850    let _num_columns_sent = reader.read_ub2()?;
851    let num_bytes = num_columns.div_ceil(8);
852    Ok(reader.read_raw(num_bytes)?.to_vec())
853}
854
855pub(crate) fn parse_row_data(
856    reader: &mut TtcReader<'_>,
857    result: &mut QueryResult,
858    bit_vector: Option<&[u8]>,
859    previous_row: Option<&[Option<QueryValue>]>,
860    fetch_long_status: bool,
861    lob_decode_mode: LobDecodeMode,
862) -> Result<()> {
863    let mut row = Vec::with_capacity(result.columns.len());
864    for (index, metadata) in result.columns.iter().enumerate() {
865        if is_duplicate_column(bit_vector, index) {
866            let previous = result
867                .rows
868                .last()
869                .map(Vec::as_slice)
870                .or(previous_row)
871                .and_then(|last| last.get(index))
872                .cloned()
873                .ok_or(ProtocolError::TtcDecode(
874                    "duplicate row data without previous row",
875                ))?;
876            row.push(previous);
877            continue;
878        }
879        row.push(parse_column_value_with_lob_mode(
880            reader,
881            metadata,
882            lob_decode_mode,
883        )?);
884        if fetch_long_status
885            && matches!(
886                metadata.ora_type_num,
887                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
888            )
889        {
890            let _null_indicator = reader.read_sb4()?;
891            let _return_code = reader.read_ub4()?;
892        }
893    }
894    result.rows.push(row);
895    Ok(())
896}
897
898pub(crate) fn parse_out_bind_row_data(
899    reader: &mut TtcReader<'_>,
900    result: &mut QueryResult,
901    bind_columns: &[ColumnMetadata],
902    out_bind_indexes: &[usize],
903) -> Result<()> {
904    for index in out_bind_indexes {
905        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
906            "out bind index without bind metadata",
907        ))?;
908        if metadata.is_array {
909            let num_elements = usize::try_from(reader.read_ub4()?).map_err(|_| {
910                ProtocolError::InvalidPacketLength {
911                    length: usize::MAX,
912                    minimum: 0,
913                }
914            })?;
915            reader.limits().check_batch_rows(num_elements)?;
916            // Cap by remaining bytes (BoundedReader): each element consumes
917            // wire data, so a ub4 count cannot legitimately exceed the payload.
918            let mut values: Vec<Option<QueryValue>> =
919                reader.with_capacity_limited(num_elements, 1, ProtocolLimits::check_batch_rows)?;
920            for _ in 0..num_elements {
921                let value = parse_column_value(reader, metadata)?;
922                let actual_num_bytes = reader.read_sb4()?;
923                values.push(apply_out_bind_actual_num_bytes(
924                    metadata,
925                    value,
926                    actual_num_bytes,
927                    "truncated array OUT bind value",
928                )?);
929            }
930            result
931                .out_values
932                .push((*index, Some(QueryValue::Array(values))));
933            continue;
934        }
935        let value = parse_column_value(reader, metadata)?;
936        let actual_num_bytes = reader.read_sb4()?;
937        result.out_values.push((
938            *index,
939            apply_out_bind_actual_num_bytes(
940                metadata,
941                value,
942                actual_num_bytes,
943                "truncated OUT bind value",
944            )?,
945        ));
946    }
947    Ok(())
948}
949
950pub(crate) fn parse_returning_row_data(
951    reader: &mut TtcReader<'_>,
952    result: &mut QueryResult,
953    bind_columns: &[ColumnMetadata],
954    output_bind_indexes: &[usize],
955) -> Result<()> {
956    for index in output_bind_indexes {
957        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
958            "return bind index without bind metadata",
959        ))?;
960        let num_rows = usize::try_from(reader.read_ub4()?).map_err(|_| {
961            ProtocolError::InvalidPacketLength {
962                length: usize::MAX,
963                minimum: 0,
964            }
965        })?;
966        reader.limits().check_batch_rows(num_rows)?;
967        // Cap by remaining bytes (BoundedReader); see the OOM note above.
968        let mut values: Vec<Option<QueryValue>> =
969            reader.with_capacity_limited(num_rows, 1, ProtocolLimits::check_batch_rows)?;
970        for _ in 0..num_rows {
971            let value = parse_column_value(reader, metadata)?;
972            let actual_num_bytes = reader.read_sb4()?;
973            values.push(apply_out_bind_actual_num_bytes(
974                metadata,
975                value,
976                actual_num_bytes,
977                "truncated DML RETURNING value",
978            )?);
979        }
980        result.return_values.push((*index, values));
981    }
982    Ok(())
983}
984
985fn apply_out_bind_actual_num_bytes(
986    metadata: &ColumnMetadata,
987    value: Option<QueryValue>,
988    actual_num_bytes: i32,
989    truncation_error: &'static str,
990) -> Result<Option<QueryValue>> {
991    if actual_num_bytes < 0 && metadata.ora_type_num == ORA_TYPE_NUM_BOOLEAN {
992        return Ok(None);
993    }
994    if actual_num_bytes != 0 && value.is_some() {
995        return Err(ProtocolError::TtcDecode(truncation_error));
996    }
997    Ok(value)
998}
999
/// Report whether `column_num` is marked as a duplicate in `bit_vector`.
///
/// Bit `column_num % 8` of byte `column_num / 8` describes the column; a
/// cleared bit means "duplicate". With no bit vector, or when the vector is too
/// short to contain the column's byte, the column is not a duplicate.
pub(crate) fn is_duplicate_column(bit_vector: Option<&[u8]>, column_num: usize) -> bool {
    match bit_vector.and_then(|bits| bits.get(column_num / 8)) {
        Some(&byte) => (byte >> (column_num % 8)) & 1 == 0,
        None => false,
    }
}
1010
1011pub(crate) fn parse_column_value(
1012    reader: &mut TtcReader<'_>,
1013    metadata: &ColumnMetadata,
1014) -> Result<Option<QueryValue>> {
1015    parse_column_value_with_lob_mode(reader, metadata, LobDecodeMode::DefineMetadata)
1016}
1017
1018fn parse_column_value_with_lob_mode(
1019    reader: &mut TtcReader<'_>,
1020    metadata: &ColumnMetadata,
1021    lob_decode_mode: LobDecodeMode,
1022) -> Result<Option<QueryValue>> {
1023    if metadata.buffer_size == 0
1024        && !matches!(
1025            metadata.ora_type_num,
1026            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW | ORA_TYPE_NUM_UROWID
1027        )
1028    {
1029        return Ok(None);
1030    }
1031    match metadata.ora_type_num {
1032        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
1033            let Some(bytes) = reader.read_bytes()? else {
1034                return Ok(None);
1035            };
1036            match decode_text_value(&bytes, metadata.csfrm) {
1037                Ok(value) => Ok(Some(QueryValue::Text(value))),
1038                // preserve the raw bytes so the caller can honor the
1039                // configured encoding_errors policy (or raise a Python
1040                // UnicodeDecodeError as the reference does)
1041                Err(ProtocolError::TtcDecode(_)) => Ok(Some(QueryValue::TextRaw {
1042                    bytes,
1043                    csfrm: metadata.csfrm,
1044                })),
1045                Err(err) => Err(err),
1046            }
1047        }
1048        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => Ok(reader.read_bytes()?.map(QueryValue::Raw)),
1049        ORA_TYPE_NUM_ROWID => parse_rowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
1050        ORA_TYPE_NUM_UROWID => parse_urowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
1051        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
1052            let Some(bytes) = reader.read_bytes()? else {
1053                return Ok(None);
1054            };
1055            decode_number_value(&bytes).map(Some)
1056        }
1057        ORA_TYPE_NUM_BINARY_DOUBLE => {
1058            let Some(bytes) = reader.read_bytes()? else {
1059                return Ok(None);
1060            };
1061            decode_binary_double(&bytes)
1062                .map(|value| Some(QueryValue::BinaryDouble(value.to_string())))
1063        }
1064        ORA_TYPE_NUM_BINARY_FLOAT => {
1065            let Some(bytes) = reader.read_bytes()? else {
1066                return Ok(None);
1067            };
1068            // f64-widened text matches Python float semantics for BINARY_FLOAT
1069            decode_binary_float(&bytes)
1070                .map(|value| Some(QueryValue::BinaryDouble(f64::from(value).to_string())))
1071        }
1072        ORA_TYPE_NUM_BOOLEAN => {
1073            let Some(bytes) = reader.read_bytes()? else {
1074                return Ok(None);
1075            };
1076            // reference read_bool: last byte == 1 means true; native
1077            // DB_TYPE_BOOLEAN surfaces as a Python bool.
1078            let is_true = matches!(bytes.last(), Some(&1));
1079            Ok(Some(QueryValue::Boolean(is_true)))
1080        }
1081        ORA_TYPE_NUM_INTERVAL_DS => {
1082            let Some(bytes) = reader.read_bytes()? else {
1083                return Ok(None);
1084            };
1085            decode_interval_ds(&bytes).map(Some)
1086        }
1087        ORA_TYPE_NUM_INTERVAL_YM => {
1088            let Some(bytes) = reader.read_bytes()? else {
1089                return Ok(None);
1090            };
1091            decode_interval_ym(&bytes).map(Some)
1092        }
1093        ORA_TYPE_NUM_DATE
1094        | ORA_TYPE_NUM_TIMESTAMP
1095        | ORA_TYPE_NUM_TIMESTAMP_LTZ
1096        | ORA_TYPE_NUM_TIMESTAMP_TZ => {
1097            let Some(bytes) = reader.read_bytes()? else {
1098                return Ok(None);
1099            };
1100            decode_datetime_value(&bytes).map(Some)
1101        }
1102        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB | ORA_TYPE_NUM_BFILE => {
1103            parse_lob_value(reader, metadata, lob_decode_mode)
1104        }
1105        ORA_TYPE_NUM_VECTOR => parse_vector_value(reader),
1106        ORA_TYPE_NUM_JSON => parse_json_value(reader),
1107        ORA_TYPE_NUM_CURSOR => parse_cursor_value(reader).map(Some),
1108        ORA_TYPE_NUM_OBJECT => parse_object_value(reader, metadata),
1109        _ => Err(ProtocolError::UnsupportedFeature("query column type")),
1110    }
1111}
1112
/// A column value decoded in pass 1 of the borrowed row decode. Scalar values
/// that borrow the wire buffer are held directly; values that need a small
/// owned arena (synthesized `Number` text, or a cold owned [`QueryValue`]) are
/// recorded as a deferred handle into the per-row arena and resolved in pass 2
/// once the arena is frozen. This two-pass split is what keeps the borrowed
/// path sound under `#![forbid(unsafe_code)]`: no `&str`/`&[u8]` is ever held
/// into an arena that is still being grown.
///
/// Slots are produced by [`parse_column_slot`] and turned into
/// [`QueryValueRef`]s by [`BorrowedRowBatch::for_each_row_ref`]. The `'buf`
/// lifetime is that of the wire buffer the `Wire` variant borrows from; the
/// `Number` and `Owned` variants hold only a range/index, never a borrow.
enum ColumnSlot<'buf> {
    /// SQL NULL.
    Null,
    /// A value that borrows the wire buffer (or is a small `Copy` value).
    Wire(QueryValueRef<'buf>),
    /// A `NUMBER` whose canonical text lives at `arena[range]` in the per-row
    /// number-text arena.
    Number {
        /// Byte range of this number's text within the number-text arena.
        range: core::ops::Range<usize>,
        /// Integer flag reported by the number decoder; forwarded unchanged
        /// into `QueryValueRef::Number`.
        is_integer: bool,
    },
    /// A cold / non-borrowable value parked at `owned[index]` in the per-row
    /// owned arena.
    Owned(usize),
}
1135
1136/// Decode one column into a [`ColumnSlot`], borrowing the wire buffer for the
1137/// hot scalar cases and appending to the per-row arenas for the deferred ones.
1138/// Mirrors [`parse_column_value`] type-for-type; the produced owned value (via
1139/// [`QueryValueRef::to_owned_value`]) is identical to the owned path.
1140///
1141/// `digits` is a caller-owned scratch buffer reused across all cells so the
1142/// per-cell `NUMBER` decode allocates nothing of its own (it writes straight
1143/// into `number_arena`). The hot scalar grid (Text/Raw) borrows the wire buffer
1144/// directly with zero allocation.
1145fn parse_column_slot<'buf>(
1146    reader: &mut TtcReader<'buf>,
1147    metadata: &ColumnMetadata,
1148    number_arena: &mut String,
1149    owned_arena: &mut Vec<QueryValue>,
1150    digits: &mut Vec<u8>,
1151    lob_decode_mode: LobDecodeMode,
1152) -> Result<ColumnSlot<'buf>> {
1153    // Park an owned QueryValue in the arena and return the deferred slot. Used
1154    // for the cold / non-borrowable variants so the hot grid stays borrowed.
1155    fn park(owned_arena: &mut Vec<QueryValue>, value: Option<QueryValue>) -> ColumnSlot<'static> {
1156        match value {
1157            None => ColumnSlot::Null,
1158            Some(value) => {
1159                owned_arena.push(value);
1160                ColumnSlot::Owned(owned_arena.len() - 1)
1161            }
1162        }
1163    }
1164
1165    if metadata.buffer_size == 0
1166        && !matches!(
1167            metadata.ora_type_num,
1168            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW | ORA_TYPE_NUM_UROWID
1169        )
1170    {
1171        return Ok(ColumnSlot::Null);
1172    }
1173    match metadata.ora_type_num {
1174        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
1175            match reader.read_bytes_borrowed()? {
1176                BorrowedBytes::Null => Ok(ColumnSlot::Null),
1177                // Borrow the wire bytes directly when they are valid UTF-8 and
1178                // not the UTF-16 NCHAR form (which needs re-encoding). Zero copy.
1179                BorrowedBytes::Slice(slice) if metadata.csfrm != CS_FORM_NCHAR => {
1180                    match validate_utf8(slice) {
1181                        Ok(text) => Ok(ColumnSlot::Wire(QueryValueRef::Text(text))),
1182                        Err(_) => Ok(park(
1183                            owned_arena,
1184                            Some(QueryValue::TextRaw {
1185                                bytes: slice.to_vec(),
1186                                csfrm: metadata.csfrm,
1187                            }),
1188                        )),
1189                    }
1190                }
1191                // NCHAR (UTF-16) or chunked long text: fall back to the owned
1192                // decode, which re-encodes to UTF-8 / reassembles chunks.
1193                other => {
1194                    let bytes = other.into_vec();
1195                    let value = match decode_text_value(&bytes, metadata.csfrm) {
1196                        Ok(text) => QueryValue::Text(text),
1197                        Err(ProtocolError::TtcDecode(_)) => QueryValue::TextRaw {
1198                            bytes,
1199                            csfrm: metadata.csfrm,
1200                        },
1201                        Err(err) => return Err(err),
1202                    };
1203                    Ok(park(owned_arena, Some(value)))
1204                }
1205            }
1206        }
1207        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => match reader.read_bytes_borrowed()? {
1208            BorrowedBytes::Null => Ok(ColumnSlot::Null),
1209            BorrowedBytes::Slice(slice) => Ok(ColumnSlot::Wire(QueryValueRef::Raw(slice))),
1210            BorrowedBytes::Chunked(bytes) => Ok(park(owned_arena, Some(QueryValue::Raw(bytes)))),
1211        },
1212        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
1213            // The wire NUMBER is binary; its canonical decimal text is
1214            // synthesized, so it cannot be borrowed from the buffer. We
1215            // synthesize it *directly* into the per-row number arena (reusing the
1216            // `digits` scratch), borrowing from the arena — zero per-cell heap
1217            // allocation for the common in-range NUMBER.
1218            with_small_bytes(reader, |bytes| match bytes {
1219                None => Ok(ColumnSlot::Null),
1220                Some(bytes) => {
1221                    let start = number_arena.len();
1222                    let is_integer = decode_number_text_into(bytes, digits, number_arena)?;
1223                    Ok(ColumnSlot::Number {
1224                        range: start..number_arena.len(),
1225                        is_integer,
1226                    })
1227                }
1228            })
1229        }
1230        ORA_TYPE_NUM_BOOLEAN => with_small_bytes(reader, |bytes| match bytes {
1231            None => Ok(ColumnSlot::Null),
1232            Some(bytes) => Ok(ColumnSlot::Wire(QueryValueRef::Boolean(matches!(
1233                bytes.last(),
1234                Some(&1)
1235            )))),
1236        }),
1237        ORA_TYPE_NUM_INTERVAL_DS => with_small_bytes(reader, |bytes| match bytes {
1238            None => Ok(ColumnSlot::Null),
1239            Some(bytes) => match decode_interval_ds(bytes)? {
1240                QueryValue::IntervalDS {
1241                    days,
1242                    hours,
1243                    minutes,
1244                    seconds,
1245                    fseconds,
1246                } => Ok(ColumnSlot::Wire(QueryValueRef::IntervalDS {
1247                    days,
1248                    hours,
1249                    minutes,
1250                    seconds,
1251                    fseconds,
1252                })),
1253                other => Ok(park(owned_arena, Some(other))),
1254            },
1255        }),
1256        ORA_TYPE_NUM_INTERVAL_YM => with_small_bytes(reader, |bytes| match bytes {
1257            None => Ok(ColumnSlot::Null),
1258            Some(bytes) => match decode_interval_ym(bytes)? {
1259                QueryValue::IntervalYM { years, months } => {
1260                    Ok(ColumnSlot::Wire(QueryValueRef::IntervalYM {
1261                        years,
1262                        months,
1263                    }))
1264                }
1265                other => Ok(park(owned_arena, Some(other))),
1266            },
1267        }),
1268        ORA_TYPE_NUM_DATE
1269        | ORA_TYPE_NUM_TIMESTAMP
1270        | ORA_TYPE_NUM_TIMESTAMP_LTZ
1271        | ORA_TYPE_NUM_TIMESTAMP_TZ => with_small_bytes(reader, |bytes| match bytes {
1272            None => Ok(ColumnSlot::Null),
1273            Some(bytes) => match decode_datetime_value(bytes)? {
1274                QueryValue::DateTime {
1275                    year,
1276                    month,
1277                    day,
1278                    hour,
1279                    minute,
1280                    second,
1281                    nanosecond,
1282                } => Ok(ColumnSlot::Wire(QueryValueRef::DateTime {
1283                    year,
1284                    month,
1285                    day,
1286                    hour,
1287                    minute,
1288                    second,
1289                    nanosecond,
1290                })),
1291                QueryValue::TimestampTz {
1292                    year,
1293                    month,
1294                    day,
1295                    hour,
1296                    minute,
1297                    second,
1298                    nanosecond,
1299                    offset_minutes,
1300                } => Ok(ColumnSlot::Wire(QueryValueRef::TimestampTz {
1301                    year,
1302                    month,
1303                    day,
1304                    hour,
1305                    minute,
1306                    second,
1307                    nanosecond,
1308                    offset_minutes,
1309                })),
1310                other => Ok(park(owned_arena, Some(other))),
1311            },
1312        }),
1313        // Everything else (Rowid, BinaryDouble/Float, Clob/Blob/Bfile, Vector,
1314        // Json, Cursor, Object, UROWID) goes through the owned decode and is
1315        // parked in the owned arena. These are the cold / non-borrowable cases.
1316        _ => {
1317            let value = parse_column_value_with_lob_mode(reader, metadata, lob_decode_mode)?;
1318            Ok(park(owned_arena, value))
1319        }
1320    }
1321}
1322
1323/// Read one TTC byte field and hand the body to `f` as a borrowed `&[u8]`
1324/// without allocating in the common contiguous case. `None` is SQL NULL. The
1325/// rare chunked long form is reassembled into a temporary `Vec` (these small
1326/// fixed-size scalar types — number/boolean/interval/datetime — are never sent
1327/// chunked in practice, so this fallback is effectively dead weight).
1328fn with_small_bytes<'buf, T>(
1329    reader: &mut TtcReader<'buf>,
1330    f: impl FnOnce(Option<&[u8]>) -> Result<T>,
1331) -> Result<T> {
1332    match reader.read_bytes_borrowed()? {
1333        BorrowedBytes::Null => f(None),
1334        BorrowedBytes::Slice(slice) => f(Some(slice)),
1335        BorrowedBytes::Chunked(owned) => f(Some(&owned)),
1336    }
1337}
1338
1339impl BorrowedBytes<'_> {
1340    /// Reassemble into an owned `Vec` (zero-copy `Slice` still copies; `Chunked`
1341    /// reuses its already-owned `Vec`). Used by the non-borrowable text fallback.
1342    fn into_vec(self) -> Vec<u8> {
1343        match self {
1344            BorrowedBytes::Null => Vec::new(),
1345            BorrowedBytes::Slice(slice) => slice.to_vec(),
1346            BorrowedBytes::Chunked(owned) => owned,
1347        }
1348    }
1349}
1350
/// A decoded fetch batch that **owns** the wire response buffer and column
/// metadata, and yields rows of borrowed [`QueryValueRef`] that point straight
/// into that buffer. This is the zero-copy fetch fast path: the common scalar
/// grid is decoded with no per-cell allocation.
///
/// ## Soundness
///
/// The buffer is owned by the batch and outlives every borrowed row: rows are
/// only ever surfaced *inside* the [`for_each_row_ref`](Self::for_each_row_ref)
/// callback, whose `&[QueryValueRef]` argument cannot escape (its lifetime is
/// bound to the call). The borrow checker therefore guarantees no
/// `QueryValueRef` can dangle — there is no self-referential struct and no
/// `unsafe`. `Number` text and the cold values borrow per-row arenas that are
/// fully built (pass 1) before any reference into them is taken (pass 2), so an
/// arena is never grown while borrowed.
#[derive(Clone, Debug)]
pub struct BorrowedRowBatch {
    /// The owned wire bytes that every row is decoded from and that borrowed
    /// cells point into.
    buffer: Vec<u8>,
    /// Metadata for each column of a row, in wire order; drives the per-cell
    /// decode.
    columns: Vec<ColumnMetadata>,
    /// Byte offset into `buffer` where each row's column values begin.
    row_starts: Vec<usize>,
    /// Per-row duplicate-column bit vector (server row-compression). `None` (or
    /// an absent entry) means every column is present on the wire for that row.
    /// A zero bit marks a duplicate column whose value repeats the previous
    /// row's value and carries no wire bytes (reference `bit_vector`).
    row_bit_vectors: Vec<Option<Vec<u8>>>,
    /// Whether this batch carried `LONG`/`LONG RAW` status trailers after each
    /// such column (the fetch path sets this; the plain execute path does not).
    fetch_long_status: bool,
    /// LOB decode mode handed to the column decoder for every row of the batch.
    lob_decode_mode: LobDecodeMode,
    /// The caller's previous (owned) row, used to resolve duplicate columns in
    /// the *first* compressed row of the batch (whose duplicates repeat the row
    /// that ended the prior page). `None` outside the compressed-fetch case.
    previous_row_seed: Option<Vec<Option<QueryValue>>>,
}
1386
1387impl BorrowedRowBatch {
1388    /// Construct a batch from an owned wire `buffer`, the `columns` describing
1389    /// each cell, and the per-row start offsets into `buffer`. Use this for
1390    /// batches with no duplicate-column compression and no LONG trailers (the
1391    /// common synthetic / test case); the framing-aware
1392    /// [`parse_query_response_borrowed`] builds the full form.
1393    pub fn new(buffer: Vec<u8>, columns: Vec<ColumnMetadata>, row_starts: Vec<usize>) -> Self {
1394        Self {
1395            buffer,
1396            columns,
1397            row_starts,
1398            row_bit_vectors: Vec::new(),
1399            fetch_long_status: false,
1400            lob_decode_mode: LobDecodeMode::PlainLocator,
1401            previous_row_seed: None,
1402        }
1403    }
1404
1405    /// Number of rows in the batch.
1406    pub fn row_count(&self) -> usize {
1407        self.row_starts.len()
1408    }
1409
1410    /// The columns describing each cell.
1411    pub fn columns(&self) -> &[ColumnMetadata] {
1412        &self.columns
1413    }
1414
1415    /// The address range of the owned buffer, for tests asserting that borrowed
1416    /// scalar cells truly point into it (zero-copy).
1417    #[cfg(test)]
1418    pub fn buffer_ptr_range(&self) -> core::ops::Range<usize> {
1419        let start = self.buffer.as_ptr() as usize;
1420        start..start + self.buffer.len()
1421    }
1422
1423    /// Decode each row and invoke `callback` with the row's borrowed cells. The
1424    /// `&[Option<QueryValueRef>]` slice borrows the batch buffer and per-row
1425    /// arenas; it is valid only for the duration of the call (it cannot escape).
1426    /// `None` cells are SQL NULL. Returns the first decode/callback error.
1427    ///
1428    /// Generic over the callback's error type `E` (any error a decode failure
1429    /// can convert into, e.g. the driver crate's own error) so callers are not
1430    /// forced through [`ProtocolError`]; a decode failure is surfaced via
1431    /// `E: From<ProtocolError>`.
1432    pub fn for_each_row_ref<F, E>(&self, mut callback: F) -> std::result::Result<(), E>
1433    where
1434        F: FnMut(&[Option<QueryValueRef<'_>>]) -> std::result::Result<(), E>,
1435        E: From<ProtocolError>,
1436    {
1437        // The two arenas are reused across rows (cleared, not reallocated). They
1438        // are mutated only in pass 1; pass 2 borrows them immutably and that
1439        // borrow is confined to a single loop iteration (it ends before the next
1440        // iteration's `clear()`), which is what keeps the borrow checker — and
1441        // soundness — happy.
1442        let mut number_arena = String::new();
1443        let mut owned_arena: Vec<QueryValue> = Vec::new();
1444        // Reusable scratch: `digits` for the NUMBER decode, `slots` for pass 1,
1445        // `row` for the borrowed cells handed to the callback. All cleared and
1446        // reused across rows so the steady-state per-row decode allocates only
1447        // when an arena/scratch genuinely grows (amortized). `slots`/`row`/
1448        // `digits` borrow nothing across iterations beyond the stable buffer.
1449        let mut digits: Vec<u8> = Vec::new();
1450        let mut slots: Vec<Option<ColumnSlot<'_>>> = Vec::with_capacity(self.columns.len());
1451        // Owned snapshot of the previous row, used only to resolve duplicate
1452        // (bit-vector-compressed) columns, which carry no wire bytes. Empty when
1453        // the batch has no bit vectors (the common case), so it costs nothing.
1454        // Seeded from the caller's prior-page row for the first compressed row.
1455        let mut previous_owned: Vec<Option<QueryValue>> =
1456            self.previous_row_seed.clone().unwrap_or_default();
1457        let uses_bit_vectors = !self.row_bit_vectors.is_empty();
1458
1459        for (row_index, &start) in self.row_starts.iter().enumerate() {
1460            number_arena.clear();
1461            owned_arena.clear();
1462            slots.clear();
1463            let bit_vector = self
1464                .row_bit_vectors
1465                .get(row_index)
1466                .and_then(|bv| bv.as_deref());
1467
1468            // Pass 1: decode all columns, growing the per-row arenas. `slots`
1469            // borrows only the buffer (the deferred Number/Owned slots hold a
1470            // range/index into the arenas, never a borrow of them). Duplicate
1471            // columns carry no wire bytes — their owned previous value is parked
1472            // in the owned arena.
1473            let mut reader = TtcReader::new(&self.buffer[start..]);
1474            for (index, metadata) in self.columns.iter().enumerate() {
1475                if is_duplicate_column(bit_vector, index) {
1476                    let previous = previous_owned.get(index).and_then(Option::as_ref);
1477                    match previous {
1478                        None => slots.push(None),
1479                        Some(value) => {
1480                            owned_arena.push(value.clone());
1481                            slots.push(Some(ColumnSlot::Owned(owned_arena.len() - 1)));
1482                        }
1483                    }
1484                    continue;
1485                }
1486                let slot = parse_column_slot(
1487                    &mut reader,
1488                    metadata,
1489                    &mut number_arena,
1490                    &mut owned_arena,
1491                    &mut digits,
1492                    self.lob_decode_mode,
1493                )?;
1494                slots.push(match slot {
1495                    ColumnSlot::Null => None,
1496                    other => Some(other),
1497                });
1498                if self.fetch_long_status
1499                    && matches!(
1500                        metadata.ora_type_num,
1501                        ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1502                    )
1503                {
1504                    let _null_indicator = reader.read_sb4()?;
1505                    let _return_code = reader.read_ub4()?;
1506                }
1507            }
1508
1509            // Pass 2: arenas are now frozen — resolve deferred slots into
1510            // borrowed refs. No arena is mutated here, so the borrows are sound.
1511            // `row` is allocated per row: it carries the per-row arena lifetime,
1512            // which (unlike `slots`/`digits`, that borrow only the stable buffer)
1513            // cannot be reused across an arena `clear()`. This is the single
1514            // remaining per-row allocation, versus the owned path's per-row Vec
1515            // *plus* a String per scalar cell.
1516            let row: Vec<Option<QueryValueRef<'_>>> = slots
1517                .iter()
1518                .map(|slot| {
1519                    slot.as_ref().map(|slot| match *slot {
1520                        ColumnSlot::Null => unreachable!("Null slots are stored as None"),
1521                        ColumnSlot::Wire(value) => value,
1522                        ColumnSlot::Number {
1523                            ref range,
1524                            is_integer,
1525                        } => QueryValueRef::Number {
1526                            text: &number_arena[range.clone()],
1527                            is_integer,
1528                        },
1529                        ColumnSlot::Owned(index) => QueryValueRef::Owned(&owned_arena[index]),
1530                    })
1531                })
1532                .collect();
1533
1534            callback(&row)?;
1535
1536            // Snapshot the just-emitted row as owned values for the next row's
1537            // duplicate-column resolution — but only when the batch actually uses
1538            // bit-vector compression, so the zero-copy common path pays nothing.
1539            if uses_bit_vectors {
1540                previous_owned.clear();
1541                previous_owned.extend(row.iter().map(|cell| cell.map(|v| v.to_owned_value())));
1542            }
1543        }
1544        Ok(())
1545    }
1546}
1547
1548/// The borrowed counterpart of a fetched [`QueryResult`]: a [`BorrowedRowBatch`]
1549/// of zero-copy rows plus the response-level fields a caller needs to page and
1550/// finalize the cursor. Produced by [`parse_query_response_borrowed`].
1551#[derive(Clone, Debug)]
1552pub struct BorrowedFetchResult {
1553    /// The decoded rows, borrowing the response buffer.
1554    pub batch: BorrowedRowBatch,
1555    /// Whether the server reports more rows for this cursor.
1556    pub more_rows: bool,
1557    /// Server cursor id (for paging / release).
1558    pub cursor_id: u32,
1559    /// Total affected/processed row count from the end-of-call error message.
1560    pub row_count: u64,
1561}
1562
1563/// Walk a fetch/query response payload and produce a [`BorrowedFetchResult`]
1564/// whose rows borrow `payload` (the caller must keep the owned buffer alive —
1565/// [`BorrowedRowBatch`] owns it). This is the zero-copy companion to
1566/// [`parse_fetch_response_with_context`]: it walks the exact same message
1567/// framing (DESCRIBE_INFO / ROW_HEADER / BIT_VECTOR / ROW_DATA / ERROR /
1568/// END_OF_RESPONSE) but, instead of materializing owned rows, records each
1569/// row's byte offset and bit vector so [`BorrowedRowBatch::for_each_row_ref`]
1570/// can decode them lazily and without per-cell allocation.
1571///
1572/// Scope: the plain query-row case (the fetch path). Out-bind / DML-returning
1573/// rows are not part of a fetch response and are left to the owned path.
1574pub fn parse_query_response_borrowed(
1575    payload: &[u8],
1576    capabilities: ClientCapabilities,
1577    columns: &[ColumnMetadata],
1578    previous_row: Option<&[Option<QueryValue>]>,
1579) -> Result<BorrowedFetchResult> {
1580    parse_query_response_borrowed_with_limits(
1581        payload,
1582        capabilities,
1583        columns,
1584        previous_row,
1585        ProtocolLimits::DEFAULT,
1586    )
1587}
1588
1589pub fn parse_query_response_borrowed_with_limits(
1590    payload: &[u8],
1591    capabilities: ClientCapabilities,
1592    columns: &[ColumnMetadata],
1593    previous_row: Option<&[Option<QueryValue>]>,
1594    limits: ProtocolLimits,
1595) -> Result<BorrowedFetchResult> {
1596    parse_query_response_borrowed_with_lob_mode_and_limits(
1597        payload,
1598        capabilities,
1599        columns,
1600        previous_row,
1601        LobDecodeMode::PlainLocator,
1602        limits,
1603    )
1604}
1605
1606pub fn parse_define_fetch_response_borrowed_with_limits(
1607    payload: &[u8],
1608    capabilities: ClientCapabilities,
1609    columns: &[ColumnMetadata],
1610    previous_row: Option<&[Option<QueryValue>]>,
1611    limits: ProtocolLimits,
1612) -> Result<BorrowedFetchResult> {
1613    parse_query_response_borrowed_with_lob_mode_and_limits(
1614        payload,
1615        capabilities,
1616        columns,
1617        previous_row,
1618        LobDecodeMode::DefineMetadata,
1619        limits,
1620    )
1621}
1622
1623fn parse_query_response_borrowed_with_lob_mode_and_limits(
1624    payload: &[u8],
1625    capabilities: ClientCapabilities,
1626    columns: &[ColumnMetadata],
1627    previous_row: Option<&[Option<QueryValue>]>,
1628    lob_decode_mode: LobDecodeMode,
1629    limits: ProtocolLimits,
1630) -> Result<BorrowedFetchResult> {
1631    let mut reader = TtcReader::with_limits(payload, limits)?;
1632    reader.limits().check_columns(columns.len())?;
1633    let mut result_columns = columns.to_vec();
1634    let mut more_rows = true;
1635    let mut cursor_id = 0u32;
1636    let mut row_count = 0u64;
1637    let mut row_starts: Vec<usize> = Vec::new();
1638    let mut row_bit_vectors: Vec<Option<Vec<u8>>> = Vec::new();
1639    let mut any_bit_vector = false;
1640    let mut pending_bit_vector: Option<Vec<u8>> = None;
1641    // The fetch path always consumes LONG/LONG RAW status trailers.
1642    let fetch_long_status = true;
1643
1644    while reader.remaining() > 0 {
1645        let message_type = reader.read_u8()?;
1646        match message_type {
1647            0 => {}
1648            TNS_MSG_TYPE_DESCRIBE_INFO => {
1649                let _describe_name = reader.read_bytes()?;
1650                let previous = std::mem::take(&mut result_columns);
1651                let mut described = QueryResult::default();
1652                parse_describe_info(&mut reader, capabilities, &mut described)?;
1653                result_columns = described.columns;
1654                for (index, column) in result_columns.iter_mut().enumerate() {
1655                    if let Some(prev) = previous.get(index) {
1656                        adjust_refetch_metadata(prev, column);
1657                    }
1658                }
1659            }
1660            TNS_MSG_TYPE_ROW_HEADER => {
1661                pending_bit_vector = parse_row_header(&mut reader)?;
1662            }
1663            TNS_MSG_TYPE_BIT_VECTOR => {
1664                pending_bit_vector = Some(parse_bit_vector(&mut reader, result_columns.len())?);
1665            }
1666            TNS_MSG_TYPE_ROW_DATA => {
1667                // Record where this row's column values begin, then advance the
1668                // reader past the row (skipping, not materializing).
1669                reader.limits().check_batch_rows(row_starts.len() + 1)?;
1670                row_starts.push(reader.position());
1671                let bit_vector = pending_bit_vector.take();
1672                any_bit_vector |= bit_vector.is_some();
1673                row_bit_vectors.push(bit_vector.clone());
1674                skip_row_data(
1675                    &mut reader,
1676                    &result_columns,
1677                    bit_vector.as_deref(),
1678                    fetch_long_status,
1679                    lob_decode_mode,
1680                )?;
1681            }
1682            TNS_MSG_TYPE_PARAMETER => {
1683                let _params = parse_query_return_parameters(&mut reader, false)?;
1684            }
1685            TNS_MSG_TYPE_STATUS => {
1686                let _call_status = reader.read_ub4()?;
1687                let _seq = reader.read_ub2()?;
1688            }
1689            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
1690                let _ = skip_server_side_piggyback(&mut reader)?;
1691            }
1692            TNS_MSG_TYPE_FLUSH_OUT_BINDS | TNS_MSG_TYPE_END_OF_RESPONSE => break,
1693            TNS_MSG_TYPE_TOKEN => {
1694                let _token = reader.read_ub8()?;
1695            }
1696            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
1697                // Mirror the owned parser's framing walk so the reader advances
1698                // past the implicit-resultset block identically (the borrowed
1699                // fetch API does not surface child cursors, but it must still
1700                // consume the bytes). reference messages/base.pyx
1701                // `_process_implicit_result`.
1702                let num_results = reader.read_ub4()?;
1703                reader
1704                    .limits()
1705                    .check_length_prefixed_elements(num_results as usize)?;
1706                for _ in 0..num_results {
1707                    let num_bytes = reader.read_u8()?;
1708                    reader.skip(usize::from(num_bytes))?;
1709                    let mut child = QueryResult::default();
1710                    parse_describe_info(&mut reader, capabilities, &mut child)?;
1711                    let _child_cursor_id = reader.read_ub2()?;
1712                }
1713            }
1714            TNS_MSG_TYPE_ERROR => {
1715                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
1716                if info.cursor_id != 0 {
1717                    cursor_id = u32::from(info.cursor_id);
1718                }
1719                row_count = info.row_count;
1720                if info.number == TNS_ERR_NO_DATA_FOUND && !result_columns.is_empty() {
1721                    more_rows = false;
1722                } else if info.number != 0 && info.number != TNS_ERR_ARRAY_DML_ERRORS {
1723                    return Err(ProtocolError::ServerErrorInfo(Box::new(
1724                        info.into_details(),
1725                    )));
1726                }
1727            }
1728            _ => {
1729                let position = reader.position().saturating_sub(1);
1730                if let Some(message) =
1731                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
1732                {
1733                    return Err(ProtocolError::ServerError(message));
1734                }
1735                return Err(ProtocolError::UnknownMessageType {
1736                    message_type,
1737                    position,
1738                });
1739            }
1740        }
1741    }
1742
1743    // If the batch never used duplicate-column compression, drop the per-row
1744    // bit-vector vector so iteration takes the zero-copy fast path (no owned
1745    // previous-row snapshotting).
1746    if !any_bit_vector {
1747        row_bit_vectors.clear();
1748    }
1749
1750    let batch = BorrowedRowBatch {
1751        buffer: payload.to_vec(),
1752        columns: result_columns,
1753        row_starts,
1754        row_bit_vectors,
1755        fetch_long_status,
1756        lob_decode_mode,
1757        // Seed the first compressed row's duplicate resolution from the caller's
1758        // prior-page row (only consulted when the batch uses bit vectors).
1759        previous_row_seed: any_bit_vector.then(|| {
1760            previous_row
1761                .map(<[Option<QueryValue>]>::to_vec)
1762                .unwrap_or_default()
1763        }),
1764    };
1765
1766    Ok(BorrowedFetchResult {
1767        batch,
1768        more_rows,
1769        cursor_id,
1770        row_count,
1771    })
1772}
1773
1774/// Advance `reader` past one ROW_DATA row **without materializing owned values**
1775/// — this is the offset-capture pass, so it must allocate nothing for the hot
1776/// scalar grid. Mirrors [`parse_row_data`]'s consumption exactly: duplicate
1777/// (bit-vector) columns carry no wire bytes and are skipped; the hot byte-field
1778/// scalar types are skipped with a zero-allocation length-prefixed skip; the
1779/// rare cold types (LOB / Vector / JSON / Cursor / Object / ROWID), whose wire
1780/// framing is non-trivial, fall back to [`parse_column_value`] (which may
1781/// allocate, but those are uncommon). `LONG`/`LONG RAW` status trailers are
1782/// consumed when `fetch_long_status`.
1783fn skip_row_data(
1784    reader: &mut TtcReader<'_>,
1785    columns: &[ColumnMetadata],
1786    bit_vector: Option<&[u8]>,
1787    fetch_long_status: bool,
1788    lob_decode_mode: LobDecodeMode,
1789) -> Result<()> {
1790    for (index, metadata) in columns.iter().enumerate() {
1791        if is_duplicate_column(bit_vector, index) {
1792            continue;
1793        }
1794        let consumed_byte_field = metadata.buffer_size != 0
1795            && matches!(
1796                metadata.ora_type_num,
1797                ORA_TYPE_NUM_VARCHAR
1798                    | ORA_TYPE_NUM_CHAR
1799                    | ORA_TYPE_NUM_LONG
1800                    | ORA_TYPE_NUM_RAW
1801                    | ORA_TYPE_NUM_LONG_RAW
1802                    | ORA_TYPE_NUM_NUMBER
1803                    | ORA_TYPE_NUM_BINARY_INTEGER
1804                    | ORA_TYPE_NUM_BINARY_DOUBLE
1805                    | ORA_TYPE_NUM_BINARY_FLOAT
1806                    | ORA_TYPE_NUM_BOOLEAN
1807                    | ORA_TYPE_NUM_INTERVAL_DS
1808                    | ORA_TYPE_NUM_INTERVAL_YM
1809                    | ORA_TYPE_NUM_DATE
1810                    | ORA_TYPE_NUM_TIMESTAMP
1811                    | ORA_TYPE_NUM_TIMESTAMP_LTZ
1812                    | ORA_TYPE_NUM_TIMESTAMP_TZ
1813            );
1814        if consumed_byte_field {
1815            reader.skip_bytes_field()?;
1816        } else {
1817            // Cold / non-byte-field type, or a zero-buffer-size column: defer to
1818            // the full owned decode purely to advance the reader correctly.
1819            let _ = parse_column_value_with_lob_mode(reader, metadata, lob_decode_mode)?;
1820        }
1821        if fetch_long_status
1822            && matches!(
1823                metadata.ora_type_num,
1824                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1825            )
1826        {
1827            let _null_indicator = reader.read_sb4()?;
1828            let _return_code = reader.read_ub4()?;
1829        }
1830    }
1831    Ok(())
1832}
1833
1834pub(crate) fn encode_rowid_component(mut value: u32, size: usize, output: &mut String) {
1835    let mut encoded = vec![b'A'; size];
1836    for index in 0..size {
1837        let alphabet_index = usize::try_from(value & 0x3f).unwrap_or(0);
1838        encoded[size - index - 1] = TNS_BASE64_ALPHABET[alphabet_index];
1839        value >>= 6;
1840    }
1841    output.extend(encoded.into_iter().map(char::from));
1842}
1843
1844pub(crate) fn encode_physical_rowid(
1845    rba: u32,
1846    partition_id: u16,
1847    block_num: u32,
1848    slot_num: u16,
1849) -> String {
1850    let mut output = String::with_capacity(ORA_TYPE_SIZE_ROWID as usize);
1851    encode_rowid_component(rba, 6, &mut output);
1852    encode_rowid_component(u32::from(partition_id), 3, &mut output);
1853    encode_rowid_component(block_num, 6, &mut output);
1854    encode_rowid_component(u32::from(slot_num), 3, &mut output);
1855    output
1856}
1857
1858pub(crate) fn parse_rowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1859    let len = reader.read_u8()?;
1860    if len == 0 || len == crate::wire::TNS_NULL_LENGTH_INDICATOR {
1861        return Ok(None);
1862    }
1863    let rba = reader.read_ub4()?;
1864    let partition_id = reader.read_ub2()?;
1865    reader.skip(1)?;
1866    let block_num = reader.read_ub4()?;
1867    let slot_num = reader.read_ub2()?;
1868    Ok(Some(encode_physical_rowid(
1869        rba,
1870        partition_id,
1871        block_num,
1872        slot_num,
1873    )))
1874}
1875
1876pub(crate) fn encode_logical_urowid(bytes: &[u8]) -> String {
1877    let mut input_offset = 1;
1878    let mut input_len = bytes.len().saturating_sub(1);
1879    let mut output = String::with_capacity((bytes.len() / 3) * 4 + 4);
1880    output.push('*');
1881    while input_len > 0 {
1882        let mut pos = bytes[input_offset] >> 2;
1883        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1884
1885        pos = (bytes[input_offset] & 0x03) << 4;
1886        if input_len == 1 {
1887            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1888            break;
1889        }
1890        input_offset += 1;
1891        pos |= (bytes[input_offset] & 0xf0) >> 4;
1892        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1893
1894        pos = (bytes[input_offset] & 0x0f) << 2;
1895        if input_len == 2 {
1896            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1897            break;
1898        }
1899        input_offset += 1;
1900        pos |= (bytes[input_offset] & 0xc0) >> 6;
1901        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1902
1903        pos = bytes[input_offset] & 0x3f;
1904        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1905        input_offset += 1;
1906        input_len -= 3;
1907    }
1908    output
1909}
1910
1911pub(crate) fn parse_urowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1912    if reader.read_bytes()?.is_none() {
1913        return Ok(None);
1914    }
1915    let Some(bytes) = reader.read_bytes()? else {
1916        return Ok(None);
1917    };
1918    if bytes.len() < 13 {
1919        return Err(ProtocolError::TtcDecode("encoded UROWID too short"));
1920    }
1921    if bytes[0] == 1 {
1922        let rba = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
1923        let partition_id = u16::from_be_bytes([bytes[5], bytes[6]]);
1924        let block_num = u32::from_be_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]);
1925        let slot_num = u16::from_be_bytes([bytes[11], bytes[12]]);
1926        Ok(Some(encode_physical_rowid(
1927            rba,
1928            partition_id,
1929            block_num,
1930            slot_num,
1931        )))
1932    } else {
1933        Ok(Some(encode_logical_urowid(&bytes)))
1934    }
1935}
1936
1937pub(crate) fn parse_lob_value(
1938    reader: &mut TtcReader<'_>,
1939    metadata: &ColumnMetadata,
1940    lob_decode_mode: LobDecodeMode,
1941) -> Result<Option<QueryValue>> {
1942    let num_bytes = reader.read_ub4()?;
1943    reader.limits().check_response_bytes(num_bytes as usize)?;
1944    if num_bytes == 0 {
1945        return Ok(None);
1946    }
1947    let (size, chunk_size) = if matches!(
1948        (lob_decode_mode, metadata.ora_type_num),
1949        (_, ORA_TYPE_NUM_BFILE) | (LobDecodeMode::PlainLocator, _)
1950    ) {
1951        (0, 0)
1952    } else {
1953        (reader.read_ub8()?, reader.read_ub4()?)
1954    };
1955    let Some(locator) = reader.read_bytes()? else {
1956        return Ok(None);
1957    };
1958    Ok(Some(QueryValue::Lob(Box::new(LobValue {
1959        ora_type_num: metadata.ora_type_num,
1960        csfrm: metadata.csfrm,
1961        locator,
1962        size,
1963        chunk_size,
1964    }))))
1965}
1966
/// Tests pinning the wire shape of LOB cells on plain versus define fetches,
/// and the parity of the borrowed and owned decoders on LOB pages.
#[cfg(test)]
mod lob_fetch_shape_tests {
    use super::*;

    /// Nullable CLOB column named `BODY` with implicit character-set form.
    fn clob_column() -> ColumnMetadata {
        ColumnMetadata {
            name: "BODY".into(),
            ora_type_num: ORA_TYPE_NUM_CLOB,
            csfrm: CS_FORM_IMPLICIT,
            precision: 0,
            scale: 0,
            buffer_size: 4000,
            max_size: 4000,
            nulls_allowed: true,
            is_json: false,
            is_oson: false,
            object_schema: None,
            object_type_name: None,
            is_array: false,
            vector_dimensions: None,
            vector_format: 0,
            vector_flags: 0,
            domain_schema: None,
            domain_name: None,
            annotations: None,
        }
    }

    /// Nullable BLOB column named `IMAGE`; identical to [`clob_column`] apart
    /// from the name, the type number and a zero character-set form.
    fn blob_column() -> ColumnMetadata {
        ColumnMetadata {
            name: "IMAGE".into(),
            ora_type_num: ORA_TYPE_NUM_BLOB,
            csfrm: 0,
            ..clob_column()
        }
    }

    /// Build a one-row, one-LOB-cell response. `metadata` supplies the
    /// `(size, chunk_size)` pair that only a define fetch puts on the wire.
    fn lob_row_payload(locator: &[u8], metadata: Option<(u64, u32)>) -> Vec<u8> {
        let mut writer = TtcWriter::new();
        writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
        let locator_len = u32::try_from(locator.len()).expect("locator length fits ub4");
        writer.write_ub4(locator_len);
        if let Some((size, chunk_size)) = metadata {
            writer.write_ub8(size);
            writer.write_ub4(chunk_size);
        }
        writer
            .write_bytes_with_length(locator)
            .expect("synthetic locator length is encodable");
        writer.write_u8(TNS_MSG_TYPE_END_OF_RESPONSE);
        writer.into_bytes()
    }

    /// Append one define-fetch LOB cell (length, size, chunk size, locator).
    fn write_define_lob_cell(writer: &mut TtcWriter, locator: &[u8], size: u64, chunk_size: u32) {
        let locator_len = u32::try_from(locator.len()).expect("locator length fits ub4");
        writer.write_ub4(locator_len);
        writer.write_ub8(size);
        writer.write_ub4(chunk_size);
        writer
            .write_bytes_with_length(locator)
            .expect("synthetic locator length is encodable");
    }

    /// Return the LOB in the first cell of the first row, panicking otherwise.
    fn first_lob(result: &QueryResult) -> &LobValue {
        let first_cell = &result.rows[0][0];
        match first_cell {
            Some(QueryValue::Lob(lob)) => lob.as_ref(),
            other => panic!("expected LOB value, got {other:?}"),
        }
    }

    #[test]
    fn column_metadata_discards_server_charset_id_and_keeps_csfrm_only() {
        let caps = ClientCapabilities {
            ttc_field_version: 0,
            max_string_size: 32_767,
            charset_id: 873,
        };

        // Hand-assemble one column-metadata record in wire order.
        let mut writer = TtcWriter::new();
        writer.write_u8(ORA_TYPE_NUM_VARCHAR);
        writer.write_u8(0); // flags
        writer.write_u8(0); // precision
        writer.write_u8(0); // scale
        writer.write_ub4(4000);
        writer.write_ub4(0); // max array elements
        writer.write_ub8(0); // cont flags
        writer
            .write_bytes_with_two_lengths(None)
            .expect("empty oid");
        writer.write_ub2(0); // version
        writer.write_ub2(0); // unusable server charset id: must not drive decoding
        writer.write_u8(CS_FORM_IMPLICIT);
        writer.write_ub4(4000);
        writer.write_u8(1); // nullable
        writer.write_u8(0); // flags
        writer
            .write_bytes_with_two_lengths(Some(b"TXT"))
            .expect("name");
        writer
            .write_bytes_with_two_lengths(None)
            .expect("object schema");
        writer
            .write_bytes_with_two_lengths(None)
            .expect("object type");
        writer.write_ub2(1); // column position
        writer.write_ub4(0); // uds flags
        let bytes = writer.into_bytes();

        let mut reader = TtcReader::new(&bytes);
        let metadata = parse_column_metadata(&mut reader, caps).expect("metadata should parse");

        assert_eq!(metadata.name(), "TXT");
        assert_eq!(metadata.csfrm(), CS_FORM_IMPLICIT);
        let decoded = decode_text_value("ok".as_bytes(), metadata.csfrm()).expect("decode text");
        assert_eq!(decoded, "ok");
    }

    #[test]
    fn plain_fetch_lob_locator_omits_size_and_chunk_fields() {
        let locator: Vec<u8> = (0u8..114).collect();
        let columns = [clob_column()];
        let payload = lob_row_payload(&locator, None);

        let result = parse_fetch_response_with_context(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
        )
        .expect("plain fetch CLOB locator should decode");

        let lob = first_lob(&result);
        assert_eq!(lob.locator, locator);
        assert_eq!(lob.size, 0);
        assert_eq!(lob.chunk_size, 0);
    }

    #[test]
    fn define_fetch_lob_locator_includes_size_and_chunk_fields() {
        let locator: Vec<u8> = (0u8..114).collect();
        let columns = [clob_column()];
        let payload = lob_row_payload(&locator, Some((23, 8060)));

        let result = parse_define_fetch_response_with_context_and_limits(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
            ProtocolLimits::DEFAULT,
        )
        .expect("define fetch CLOB locator should decode");

        let lob = first_lob(&result);
        assert_eq!(lob.locator, locator);
        assert_eq!(lob.size, 23);
        assert_eq!(lob.chunk_size, 8060);
    }

    #[test]
    fn borrowed_define_fetch_lob_page_matches_owned_decode() {
        let columns = [clob_column(), blob_column()];
        let clob_locator_a: Vec<u8> = (0u8..114).collect();
        let blob_locator_a: Vec<u8> = (128u8..242).collect();
        let clob_locator_b: Vec<u8> = (32u8..146).collect();
        let blob_locator_b: Vec<u8> = (64u8..178).collect();

        // Two rows, each a CLOB cell followed by a BLOB cell.
        let rows: [(&[u8], u64, &[u8], u64); 2] = [
            (&clob_locator_a, 23, &blob_locator_a, 48),
            (&clob_locator_b, 31, &blob_locator_b, 96),
        ];
        let mut writer = TtcWriter::new();
        for (clob_locator, clob_size, blob_locator, blob_size) in rows {
            writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
            write_define_lob_cell(&mut writer, clob_locator, clob_size, 8060);
            write_define_lob_cell(&mut writer, blob_locator, blob_size, 4096);
        }
        writer.write_u8(TNS_MSG_TYPE_END_OF_RESPONSE);
        let payload = writer.into_bytes();

        let owned = parse_define_fetch_response_with_context_and_limits(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
            ProtocolLimits::DEFAULT,
        )
        .expect("owned define fetch CLOB/BLOB page should decode");
        let borrowed = parse_define_fetch_response_borrowed_with_limits(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
            ProtocolLimits::DEFAULT,
        )
        .expect("borrowed define fetch CLOB/BLOB page should decode");

        // Materialize the borrowed rows so they can be compared with the
        // owned decode cell by cell.
        let mut borrowed_rows: Vec<Vec<Option<QueryValue>>> = Vec::new();
        borrowed
            .batch
            .for_each_row_ref(|row| {
                let owned_row = row
                    .iter()
                    .map(|cell| cell.map(|value| value.to_owned_value()))
                    .collect();
                borrowed_rows.push(owned_row);
                Ok::<(), ProtocolError>(())
            })
            .expect("borrowed define fetch CLOB/BLOB page should iterate");

        assert_eq!(owned.rows.len(), 2, "owned decode sees both rows");
        assert_eq!(
            borrowed.batch.row_count(),
            2,
            "borrowed decode sees both rows"
        );
        assert_eq!(
            borrowed_rows, owned.rows,
            "borrowed DefineMetadata LOB decode must match owned decode"
        );
    }
}
2198
2199/// Reads a VECTOR value (reference `ReadBuffer.read_vector` in `packet.pyx`).
2200/// VECTOR is sent as a fully-prefetched LOB: the image data precedes the
2201/// (discarded) LOB locator.
2202pub(crate) fn parse_vector_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
2203    let num_bytes = reader.read_ub4()?;
2204    reader.limits().check_response_bytes(num_bytes as usize)?;
2205    if num_bytes == 0 {
2206        return Ok(None);
2207    }
2208    reader.read_ub8()?; // size (unused)
2209    reader.read_ub4()?; // chunk size (unused)
2210    let Some(data) = reader.read_bytes()? else {
2211        return Ok(None);
2212    };
2213    reader.read_bytes()?; // LOB locator (unused)
2214    if data.is_empty() {
2215        return Ok(None);
2216    }
2217    let vector = crate::vector::decode_vector_with_limits(&data, reader.limits())?;
2218    Ok(Some(QueryValue::Vector(Box::new(vector))))
2219}
2220
2221/// Parses a native JSON (`DB_TYPE_JSON`) column value. Like VECTOR, OSON is sent
2222/// as a fully-prefetched LOB: `num_bytes`, `size`, `chunk_size`, the OSON image,
2223/// then a (discarded) LOB locator (reference packet.pyx `read_oson`).
2224pub(crate) fn parse_json_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
2225    let num_bytes = reader.read_ub4()?;
2226    reader.limits().check_response_bytes(num_bytes as usize)?;
2227    if num_bytes == 0 {
2228        return Ok(None);
2229    }
2230    reader.read_ub8()?; // size (unused)
2231    reader.read_ub4()?; // chunk size (unused)
2232    let Some(data) = reader.read_bytes()? else {
2233        return Ok(None);
2234    };
2235    reader.read_bytes()?; // LOB locator (unused)
2236    if data.is_empty() {
2237        return Ok(None);
2238    }
2239    let value = crate::oson::decode_oson_with_limits(&data, reader.limits())?;
2240    Ok(Some(QueryValue::Json(Box::new(value))))
2241}
2242
2243pub(crate) fn parse_object_value(
2244    reader: &mut TtcReader<'_>,
2245    metadata: &ColumnMetadata,
2246) -> Result<Option<QueryValue>> {
2247    let _toid = reader.read_bytes_with_length()?;
2248    let _oid = reader.read_bytes_with_length()?;
2249    let _snapshot = reader.read_bytes_with_length()?;
2250    let _version = reader.read_ub2()?;
2251    let num_bytes = reader.read_ub4()?;
2252    reader.limits().check_response_bytes(num_bytes as usize)?;
2253    reader.skip(2)?;
2254    if num_bytes == 0 {
2255        return Ok(None);
2256    }
2257    let Some(packed_data) = reader.read_bytes()? else {
2258        return Ok(None);
2259    };
2260    Ok(Some(QueryValue::Object(Box::new(ObjectValue {
2261        schema: metadata.object_schema.clone(),
2262        type_name: metadata.object_type_name.clone(),
2263        packed_data,
2264    }))))
2265}
2266
2267pub(crate) fn parse_cursor_value(reader: &mut TtcReader<'_>) -> Result<QueryValue> {
2268    reader.skip(1)?;
2269    let mut result = QueryResult::default();
2270    parse_describe_info(reader, ClientCapabilities::default(), &mut result)?;
2271    let cursor_id = u32::from(reader.read_ub2()?);
2272    Ok(QueryValue::Cursor(Box::new(CursorValue {
2273        columns: result.columns,
2274        cursor_id,
2275    })))
2276}
2277
/// Values extracted from a return-parameters message by
/// [`parse_query_return_parameters`].
pub(crate) struct QueryReturnParameters {
    /// Per-row counts from the array-DML tail of the message; `None` when the
    /// caller did not ask for that tail to be parsed.
    pub row_counts: Option<Vec<u64>>,
    /// CQN registered-query id extracted from the registration-info block
    /// (reference base.pyx:1300-1309); `None` when the block was absent or
    /// shorter than eight bytes.
    pub query_id: Option<u64>,
}
2284
2285pub(crate) fn parse_query_return_parameters(
2286    reader: &mut TtcReader<'_>,
2287    arraydmlrowcounts: bool,
2288) -> Result<QueryReturnParameters> {
2289    let num_params = reader.read_ub2()?;
2290    for _ in 0..num_params {
2291        let _value = reader.read_ub4()?;
2292    }
2293    let num_bytes = reader.read_ub2()?;
2294    if num_bytes > 0 {
2295        reader.skip(usize::from(num_bytes))?;
2296    }
2297    let num_pairs = reader.read_ub2()?;
2298    skip_keyword_value_pairs(reader, num_pairs)?;
2299    // registration info block: the trailing 8 bytes (msb at -4, lsb at -8) are
2300    // the CQN query id when a registration id was sent (reference base.pyx).
2301    let num_bytes = usize::from(reader.read_ub2()?);
2302    let mut query_id = None;
2303    if num_bytes > 0 {
2304        let block = reader.read_raw(num_bytes)?;
2305        if num_bytes >= 8 {
2306            let msb = u32::from_be_bytes([
2307                block[num_bytes - 4],
2308                block[num_bytes - 3],
2309                block[num_bytes - 2],
2310                block[num_bytes - 1],
2311            ]);
2312            let lsb = u32::from_be_bytes([
2313                block[num_bytes - 8],
2314                block[num_bytes - 7],
2315                block[num_bytes - 6],
2316                block[num_bytes - 5],
2317            ]);
2318            query_id = Some((u64::from(msb) << 32) | u64::from(lsb));
2319        }
2320    }
2321    if arraydmlrowcounts {
2322        // reference messages/base.pyx `_process_return_parameters` tail
2323        let num_rows = reader.read_ub4()?;
2324        reader.limits().check_batch_rows(num_rows as usize)?;
2325        // Each ub8 row count consumes at least one byte, so cap the reservation
2326        // by the remaining payload size (BoundedReader).
2327        let mut row_counts: Vec<u64> =
2328            reader.with_capacity_limited(num_rows as usize, 1, ProtocolLimits::check_batch_rows)?;
2329        for _ in 0..num_rows {
2330            row_counts.push(reader.read_ub8()?);
2331        }
2332        return Ok(QueryReturnParameters {
2333            row_counts: Some(row_counts),
2334            query_id,
2335        });
2336    }
2337    Ok(QueryReturnParameters {
2338        row_counts: None,
2339        query_id,
2340    })
2341}
2342
#[cfg(test)]
mod return_parameter_tests {
    use super::*;

    // With every preceding section empty, an 8-byte registration-info block
    // must come back as one 64-bit query id: the first four bytes are the low
    // word and the last four bytes the high word. Row counts were not
    // requested, so none are reported.
    #[test]
    fn registration_info_block_extracts_query_id_from_lsb_msb_tail() {
        let registration_block: [u8; 8] = [
            0x55, 0x66, 0x77, 0x88, // lsb
            0x11, 0x22, 0x33, 0x44, // msb
        ];
        let payload = {
            let mut frame = TtcWriter::new();
            // num params, parameter bytes, keyword/value pairs: all zero.
            for _ in 0..3 {
                frame.write_ub2(0);
            }
            frame.write_ub2(8); // registration-info block bytes
            frame.write_raw(&registration_block);
            frame.into_bytes()
        };
        let mut reader = TtcReader::new(&payload);

        let QueryReturnParameters {
            row_counts,
            query_id,
        } = parse_query_return_parameters(&mut reader, false).expect("return parameters");

        assert_eq!(query_id, Some(0x1122_3344_5566_7788));
        assert_eq!(row_counts, None);
    }
}
2367
#[cfg(test)]
mod borrowed_fetch_tests {
    use super::*;
    use crate::thin::codecs::encode_number_text;

    // Equivalence check for the `simd-decode` feature (bead rust-oracledb-63o).
    // For every sample, `validate_utf8` has to agree with `core::str::from_utf8`
    // on accept vs. reject, and hand back the same `&str` when both accept. The
    // test runs the same way with or without the SIMD validator compiled in.
    #[test]
    fn validate_utf8_matches_core_accept_reject() {
        let cases: &[&[u8]] = &[
            // Well-formed inputs.
            b"",
            b"a",
            b"hello world",
            "VARCHAR2 cell".as_bytes(),
            "中文 mixed \u{1f600}".as_bytes(), // CJK + emoji (4-byte)
            "naïve café".as_bytes(),
            // Malformed inputs: each one must be rejected.
            &[0x80],                   // lone continuation byte
            &[0xC0, 0x80],             // overlong NUL
            &[0xED, 0xA0, 0x80],       // UTF-16 surrogate
            &[0xF4, 0x90, 0x80, 0x80], // > U+10FFFF
            &[0xFF],                   // invalid lead
            &[0xE2, 0x82],             // truncated 3-byte
        ];
        for bytes in cases.iter().copied() {
            let reference = core::str::from_utf8(bytes);
            let candidate = validate_utf8(bytes);
            assert_eq!(
                reference.is_ok(),
                candidate.is_ok(),
                "accept/reject diverged for {bytes:02x?}"
            );
            if let Some((expected, actual)) = reference.ok().zip(candidate.ok()) {
                assert_eq!(expected, actual, "accepted text diverged for {bytes:02x?}");
            }
        }
    }

    // Synthetic metadata for one scalar column; every field not passed in is
    // left at its default.
    fn col(name: &str, ora_type_num: u8, csfrm: u8, buffer_size: u32) -> ColumnMetadata {
        let defaults = ColumnMetadata::default();
        ColumnMetadata {
            name: name.to_string(),
            ora_type_num,
            csfrm,
            buffer_size,
            ..defaults
        }
    }

    // Append one [Text, Number, Raw, NULL-text] row to `writer`. Each non-null
    // cell is a `write_bytes_with_length` run and the NULL cell is a single
    // zero length byte. Nothing is returned; callers that need row offsets
    // compute them separately.
    fn encode_mixed_row(writer: &mut TtcWriter, text: &str, number: &str, raw: &[u8]) {
        writer.write_bytes_with_length(text.as_bytes()).unwrap();
        let encoded_number = encode_number_text(number).unwrap();
        writer.write_bytes_with_length(&encoded_number).unwrap();
        writer.write_bytes_with_length(raw).unwrap();
        // NULL column (length byte 0)
        writer.write_u8(0);
    }

    // For a mixed Text/Number/Raw/NULL row, every cell from the borrowed batch
    // must convert (`to_owned_value()`) to exactly the value the owned decoder
    // produces. Text and Raw cells must also point into the batch buffer, which
    // shows they are borrowed rather than freshly allocated.
    #[test]
    fn borrowed_batch_matches_owned_path_for_mixed_row() {
        let columns = vec![
            col("T", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
            col("N", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
            col("R", ORA_TYPE_NUM_RAW, CS_FORM_IMPLICIT, 2000),
            col("Z", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
        ];

        let buffer = {
            let mut writer = TtcWriter::new();
            encode_mixed_row(
                &mut writer,
                "héllo world",
                "-12.5",
                &[0xDE, 0xAD, 0xBE, 0xEF],
            );
            encode_mixed_row(&mut writer, "second", "42", &[0x01]);
            writer.into_bytes()
        };

        // The second row starts wherever decoding the first row stops.
        let second_row_start = {
            let mut probe = TtcReader::new(&buffer);
            for column in &columns {
                let _ = parse_column_value(&mut probe, column).unwrap();
            }
            probe.position()
        };
        let row_starts = vec![0, second_row_start];

        // Golden values: decode each row through the owned path.
        let mut owned_rows: Vec<Vec<Option<QueryValue>>> = Vec::new();
        for &start in &row_starts {
            let mut reader = TtcReader::new(&buffer[start..]);
            let mut cells = Vec::new();
            for column in &columns {
                cells.push(parse_column_value(&mut reader, column).unwrap());
            }
            owned_rows.push(cells);
        }

        // Borrowed path: walk the batch, checking the zero-copy property and
        // collecting owned copies of every cell for the final comparison.
        let batch = BorrowedRowBatch::new(buffer.clone(), columns.clone(), row_starts);
        let buf_ptr_range = batch.buffer_ptr_range();
        let inside_buffer = |address: usize| buf_ptr_range.contains(&address);

        let mut seen_rows = 0usize;
        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
        batch
            .for_each_row_ref(|row| {
                seen_rows += 1;
                if let Some(QueryValueRef::Text(text)) = row[0] {
                    assert!(
                        inside_buffer(text.as_ptr() as usize),
                        "Text cell must borrow the batch buffer (zero-copy)"
                    );
                }
                if let Some(QueryValueRef::Raw(raw)) = row[2] {
                    assert!(
                        inside_buffer(raw.as_ptr() as usize),
                        "Raw cell must borrow the batch buffer (zero-copy)"
                    );
                }
                let materialized = row
                    .iter()
                    .map(|cell| cell.map(|value| value.to_owned_value()))
                    .collect();
                borrowed_owned.push(materialized);
                Ok::<(), ProtocolError>(())
            })
            .unwrap();

        assert_eq!(seen_rows, 2, "batch yields both rows");
        assert_eq!(
            borrowed_owned, owned_rows,
            "borrowed cells to_owned() must equal the owned-path values"
        );
    }

    // A NUMBER with trailing zeros ("1000") must materialize identically from
    // the borrowed batch and from the owned decoder, both as canonical text and
    // as a whole value.
    #[test]
    fn borrowed_number_to_owned_matches_owned_for_trailing_zero_number() {
        let column = col("N", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22);
        let buffer = {
            let encoded = encode_number_text("1000").expect("encode trailing-zero number");
            let mut writer = TtcWriter::new();
            writer
                .write_bytes_with_length(&encoded)
                .expect("write framed number");
            writer.into_bytes()
        };

        let owned = {
            let mut reader = TtcReader::new(&buffer);
            parse_column_value(&mut reader, &column)
                .expect("owned decode")
                .expect("owned number should be non-null")
        };

        let batch = BorrowedRowBatch::new(buffer, vec![column], vec![0]);
        // Holds the most recently decoded row's value.
        let mut last_borrowed = None;
        batch
            .for_each_row_ref(|row| {
                let cell = row[0].expect("borrowed number should be non-null");
                last_borrowed = Some(cell.to_owned_value());
                Ok::<(), ProtocolError>(())
            })
            .expect("borrowed decode");
        let borrowed = last_borrowed.expect("borrowed decode should yield one row");

        assert_eq!(
            borrowed.as_number_text().as_deref(),
            owned.as_number_text().as_deref(),
            "borrowed and owned paths must expose identical canonical NUMBER text"
        );
        assert_eq!(
            borrowed, owned,
            "borrowed to_owned_value should materialize the same trailing-zero NUMBER"
        );
    }

    // A UROWID column described with buffer size 0 must still decode to a
    // rowid instead of NULL, and its bytes must be fully consumed so the next
    // column is read from the right offset. This is checked on both the owned
    // and the borrowed paths.
    #[test]
    fn describe_size_zero_urowid_decodes_and_preserves_following_column_alignment() {
        let columns = vec![
            col("RID", ORA_TYPE_NUM_UROWID, CS_FORM_IMPLICIT, 0),
            col("NEXT", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
        ];
        let rba: u32 = 0x0102_0304;
        let partition_id: u16 = 0x0506;
        let block_num: u32 = 0x0708_090a;
        let slot_num: u16 = 0x0b0c;
        // A leading byte of 1 followed by the four big-endian components.
        let encoded_urowid: Vec<u8> = [
            &[1u8][..],
            &rba.to_be_bytes()[..],
            &partition_id.to_be_bytes()[..],
            &block_num.to_be_bytes()[..],
            &slot_num.to_be_bytes()[..],
        ]
        .concat();
        let expected_rowid = encode_physical_rowid(rba, partition_id, block_num, slot_num);

        let buffer = {
            let mut writer = TtcWriter::new();
            writer
                .write_bytes_with_length(&[1])
                .expect("write UROWID null probe field");
            writer
                .write_bytes_with_length(&encoded_urowid)
                .expect("write encoded UROWID");
            writer
                .write_bytes_with_length(b"after")
                .expect("write following text column");
            writer.into_bytes()
        };

        let mut owned_reader = TtcReader::new(&buffer);
        let mut owned = Vec::new();
        for column in &columns {
            owned.push(parse_column_value(&mut owned_reader, column).expect("owned decode"));
        }
        assert_eq!(
            owned[0].as_ref().and_then(QueryValue::as_rowid),
            Some(expected_rowid.as_str()),
            "owned decode must not NULL a describe-size-0 UROWID"
        );
        assert_eq!(
            owned[1].as_ref().and_then(QueryValue::as_text),
            Some("after"),
            "owned decode must consume UROWID bytes before the following column"
        );

        let batch = BorrowedRowBatch::new(buffer, columns, vec![0]);
        let mut borrowed_rows = Vec::new();
        batch
            .for_each_row_ref(|row| {
                let materialized = row
                    .iter()
                    .map(|cell| cell.map(|value| value.to_owned_value()))
                    .collect::<Vec<_>>();
                borrowed_rows.push(materialized);
                Ok::<(), ProtocolError>(())
            })
            .expect("borrowed decode");
        assert_eq!(borrowed_rows, vec![owned]);
    }

    // The borrowed response parser is fed the same payload, columns and
    // previous row as the owned parser. Its flags, cursor id and row count must
    // match, and decoding its batch must reproduce the owned rows, including
    // cells duplicated from the previous row via the bit vector. The fixture
    // is the one used by the owned test
    // `fetch_response_decodes_rows_with_previous_cursor_metadata`.
    #[test]
    fn borrowed_response_parse_matches_owned_fetch_path() {
        use hex::FromHex;
        let payload = Vec::from_hex("06020101000205dc0001010101000702c1041d")
            .expect("fixture response should be valid hex");
        let columns = vec![
            col("INTCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
            col("NUMBERCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
        ];
        let previous_row = vec![
            Some(QueryValue::number_from_text("2", true)),
            Some(QueryValue::number_from_text("0.5", false)),
        ];

        // Golden result from the owned parser.
        let owned = parse_query_response_with_context(
            &payload,
            ClientCapabilities::default(),
            &columns,
            Some(&previous_row),
        )
        .expect("owned fetch decode");

        // Same inputs through the borrowed parser.
        let borrowed = parse_query_response_borrowed(
            &payload,
            ClientCapabilities::default(),
            &columns,
            Some(&previous_row),
        )
        .expect("borrowed fetch decode");

        assert_eq!(borrowed.more_rows, owned.more_rows);
        assert_eq!(borrowed.cursor_id, owned.cursor_id);
        assert_eq!(borrowed.batch.row_count(), owned.rows.len());

        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
        borrowed
            .batch
            .for_each_row_ref(|row| {
                let materialized = row
                    .iter()
                    .map(|cell| cell.map(|value| value.to_owned_value()))
                    .collect();
                borrowed_owned.push(materialized);
                Ok::<(), ProtocolError>(())
            })
            .expect("iterate borrowed rows");

        assert_eq!(
            borrowed_owned, owned.rows,
            "borrowed batch must reproduce the owned fetch rows (incl. duplicate columns)"
        );
    }
}
2681
#[cfg(test)]
mod out_bind_boolean_regression_tests {
    use super::*;

    // Metadata for a BOOLEAN bind named "B", scalar or array depending on
    // `is_array`; every other field stays at its default.
    fn boolean_column(is_array: bool) -> ColumnMetadata {
        let name = "B".to_string();
        ColumnMetadata {
            name,
            ora_type_num: ORA_TYPE_NUM_BOOLEAN,
            is_array,
            ..ColumnMetadata::default()
        }
    }

    // Wire bytes for one BOOLEAN value: a length-prefixed single byte `1`
    // followed by an sb4 of -1.
    fn boolean_value_with_negative_actual_bytes() -> Vec<u8> {
        let mut frame = TtcWriter::new();
        frame
            .write_bytes_with_length(&[1])
            .expect("write present boolean value");
        frame.write_sb4(-1);
        frame.into_bytes()
    }

    // Scalar OUT bind: the value followed by sb4 -1 must be reported as NULL.
    #[test]
    fn scalar_boolean_out_bind_negative_actual_bytes_decodes_null() {
        let bind_columns = [boolean_column(false)];
        let out_bind_indexes = [0usize];
        let payload = boolean_value_with_negative_actual_bytes();

        let mut result = QueryResult::default();
        let mut reader = TtcReader::new(&payload);
        parse_out_bind_row_data(&mut reader, &mut result, &bind_columns, &out_bind_indexes)
            .expect("parse scalar BOOLEAN OUT bind");

        assert_eq!(result.out_values, vec![(0, None)]);
    }

    // Array OUT bind with one element: the element followed by sb4 -1 must be
    // a NULL entry inside the decoded array.
    #[test]
    fn array_boolean_out_bind_negative_actual_bytes_decodes_null_element() {
        let bind_columns = [boolean_column(true)];
        let out_bind_indexes = [0usize];
        let payload = {
            let mut frame = TtcWriter::new();
            frame.write_ub4(1);
            frame
                .write_bytes_with_length(&[1])
                .expect("write present boolean array element");
            frame.write_sb4(-1);
            frame.into_bytes()
        };

        let mut result = QueryResult::default();
        let mut reader = TtcReader::new(&payload);
        parse_out_bind_row_data(&mut reader, &mut result, &bind_columns, &out_bind_indexes)
            .expect("parse array BOOLEAN OUT bind");

        let expected = vec![(0, Some(QueryValue::Array(vec![None])))];
        assert_eq!(result.out_values, expected);
    }

    // RETURNING clause with one row: the value followed by sb4 -1 must be
    // reported as a single NULL return value.
    #[test]
    fn returning_boolean_negative_actual_bytes_decodes_null() {
        let bind_columns = [boolean_column(false)];
        let output_bind_indexes = [0usize];
        let payload = {
            let mut frame = TtcWriter::new();
            frame.write_ub4(1);
            frame
                .write_bytes_with_length(&[1])
                .expect("write present returning boolean value");
            frame.write_sb4(-1);
            frame.into_bytes()
        };

        let mut result = QueryResult::default();
        let mut reader = TtcReader::new(&payload);
        parse_returning_row_data(
            &mut reader,
            &mut result,
            &bind_columns,
            &output_bind_indexes,
        )
        .expect("parse BOOLEAN RETURNING value");

        assert_eq!(result.return_values, vec![(0, vec![None])]);
    }
}
2766
#[cfg(test)]
mod fuzz_regression_tests {
    use super::*;

    // Shared check: the parser must have rejected the input with a decode
    // error or a resource-limit error.
    #[track_caller]
    fn assert_fails_closed(err: &ProtocolError) {
        assert!(
            matches!(
                err,
                ProtocolError::TtcDecode(_) | ProtocolError::ResourceLimit { .. }
            ),
            "expected fail-closed protocol error, got {err:?}"
        );
    }

    // Regression (w6-fuzz, query_response target): an implicit-resultset
    // message (type 27) declared roughly 620M results. The dispatch loop used
    // to pre-allocate gigabytes of `QueryValue::Cursor` before the truncated
    // read failed, which tripped libFuzzer's OOM detector. The parser must now
    // reject the payload instead.
    #[test]
    fn fuzz_regression_implicit_resultset_oom() {
        // type=27, ub4 length byte 4, value 0x25000000 (~620M), then EOF
        let payload = [27u8, 4, 0x25, 0, 0, 0];
        let err = parse_query_response(&payload, ClientCapabilities::default())
            .expect_err("oversized implicit-resultset count must fail closed");
        assert_fails_closed(&err);
    }

    // BoundedReader invariant (l2p), query-columns family: a DESCRIBE_INFO
    // message (type 16) declares roughly 620M columns and then ends. The
    // parser must reject it rather than allocate one ColumnMetadata per
    // declared column.
    #[test]
    fn describe_info_oversized_column_count_fails_closed_not_oom() {
        // type=16 DESCRIBE_INFO; describe_name length byte 0 (null);
        // max_row_size ub4 = 0; num_columns ub4 (len byte 4) = 0x25000000
        // (~620M); then EOF before any column record.
        let payload = [16u8, 0, 0, 4, 0x25, 0x00, 0x00, 0x00];
        let err = parse_query_response(&payload, ClientCapabilities::default())
            .expect_err("oversized column count must fail closed");
        assert_fails_closed(&err);
    }

    // With the column limit lowered to 1, a DESCRIBE_INFO message declaring
    // two columns must be rejected with the exact "columns" resource-limit
    // error.
    #[test]
    fn describe_info_respects_protocol_column_limit() {
        // type=16 DESCRIBE_INFO; describe_name null; max_row_size=0;
        // num_columns=2.
        let payload = [TNS_MSG_TYPE_DESCRIBE_INFO, 0, 0, 1, 2];
        let limits = ProtocolLimits {
            max_columns: 1,
            ..ProtocolLimits::DEFAULT
        };
        let err = parse_query_response_with_limits(&payload, ClientCapabilities::default(), limits)
            .expect_err("column count above policy must fail");
        let is_column_limit = matches!(
            err,
            ProtocolError::ResourceLimit {
                limit: "columns",
                observed: 2,
                maximum: 1,
            }
        );
        assert!(is_column_limit, "expected column ResourceLimit, got {err:?}");
    }

    // BoundedReader invariant (l2p), out-bind array family: an array OUT bind
    // declares roughly 620M elements but carries none. Parsing must fail
    // closed rather than reserve gigabytes of Option<QueryValue>.
    #[test]
    fn out_bind_array_oversized_element_count_fails_closed_not_oom() {
        let bind_columns = [ColumnMetadata {
            name: "ARR".to_string(),
            ora_type_num: ORA_TYPE_NUM_NUMBER,
            is_array: true,
            ..ColumnMetadata::default()
        }];
        let out_bind_indexes = [0usize];
        // ub4 num_elements: len byte 4, value 0x25000000, then no elements.
        let payload = [4u8, 0x25, 0x00, 0x00, 0x00];

        let mut result = QueryResult::default();
        let mut reader = TtcReader::new(&payload);
        let err =
            parse_out_bind_row_data(&mut reader, &mut result, &bind_columns, &out_bind_indexes)
                .expect_err("oversized array OUT bind count must fail closed");
        assert_fails_closed(&err);
    }
}