Skip to main content

oracledb_protocol/thin/
fetch.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4
5/// Validate `slice` as UTF-8 for the hot borrowed-text decode path, returning the
6/// borrowed `&str` on success or `()` on rejection (the caller falls back to the
7/// owned `TextRaw` carrier — semantics identical regardless of validator).
8///
9/// With the `simd-decode` feature this uses `simdutf8::basic::from_utf8`, whose
10/// accept/reject decision is byte-for-byte identical to `core::str::from_utf8`
11/// (it validates the exact same UTF-8 grammar — it only declines to compute the
12/// error *position*, which this path never uses). The crate stays
13/// `#![forbid(unsafe_code)]`-clean: `simdutf8`'s SIMD `unsafe` is encapsulated
14/// inside that dependency and we call only its safe API.
15#[inline]
16fn validate_utf8(slice: &[u8]) -> core::result::Result<&str, ()> {
17    #[cfg(feature = "simd-decode")]
18    {
19        simdutf8::basic::from_utf8(slice).map_err(|_| ())
20    }
21    #[cfg(not(feature = "simd-decode"))]
22    {
23        core::str::from_utf8(slice).map_err(|_| ())
24    }
25}
26
27pub fn build_fetch_payload(cursor_id: u32, arraysize: u32) -> Vec<u8> {
28    build_fetch_payload_with_seq(cursor_id, arraysize, 1)
29}
30
31pub fn build_fetch_payload_with_seq(cursor_id: u32, arraysize: u32, seq_num: u8) -> Vec<u8> {
32    // Fixed tiny payload (function code + ub8 + two ub4 ≈ <=20 bytes). Prealloc
33    // so the small pushes do not grow the Vec through doublings; built every
34    // fetch page, so this matters on multi-page fetches. Bytes unchanged.
35    let mut writer = TtcWriter::with_capacity(32);
36    writer.write_function_code_with_seq(TNS_FUNC_FETCH, seq_num);
37    writer.write_ub8(0);
38    writer.write_ub4(cursor_id);
39    writer.write_ub4(arraysize);
40    writer.into_bytes()
41}
42
43pub fn build_define_fetch_payload_with_seq(
44    cursor_id: u32,
45    arraysize: u32,
46    seq_num: u8,
47    define_columns: &[ColumnMetadata],
48) -> Result<Vec<u8>> {
49    let define_count =
50        u32::try_from(define_columns.len()).map_err(|_| ProtocolError::InvalidPacketLength {
51            length: define_columns.len(),
52            minimum: 0,
53        })?;
54    let mut writer = TtcWriter::new();
55    writer.write_function_code_with_seq(TNS_FUNC_EXECUTE, seq_num);
56    writer.write_ub8(0);
57    writer.write_ub4(TNS_EXEC_OPTION_DEFINE | TNS_EXEC_OPTION_NOT_PLSQL);
58    writer.write_ub4(cursor_id);
59    writer.write_u8(0);
60    writer.write_ub4(0);
61    writer.write_u8(1);
62    writer.write_ub4(13);
63    writer.write_u8(0);
64    writer.write_u8(0);
65    writer.write_ub4(0);
66    writer.write_ub4(arraysize);
67    writer.write_ub4(TNS_MAX_LONG_LENGTH);
68    writer.write_u8(0);
69    writer.write_ub4(0);
70    writer.write_u8(0);
71    writer.write_u8(0);
72    writer.write_u8(0);
73    writer.write_u8(0);
74    writer.write_u8(0);
75    writer.write_u8(1);
76    writer.write_ub4(define_count);
77    writer.write_ub4(0);
78    writer.write_u8(0);
79    writer.write_u8(1);
80    writer.write_u8(0);
81    writer.write_ub4(0);
82    writer.write_u8(0);
83    writer.write_ub4(0);
84    writer.write_ub4(0);
85    writer.write_u8(0);
86    writer.write_ub4(0);
87    writer.write_u8(0);
88    writer.write_u8(0);
89    writer.write_ub4(0);
90    writer.write_ub4(0);
91    writer.write_ub4(0);
92    writer.write_ub4(0);
93    writer.write_ub4(0);
94    writer.write_ub4(0);
95    writer.write_ub4(0);
96    writer.write_ub4(arraysize);
97    writer.write_ub4(0);
98    writer.write_ub4(0);
99    writer.write_ub4(0);
100    writer.write_ub4(0);
101    writer.write_ub4(0);
102    writer.write_ub4(1);
103    writer.write_ub4(0);
104    writer.write_ub4(0);
105    writer.write_ub4(0);
106    writer.write_ub4(0);
107    writer.write_ub4(0);
108    for metadata in define_columns {
109        write_define_column_metadata(&mut writer, metadata);
110    }
111    Ok(writer.into_bytes())
112}
113
114pub(crate) fn write_define_column_metadata(writer: &mut TtcWriter, metadata: &ColumnMetadata) {
115    // reference base.pyx: VECTOR (and JSON) columns advertise a LOB-prefetch
116    // buffer so the server streams the image inline rather than returning a
117    // bare temp-LOB locator
118    let (mut buffer_size, cont_flags, lob_prefetch_length) = match metadata.ora_type_num {
119        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB => (metadata.buffer_size, TNS_LOB_PREFETCH_FLAG, 0),
120        ORA_TYPE_NUM_VECTOR => (
121            TNS_VECTOR_MAX_LENGTH,
122            TNS_LOB_PREFETCH_FLAG,
123            TNS_VECTOR_MAX_LENGTH,
124        ),
125        ORA_TYPE_NUM_JSON => (
126            TNS_JSON_MAX_LENGTH,
127            TNS_LOB_PREFETCH_FLAG,
128            TNS_JSON_MAX_LENGTH,
129        ),
130        _ => (metadata.buffer_size, 0, 0),
131    };
132    buffer_size = buffer_size.max(1);
133    writer.write_u8(metadata.ora_type_num);
134    writer.write_u8(TNS_BIND_USE_INDICATORS);
135    writer.write_u8(0);
136    writer.write_u8(0);
137    writer.write_ub4(buffer_size);
138    writer.write_ub4(0);
139    writer.write_ub8(cont_flags);
140    writer.write_ub4(0);
141    writer.write_ub2(0);
142    if metadata.csfrm != 0 {
143        writer.write_ub2(TNS_CHARSET_UTF8);
144    } else {
145        writer.write_ub2(0);
146    }
147    writer.write_u8(metadata.csfrm);
148    writer.write_ub4(lob_prefetch_length);
149    writer.write_ub4(0);
150}
151
152pub fn parse_query_response(
153    payload: &[u8],
154    capabilities: ClientCapabilities,
155) -> Result<QueryResult> {
156    parse_query_response_with_previous(payload, capabilities, None)
157}
158
159pub fn parse_query_response_with_binds(
160    payload: &[u8],
161    capabilities: ClientCapabilities,
162    binds: &[BindValue],
163) -> Result<QueryResult> {
164    parse_query_response_with_binds_and_options(
165        payload,
166        capabilities,
167        binds,
168        ExecuteOptions::default(),
169    )
170}
171
172pub fn parse_query_response_with_binds_and_options(
173    payload: &[u8],
174    capabilities: ClientCapabilities,
175    binds: &[BindValue],
176    exec_options: ExecuteOptions,
177) -> Result<QueryResult> {
178    parse_query_response_with_binds_options_and_columns(
179        payload,
180        capabilities,
181        binds,
182        exec_options,
183        &[],
184    )
185}
186
187/// `known_columns` carries the fetch metadata of a re-executed statement
188/// whose response does not repeat the describe information (reference keeps
189/// the statement's fetch vars across executions).
190pub fn parse_query_response_with_binds_options_and_columns(
191    payload: &[u8],
192    capabilities: ClientCapabilities,
193    binds: &[BindValue],
194    exec_options: ExecuteOptions,
195    known_columns: &[ColumnMetadata],
196) -> Result<QueryResult> {
197    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
198    let output_bind_indexes = binds
199        .iter()
200        .enumerate()
201        .filter_map(|(index, value)| value.is_return_output().then_some(index))
202        .collect::<Vec<_>>();
203    parse_query_response_with_context_binds_and_options(
204        payload,
205        capabilities,
206        known_columns,
207        None,
208        &bind_columns,
209        &output_bind_indexes,
210        false,
211        exec_options,
212    )
213}
214
215pub fn parse_query_response_with_previous(
216    payload: &[u8],
217    capabilities: ClientCapabilities,
218    previous_row: Option<&[Option<QueryValue>]>,
219) -> Result<QueryResult> {
220    parse_query_response_with_context(payload, capabilities, &[], previous_row)
221}
222
223pub fn parse_query_response_with_context(
224    payload: &[u8],
225    capabilities: ClientCapabilities,
226    previous_columns: &[ColumnMetadata],
227    previous_row: Option<&[Option<QueryValue>]>,
228) -> Result<QueryResult> {
229    parse_query_response_with_context_and_binds(
230        payload,
231        capabilities,
232        previous_columns,
233        previous_row,
234        &[],
235        &[],
236        false,
237    )
238}
239
240pub fn parse_fetch_response_with_context(
241    payload: &[u8],
242    capabilities: ClientCapabilities,
243    previous_columns: &[ColumnMetadata],
244    previous_row: Option<&[Option<QueryValue>]>,
245) -> Result<QueryResult> {
246    parse_query_response_with_context_and_binds(
247        payload,
248        capabilities,
249        previous_columns,
250        previous_row,
251        &[],
252        &[],
253        true,
254    )
255}
256
257pub(crate) fn parse_query_response_with_context_and_binds(
258    payload: &[u8],
259    capabilities: ClientCapabilities,
260    previous_columns: &[ColumnMetadata],
261    previous_row: Option<&[Option<QueryValue>]>,
262    bind_columns: &[ColumnMetadata],
263    output_bind_indexes: &[usize],
264    fetch_long_status: bool,
265) -> Result<QueryResult> {
266    parse_query_response_with_context_binds_and_options(
267        payload,
268        capabilities,
269        previous_columns,
270        previous_row,
271        bind_columns,
272        output_bind_indexes,
273        fetch_long_status,
274        ExecuteOptions::default(),
275    )
276}
277
278#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
279pub(crate) fn parse_query_response_with_context_binds_and_options(
280    payload: &[u8],
281    capabilities: ClientCapabilities,
282    previous_columns: &[ColumnMetadata],
283    previous_row: Option<&[Option<QueryValue>]>,
284    bind_columns: &[ColumnMetadata],
285    output_bind_indexes: &[usize],
286    fetch_long_status: bool,
287    exec_options: ExecuteOptions,
288) -> Result<QueryResult> {
289    let mut reader = TtcReader::new(payload);
290    let mut result = QueryResult {
291        columns: previous_columns.to_vec(),
292        more_rows: true,
293        ..QueryResult::default()
294    };
295    let mut bit_vector: Option<Vec<u8>> = None;
296    let mut out_bind_indexes: Vec<usize> = Vec::new();
297    while reader.remaining() > 0 {
298        let message_type = reader.read_u8()?;
299        match message_type {
300            0 => {}
301            TNS_MSG_TYPE_DESCRIBE_INFO => {
302                let _describe_name = reader.read_bytes()?;
303                let previous = std::mem::take(&mut result.columns);
304                parse_describe_info(&mut reader, capabilities, &mut result)?;
305                // re-executing an open cursor whose underlying types changed:
306                // the server re-describes mid-response but still streams the
307                // row data in the adjusted (LONG/LONG RAW) form expected by
308                // the previous fetch metadata (reference `_adjust_metadata`,
309                // impl/thin/messages/base.pyx:820-845, applied during
310                // `_process_describe_info`).
311                for (index, column) in result.columns.iter_mut().enumerate() {
312                    if let Some(prev) = previous.get(index) {
313                        adjust_refetch_metadata(prev, column);
314                    }
315                }
316            }
317            TNS_MSG_TYPE_ROW_HEADER => {
318                bit_vector = parse_row_header(&mut reader)?;
319            }
320            TNS_MSG_TYPE_ROW_DATA => {
321                if result.columns.is_empty() && !out_bind_indexes.is_empty() {
322                    parse_out_bind_row_data(
323                        &mut reader,
324                        &mut result,
325                        bind_columns,
326                        &out_bind_indexes,
327                    )?;
328                } else if result.columns.is_empty() && !output_bind_indexes.is_empty() {
329                    parse_returning_row_data(
330                        &mut reader,
331                        &mut result,
332                        bind_columns,
333                        output_bind_indexes,
334                    )?;
335                } else {
336                    parse_row_data(
337                        &mut reader,
338                        &mut result,
339                        bit_vector.as_deref(),
340                        previous_row,
341                        fetch_long_status,
342                    )?;
343                }
344                bit_vector = None;
345            }
346            TNS_MSG_TYPE_BIT_VECTOR => {
347                bit_vector = Some(parse_bit_vector(&mut reader, result.columns.len())?);
348            }
349            TNS_MSG_TYPE_PARAMETER => {
350                let params =
351                    parse_query_return_parameters(&mut reader, exec_options.arraydmlrowcounts)?;
352                if exec_options.arraydmlrowcounts {
353                    result.array_dml_row_counts = Some(params.row_counts.unwrap_or_default());
354                }
355                if params.query_id.is_some() {
356                    result.query_id = params.query_id;
357                }
358            }
359            TNS_MSG_TYPE_STATUS => {
360                let call_status = reader.read_ub4()?;
361                let _seq = reader.read_ub2()?;
362                result.txn_in_progress = Some(call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
363            }
364            TNS_MSG_TYPE_IO_VECTOR => {
365                out_bind_indexes = parse_io_vector(&mut reader, bind_columns.len())?
366                    .into_iter()
367                    .filter(|index| !output_bind_indexes.contains(index))
368                    .collect();
369            }
370            TNS_MSG_TYPE_FLUSH_OUT_BINDS => break,
371            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
372                if let Some(update) = skip_server_side_piggyback(&mut reader)? {
373                    result.sessionless_txn_state = Some(update);
374                }
375            }
376            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
377                // reference messages/base.pyx `_process_implicit_result`
378                let num_results = reader.read_ub4()?;
379                // `num_results` is read straight off the wire (a ub4, up to
380                // ~4e9); each resultset consumes at least one byte, so cap the
381                // reservation by the bytes left in the payload (BoundedReader).
382                // Without this a hostile server forces a multi-gigabyte
383                // allocation (OOM) before the truncated read in the loop body
384                // fails closed.
385                let mut resultsets: Vec<QueryValue> =
386                    reader.with_capacity_bounded(num_results as usize, 1);
387                for _ in 0..num_results {
388                    let num_bytes = reader.read_u8()?;
389                    reader.skip(usize::from(num_bytes))?;
390                    let mut child = QueryResult::default();
391                    parse_describe_info(&mut reader, capabilities, &mut child)?;
392                    let child_cursor_id = u32::from(reader.read_ub2()?);
393                    resultsets.push(QueryValue::Cursor(Box::new(CursorValue {
394                        columns: child.columns,
395                        cursor_id: child_cursor_id,
396                    })));
397                }
398                result.implicit_resultsets = Some(resultsets);
399            }
400            TNS_MSG_TYPE_END_OF_RESPONSE => break,
401            // pipeline responses open with the token of the operation they
402            // answer (messages/base.pyx:288-293); callers compare it against
403            // the expected token (mismatch -> DPY-2052 at the driver layer)
404            TNS_MSG_TYPE_TOKEN => {
405                result.token_num = Some(reader.read_ub8()?);
406            }
407            TNS_MSG_TYPE_ERROR => {
408                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
409                // The end-of-call ERROR message (number 0 on success) carries
410                // the end-of-call status; sample the transaction-in-progress bit
411                // (reference protocol.pyx `_process_call_status`).
412                result.txn_in_progress =
413                    Some(info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
414                if info.cursor_id != 0 {
415                    result.cursor_id = u32::from(info.cursor_id);
416                }
417                result.row_count = info.row_count;
418                result.compilation_error_warning |= info.compilation_error_warning;
419                result.last_rowid = info.rowid.clone();
420                if info.number == TNS_ERR_NO_DATA_FOUND && !result.columns.is_empty() {
421                    result.more_rows = false;
422                } else if info.number == TNS_ERR_ARRAY_DML_ERRORS {
423                    // executemany(batcherrors=True): errors are reported via
424                    // the batch error arrays instead of raising ORA-24381
425                    // (reference messages/base.pyx `_process_error_info`).
426                    result.batch_errors = info.batch_errors;
427                } else if info.number != 0 {
428                    let mut details = info.into_details();
429                    details.array_dml_row_counts = result.array_dml_row_counts.take();
430                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
431                }
432            }
433            _ => {
434                let position = reader.position().saturating_sub(1);
435                if let Some(message) =
436                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
437                {
438                    return Err(ProtocolError::ServerError(message));
439                }
440                return Err(ProtocolError::UnknownMessageType {
441                    message_type,
442                    position,
443                });
444            }
445        }
446    }
447    Ok(result)
448}
449
450pub(crate) fn bind_column_metadata(value: &BindValue) -> ColumnMetadata {
451    let (ora_type_num, csfrm, buffer_size) = bind_metadata(value);
452    let object_schema = match value {
453        BindValue::ObjectOutput { schema, .. } | BindValue::ObjectInput { schema, .. } => {
454            Some(schema.clone())
455        }
456        _ => None,
457    };
458    let object_type_name = match value {
459        BindValue::ObjectOutput { type_name, .. } | BindValue::ObjectInput { type_name, .. } => {
460            Some(type_name.clone())
461        }
462        _ => None,
463    };
464    ColumnMetadata {
465        name: String::new(),
466        ora_type_num,
467        csfrm,
468        precision: 0,
469        scale: 0,
470        buffer_size,
471        max_size: buffer_size,
472        nulls_allowed: true,
473        is_json: false,
474        is_oson: false,
475        object_schema,
476        object_type_name,
477        is_array: matches!(value, BindValue::Array { .. }),
478        vector_dimensions: None,
479        vector_format: 0,
480        vector_flags: 0,
481        domain_schema: None,
482        domain_name: None,
483        annotations: None,
484    }
485}
486
487pub(crate) fn parse_io_vector(reader: &mut TtcReader<'_>, bind_count: usize) -> Result<Vec<usize>> {
488    let _flags = reader.read_u8()?;
489    let temp16 = reader.read_ub2()?;
490    let temp32 = reader.read_ub4()?;
491    let num_binds = usize::try_from(temp32)
492        .map_err(|_| ProtocolError::InvalidPacketLength {
493            length: usize::MAX,
494            minimum: 0,
495        })?
496        .checked_mul(256)
497        .and_then(|value| value.checked_add(usize::from(temp16)))
498        .ok_or(ProtocolError::InvalidPacketLength {
499            length: usize::MAX,
500            minimum: 0,
501        })?;
502    let _num_iters_this_time = reader.read_ub4()?;
503    let _uac_buffer_length = reader.read_ub2()?;
504    let fast_fetch_len = reader.read_ub2()?;
505    if fast_fetch_len > 0 {
506        reader.skip(usize::from(fast_fetch_len))?;
507    }
508    let rowid_len = reader.read_ub2()?;
509    if rowid_len > 0 {
510        reader.skip(usize::from(rowid_len))?;
511    }
512    let mut out_indexes = Vec::new();
513    for index in 0..num_binds {
514        let direction = reader.read_u8()?;
515        if index < bind_count && direction != TNS_BIND_DIR_INPUT {
516            out_indexes.push(index);
517        }
518    }
519    Ok(out_indexes)
520}
521
522pub(crate) fn find_embedded_server_error(
523    payload: &[u8],
524    ttc_field_version: u8,
525    position: usize,
526) -> Option<String> {
527    let start = position.saturating_sub(64);
528    for candidate in start..=position {
529        if !matches!(payload.get(candidate).copied(), Some(TNS_MSG_TYPE_ERROR)) {
530            continue;
531        }
532        let mut reader = TtcReader::new(payload.get(candidate + 1..)?);
533        let info = parse_server_error_info(&mut reader, ttc_field_version).ok()?;
534        if info.number != 0 && info.message.starts_with("ORA-") {
535            return Some(info.message);
536        }
537    }
538    None
539}
540
541pub(crate) fn parse_describe_info(
542    reader: &mut TtcReader<'_>,
543    capabilities: ClientCapabilities,
544    result: &mut QueryResult,
545) -> Result<()> {
546    let _max_row_size = reader.read_ub4()?;
547    let num_columns = reader.read_ub4()?;
548    result.columns.clear();
549    if num_columns > 0 {
550        reader.skip(1)?;
551    }
552    for _ in 0..num_columns {
553        result
554            .columns
555            .push(parse_column_metadata(reader, capabilities)?);
556    }
557    let _current_date = reader.read_bytes_with_length()?;
558    let _dcbflag = reader.read_ub4()?;
559    let _dcbmdbz = reader.read_ub4()?;
560    let _dcbmnpr = reader.read_ub4()?;
561    let _dcbmxpr = reader.read_ub4()?;
562    let _dcbqcky = reader.read_bytes_with_length()?;
563    Ok(())
564}
565
566pub(crate) fn parse_column_metadata(
567    reader: &mut TtcReader<'_>,
568    capabilities: ClientCapabilities,
569) -> Result<ColumnMetadata> {
570    let ora_type_num = reader.read_u8()?;
571    reader.skip(1)?;
572    let precision = reader.read_i8()?;
573    let scale = reader.read_i8()?;
574    let buffer_size = reader.read_ub4()?;
575    let _max_array_elements = reader.read_ub4()?;
576    let _cont_flags = reader.read_ub8()?;
577    let _oid = reader.read_bytes_with_length()?;
578    let _version = reader.read_ub2()?;
579    let _charset_id = reader.read_ub2()?;
580    let csfrm = reader.read_u8()?;
581    let mut max_size = reader.read_ub4()?;
582    if ora_type_num == ORA_TYPE_NUM_RAW {
583        max_size = buffer_size;
584    }
585    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_12_2 {
586        let _oaccolid = reader.read_ub4()?;
587    }
588    let nulls_allowed = reader.read_u8()? != 0;
589    reader.skip(1)?;
590    let name = reader.read_string_with_length()?.unwrap_or_default();
591    let object_schema = reader.read_string_with_length()?;
592    let object_type_name = reader.read_string_with_length()?;
593    let _column_position = reader.read_ub2()?;
594    let uds_flags = reader.read_ub4()?;
595    let mut domain_schema = None;
596    let mut domain_name = None;
597    let mut annotations: Option<Vec<(String, String)>> = None;
598    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1 {
599        domain_schema = reader.read_string_with_length()?;
600        domain_name = reader.read_string_with_length()?;
601    }
602    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_3 {
603        let num_annotations = reader.read_ub4()?;
604        if num_annotations > 0 {
605            reader.skip(1)?;
606            let num_annotations = reader.read_ub4()?;
607            reader.skip(1)?;
608            // Bound by remaining bytes (BoundedReader): each annotation reads
609            // at least a length-prefixed key/value, so a ub4 count larger than
610            // the payload is a lie that must not pre-allocate gigabytes.
611            let mut collected: Vec<(String, String)> =
612                reader.with_capacity_bounded(num_annotations as usize, 1);
613            for _ in 0..num_annotations {
614                let key = reader.read_string_with_length()?.unwrap_or_default();
615                // A null annotation value is normalized to "" by the reference
616                // driver (python-oracledb base.pyx _process_metadata).
617                let value = reader.read_string_with_length()?.unwrap_or_default();
618                let _flags = reader.read_ub4()?;
619                collected.push((key, value));
620            }
621            let _flags = reader.read_ub4()?;
622            annotations = Some(collected);
623        }
624    }
625    let mut vector_dimensions = None;
626    let mut vector_format = 0u8;
627    let mut vector_flags = 0u8;
628    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_4 {
629        // reference metadata.pyx: ub4 dimensions, ub1 format, ub1 flags
630        let dims = reader.read_ub4()?;
631        vector_format = reader.read_u8()?;
632        vector_flags = reader.read_u8()?;
633        if ora_type_num == ORA_TYPE_NUM_VECTOR {
634            vector_dimensions = Some(dims);
635        }
636    }
637
638    Ok(ColumnMetadata {
639        name,
640        ora_type_num,
641        csfrm,
642        precision,
643        scale,
644        buffer_size,
645        max_size,
646        nulls_allowed,
647        is_json: uds_flags & TNS_UDS_FLAGS_IS_JSON != 0,
648        is_oson: uds_flags & TNS_UDS_FLAGS_IS_OSON != 0,
649        object_schema,
650        object_type_name,
651        is_array: false,
652        vector_dimensions,
653        vector_format,
654        vector_flags,
655        domain_schema,
656        domain_name,
657        annotations,
658    })
659}
660
661pub(crate) fn parse_row_header(reader: &mut TtcReader<'_>) -> Result<Option<Vec<u8>>> {
662    reader.skip(1)?;
663    let _num_requests = reader.read_ub2()?;
664    let _iteration_number = reader.read_ub4()?;
665    let _num_iters = reader.read_ub4()?;
666    let _buffer_length = reader.read_ub2()?;
667    let num_bytes = reader.read_ub4()?;
668    let bit_vector = if num_bytes > 0 {
669        reader.skip(1)?;
670        Some(reader.read_raw(num_bytes as usize)?.to_vec())
671    } else {
672        None
673    };
674    let _rxhrid = reader.read_bytes_with_length()?;
675    Ok(bit_vector)
676}
677
678pub(crate) fn parse_bit_vector(reader: &mut TtcReader<'_>, num_columns: usize) -> Result<Vec<u8>> {
679    let _num_columns_sent = reader.read_ub2()?;
680    let num_bytes = num_columns.div_ceil(8);
681    Ok(reader.read_raw(num_bytes)?.to_vec())
682}
683
684pub(crate) fn parse_row_data(
685    reader: &mut TtcReader<'_>,
686    result: &mut QueryResult,
687    bit_vector: Option<&[u8]>,
688    previous_row: Option<&[Option<QueryValue>]>,
689    fetch_long_status: bool,
690) -> Result<()> {
691    let mut row = Vec::with_capacity(result.columns.len());
692    for (index, metadata) in result.columns.iter().enumerate() {
693        if is_duplicate_column(bit_vector, index) {
694            let previous = result
695                .rows
696                .last()
697                .map(Vec::as_slice)
698                .or(previous_row)
699                .and_then(|last| last.get(index))
700                .cloned()
701                .ok_or(ProtocolError::TtcDecode(
702                    "duplicate row data without previous row",
703                ))?;
704            row.push(previous);
705            continue;
706        }
707        row.push(parse_column_value(reader, metadata)?);
708        if fetch_long_status
709            && matches!(
710                metadata.ora_type_num,
711                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
712            )
713        {
714            let _null_indicator = reader.read_sb4()?;
715            let _return_code = reader.read_ub4()?;
716        }
717    }
718    result.rows.push(row);
719    Ok(())
720}
721
722pub(crate) fn parse_out_bind_row_data(
723    reader: &mut TtcReader<'_>,
724    result: &mut QueryResult,
725    bind_columns: &[ColumnMetadata],
726    out_bind_indexes: &[usize],
727) -> Result<()> {
728    for index in out_bind_indexes {
729        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
730            "out bind index without bind metadata",
731        ))?;
732        if metadata.is_array {
733            let num_elements = usize::try_from(reader.read_ub4()?).map_err(|_| {
734                ProtocolError::InvalidPacketLength {
735                    length: usize::MAX,
736                    minimum: 0,
737                }
738            })?;
739            // Cap by remaining bytes (BoundedReader): each element consumes
740            // wire data, so a ub4 count cannot legitimately exceed the payload.
741            let mut values: Vec<Option<QueryValue>> = reader.with_capacity_bounded(num_elements, 1);
742            for _ in 0..num_elements {
743                let value = parse_column_value(reader, metadata)?;
744                let actual_num_bytes = reader.read_sb4()?;
745                if actual_num_bytes != 0 && value.is_some() {
746                    return Err(ProtocolError::TtcDecode("truncated array OUT bind value"));
747                }
748                values.push(value);
749            }
750            result
751                .out_values
752                .push((*index, Some(QueryValue::Array(values))));
753            continue;
754        }
755        let value = parse_column_value(reader, metadata)?;
756        let actual_num_bytes = reader.read_sb4()?;
757        if actual_num_bytes != 0 && value.is_some() {
758            return Err(ProtocolError::TtcDecode("truncated OUT bind value"));
759        }
760        result.out_values.push((*index, value));
761    }
762    Ok(())
763}
764
765pub(crate) fn parse_returning_row_data(
766    reader: &mut TtcReader<'_>,
767    result: &mut QueryResult,
768    bind_columns: &[ColumnMetadata],
769    output_bind_indexes: &[usize],
770) -> Result<()> {
771    for index in output_bind_indexes {
772        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
773            "return bind index without bind metadata",
774        ))?;
775        let num_rows = usize::try_from(reader.read_ub4()?).map_err(|_| {
776            ProtocolError::InvalidPacketLength {
777                length: usize::MAX,
778                minimum: 0,
779            }
780        })?;
781        // Cap by remaining bytes (BoundedReader); see the OOM note above.
782        let mut values: Vec<Option<QueryValue>> = reader.with_capacity_bounded(num_rows, 1);
783        for _ in 0..num_rows {
784            let value = parse_column_value(reader, metadata)?;
785            let actual_num_bytes = reader.read_sb4()?;
786            if actual_num_bytes != 0 && value.is_some() {
787                return Err(ProtocolError::TtcDecode("truncated DML RETURNING value"));
788            }
789            values.push(value);
790        }
791        result.return_values.push((*index, values));
792    }
793    Ok(())
794}
795
796pub(crate) fn is_duplicate_column(bit_vector: Option<&[u8]>, column_num: usize) -> bool {
797    let Some(bit_vector) = bit_vector else {
798        return false;
799    };
800    let byte_num = column_num / 8;
801    let bit_num = column_num % 8;
802    bit_vector
803        .get(byte_num)
804        .is_some_and(|byte| byte & (1 << bit_num) == 0)
805}
806
807pub(crate) fn parse_column_value(
808    reader: &mut TtcReader<'_>,
809    metadata: &ColumnMetadata,
810) -> Result<Option<QueryValue>> {
811    if metadata.buffer_size == 0
812        && !matches!(
813            metadata.ora_type_num,
814            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
815        )
816    {
817        return Ok(None);
818    }
819    match metadata.ora_type_num {
820        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
821            let Some(bytes) = reader.read_bytes()? else {
822                return Ok(None);
823            };
824            match decode_text_value(&bytes, metadata.csfrm) {
825                Ok(value) => Ok(Some(QueryValue::Text(value))),
826                // preserve the raw bytes so the caller can honor the
827                // configured encoding_errors policy (or raise a Python
828                // UnicodeDecodeError as the reference does)
829                Err(ProtocolError::TtcDecode(_)) => Ok(Some(QueryValue::TextRaw {
830                    bytes,
831                    csfrm: metadata.csfrm,
832                })),
833                Err(err) => Err(err),
834            }
835        }
836        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => Ok(reader.read_bytes()?.map(QueryValue::Raw)),
837        ORA_TYPE_NUM_ROWID => parse_rowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
838        ORA_TYPE_NUM_UROWID => parse_urowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
839        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
840            let Some(bytes) = reader.read_bytes()? else {
841                return Ok(None);
842            };
843            decode_number_value(&bytes).map(Some)
844        }
845        ORA_TYPE_NUM_BINARY_DOUBLE => {
846            let Some(bytes) = reader.read_bytes()? else {
847                return Ok(None);
848            };
849            decode_binary_double(&bytes)
850                .map(|value| Some(QueryValue::BinaryDouble(value.to_string())))
851        }
852        ORA_TYPE_NUM_BINARY_FLOAT => {
853            let Some(bytes) = reader.read_bytes()? else {
854                return Ok(None);
855            };
856            // f64-widened text matches Python float semantics for BINARY_FLOAT
857            decode_binary_float(&bytes)
858                .map(|value| Some(QueryValue::BinaryDouble(f64::from(value).to_string())))
859        }
860        ORA_TYPE_NUM_BOOLEAN => {
861            let Some(bytes) = reader.read_bytes()? else {
862                return Ok(None);
863            };
864            // reference read_bool: last byte == 1 means true; native
865            // DB_TYPE_BOOLEAN surfaces as a Python bool.
866            let is_true = matches!(bytes.last(), Some(&1));
867            Ok(Some(QueryValue::Boolean(is_true)))
868        }
869        ORA_TYPE_NUM_INTERVAL_DS => {
870            let Some(bytes) = reader.read_bytes()? else {
871                return Ok(None);
872            };
873            decode_interval_ds(&bytes).map(Some)
874        }
875        ORA_TYPE_NUM_INTERVAL_YM => {
876            let Some(bytes) = reader.read_bytes()? else {
877                return Ok(None);
878            };
879            decode_interval_ym(&bytes).map(Some)
880        }
881        ORA_TYPE_NUM_DATE
882        | ORA_TYPE_NUM_TIMESTAMP
883        | ORA_TYPE_NUM_TIMESTAMP_LTZ
884        | ORA_TYPE_NUM_TIMESTAMP_TZ => {
885            let Some(bytes) = reader.read_bytes()? else {
886                return Ok(None);
887            };
888            decode_datetime_value(&bytes).map(Some)
889        }
890        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB | ORA_TYPE_NUM_BFILE => {
891            parse_lob_value(reader, metadata)
892        }
893        ORA_TYPE_NUM_VECTOR => parse_vector_value(reader),
894        ORA_TYPE_NUM_JSON => parse_json_value(reader),
895        ORA_TYPE_NUM_CURSOR => parse_cursor_value(reader).map(Some),
896        ORA_TYPE_NUM_OBJECT => parse_object_value(reader, metadata),
897        _ => Err(ProtocolError::UnsupportedFeature("query column type")),
898    }
899}
900
901/// A column value decoded in pass 1 of the borrowed row decode. Scalar values
902/// that borrow the wire buffer are held directly; values that need a small
903/// owned arena (synthesized `Number` text, or a cold owned [`QueryValue`]) are
904/// recorded as a deferred handle into the per-row arena and resolved in pass 2
905/// once the arena is frozen. This two-pass split is what keeps the borrowed
906/// path sound under `#![forbid(unsafe_code)]`: no `&str`/`&[u8]` is ever held
907/// into an arena that is still being grown.
908enum ColumnSlot<'buf> {
909    /// SQL NULL.
910    Null,
911    /// A value that borrows the wire buffer (or is a small `Copy` value).
912    Wire(QueryValueRef<'buf>),
913    /// A `NUMBER` whose canonical text lives at `arena[range]` in the per-row
914    /// number-text arena.
915    Number {
916        range: core::ops::Range<usize>,
917        is_integer: bool,
918    },
919    /// A cold / non-borrowable value parked at `owned[index]` in the per-row
920    /// owned arena.
921    Owned(usize),
922}
923
924/// Decode one column into a [`ColumnSlot`], borrowing the wire buffer for the
925/// hot scalar cases and appending to the per-row arenas for the deferred ones.
926/// Mirrors [`parse_column_value`] type-for-type; the produced owned value (via
927/// [`QueryValueRef::to_owned_value`]) is identical to the owned path.
928///
929/// `digits` is a caller-owned scratch buffer reused across all cells so the
930/// per-cell `NUMBER` decode allocates nothing of its own (it writes straight
931/// into `number_arena`). The hot scalar grid (Text/Raw) borrows the wire buffer
932/// directly with zero allocation.
933fn parse_column_slot<'buf>(
934    reader: &mut TtcReader<'buf>,
935    metadata: &ColumnMetadata,
936    number_arena: &mut String,
937    owned_arena: &mut Vec<QueryValue>,
938    digits: &mut Vec<u8>,
939) -> Result<ColumnSlot<'buf>> {
940    // Park an owned QueryValue in the arena and return the deferred slot. Used
941    // for the cold / non-borrowable variants so the hot grid stays borrowed.
942    fn park(owned_arena: &mut Vec<QueryValue>, value: Option<QueryValue>) -> ColumnSlot<'static> {
943        match value {
944            None => ColumnSlot::Null,
945            Some(value) => {
946                owned_arena.push(value);
947                ColumnSlot::Owned(owned_arena.len() - 1)
948            }
949        }
950    }
951
952    if metadata.buffer_size == 0
953        && !matches!(
954            metadata.ora_type_num,
955            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
956        )
957    {
958        return Ok(ColumnSlot::Null);
959    }
960    match metadata.ora_type_num {
961        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
962            match reader.read_bytes_borrowed()? {
963                BorrowedBytes::Null => Ok(ColumnSlot::Null),
964                // Borrow the wire bytes directly when they are valid UTF-8 and
965                // not the UTF-16 NCHAR form (which needs re-encoding). Zero copy.
966                BorrowedBytes::Slice(slice) if metadata.csfrm != CS_FORM_NCHAR => {
967                    match validate_utf8(slice) {
968                        Ok(text) => Ok(ColumnSlot::Wire(QueryValueRef::Text(text))),
969                        Err(_) => Ok(park(
970                            owned_arena,
971                            Some(QueryValue::TextRaw {
972                                bytes: slice.to_vec(),
973                                csfrm: metadata.csfrm,
974                            }),
975                        )),
976                    }
977                }
978                // NCHAR (UTF-16) or chunked long text: fall back to the owned
979                // decode, which re-encodes to UTF-8 / reassembles chunks.
980                other => {
981                    let bytes = other.into_vec();
982                    let value = match decode_text_value(&bytes, metadata.csfrm) {
983                        Ok(text) => QueryValue::Text(text),
984                        Err(ProtocolError::TtcDecode(_)) => QueryValue::TextRaw {
985                            bytes,
986                            csfrm: metadata.csfrm,
987                        },
988                        Err(err) => return Err(err),
989                    };
990                    Ok(park(owned_arena, Some(value)))
991                }
992            }
993        }
994        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => match reader.read_bytes_borrowed()? {
995            BorrowedBytes::Null => Ok(ColumnSlot::Null),
996            BorrowedBytes::Slice(slice) => Ok(ColumnSlot::Wire(QueryValueRef::Raw(slice))),
997            BorrowedBytes::Chunked(bytes) => Ok(park(owned_arena, Some(QueryValue::Raw(bytes)))),
998        },
999        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
1000            // The wire NUMBER is binary; its canonical decimal text is
1001            // synthesized, so it cannot be borrowed from the buffer. We
1002            // synthesize it *directly* into the per-row number arena (reusing the
1003            // `digits` scratch), borrowing from the arena — zero per-cell heap
1004            // allocation for the common in-range NUMBER.
1005            with_small_bytes(reader, |bytes| match bytes {
1006                None => Ok(ColumnSlot::Null),
1007                Some(bytes) => {
1008                    let start = number_arena.len();
1009                    let is_integer = decode_number_text_into(bytes, digits, number_arena)?;
1010                    Ok(ColumnSlot::Number {
1011                        range: start..number_arena.len(),
1012                        is_integer,
1013                    })
1014                }
1015            })
1016        }
1017        ORA_TYPE_NUM_BOOLEAN => with_small_bytes(reader, |bytes| match bytes {
1018            None => Ok(ColumnSlot::Null),
1019            Some(bytes) => Ok(ColumnSlot::Wire(QueryValueRef::Boolean(matches!(
1020                bytes.last(),
1021                Some(&1)
1022            )))),
1023        }),
1024        ORA_TYPE_NUM_INTERVAL_DS => with_small_bytes(reader, |bytes| match bytes {
1025            None => Ok(ColumnSlot::Null),
1026            Some(bytes) => match decode_interval_ds(bytes)? {
1027                QueryValue::IntervalDS {
1028                    days,
1029                    hours,
1030                    minutes,
1031                    seconds,
1032                    fseconds,
1033                } => Ok(ColumnSlot::Wire(QueryValueRef::IntervalDS {
1034                    days,
1035                    hours,
1036                    minutes,
1037                    seconds,
1038                    fseconds,
1039                })),
1040                other => Ok(park(owned_arena, Some(other))),
1041            },
1042        }),
1043        ORA_TYPE_NUM_INTERVAL_YM => with_small_bytes(reader, |bytes| match bytes {
1044            None => Ok(ColumnSlot::Null),
1045            Some(bytes) => match decode_interval_ym(bytes)? {
1046                QueryValue::IntervalYM { years, months } => {
1047                    Ok(ColumnSlot::Wire(QueryValueRef::IntervalYM {
1048                        years,
1049                        months,
1050                    }))
1051                }
1052                other => Ok(park(owned_arena, Some(other))),
1053            },
1054        }),
1055        ORA_TYPE_NUM_DATE
1056        | ORA_TYPE_NUM_TIMESTAMP
1057        | ORA_TYPE_NUM_TIMESTAMP_LTZ
1058        | ORA_TYPE_NUM_TIMESTAMP_TZ => with_small_bytes(reader, |bytes| match bytes {
1059            None => Ok(ColumnSlot::Null),
1060            Some(bytes) => match decode_datetime_value(bytes)? {
1061                QueryValue::DateTime {
1062                    year,
1063                    month,
1064                    day,
1065                    hour,
1066                    minute,
1067                    second,
1068                    nanosecond,
1069                } => Ok(ColumnSlot::Wire(QueryValueRef::DateTime {
1070                    year,
1071                    month,
1072                    day,
1073                    hour,
1074                    minute,
1075                    second,
1076                    nanosecond,
1077                })),
1078                other => Ok(park(owned_arena, Some(other))),
1079            },
1080        }),
1081        // Everything else (Rowid, BinaryDouble/Float, Clob/Blob/Bfile, Vector,
1082        // Json, Cursor, Object, UROWID) goes through the owned decode and is
1083        // parked in the owned arena. These are the cold / non-borrowable cases.
1084        _ => {
1085            let value = parse_column_value(reader, metadata)?;
1086            Ok(park(owned_arena, value))
1087        }
1088    }
1089}
1090
1091/// Read one TTC byte field and hand the body to `f` as a borrowed `&[u8]`
1092/// without allocating in the common contiguous case. `None` is SQL NULL. The
1093/// rare chunked long form is reassembled into a temporary `Vec` (these small
1094/// fixed-size scalar types — number/boolean/interval/datetime — are never sent
1095/// chunked in practice, so this fallback is effectively dead weight).
1096fn with_small_bytes<'buf, T>(
1097    reader: &mut TtcReader<'buf>,
1098    f: impl FnOnce(Option<&[u8]>) -> Result<T>,
1099) -> Result<T> {
1100    match reader.read_bytes_borrowed()? {
1101        BorrowedBytes::Null => f(None),
1102        BorrowedBytes::Slice(slice) => f(Some(slice)),
1103        BorrowedBytes::Chunked(owned) => f(Some(&owned)),
1104    }
1105}
1106
1107impl BorrowedBytes<'_> {
1108    /// Reassemble into an owned `Vec` (zero-copy `Slice` still copies; `Chunked`
1109    /// reuses its already-owned `Vec`). Used by the non-borrowable text fallback.
1110    fn into_vec(self) -> Vec<u8> {
1111        match self {
1112            BorrowedBytes::Null => Vec::new(),
1113            BorrowedBytes::Slice(slice) => slice.to_vec(),
1114            BorrowedBytes::Chunked(owned) => owned,
1115        }
1116    }
1117}
1118
1119/// A decoded fetch batch that **owns** the wire response buffer and column
1120/// metadata, and yields rows of borrowed [`QueryValueRef`] that point straight
1121/// into that buffer. This is the zero-copy fetch fast path: the common scalar
1122/// grid is decoded with no per-cell allocation.
1123///
1124/// ## Soundness
1125///
1126/// The buffer is owned by the batch and outlives every borrowed row: rows are
1127/// only ever surfaced *inside* the [`for_each_row_ref`](Self::for_each_row_ref)
1128/// callback, whose `&[QueryValueRef]` argument cannot escape (its lifetime is
1129/// bound to the call). The borrow checker therefore guarantees no
1130/// `QueryValueRef` can dangle — there is no self-referential struct and no
1131/// `unsafe`. `Number` text and the cold values borrow per-row arenas that are
1132/// fully built (pass 1) before any reference into them is taken (pass 2), so an
1133/// arena is never grown while borrowed.
1134#[derive(Clone, Debug)]
1135pub struct BorrowedRowBatch {
1136    buffer: Vec<u8>,
1137    columns: Vec<ColumnMetadata>,
1138    /// Byte offset into `buffer` where each row's column values begin.
1139    row_starts: Vec<usize>,
1140    /// Per-row duplicate-column bit vector (server row-compression). `None` (or
1141    /// an absent entry) means every column is present on the wire for that row.
1142    /// A zero bit marks a duplicate column whose value repeats the previous
1143    /// row's value and carries no wire bytes (reference `bit_vector`).
1144    row_bit_vectors: Vec<Option<Vec<u8>>>,
1145    /// Whether this batch carried `LONG`/`LONG RAW` status trailers after each
1146    /// such column (the fetch path sets this; the plain execute path does not).
1147    fetch_long_status: bool,
1148    /// The caller's previous (owned) row, used to resolve duplicate columns in
1149    /// the *first* compressed row of the batch (whose duplicates repeat the row
1150    /// that ended the prior page). `None` outside the compressed-fetch case.
1151    previous_row_seed: Option<Vec<Option<QueryValue>>>,
1152}
1153
1154impl BorrowedRowBatch {
1155    /// Construct a batch from an owned wire `buffer`, the `columns` describing
1156    /// each cell, and the per-row start offsets into `buffer`. Use this for
1157    /// batches with no duplicate-column compression and no LONG trailers (the
1158    /// common synthetic / test case); the framing-aware
1159    /// [`parse_query_response_borrowed`] builds the full form.
1160    pub fn new(buffer: Vec<u8>, columns: Vec<ColumnMetadata>, row_starts: Vec<usize>) -> Self {
1161        Self {
1162            buffer,
1163            columns,
1164            row_starts,
1165            row_bit_vectors: Vec::new(),
1166            fetch_long_status: false,
1167            previous_row_seed: None,
1168        }
1169    }
1170
1171    /// Number of rows in the batch.
1172    pub fn row_count(&self) -> usize {
1173        self.row_starts.len()
1174    }
1175
1176    /// The columns describing each cell.
1177    pub fn columns(&self) -> &[ColumnMetadata] {
1178        &self.columns
1179    }
1180
1181    /// The address range of the owned buffer, for tests asserting that borrowed
1182    /// scalar cells truly point into it (zero-copy).
1183    #[cfg(test)]
1184    pub fn buffer_ptr_range(&self) -> core::ops::Range<usize> {
1185        let start = self.buffer.as_ptr() as usize;
1186        start..start + self.buffer.len()
1187    }
1188
1189    /// Decode each row and invoke `callback` with the row's borrowed cells. The
1190    /// `&[Option<QueryValueRef>]` slice borrows the batch buffer and per-row
1191    /// arenas; it is valid only for the duration of the call (it cannot escape).
1192    /// `None` cells are SQL NULL. Returns the first decode/callback error.
1193    ///
1194    /// Generic over the callback's error type `E` (any error a decode failure
1195    /// can convert into, e.g. the driver crate's own error) so callers are not
1196    /// forced through [`ProtocolError`]; a decode failure is surfaced via
1197    /// `E: From<ProtocolError>`.
1198    pub fn for_each_row_ref<F, E>(&self, mut callback: F) -> std::result::Result<(), E>
1199    where
1200        F: FnMut(&[Option<QueryValueRef<'_>>]) -> std::result::Result<(), E>,
1201        E: From<ProtocolError>,
1202    {
1203        // The two arenas are reused across rows (cleared, not reallocated). They
1204        // are mutated only in pass 1; pass 2 borrows them immutably and that
1205        // borrow is confined to a single loop iteration (it ends before the next
1206        // iteration's `clear()`), which is what keeps the borrow checker — and
1207        // soundness — happy.
1208        let mut number_arena = String::new();
1209        let mut owned_arena: Vec<QueryValue> = Vec::new();
1210        // Reusable scratch: `digits` for the NUMBER decode, `slots` for pass 1,
1211        // `row` for the borrowed cells handed to the callback. All cleared and
1212        // reused across rows so the steady-state per-row decode allocates only
1213        // when an arena/scratch genuinely grows (amortized). `slots`/`row`/
1214        // `digits` borrow nothing across iterations beyond the stable buffer.
1215        let mut digits: Vec<u8> = Vec::new();
1216        let mut slots: Vec<Option<ColumnSlot<'_>>> = Vec::with_capacity(self.columns.len());
1217        // Owned snapshot of the previous row, used only to resolve duplicate
1218        // (bit-vector-compressed) columns, which carry no wire bytes. Empty when
1219        // the batch has no bit vectors (the common case), so it costs nothing.
1220        // Seeded from the caller's prior-page row for the first compressed row.
1221        let mut previous_owned: Vec<Option<QueryValue>> =
1222            self.previous_row_seed.clone().unwrap_or_default();
1223        let uses_bit_vectors = !self.row_bit_vectors.is_empty();
1224
1225        for (row_index, &start) in self.row_starts.iter().enumerate() {
1226            number_arena.clear();
1227            owned_arena.clear();
1228            slots.clear();
1229            let bit_vector = self
1230                .row_bit_vectors
1231                .get(row_index)
1232                .and_then(|bv| bv.as_deref());
1233
1234            // Pass 1: decode all columns, growing the per-row arenas. `slots`
1235            // borrows only the buffer (the deferred Number/Owned slots hold a
1236            // range/index into the arenas, never a borrow of them). Duplicate
1237            // columns carry no wire bytes — their owned previous value is parked
1238            // in the owned arena.
1239            let mut reader = TtcReader::new(&self.buffer[start..]);
1240            for (index, metadata) in self.columns.iter().enumerate() {
1241                if is_duplicate_column(bit_vector, index) {
1242                    let previous = previous_owned.get(index).and_then(Option::as_ref);
1243                    match previous {
1244                        None => slots.push(None),
1245                        Some(value) => {
1246                            owned_arena.push(value.clone());
1247                            slots.push(Some(ColumnSlot::Owned(owned_arena.len() - 1)));
1248                        }
1249                    }
1250                    continue;
1251                }
1252                let slot = parse_column_slot(
1253                    &mut reader,
1254                    metadata,
1255                    &mut number_arena,
1256                    &mut owned_arena,
1257                    &mut digits,
1258                )?;
1259                slots.push(match slot {
1260                    ColumnSlot::Null => None,
1261                    other => Some(other),
1262                });
1263                if self.fetch_long_status
1264                    && matches!(
1265                        metadata.ora_type_num,
1266                        ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1267                    )
1268                {
1269                    let _null_indicator = reader.read_sb4()?;
1270                    let _return_code = reader.read_ub4()?;
1271                }
1272            }
1273
1274            // Pass 2: arenas are now frozen — resolve deferred slots into
1275            // borrowed refs. No arena is mutated here, so the borrows are sound.
1276            // `row` is allocated per row: it carries the per-row arena lifetime,
1277            // which (unlike `slots`/`digits`, that borrow only the stable buffer)
1278            // cannot be reused across an arena `clear()`. This is the single
1279            // remaining per-row allocation, versus the owned path's per-row Vec
1280            // *plus* a String per scalar cell.
1281            let row: Vec<Option<QueryValueRef<'_>>> = slots
1282                .iter()
1283                .map(|slot| {
1284                    slot.as_ref().map(|slot| match *slot {
1285                        ColumnSlot::Null => unreachable!("Null slots are stored as None"),
1286                        ColumnSlot::Wire(value) => value,
1287                        ColumnSlot::Number {
1288                            ref range,
1289                            is_integer,
1290                        } => QueryValueRef::Number {
1291                            text: &number_arena[range.clone()],
1292                            is_integer,
1293                        },
1294                        ColumnSlot::Owned(index) => QueryValueRef::Owned(&owned_arena[index]),
1295                    })
1296                })
1297                .collect();
1298
1299            callback(&row)?;
1300
1301            // Snapshot the just-emitted row as owned values for the next row's
1302            // duplicate-column resolution — but only when the batch actually uses
1303            // bit-vector compression, so the zero-copy common path pays nothing.
1304            if uses_bit_vectors {
1305                previous_owned.clear();
1306                previous_owned.extend(row.iter().map(|cell| cell.map(|v| v.to_owned_value())));
1307            }
1308        }
1309        Ok(())
1310    }
1311}
1312
1313/// The borrowed counterpart of a fetched [`QueryResult`]: a [`BorrowedRowBatch`]
1314/// of zero-copy rows plus the response-level fields a caller needs to page and
1315/// finalize the cursor. Produced by [`parse_query_response_borrowed`].
1316#[derive(Clone, Debug)]
1317pub struct BorrowedFetchResult {
1318    /// The decoded rows, borrowing the response buffer.
1319    pub batch: BorrowedRowBatch,
1320    /// Whether the server reports more rows for this cursor.
1321    pub more_rows: bool,
1322    /// Server cursor id (for paging / release).
1323    pub cursor_id: u32,
1324    /// Total affected/processed row count from the end-of-call error message.
1325    pub row_count: u64,
1326}
1327
1328/// Walk a fetch/query response payload and produce a [`BorrowedFetchResult`]
1329/// whose rows borrow `payload` (the caller must keep the owned buffer alive —
1330/// [`BorrowedRowBatch`] owns it). This is the zero-copy companion to
1331/// [`parse_fetch_response_with_context`]: it walks the exact same message
1332/// framing (DESCRIBE_INFO / ROW_HEADER / BIT_VECTOR / ROW_DATA / ERROR /
1333/// END_OF_RESPONSE) but, instead of materializing owned rows, records each
1334/// row's byte offset and bit vector so [`BorrowedRowBatch::for_each_row_ref`]
1335/// can decode them lazily and without per-cell allocation.
1336///
1337/// Scope: the plain query-row case (the fetch path). Out-bind / DML-returning
1338/// rows are not part of a fetch response and are left to the owned path.
1339pub fn parse_query_response_borrowed(
1340    payload: &[u8],
1341    capabilities: ClientCapabilities,
1342    columns: &[ColumnMetadata],
1343    previous_row: Option<&[Option<QueryValue>]>,
1344) -> Result<BorrowedFetchResult> {
1345    let mut reader = TtcReader::new(payload);
1346    let mut result_columns = columns.to_vec();
1347    let mut more_rows = true;
1348    let mut cursor_id = 0u32;
1349    let mut row_count = 0u64;
1350    let mut row_starts: Vec<usize> = Vec::new();
1351    let mut row_bit_vectors: Vec<Option<Vec<u8>>> = Vec::new();
1352    let mut any_bit_vector = false;
1353    let mut pending_bit_vector: Option<Vec<u8>> = None;
1354    // The fetch path always consumes LONG/LONG RAW status trailers.
1355    let fetch_long_status = true;
1356
1357    while reader.remaining() > 0 {
1358        let message_type = reader.read_u8()?;
1359        match message_type {
1360            0 => {}
1361            TNS_MSG_TYPE_DESCRIBE_INFO => {
1362                let _describe_name = reader.read_bytes()?;
1363                let previous = std::mem::take(&mut result_columns);
1364                let mut described = QueryResult::default();
1365                parse_describe_info(&mut reader, capabilities, &mut described)?;
1366                result_columns = described.columns;
1367                for (index, column) in result_columns.iter_mut().enumerate() {
1368                    if let Some(prev) = previous.get(index) {
1369                        adjust_refetch_metadata(prev, column);
1370                    }
1371                }
1372            }
1373            TNS_MSG_TYPE_ROW_HEADER => {
1374                pending_bit_vector = parse_row_header(&mut reader)?;
1375            }
1376            TNS_MSG_TYPE_BIT_VECTOR => {
1377                pending_bit_vector = Some(parse_bit_vector(&mut reader, result_columns.len())?);
1378            }
1379            TNS_MSG_TYPE_ROW_DATA => {
1380                // Record where this row's column values begin, then advance the
1381                // reader past the row (skipping, not materializing).
1382                row_starts.push(reader.position());
1383                let bit_vector = pending_bit_vector.take();
1384                any_bit_vector |= bit_vector.is_some();
1385                row_bit_vectors.push(bit_vector.clone());
1386                skip_row_data(
1387                    &mut reader,
1388                    &result_columns,
1389                    bit_vector.as_deref(),
1390                    fetch_long_status,
1391                )?;
1392            }
1393            TNS_MSG_TYPE_PARAMETER => {
1394                let _params = parse_query_return_parameters(&mut reader, false)?;
1395            }
1396            TNS_MSG_TYPE_STATUS => {
1397                let _call_status = reader.read_ub4()?;
1398                let _seq = reader.read_ub2()?;
1399            }
1400            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
1401                let _ = skip_server_side_piggyback(&mut reader)?;
1402            }
1403            TNS_MSG_TYPE_FLUSH_OUT_BINDS | TNS_MSG_TYPE_END_OF_RESPONSE => break,
1404            TNS_MSG_TYPE_TOKEN => {
1405                let _token = reader.read_ub8()?;
1406            }
1407            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
1408                // Mirror the owned parser's framing walk so the reader advances
1409                // past the implicit-resultset block identically (the borrowed
1410                // fetch API does not surface child cursors, but it must still
1411                // consume the bytes). reference messages/base.pyx
1412                // `_process_implicit_result`.
1413                let num_results = reader.read_ub4()?;
1414                for _ in 0..num_results.min(reader.remaining() as u32) {
1415                    let num_bytes = reader.read_u8()?;
1416                    reader.skip(usize::from(num_bytes))?;
1417                    let mut child = QueryResult::default();
1418                    parse_describe_info(&mut reader, capabilities, &mut child)?;
1419                    let _child_cursor_id = reader.read_ub2()?;
1420                }
1421            }
1422            TNS_MSG_TYPE_ERROR => {
1423                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
1424                if info.cursor_id != 0 {
1425                    cursor_id = u32::from(info.cursor_id);
1426                }
1427                row_count = info.row_count;
1428                if info.number == TNS_ERR_NO_DATA_FOUND && !result_columns.is_empty() {
1429                    more_rows = false;
1430                } else if info.number != 0 && info.number != TNS_ERR_ARRAY_DML_ERRORS {
1431                    return Err(ProtocolError::ServerErrorInfo(Box::new(
1432                        info.into_details(),
1433                    )));
1434                }
1435            }
1436            _ => {
1437                let position = reader.position().saturating_sub(1);
1438                if let Some(message) =
1439                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
1440                {
1441                    return Err(ProtocolError::ServerError(message));
1442                }
1443                return Err(ProtocolError::UnknownMessageType {
1444                    message_type,
1445                    position,
1446                });
1447            }
1448        }
1449    }
1450
1451    // If the batch never used duplicate-column compression, drop the per-row
1452    // bit-vector vector so iteration takes the zero-copy fast path (no owned
1453    // previous-row snapshotting).
1454    if !any_bit_vector {
1455        row_bit_vectors.clear();
1456    }
1457
1458    let batch = BorrowedRowBatch {
1459        buffer: payload.to_vec(),
1460        columns: result_columns,
1461        row_starts,
1462        row_bit_vectors,
1463        fetch_long_status,
1464        // Seed the first compressed row's duplicate resolution from the caller's
1465        // prior-page row (only consulted when the batch uses bit vectors).
1466        previous_row_seed: any_bit_vector.then(|| {
1467            previous_row
1468                .map(<[Option<QueryValue>]>::to_vec)
1469                .unwrap_or_default()
1470        }),
1471    };
1472
1473    Ok(BorrowedFetchResult {
1474        batch,
1475        more_rows,
1476        cursor_id,
1477        row_count,
1478    })
1479}
1480
1481/// Advance `reader` past one ROW_DATA row **without materializing owned values**
1482/// — this is the offset-capture pass, so it must allocate nothing for the hot
1483/// scalar grid. Mirrors [`parse_row_data`]'s consumption exactly: duplicate
1484/// (bit-vector) columns carry no wire bytes and are skipped; the hot byte-field
1485/// scalar types are skipped with a zero-allocation length-prefixed skip; the
1486/// rare cold types (LOB / Vector / JSON / Cursor / Object / ROWID), whose wire
1487/// framing is non-trivial, fall back to [`parse_column_value`] (which may
1488/// allocate, but those are uncommon). `LONG`/`LONG RAW` status trailers are
1489/// consumed when `fetch_long_status`.
1490fn skip_row_data(
1491    reader: &mut TtcReader<'_>,
1492    columns: &[ColumnMetadata],
1493    bit_vector: Option<&[u8]>,
1494    fetch_long_status: bool,
1495) -> Result<()> {
1496    for (index, metadata) in columns.iter().enumerate() {
1497        if is_duplicate_column(bit_vector, index) {
1498            continue;
1499        }
1500        let consumed_byte_field = metadata.buffer_size != 0
1501            && matches!(
1502                metadata.ora_type_num,
1503                ORA_TYPE_NUM_VARCHAR
1504                    | ORA_TYPE_NUM_CHAR
1505                    | ORA_TYPE_NUM_LONG
1506                    | ORA_TYPE_NUM_RAW
1507                    | ORA_TYPE_NUM_LONG_RAW
1508                    | ORA_TYPE_NUM_NUMBER
1509                    | ORA_TYPE_NUM_BINARY_INTEGER
1510                    | ORA_TYPE_NUM_BINARY_DOUBLE
1511                    | ORA_TYPE_NUM_BINARY_FLOAT
1512                    | ORA_TYPE_NUM_BOOLEAN
1513                    | ORA_TYPE_NUM_INTERVAL_DS
1514                    | ORA_TYPE_NUM_INTERVAL_YM
1515                    | ORA_TYPE_NUM_DATE
1516                    | ORA_TYPE_NUM_TIMESTAMP
1517                    | ORA_TYPE_NUM_TIMESTAMP_LTZ
1518                    | ORA_TYPE_NUM_TIMESTAMP_TZ
1519            );
1520        if consumed_byte_field {
1521            reader.skip_bytes_field()?;
1522        } else {
1523            // Cold / non-byte-field type, or a zero-buffer-size column: defer to
1524            // the full owned decode purely to advance the reader correctly.
1525            let _ = parse_column_value(reader, metadata)?;
1526        }
1527        if fetch_long_status
1528            && matches!(
1529                metadata.ora_type_num,
1530                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1531            )
1532        {
1533            let _null_indicator = reader.read_sb4()?;
1534            let _return_code = reader.read_ub4()?;
1535        }
1536    }
1537    Ok(())
1538}
1539
1540pub(crate) fn encode_rowid_component(mut value: u32, size: usize, output: &mut String) {
1541    let mut encoded = vec![b'A'; size];
1542    for index in 0..size {
1543        let alphabet_index = usize::try_from(value & 0x3f).unwrap_or(0);
1544        encoded[size - index - 1] = TNS_BASE64_ALPHABET[alphabet_index];
1545        value >>= 6;
1546    }
1547    output.extend(encoded.into_iter().map(char::from));
1548}
1549
1550pub(crate) fn encode_physical_rowid(
1551    rba: u32,
1552    partition_id: u16,
1553    block_num: u32,
1554    slot_num: u16,
1555) -> String {
1556    let mut output = String::with_capacity(ORA_TYPE_SIZE_ROWID as usize);
1557    encode_rowid_component(rba, 6, &mut output);
1558    encode_rowid_component(u32::from(partition_id), 3, &mut output);
1559    encode_rowid_component(block_num, 6, &mut output);
1560    encode_rowid_component(u32::from(slot_num), 3, &mut output);
1561    output
1562}
1563
1564pub(crate) fn parse_rowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1565    let len = reader.read_u8()?;
1566    if len == 0 || len == crate::wire::TNS_NULL_LENGTH_INDICATOR {
1567        return Ok(None);
1568    }
1569    let rba = reader.read_ub4()?;
1570    let partition_id = reader.read_ub2()?;
1571    reader.skip(1)?;
1572    let block_num = reader.read_ub4()?;
1573    let slot_num = reader.read_ub2()?;
1574    Ok(Some(encode_physical_rowid(
1575        rba,
1576        partition_id,
1577        block_num,
1578        slot_num,
1579    )))
1580}
1581
1582pub(crate) fn encode_logical_urowid(bytes: &[u8]) -> String {
1583    let mut input_offset = 1;
1584    let mut input_len = bytes.len().saturating_sub(1);
1585    let mut output = String::with_capacity((bytes.len() / 3) * 4 + 4);
1586    output.push('*');
1587    while input_len > 0 {
1588        let mut pos = bytes[input_offset] >> 2;
1589        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1590
1591        pos = (bytes[input_offset] & 0x03) << 4;
1592        if input_len == 1 {
1593            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1594            break;
1595        }
1596        input_offset += 1;
1597        pos |= (bytes[input_offset] & 0xf0) >> 4;
1598        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1599
1600        pos = (bytes[input_offset] & 0x0f) << 2;
1601        if input_len == 2 {
1602            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1603            break;
1604        }
1605        input_offset += 1;
1606        pos |= (bytes[input_offset] & 0xc0) >> 6;
1607        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1608
1609        pos = bytes[input_offset] & 0x3f;
1610        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1611        input_offset += 1;
1612        input_len -= 3;
1613    }
1614    output
1615}
1616
1617pub(crate) fn parse_urowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1618    if reader.read_bytes()?.is_none() {
1619        return Ok(None);
1620    }
1621    let Some(bytes) = reader.read_bytes()? else {
1622        return Ok(None);
1623    };
1624    if bytes.len() < 13 {
1625        return Err(ProtocolError::TtcDecode("encoded UROWID too short"));
1626    }
1627    if bytes[0] == 1 {
1628        let rba = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
1629        let partition_id = u16::from_be_bytes([bytes[5], bytes[6]]);
1630        let block_num = u32::from_be_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]);
1631        let slot_num = u16::from_be_bytes([bytes[11], bytes[12]]);
1632        Ok(Some(encode_physical_rowid(
1633            rba,
1634            partition_id,
1635            block_num,
1636            slot_num,
1637        )))
1638    } else {
1639        Ok(Some(encode_logical_urowid(&bytes)))
1640    }
1641}
1642
1643pub(crate) fn parse_lob_value(
1644    reader: &mut TtcReader<'_>,
1645    metadata: &ColumnMetadata,
1646) -> Result<Option<QueryValue>> {
1647    let num_bytes = reader.read_ub4()?;
1648    if num_bytes == 0 {
1649        return Ok(None);
1650    }
1651    let (size, chunk_size) = if matches!(metadata.ora_type_num, ORA_TYPE_NUM_BFILE) {
1652        (0, 0)
1653    } else {
1654        (reader.read_ub8()?, reader.read_ub4()?)
1655    };
1656    let Some(locator) = reader.read_bytes()? else {
1657        return Ok(None);
1658    };
1659    Ok(Some(QueryValue::Lob(Box::new(LobValue {
1660        ora_type_num: metadata.ora_type_num,
1661        csfrm: metadata.csfrm,
1662        locator,
1663        size,
1664        chunk_size,
1665    }))))
1666}
1667
1668/// Reads a VECTOR value (reference `ReadBuffer.read_vector` in `packet.pyx`).
1669/// VECTOR is sent as a fully-prefetched LOB: the image data precedes the
1670/// (discarded) LOB locator.
1671pub(crate) fn parse_vector_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
1672    let num_bytes = reader.read_ub4()?;
1673    if num_bytes == 0 {
1674        return Ok(None);
1675    }
1676    reader.read_ub8()?; // size (unused)
1677    reader.read_ub4()?; // chunk size (unused)
1678    let Some(data) = reader.read_bytes()? else {
1679        return Ok(None);
1680    };
1681    reader.read_bytes()?; // LOB locator (unused)
1682    if data.is_empty() {
1683        return Ok(None);
1684    }
1685    let vector = crate::vector::decode_vector(&data)?;
1686    Ok(Some(QueryValue::Vector(Box::new(vector))))
1687}
1688
1689/// Parses a native JSON (`DB_TYPE_JSON`) column value. Like VECTOR, OSON is sent
1690/// as a fully-prefetched LOB: `num_bytes`, `size`, `chunk_size`, the OSON image,
1691/// then a (discarded) LOB locator (reference packet.pyx `read_oson`).
1692pub(crate) fn parse_json_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
1693    let num_bytes = reader.read_ub4()?;
1694    if num_bytes == 0 {
1695        return Ok(None);
1696    }
1697    reader.read_ub8()?; // size (unused)
1698    reader.read_ub4()?; // chunk size (unused)
1699    let Some(data) = reader.read_bytes()? else {
1700        return Ok(None);
1701    };
1702    reader.read_bytes()?; // LOB locator (unused)
1703    if data.is_empty() {
1704        return Ok(None);
1705    }
1706    let value = crate::oson::decode_oson(&data)?;
1707    Ok(Some(QueryValue::Json(Box::new(value))))
1708}
1709
1710pub(crate) fn parse_object_value(
1711    reader: &mut TtcReader<'_>,
1712    metadata: &ColumnMetadata,
1713) -> Result<Option<QueryValue>> {
1714    let _toid = reader.read_bytes_with_length()?;
1715    let _oid = reader.read_bytes_with_length()?;
1716    let _snapshot = reader.read_bytes_with_length()?;
1717    let _version = reader.read_ub2()?;
1718    let num_bytes = reader.read_ub4()?;
1719    reader.skip(2)?;
1720    if num_bytes == 0 {
1721        return Ok(None);
1722    }
1723    let Some(packed_data) = reader.read_bytes()? else {
1724        return Ok(None);
1725    };
1726    Ok(Some(QueryValue::Object(Box::new(ObjectValue {
1727        schema: metadata.object_schema.clone(),
1728        type_name: metadata.object_type_name.clone(),
1729        packed_data,
1730    }))))
1731}
1732
1733pub(crate) fn parse_cursor_value(reader: &mut TtcReader<'_>) -> Result<QueryValue> {
1734    reader.skip(1)?;
1735    let mut result = QueryResult::default();
1736    parse_describe_info(reader, ClientCapabilities::default(), &mut result)?;
1737    let cursor_id = u32::from(reader.read_ub2()?);
1738    Ok(QueryValue::Cursor(Box::new(CursorValue {
1739        columns: result.columns,
1740        cursor_id,
1741    })))
1742}
1743
1744pub(crate) struct QueryReturnParameters {
1745    pub row_counts: Option<Vec<u64>>,
1746    /// CQN registered-query id extracted from the registration-info block
1747    /// (reference base.pyx:1300-1309); `None` when no block was present.
1748    pub query_id: Option<u64>,
1749}
1750
1751pub(crate) fn parse_query_return_parameters(
1752    reader: &mut TtcReader<'_>,
1753    arraydmlrowcounts: bool,
1754) -> Result<QueryReturnParameters> {
1755    let num_params = reader.read_ub2()?;
1756    for _ in 0..num_params {
1757        let _value = reader.read_ub4()?;
1758    }
1759    let num_bytes = reader.read_ub2()?;
1760    if num_bytes > 0 {
1761        reader.skip(usize::from(num_bytes))?;
1762    }
1763    let num_pairs = reader.read_ub2()?;
1764    skip_keyword_value_pairs(reader, num_pairs)?;
1765    // registration info block: the trailing 8 bytes (msb at -4, lsb at -8) are
1766    // the CQN query id when a registration id was sent (reference base.pyx).
1767    let num_bytes = usize::from(reader.read_ub2()?);
1768    let mut query_id = None;
1769    if num_bytes > 0 {
1770        let block = reader.read_raw(num_bytes)?;
1771        if num_bytes >= 8 {
1772            let msb = u32::from_be_bytes([
1773                block[num_bytes - 4],
1774                block[num_bytes - 3],
1775                block[num_bytes - 2],
1776                block[num_bytes - 1],
1777            ]);
1778            let lsb = u32::from_be_bytes([
1779                block[num_bytes - 8],
1780                block[num_bytes - 7],
1781                block[num_bytes - 6],
1782                block[num_bytes - 5],
1783            ]);
1784            query_id = Some((u64::from(msb) << 32) | u64::from(lsb));
1785        }
1786    }
1787    if arraydmlrowcounts {
1788        // reference messages/base.pyx `_process_return_parameters` tail
1789        let num_rows = reader.read_ub4()?;
1790        // Each ub8 row count consumes at least one byte, so cap the reservation
1791        // by the remaining payload size (BoundedReader).
1792        let mut row_counts: Vec<u64> = reader.with_capacity_bounded(num_rows as usize, 1);
1793        for _ in 0..num_rows {
1794            row_counts.push(reader.read_ub8()?);
1795        }
1796        return Ok(QueryReturnParameters {
1797            row_counts: Some(row_counts),
1798            query_id,
1799        });
1800    }
1801    Ok(QueryReturnParameters {
1802        row_counts: None,
1803        query_id,
1804    })
1805}
1806
1807#[cfg(test)]
1808mod borrowed_fetch_tests {
1809    use super::*;
1810    use crate::thin::codecs::encode_number_text;
1811
1812    // Isomorphism proof for the `simd-decode` feature (bead rust-oracledb-63o):
1813    // `validate_utf8` must make the SAME accept/reject decision as the canonical
1814    // `core::str::from_utf8`, AND return the same `&str` on accept, for every
1815    // input — whether or not the SIMD validator is compiled in. This guards the
1816    // hot text path against any divergence in UTF-8 grammar handling.
1817    #[test]
1818    fn validate_utf8_matches_core_accept_reject() {
1819        let cases: &[&[u8]] = &[
1820            b"",
1821            b"a",
1822            b"hello world",
1823            "VARCHAR2 cell".as_bytes(),
1824            "中文 mixed \u{1f600}".as_bytes(), // CJK + emoji (4-byte)
1825            "naïve café".as_bytes(),
1826            &[0x80],                   // lone continuation byte -> reject
1827            &[0xC0, 0x80],             // overlong NUL -> reject
1828            &[0xED, 0xA0, 0x80],       // UTF-16 surrogate -> reject
1829            &[0xF4, 0x90, 0x80, 0x80], // > U+10FFFF -> reject
1830            &[0xFF],                   // invalid lead -> reject
1831            &[0xE2, 0x82],             // truncated 3-byte -> reject
1832        ];
1833        for &bytes in cases {
1834            let core = core::str::from_utf8(bytes);
1835            let ours = validate_utf8(bytes);
1836            assert_eq!(
1837                core.is_ok(),
1838                ours.is_ok(),
1839                "accept/reject diverged for {bytes:02x?}"
1840            );
1841            if let (Ok(a), Ok(b)) = (core, ours) {
1842                assert_eq!(a, b, "accepted text diverged for {bytes:02x?}");
1843            }
1844        }
1845    }
1846
1847    // Build a synthetic column metadata for a scalar type.
1848    fn col(name: &str, ora_type_num: u8, csfrm: u8, buffer_size: u32) -> ColumnMetadata {
1849        ColumnMetadata {
1850            name: name.to_string(),
1851            ora_type_num,
1852            csfrm,
1853            buffer_size,
1854            ..ColumnMetadata::default()
1855        }
1856    }
1857
1858    // Encode one row of [Text, Number, Raw, NULL-text] as the server would frame
1859    // the column values (each a `write_bytes_with_length` run that `read_bytes`
1860    // / `read_bytes_borrowed` consume identically), and return the byte offset
1861    // where the row's column values begin.
1862    fn encode_mixed_row(writer: &mut TtcWriter, text: &str, number: &str, raw: &[u8]) {
1863        writer.write_bytes_with_length(text.as_bytes()).unwrap();
1864        let num = encode_number_text(number).unwrap();
1865        writer.write_bytes_with_length(&num).unwrap();
1866        writer.write_bytes_with_length(raw).unwrap();
1867        writer.write_u8(0); // NULL column (length byte 0)
1868    }
1869
1870    // The borrowed batch decode must yield, for every cell, a value whose
1871    // `to_owned_value()` is bit-for-bit the owned-path `QueryValue`, across a
1872    // mixed Text/Number/Raw/NULL row. And the Text/Raw cells must genuinely
1873    // borrow the batch buffer (zero-copy), not a fresh allocation.
1874    #[test]
1875    fn borrowed_batch_matches_owned_path_for_mixed_row() {
1876        let columns = vec![
1877            col("T", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
1878            col("N", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
1879            col("R", ORA_TYPE_NUM_RAW, CS_FORM_IMPLICIT, 2000),
1880            col("Z", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
1881        ];
1882
1883        let mut writer = TtcWriter::new();
1884        encode_mixed_row(
1885            &mut writer,
1886            "héllo world",
1887            "-12.5",
1888            &[0xDE, 0xAD, 0xBE, 0xEF],
1889        );
1890        encode_mixed_row(&mut writer, "second", "42", &[0x01]);
1891        let buffer = writer.into_bytes();
1892        let row_starts = vec![0, {
1893            // Find the second row's start by replaying the first row's consumption.
1894            let mut reader = TtcReader::new(&buffer);
1895            for c in &columns {
1896                let _ = parse_column_value(&mut reader, c).unwrap();
1897            }
1898            reader.position()
1899        }];
1900
1901        // Owned path: decode both rows the existing way for the golden values.
1902        let owned_rows: Vec<Vec<Option<QueryValue>>> = row_starts
1903            .iter()
1904            .map(|&start| {
1905                let mut reader = TtcReader::new(&buffer[start..]);
1906                columns
1907                    .iter()
1908                    .map(|c| parse_column_value(&mut reader, c).unwrap())
1909                    .collect()
1910            })
1911            .collect();
1912
1913        // Borrowed path: decode through the batch, collecting owned copies and
1914        // proving the scalar cells borrow the buffer.
1915        let batch = BorrowedRowBatch::new(buffer.clone(), columns.clone(), row_starts);
1916        let buf_ptr_range = batch.buffer_ptr_range();
1917
1918        let mut seen_rows = 0usize;
1919        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
1920        batch
1921            .for_each_row_ref(|row| {
1922                seen_rows += 1;
1923                // Text cell borrows the buffer.
1924                if let Some(QueryValueRef::Text(t)) = row[0] {
1925                    let p = t.as_ptr() as usize;
1926                    assert!(
1927                        buf_ptr_range.contains(&p),
1928                        "Text cell must borrow the batch buffer (zero-copy)"
1929                    );
1930                }
1931                // Raw cell borrows the buffer.
1932                if let Some(QueryValueRef::Raw(r)) = row[2] {
1933                    let p = r.as_ptr() as usize;
1934                    assert!(
1935                        buf_ptr_range.contains(&p),
1936                        "Raw cell must borrow the batch buffer (zero-copy)"
1937                    );
1938                }
1939                borrowed_owned.push(
1940                    row.iter()
1941                        .map(|cell| cell.map(|v| v.to_owned_value()))
1942                        .collect(),
1943                );
1944                Ok::<(), ProtocolError>(())
1945            })
1946            .unwrap();
1947
1948        assert_eq!(seen_rows, 2, "batch yields both rows");
1949        assert_eq!(
1950            borrowed_owned, owned_rows,
1951            "borrowed cells to_owned() must equal the owned-path values"
1952        );
1953    }
1954
1955    // The borrowed response parser walks the *same* message framing as the owned
1956    // `parse_fetch_response_with_context` (ROW_HEADER / BIT_VECTOR / ROW_DATA /
1957    // END_OF_RESPONSE), but instead of building owned rows it captures each
1958    // row's byte offset and hands back a `BorrowedRowBatch`. Decoding that batch
1959    // must reproduce exactly what the owned fetch path produced — duplicate
1960    // columns (bit vector) and all. Fixture is the same one the owned
1961    // `fetch_response_decodes_rows_with_previous_cursor_metadata` test uses.
1962    #[test]
1963    fn borrowed_response_parse_matches_owned_fetch_path() {
1964        use hex::FromHex;
1965        let payload = Vec::from_hex("06020101000205dc0001010101000702c1041d")
1966            .expect("fixture response should be valid hex");
1967        let columns = vec![
1968            col("INTCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
1969            col("NUMBERCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
1970        ];
1971        let previous_row = vec![
1972            Some(QueryValue::number_from_text("2", true)),
1973            Some(QueryValue::number_from_text("0.5", false)),
1974        ];
1975
1976        // Owned golden.
1977        let owned = parse_query_response_with_context(
1978            &payload,
1979            ClientCapabilities::default(),
1980            &columns,
1981            Some(&previous_row),
1982        )
1983        .expect("owned fetch decode");
1984
1985        // Borrowed parse.
1986        let borrowed = parse_query_response_borrowed(
1987            &payload,
1988            ClientCapabilities::default(),
1989            &columns,
1990            Some(&previous_row),
1991        )
1992        .expect("borrowed fetch decode");
1993
1994        assert_eq!(borrowed.more_rows, owned.more_rows);
1995        assert_eq!(borrowed.cursor_id, owned.cursor_id);
1996        assert_eq!(borrowed.batch.row_count(), owned.rows.len());
1997
1998        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
1999        borrowed
2000            .batch
2001            .for_each_row_ref(|row| {
2002                borrowed_owned.push(
2003                    row.iter()
2004                        .map(|cell| cell.map(|v| v.to_owned_value()))
2005                        .collect(),
2006                );
2007                Ok::<(), ProtocolError>(())
2008            })
2009            .expect("iterate borrowed rows");
2010
2011        assert_eq!(
2012            borrowed_owned, owned.rows,
2013            "borrowed batch must reproduce the owned fetch rows (incl. duplicate columns)"
2014        );
2015    }
2016}
2017
2018#[cfg(test)]
2019mod fuzz_regression_tests {
2020    use super::*;
2021
2022    // Regression (w6-fuzz, query_response target): a TNS_MSG_TYPE_IMPLICIT_RESULTSET
2023    // message (27) whose ub4 result count was ~620M made the dispatch loop
2024    // `Vec::with_capacity` several gigabytes of `QueryValue::Cursor` before the
2025    // truncated read failed, tripping libFuzzer's OOM detector. The parser must
2026    // now fail closed (truncated payload) without the giant allocation.
2027    #[test]
2028    fn fuzz_regression_implicit_resultset_oom() {
2029        // payload: type=27, ub4 length byte 4, value 0x25000000 (~620M), then EOF
2030        let payload = [27u8, 4, 37, 0, 0, 0];
2031        let err = parse_query_response(&payload, ClientCapabilities::default())
2032            .expect_err("oversized implicit-resultset count must fail closed");
2033        assert!(
2034            matches!(err, ProtocolError::TtcDecode(_)),
2035            "expected fail-closed TtcDecode, got {err:?}"
2036        );
2037    }
2038
2039    // BoundedReader invariant (l2p), query-columns family: a DESCRIBE_INFO
2040    // message (16) declaring a huge num_columns (ub4 ~620M) with no column
2041    // metadata bytes following must fail closed, not pre-allocate one
2042    // ColumnMetadata per declared column. parse_describe_info grows the column
2043    // Vec via push (no speculative with_capacity), and the first
2044    // parse_column_metadata read past the end errors.
2045    #[test]
2046    fn describe_info_oversized_column_count_fails_closed_not_oom() {
2047        // type=16 DESCRIBE_INFO; describe_name read_bytes len byte 0 (null);
2048        // max_row_size ub4 = 0; num_columns ub4 (len byte 4) = 0x25000000
2049        // (~620M); then EOF before the skip(1)/column records.
2050        let payload = [16u8, 0, 0, 4, 0x25, 0x00, 0x00, 0x00];
2051        let err = parse_query_response(&payload, ClientCapabilities::default())
2052            .expect_err("oversized column count must fail closed");
2053        assert!(
2054            matches!(err, ProtocolError::TtcDecode(_)),
2055            "expected fail-closed TtcDecode, got {err:?}"
2056        );
2057    }
2058
2059    // BoundedReader invariant (l2p), out-bind array family: an array OUT bind
2060    // whose ub4 num_elements is enormous (~620M) but carries no element bytes
2061    // must fail closed via with_capacity_bounded + the per-element read, not
2062    // reserve gigabytes of Option<QueryValue>.
2063    #[test]
2064    fn out_bind_array_oversized_element_count_fails_closed_not_oom() {
2065        let metadata = ColumnMetadata {
2066            name: "ARR".to_string(),
2067            ora_type_num: ORA_TYPE_NUM_NUMBER,
2068            is_array: true,
2069            ..ColumnMetadata::default()
2070        };
2071        let bind_columns = [metadata];
2072        let out_bind_indexes = [0usize];
2073        // ub4 num_elements: len byte 4, value 0x25000000, then no elements.
2074        let payload = [4u8, 0x25, 0x00, 0x00, 0x00];
2075        let mut reader = TtcReader::new(&payload);
2076        let mut result = QueryResult::default();
2077        let err =
2078            parse_out_bind_row_data(&mut reader, &mut result, &bind_columns, &out_bind_indexes)
2079                .expect_err("oversized array OUT bind count must fail closed");
2080        assert!(
2081            matches!(err, ProtocolError::TtcDecode(_)),
2082            "expected fail-closed TtcDecode, got {err:?}"
2083        );
2084    }
2085}