Skip to main content

oracledb_protocol/thin/
fetch.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4use crate::wire::ProtocolLimits;
5
/// Check that `slice` is well-formed UTF-8 for the hot borrowed-text decode
/// path.
///
/// On success the borrowed `&str` view of `slice` is returned; on rejection the
/// result is `Err(())` and the caller falls back to the owned `TextRaw`
/// carrier, so the observable semantics do not depend on which validator ran.
///
/// When the `simd-decode` feature is enabled the check is delegated to
/// `simdutf8::basic::from_utf8`. Its accept/reject decision is byte-for-byte
/// the same as `core::str::from_utf8` (same UTF-8 grammar); it merely does not
/// compute the error *position*, which is discarded here anyway. Only the safe
/// API of `simdutf8` is called, so this crate remains
/// `#![forbid(unsafe_code)]`-clean.
#[inline]
fn validate_utf8(slice: &[u8]) -> core::result::Result<&str, ()> {
    // Exactly one of the two bindings below is compiled in.
    #[cfg(feature = "simd-decode")]
    let checked = simdutf8::basic::from_utf8(slice).ok();
    #[cfg(not(feature = "simd-decode"))]
    let checked = core::str::from_utf8(slice).ok();

    // The error detail is intentionally dropped: callers only branch on
    // accept vs. reject.
    checked.ok_or(())
}
27
/// Selects how LOB column values are decoded while row data is parsed.
///
/// The mode is chosen by the public entry points of this module and handed
/// through to `parse_row_data`, which forwards it to the per-column decoder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LobDecodeMode {
    /// Mode used by the plain fetch path
    /// (`parse_fetch_response_with_context_and_limits`).
    PlainLocator,
    /// Mode used by the define-fetch path and by the execute-response
    /// parsers in this module.
    DefineMetadata,
}
33
34pub fn build_fetch_payload(cursor_id: u32, arraysize: u32) -> Vec<u8> {
35    build_fetch_payload_with_seq(cursor_id, arraysize, 1)
36}
37
38pub fn build_fetch_payload_with_seq(cursor_id: u32, arraysize: u32, seq_num: u8) -> Vec<u8> {
39    // Fixed tiny payload (function code + ub8 + two ub4 ≈ <=20 bytes). Prealloc
40    // so the small pushes do not grow the Vec through doublings; built every
41    // fetch page, so this matters on multi-page fetches. Bytes unchanged.
42    let mut writer = TtcWriter::with_capacity(32);
43    writer.write_function_code_with_seq(TNS_FUNC_FETCH, seq_num);
44    writer.write_ub8(0);
45    writer.write_ub4(cursor_id);
46    writer.write_ub4(arraysize);
47    writer.into_bytes()
48}
49
50pub fn build_define_fetch_payload_with_seq(
51    cursor_id: u32,
52    arraysize: u32,
53    seq_num: u8,
54    define_columns: &[ColumnMetadata],
55) -> Result<Vec<u8>> {
56    let define_count =
57        u32::try_from(define_columns.len()).map_err(|_| ProtocolError::InvalidPacketLength {
58            length: define_columns.len(),
59            minimum: 0,
60        })?;
61    let mut writer = TtcWriter::new();
62    writer.write_function_code_with_seq(TNS_FUNC_EXECUTE, seq_num);
63    writer.write_ub8(0);
64    writer.write_ub4(TNS_EXEC_OPTION_DEFINE | TNS_EXEC_OPTION_NOT_PLSQL);
65    writer.write_ub4(cursor_id);
66    writer.write_u8(0);
67    writer.write_ub4(0);
68    writer.write_u8(1);
69    writer.write_ub4(13);
70    writer.write_u8(0);
71    writer.write_u8(0);
72    writer.write_ub4(0);
73    writer.write_ub4(arraysize);
74    writer.write_ub4(TNS_MAX_LONG_LENGTH);
75    writer.write_u8(0);
76    writer.write_ub4(0);
77    writer.write_u8(0);
78    writer.write_u8(0);
79    writer.write_u8(0);
80    writer.write_u8(0);
81    writer.write_u8(0);
82    writer.write_u8(1);
83    writer.write_ub4(define_count);
84    writer.write_ub4(0);
85    writer.write_u8(0);
86    writer.write_u8(1);
87    writer.write_u8(0);
88    writer.write_ub4(0);
89    writer.write_u8(0);
90    writer.write_ub4(0);
91    writer.write_ub4(0);
92    writer.write_u8(0);
93    writer.write_ub4(0);
94    writer.write_u8(0);
95    writer.write_u8(0);
96    writer.write_ub4(0);
97    writer.write_ub4(0);
98    writer.write_ub4(0);
99    writer.write_ub4(0);
100    writer.write_ub4(0);
101    writer.write_ub4(0);
102    writer.write_ub4(0);
103    writer.write_ub4(arraysize);
104    writer.write_ub4(0);
105    writer.write_ub4(0);
106    writer.write_ub4(0);
107    writer.write_ub4(0);
108    writer.write_ub4(0);
109    writer.write_ub4(1);
110    writer.write_ub4(0);
111    writer.write_ub4(0);
112    writer.write_ub4(0);
113    writer.write_ub4(0);
114    writer.write_ub4(0);
115    for metadata in define_columns {
116        write_define_column_metadata(&mut writer, metadata);
117    }
118    Ok(writer.into_bytes())
119}
120
121pub(crate) fn write_define_column_metadata(writer: &mut TtcWriter, metadata: &ColumnMetadata) {
122    // reference base.pyx: VECTOR (and JSON) columns advertise a LOB-prefetch
123    // buffer so the server streams the image inline rather than returning a
124    // bare temp-LOB locator
125    let (mut buffer_size, cont_flags, lob_prefetch_length) = match metadata.ora_type_num {
126        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB => (metadata.buffer_size, TNS_LOB_PREFETCH_FLAG, 0),
127        ORA_TYPE_NUM_VECTOR => (
128            TNS_VECTOR_MAX_LENGTH,
129            TNS_LOB_PREFETCH_FLAG,
130            TNS_VECTOR_MAX_LENGTH,
131        ),
132        ORA_TYPE_NUM_JSON => (
133            TNS_JSON_MAX_LENGTH,
134            TNS_LOB_PREFETCH_FLAG,
135            TNS_JSON_MAX_LENGTH,
136        ),
137        _ => (metadata.buffer_size, 0, 0),
138    };
139    buffer_size = buffer_size.max(1);
140    writer.write_u8(metadata.ora_type_num);
141    writer.write_u8(TNS_BIND_USE_INDICATORS);
142    writer.write_u8(0);
143    writer.write_u8(0);
144    writer.write_ub4(buffer_size);
145    writer.write_ub4(0);
146    writer.write_ub8(cont_flags);
147    writer.write_ub4(0);
148    writer.write_ub2(0);
149    if metadata.csfrm != 0 {
150        writer.write_ub2(TNS_CHARSET_UTF8);
151    } else {
152        writer.write_ub2(0);
153    }
154    writer.write_u8(metadata.csfrm);
155    writer.write_ub4(lob_prefetch_length);
156    writer.write_ub4(0);
157}
158
159pub fn parse_query_response(
160    payload: &[u8],
161    capabilities: ClientCapabilities,
162) -> Result<QueryResult> {
163    parse_query_response_with_previous(payload, capabilities, None)
164}
165
166pub fn parse_query_response_with_limits(
167    payload: &[u8],
168    capabilities: ClientCapabilities,
169    limits: ProtocolLimits,
170) -> Result<QueryResult> {
171    parse_query_response_with_context_binds_options_and_limits(
172        payload,
173        capabilities,
174        &[],
175        None,
176        &[],
177        &[],
178        false,
179        ExecuteOptions::default(),
180        limits,
181    )
182}
183
184pub fn parse_query_response_with_binds(
185    payload: &[u8],
186    capabilities: ClientCapabilities,
187    binds: &[BindValue],
188) -> Result<QueryResult> {
189    parse_query_response_with_binds_and_options(
190        payload,
191        capabilities,
192        binds,
193        ExecuteOptions::default(),
194    )
195}
196
197pub fn parse_query_response_with_binds_and_options(
198    payload: &[u8],
199    capabilities: ClientCapabilities,
200    binds: &[BindValue],
201    exec_options: ExecuteOptions,
202) -> Result<QueryResult> {
203    parse_query_response_with_binds_options_and_columns(
204        payload,
205        capabilities,
206        binds,
207        exec_options,
208        &[],
209    )
210}
211
212/// `known_columns` carries the fetch metadata of a re-executed statement
213/// whose response does not repeat the describe information (reference keeps
214/// the statement's fetch vars across executions).
215pub fn parse_query_response_with_binds_options_and_columns(
216    payload: &[u8],
217    capabilities: ClientCapabilities,
218    binds: &[BindValue],
219    exec_options: ExecuteOptions,
220    known_columns: &[ColumnMetadata],
221) -> Result<QueryResult> {
222    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
223    let output_bind_indexes = binds
224        .iter()
225        .enumerate()
226        .filter_map(|(index, value)| value.is_return_output().then_some(index))
227        .collect::<Vec<_>>();
228    parse_query_response_with_context_binds_and_options(
229        payload,
230        capabilities,
231        known_columns,
232        None,
233        &bind_columns,
234        &output_bind_indexes,
235        false,
236        exec_options,
237    )
238}
239
240pub fn parse_query_response_with_binds_options_columns_and_limits(
241    payload: &[u8],
242    capabilities: ClientCapabilities,
243    binds: &[BindValue],
244    exec_options: ExecuteOptions,
245    known_columns: &[ColumnMetadata],
246    limits: ProtocolLimits,
247) -> Result<QueryResult> {
248    limits.check_binds(binds.len())?;
249    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
250    let output_bind_indexes = binds
251        .iter()
252        .enumerate()
253        .filter_map(|(index, value)| value.is_return_output().then_some(index))
254        .collect::<Vec<_>>();
255    parse_query_response_with_context_binds_options_and_limits(
256        payload,
257        capabilities,
258        known_columns,
259        None,
260        &bind_columns,
261        &output_bind_indexes,
262        false,
263        exec_options,
264        limits,
265    )
266}
267
268pub fn parse_query_response_with_previous(
269    payload: &[u8],
270    capabilities: ClientCapabilities,
271    previous_row: Option<&[Option<QueryValue>]>,
272) -> Result<QueryResult> {
273    parse_query_response_with_context(payload, capabilities, &[], previous_row)
274}
275
276pub fn parse_query_response_with_context(
277    payload: &[u8],
278    capabilities: ClientCapabilities,
279    previous_columns: &[ColumnMetadata],
280    previous_row: Option<&[Option<QueryValue>]>,
281) -> Result<QueryResult> {
282    parse_query_response_with_context_and_binds(
283        payload,
284        capabilities,
285        previous_columns,
286        previous_row,
287        &[],
288        &[],
289        false,
290        ProtocolLimits::DEFAULT,
291    )
292}
293
294pub fn parse_fetch_response_with_context(
295    payload: &[u8],
296    capabilities: ClientCapabilities,
297    previous_columns: &[ColumnMetadata],
298    previous_row: Option<&[Option<QueryValue>]>,
299) -> Result<QueryResult> {
300    parse_fetch_response_with_context_and_limits(
301        payload,
302        capabilities,
303        previous_columns,
304        previous_row,
305        ProtocolLimits::DEFAULT,
306    )
307}
308
309pub fn parse_fetch_response_with_context_and_limits(
310    payload: &[u8],
311    capabilities: ClientCapabilities,
312    previous_columns: &[ColumnMetadata],
313    previous_row: Option<&[Option<QueryValue>]>,
314    limits: ProtocolLimits,
315) -> Result<QueryResult> {
316    parse_query_response_with_context_binds_options_lob_mode_and_limits(
317        payload,
318        capabilities,
319        previous_columns,
320        previous_row,
321        &[],
322        &[],
323        true,
324        ExecuteOptions::default(),
325        LobDecodeMode::PlainLocator,
326        limits,
327    )
328}
329
330pub fn parse_define_fetch_response_with_context_and_limits(
331    payload: &[u8],
332    capabilities: ClientCapabilities,
333    previous_columns: &[ColumnMetadata],
334    previous_row: Option<&[Option<QueryValue>]>,
335    limits: ProtocolLimits,
336) -> Result<QueryResult> {
337    parse_query_response_with_context_binds_options_lob_mode_and_limits(
338        payload,
339        capabilities,
340        previous_columns,
341        previous_row,
342        &[],
343        &[],
344        true,
345        ExecuteOptions::default(),
346        LobDecodeMode::DefineMetadata,
347        limits,
348    )
349}
350
351#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
352pub(crate) fn parse_query_response_with_context_and_binds(
353    payload: &[u8],
354    capabilities: ClientCapabilities,
355    previous_columns: &[ColumnMetadata],
356    previous_row: Option<&[Option<QueryValue>]>,
357    bind_columns: &[ColumnMetadata],
358    output_bind_indexes: &[usize],
359    fetch_long_status: bool,
360    limits: ProtocolLimits,
361) -> Result<QueryResult> {
362    parse_query_response_with_context_binds_options_and_limits(
363        payload,
364        capabilities,
365        previous_columns,
366        previous_row,
367        bind_columns,
368        output_bind_indexes,
369        fetch_long_status,
370        ExecuteOptions::default(),
371        limits,
372    )
373}
374
375#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
376pub(crate) fn parse_query_response_with_context_binds_and_options(
377    payload: &[u8],
378    capabilities: ClientCapabilities,
379    previous_columns: &[ColumnMetadata],
380    previous_row: Option<&[Option<QueryValue>]>,
381    bind_columns: &[ColumnMetadata],
382    output_bind_indexes: &[usize],
383    fetch_long_status: bool,
384    exec_options: ExecuteOptions,
385) -> Result<QueryResult> {
386    parse_query_response_with_context_binds_options_and_limits(
387        payload,
388        capabilities,
389        previous_columns,
390        previous_row,
391        bind_columns,
392        output_bind_indexes,
393        fetch_long_status,
394        exec_options,
395        ProtocolLimits::DEFAULT,
396    )
397}
398
399#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
400pub(crate) fn parse_query_response_with_context_binds_options_and_limits(
401    payload: &[u8],
402    capabilities: ClientCapabilities,
403    previous_columns: &[ColumnMetadata],
404    previous_row: Option<&[Option<QueryValue>]>,
405    bind_columns: &[ColumnMetadata],
406    output_bind_indexes: &[usize],
407    fetch_long_status: bool,
408    exec_options: ExecuteOptions,
409    limits: ProtocolLimits,
410) -> Result<QueryResult> {
411    parse_query_response_with_context_binds_options_lob_mode_and_limits(
412        payload,
413        capabilities,
414        previous_columns,
415        previous_row,
416        bind_columns,
417        output_bind_indexes,
418        fetch_long_status,
419        exec_options,
420        LobDecodeMode::DefineMetadata,
421        limits,
422    )
423}
424
425#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
426fn parse_query_response_with_context_binds_options_lob_mode_and_limits(
427    payload: &[u8],
428    capabilities: ClientCapabilities,
429    previous_columns: &[ColumnMetadata],
430    previous_row: Option<&[Option<QueryValue>]>,
431    bind_columns: &[ColumnMetadata],
432    output_bind_indexes: &[usize],
433    fetch_long_status: bool,
434    exec_options: ExecuteOptions,
435    lob_decode_mode: LobDecodeMode,
436    limits: ProtocolLimits,
437) -> Result<QueryResult> {
438    let mut reader = TtcReader::with_limits(payload, limits)?;
439    let mut result = QueryResult {
440        columns: previous_columns.to_vec(),
441        more_rows: true,
442        ..QueryResult::default()
443    };
444    // A re-executed cursor whose column type changed to CLOB/BLOB but was
445    // previously fetched as CHAR/VARCHAR/RAW streams the value in LONG/LONG RAW
446    // form (see `adjust_refetch_metadata`), which carries the LONG status
447    // trailer (null indicator + return code) after each value — even on the
448    // execute path that otherwise passes `fetch_long_status = false`. Promote
449    // the flag when such an adjustment fires so `parse_row_data` consumes that
450    // trailer instead of mis-framing the next message (bead rust-oracledb-f0ad).
451    let mut fetch_long_status = fetch_long_status;
452    let mut bit_vector: Option<Vec<u8>> = None;
453    let mut out_bind_indexes: Vec<usize> = Vec::new();
454    while reader.remaining() > 0 {
455        let message_type = reader.read_u8()?;
456        match message_type {
457            0 => {}
458            TNS_MSG_TYPE_DESCRIBE_INFO => {
459                let _describe_name = reader.read_bytes()?;
460                let previous = std::mem::take(&mut result.columns);
461                parse_describe_info(&mut reader, capabilities, &mut result)?;
462                // re-executing an open cursor whose underlying types changed:
463                // the server re-describes mid-response but still streams the
464                // row data in the adjusted (LONG/LONG RAW) form expected by
465                // the previous fetch metadata (reference `_adjust_metadata`,
466                // impl/thin/messages/base.pyx:820-845, applied during
467                // `_process_describe_info`).
468                for (index, column) in result.columns.iter_mut().enumerate() {
469                    if let Some(prev) = previous.get(index) {
470                        if adjust_refetch_metadata(prev, column) {
471                            // the adjusted column (now LONG / LONG RAW) is
472                            // streamed with the LONG status trailer.
473                            fetch_long_status = true;
474                        }
475                    }
476                }
477            }
478            TNS_MSG_TYPE_ROW_HEADER => {
479                bit_vector = parse_row_header(&mut reader)?;
480            }
481            TNS_MSG_TYPE_ROW_DATA => {
482                if result.columns.is_empty() && !out_bind_indexes.is_empty() {
483                    parse_out_bind_row_data(
484                        &mut reader,
485                        &mut result,
486                        bind_columns,
487                        &out_bind_indexes,
488                    )?;
489                } else if result.columns.is_empty() && !output_bind_indexes.is_empty() {
490                    parse_returning_row_data(
491                        &mut reader,
492                        &mut result,
493                        bind_columns,
494                        output_bind_indexes,
495                    )?;
496                } else {
497                    parse_row_data(
498                        &mut reader,
499                        &mut result,
500                        bit_vector.as_deref(),
501                        previous_row,
502                        fetch_long_status,
503                        lob_decode_mode,
504                    )?;
505                }
506                bit_vector = None;
507            }
508            TNS_MSG_TYPE_BIT_VECTOR => {
509                bit_vector = Some(parse_bit_vector(&mut reader, result.columns.len())?);
510            }
511            TNS_MSG_TYPE_PARAMETER => {
512                let params =
513                    parse_query_return_parameters(&mut reader, exec_options.arraydmlrowcounts)?;
514                if exec_options.arraydmlrowcounts {
515                    result.array_dml_row_counts = Some(params.row_counts.unwrap_or_default());
516                }
517                if params.query_id.is_some() {
518                    result.query_id = params.query_id;
519                }
520            }
521            TNS_MSG_TYPE_STATUS => {
522                let call_status = reader.read_ub4()?;
523                let _seq = reader.read_ub2()?;
524                result.txn_in_progress = Some(call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
525            }
526            TNS_MSG_TYPE_IO_VECTOR => {
527                out_bind_indexes = parse_io_vector(&mut reader, bind_columns.len())?
528                    .into_iter()
529                    .filter(|index| !output_bind_indexes.contains(index))
530                    .collect();
531            }
532            TNS_MSG_TYPE_FLUSH_OUT_BINDS => break,
533            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
534                if let Some(update) = skip_server_side_piggyback(&mut reader)? {
535                    result.sessionless_txn_state = Some(update);
536                }
537            }
538            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
539                // reference messages/base.pyx `_process_implicit_result`
540                let num_results = reader.read_ub4()?;
541                // `num_results` is read straight off the wire (a ub4, up to
542                // ~4e9); each resultset consumes at least one byte, so cap the
543                // reservation by the bytes left in the payload (BoundedReader).
544                // Without this a hostile server forces a multi-gigabyte
545                // allocation (OOM) before the truncated read in the loop body
546                // fails closed.
547                let mut resultsets: Vec<QueryValue> = reader.with_capacity_limited(
548                    num_results as usize,
549                    1,
550                    ProtocolLimits::check_length_prefixed_elements,
551                )?;
552                for _ in 0..num_results {
553                    let num_bytes = reader.read_u8()?;
554                    reader.skip(usize::from(num_bytes))?;
555                    let mut child = QueryResult::default();
556                    parse_describe_info(&mut reader, capabilities, &mut child)?;
557                    let child_cursor_id = u32::from(reader.read_ub2()?);
558                    resultsets.push(QueryValue::Cursor(Box::new(CursorValue {
559                        columns: child.columns,
560                        cursor_id: child_cursor_id,
561                    })));
562                }
563                result.implicit_resultsets = Some(resultsets);
564            }
565            TNS_MSG_TYPE_END_OF_RESPONSE => break,
566            // pipeline responses open with the token of the operation they
567            // answer (messages/base.pyx:288-293); callers compare it against
568            // the expected token (mismatch -> DPY-2052 at the driver layer)
569            TNS_MSG_TYPE_TOKEN => {
570                result.token_num = Some(reader.read_ub8()?);
571            }
572            TNS_MSG_TYPE_ERROR => {
573                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
574                // The end-of-call ERROR message (number 0 on success) carries
575                // the end-of-call status; sample the transaction-in-progress bit
576                // (reference protocol.pyx `_process_call_status`).
577                result.txn_in_progress =
578                    Some(info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
579                if info.cursor_id != 0 {
580                    result.cursor_id = u32::from(info.cursor_id);
581                }
582                result.row_count = info.row_count;
583                result.compilation_error_warning |= info.compilation_error_warning;
584                result.last_rowid = info.rowid.clone();
585                if info.number == TNS_ERR_NO_DATA_FOUND && !result.columns.is_empty() {
586                    result.more_rows = false;
587                } else if info.number == TNS_ERR_ARRAY_DML_ERRORS {
588                    // executemany(batcherrors=True): errors are reported via
589                    // the batch error arrays instead of raising ORA-24381
590                    // (reference messages/base.pyx `_process_error_info`).
591                    result.batch_errors = info.batch_errors;
592                } else if info.number != 0 {
593                    let mut details = info.into_details();
594                    details.array_dml_row_counts = result.array_dml_row_counts.take();
595                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
596                }
597            }
598            _ => {
599                let position = reader.position().saturating_sub(1);
600                if let Some(message) =
601                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
602                {
603                    return Err(ProtocolError::ServerError(message));
604                }
605                return Err(ProtocolError::UnknownMessageType {
606                    message_type,
607                    position,
608                });
609            }
610        }
611    }
612    Ok(result)
613}
614
615pub(crate) fn bind_column_metadata(value: &BindValue) -> ColumnMetadata {
616    let (ora_type_num, csfrm, buffer_size) = bind_metadata(value);
617    let object_schema = match value {
618        BindValue::ObjectOutput { schema, .. } | BindValue::ObjectInput { schema, .. } => {
619            Some(schema.clone())
620        }
621        _ => None,
622    };
623    let object_type_name = match value {
624        BindValue::ObjectOutput { type_name, .. } | BindValue::ObjectInput { type_name, .. } => {
625            Some(type_name.clone())
626        }
627        _ => None,
628    };
629    ColumnMetadata {
630        name: String::new(),
631        ora_type_num,
632        csfrm,
633        precision: 0,
634        scale: 0,
635        buffer_size,
636        max_size: buffer_size,
637        nulls_allowed: true,
638        is_json: false,
639        is_oson: false,
640        object_schema,
641        object_type_name,
642        is_array: matches!(value, BindValue::Array { .. }),
643        vector_dimensions: None,
644        vector_format: 0,
645        vector_flags: 0,
646        domain_schema: None,
647        domain_name: None,
648        annotations: None,
649    }
650}
651
652pub(crate) fn parse_io_vector(reader: &mut TtcReader<'_>, bind_count: usize) -> Result<Vec<usize>> {
653    let _flags = reader.read_u8()?;
654    let temp16 = reader.read_ub2()?;
655    let temp32 = reader.read_ub4()?;
656    let num_binds = usize::try_from(temp32)
657        .map_err(|_| ProtocolError::InvalidPacketLength {
658            length: usize::MAX,
659            minimum: 0,
660        })?
661        .checked_mul(256)
662        .and_then(|value| value.checked_add(usize::from(temp16)))
663        .ok_or(ProtocolError::InvalidPacketLength {
664            length: usize::MAX,
665            minimum: 0,
666        })?;
667    let _num_iters_this_time = reader.read_ub4()?;
668    let _uac_buffer_length = reader.read_ub2()?;
669    let fast_fetch_len = reader.read_ub2()?;
670    if fast_fetch_len > 0 {
671        reader.skip(usize::from(fast_fetch_len))?;
672    }
673    let rowid_len = reader.read_ub2()?;
674    if rowid_len > 0 {
675        reader.skip(usize::from(rowid_len))?;
676    }
677    let mut out_indexes = Vec::new();
678    reader.limits().check_binds(num_binds)?;
679    for index in 0..num_binds {
680        let direction = reader.read_u8()?;
681        if index < bind_count && direction != TNS_BIND_DIR_INPUT {
682            out_indexes.push(index);
683        }
684    }
685    Ok(out_indexes)
686}
687
688pub(crate) fn find_embedded_server_error(
689    payload: &[u8],
690    ttc_field_version: u8,
691    position: usize,
692) -> Option<String> {
693    let start = position.saturating_sub(64);
694    for candidate in start..=position {
695        if !matches!(payload.get(candidate).copied(), Some(TNS_MSG_TYPE_ERROR)) {
696            continue;
697        }
698        let mut reader = TtcReader::new(payload.get(candidate + 1..)?);
699        let info = parse_server_error_info(&mut reader, ttc_field_version).ok()?;
700        if info.number != 0 && info.message.starts_with("ORA-") {
701            return Some(info.message);
702        }
703    }
704    None
705}
706
707pub(crate) fn parse_describe_info(
708    reader: &mut TtcReader<'_>,
709    capabilities: ClientCapabilities,
710    result: &mut QueryResult,
711) -> Result<()> {
712    let _max_row_size = reader.read_ub4()?;
713    let num_columns = reader.read_ub4()?;
714    reader.limits().check_columns(num_columns as usize)?;
715    result.columns.clear();
716    if num_columns > 0 {
717        reader.skip(1)?;
718    }
719    for _ in 0..num_columns {
720        result
721            .columns
722            .push(parse_column_metadata(reader, capabilities)?);
723    }
724    let _current_date = reader.read_bytes_with_length()?;
725    let _dcbflag = reader.read_ub4()?;
726    let _dcbmdbz = reader.read_ub4()?;
727    let _dcbmnpr = reader.read_ub4()?;
728    let _dcbmxpr = reader.read_ub4()?;
729    let _dcbqcky = reader.read_bytes_with_length()?;
730    Ok(())
731}
732
733pub(crate) fn parse_column_metadata(
734    reader: &mut TtcReader<'_>,
735    capabilities: ClientCapabilities,
736) -> Result<ColumnMetadata> {
737    let ora_type_num = reader.read_u8()?;
738    reader.skip(1)?;
739    let precision = reader.read_i8()?;
740    let scale = reader.read_i8()?;
741    let buffer_size = reader.read_ub4()?;
742    let _max_array_elements = reader.read_ub4()?;
743    let _cont_flags = reader.read_ub8()?;
744    let _oid = reader.read_bytes_with_length()?;
745    let _version = reader.read_ub2()?;
746    let _charset_id = reader.read_ub2()?;
747    let csfrm = reader.read_u8()?;
748    let mut max_size = reader.read_ub4()?;
749    if ora_type_num == ORA_TYPE_NUM_RAW {
750        max_size = buffer_size;
751    }
752    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_12_2 {
753        let _oaccolid = reader.read_ub4()?;
754    }
755    let nulls_allowed = reader.read_u8()? != 0;
756    reader.skip(1)?;
757    let name = reader.read_string_with_length()?.unwrap_or_default();
758    let object_schema = reader.read_string_with_length()?;
759    let object_type_name = reader.read_string_with_length()?;
760    let _column_position = reader.read_ub2()?;
761    let uds_flags = reader.read_ub4()?;
762    let mut domain_schema = None;
763    let mut domain_name = None;
764    let mut annotations: Option<Vec<(String, String)>> = None;
765    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1 {
766        domain_schema = reader.read_string_with_length()?;
767        domain_name = reader.read_string_with_length()?;
768    }
769    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_3 {
770        let num_annotations = reader.read_ub4()?;
771        if num_annotations > 0 {
772            reader.skip(1)?;
773            let num_annotations = reader.read_ub4()?;
774            reader.skip(1)?;
775            // Bound by remaining bytes (BoundedReader): each annotation reads
776            // at least a length-prefixed key/value, so a ub4 count larger than
777            // the payload is a lie that must not pre-allocate gigabytes.
778            let mut collected: Vec<(String, String)> = reader.with_capacity_limited(
779                num_annotations as usize,
780                1,
781                ProtocolLimits::check_object_elements,
782            )?;
783            for _ in 0..num_annotations {
784                let key = reader.read_string_with_length()?.unwrap_or_default();
785                // A null annotation value is normalized to "" by the reference
786                // driver (python-oracledb base.pyx _process_metadata).
787                let value = reader.read_string_with_length()?.unwrap_or_default();
788                let _flags = reader.read_ub4()?;
789                collected.push((key, value));
790            }
791            let _flags = reader.read_ub4()?;
792            annotations = Some(collected);
793        }
794    }
795    let mut vector_dimensions = None;
796    let mut vector_format = 0u8;
797    let mut vector_flags = 0u8;
798    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_4 {
799        // reference metadata.pyx: ub4 dimensions, ub1 format, ub1 flags
800        let dims = reader.read_ub4()?;
801        reader.limits().check_vector_dimensions(dims as usize)?;
802        vector_format = reader.read_u8()?;
803        vector_flags = reader.read_u8()?;
804        if ora_type_num == ORA_TYPE_NUM_VECTOR {
805            vector_dimensions = Some(dims);
806        }
807    }
808
809    Ok(ColumnMetadata {
810        name,
811        ora_type_num,
812        csfrm,
813        precision,
814        scale,
815        buffer_size,
816        max_size,
817        nulls_allowed,
818        is_json: uds_flags & TNS_UDS_FLAGS_IS_JSON != 0,
819        is_oson: uds_flags & TNS_UDS_FLAGS_IS_OSON != 0,
820        object_schema,
821        object_type_name,
822        is_array: false,
823        vector_dimensions,
824        vector_format,
825        vector_flags,
826        domain_schema,
827        domain_name,
828        annotations,
829    })
830}
831
832pub(crate) fn parse_row_header(reader: &mut TtcReader<'_>) -> Result<Option<Vec<u8>>> {
833    reader.skip(1)?;
834    let _num_requests = reader.read_ub2()?;
835    let _iteration_number = reader.read_ub4()?;
836    let _num_iters = reader.read_ub4()?;
837    let _buffer_length = reader.read_ub2()?;
838    let num_bytes = reader.read_ub4()?;
839    let bit_vector = if num_bytes > 0 {
840        reader.skip(1)?;
841        Some(reader.read_raw(num_bytes as usize)?.to_vec())
842    } else {
843        None
844    };
845    let _rxhrid = reader.read_bytes_with_length()?;
846    Ok(bit_vector)
847}
848
849pub(crate) fn parse_bit_vector(reader: &mut TtcReader<'_>, num_columns: usize) -> Result<Vec<u8>> {
850    let _num_columns_sent = reader.read_ub2()?;
851    let num_bytes = num_columns.div_ceil(8);
852    Ok(reader.read_raw(num_bytes)?.to_vec())
853}
854
855pub(crate) fn parse_row_data(
856    reader: &mut TtcReader<'_>,
857    result: &mut QueryResult,
858    bit_vector: Option<&[u8]>,
859    previous_row: Option<&[Option<QueryValue>]>,
860    fetch_long_status: bool,
861    lob_decode_mode: LobDecodeMode,
862) -> Result<()> {
863    let mut row = Vec::with_capacity(result.columns.len());
864    for (index, metadata) in result.columns.iter().enumerate() {
865        if is_duplicate_column(bit_vector, index) {
866            let previous = result
867                .rows
868                .last()
869                .map(Vec::as_slice)
870                .or(previous_row)
871                .and_then(|last| last.get(index))
872                .cloned()
873                .ok_or(ProtocolError::TtcDecode(
874                    "duplicate row data without previous row",
875                ))?;
876            row.push(previous);
877            continue;
878        }
879        row.push(parse_column_value_with_lob_mode(
880            reader,
881            metadata,
882            lob_decode_mode,
883        )?);
884        if fetch_long_status
885            && matches!(
886                metadata.ora_type_num,
887                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
888            )
889        {
890            let _null_indicator = reader.read_sb4()?;
891            let _return_code = reader.read_ub4()?;
892        }
893    }
894    result.rows.push(row);
895    Ok(())
896}
897
898pub(crate) fn parse_out_bind_row_data(
899    reader: &mut TtcReader<'_>,
900    result: &mut QueryResult,
901    bind_columns: &[ColumnMetadata],
902    out_bind_indexes: &[usize],
903) -> Result<()> {
904    for index in out_bind_indexes {
905        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
906            "out bind index without bind metadata",
907        ))?;
908        if metadata.is_array {
909            let num_elements = usize::try_from(reader.read_ub4()?).map_err(|_| {
910                ProtocolError::InvalidPacketLength {
911                    length: usize::MAX,
912                    minimum: 0,
913                }
914            })?;
915            reader.limits().check_batch_rows(num_elements)?;
916            // Cap by remaining bytes (BoundedReader): each element consumes
917            // wire data, so a ub4 count cannot legitimately exceed the payload.
918            let mut values: Vec<Option<QueryValue>> =
919                reader.with_capacity_limited(num_elements, 1, ProtocolLimits::check_batch_rows)?;
920            for _ in 0..num_elements {
921                let value = parse_column_value(reader, metadata)?;
922                let actual_num_bytes = reader.read_sb4()?;
923                values.push(apply_out_bind_actual_num_bytes(
924                    metadata,
925                    value,
926                    actual_num_bytes,
927                    "truncated array OUT bind value",
928                )?);
929            }
930            result
931                .out_values
932                .push((*index, Some(QueryValue::Array(values))));
933            continue;
934        }
935        let value = parse_column_value(reader, metadata)?;
936        let actual_num_bytes = reader.read_sb4()?;
937        result.out_values.push((
938            *index,
939            apply_out_bind_actual_num_bytes(
940                metadata,
941                value,
942                actual_num_bytes,
943                "truncated OUT bind value",
944            )?,
945        ));
946    }
947    Ok(())
948}
949
950pub(crate) fn parse_returning_row_data(
951    reader: &mut TtcReader<'_>,
952    result: &mut QueryResult,
953    bind_columns: &[ColumnMetadata],
954    output_bind_indexes: &[usize],
955) -> Result<()> {
956    for index in output_bind_indexes {
957        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
958            "return bind index without bind metadata",
959        ))?;
960        let num_rows = usize::try_from(reader.read_ub4()?).map_err(|_| {
961            ProtocolError::InvalidPacketLength {
962                length: usize::MAX,
963                minimum: 0,
964            }
965        })?;
966        reader.limits().check_batch_rows(num_rows)?;
967        // Cap by remaining bytes (BoundedReader); see the OOM note above.
968        let mut values: Vec<Option<QueryValue>> =
969            reader.with_capacity_limited(num_rows, 1, ProtocolLimits::check_batch_rows)?;
970        for _ in 0..num_rows {
971            let value = parse_column_value(reader, metadata)?;
972            let actual_num_bytes = reader.read_sb4()?;
973            values.push(apply_out_bind_actual_num_bytes(
974                metadata,
975                value,
976                actual_num_bytes,
977                "truncated DML RETURNING value",
978            )?);
979        }
980        result.return_values.push((*index, values));
981    }
982    Ok(())
983}
984
985fn apply_out_bind_actual_num_bytes(
986    metadata: &ColumnMetadata,
987    value: Option<QueryValue>,
988    actual_num_bytes: i32,
989    truncation_error: &'static str,
990) -> Result<Option<QueryValue>> {
991    if actual_num_bytes < 0 && metadata.ora_type_num == ORA_TYPE_NUM_BOOLEAN {
992        return Ok(None);
993    }
994    if actual_num_bytes != 0 && value.is_some() {
995        return Err(ProtocolError::TtcDecode(truncation_error));
996    }
997    Ok(value)
998}
999
/// Report whether `column_num` is flagged as a duplicate in `bit_vector`.
///
/// Column `n` maps to bit `n % 8` of byte `n / 8`. A *cleared* bit means the
/// column is a duplicate. With no bit vector, or when the column's byte lies
/// beyond the end of the vector, the column is not a duplicate.
pub(crate) fn is_duplicate_column(bit_vector: Option<&[u8]>, column_num: usize) -> bool {
    let mask = 1u8 << (column_num % 8);
    match bit_vector.and_then(|bits| bits.get(column_num / 8)) {
        Some(byte) => byte & mask == 0,
        None => false,
    }
}
1010
1011pub(crate) fn parse_column_value(
1012    reader: &mut TtcReader<'_>,
1013    metadata: &ColumnMetadata,
1014) -> Result<Option<QueryValue>> {
1015    parse_column_value_with_lob_mode(reader, metadata, LobDecodeMode::DefineMetadata)
1016}
1017
1018fn parse_column_value_with_lob_mode(
1019    reader: &mut TtcReader<'_>,
1020    metadata: &ColumnMetadata,
1021    lob_decode_mode: LobDecodeMode,
1022) -> Result<Option<QueryValue>> {
1023    if metadata.buffer_size == 0
1024        && !matches!(
1025            metadata.ora_type_num,
1026            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW | ORA_TYPE_NUM_UROWID
1027        )
1028    {
1029        return Ok(None);
1030    }
1031    match metadata.ora_type_num {
1032        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
1033            let Some(bytes) = reader.read_bytes()? else {
1034                return Ok(None);
1035            };
1036            match decode_text_value(&bytes, metadata.csfrm) {
1037                Ok(value) => Ok(Some(QueryValue::Text(value))),
1038                // preserve the raw bytes so the caller can honor the
1039                // configured encoding_errors policy (or raise a Python
1040                // UnicodeDecodeError as the reference does)
1041                Err(ProtocolError::TtcDecode(_)) => Ok(Some(QueryValue::TextRaw {
1042                    bytes,
1043                    csfrm: metadata.csfrm,
1044                })),
1045                Err(err) => Err(err),
1046            }
1047        }
1048        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => Ok(reader.read_bytes()?.map(QueryValue::Raw)),
1049        ORA_TYPE_NUM_ROWID => parse_rowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
1050        ORA_TYPE_NUM_UROWID => parse_urowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
1051        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
1052            let Some(bytes) = reader.read_bytes()? else {
1053                return Ok(None);
1054            };
1055            decode_number_value(&bytes).map(Some)
1056        }
1057        ORA_TYPE_NUM_BINARY_DOUBLE => {
1058            let Some(bytes) = reader.read_bytes()? else {
1059                return Ok(None);
1060            };
1061            decode_binary_double(&bytes)
1062                .map(|value| Some(QueryValue::BinaryDouble(value.to_string())))
1063        }
1064        ORA_TYPE_NUM_BINARY_FLOAT => {
1065            let Some(bytes) = reader.read_bytes()? else {
1066                return Ok(None);
1067            };
1068            // f64-widened text matches Python float semantics for BINARY_FLOAT
1069            decode_binary_float(&bytes)
1070                .map(|value| Some(QueryValue::BinaryDouble(f64::from(value).to_string())))
1071        }
1072        ORA_TYPE_NUM_BOOLEAN => {
1073            let Some(bytes) = reader.read_bytes()? else {
1074                return Ok(None);
1075            };
1076            // reference read_bool: last byte == 1 means true; native
1077            // DB_TYPE_BOOLEAN surfaces as a Python bool.
1078            let is_true = matches!(bytes.last(), Some(&1));
1079            Ok(Some(QueryValue::Boolean(is_true)))
1080        }
1081        ORA_TYPE_NUM_INTERVAL_DS => {
1082            let Some(bytes) = reader.read_bytes()? else {
1083                return Ok(None);
1084            };
1085            decode_interval_ds(&bytes).map(Some)
1086        }
1087        ORA_TYPE_NUM_INTERVAL_YM => {
1088            let Some(bytes) = reader.read_bytes()? else {
1089                return Ok(None);
1090            };
1091            decode_interval_ym(&bytes).map(Some)
1092        }
1093        ORA_TYPE_NUM_DATE
1094        | ORA_TYPE_NUM_TIMESTAMP
1095        | ORA_TYPE_NUM_TIMESTAMP_LTZ
1096        | ORA_TYPE_NUM_TIMESTAMP_TZ => {
1097            let Some(bytes) = reader.read_bytes()? else {
1098                return Ok(None);
1099            };
1100            decode_datetime_value(&bytes).map(Some)
1101        }
1102        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB | ORA_TYPE_NUM_BFILE => {
1103            parse_lob_value(reader, metadata, lob_decode_mode)
1104        }
1105        ORA_TYPE_NUM_VECTOR => parse_vector_value(reader),
1106        ORA_TYPE_NUM_JSON => parse_json_value(reader),
1107        ORA_TYPE_NUM_CURSOR => parse_cursor_value(reader).map(Some),
1108        ORA_TYPE_NUM_OBJECT => parse_object_value(reader, metadata),
1109        _ => Err(ProtocolError::UnsupportedFeature("query column type")),
1110    }
1111}
1112
/// A column value decoded in pass 1 of the borrowed row decode. Scalar values
/// that borrow the wire buffer are held directly; values that need a small
/// owned arena (synthesized `Number` text, or a cold owned [`QueryValue`]) are
/// recorded as a deferred handle into the per-row arena and resolved in pass 2
/// once the arena is frozen. This two-pass split is what keeps the borrowed
/// path sound under `#![forbid(unsafe_code)]`: no `&str`/`&[u8]` is ever held
/// into an arena that is still being grown.
///
/// The deferred variants store only a position (`Range<usize>` / `usize`), so a
/// `ColumnSlot` itself borrows nothing but the wire buffer (`'buf`).
enum ColumnSlot<'buf> {
    /// SQL NULL. The row decoder converts this to `None` before storing the
    /// slot, so pass 2 never observes it.
    Null,
    /// A value that borrows the wire buffer (or is a small `Copy` value).
    Wire(QueryValueRef<'buf>),
    /// A `NUMBER` whose canonical text lives at `arena[range]` in the per-row
    /// number-text arena.
    Number {
        /// Byte range of this cell's text within the number-text arena.
        range: core::ops::Range<usize>,
        /// Integer flag reported by the number-text decoder; carried through
        /// unchanged to the resolved `QueryValueRef::Number`.
        is_integer: bool,
    },
    /// A cold / non-borrowable value parked at `owned[index]` in the per-row
    /// owned arena.
    Owned(usize),
}
1135
1136/// Decode one column into a [`ColumnSlot`], borrowing the wire buffer for the
1137/// hot scalar cases and appending to the per-row arenas for the deferred ones.
1138/// Mirrors [`parse_column_value`] type-for-type; the produced owned value (via
1139/// [`QueryValueRef::to_owned_value`]) is identical to the owned path.
1140///
1141/// `digits` is a caller-owned scratch buffer reused across all cells so the
1142/// per-cell `NUMBER` decode allocates nothing of its own (it writes straight
1143/// into `number_arena`). The hot scalar grid (Text/Raw) borrows the wire buffer
1144/// directly with zero allocation.
1145fn parse_column_slot<'buf>(
1146    reader: &mut TtcReader<'buf>,
1147    metadata: &ColumnMetadata,
1148    number_arena: &mut String,
1149    owned_arena: &mut Vec<QueryValue>,
1150    digits: &mut Vec<u8>,
1151    lob_decode_mode: LobDecodeMode,
1152) -> Result<ColumnSlot<'buf>> {
1153    // Park an owned QueryValue in the arena and return the deferred slot. Used
1154    // for the cold / non-borrowable variants so the hot grid stays borrowed.
1155    fn park(owned_arena: &mut Vec<QueryValue>, value: Option<QueryValue>) -> ColumnSlot<'static> {
1156        match value {
1157            None => ColumnSlot::Null,
1158            Some(value) => {
1159                owned_arena.push(value);
1160                ColumnSlot::Owned(owned_arena.len() - 1)
1161            }
1162        }
1163    }
1164
1165    if metadata.buffer_size == 0
1166        && !matches!(
1167            metadata.ora_type_num,
1168            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW | ORA_TYPE_NUM_UROWID
1169        )
1170    {
1171        return Ok(ColumnSlot::Null);
1172    }
1173    match metadata.ora_type_num {
1174        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
1175            match reader.read_bytes_borrowed()? {
1176                BorrowedBytes::Null => Ok(ColumnSlot::Null),
1177                // Borrow the wire bytes directly when they are valid UTF-8 and
1178                // not the UTF-16 NCHAR form (which needs re-encoding). Zero copy.
1179                BorrowedBytes::Slice(slice) if metadata.csfrm != CS_FORM_NCHAR => {
1180                    match validate_utf8(slice) {
1181                        Ok(text) => Ok(ColumnSlot::Wire(QueryValueRef::Text(text))),
1182                        Err(_) => Ok(park(
1183                            owned_arena,
1184                            Some(QueryValue::TextRaw {
1185                                bytes: slice.to_vec(),
1186                                csfrm: metadata.csfrm,
1187                            }),
1188                        )),
1189                    }
1190                }
1191                // NCHAR (UTF-16) or chunked long text: fall back to the owned
1192                // decode, which re-encodes to UTF-8 / reassembles chunks.
1193                other => {
1194                    let bytes = other.into_vec();
1195                    let value = match decode_text_value(&bytes, metadata.csfrm) {
1196                        Ok(text) => QueryValue::Text(text),
1197                        Err(ProtocolError::TtcDecode(_)) => QueryValue::TextRaw {
1198                            bytes,
1199                            csfrm: metadata.csfrm,
1200                        },
1201                        Err(err) => return Err(err),
1202                    };
1203                    Ok(park(owned_arena, Some(value)))
1204                }
1205            }
1206        }
1207        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => match reader.read_bytes_borrowed()? {
1208            BorrowedBytes::Null => Ok(ColumnSlot::Null),
1209            BorrowedBytes::Slice(slice) => Ok(ColumnSlot::Wire(QueryValueRef::Raw(slice))),
1210            BorrowedBytes::Chunked(bytes) => Ok(park(owned_arena, Some(QueryValue::Raw(bytes)))),
1211        },
1212        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
1213            // The wire NUMBER is binary; its canonical decimal text is
1214            // synthesized, so it cannot be borrowed from the buffer. We
1215            // synthesize it *directly* into the per-row number arena (reusing the
1216            // `digits` scratch), borrowing from the arena — zero per-cell heap
1217            // allocation for the common in-range NUMBER.
1218            with_small_bytes(reader, |bytes| match bytes {
1219                None => Ok(ColumnSlot::Null),
1220                Some(bytes) => {
1221                    let start = number_arena.len();
1222                    let is_integer = decode_number_text_into(bytes, digits, number_arena)?;
1223                    Ok(ColumnSlot::Number {
1224                        range: start..number_arena.len(),
1225                        is_integer,
1226                    })
1227                }
1228            })
1229        }
1230        ORA_TYPE_NUM_BOOLEAN => with_small_bytes(reader, |bytes| match bytes {
1231            None => Ok(ColumnSlot::Null),
1232            Some(bytes) => Ok(ColumnSlot::Wire(QueryValueRef::Boolean(matches!(
1233                bytes.last(),
1234                Some(&1)
1235            )))),
1236        }),
1237        ORA_TYPE_NUM_INTERVAL_DS => with_small_bytes(reader, |bytes| match bytes {
1238            None => Ok(ColumnSlot::Null),
1239            Some(bytes) => match decode_interval_ds(bytes)? {
1240                QueryValue::IntervalDS {
1241                    days,
1242                    hours,
1243                    minutes,
1244                    seconds,
1245                    fseconds,
1246                } => Ok(ColumnSlot::Wire(QueryValueRef::IntervalDS {
1247                    days,
1248                    hours,
1249                    minutes,
1250                    seconds,
1251                    fseconds,
1252                })),
1253                other => Ok(park(owned_arena, Some(other))),
1254            },
1255        }),
1256        ORA_TYPE_NUM_INTERVAL_YM => with_small_bytes(reader, |bytes| match bytes {
1257            None => Ok(ColumnSlot::Null),
1258            Some(bytes) => match decode_interval_ym(bytes)? {
1259                QueryValue::IntervalYM { years, months } => {
1260                    Ok(ColumnSlot::Wire(QueryValueRef::IntervalYM {
1261                        years,
1262                        months,
1263                    }))
1264                }
1265                other => Ok(park(owned_arena, Some(other))),
1266            },
1267        }),
1268        ORA_TYPE_NUM_DATE
1269        | ORA_TYPE_NUM_TIMESTAMP
1270        | ORA_TYPE_NUM_TIMESTAMP_LTZ
1271        | ORA_TYPE_NUM_TIMESTAMP_TZ => with_small_bytes(reader, |bytes| match bytes {
1272            None => Ok(ColumnSlot::Null),
1273            Some(bytes) => match decode_datetime_value(bytes)? {
1274                QueryValue::DateTime {
1275                    year,
1276                    month,
1277                    day,
1278                    hour,
1279                    minute,
1280                    second,
1281                    nanosecond,
1282                } => Ok(ColumnSlot::Wire(QueryValueRef::DateTime {
1283                    year,
1284                    month,
1285                    day,
1286                    hour,
1287                    minute,
1288                    second,
1289                    nanosecond,
1290                })),
1291                other => Ok(park(owned_arena, Some(other))),
1292            },
1293        }),
1294        // Everything else (Rowid, BinaryDouble/Float, Clob/Blob/Bfile, Vector,
1295        // Json, Cursor, Object, UROWID) goes through the owned decode and is
1296        // parked in the owned arena. These are the cold / non-borrowable cases.
1297        _ => {
1298            let value = parse_column_value_with_lob_mode(reader, metadata, lob_decode_mode)?;
1299            Ok(park(owned_arena, value))
1300        }
1301    }
1302}
1303
1304/// Read one TTC byte field and hand the body to `f` as a borrowed `&[u8]`
1305/// without allocating in the common contiguous case. `None` is SQL NULL. The
1306/// rare chunked long form is reassembled into a temporary `Vec` (these small
1307/// fixed-size scalar types — number/boolean/interval/datetime — are never sent
1308/// chunked in practice, so this fallback is effectively dead weight).
1309fn with_small_bytes<'buf, T>(
1310    reader: &mut TtcReader<'buf>,
1311    f: impl FnOnce(Option<&[u8]>) -> Result<T>,
1312) -> Result<T> {
1313    match reader.read_bytes_borrowed()? {
1314        BorrowedBytes::Null => f(None),
1315        BorrowedBytes::Slice(slice) => f(Some(slice)),
1316        BorrowedBytes::Chunked(owned) => f(Some(&owned)),
1317    }
1318}
1319
1320impl BorrowedBytes<'_> {
1321    /// Reassemble into an owned `Vec` (zero-copy `Slice` still copies; `Chunked`
1322    /// reuses its already-owned `Vec`). Used by the non-borrowable text fallback.
1323    fn into_vec(self) -> Vec<u8> {
1324        match self {
1325            BorrowedBytes::Null => Vec::new(),
1326            BorrowedBytes::Slice(slice) => slice.to_vec(),
1327            BorrowedBytes::Chunked(owned) => owned,
1328        }
1329    }
1330}
1331
/// A decoded fetch batch that **owns** the wire response buffer and column
/// metadata, and yields rows of borrowed [`QueryValueRef`] that point straight
/// into that buffer. This is the zero-copy fetch fast path: the common scalar
/// grid is decoded with no per-cell allocation.
///
/// ## Soundness
///
/// The buffer is owned by the batch and outlives every borrowed row: rows are
/// only ever surfaced *inside* the [`for_each_row_ref`](Self::for_each_row_ref)
/// callback, whose `&[QueryValueRef]` argument cannot escape (its lifetime is
/// bound to the call). The borrow checker therefore guarantees no
/// `QueryValueRef` can dangle — there is no self-referential struct and no
/// `unsafe`. `Number` text and the cold values borrow per-row arenas that are
/// fully built (pass 1) before any reference into them is taken (pass 2), so an
/// arena is never grown while borrowed.
#[derive(Clone, Debug)]
pub struct BorrowedRowBatch {
    /// The owned wire bytes that every borrowed cell points into.
    buffer: Vec<u8>,
    /// Metadata for each column, in wire order; one cell is decoded per entry.
    columns: Vec<ColumnMetadata>,
    /// Byte offset into `buffer` where each row's column values begin. One
    /// entry per row; the row decoder starts reading at this offset.
    row_starts: Vec<usize>,
    /// Per-row duplicate-column bit vector (server row-compression). `None` (or
    /// an absent entry) means every column is present on the wire for that row.
    /// A zero bit marks a duplicate column whose value repeats the previous
    /// row's value and carries no wire bytes (reference `bit_vector`).
    row_bit_vectors: Vec<Option<Vec<u8>>>,
    /// Whether this batch carried `LONG`/`LONG RAW` status trailers after each
    /// such column (the fetch path sets this; the plain execute path does not).
    fetch_long_status: bool,
    /// LOB decode mode handed to the per-column decoder for every cell.
    /// [`BorrowedRowBatch::new`] sets it to `LobDecodeMode::PlainLocator`.
    lob_decode_mode: LobDecodeMode,
    /// The caller's previous (owned) row, used to resolve duplicate columns in
    /// the *first* compressed row of the batch (whose duplicates repeat the row
    /// that ended the prior page). `None` outside the compressed-fetch case.
    previous_row_seed: Option<Vec<Option<QueryValue>>>,
}
1367
1368impl BorrowedRowBatch {
1369    /// Construct a batch from an owned wire `buffer`, the `columns` describing
1370    /// each cell, and the per-row start offsets into `buffer`. Use this for
1371    /// batches with no duplicate-column compression and no LONG trailers (the
1372    /// common synthetic / test case); the framing-aware
1373    /// [`parse_query_response_borrowed`] builds the full form.
1374    pub fn new(buffer: Vec<u8>, columns: Vec<ColumnMetadata>, row_starts: Vec<usize>) -> Self {
1375        Self {
1376            buffer,
1377            columns,
1378            row_starts,
1379            row_bit_vectors: Vec::new(),
1380            fetch_long_status: false,
1381            lob_decode_mode: LobDecodeMode::PlainLocator,
1382            previous_row_seed: None,
1383        }
1384    }
1385
1386    /// Number of rows in the batch.
1387    pub fn row_count(&self) -> usize {
1388        self.row_starts.len()
1389    }
1390
1391    /// The columns describing each cell.
1392    pub fn columns(&self) -> &[ColumnMetadata] {
1393        &self.columns
1394    }
1395
1396    /// The address range of the owned buffer, for tests asserting that borrowed
1397    /// scalar cells truly point into it (zero-copy).
1398    #[cfg(test)]
1399    pub fn buffer_ptr_range(&self) -> core::ops::Range<usize> {
1400        let start = self.buffer.as_ptr() as usize;
1401        start..start + self.buffer.len()
1402    }
1403
1404    /// Decode each row and invoke `callback` with the row's borrowed cells. The
1405    /// `&[Option<QueryValueRef>]` slice borrows the batch buffer and per-row
1406    /// arenas; it is valid only for the duration of the call (it cannot escape).
1407    /// `None` cells are SQL NULL. Returns the first decode/callback error.
1408    ///
1409    /// Generic over the callback's error type `E` (any error a decode failure
1410    /// can convert into, e.g. the driver crate's own error) so callers are not
1411    /// forced through [`ProtocolError`]; a decode failure is surfaced via
1412    /// `E: From<ProtocolError>`.
1413    pub fn for_each_row_ref<F, E>(&self, mut callback: F) -> std::result::Result<(), E>
1414    where
1415        F: FnMut(&[Option<QueryValueRef<'_>>]) -> std::result::Result<(), E>,
1416        E: From<ProtocolError>,
1417    {
1418        // The two arenas are reused across rows (cleared, not reallocated). They
1419        // are mutated only in pass 1; pass 2 borrows them immutably and that
1420        // borrow is confined to a single loop iteration (it ends before the next
1421        // iteration's `clear()`), which is what keeps the borrow checker — and
1422        // soundness — happy.
1423        let mut number_arena = String::new();
1424        let mut owned_arena: Vec<QueryValue> = Vec::new();
1425        // Reusable scratch: `digits` for the NUMBER decode, `slots` for pass 1,
1426        // `row` for the borrowed cells handed to the callback. All cleared and
1427        // reused across rows so the steady-state per-row decode allocates only
1428        // when an arena/scratch genuinely grows (amortized). `slots`/`row`/
1429        // `digits` borrow nothing across iterations beyond the stable buffer.
1430        let mut digits: Vec<u8> = Vec::new();
1431        let mut slots: Vec<Option<ColumnSlot<'_>>> = Vec::with_capacity(self.columns.len());
1432        // Owned snapshot of the previous row, used only to resolve duplicate
1433        // (bit-vector-compressed) columns, which carry no wire bytes. Empty when
1434        // the batch has no bit vectors (the common case), so it costs nothing.
1435        // Seeded from the caller's prior-page row for the first compressed row.
1436        let mut previous_owned: Vec<Option<QueryValue>> =
1437            self.previous_row_seed.clone().unwrap_or_default();
1438        let uses_bit_vectors = !self.row_bit_vectors.is_empty();
1439
1440        for (row_index, &start) in self.row_starts.iter().enumerate() {
1441            number_arena.clear();
1442            owned_arena.clear();
1443            slots.clear();
1444            let bit_vector = self
1445                .row_bit_vectors
1446                .get(row_index)
1447                .and_then(|bv| bv.as_deref());
1448
1449            // Pass 1: decode all columns, growing the per-row arenas. `slots`
1450            // borrows only the buffer (the deferred Number/Owned slots hold a
1451            // range/index into the arenas, never a borrow of them). Duplicate
1452            // columns carry no wire bytes — their owned previous value is parked
1453            // in the owned arena.
1454            let mut reader = TtcReader::new(&self.buffer[start..]);
1455            for (index, metadata) in self.columns.iter().enumerate() {
1456                if is_duplicate_column(bit_vector, index) {
1457                    let previous = previous_owned.get(index).and_then(Option::as_ref);
1458                    match previous {
1459                        None => slots.push(None),
1460                        Some(value) => {
1461                            owned_arena.push(value.clone());
1462                            slots.push(Some(ColumnSlot::Owned(owned_arena.len() - 1)));
1463                        }
1464                    }
1465                    continue;
1466                }
1467                let slot = parse_column_slot(
1468                    &mut reader,
1469                    metadata,
1470                    &mut number_arena,
1471                    &mut owned_arena,
1472                    &mut digits,
1473                    self.lob_decode_mode,
1474                )?;
1475                slots.push(match slot {
1476                    ColumnSlot::Null => None,
1477                    other => Some(other),
1478                });
1479                if self.fetch_long_status
1480                    && matches!(
1481                        metadata.ora_type_num,
1482                        ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1483                    )
1484                {
1485                    let _null_indicator = reader.read_sb4()?;
1486                    let _return_code = reader.read_ub4()?;
1487                }
1488            }
1489
1490            // Pass 2: arenas are now frozen — resolve deferred slots into
1491            // borrowed refs. No arena is mutated here, so the borrows are sound.
1492            // `row` is allocated per row: it carries the per-row arena lifetime,
1493            // which (unlike `slots`/`digits`, that borrow only the stable buffer)
1494            // cannot be reused across an arena `clear()`. This is the single
1495            // remaining per-row allocation, versus the owned path's per-row Vec
1496            // *plus* a String per scalar cell.
1497            let row: Vec<Option<QueryValueRef<'_>>> = slots
1498                .iter()
1499                .map(|slot| {
1500                    slot.as_ref().map(|slot| match *slot {
1501                        ColumnSlot::Null => unreachable!("Null slots are stored as None"),
1502                        ColumnSlot::Wire(value) => value,
1503                        ColumnSlot::Number {
1504                            ref range,
1505                            is_integer,
1506                        } => QueryValueRef::Number {
1507                            text: &number_arena[range.clone()],
1508                            is_integer,
1509                        },
1510                        ColumnSlot::Owned(index) => QueryValueRef::Owned(&owned_arena[index]),
1511                    })
1512                })
1513                .collect();
1514
1515            callback(&row)?;
1516
1517            // Snapshot the just-emitted row as owned values for the next row's
1518            // duplicate-column resolution — but only when the batch actually uses
1519            // bit-vector compression, so the zero-copy common path pays nothing.
1520            if uses_bit_vectors {
1521                previous_owned.clear();
1522                previous_owned.extend(row.iter().map(|cell| cell.map(|v| v.to_owned_value())));
1523            }
1524        }
1525        Ok(())
1526    }
1527}
1528
/// The borrowed counterpart of a fetched [`QueryResult`]: a [`BorrowedRowBatch`]
/// of zero-copy rows plus the response-level fields a caller needs to page and
/// finalize the cursor. Produced by [`parse_query_response_borrowed`].
///
/// The type has no lifetime parameter: the row data lives in the buffer owned
/// by `batch`, so the result can be stored or moved freely.
#[derive(Clone, Debug)]
pub struct BorrowedFetchResult {
    /// The decoded rows, borrowing the response buffer.
    pub batch: BorrowedRowBatch,
    /// Whether the server reports more rows for this cursor.
    pub more_rows: bool,
    /// Server cursor id (for paging / release).
    pub cursor_id: u32,
    /// Total affected/processed row count from the end-of-call error message.
    pub row_count: u64,
}
1543
1544/// Walk a fetch/query response payload and produce a [`BorrowedFetchResult`]
1545/// whose rows borrow `payload` (the caller must keep the owned buffer alive —
1546/// [`BorrowedRowBatch`] owns it). This is the zero-copy companion to
1547/// [`parse_fetch_response_with_context`]: it walks the exact same message
1548/// framing (DESCRIBE_INFO / ROW_HEADER / BIT_VECTOR / ROW_DATA / ERROR /
1549/// END_OF_RESPONSE) but, instead of materializing owned rows, records each
1550/// row's byte offset and bit vector so [`BorrowedRowBatch::for_each_row_ref`]
1551/// can decode them lazily and without per-cell allocation.
1552///
1553/// Scope: the plain query-row case (the fetch path). Out-bind / DML-returning
1554/// rows are not part of a fetch response and are left to the owned path.
1555pub fn parse_query_response_borrowed(
1556    payload: &[u8],
1557    capabilities: ClientCapabilities,
1558    columns: &[ColumnMetadata],
1559    previous_row: Option<&[Option<QueryValue>]>,
1560) -> Result<BorrowedFetchResult> {
1561    parse_query_response_borrowed_with_limits(
1562        payload,
1563        capabilities,
1564        columns,
1565        previous_row,
1566        ProtocolLimits::DEFAULT,
1567    )
1568}
1569
1570pub fn parse_query_response_borrowed_with_limits(
1571    payload: &[u8],
1572    capabilities: ClientCapabilities,
1573    columns: &[ColumnMetadata],
1574    previous_row: Option<&[Option<QueryValue>]>,
1575    limits: ProtocolLimits,
1576) -> Result<BorrowedFetchResult> {
1577    parse_query_response_borrowed_with_lob_mode_and_limits(
1578        payload,
1579        capabilities,
1580        columns,
1581        previous_row,
1582        LobDecodeMode::PlainLocator,
1583        limits,
1584    )
1585}
1586
1587pub fn parse_define_fetch_response_borrowed_with_limits(
1588    payload: &[u8],
1589    capabilities: ClientCapabilities,
1590    columns: &[ColumnMetadata],
1591    previous_row: Option<&[Option<QueryValue>]>,
1592    limits: ProtocolLimits,
1593) -> Result<BorrowedFetchResult> {
1594    parse_query_response_borrowed_with_lob_mode_and_limits(
1595        payload,
1596        capabilities,
1597        columns,
1598        previous_row,
1599        LobDecodeMode::DefineMetadata,
1600        limits,
1601    )
1602}
1603
1604fn parse_query_response_borrowed_with_lob_mode_and_limits(
1605    payload: &[u8],
1606    capabilities: ClientCapabilities,
1607    columns: &[ColumnMetadata],
1608    previous_row: Option<&[Option<QueryValue>]>,
1609    lob_decode_mode: LobDecodeMode,
1610    limits: ProtocolLimits,
1611) -> Result<BorrowedFetchResult> {
1612    let mut reader = TtcReader::with_limits(payload, limits)?;
1613    reader.limits().check_columns(columns.len())?;
1614    let mut result_columns = columns.to_vec();
1615    let mut more_rows = true;
1616    let mut cursor_id = 0u32;
1617    let mut row_count = 0u64;
1618    let mut row_starts: Vec<usize> = Vec::new();
1619    let mut row_bit_vectors: Vec<Option<Vec<u8>>> = Vec::new();
1620    let mut any_bit_vector = false;
1621    let mut pending_bit_vector: Option<Vec<u8>> = None;
1622    // The fetch path always consumes LONG/LONG RAW status trailers.
1623    let fetch_long_status = true;
1624
1625    while reader.remaining() > 0 {
1626        let message_type = reader.read_u8()?;
1627        match message_type {
1628            0 => {}
1629            TNS_MSG_TYPE_DESCRIBE_INFO => {
1630                let _describe_name = reader.read_bytes()?;
1631                let previous = std::mem::take(&mut result_columns);
1632                let mut described = QueryResult::default();
1633                parse_describe_info(&mut reader, capabilities, &mut described)?;
1634                result_columns = described.columns;
1635                for (index, column) in result_columns.iter_mut().enumerate() {
1636                    if let Some(prev) = previous.get(index) {
1637                        adjust_refetch_metadata(prev, column);
1638                    }
1639                }
1640            }
1641            TNS_MSG_TYPE_ROW_HEADER => {
1642                pending_bit_vector = parse_row_header(&mut reader)?;
1643            }
1644            TNS_MSG_TYPE_BIT_VECTOR => {
1645                pending_bit_vector = Some(parse_bit_vector(&mut reader, result_columns.len())?);
1646            }
1647            TNS_MSG_TYPE_ROW_DATA => {
1648                // Record where this row's column values begin, then advance the
1649                // reader past the row (skipping, not materializing).
1650                reader.limits().check_batch_rows(row_starts.len() + 1)?;
1651                row_starts.push(reader.position());
1652                let bit_vector = pending_bit_vector.take();
1653                any_bit_vector |= bit_vector.is_some();
1654                row_bit_vectors.push(bit_vector.clone());
1655                skip_row_data(
1656                    &mut reader,
1657                    &result_columns,
1658                    bit_vector.as_deref(),
1659                    fetch_long_status,
1660                    lob_decode_mode,
1661                )?;
1662            }
1663            TNS_MSG_TYPE_PARAMETER => {
1664                let _params = parse_query_return_parameters(&mut reader, false)?;
1665            }
1666            TNS_MSG_TYPE_STATUS => {
1667                let _call_status = reader.read_ub4()?;
1668                let _seq = reader.read_ub2()?;
1669            }
1670            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
1671                let _ = skip_server_side_piggyback(&mut reader)?;
1672            }
1673            TNS_MSG_TYPE_FLUSH_OUT_BINDS | TNS_MSG_TYPE_END_OF_RESPONSE => break,
1674            TNS_MSG_TYPE_TOKEN => {
1675                let _token = reader.read_ub8()?;
1676            }
1677            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
1678                // Mirror the owned parser's framing walk so the reader advances
1679                // past the implicit-resultset block identically (the borrowed
1680                // fetch API does not surface child cursors, but it must still
1681                // consume the bytes). reference messages/base.pyx
1682                // `_process_implicit_result`.
1683                let num_results = reader.read_ub4()?;
1684                reader
1685                    .limits()
1686                    .check_length_prefixed_elements(num_results as usize)?;
1687                for _ in 0..num_results {
1688                    let num_bytes = reader.read_u8()?;
1689                    reader.skip(usize::from(num_bytes))?;
1690                    let mut child = QueryResult::default();
1691                    parse_describe_info(&mut reader, capabilities, &mut child)?;
1692                    let _child_cursor_id = reader.read_ub2()?;
1693                }
1694            }
1695            TNS_MSG_TYPE_ERROR => {
1696                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
1697                if info.cursor_id != 0 {
1698                    cursor_id = u32::from(info.cursor_id);
1699                }
1700                row_count = info.row_count;
1701                if info.number == TNS_ERR_NO_DATA_FOUND && !result_columns.is_empty() {
1702                    more_rows = false;
1703                } else if info.number != 0 && info.number != TNS_ERR_ARRAY_DML_ERRORS {
1704                    return Err(ProtocolError::ServerErrorInfo(Box::new(
1705                        info.into_details(),
1706                    )));
1707                }
1708            }
1709            _ => {
1710                let position = reader.position().saturating_sub(1);
1711                if let Some(message) =
1712                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
1713                {
1714                    return Err(ProtocolError::ServerError(message));
1715                }
1716                return Err(ProtocolError::UnknownMessageType {
1717                    message_type,
1718                    position,
1719                });
1720            }
1721        }
1722    }
1723
1724    // If the batch never used duplicate-column compression, drop the per-row
1725    // bit-vector vector so iteration takes the zero-copy fast path (no owned
1726    // previous-row snapshotting).
1727    if !any_bit_vector {
1728        row_bit_vectors.clear();
1729    }
1730
1731    let batch = BorrowedRowBatch {
1732        buffer: payload.to_vec(),
1733        columns: result_columns,
1734        row_starts,
1735        row_bit_vectors,
1736        fetch_long_status,
1737        lob_decode_mode,
1738        // Seed the first compressed row's duplicate resolution from the caller's
1739        // prior-page row (only consulted when the batch uses bit vectors).
1740        previous_row_seed: any_bit_vector.then(|| {
1741            previous_row
1742                .map(<[Option<QueryValue>]>::to_vec)
1743                .unwrap_or_default()
1744        }),
1745    };
1746
1747    Ok(BorrowedFetchResult {
1748        batch,
1749        more_rows,
1750        cursor_id,
1751        row_count,
1752    })
1753}
1754
1755/// Advance `reader` past one ROW_DATA row **without materializing owned values**
1756/// — this is the offset-capture pass, so it must allocate nothing for the hot
1757/// scalar grid. Mirrors [`parse_row_data`]'s consumption exactly: duplicate
1758/// (bit-vector) columns carry no wire bytes and are skipped; the hot byte-field
1759/// scalar types are skipped with a zero-allocation length-prefixed skip; the
1760/// rare cold types (LOB / Vector / JSON / Cursor / Object / ROWID), whose wire
1761/// framing is non-trivial, fall back to [`parse_column_value`] (which may
1762/// allocate, but those are uncommon). `LONG`/`LONG RAW` status trailers are
1763/// consumed when `fetch_long_status`.
1764fn skip_row_data(
1765    reader: &mut TtcReader<'_>,
1766    columns: &[ColumnMetadata],
1767    bit_vector: Option<&[u8]>,
1768    fetch_long_status: bool,
1769    lob_decode_mode: LobDecodeMode,
1770) -> Result<()> {
1771    for (index, metadata) in columns.iter().enumerate() {
1772        if is_duplicate_column(bit_vector, index) {
1773            continue;
1774        }
1775        let consumed_byte_field = metadata.buffer_size != 0
1776            && matches!(
1777                metadata.ora_type_num,
1778                ORA_TYPE_NUM_VARCHAR
1779                    | ORA_TYPE_NUM_CHAR
1780                    | ORA_TYPE_NUM_LONG
1781                    | ORA_TYPE_NUM_RAW
1782                    | ORA_TYPE_NUM_LONG_RAW
1783                    | ORA_TYPE_NUM_NUMBER
1784                    | ORA_TYPE_NUM_BINARY_INTEGER
1785                    | ORA_TYPE_NUM_BINARY_DOUBLE
1786                    | ORA_TYPE_NUM_BINARY_FLOAT
1787                    | ORA_TYPE_NUM_BOOLEAN
1788                    | ORA_TYPE_NUM_INTERVAL_DS
1789                    | ORA_TYPE_NUM_INTERVAL_YM
1790                    | ORA_TYPE_NUM_DATE
1791                    | ORA_TYPE_NUM_TIMESTAMP
1792                    | ORA_TYPE_NUM_TIMESTAMP_LTZ
1793                    | ORA_TYPE_NUM_TIMESTAMP_TZ
1794            );
1795        if consumed_byte_field {
1796            reader.skip_bytes_field()?;
1797        } else {
1798            // Cold / non-byte-field type, or a zero-buffer-size column: defer to
1799            // the full owned decode purely to advance the reader correctly.
1800            let _ = parse_column_value_with_lob_mode(reader, metadata, lob_decode_mode)?;
1801        }
1802        if fetch_long_status
1803            && matches!(
1804                metadata.ora_type_num,
1805                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1806            )
1807        {
1808            let _null_indicator = reader.read_sb4()?;
1809            let _return_code = reader.read_ub4()?;
1810        }
1811    }
1812    Ok(())
1813}
1814
1815pub(crate) fn encode_rowid_component(mut value: u32, size: usize, output: &mut String) {
1816    let mut encoded = vec![b'A'; size];
1817    for index in 0..size {
1818        let alphabet_index = usize::try_from(value & 0x3f).unwrap_or(0);
1819        encoded[size - index - 1] = TNS_BASE64_ALPHABET[alphabet_index];
1820        value >>= 6;
1821    }
1822    output.extend(encoded.into_iter().map(char::from));
1823}
1824
1825pub(crate) fn encode_physical_rowid(
1826    rba: u32,
1827    partition_id: u16,
1828    block_num: u32,
1829    slot_num: u16,
1830) -> String {
1831    let mut output = String::with_capacity(ORA_TYPE_SIZE_ROWID as usize);
1832    encode_rowid_component(rba, 6, &mut output);
1833    encode_rowid_component(u32::from(partition_id), 3, &mut output);
1834    encode_rowid_component(block_num, 6, &mut output);
1835    encode_rowid_component(u32::from(slot_num), 3, &mut output);
1836    output
1837}
1838
1839pub(crate) fn parse_rowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1840    let len = reader.read_u8()?;
1841    if len == 0 || len == crate::wire::TNS_NULL_LENGTH_INDICATOR {
1842        return Ok(None);
1843    }
1844    let rba = reader.read_ub4()?;
1845    let partition_id = reader.read_ub2()?;
1846    reader.skip(1)?;
1847    let block_num = reader.read_ub4()?;
1848    let slot_num = reader.read_ub2()?;
1849    Ok(Some(encode_physical_rowid(
1850        rba,
1851        partition_id,
1852        block_num,
1853        slot_num,
1854    )))
1855}
1856
1857pub(crate) fn encode_logical_urowid(bytes: &[u8]) -> String {
1858    let mut input_offset = 1;
1859    let mut input_len = bytes.len().saturating_sub(1);
1860    let mut output = String::with_capacity((bytes.len() / 3) * 4 + 4);
1861    output.push('*');
1862    while input_len > 0 {
1863        let mut pos = bytes[input_offset] >> 2;
1864        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1865
1866        pos = (bytes[input_offset] & 0x03) << 4;
1867        if input_len == 1 {
1868            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1869            break;
1870        }
1871        input_offset += 1;
1872        pos |= (bytes[input_offset] & 0xf0) >> 4;
1873        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1874
1875        pos = (bytes[input_offset] & 0x0f) << 2;
1876        if input_len == 2 {
1877            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1878            break;
1879        }
1880        input_offset += 1;
1881        pos |= (bytes[input_offset] & 0xc0) >> 6;
1882        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1883
1884        pos = bytes[input_offset] & 0x3f;
1885        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1886        input_offset += 1;
1887        input_len -= 3;
1888    }
1889    output
1890}
1891
1892pub(crate) fn parse_urowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1893    if reader.read_bytes()?.is_none() {
1894        return Ok(None);
1895    }
1896    let Some(bytes) = reader.read_bytes()? else {
1897        return Ok(None);
1898    };
1899    if bytes.len() < 13 {
1900        return Err(ProtocolError::TtcDecode("encoded UROWID too short"));
1901    }
1902    if bytes[0] == 1 {
1903        let rba = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
1904        let partition_id = u16::from_be_bytes([bytes[5], bytes[6]]);
1905        let block_num = u32::from_be_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]);
1906        let slot_num = u16::from_be_bytes([bytes[11], bytes[12]]);
1907        Ok(Some(encode_physical_rowid(
1908            rba,
1909            partition_id,
1910            block_num,
1911            slot_num,
1912        )))
1913    } else {
1914        Ok(Some(encode_logical_urowid(&bytes)))
1915    }
1916}
1917
1918pub(crate) fn parse_lob_value(
1919    reader: &mut TtcReader<'_>,
1920    metadata: &ColumnMetadata,
1921    lob_decode_mode: LobDecodeMode,
1922) -> Result<Option<QueryValue>> {
1923    let num_bytes = reader.read_ub4()?;
1924    reader.limits().check_response_bytes(num_bytes as usize)?;
1925    if num_bytes == 0 {
1926        return Ok(None);
1927    }
1928    let (size, chunk_size) = if matches!(
1929        (lob_decode_mode, metadata.ora_type_num),
1930        (_, ORA_TYPE_NUM_BFILE) | (LobDecodeMode::PlainLocator, _)
1931    ) {
1932        (0, 0)
1933    } else {
1934        (reader.read_ub8()?, reader.read_ub4()?)
1935    };
1936    let Some(locator) = reader.read_bytes()? else {
1937        return Ok(None);
1938    };
1939    Ok(Some(QueryValue::Lob(Box::new(LobValue {
1940        ora_type_num: metadata.ora_type_num,
1941        csfrm: metadata.csfrm,
1942        locator,
1943        size,
1944        chunk_size,
1945    }))))
1946}
1947
/// Tests pinning the wire shape of LOB cells in plain-fetch versus
/// define-fetch responses, and owned/borrowed decode parity.
#[cfg(test)]
mod lob_fetch_shape_tests {
    use super::*;

    /// Metadata for a nullable 4000-byte LOB column; everything not passed in
    /// is left at its zero / `None` / `false` value.
    fn lob_column(name: &str, ora_type_num: u8, csfrm: u8) -> ColumnMetadata {
        ColumnMetadata {
            name: name.into(),
            ora_type_num,
            csfrm,
            precision: 0,
            scale: 0,
            buffer_size: 4000,
            max_size: 4000,
            nulls_allowed: true,
            is_json: false,
            is_oson: false,
            object_schema: None,
            object_type_name: None,
            is_array: false,
            vector_dimensions: None,
            vector_format: 0,
            vector_flags: 0,
            domain_schema: None,
            domain_name: None,
            annotations: None,
        }
    }

    fn clob_column() -> ColumnMetadata {
        lob_column("BODY", ORA_TYPE_NUM_CLOB, CS_FORM_IMPLICIT)
    }

    fn blob_column() -> ColumnMetadata {
        lob_column("IMAGE", ORA_TYPE_NUM_BLOB, 0)
    }

    /// Write one LOB cell: ub4 locator length, optional (size, chunk size)
    /// pair, then the length-prefixed locator bytes.
    fn write_lob_cell(writer: &mut TtcWriter, locator: &[u8], metadata: Option<(u64, u32)>) {
        writer.write_ub4(u32::try_from(locator.len()).expect("locator length fits ub4"));
        if let Some((size, chunk_size)) = metadata {
            writer.write_ub8(size);
            writer.write_ub4(chunk_size);
        }
        writer
            .write_bytes_with_length(locator)
            .expect("synthetic locator length is encodable");
    }

    /// A complete single-row, single-LOB-column response payload.
    fn lob_row_payload(locator: &[u8], metadata: Option<(u64, u32)>) -> Vec<u8> {
        let mut writer = TtcWriter::new();
        writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
        write_lob_cell(&mut writer, locator, metadata);
        writer.write_u8(TNS_MSG_TYPE_END_OF_RESPONSE);
        writer.into_bytes()
    }

    fn write_define_lob_cell(writer: &mut TtcWriter, locator: &[u8], size: u64, chunk_size: u32) {
        write_lob_cell(writer, locator, Some((size, chunk_size)));
    }

    /// The LOB in the first cell of the first row; panics on any other value.
    fn first_lob(result: &QueryResult) -> &LobValue {
        let cell = &result.rows[0][0];
        match cell {
            Some(QueryValue::Lob(lob)) => lob.as_ref(),
            other => panic!("expected LOB value, got {other:?}"),
        }
    }

    #[test]
    fn plain_fetch_lob_locator_omits_size_and_chunk_fields() {
        let locator: Vec<u8> = (0u8..114).collect();
        let columns = [clob_column()];
        let payload = lob_row_payload(&locator, None);

        let result = parse_fetch_response_with_context(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
        )
        .expect("plain fetch CLOB locator should decode");

        let lob = first_lob(&result);
        assert_eq!(lob.locator, locator);
        assert_eq!(lob.size, 0);
        assert_eq!(lob.chunk_size, 0);
    }

    #[test]
    fn define_fetch_lob_locator_includes_size_and_chunk_fields() {
        let locator: Vec<u8> = (0u8..114).collect();
        let columns = [clob_column()];
        let payload = lob_row_payload(&locator, Some((23, 8060)));

        let result = parse_define_fetch_response_with_context_and_limits(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
            ProtocolLimits::DEFAULT,
        )
        .expect("define fetch CLOB locator should decode");

        let lob = first_lob(&result);
        assert_eq!(lob.locator, locator);
        assert_eq!(lob.size, 23);
        assert_eq!(lob.chunk_size, 8060);
    }

    #[test]
    fn borrowed_define_fetch_lob_page_matches_owned_decode() {
        let columns = [clob_column(), blob_column()];
        let clob_locator_a: Vec<u8> = (0u8..114).collect();
        let blob_locator_a: Vec<u8> = (128u8..242).collect();
        let clob_locator_b: Vec<u8> = (32u8..146).collect();
        let blob_locator_b: Vec<u8> = (64u8..178).collect();

        // Two rows, each a CLOB cell followed by a BLOB cell.
        let payload = {
            let mut writer = TtcWriter::new();
            writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
            write_define_lob_cell(&mut writer, &clob_locator_a, 23, 8060);
            write_define_lob_cell(&mut writer, &blob_locator_a, 48, 4096);
            writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
            write_define_lob_cell(&mut writer, &clob_locator_b, 31, 8060);
            write_define_lob_cell(&mut writer, &blob_locator_b, 96, 4096);
            writer.write_u8(TNS_MSG_TYPE_END_OF_RESPONSE);
            writer.into_bytes()
        };

        let owned = parse_define_fetch_response_with_context_and_limits(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
            ProtocolLimits::DEFAULT,
        )
        .expect("owned define fetch CLOB/BLOB page should decode");
        let borrowed = parse_define_fetch_response_borrowed_with_limits(
            &payload,
            ClientCapabilities::default(),
            &columns,
            None,
            ProtocolLimits::DEFAULT,
        )
        .expect("borrowed define fetch CLOB/BLOB page should decode");

        // Materialize the borrowed rows so they can be compared with the
        // owned decode.
        let mut borrowed_rows: Vec<Vec<Option<QueryValue>>> = Vec::new();
        borrowed
            .batch
            .for_each_row_ref(|row| {
                let owned_row = row
                    .iter()
                    .map(|cell| cell.map(|value| value.to_owned_value()))
                    .collect();
                borrowed_rows.push(owned_row);
                Ok::<(), ProtocolError>(())
            })
            .expect("borrowed define fetch CLOB/BLOB page should iterate");

        assert_eq!(owned.rows.len(), 2, "owned decode sees both rows");
        assert_eq!(
            borrowed.batch.row_count(),
            2,
            "borrowed decode sees both rows"
        );
        assert_eq!(
            borrowed_rows, owned.rows,
            "borrowed DefineMetadata LOB decode must match owned decode"
        );
    }
}
2132
2133/// Reads a VECTOR value (reference `ReadBuffer.read_vector` in `packet.pyx`).
2134/// VECTOR is sent as a fully-prefetched LOB: the image data precedes the
2135/// (discarded) LOB locator.
2136pub(crate) fn parse_vector_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
2137    let num_bytes = reader.read_ub4()?;
2138    reader.limits().check_response_bytes(num_bytes as usize)?;
2139    if num_bytes == 0 {
2140        return Ok(None);
2141    }
2142    reader.read_ub8()?; // size (unused)
2143    reader.read_ub4()?; // chunk size (unused)
2144    let Some(data) = reader.read_bytes()? else {
2145        return Ok(None);
2146    };
2147    reader.read_bytes()?; // LOB locator (unused)
2148    if data.is_empty() {
2149        return Ok(None);
2150    }
2151    let vector = crate::vector::decode_vector_with_limits(&data, reader.limits())?;
2152    Ok(Some(QueryValue::Vector(Box::new(vector))))
2153}
2154
2155/// Parses a native JSON (`DB_TYPE_JSON`) column value. Like VECTOR, OSON is sent
2156/// as a fully-prefetched LOB: `num_bytes`, `size`, `chunk_size`, the OSON image,
2157/// then a (discarded) LOB locator (reference packet.pyx `read_oson`).
2158pub(crate) fn parse_json_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
2159    let num_bytes = reader.read_ub4()?;
2160    reader.limits().check_response_bytes(num_bytes as usize)?;
2161    if num_bytes == 0 {
2162        return Ok(None);
2163    }
2164    reader.read_ub8()?; // size (unused)
2165    reader.read_ub4()?; // chunk size (unused)
2166    let Some(data) = reader.read_bytes()? else {
2167        return Ok(None);
2168    };
2169    reader.read_bytes()?; // LOB locator (unused)
2170    if data.is_empty() {
2171        return Ok(None);
2172    }
2173    let value = crate::oson::decode_oson_with_limits(&data, reader.limits())?;
2174    Ok(Some(QueryValue::Json(Box::new(value))))
2175}
2176
2177pub(crate) fn parse_object_value(
2178    reader: &mut TtcReader<'_>,
2179    metadata: &ColumnMetadata,
2180) -> Result<Option<QueryValue>> {
2181    let _toid = reader.read_bytes_with_length()?;
2182    let _oid = reader.read_bytes_with_length()?;
2183    let _snapshot = reader.read_bytes_with_length()?;
2184    let _version = reader.read_ub2()?;
2185    let num_bytes = reader.read_ub4()?;
2186    reader.limits().check_response_bytes(num_bytes as usize)?;
2187    reader.skip(2)?;
2188    if num_bytes == 0 {
2189        return Ok(None);
2190    }
2191    let Some(packed_data) = reader.read_bytes()? else {
2192        return Ok(None);
2193    };
2194    Ok(Some(QueryValue::Object(Box::new(ObjectValue {
2195        schema: metadata.object_schema.clone(),
2196        type_name: metadata.object_type_name.clone(),
2197        packed_data,
2198    }))))
2199}
2200
2201pub(crate) fn parse_cursor_value(reader: &mut TtcReader<'_>) -> Result<QueryValue> {
2202    reader.skip(1)?;
2203    let mut result = QueryResult::default();
2204    parse_describe_info(reader, ClientCapabilities::default(), &mut result)?;
2205    let cursor_id = u32::from(reader.read_ub2()?);
2206    Ok(QueryValue::Cursor(Box::new(CursorValue {
2207        columns: result.columns,
2208        cursor_id,
2209    })))
2210}
2211
/// Values extracted from a return-parameters message by
/// [`parse_query_return_parameters`].
pub(crate) struct QueryReturnParameters {
    /// Row counts read from the message tail; `Some` only when the parser was
    /// asked to read array-DML row counts.
    pub row_counts: Option<Vec<u64>>,
    /// CQN registered-query id extracted from the registration-info block
    /// (reference base.pyx:1300-1309); `None` when no block was present or the
    /// block was shorter than 8 bytes.
    pub query_id: Option<u64>,
}
2218
2219pub(crate) fn parse_query_return_parameters(
2220    reader: &mut TtcReader<'_>,
2221    arraydmlrowcounts: bool,
2222) -> Result<QueryReturnParameters> {
2223    let num_params = reader.read_ub2()?;
2224    for _ in 0..num_params {
2225        let _value = reader.read_ub4()?;
2226    }
2227    let num_bytes = reader.read_ub2()?;
2228    if num_bytes > 0 {
2229        reader.skip(usize::from(num_bytes))?;
2230    }
2231    let num_pairs = reader.read_ub2()?;
2232    skip_keyword_value_pairs(reader, num_pairs)?;
2233    // registration info block: the trailing 8 bytes (msb at -4, lsb at -8) are
2234    // the CQN query id when a registration id was sent (reference base.pyx).
2235    let num_bytes = usize::from(reader.read_ub2()?);
2236    let mut query_id = None;
2237    if num_bytes > 0 {
2238        let block = reader.read_raw(num_bytes)?;
2239        if num_bytes >= 8 {
2240            let msb = u32::from_be_bytes([
2241                block[num_bytes - 4],
2242                block[num_bytes - 3],
2243                block[num_bytes - 2],
2244                block[num_bytes - 1],
2245            ]);
2246            let lsb = u32::from_be_bytes([
2247                block[num_bytes - 8],
2248                block[num_bytes - 7],
2249                block[num_bytes - 6],
2250                block[num_bytes - 5],
2251            ]);
2252            query_id = Some((u64::from(msb) << 32) | u64::from(lsb));
2253        }
2254    }
2255    if arraydmlrowcounts {
2256        // reference messages/base.pyx `_process_return_parameters` tail
2257        let num_rows = reader.read_ub4()?;
2258        reader.limits().check_batch_rows(num_rows as usize)?;
2259        // Each ub8 row count consumes at least one byte, so cap the reservation
2260        // by the remaining payload size (BoundedReader).
2261        let mut row_counts: Vec<u64> =
2262            reader.with_capacity_limited(num_rows as usize, 1, ProtocolLimits::check_batch_rows)?;
2263        for _ in 0..num_rows {
2264            row_counts.push(reader.read_ub8()?);
2265        }
2266        return Ok(QueryReturnParameters {
2267            row_counts: Some(row_counts),
2268            query_id,
2269        });
2270    }
2271    Ok(QueryReturnParameters {
2272        row_counts: None,
2273        query_id,
2274    })
2275}
2276
/// Tests for [`parse_query_return_parameters`].
#[cfg(test)]
mod return_parameter_tests {
    use super::*;

    /// The query id is assembled from the last eight bytes of the
    /// registration-info block: low 32-bit word first, high word second.
    #[test]
    fn registration_info_block_extracts_query_id_from_lsb_msb_tail() {
        const LSB: [u8; 4] = [0x55, 0x66, 0x77, 0x88];
        const MSB: [u8; 4] = [0x11, 0x22, 0x33, 0x44];

        let payload = {
            let mut writer = TtcWriter::new();
            writer.write_ub2(0); // num params
            writer.write_ub2(0); // parameter bytes
            writer.write_ub2(0); // keyword/value pairs
            writer.write_ub2(8); // registration-info block bytes
            writer.write_raw(&LSB);
            writer.write_raw(&MSB);
            writer.into_bytes()
        };
        let mut reader = TtcReader::new(&payload);

        let params = parse_query_return_parameters(&mut reader, false).expect("return parameters");

        assert_eq!(params.query_id, Some(0x1122_3344_5566_7788));
        assert_eq!(params.row_counts, None);
    }
}
2301
#[cfg(test)]
mod borrowed_fetch_tests {
    use super::*;
    use crate::thin::codecs::encode_number_text;

    // Isomorphism proof for the `simd-decode` feature (bead rust-oracledb-63o):
    // `validate_utf8` must make the SAME accept/reject decision as the canonical
    // `core::str::from_utf8`, AND return the same `&str` on accept, for every
    // input — whether or not the SIMD validator is compiled in. This guards the
    // hot text path against any divergence in UTF-8 grammar handling.
    #[test]
    fn validate_utf8_matches_core_accept_reject() {
        let cases: &[&[u8]] = &[
            b"",
            b"a",
            b"hello world",
            "VARCHAR2 cell".as_bytes(),
            "中文 mixed \u{1f600}".as_bytes(), // CJK + emoji (4-byte)
            "naïve café".as_bytes(),
            &[0x80],                   // lone continuation byte -> reject
            &[0xC0, 0x80],             // overlong NUL -> reject
            &[0xED, 0xA0, 0x80],       // UTF-16 surrogate -> reject
            &[0xF4, 0x90, 0x80, 0x80], // > U+10FFFF -> reject
            &[0xFF],                   // invalid lead -> reject
            &[0xE2, 0x82],             // truncated 3-byte -> reject
        ];
        for &bytes in cases {
            // `expected` is the canonical validator's verdict, `actual` is ours.
            // (Deliberately not named `core`, which would shadow the crate path
            // used on the very same line.)
            let expected = core::str::from_utf8(bytes);
            let actual = validate_utf8(bytes);
            assert_eq!(
                expected.is_ok(),
                actual.is_ok(),
                "accept/reject diverged for {bytes:02x?}"
            );
            if let (Ok(a), Ok(b)) = (expected, actual) {
                assert_eq!(a, b, "accepted text diverged for {bytes:02x?}");
            }
        }
    }

    // Build a synthetic column metadata for a scalar type; every field not
    // passed in keeps its `ColumnMetadata::default()` value.
    fn col(name: &str, ora_type_num: u8, csfrm: u8, buffer_size: u32) -> ColumnMetadata {
        ColumnMetadata {
            name: name.to_string(),
            ora_type_num,
            csfrm,
            buffer_size,
            ..ColumnMetadata::default()
        }
    }

    // Append one row of [Text, Number, Raw, NULL-text] to `writer`, framed as
    // the server would frame the column values (each a
    // `write_bytes_with_length` run that `read_bytes` / `read_bytes_borrowed`
    // consume identically). Nothing is returned: a caller that needs the offset
    // at which a row starts has to derive it itself (the mixed-row test below
    // does so by replaying the decode of the preceding row).
    fn encode_mixed_row(writer: &mut TtcWriter, text: &str, number: &str, raw: &[u8]) {
        writer.write_bytes_with_length(text.as_bytes()).unwrap();
        let num = encode_number_text(number).unwrap();
        writer.write_bytes_with_length(&num).unwrap();
        writer.write_bytes_with_length(raw).unwrap();
        writer.write_u8(0); // NULL column (length byte 0)
    }

    // The borrowed batch decode must yield, for every cell, a value whose
    // `to_owned_value()` is bit-for-bit the owned-path `QueryValue`, across a
    // mixed Text/Number/Raw/NULL row. And the Text/Raw cells must genuinely
    // borrow the batch buffer (zero-copy), not a fresh allocation.
    #[test]
    fn borrowed_batch_matches_owned_path_for_mixed_row() {
        let columns = vec![
            col("T", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
            col("N", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
            col("R", ORA_TYPE_NUM_RAW, CS_FORM_IMPLICIT, 2000),
            col("Z", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
        ];

        let mut writer = TtcWriter::new();
        encode_mixed_row(
            &mut writer,
            "héllo world",
            "-12.5",
            &[0xDE, 0xAD, 0xBE, 0xEF],
        );
        encode_mixed_row(&mut writer, "second", "42", &[0x01]);
        let buffer = writer.into_bytes();
        let row_starts = vec![0, {
            // Find the second row's start by replaying the first row's consumption.
            let mut reader = TtcReader::new(&buffer);
            for c in &columns {
                let _ = parse_column_value(&mut reader, c).unwrap();
            }
            reader.position()
        }];

        // Owned path: decode both rows the existing way for the golden values.
        let owned_rows: Vec<Vec<Option<QueryValue>>> = row_starts
            .iter()
            .map(|&start| {
                let mut reader = TtcReader::new(&buffer[start..]);
                columns
                    .iter()
                    .map(|c| parse_column_value(&mut reader, c).unwrap())
                    .collect()
            })
            .collect();

        // Borrowed path: decode through the batch, collecting owned copies and
        // proving the scalar cells borrow the buffer. `buffer` and `columns`
        // are not needed past this point, so they are moved into the batch
        // instead of cloned.
        let batch = BorrowedRowBatch::new(buffer, columns, row_starts);
        let buf_ptr_range = batch.buffer_ptr_range();

        let mut seen_rows = 0usize;
        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
        batch
            .for_each_row_ref(|row| {
                seen_rows += 1;
                // Text cell borrows the buffer. Any other variant is a failure
                // in its own right: skipping it silently would let the
                // zero-copy claim pass vacuously.
                match row[0] {
                    Some(QueryValueRef::Text(t)) => {
                        let p = t.as_ptr() as usize;
                        assert!(
                            buf_ptr_range.contains(&p),
                            "Text cell must borrow the batch buffer (zero-copy)"
                        );
                    }
                    _ => panic!("column T must decode as a borrowed Text cell"),
                }
                // Raw cell borrows the buffer; same non-vacuity rule as above.
                match row[2] {
                    Some(QueryValueRef::Raw(r)) => {
                        let p = r.as_ptr() as usize;
                        assert!(
                            buf_ptr_range.contains(&p),
                            "Raw cell must borrow the batch buffer (zero-copy)"
                        );
                    }
                    _ => panic!("column R must decode as a borrowed Raw cell"),
                }
                borrowed_owned.push(
                    row.iter()
                        .map(|cell| cell.map(|v| v.to_owned_value()))
                        .collect(),
                );
                Ok::<(), ProtocolError>(())
            })
            .unwrap();

        assert_eq!(seen_rows, 2, "batch yields both rows");
        assert_eq!(
            borrowed_owned, owned_rows,
            "borrowed cells to_owned() must equal the owned-path values"
        );
    }

    // A NUMBER with trailing zeros ("1000") must come out of the borrowed path
    // (after `to_owned_value()`) equal to the owned-path value, both as the
    // whole `QueryValue` and as its `as_number_text()` rendering.
    #[test]
    fn borrowed_number_to_owned_matches_owned_for_trailing_zero_number() {
        let column = col("N", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22);
        let number = encode_number_text("1000").expect("encode trailing-zero number");
        let mut writer = TtcWriter::new();
        writer
            .write_bytes_with_length(&number)
            .expect("write framed number");
        let buffer = writer.into_bytes();

        // Owned golden value, decoded straight from the framed bytes.
        let mut owned_reader = TtcReader::new(&buffer);
        let owned = parse_column_value(&mut owned_reader, &column)
            .expect("owned decode")
            .expect("owned number should be non-null");

        // Borrowed decode of the same bytes as a single-row, single-column batch.
        let batch = BorrowedRowBatch::new(buffer, vec![column], vec![0]);
        let mut borrowed_owned = Vec::new();
        batch
            .for_each_row_ref(|row| {
                borrowed_owned.push(
                    row[0]
                        .expect("borrowed number should be non-null")
                        .to_owned_value(),
                );
                Ok::<(), ProtocolError>(())
            })
            .expect("borrowed decode");
        let borrowed = borrowed_owned
            .pop()
            .expect("borrowed decode should yield one row");

        assert_eq!(
            borrowed.as_number_text().as_deref(),
            owned.as_number_text().as_deref(),
            "borrowed and owned paths must expose identical canonical NUMBER text"
        );
        assert_eq!(
            borrowed, owned,
            "borrowed to_owned_value should materialize the same trailing-zero NUMBER"
        );
    }

    // A UROWID column described with buffer size 0 must still decode to the
    // physical rowid text, and the decoder must consume all of its bytes so the
    // following VARCHAR column ("after") is read from the right offset. Checked
    // on the owned path first, then the borrowed batch must reproduce it.
    #[test]
    fn describe_size_zero_urowid_decodes_and_preserves_following_column_alignment() {
        let columns = vec![
            col("RID", ORA_TYPE_NUM_UROWID, CS_FORM_IMPLICIT, 0),
            col("NEXT", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
        ];
        let rba: u32 = 0x0102_0304;
        let partition_id: u16 = 0x0506;
        let block_num: u32 = 0x0708_090a;
        let slot_num: u16 = 0x0b0c;
        // Encoded UROWID body: a leading byte `1`, then the four components as
        // big-endian integers in the order rba, partition id, block, slot.
        let mut encoded_urowid = Vec::new();
        encoded_urowid.push(1);
        encoded_urowid.extend_from_slice(&rba.to_be_bytes());
        encoded_urowid.extend_from_slice(&partition_id.to_be_bytes());
        encoded_urowid.extend_from_slice(&block_num.to_be_bytes());
        encoded_urowid.extend_from_slice(&slot_num.to_be_bytes());
        let expected_rowid = encode_physical_rowid(rba, partition_id, block_num, slot_num);

        // Wire layout: null-probe field, the encoded UROWID, then the next column.
        let mut writer = TtcWriter::new();
        writer
            .write_bytes_with_length(&[1])
            .expect("write UROWID null probe field");
        writer
            .write_bytes_with_length(&encoded_urowid)
            .expect("write encoded UROWID");
        writer
            .write_bytes_with_length(b"after")
            .expect("write following text column");
        let buffer = writer.into_bytes();

        let mut owned_reader = TtcReader::new(&buffer);
        let owned = columns
            .iter()
            .map(|column| parse_column_value(&mut owned_reader, column).expect("owned decode"))
            .collect::<Vec<_>>();
        assert_eq!(
            owned[0].as_ref().and_then(QueryValue::as_rowid),
            Some(expected_rowid.as_str()),
            "owned decode must not NULL a describe-size-0 UROWID"
        );
        assert_eq!(
            owned[1].as_ref().and_then(QueryValue::as_text),
            Some("after"),
            "owned decode must consume UROWID bytes before the following column"
        );

        let batch = BorrowedRowBatch::new(buffer, columns, vec![0]);
        let mut borrowed_rows = Vec::new();
        batch
            .for_each_row_ref(|row| {
                borrowed_rows.push(
                    row.iter()
                        .map(|cell| cell.map(|value| value.to_owned_value()))
                        .collect::<Vec<_>>(),
                );
                Ok::<(), ProtocolError>(())
            })
            .expect("borrowed decode");
        assert_eq!(borrowed_rows, vec![owned]);
    }

    // The borrowed response parser walks the *same* message framing as the owned
    // `parse_query_response_with_context` called below (ROW_HEADER / BIT_VECTOR /
    // ROW_DATA / END_OF_RESPONSE), but instead of building owned rows it captures
    // each row's byte offset and hands back a `BorrowedRowBatch`. Decoding that
    // batch must reproduce exactly what the owned fetch path produced — duplicate
    // columns (bit vector) and all. Fixture is the same one the owned
    // `fetch_response_decodes_rows_with_previous_cursor_metadata` test uses.
    #[test]
    fn borrowed_response_parse_matches_owned_fetch_path() {
        use hex::FromHex;
        let payload = Vec::from_hex("06020101000205dc0001010101000702c1041d")
            .expect("fixture response should be valid hex");
        let columns = vec![
            col("INTCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
            col("NUMBERCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
        ];
        let previous_row = vec![
            Some(QueryValue::number_from_text("2", true)),
            Some(QueryValue::number_from_text("0.5", false)),
        ];

        // Owned golden.
        let owned = parse_query_response_with_context(
            &payload,
            ClientCapabilities::default(),
            &columns,
            Some(&previous_row),
        )
        .expect("owned fetch decode");

        // Borrowed parse.
        let borrowed = parse_query_response_borrowed(
            &payload,
            ClientCapabilities::default(),
            &columns,
            Some(&previous_row),
        )
        .expect("borrowed fetch decode");

        // Response-level fields must agree before the per-cell comparison.
        assert_eq!(borrowed.more_rows, owned.more_rows);
        assert_eq!(borrowed.cursor_id, owned.cursor_id);
        assert_eq!(borrowed.batch.row_count(), owned.rows.len());

        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
        borrowed
            .batch
            .for_each_row_ref(|row| {
                borrowed_owned.push(
                    row.iter()
                        .map(|cell| cell.map(|v| v.to_owned_value()))
                        .collect(),
                );
                Ok::<(), ProtocolError>(())
            })
            .expect("iterate borrowed rows");

        assert_eq!(
            borrowed_owned, owned.rows,
            "borrowed batch must reproduce the owned fetch rows (incl. duplicate columns)"
        );
    }
}
2615
#[cfg(test)]
mod out_bind_boolean_regression_tests {
    use super::*;

    /// BOOLEAN bind metadata named `B`. `is_array` selects the array form; all
    /// remaining fields keep their `ColumnMetadata::default()` values.
    fn boolean_column(is_array: bool) -> ColumnMetadata {
        ColumnMetadata {
            is_array,
            ora_type_num: ORA_TYPE_NUM_BOOLEAN,
            name: String::from("B"),
            ..ColumnMetadata::default()
        }
    }

    /// Appends the value under test to `writer`: a length-prefixed single byte
    /// `1` followed by an sb4 of `-1` (the negative actual-bytes field these
    /// regressions are named after). `context` is the panic message used if the
    /// length-prefixed write fails.
    fn push_boolean_with_negative_actual_bytes(writer: &mut TtcWriter, context: &str) {
        writer.write_bytes_with_length(&[1]).expect(context);
        writer.write_sb4(-1);
    }

    /// Wire bytes for the bare value, with nothing in front of it.
    fn boolean_value_with_negative_actual_bytes() -> Vec<u8> {
        let mut ttc = TtcWriter::new();
        push_boolean_with_negative_actual_bytes(&mut ttc, "write present boolean value");
        ttc.into_bytes()
    }

    /// Wire bytes for the same value preceded by a ub4 of `1`, the prefix both
    /// the array OUT bind and the RETURNING payloads below start with.
    fn counted_boolean_with_negative_actual_bytes(context: &str) -> Vec<u8> {
        let mut ttc = TtcWriter::new();
        ttc.write_ub4(1);
        push_boolean_with_negative_actual_bytes(&mut ttc, context);
        ttc.into_bytes()
    }

    /// Scalar OUT bind: the value must be reported as NULL for bind index 0.
    #[test]
    fn scalar_boolean_out_bind_negative_actual_bytes_decodes_null() {
        let columns = [boolean_column(false)];
        let indexes = [0usize];
        let wire = boolean_value_with_negative_actual_bytes();
        let mut cursor = TtcReader::new(&wire);
        let mut decoded = QueryResult::default();

        parse_out_bind_row_data(&mut cursor, &mut decoded, &columns, &indexes)
            .expect("parse scalar BOOLEAN OUT bind");

        assert_eq!(decoded.out_values, vec![(0, None)]);
    }

    /// Array OUT bind: the result is a one-element array whose element is NULL.
    #[test]
    fn array_boolean_out_bind_negative_actual_bytes_decodes_null_element() {
        let columns = [boolean_column(true)];
        let indexes = [0usize];
        let wire =
            counted_boolean_with_negative_actual_bytes("write present boolean array element");
        let mut cursor = TtcReader::new(&wire);
        let mut decoded = QueryResult::default();

        parse_out_bind_row_data(&mut cursor, &mut decoded, &columns, &indexes)
            .expect("parse array BOOLEAN OUT bind");

        assert_eq!(
            decoded.out_values,
            vec![(0, Some(QueryValue::Array(vec![None])))]
        );
    }

    /// RETURNING value: bind index 0 yields a single NULL entry.
    #[test]
    fn returning_boolean_negative_actual_bytes_decodes_null() {
        let columns = [boolean_column(false)];
        let indexes = [0usize];
        let wire =
            counted_boolean_with_negative_actual_bytes("write present returning boolean value");
        let mut cursor = TtcReader::new(&wire);
        let mut decoded = QueryResult::default();

        parse_returning_row_data(&mut cursor, &mut decoded, &columns, &indexes)
            .expect("parse BOOLEAN RETURNING value");

        assert_eq!(decoded.return_values, vec![(0, vec![None])]);
    }
}
2700
#[cfg(test)]
mod fuzz_regression_tests {
    use super::*;

    /// Shared check for the oversized-count regressions: the parser may report
    /// either a TTC decode error or a resource-limit error, and nothing else.
    /// `#[track_caller]` keeps a failure pointing at the calling test.
    #[track_caller]
    fn assert_fails_closed(err: ProtocolError) {
        assert!(
            matches!(
                err,
                ProtocolError::TtcDecode(_) | ProtocolError::ResourceLimit { .. }
            ),
            "expected fail-closed protocol error, got {err:?}"
        );
    }

    // Regression (w6-fuzz, query_response target): a TNS_MSG_TYPE_IMPLICIT_RESULTSET
    // message (27) carrying a ub4 result count of ~620M used to make the dispatch
    // loop `Vec::with_capacity` several gigabytes of `QueryValue::Cursor` before
    // the truncated read failed, which tripped libFuzzer's OOM detector. The
    // parser must now fail closed (truncated payload) without that allocation.
    #[test]
    fn fuzz_regression_implicit_resultset_oom() {
        // Layout: type=27, ub4 length byte 4, value 0x25000000 (~620M), then EOF.
        let wire = [27u8, 4, 0x25, 0, 0, 0];
        let outcome = parse_query_response(&wire, ClientCapabilities::default());
        assert_fails_closed(
            outcome.expect_err("oversized implicit-resultset count must fail closed"),
        );
    }

    // BoundedReader invariant (l2p), query-columns family: a DESCRIBE_INFO
    // message (16) that declares a huge num_columns (ub4 ~620M) with no column
    // metadata bytes behind it must fail closed rather than pre-allocate one
    // ColumnMetadata per declared column. parse_describe_info grows the column
    // Vec via push (no speculative with_capacity), and the first
    // parse_column_metadata read past the end errors.
    #[test]
    fn describe_info_oversized_column_count_fails_closed_not_oom() {
        // Layout: type=16 DESCRIBE_INFO; describe_name read_bytes len byte 0
        // (null); max_row_size ub4 = 0; num_columns ub4 (len byte 4) =
        // 0x25000000 (~620M); then EOF before the skip(1)/column records.
        let wire = [16u8, 0, 0, 4, 0x25, 0x00, 0x00, 0x00];
        let outcome = parse_query_response(&wire, ClientCapabilities::default());
        assert_fails_closed(outcome.expect_err("oversized column count must fail closed"));
    }

    // A DESCRIBE_INFO declaring two columns under a `max_columns = 1` policy
    // must be rejected with the specific "columns" resource-limit error, before
    // any column metadata allocation/parsing.
    #[test]
    fn describe_info_respects_protocol_column_limit() {
        // Layout: type=16 DESCRIBE_INFO; describe_name null; max_row_size=0;
        // num_columns=2.
        let wire = [TNS_MSG_TYPE_DESCRIBE_INFO, 0, 0, 1, 2];
        let policy = ProtocolLimits {
            max_columns: 1,
            ..ProtocolLimits::DEFAULT
        };
        let err = parse_query_response_with_limits(&wire, ClientCapabilities::default(), policy)
            .expect_err("column count above policy must fail");
        let is_column_limit = matches!(
            err,
            ProtocolError::ResourceLimit {
                limit: "columns",
                observed: 2,
                maximum: 1,
            }
        );
        assert!(is_column_limit, "expected column ResourceLimit, got {err:?}");
    }

    // BoundedReader invariant (l2p), out-bind array family: an array OUT bind
    // with an enormous ub4 num_elements (~620M) and no element bytes must fail
    // closed via with_capacity_bounded + the per-element read, instead of
    // reserving gigabytes of Option<QueryValue>.
    #[test]
    fn out_bind_array_oversized_element_count_fails_closed_not_oom() {
        let columns = [ColumnMetadata {
            name: "ARR".to_string(),
            ora_type_num: ORA_TYPE_NUM_NUMBER,
            is_array: true,
            ..ColumnMetadata::default()
        }];
        let indexes = [0usize];
        // ub4 num_elements: len byte 4, value 0x25000000, then no elements.
        let wire = [4u8, 0x25, 0x00, 0x00, 0x00];
        let mut cursor = TtcReader::new(&wire);
        let mut decoded = QueryResult::default();
        let outcome = parse_out_bind_row_data(&mut cursor, &mut decoded, &columns, &indexes);
        assert_fails_closed(
            outcome.expect_err("oversized array OUT bind count must fail closed"),
        );
    }
}