Skip to main content

oracledb_protocol/thin/
fetch.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4
5pub fn build_fetch_payload(cursor_id: u32, arraysize: u32) -> Vec<u8> {
6    build_fetch_payload_with_seq(cursor_id, arraysize, 1)
7}
8
9pub fn build_fetch_payload_with_seq(cursor_id: u32, arraysize: u32, seq_num: u8) -> Vec<u8> {
10    // Fixed tiny payload (function code + ub8 + two ub4 ≈ <=20 bytes). Prealloc
11    // so the small pushes do not grow the Vec through doublings; built every
12    // fetch page, so this matters on multi-page fetches. Bytes unchanged.
13    let mut writer = TtcWriter::with_capacity(32);
14    writer.write_function_code_with_seq(TNS_FUNC_FETCH, seq_num);
15    writer.write_ub8(0);
16    writer.write_ub4(cursor_id);
17    writer.write_ub4(arraysize);
18    writer.into_bytes()
19}
20
21pub fn build_define_fetch_payload_with_seq(
22    cursor_id: u32,
23    arraysize: u32,
24    seq_num: u8,
25    define_columns: &[ColumnMetadata],
26) -> Result<Vec<u8>> {
27    let define_count =
28        u32::try_from(define_columns.len()).map_err(|_| ProtocolError::InvalidPacketLength {
29            length: define_columns.len(),
30            minimum: 0,
31        })?;
32    let mut writer = TtcWriter::new();
33    writer.write_function_code_with_seq(TNS_FUNC_EXECUTE, seq_num);
34    writer.write_ub8(0);
35    writer.write_ub4(TNS_EXEC_OPTION_DEFINE | TNS_EXEC_OPTION_NOT_PLSQL);
36    writer.write_ub4(cursor_id);
37    writer.write_u8(0);
38    writer.write_ub4(0);
39    writer.write_u8(1);
40    writer.write_ub4(13);
41    writer.write_u8(0);
42    writer.write_u8(0);
43    writer.write_ub4(0);
44    writer.write_ub4(arraysize);
45    writer.write_ub4(TNS_MAX_LONG_LENGTH);
46    writer.write_u8(0);
47    writer.write_ub4(0);
48    writer.write_u8(0);
49    writer.write_u8(0);
50    writer.write_u8(0);
51    writer.write_u8(0);
52    writer.write_u8(0);
53    writer.write_u8(1);
54    writer.write_ub4(define_count);
55    writer.write_ub4(0);
56    writer.write_u8(0);
57    writer.write_u8(1);
58    writer.write_u8(0);
59    writer.write_ub4(0);
60    writer.write_u8(0);
61    writer.write_ub4(0);
62    writer.write_ub4(0);
63    writer.write_u8(0);
64    writer.write_ub4(0);
65    writer.write_u8(0);
66    writer.write_u8(0);
67    writer.write_ub4(0);
68    writer.write_ub4(0);
69    writer.write_ub4(0);
70    writer.write_ub4(0);
71    writer.write_ub4(0);
72    writer.write_ub4(0);
73    writer.write_ub4(0);
74    writer.write_ub4(arraysize);
75    writer.write_ub4(0);
76    writer.write_ub4(0);
77    writer.write_ub4(0);
78    writer.write_ub4(0);
79    writer.write_ub4(0);
80    writer.write_ub4(1);
81    writer.write_ub4(0);
82    writer.write_ub4(0);
83    writer.write_ub4(0);
84    writer.write_ub4(0);
85    writer.write_ub4(0);
86    for metadata in define_columns {
87        write_define_column_metadata(&mut writer, metadata);
88    }
89    Ok(writer.into_bytes())
90}
91
92pub(crate) fn write_define_column_metadata(writer: &mut TtcWriter, metadata: &ColumnMetadata) {
93    // reference base.pyx: VECTOR (and JSON) columns advertise a LOB-prefetch
94    // buffer so the server streams the image inline rather than returning a
95    // bare temp-LOB locator
96    let (mut buffer_size, cont_flags, lob_prefetch_length) = match metadata.ora_type_num {
97        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB => (metadata.buffer_size, TNS_LOB_PREFETCH_FLAG, 0),
98        ORA_TYPE_NUM_VECTOR => (
99            TNS_VECTOR_MAX_LENGTH,
100            TNS_LOB_PREFETCH_FLAG,
101            TNS_VECTOR_MAX_LENGTH,
102        ),
103        ORA_TYPE_NUM_JSON => (
104            TNS_JSON_MAX_LENGTH,
105            TNS_LOB_PREFETCH_FLAG,
106            TNS_JSON_MAX_LENGTH,
107        ),
108        _ => (metadata.buffer_size, 0, 0),
109    };
110    buffer_size = buffer_size.max(1);
111    writer.write_u8(metadata.ora_type_num);
112    writer.write_u8(TNS_BIND_USE_INDICATORS);
113    writer.write_u8(0);
114    writer.write_u8(0);
115    writer.write_ub4(buffer_size);
116    writer.write_ub4(0);
117    writer.write_ub8(cont_flags);
118    writer.write_ub4(0);
119    writer.write_ub2(0);
120    if metadata.csfrm != 0 {
121        writer.write_ub2(TNS_CHARSET_UTF8);
122    } else {
123        writer.write_ub2(0);
124    }
125    writer.write_u8(metadata.csfrm);
126    writer.write_ub4(lob_prefetch_length);
127    writer.write_ub4(0);
128}
129
130pub fn parse_query_response(
131    payload: &[u8],
132    capabilities: ClientCapabilities,
133) -> Result<QueryResult> {
134    parse_query_response_with_previous(payload, capabilities, None)
135}
136
137pub fn parse_query_response_with_binds(
138    payload: &[u8],
139    capabilities: ClientCapabilities,
140    binds: &[BindValue],
141) -> Result<QueryResult> {
142    parse_query_response_with_binds_and_options(
143        payload,
144        capabilities,
145        binds,
146        ExecuteOptions::default(),
147    )
148}
149
150pub fn parse_query_response_with_binds_and_options(
151    payload: &[u8],
152    capabilities: ClientCapabilities,
153    binds: &[BindValue],
154    exec_options: ExecuteOptions,
155) -> Result<QueryResult> {
156    parse_query_response_with_binds_options_and_columns(
157        payload,
158        capabilities,
159        binds,
160        exec_options,
161        &[],
162    )
163}
164
165/// `known_columns` carries the fetch metadata of a re-executed statement
166/// whose response does not repeat the describe information (reference keeps
167/// the statement's fetch vars across executions).
168pub fn parse_query_response_with_binds_options_and_columns(
169    payload: &[u8],
170    capabilities: ClientCapabilities,
171    binds: &[BindValue],
172    exec_options: ExecuteOptions,
173    known_columns: &[ColumnMetadata],
174) -> Result<QueryResult> {
175    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
176    let output_bind_indexes = binds
177        .iter()
178        .enumerate()
179        .filter_map(|(index, value)| value.is_return_output().then_some(index))
180        .collect::<Vec<_>>();
181    parse_query_response_with_context_binds_and_options(
182        payload,
183        capabilities,
184        known_columns,
185        None,
186        &bind_columns,
187        &output_bind_indexes,
188        false,
189        exec_options,
190    )
191}
192
193pub fn parse_query_response_with_previous(
194    payload: &[u8],
195    capabilities: ClientCapabilities,
196    previous_row: Option<&[Option<QueryValue>]>,
197) -> Result<QueryResult> {
198    parse_query_response_with_context(payload, capabilities, &[], previous_row)
199}
200
201pub fn parse_query_response_with_context(
202    payload: &[u8],
203    capabilities: ClientCapabilities,
204    previous_columns: &[ColumnMetadata],
205    previous_row: Option<&[Option<QueryValue>]>,
206) -> Result<QueryResult> {
207    parse_query_response_with_context_and_binds(
208        payload,
209        capabilities,
210        previous_columns,
211        previous_row,
212        &[],
213        &[],
214        false,
215    )
216}
217
218pub fn parse_fetch_response_with_context(
219    payload: &[u8],
220    capabilities: ClientCapabilities,
221    previous_columns: &[ColumnMetadata],
222    previous_row: Option<&[Option<QueryValue>]>,
223) -> Result<QueryResult> {
224    parse_query_response_with_context_and_binds(
225        payload,
226        capabilities,
227        previous_columns,
228        previous_row,
229        &[],
230        &[],
231        true,
232    )
233}
234
235pub(crate) fn parse_query_response_with_context_and_binds(
236    payload: &[u8],
237    capabilities: ClientCapabilities,
238    previous_columns: &[ColumnMetadata],
239    previous_row: Option<&[Option<QueryValue>]>,
240    bind_columns: &[ColumnMetadata],
241    output_bind_indexes: &[usize],
242    fetch_long_status: bool,
243) -> Result<QueryResult> {
244    parse_query_response_with_context_binds_and_options(
245        payload,
246        capabilities,
247        previous_columns,
248        previous_row,
249        bind_columns,
250        output_bind_indexes,
251        fetch_long_status,
252        ExecuteOptions::default(),
253    )
254}
255
256#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
257pub(crate) fn parse_query_response_with_context_binds_and_options(
258    payload: &[u8],
259    capabilities: ClientCapabilities,
260    previous_columns: &[ColumnMetadata],
261    previous_row: Option<&[Option<QueryValue>]>,
262    bind_columns: &[ColumnMetadata],
263    output_bind_indexes: &[usize],
264    fetch_long_status: bool,
265    exec_options: ExecuteOptions,
266) -> Result<QueryResult> {
267    let mut reader = TtcReader::new(payload);
268    let mut result = QueryResult {
269        columns: previous_columns.to_vec(),
270        more_rows: true,
271        ..QueryResult::default()
272    };
273    let mut bit_vector: Option<Vec<u8>> = None;
274    let mut out_bind_indexes: Vec<usize> = Vec::new();
275    while reader.remaining() > 0 {
276        let message_type = reader.read_u8()?;
277        match message_type {
278            0 => {}
279            TNS_MSG_TYPE_DESCRIBE_INFO => {
280                let _describe_name = reader.read_bytes()?;
281                let previous = std::mem::take(&mut result.columns);
282                parse_describe_info(&mut reader, capabilities, &mut result)?;
283                // re-executing an open cursor whose underlying types changed:
284                // the server re-describes mid-response but still streams the
285                // row data in the adjusted (LONG/LONG RAW) form expected by
286                // the previous fetch metadata (reference `_adjust_metadata`,
287                // impl/thin/messages/base.pyx:820-845, applied during
288                // `_process_describe_info`).
289                for (index, column) in result.columns.iter_mut().enumerate() {
290                    if let Some(prev) = previous.get(index) {
291                        adjust_refetch_metadata(prev, column);
292                    }
293                }
294            }
295            TNS_MSG_TYPE_ROW_HEADER => {
296                bit_vector = parse_row_header(&mut reader)?;
297            }
298            TNS_MSG_TYPE_ROW_DATA => {
299                if result.columns.is_empty() && !out_bind_indexes.is_empty() {
300                    parse_out_bind_row_data(
301                        &mut reader,
302                        &mut result,
303                        bind_columns,
304                        &out_bind_indexes,
305                    )?;
306                } else if result.columns.is_empty() && !output_bind_indexes.is_empty() {
307                    parse_returning_row_data(
308                        &mut reader,
309                        &mut result,
310                        bind_columns,
311                        output_bind_indexes,
312                    )?;
313                } else {
314                    parse_row_data(
315                        &mut reader,
316                        &mut result,
317                        bit_vector.as_deref(),
318                        previous_row,
319                        fetch_long_status,
320                    )?;
321                }
322                bit_vector = None;
323            }
324            TNS_MSG_TYPE_BIT_VECTOR => {
325                bit_vector = Some(parse_bit_vector(&mut reader, result.columns.len())?);
326            }
327            TNS_MSG_TYPE_PARAMETER => {
328                let params =
329                    parse_query_return_parameters(&mut reader, exec_options.arraydmlrowcounts)?;
330                if exec_options.arraydmlrowcounts {
331                    result.array_dml_row_counts = Some(params.row_counts.unwrap_or_default());
332                }
333                if params.query_id.is_some() {
334                    result.query_id = params.query_id;
335                }
336            }
337            TNS_MSG_TYPE_STATUS => {
338                let call_status = reader.read_ub4()?;
339                let _seq = reader.read_ub2()?;
340                result.txn_in_progress = Some(call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
341            }
342            TNS_MSG_TYPE_IO_VECTOR => {
343                out_bind_indexes = parse_io_vector(&mut reader, bind_columns.len())?
344                    .into_iter()
345                    .filter(|index| !output_bind_indexes.contains(index))
346                    .collect();
347            }
348            TNS_MSG_TYPE_FLUSH_OUT_BINDS => break,
349            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
350                if let Some(update) = skip_server_side_piggyback(&mut reader)? {
351                    result.sessionless_txn_state = Some(update);
352                }
353            }
354            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
355                // reference messages/base.pyx `_process_implicit_result`
356                let num_results = reader.read_ub4()?;
357                // `num_results` is read straight off the wire (a ub4, up to
358                // ~4e9); each resultset consumes at least one byte, so cap the
359                // reservation by the bytes left in the payload (BoundedReader).
360                // Without this a hostile server forces a multi-gigabyte
361                // allocation (OOM) before the truncated read in the loop body
362                // fails closed.
363                let mut resultsets: Vec<QueryValue> =
364                    reader.with_capacity_bounded(num_results as usize, 1);
365                for _ in 0..num_results {
366                    let num_bytes = reader.read_u8()?;
367                    reader.skip(usize::from(num_bytes))?;
368                    let mut child = QueryResult::default();
369                    parse_describe_info(&mut reader, capabilities, &mut child)?;
370                    let child_cursor_id = u32::from(reader.read_ub2()?);
371                    resultsets.push(QueryValue::Cursor(Box::new(CursorValue {
372                        columns: child.columns,
373                        cursor_id: child_cursor_id,
374                    })));
375                }
376                result.implicit_resultsets = Some(resultsets);
377            }
378            TNS_MSG_TYPE_END_OF_RESPONSE => break,
379            // pipeline responses open with the token of the operation they
380            // answer (messages/base.pyx:288-293); callers compare it against
381            // the expected token (mismatch -> DPY-2052 at the driver layer)
382            TNS_MSG_TYPE_TOKEN => {
383                result.token_num = Some(reader.read_ub8()?);
384            }
385            TNS_MSG_TYPE_ERROR => {
386                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
387                // The end-of-call ERROR message (number 0 on success) carries
388                // the end-of-call status; sample the transaction-in-progress bit
389                // (reference protocol.pyx `_process_call_status`).
390                result.txn_in_progress =
391                    Some(info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
392                if info.cursor_id != 0 {
393                    result.cursor_id = u32::from(info.cursor_id);
394                }
395                result.row_count = info.row_count;
396                result.compilation_error_warning |= info.compilation_error_warning;
397                result.last_rowid = info.rowid.clone();
398                if info.number == TNS_ERR_NO_DATA_FOUND && !result.columns.is_empty() {
399                    result.more_rows = false;
400                } else if info.number == TNS_ERR_ARRAY_DML_ERRORS {
401                    // executemany(batcherrors=True): errors are reported via
402                    // the batch error arrays instead of raising ORA-24381
403                    // (reference messages/base.pyx `_process_error_info`).
404                    result.batch_errors = info.batch_errors;
405                } else if info.number != 0 {
406                    let mut details = info.into_details();
407                    details.array_dml_row_counts = result.array_dml_row_counts.take();
408                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
409                }
410            }
411            _ => {
412                let position = reader.position().saturating_sub(1);
413                if let Some(message) =
414                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
415                {
416                    return Err(ProtocolError::ServerError(message));
417                }
418                return Err(ProtocolError::UnknownMessageType {
419                    message_type,
420                    position,
421                });
422            }
423        }
424    }
425    Ok(result)
426}
427
428pub(crate) fn bind_column_metadata(value: &BindValue) -> ColumnMetadata {
429    let (ora_type_num, csfrm, buffer_size) = bind_metadata(value);
430    let object_schema = match value {
431        BindValue::ObjectOutput { schema, .. } | BindValue::ObjectInput { schema, .. } => {
432            Some(schema.clone())
433        }
434        _ => None,
435    };
436    let object_type_name = match value {
437        BindValue::ObjectOutput { type_name, .. } | BindValue::ObjectInput { type_name, .. } => {
438            Some(type_name.clone())
439        }
440        _ => None,
441    };
442    ColumnMetadata {
443        name: String::new(),
444        ora_type_num,
445        csfrm,
446        precision: 0,
447        scale: 0,
448        buffer_size,
449        max_size: buffer_size,
450        nulls_allowed: true,
451        is_json: false,
452        is_oson: false,
453        object_schema,
454        object_type_name,
455        is_array: matches!(value, BindValue::Array { .. }),
456        vector_dimensions: None,
457        vector_format: 0,
458        vector_flags: 0,
459        domain_schema: None,
460        domain_name: None,
461        annotations: None,
462    }
463}
464
465pub(crate) fn parse_io_vector(reader: &mut TtcReader<'_>, bind_count: usize) -> Result<Vec<usize>> {
466    let _flags = reader.read_u8()?;
467    let temp16 = reader.read_ub2()?;
468    let temp32 = reader.read_ub4()?;
469    let num_binds = usize::try_from(temp32)
470        .map_err(|_| ProtocolError::InvalidPacketLength {
471            length: usize::MAX,
472            minimum: 0,
473        })?
474        .checked_mul(256)
475        .and_then(|value| value.checked_add(usize::from(temp16)))
476        .ok_or(ProtocolError::InvalidPacketLength {
477            length: usize::MAX,
478            minimum: 0,
479        })?;
480    let _num_iters_this_time = reader.read_ub4()?;
481    let _uac_buffer_length = reader.read_ub2()?;
482    let fast_fetch_len = reader.read_ub2()?;
483    if fast_fetch_len > 0 {
484        reader.skip(usize::from(fast_fetch_len))?;
485    }
486    let rowid_len = reader.read_ub2()?;
487    if rowid_len > 0 {
488        reader.skip(usize::from(rowid_len))?;
489    }
490    let mut out_indexes = Vec::new();
491    for index in 0..num_binds {
492        let direction = reader.read_u8()?;
493        if index < bind_count && direction != TNS_BIND_DIR_INPUT {
494            out_indexes.push(index);
495        }
496    }
497    Ok(out_indexes)
498}
499
500pub(crate) fn find_embedded_server_error(
501    payload: &[u8],
502    ttc_field_version: u8,
503    position: usize,
504) -> Option<String> {
505    let start = position.saturating_sub(64);
506    for candidate in start..=position {
507        if !matches!(payload.get(candidate).copied(), Some(TNS_MSG_TYPE_ERROR)) {
508            continue;
509        }
510        let mut reader = TtcReader::new(payload.get(candidate + 1..)?);
511        let info = parse_server_error_info(&mut reader, ttc_field_version).ok()?;
512        if info.number != 0 && info.message.starts_with("ORA-") {
513            return Some(info.message);
514        }
515    }
516    None
517}
518
519pub(crate) fn parse_describe_info(
520    reader: &mut TtcReader<'_>,
521    capabilities: ClientCapabilities,
522    result: &mut QueryResult,
523) -> Result<()> {
524    let _max_row_size = reader.read_ub4()?;
525    let num_columns = reader.read_ub4()?;
526    result.columns.clear();
527    if num_columns > 0 {
528        reader.skip(1)?;
529    }
530    for _ in 0..num_columns {
531        result
532            .columns
533            .push(parse_column_metadata(reader, capabilities)?);
534    }
535    let _current_date = reader.read_bytes_with_length()?;
536    let _dcbflag = reader.read_ub4()?;
537    let _dcbmdbz = reader.read_ub4()?;
538    let _dcbmnpr = reader.read_ub4()?;
539    let _dcbmxpr = reader.read_ub4()?;
540    let _dcbqcky = reader.read_bytes_with_length()?;
541    Ok(())
542}
543
544pub(crate) fn parse_column_metadata(
545    reader: &mut TtcReader<'_>,
546    capabilities: ClientCapabilities,
547) -> Result<ColumnMetadata> {
548    let ora_type_num = reader.read_u8()?;
549    reader.skip(1)?;
550    let precision = reader.read_i8()?;
551    let scale = reader.read_i8()?;
552    let buffer_size = reader.read_ub4()?;
553    let _max_array_elements = reader.read_ub4()?;
554    let _cont_flags = reader.read_ub8()?;
555    let _oid = reader.read_bytes_with_length()?;
556    let _version = reader.read_ub2()?;
557    let _charset_id = reader.read_ub2()?;
558    let csfrm = reader.read_u8()?;
559    let mut max_size = reader.read_ub4()?;
560    if ora_type_num == ORA_TYPE_NUM_RAW {
561        max_size = buffer_size;
562    }
563    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_12_2 {
564        let _oaccolid = reader.read_ub4()?;
565    }
566    let nulls_allowed = reader.read_u8()? != 0;
567    reader.skip(1)?;
568    let name = reader.read_string_with_length()?.unwrap_or_default();
569    let object_schema = reader.read_string_with_length()?;
570    let object_type_name = reader.read_string_with_length()?;
571    let _column_position = reader.read_ub2()?;
572    let uds_flags = reader.read_ub4()?;
573    let mut domain_schema = None;
574    let mut domain_name = None;
575    let mut annotations: Option<Vec<(String, String)>> = None;
576    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1 {
577        domain_schema = reader.read_string_with_length()?;
578        domain_name = reader.read_string_with_length()?;
579    }
580    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_3 {
581        let num_annotations = reader.read_ub4()?;
582        if num_annotations > 0 {
583            reader.skip(1)?;
584            let num_annotations = reader.read_ub4()?;
585            reader.skip(1)?;
586            // Bound by remaining bytes (BoundedReader): each annotation reads
587            // at least a length-prefixed key/value, so a ub4 count larger than
588            // the payload is a lie that must not pre-allocate gigabytes.
589            let mut collected: Vec<(String, String)> =
590                reader.with_capacity_bounded(num_annotations as usize, 1);
591            for _ in 0..num_annotations {
592                let key = reader.read_string_with_length()?.unwrap_or_default();
593                // A null annotation value is normalized to "" by the reference
594                // driver (python-oracledb base.pyx _process_metadata).
595                let value = reader.read_string_with_length()?.unwrap_or_default();
596                let _flags = reader.read_ub4()?;
597                collected.push((key, value));
598            }
599            let _flags = reader.read_ub4()?;
600            annotations = Some(collected);
601        }
602    }
603    let mut vector_dimensions = None;
604    let mut vector_format = 0u8;
605    let mut vector_flags = 0u8;
606    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_4 {
607        // reference metadata.pyx: ub4 dimensions, ub1 format, ub1 flags
608        let dims = reader.read_ub4()?;
609        vector_format = reader.read_u8()?;
610        vector_flags = reader.read_u8()?;
611        if ora_type_num == ORA_TYPE_NUM_VECTOR {
612            vector_dimensions = Some(dims);
613        }
614    }
615
616    Ok(ColumnMetadata {
617        name,
618        ora_type_num,
619        csfrm,
620        precision,
621        scale,
622        buffer_size,
623        max_size,
624        nulls_allowed,
625        is_json: uds_flags & TNS_UDS_FLAGS_IS_JSON != 0,
626        is_oson: uds_flags & TNS_UDS_FLAGS_IS_OSON != 0,
627        object_schema,
628        object_type_name,
629        is_array: false,
630        vector_dimensions,
631        vector_format,
632        vector_flags,
633        domain_schema,
634        domain_name,
635        annotations,
636    })
637}
638
639pub(crate) fn parse_row_header(reader: &mut TtcReader<'_>) -> Result<Option<Vec<u8>>> {
640    reader.skip(1)?;
641    let _num_requests = reader.read_ub2()?;
642    let _iteration_number = reader.read_ub4()?;
643    let _num_iters = reader.read_ub4()?;
644    let _buffer_length = reader.read_ub2()?;
645    let num_bytes = reader.read_ub4()?;
646    let bit_vector = if num_bytes > 0 {
647        reader.skip(1)?;
648        Some(reader.read_raw(num_bytes as usize)?.to_vec())
649    } else {
650        None
651    };
652    let _rxhrid = reader.read_bytes_with_length()?;
653    Ok(bit_vector)
654}
655
656pub(crate) fn parse_bit_vector(reader: &mut TtcReader<'_>, num_columns: usize) -> Result<Vec<u8>> {
657    let _num_columns_sent = reader.read_ub2()?;
658    let num_bytes = num_columns.div_ceil(8);
659    Ok(reader.read_raw(num_bytes)?.to_vec())
660}
661
662pub(crate) fn parse_row_data(
663    reader: &mut TtcReader<'_>,
664    result: &mut QueryResult,
665    bit_vector: Option<&[u8]>,
666    previous_row: Option<&[Option<QueryValue>]>,
667    fetch_long_status: bool,
668) -> Result<()> {
669    let mut row = Vec::with_capacity(result.columns.len());
670    for (index, metadata) in result.columns.iter().enumerate() {
671        if is_duplicate_column(bit_vector, index) {
672            let previous = result
673                .rows
674                .last()
675                .map(Vec::as_slice)
676                .or(previous_row)
677                .and_then(|last| last.get(index))
678                .cloned()
679                .ok_or(ProtocolError::TtcDecode(
680                    "duplicate row data without previous row",
681                ))?;
682            row.push(previous);
683            continue;
684        }
685        row.push(parse_column_value(reader, metadata)?);
686        if fetch_long_status
687            && matches!(
688                metadata.ora_type_num,
689                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
690            )
691        {
692            let _null_indicator = reader.read_sb4()?;
693            let _return_code = reader.read_ub4()?;
694        }
695    }
696    result.rows.push(row);
697    Ok(())
698}
699
700pub(crate) fn parse_out_bind_row_data(
701    reader: &mut TtcReader<'_>,
702    result: &mut QueryResult,
703    bind_columns: &[ColumnMetadata],
704    out_bind_indexes: &[usize],
705) -> Result<()> {
706    for index in out_bind_indexes {
707        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
708            "out bind index without bind metadata",
709        ))?;
710        if metadata.is_array {
711            let num_elements = usize::try_from(reader.read_ub4()?).map_err(|_| {
712                ProtocolError::InvalidPacketLength {
713                    length: usize::MAX,
714                    minimum: 0,
715                }
716            })?;
717            // Cap by remaining bytes (BoundedReader): each element consumes
718            // wire data, so a ub4 count cannot legitimately exceed the payload.
719            let mut values: Vec<Option<QueryValue>> = reader.with_capacity_bounded(num_elements, 1);
720            for _ in 0..num_elements {
721                let value = parse_column_value(reader, metadata)?;
722                let actual_num_bytes = reader.read_sb4()?;
723                if actual_num_bytes != 0 && value.is_some() {
724                    return Err(ProtocolError::TtcDecode("truncated array OUT bind value"));
725                }
726                values.push(value);
727            }
728            result
729                .out_values
730                .push((*index, Some(QueryValue::Array(values))));
731            continue;
732        }
733        let value = parse_column_value(reader, metadata)?;
734        let actual_num_bytes = reader.read_sb4()?;
735        if actual_num_bytes != 0 && value.is_some() {
736            return Err(ProtocolError::TtcDecode("truncated OUT bind value"));
737        }
738        result.out_values.push((*index, value));
739    }
740    Ok(())
741}
742
743pub(crate) fn parse_returning_row_data(
744    reader: &mut TtcReader<'_>,
745    result: &mut QueryResult,
746    bind_columns: &[ColumnMetadata],
747    output_bind_indexes: &[usize],
748) -> Result<()> {
749    for index in output_bind_indexes {
750        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
751            "return bind index without bind metadata",
752        ))?;
753        let num_rows = usize::try_from(reader.read_ub4()?).map_err(|_| {
754            ProtocolError::InvalidPacketLength {
755                length: usize::MAX,
756                minimum: 0,
757            }
758        })?;
759        // Cap by remaining bytes (BoundedReader); see the OOM note above.
760        let mut values: Vec<Option<QueryValue>> = reader.with_capacity_bounded(num_rows, 1);
761        for _ in 0..num_rows {
762            let value = parse_column_value(reader, metadata)?;
763            let actual_num_bytes = reader.read_sb4()?;
764            if actual_num_bytes != 0 && value.is_some() {
765                return Err(ProtocolError::TtcDecode("truncated DML RETURNING value"));
766            }
767            values.push(value);
768        }
769        result.return_values.push((*index, values));
770    }
771    Ok(())
772}
773
774pub(crate) fn is_duplicate_column(bit_vector: Option<&[u8]>, column_num: usize) -> bool {
775    let Some(bit_vector) = bit_vector else {
776        return false;
777    };
778    let byte_num = column_num / 8;
779    let bit_num = column_num % 8;
780    bit_vector
781        .get(byte_num)
782        .is_some_and(|byte| byte & (1 << bit_num) == 0)
783}
784
785pub(crate) fn parse_column_value(
786    reader: &mut TtcReader<'_>,
787    metadata: &ColumnMetadata,
788) -> Result<Option<QueryValue>> {
789    if metadata.buffer_size == 0
790        && !matches!(
791            metadata.ora_type_num,
792            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
793        )
794    {
795        return Ok(None);
796    }
797    match metadata.ora_type_num {
798        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
799            let Some(bytes) = reader.read_bytes()? else {
800                return Ok(None);
801            };
802            match decode_text_value(&bytes, metadata.csfrm) {
803                Ok(value) => Ok(Some(QueryValue::Text(value))),
804                // preserve the raw bytes so the caller can honor the
805                // configured encoding_errors policy (or raise a Python
806                // UnicodeDecodeError as the reference does)
807                Err(ProtocolError::TtcDecode(_)) => Ok(Some(QueryValue::TextRaw {
808                    bytes,
809                    csfrm: metadata.csfrm,
810                })),
811                Err(err) => Err(err),
812            }
813        }
814        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => Ok(reader.read_bytes()?.map(QueryValue::Raw)),
815        ORA_TYPE_NUM_ROWID => parse_rowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
816        ORA_TYPE_NUM_UROWID => parse_urowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
817        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
818            let Some(bytes) = reader.read_bytes()? else {
819                return Ok(None);
820            };
821            decode_number_value(&bytes).map(Some)
822        }
823        ORA_TYPE_NUM_BINARY_DOUBLE => {
824            let Some(bytes) = reader.read_bytes()? else {
825                return Ok(None);
826            };
827            decode_binary_double(&bytes)
828                .map(|value| Some(QueryValue::BinaryDouble(value.to_string())))
829        }
830        ORA_TYPE_NUM_BINARY_FLOAT => {
831            let Some(bytes) = reader.read_bytes()? else {
832                return Ok(None);
833            };
834            // f64-widened text matches Python float semantics for BINARY_FLOAT
835            decode_binary_float(&bytes)
836                .map(|value| Some(QueryValue::BinaryDouble(f64::from(value).to_string())))
837        }
838        ORA_TYPE_NUM_BOOLEAN => {
839            let Some(bytes) = reader.read_bytes()? else {
840                return Ok(None);
841            };
842            // reference read_bool: last byte == 1 means true; native
843            // DB_TYPE_BOOLEAN surfaces as a Python bool.
844            let is_true = matches!(bytes.last(), Some(&1));
845            Ok(Some(QueryValue::Boolean(is_true)))
846        }
847        ORA_TYPE_NUM_INTERVAL_DS => {
848            let Some(bytes) = reader.read_bytes()? else {
849                return Ok(None);
850            };
851            decode_interval_ds(&bytes).map(Some)
852        }
853        ORA_TYPE_NUM_INTERVAL_YM => {
854            let Some(bytes) = reader.read_bytes()? else {
855                return Ok(None);
856            };
857            decode_interval_ym(&bytes).map(Some)
858        }
859        ORA_TYPE_NUM_DATE
860        | ORA_TYPE_NUM_TIMESTAMP
861        | ORA_TYPE_NUM_TIMESTAMP_LTZ
862        | ORA_TYPE_NUM_TIMESTAMP_TZ => {
863            let Some(bytes) = reader.read_bytes()? else {
864                return Ok(None);
865            };
866            decode_datetime_value(&bytes).map(Some)
867        }
868        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB | ORA_TYPE_NUM_BFILE => {
869            parse_lob_value(reader, metadata)
870        }
871        ORA_TYPE_NUM_VECTOR => parse_vector_value(reader),
872        ORA_TYPE_NUM_JSON => parse_json_value(reader),
873        ORA_TYPE_NUM_CURSOR => parse_cursor_value(reader).map(Some),
874        ORA_TYPE_NUM_OBJECT => parse_object_value(reader, metadata),
875        _ => Err(ProtocolError::UnsupportedFeature("query column type")),
876    }
877}
878
879/// A column value decoded in pass 1 of the borrowed row decode. Scalar values
880/// that borrow the wire buffer are held directly; values that need a small
881/// owned arena (synthesized `Number` text, or a cold owned [`QueryValue`]) are
882/// recorded as a deferred handle into the per-row arena and resolved in pass 2
883/// once the arena is frozen. This two-pass split is what keeps the borrowed
884/// path sound under `#![forbid(unsafe_code)]`: no `&str`/`&[u8]` is ever held
885/// into an arena that is still being grown.
886enum ColumnSlot<'buf> {
887    /// SQL NULL.
888    Null,
889    /// A value that borrows the wire buffer (or is a small `Copy` value).
890    Wire(QueryValueRef<'buf>),
891    /// A `NUMBER` whose canonical text lives at `arena[range]` in the per-row
892    /// number-text arena.
893    Number {
894        range: core::ops::Range<usize>,
895        is_integer: bool,
896    },
897    /// A cold / non-borrowable value parked at `owned[index]` in the per-row
898    /// owned arena.
899    Owned(usize),
900}
901
902/// Decode one column into a [`ColumnSlot`], borrowing the wire buffer for the
903/// hot scalar cases and appending to the per-row arenas for the deferred ones.
904/// Mirrors [`parse_column_value`] type-for-type; the produced owned value (via
905/// [`QueryValueRef::to_owned_value`]) is identical to the owned path.
906///
907/// `digits` is a caller-owned scratch buffer reused across all cells so the
908/// per-cell `NUMBER` decode allocates nothing of its own (it writes straight
909/// into `number_arena`). The hot scalar grid (Text/Raw) borrows the wire buffer
910/// directly with zero allocation.
911fn parse_column_slot<'buf>(
912    reader: &mut TtcReader<'buf>,
913    metadata: &ColumnMetadata,
914    number_arena: &mut String,
915    owned_arena: &mut Vec<QueryValue>,
916    digits: &mut Vec<u8>,
917) -> Result<ColumnSlot<'buf>> {
918    // Park an owned QueryValue in the arena and return the deferred slot. Used
919    // for the cold / non-borrowable variants so the hot grid stays borrowed.
920    fn park(owned_arena: &mut Vec<QueryValue>, value: Option<QueryValue>) -> ColumnSlot<'static> {
921        match value {
922            None => ColumnSlot::Null,
923            Some(value) => {
924                owned_arena.push(value);
925                ColumnSlot::Owned(owned_arena.len() - 1)
926            }
927        }
928    }
929
930    if metadata.buffer_size == 0
931        && !matches!(
932            metadata.ora_type_num,
933            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
934        )
935    {
936        return Ok(ColumnSlot::Null);
937    }
938    match metadata.ora_type_num {
939        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
940            match reader.read_bytes_borrowed()? {
941                BorrowedBytes::Null => Ok(ColumnSlot::Null),
942                // Borrow the wire bytes directly when they are valid UTF-8 and
943                // not the UTF-16 NCHAR form (which needs re-encoding). Zero copy.
944                BorrowedBytes::Slice(slice) if metadata.csfrm != CS_FORM_NCHAR => {
945                    match core::str::from_utf8(slice) {
946                        Ok(text) => Ok(ColumnSlot::Wire(QueryValueRef::Text(text))),
947                        Err(_) => Ok(park(
948                            owned_arena,
949                            Some(QueryValue::TextRaw {
950                                bytes: slice.to_vec(),
951                                csfrm: metadata.csfrm,
952                            }),
953                        )),
954                    }
955                }
956                // NCHAR (UTF-16) or chunked long text: fall back to the owned
957                // decode, which re-encodes to UTF-8 / reassembles chunks.
958                other => {
959                    let bytes = other.into_vec();
960                    let value = match decode_text_value(&bytes, metadata.csfrm) {
961                        Ok(text) => QueryValue::Text(text),
962                        Err(ProtocolError::TtcDecode(_)) => QueryValue::TextRaw {
963                            bytes,
964                            csfrm: metadata.csfrm,
965                        },
966                        Err(err) => return Err(err),
967                    };
968                    Ok(park(owned_arena, Some(value)))
969                }
970            }
971        }
972        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => match reader.read_bytes_borrowed()? {
973            BorrowedBytes::Null => Ok(ColumnSlot::Null),
974            BorrowedBytes::Slice(slice) => Ok(ColumnSlot::Wire(QueryValueRef::Raw(slice))),
975            BorrowedBytes::Chunked(bytes) => Ok(park(owned_arena, Some(QueryValue::Raw(bytes)))),
976        },
977        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
978            // The wire NUMBER is binary; its canonical decimal text is
979            // synthesized, so it cannot be borrowed from the buffer. We
980            // synthesize it *directly* into the per-row number arena (reusing the
981            // `digits` scratch), borrowing from the arena — zero per-cell heap
982            // allocation for the common in-range NUMBER.
983            with_small_bytes(reader, |bytes| match bytes {
984                None => Ok(ColumnSlot::Null),
985                Some(bytes) => {
986                    let start = number_arena.len();
987                    let is_integer = decode_number_text_into(bytes, digits, number_arena)?;
988                    Ok(ColumnSlot::Number {
989                        range: start..number_arena.len(),
990                        is_integer,
991                    })
992                }
993            })
994        }
995        ORA_TYPE_NUM_BOOLEAN => with_small_bytes(reader, |bytes| match bytes {
996            None => Ok(ColumnSlot::Null),
997            Some(bytes) => Ok(ColumnSlot::Wire(QueryValueRef::Boolean(matches!(
998                bytes.last(),
999                Some(&1)
1000            )))),
1001        }),
1002        ORA_TYPE_NUM_INTERVAL_DS => with_small_bytes(reader, |bytes| match bytes {
1003            None => Ok(ColumnSlot::Null),
1004            Some(bytes) => match decode_interval_ds(bytes)? {
1005                QueryValue::IntervalDS {
1006                    days,
1007                    hours,
1008                    minutes,
1009                    seconds,
1010                    fseconds,
1011                } => Ok(ColumnSlot::Wire(QueryValueRef::IntervalDS {
1012                    days,
1013                    hours,
1014                    minutes,
1015                    seconds,
1016                    fseconds,
1017                })),
1018                other => Ok(park(owned_arena, Some(other))),
1019            },
1020        }),
1021        ORA_TYPE_NUM_INTERVAL_YM => with_small_bytes(reader, |bytes| match bytes {
1022            None => Ok(ColumnSlot::Null),
1023            Some(bytes) => match decode_interval_ym(bytes)? {
1024                QueryValue::IntervalYM { years, months } => {
1025                    Ok(ColumnSlot::Wire(QueryValueRef::IntervalYM {
1026                        years,
1027                        months,
1028                    }))
1029                }
1030                other => Ok(park(owned_arena, Some(other))),
1031            },
1032        }),
1033        ORA_TYPE_NUM_DATE
1034        | ORA_TYPE_NUM_TIMESTAMP
1035        | ORA_TYPE_NUM_TIMESTAMP_LTZ
1036        | ORA_TYPE_NUM_TIMESTAMP_TZ => with_small_bytes(reader, |bytes| match bytes {
1037            None => Ok(ColumnSlot::Null),
1038            Some(bytes) => match decode_datetime_value(bytes)? {
1039                QueryValue::DateTime {
1040                    year,
1041                    month,
1042                    day,
1043                    hour,
1044                    minute,
1045                    second,
1046                    nanosecond,
1047                } => Ok(ColumnSlot::Wire(QueryValueRef::DateTime {
1048                    year,
1049                    month,
1050                    day,
1051                    hour,
1052                    minute,
1053                    second,
1054                    nanosecond,
1055                })),
1056                other => Ok(park(owned_arena, Some(other))),
1057            },
1058        }),
1059        // Everything else (Rowid, BinaryDouble/Float, Clob/Blob/Bfile, Vector,
1060        // Json, Cursor, Object, UROWID) goes through the owned decode and is
1061        // parked in the owned arena. These are the cold / non-borrowable cases.
1062        _ => {
1063            let value = parse_column_value(reader, metadata)?;
1064            Ok(park(owned_arena, value))
1065        }
1066    }
1067}
1068
1069/// Read one TTC byte field and hand the body to `f` as a borrowed `&[u8]`
1070/// without allocating in the common contiguous case. `None` is SQL NULL. The
1071/// rare chunked long form is reassembled into a temporary `Vec` (these small
1072/// fixed-size scalar types — number/boolean/interval/datetime — are never sent
1073/// chunked in practice, so this fallback is effectively dead weight).
1074fn with_small_bytes<'buf, T>(
1075    reader: &mut TtcReader<'buf>,
1076    f: impl FnOnce(Option<&[u8]>) -> Result<T>,
1077) -> Result<T> {
1078    match reader.read_bytes_borrowed()? {
1079        BorrowedBytes::Null => f(None),
1080        BorrowedBytes::Slice(slice) => f(Some(slice)),
1081        BorrowedBytes::Chunked(owned) => f(Some(&owned)),
1082    }
1083}
1084
1085impl BorrowedBytes<'_> {
1086    /// Reassemble into an owned `Vec` (zero-copy `Slice` still copies; `Chunked`
1087    /// reuses its already-owned `Vec`). Used by the non-borrowable text fallback.
1088    fn into_vec(self) -> Vec<u8> {
1089        match self {
1090            BorrowedBytes::Null => Vec::new(),
1091            BorrowedBytes::Slice(slice) => slice.to_vec(),
1092            BorrowedBytes::Chunked(owned) => owned,
1093        }
1094    }
1095}
1096
1097/// A decoded fetch batch that **owns** the wire response buffer and column
1098/// metadata, and yields rows of borrowed [`QueryValueRef`] that point straight
1099/// into that buffer. This is the zero-copy fetch fast path: the common scalar
1100/// grid is decoded with no per-cell allocation.
1101///
1102/// ## Soundness
1103///
1104/// The buffer is owned by the batch and outlives every borrowed row: rows are
1105/// only ever surfaced *inside* the [`for_each_row_ref`](Self::for_each_row_ref)
1106/// callback, whose `&[QueryValueRef]` argument cannot escape (its lifetime is
1107/// bound to the call). The borrow checker therefore guarantees no
1108/// `QueryValueRef` can dangle — there is no self-referential struct and no
1109/// `unsafe`. `Number` text and the cold values borrow per-row arenas that are
1110/// fully built (pass 1) before any reference into them is taken (pass 2), so an
1111/// arena is never grown while borrowed.
1112#[derive(Clone, Debug)]
1113pub struct BorrowedRowBatch {
1114    buffer: Vec<u8>,
1115    columns: Vec<ColumnMetadata>,
1116    /// Byte offset into `buffer` where each row's column values begin.
1117    row_starts: Vec<usize>,
1118    /// Per-row duplicate-column bit vector (server row-compression). `None` (or
1119    /// an absent entry) means every column is present on the wire for that row.
1120    /// A zero bit marks a duplicate column whose value repeats the previous
1121    /// row's value and carries no wire bytes (reference `bit_vector`).
1122    row_bit_vectors: Vec<Option<Vec<u8>>>,
1123    /// Whether this batch carried `LONG`/`LONG RAW` status trailers after each
1124    /// such column (the fetch path sets this; the plain execute path does not).
1125    fetch_long_status: bool,
1126    /// The caller's previous (owned) row, used to resolve duplicate columns in
1127    /// the *first* compressed row of the batch (whose duplicates repeat the row
1128    /// that ended the prior page). `None` outside the compressed-fetch case.
1129    previous_row_seed: Option<Vec<Option<QueryValue>>>,
1130}
1131
1132impl BorrowedRowBatch {
1133    /// Construct a batch from an owned wire `buffer`, the `columns` describing
1134    /// each cell, and the per-row start offsets into `buffer`. Use this for
1135    /// batches with no duplicate-column compression and no LONG trailers (the
1136    /// common synthetic / test case); the framing-aware
1137    /// [`parse_query_response_borrowed`] builds the full form.
1138    pub fn new(buffer: Vec<u8>, columns: Vec<ColumnMetadata>, row_starts: Vec<usize>) -> Self {
1139        Self {
1140            buffer,
1141            columns,
1142            row_starts,
1143            row_bit_vectors: Vec::new(),
1144            fetch_long_status: false,
1145            previous_row_seed: None,
1146        }
1147    }
1148
1149    /// Number of rows in the batch.
1150    pub fn row_count(&self) -> usize {
1151        self.row_starts.len()
1152    }
1153
1154    /// The columns describing each cell.
1155    pub fn columns(&self) -> &[ColumnMetadata] {
1156        &self.columns
1157    }
1158
1159    /// The address range of the owned buffer, for tests asserting that borrowed
1160    /// scalar cells truly point into it (zero-copy).
1161    #[cfg(test)]
1162    pub fn buffer_ptr_range(&self) -> core::ops::Range<usize> {
1163        let start = self.buffer.as_ptr() as usize;
1164        start..start + self.buffer.len()
1165    }
1166
1167    /// Decode each row and invoke `callback` with the row's borrowed cells. The
1168    /// `&[Option<QueryValueRef>]` slice borrows the batch buffer and per-row
1169    /// arenas; it is valid only for the duration of the call (it cannot escape).
1170    /// `None` cells are SQL NULL. Returns the first decode/callback error.
1171    ///
1172    /// Generic over the callback's error type `E` (any error a decode failure
1173    /// can convert into, e.g. the driver crate's own error) so callers are not
1174    /// forced through [`ProtocolError`]; a decode failure is surfaced via
1175    /// `E: From<ProtocolError>`.
1176    pub fn for_each_row_ref<F, E>(&self, mut callback: F) -> std::result::Result<(), E>
1177    where
1178        F: FnMut(&[Option<QueryValueRef<'_>>]) -> std::result::Result<(), E>,
1179        E: From<ProtocolError>,
1180    {
1181        // The two arenas are reused across rows (cleared, not reallocated). They
1182        // are mutated only in pass 1; pass 2 borrows them immutably and that
1183        // borrow is confined to a single loop iteration (it ends before the next
1184        // iteration's `clear()`), which is what keeps the borrow checker — and
1185        // soundness — happy.
1186        let mut number_arena = String::new();
1187        let mut owned_arena: Vec<QueryValue> = Vec::new();
1188        // Reusable scratch: `digits` for the NUMBER decode, `slots` for pass 1,
1189        // `row` for the borrowed cells handed to the callback. All cleared and
1190        // reused across rows so the steady-state per-row decode allocates only
1191        // when an arena/scratch genuinely grows (amortized). `slots`/`row`/
1192        // `digits` borrow nothing across iterations beyond the stable buffer.
1193        let mut digits: Vec<u8> = Vec::new();
1194        let mut slots: Vec<Option<ColumnSlot<'_>>> = Vec::with_capacity(self.columns.len());
1195        // Owned snapshot of the previous row, used only to resolve duplicate
1196        // (bit-vector-compressed) columns, which carry no wire bytes. Empty when
1197        // the batch has no bit vectors (the common case), so it costs nothing.
1198        // Seeded from the caller's prior-page row for the first compressed row.
1199        let mut previous_owned: Vec<Option<QueryValue>> =
1200            self.previous_row_seed.clone().unwrap_or_default();
1201        let uses_bit_vectors = !self.row_bit_vectors.is_empty();
1202
1203        for (row_index, &start) in self.row_starts.iter().enumerate() {
1204            number_arena.clear();
1205            owned_arena.clear();
1206            slots.clear();
1207            let bit_vector = self
1208                .row_bit_vectors
1209                .get(row_index)
1210                .and_then(|bv| bv.as_deref());
1211
1212            // Pass 1: decode all columns, growing the per-row arenas. `slots`
1213            // borrows only the buffer (the deferred Number/Owned slots hold a
1214            // range/index into the arenas, never a borrow of them). Duplicate
1215            // columns carry no wire bytes — their owned previous value is parked
1216            // in the owned arena.
1217            let mut reader = TtcReader::new(&self.buffer[start..]);
1218            for (index, metadata) in self.columns.iter().enumerate() {
1219                if is_duplicate_column(bit_vector, index) {
1220                    let previous = previous_owned.get(index).and_then(Option::as_ref);
1221                    match previous {
1222                        None => slots.push(None),
1223                        Some(value) => {
1224                            owned_arena.push(value.clone());
1225                            slots.push(Some(ColumnSlot::Owned(owned_arena.len() - 1)));
1226                        }
1227                    }
1228                    continue;
1229                }
1230                let slot = parse_column_slot(
1231                    &mut reader,
1232                    metadata,
1233                    &mut number_arena,
1234                    &mut owned_arena,
1235                    &mut digits,
1236                )?;
1237                slots.push(match slot {
1238                    ColumnSlot::Null => None,
1239                    other => Some(other),
1240                });
1241                if self.fetch_long_status
1242                    && matches!(
1243                        metadata.ora_type_num,
1244                        ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1245                    )
1246                {
1247                    let _null_indicator = reader.read_sb4()?;
1248                    let _return_code = reader.read_ub4()?;
1249                }
1250            }
1251
1252            // Pass 2: arenas are now frozen — resolve deferred slots into
1253            // borrowed refs. No arena is mutated here, so the borrows are sound.
1254            // `row` is allocated per row: it carries the per-row arena lifetime,
1255            // which (unlike `slots`/`digits`, that borrow only the stable buffer)
1256            // cannot be reused across an arena `clear()`. This is the single
1257            // remaining per-row allocation, versus the owned path's per-row Vec
1258            // *plus* a String per scalar cell.
1259            let row: Vec<Option<QueryValueRef<'_>>> = slots
1260                .iter()
1261                .map(|slot| {
1262                    slot.as_ref().map(|slot| match *slot {
1263                        ColumnSlot::Null => unreachable!("Null slots are stored as None"),
1264                        ColumnSlot::Wire(value) => value,
1265                        ColumnSlot::Number {
1266                            ref range,
1267                            is_integer,
1268                        } => QueryValueRef::Number {
1269                            text: &number_arena[range.clone()],
1270                            is_integer,
1271                        },
1272                        ColumnSlot::Owned(index) => QueryValueRef::Owned(&owned_arena[index]),
1273                    })
1274                })
1275                .collect();
1276
1277            callback(&row)?;
1278
1279            // Snapshot the just-emitted row as owned values for the next row's
1280            // duplicate-column resolution — but only when the batch actually uses
1281            // bit-vector compression, so the zero-copy common path pays nothing.
1282            if uses_bit_vectors {
1283                previous_owned.clear();
1284                previous_owned.extend(row.iter().map(|cell| cell.map(|v| v.to_owned_value())));
1285            }
1286        }
1287        Ok(())
1288    }
1289}
1290
1291/// The borrowed counterpart of a fetched [`QueryResult`]: a [`BorrowedRowBatch`]
1292/// of zero-copy rows plus the response-level fields a caller needs to page and
1293/// finalize the cursor. Produced by [`parse_query_response_borrowed`].
1294#[derive(Clone, Debug)]
1295pub struct BorrowedFetchResult {
1296    /// The decoded rows, borrowing the response buffer.
1297    pub batch: BorrowedRowBatch,
1298    /// Whether the server reports more rows for this cursor.
1299    pub more_rows: bool,
1300    /// Server cursor id (for paging / release).
1301    pub cursor_id: u32,
1302    /// Total affected/processed row count from the end-of-call error message.
1303    pub row_count: u64,
1304}
1305
1306/// Walk a fetch/query response payload and produce a [`BorrowedFetchResult`]
1307/// whose rows borrow `payload` (the caller must keep the owned buffer alive —
1308/// [`BorrowedRowBatch`] owns it). This is the zero-copy companion to
1309/// [`parse_fetch_response_with_context`]: it walks the exact same message
1310/// framing (DESCRIBE_INFO / ROW_HEADER / BIT_VECTOR / ROW_DATA / ERROR /
1311/// END_OF_RESPONSE) but, instead of materializing owned rows, records each
1312/// row's byte offset and bit vector so [`BorrowedRowBatch::for_each_row_ref`]
1313/// can decode them lazily and without per-cell allocation.
1314///
1315/// Scope: the plain query-row case (the fetch path). Out-bind / DML-returning
1316/// rows are not part of a fetch response and are left to the owned path.
1317pub fn parse_query_response_borrowed(
1318    payload: &[u8],
1319    capabilities: ClientCapabilities,
1320    columns: &[ColumnMetadata],
1321    previous_row: Option<&[Option<QueryValue>]>,
1322) -> Result<BorrowedFetchResult> {
1323    let mut reader = TtcReader::new(payload);
1324    let mut result_columns = columns.to_vec();
1325    let mut more_rows = true;
1326    let mut cursor_id = 0u32;
1327    let mut row_count = 0u64;
1328    let mut row_starts: Vec<usize> = Vec::new();
1329    let mut row_bit_vectors: Vec<Option<Vec<u8>>> = Vec::new();
1330    let mut any_bit_vector = false;
1331    let mut pending_bit_vector: Option<Vec<u8>> = None;
1332    // The fetch path always consumes LONG/LONG RAW status trailers.
1333    let fetch_long_status = true;
1334
1335    while reader.remaining() > 0 {
1336        let message_type = reader.read_u8()?;
1337        match message_type {
1338            0 => {}
1339            TNS_MSG_TYPE_DESCRIBE_INFO => {
1340                let _describe_name = reader.read_bytes()?;
1341                let previous = std::mem::take(&mut result_columns);
1342                let mut described = QueryResult::default();
1343                parse_describe_info(&mut reader, capabilities, &mut described)?;
1344                result_columns = described.columns;
1345                for (index, column) in result_columns.iter_mut().enumerate() {
1346                    if let Some(prev) = previous.get(index) {
1347                        adjust_refetch_metadata(prev, column);
1348                    }
1349                }
1350            }
1351            TNS_MSG_TYPE_ROW_HEADER => {
1352                pending_bit_vector = parse_row_header(&mut reader)?;
1353            }
1354            TNS_MSG_TYPE_BIT_VECTOR => {
1355                pending_bit_vector = Some(parse_bit_vector(&mut reader, result_columns.len())?);
1356            }
1357            TNS_MSG_TYPE_ROW_DATA => {
1358                // Record where this row's column values begin, then advance the
1359                // reader past the row (skipping, not materializing).
1360                row_starts.push(reader.position());
1361                let bit_vector = pending_bit_vector.take();
1362                any_bit_vector |= bit_vector.is_some();
1363                row_bit_vectors.push(bit_vector.clone());
1364                skip_row_data(
1365                    &mut reader,
1366                    &result_columns,
1367                    bit_vector.as_deref(),
1368                    fetch_long_status,
1369                )?;
1370            }
1371            TNS_MSG_TYPE_PARAMETER => {
1372                let _params = parse_query_return_parameters(&mut reader, false)?;
1373            }
1374            TNS_MSG_TYPE_STATUS => {
1375                let _call_status = reader.read_ub4()?;
1376                let _seq = reader.read_ub2()?;
1377            }
1378            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
1379                let _ = skip_server_side_piggyback(&mut reader)?;
1380            }
1381            TNS_MSG_TYPE_FLUSH_OUT_BINDS | TNS_MSG_TYPE_END_OF_RESPONSE => break,
1382            TNS_MSG_TYPE_TOKEN => {
1383                let _token = reader.read_ub8()?;
1384            }
1385            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
1386                // Mirror the owned parser's framing walk so the reader advances
1387                // past the implicit-resultset block identically (the borrowed
1388                // fetch API does not surface child cursors, but it must still
1389                // consume the bytes). reference messages/base.pyx
1390                // `_process_implicit_result`.
1391                let num_results = reader.read_ub4()?;
1392                for _ in 0..num_results.min(reader.remaining() as u32) {
1393                    let num_bytes = reader.read_u8()?;
1394                    reader.skip(usize::from(num_bytes))?;
1395                    let mut child = QueryResult::default();
1396                    parse_describe_info(&mut reader, capabilities, &mut child)?;
1397                    let _child_cursor_id = reader.read_ub2()?;
1398                }
1399            }
1400            TNS_MSG_TYPE_ERROR => {
1401                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
1402                if info.cursor_id != 0 {
1403                    cursor_id = u32::from(info.cursor_id);
1404                }
1405                row_count = info.row_count;
1406                if info.number == TNS_ERR_NO_DATA_FOUND && !result_columns.is_empty() {
1407                    more_rows = false;
1408                } else if info.number != 0 && info.number != TNS_ERR_ARRAY_DML_ERRORS {
1409                    return Err(ProtocolError::ServerErrorInfo(Box::new(
1410                        info.into_details(),
1411                    )));
1412                }
1413            }
1414            _ => {
1415                let position = reader.position().saturating_sub(1);
1416                if let Some(message) =
1417                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
1418                {
1419                    return Err(ProtocolError::ServerError(message));
1420                }
1421                return Err(ProtocolError::UnknownMessageType {
1422                    message_type,
1423                    position,
1424                });
1425            }
1426        }
1427    }
1428
1429    // If the batch never used duplicate-column compression, drop the per-row
1430    // bit-vector vector so iteration takes the zero-copy fast path (no owned
1431    // previous-row snapshotting).
1432    if !any_bit_vector {
1433        row_bit_vectors.clear();
1434    }
1435
1436    let batch = BorrowedRowBatch {
1437        buffer: payload.to_vec(),
1438        columns: result_columns,
1439        row_starts,
1440        row_bit_vectors,
1441        fetch_long_status,
1442        // Seed the first compressed row's duplicate resolution from the caller's
1443        // prior-page row (only consulted when the batch uses bit vectors).
1444        previous_row_seed: any_bit_vector.then(|| {
1445            previous_row
1446                .map(<[Option<QueryValue>]>::to_vec)
1447                .unwrap_or_default()
1448        }),
1449    };
1450
1451    Ok(BorrowedFetchResult {
1452        batch,
1453        more_rows,
1454        cursor_id,
1455        row_count,
1456    })
1457}
1458
1459/// Advance `reader` past one ROW_DATA row **without materializing owned values**
1460/// — this is the offset-capture pass, so it must allocate nothing for the hot
1461/// scalar grid. Mirrors [`parse_row_data`]'s consumption exactly: duplicate
1462/// (bit-vector) columns carry no wire bytes and are skipped; the hot byte-field
1463/// scalar types are skipped with a zero-allocation length-prefixed skip; the
1464/// rare cold types (LOB / Vector / JSON / Cursor / Object / ROWID), whose wire
1465/// framing is non-trivial, fall back to [`parse_column_value`] (which may
1466/// allocate, but those are uncommon). `LONG`/`LONG RAW` status trailers are
1467/// consumed when `fetch_long_status`.
1468fn skip_row_data(
1469    reader: &mut TtcReader<'_>,
1470    columns: &[ColumnMetadata],
1471    bit_vector: Option<&[u8]>,
1472    fetch_long_status: bool,
1473) -> Result<()> {
1474    for (index, metadata) in columns.iter().enumerate() {
1475        if is_duplicate_column(bit_vector, index) {
1476            continue;
1477        }
1478        let consumed_byte_field = metadata.buffer_size != 0
1479            && matches!(
1480                metadata.ora_type_num,
1481                ORA_TYPE_NUM_VARCHAR
1482                    | ORA_TYPE_NUM_CHAR
1483                    | ORA_TYPE_NUM_LONG
1484                    | ORA_TYPE_NUM_RAW
1485                    | ORA_TYPE_NUM_LONG_RAW
1486                    | ORA_TYPE_NUM_NUMBER
1487                    | ORA_TYPE_NUM_BINARY_INTEGER
1488                    | ORA_TYPE_NUM_BINARY_DOUBLE
1489                    | ORA_TYPE_NUM_BINARY_FLOAT
1490                    | ORA_TYPE_NUM_BOOLEAN
1491                    | ORA_TYPE_NUM_INTERVAL_DS
1492                    | ORA_TYPE_NUM_INTERVAL_YM
1493                    | ORA_TYPE_NUM_DATE
1494                    | ORA_TYPE_NUM_TIMESTAMP
1495                    | ORA_TYPE_NUM_TIMESTAMP_LTZ
1496                    | ORA_TYPE_NUM_TIMESTAMP_TZ
1497            );
1498        if consumed_byte_field {
1499            reader.skip_bytes_field()?;
1500        } else {
1501            // Cold / non-byte-field type, or a zero-buffer-size column: defer to
1502            // the full owned decode purely to advance the reader correctly.
1503            let _ = parse_column_value(reader, metadata)?;
1504        }
1505        if fetch_long_status
1506            && matches!(
1507                metadata.ora_type_num,
1508                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1509            )
1510        {
1511            let _null_indicator = reader.read_sb4()?;
1512            let _return_code = reader.read_ub4()?;
1513        }
1514    }
1515    Ok(())
1516}
1517
1518pub(crate) fn encode_rowid_component(mut value: u32, size: usize, output: &mut String) {
1519    let mut encoded = vec![b'A'; size];
1520    for index in 0..size {
1521        let alphabet_index = usize::try_from(value & 0x3f).unwrap_or(0);
1522        encoded[size - index - 1] = TNS_BASE64_ALPHABET[alphabet_index];
1523        value >>= 6;
1524    }
1525    output.extend(encoded.into_iter().map(char::from));
1526}
1527
1528pub(crate) fn encode_physical_rowid(
1529    rba: u32,
1530    partition_id: u16,
1531    block_num: u32,
1532    slot_num: u16,
1533) -> String {
1534    let mut output = String::with_capacity(ORA_TYPE_SIZE_ROWID as usize);
1535    encode_rowid_component(rba, 6, &mut output);
1536    encode_rowid_component(u32::from(partition_id), 3, &mut output);
1537    encode_rowid_component(block_num, 6, &mut output);
1538    encode_rowid_component(u32::from(slot_num), 3, &mut output);
1539    output
1540}
1541
1542pub(crate) fn parse_rowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1543    let len = reader.read_u8()?;
1544    if len == 0 || len == crate::wire::TNS_NULL_LENGTH_INDICATOR {
1545        return Ok(None);
1546    }
1547    let rba = reader.read_ub4()?;
1548    let partition_id = reader.read_ub2()?;
1549    reader.skip(1)?;
1550    let block_num = reader.read_ub4()?;
1551    let slot_num = reader.read_ub2()?;
1552    Ok(Some(encode_physical_rowid(
1553        rba,
1554        partition_id,
1555        block_num,
1556        slot_num,
1557    )))
1558}
1559
1560pub(crate) fn encode_logical_urowid(bytes: &[u8]) -> String {
1561    let mut input_offset = 1;
1562    let mut input_len = bytes.len().saturating_sub(1);
1563    let mut output = String::with_capacity((bytes.len() / 3) * 4 + 4);
1564    output.push('*');
1565    while input_len > 0 {
1566        let mut pos = bytes[input_offset] >> 2;
1567        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1568
1569        pos = (bytes[input_offset] & 0x03) << 4;
1570        if input_len == 1 {
1571            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1572            break;
1573        }
1574        input_offset += 1;
1575        pos |= (bytes[input_offset] & 0xf0) >> 4;
1576        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1577
1578        pos = (bytes[input_offset] & 0x0f) << 2;
1579        if input_len == 2 {
1580            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1581            break;
1582        }
1583        input_offset += 1;
1584        pos |= (bytes[input_offset] & 0xc0) >> 6;
1585        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1586
1587        pos = bytes[input_offset] & 0x3f;
1588        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1589        input_offset += 1;
1590        input_len -= 3;
1591    }
1592    output
1593}
1594
1595pub(crate) fn parse_urowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1596    if reader.read_bytes()?.is_none() {
1597        return Ok(None);
1598    }
1599    let Some(bytes) = reader.read_bytes()? else {
1600        return Ok(None);
1601    };
1602    if bytes.len() < 13 {
1603        return Err(ProtocolError::TtcDecode("encoded UROWID too short"));
1604    }
1605    if bytes[0] == 1 {
1606        let rba = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
1607        let partition_id = u16::from_be_bytes([bytes[5], bytes[6]]);
1608        let block_num = u32::from_be_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]);
1609        let slot_num = u16::from_be_bytes([bytes[11], bytes[12]]);
1610        Ok(Some(encode_physical_rowid(
1611            rba,
1612            partition_id,
1613            block_num,
1614            slot_num,
1615        )))
1616    } else {
1617        Ok(Some(encode_logical_urowid(&bytes)))
1618    }
1619}
1620
1621pub(crate) fn parse_lob_value(
1622    reader: &mut TtcReader<'_>,
1623    metadata: &ColumnMetadata,
1624) -> Result<Option<QueryValue>> {
1625    let num_bytes = reader.read_ub4()?;
1626    if num_bytes == 0 {
1627        return Ok(None);
1628    }
1629    let (size, chunk_size) = if matches!(metadata.ora_type_num, ORA_TYPE_NUM_BFILE) {
1630        (0, 0)
1631    } else {
1632        (reader.read_ub8()?, reader.read_ub4()?)
1633    };
1634    let Some(locator) = reader.read_bytes()? else {
1635        return Ok(None);
1636    };
1637    Ok(Some(QueryValue::Lob(Box::new(LobValue {
1638        ora_type_num: metadata.ora_type_num,
1639        csfrm: metadata.csfrm,
1640        locator,
1641        size,
1642        chunk_size,
1643    }))))
1644}
1645
1646/// Reads a VECTOR value (reference `ReadBuffer.read_vector` in `packet.pyx`).
1647/// VECTOR is sent as a fully-prefetched LOB: the image data precedes the
1648/// (discarded) LOB locator.
1649pub(crate) fn parse_vector_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
1650    let num_bytes = reader.read_ub4()?;
1651    if num_bytes == 0 {
1652        return Ok(None);
1653    }
1654    reader.read_ub8()?; // size (unused)
1655    reader.read_ub4()?; // chunk size (unused)
1656    let Some(data) = reader.read_bytes()? else {
1657        return Ok(None);
1658    };
1659    reader.read_bytes()?; // LOB locator (unused)
1660    if data.is_empty() {
1661        return Ok(None);
1662    }
1663    let vector = crate::vector::decode_vector(&data)?;
1664    Ok(Some(QueryValue::Vector(Box::new(vector))))
1665}
1666
1667/// Parses a native JSON (`DB_TYPE_JSON`) column value. Like VECTOR, OSON is sent
1668/// as a fully-prefetched LOB: `num_bytes`, `size`, `chunk_size`, the OSON image,
1669/// then a (discarded) LOB locator (reference packet.pyx `read_oson`).
1670pub(crate) fn parse_json_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
1671    let num_bytes = reader.read_ub4()?;
1672    if num_bytes == 0 {
1673        return Ok(None);
1674    }
1675    reader.read_ub8()?; // size (unused)
1676    reader.read_ub4()?; // chunk size (unused)
1677    let Some(data) = reader.read_bytes()? else {
1678        return Ok(None);
1679    };
1680    reader.read_bytes()?; // LOB locator (unused)
1681    if data.is_empty() {
1682        return Ok(None);
1683    }
1684    let value = crate::oson::decode_oson(&data)?;
1685    Ok(Some(QueryValue::Json(Box::new(value))))
1686}
1687
1688pub(crate) fn parse_object_value(
1689    reader: &mut TtcReader<'_>,
1690    metadata: &ColumnMetadata,
1691) -> Result<Option<QueryValue>> {
1692    let _toid = reader.read_bytes_with_length()?;
1693    let _oid = reader.read_bytes_with_length()?;
1694    let _snapshot = reader.read_bytes_with_length()?;
1695    let _version = reader.read_ub2()?;
1696    let num_bytes = reader.read_ub4()?;
1697    reader.skip(2)?;
1698    if num_bytes == 0 {
1699        return Ok(None);
1700    }
1701    let Some(packed_data) = reader.read_bytes()? else {
1702        return Ok(None);
1703    };
1704    Ok(Some(QueryValue::Object(Box::new(ObjectValue {
1705        schema: metadata.object_schema.clone(),
1706        type_name: metadata.object_type_name.clone(),
1707        packed_data,
1708    }))))
1709}
1710
1711pub(crate) fn parse_cursor_value(reader: &mut TtcReader<'_>) -> Result<QueryValue> {
1712    reader.skip(1)?;
1713    let mut result = QueryResult::default();
1714    parse_describe_info(reader, ClientCapabilities::default(), &mut result)?;
1715    let cursor_id = u32::from(reader.read_ub2()?);
1716    Ok(QueryValue::Cursor(Box::new(CursorValue {
1717        columns: result.columns,
1718        cursor_id,
1719    })))
1720}
1721
1722pub(crate) struct QueryReturnParameters {
1723    pub row_counts: Option<Vec<u64>>,
1724    /// CQN registered-query id extracted from the registration-info block
1725    /// (reference base.pyx:1300-1309); `None` when no block was present.
1726    pub query_id: Option<u64>,
1727}
1728
1729pub(crate) fn parse_query_return_parameters(
1730    reader: &mut TtcReader<'_>,
1731    arraydmlrowcounts: bool,
1732) -> Result<QueryReturnParameters> {
1733    let num_params = reader.read_ub2()?;
1734    for _ in 0..num_params {
1735        let _value = reader.read_ub4()?;
1736    }
1737    let num_bytes = reader.read_ub2()?;
1738    if num_bytes > 0 {
1739        reader.skip(usize::from(num_bytes))?;
1740    }
1741    let num_pairs = reader.read_ub2()?;
1742    skip_keyword_value_pairs(reader, num_pairs)?;
1743    // registration info block: the trailing 8 bytes (msb at -4, lsb at -8) are
1744    // the CQN query id when a registration id was sent (reference base.pyx).
1745    let num_bytes = usize::from(reader.read_ub2()?);
1746    let mut query_id = None;
1747    if num_bytes > 0 {
1748        let block = reader.read_raw(num_bytes)?;
1749        if num_bytes >= 8 {
1750            let msb = u32::from_be_bytes([
1751                block[num_bytes - 4],
1752                block[num_bytes - 3],
1753                block[num_bytes - 2],
1754                block[num_bytes - 1],
1755            ]);
1756            let lsb = u32::from_be_bytes([
1757                block[num_bytes - 8],
1758                block[num_bytes - 7],
1759                block[num_bytes - 6],
1760                block[num_bytes - 5],
1761            ]);
1762            query_id = Some((u64::from(msb) << 32) | u64::from(lsb));
1763        }
1764    }
1765    if arraydmlrowcounts {
1766        // reference messages/base.pyx `_process_return_parameters` tail
1767        let num_rows = reader.read_ub4()?;
1768        // Each ub8 row count consumes at least one byte, so cap the reservation
1769        // by the remaining payload size (BoundedReader).
1770        let mut row_counts: Vec<u64> = reader.with_capacity_bounded(num_rows as usize, 1);
1771        for _ in 0..num_rows {
1772            row_counts.push(reader.read_ub8()?);
1773        }
1774        return Ok(QueryReturnParameters {
1775            row_counts: Some(row_counts),
1776            query_id,
1777        });
1778    }
1779    Ok(QueryReturnParameters {
1780        row_counts: None,
1781        query_id,
1782    })
1783}
1784
1785#[cfg(test)]
1786mod borrowed_fetch_tests {
1787    use super::*;
1788    use crate::thin::codecs::encode_number_text;
1789
1790    // Build a synthetic column metadata for a scalar type.
1791    fn col(name: &str, ora_type_num: u8, csfrm: u8, buffer_size: u32) -> ColumnMetadata {
1792        ColumnMetadata {
1793            name: name.to_string(),
1794            ora_type_num,
1795            csfrm,
1796            buffer_size,
1797            ..ColumnMetadata::default()
1798        }
1799    }
1800
1801    // Encode one row of [Text, Number, Raw, NULL-text] as the server would frame
1802    // the column values (each a `write_bytes_with_length` run that `read_bytes`
1803    // / `read_bytes_borrowed` consume identically), and return the byte offset
1804    // where the row's column values begin.
1805    fn encode_mixed_row(writer: &mut TtcWriter, text: &str, number: &str, raw: &[u8]) {
1806        writer.write_bytes_with_length(text.as_bytes()).unwrap();
1807        let num = encode_number_text(number).unwrap();
1808        writer.write_bytes_with_length(&num).unwrap();
1809        writer.write_bytes_with_length(raw).unwrap();
1810        writer.write_u8(0); // NULL column (length byte 0)
1811    }
1812
1813    // The borrowed batch decode must yield, for every cell, a value whose
1814    // `to_owned_value()` is bit-for-bit the owned-path `QueryValue`, across a
1815    // mixed Text/Number/Raw/NULL row. And the Text/Raw cells must genuinely
1816    // borrow the batch buffer (zero-copy), not a fresh allocation.
1817    #[test]
1818    fn borrowed_batch_matches_owned_path_for_mixed_row() {
1819        let columns = vec![
1820            col("T", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
1821            col("N", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
1822            col("R", ORA_TYPE_NUM_RAW, CS_FORM_IMPLICIT, 2000),
1823            col("Z", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
1824        ];
1825
1826        let mut writer = TtcWriter::new();
1827        encode_mixed_row(
1828            &mut writer,
1829            "héllo world",
1830            "-12.5",
1831            &[0xDE, 0xAD, 0xBE, 0xEF],
1832        );
1833        encode_mixed_row(&mut writer, "second", "42", &[0x01]);
1834        let buffer = writer.into_bytes();
1835        let row_starts = vec![0, {
1836            // Find the second row's start by replaying the first row's consumption.
1837            let mut reader = TtcReader::new(&buffer);
1838            for c in &columns {
1839                let _ = parse_column_value(&mut reader, c).unwrap();
1840            }
1841            reader.position()
1842        }];
1843
1844        // Owned path: decode both rows the existing way for the golden values.
1845        let owned_rows: Vec<Vec<Option<QueryValue>>> = row_starts
1846            .iter()
1847            .map(|&start| {
1848                let mut reader = TtcReader::new(&buffer[start..]);
1849                columns
1850                    .iter()
1851                    .map(|c| parse_column_value(&mut reader, c).unwrap())
1852                    .collect()
1853            })
1854            .collect();
1855
1856        // Borrowed path: decode through the batch, collecting owned copies and
1857        // proving the scalar cells borrow the buffer.
1858        let batch = BorrowedRowBatch::new(buffer.clone(), columns.clone(), row_starts);
1859        let buf_ptr_range = batch.buffer_ptr_range();
1860
1861        let mut seen_rows = 0usize;
1862        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
1863        batch
1864            .for_each_row_ref(|row| {
1865                seen_rows += 1;
1866                // Text cell borrows the buffer.
1867                if let Some(QueryValueRef::Text(t)) = row[0] {
1868                    let p = t.as_ptr() as usize;
1869                    assert!(
1870                        buf_ptr_range.contains(&p),
1871                        "Text cell must borrow the batch buffer (zero-copy)"
1872                    );
1873                }
1874                // Raw cell borrows the buffer.
1875                if let Some(QueryValueRef::Raw(r)) = row[2] {
1876                    let p = r.as_ptr() as usize;
1877                    assert!(
1878                        buf_ptr_range.contains(&p),
1879                        "Raw cell must borrow the batch buffer (zero-copy)"
1880                    );
1881                }
1882                borrowed_owned.push(
1883                    row.iter()
1884                        .map(|cell| cell.map(|v| v.to_owned_value()))
1885                        .collect(),
1886                );
1887                Ok::<(), ProtocolError>(())
1888            })
1889            .unwrap();
1890
1891        assert_eq!(seen_rows, 2, "batch yields both rows");
1892        assert_eq!(
1893            borrowed_owned, owned_rows,
1894            "borrowed cells to_owned() must equal the owned-path values"
1895        );
1896    }
1897
1898    // The borrowed response parser walks the *same* message framing as the owned
1899    // `parse_fetch_response_with_context` (ROW_HEADER / BIT_VECTOR / ROW_DATA /
1900    // END_OF_RESPONSE), but instead of building owned rows it captures each
1901    // row's byte offset and hands back a `BorrowedRowBatch`. Decoding that batch
1902    // must reproduce exactly what the owned fetch path produced — duplicate
1903    // columns (bit vector) and all. Fixture is the same one the owned
1904    // `fetch_response_decodes_rows_with_previous_cursor_metadata` test uses.
1905    #[test]
1906    fn borrowed_response_parse_matches_owned_fetch_path() {
1907        use hex::FromHex;
1908        let payload = Vec::from_hex("06020101000205dc0001010101000702c1041d")
1909            .expect("fixture response should be valid hex");
1910        let columns = vec![
1911            col("INTCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
1912            col("NUMBERCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
1913        ];
1914        let previous_row = vec![
1915            Some(QueryValue::number_from_text("2", true)),
1916            Some(QueryValue::number_from_text("0.5", false)),
1917        ];
1918
1919        // Owned golden.
1920        let owned = parse_query_response_with_context(
1921            &payload,
1922            ClientCapabilities::default(),
1923            &columns,
1924            Some(&previous_row),
1925        )
1926        .expect("owned fetch decode");
1927
1928        // Borrowed parse.
1929        let borrowed = parse_query_response_borrowed(
1930            &payload,
1931            ClientCapabilities::default(),
1932            &columns,
1933            Some(&previous_row),
1934        )
1935        .expect("borrowed fetch decode");
1936
1937        assert_eq!(borrowed.more_rows, owned.more_rows);
1938        assert_eq!(borrowed.cursor_id, owned.cursor_id);
1939        assert_eq!(borrowed.batch.row_count(), owned.rows.len());
1940
1941        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
1942        borrowed
1943            .batch
1944            .for_each_row_ref(|row| {
1945                borrowed_owned.push(
1946                    row.iter()
1947                        .map(|cell| cell.map(|v| v.to_owned_value()))
1948                        .collect(),
1949                );
1950                Ok::<(), ProtocolError>(())
1951            })
1952            .expect("iterate borrowed rows");
1953
1954        assert_eq!(
1955            borrowed_owned, owned.rows,
1956            "borrowed batch must reproduce the owned fetch rows (incl. duplicate columns)"
1957        );
1958    }
1959}
1960
1961#[cfg(test)]
1962mod fuzz_regression_tests {
1963    use super::*;
1964
1965    // Regression (w6-fuzz, query_response target): a TNS_MSG_TYPE_IMPLICIT_RESULTSET
1966    // message (27) whose ub4 result count was ~620M made the dispatch loop
1967    // `Vec::with_capacity` several gigabytes of `QueryValue::Cursor` before the
1968    // truncated read failed, tripping libFuzzer's OOM detector. The parser must
1969    // now fail closed (truncated payload) without the giant allocation.
1970    #[test]
1971    fn fuzz_regression_implicit_resultset_oom() {
1972        // payload: type=27, ub4 length byte 4, value 0x25000000 (~620M), then EOF
1973        let payload = [27u8, 4, 37, 0, 0, 0];
1974        let err = parse_query_response(&payload, ClientCapabilities::default())
1975            .expect_err("oversized implicit-resultset count must fail closed");
1976        assert!(
1977            matches!(err, ProtocolError::TtcDecode(_)),
1978            "expected fail-closed TtcDecode, got {err:?}"
1979        );
1980    }
1981
1982    // BoundedReader invariant (l2p), query-columns family: a DESCRIBE_INFO
1983    // message (16) declaring a huge num_columns (ub4 ~620M) with no column
1984    // metadata bytes following must fail closed, not pre-allocate one
1985    // ColumnMetadata per declared column. parse_describe_info grows the column
1986    // Vec via push (no speculative with_capacity), and the first
1987    // parse_column_metadata read past the end errors.
1988    #[test]
1989    fn describe_info_oversized_column_count_fails_closed_not_oom() {
1990        // type=16 DESCRIBE_INFO; describe_name read_bytes len byte 0 (null);
1991        // max_row_size ub4 = 0; num_columns ub4 (len byte 4) = 0x25000000
1992        // (~620M); then EOF before the skip(1)/column records.
1993        let payload = [16u8, 0, 0, 4, 0x25, 0x00, 0x00, 0x00];
1994        let err = parse_query_response(&payload, ClientCapabilities::default())
1995            .expect_err("oversized column count must fail closed");
1996        assert!(
1997            matches!(err, ProtocolError::TtcDecode(_)),
1998            "expected fail-closed TtcDecode, got {err:?}"
1999        );
2000    }
2001
2002    // BoundedReader invariant (l2p), out-bind array family: an array OUT bind
2003    // whose ub4 num_elements is enormous (~620M) but carries no element bytes
2004    // must fail closed via with_capacity_bounded + the per-element read, not
2005    // reserve gigabytes of Option<QueryValue>.
2006    #[test]
2007    fn out_bind_array_oversized_element_count_fails_closed_not_oom() {
2008        let metadata = ColumnMetadata {
2009            name: "ARR".to_string(),
2010            ora_type_num: ORA_TYPE_NUM_NUMBER,
2011            is_array: true,
2012            ..ColumnMetadata::default()
2013        };
2014        let bind_columns = [metadata];
2015        let out_bind_indexes = [0usize];
2016        // ub4 num_elements: len byte 4, value 0x25000000, then no elements.
2017        let payload = [4u8, 0x25, 0x00, 0x00, 0x00];
2018        let mut reader = TtcReader::new(&payload);
2019        let mut result = QueryResult::default();
2020        let err =
2021            parse_out_bind_row_data(&mut reader, &mut result, &bind_columns, &out_bind_indexes)
2022                .expect_err("oversized array OUT bind count must fail closed");
2023        assert!(
2024            matches!(err, ProtocolError::TtcDecode(_)),
2025            "expected fail-closed TtcDecode, got {err:?}"
2026        );
2027    }
2028}