Skip to main content

oracledb_protocol/thin/
fetch.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4use crate::wire::ProtocolLimits;
5
/// Check that `slice` is well-formed UTF-8 and hand it back as a borrowed
/// `&str`.
///
/// This sits on the hot borrowed-text decode path. A rejection is reported as
/// `Err(())`; the caller then falls back to the owned `TextRaw` carrier, so the
/// observable semantics do not depend on which validator is compiled in.
///
/// When the `simd-decode` feature is enabled the check is performed by
/// `simdutf8::basic::from_utf8`. Its accept/reject decision is byte-for-byte
/// the same as `core::str::from_utf8` (same UTF-8 grammar); it merely does not
/// compute the error *position*, which this path never needs. Only the safe API
/// of `simdutf8` is called — its SIMD `unsafe` stays inside that dependency —
/// so the crate remains `#![forbid(unsafe_code)]`-clean.
#[inline]
fn validate_utf8(slice: &[u8]) -> core::result::Result<&str, ()> {
    #[cfg(feature = "simd-decode")]
    let checked = simdutf8::basic::from_utf8(slice);
    #[cfg(not(feature = "simd-decode"))]
    let checked = core::str::from_utf8(slice);
    // The error payload is never inspected by callers, so discard it.
    checked.map_err(drop)
}
27
28pub fn build_fetch_payload(cursor_id: u32, arraysize: u32) -> Vec<u8> {
29    build_fetch_payload_with_seq(cursor_id, arraysize, 1)
30}
31
32pub fn build_fetch_payload_with_seq(cursor_id: u32, arraysize: u32, seq_num: u8) -> Vec<u8> {
33    // Fixed tiny payload (function code + ub8 + two ub4 ≈ <=20 bytes). Prealloc
34    // so the small pushes do not grow the Vec through doublings; built every
35    // fetch page, so this matters on multi-page fetches. Bytes unchanged.
36    let mut writer = TtcWriter::with_capacity(32);
37    writer.write_function_code_with_seq(TNS_FUNC_FETCH, seq_num);
38    writer.write_ub8(0);
39    writer.write_ub4(cursor_id);
40    writer.write_ub4(arraysize);
41    writer.into_bytes()
42}
43
/// Build an EXECUTE request that (re)defines the fetch columns of an open
/// cursor and returns the encoded payload.
///
/// The message uses `TNS_FUNC_EXECUTE` (tagged with `seq_num`) and the
/// `TNS_EXEC_OPTION_DEFINE | TNS_EXEC_OPTION_NOT_PLSQL` option bits. It carries
/// `cursor_id`, `arraysize` (written at two positions), `TNS_MAX_LONG_LENGTH`
/// and the number of define columns, and ends with one define-metadata record
/// per entry of `define_columns`, written by `write_define_column_metadata`.
///
/// # Errors
///
/// Returns `ProtocolError::InvalidPacketLength` when `define_columns.len()`
/// does not fit in a `u32`.
///
/// NOTE(review): the fixed literals below (the 0/1 bytes, the `13`, and the
/// trailing run of `ub4` values) presumably mirror the reference driver's
/// execute-message layout; the meaning of each individual slot is not visible
/// in this file — confirm against the reference before touching any of them.
pub fn build_define_fetch_payload_with_seq(
    cursor_id: u32,
    arraysize: u32,
    seq_num: u8,
    define_columns: &[ColumnMetadata],
) -> Result<Vec<u8>> {
    // The define count is written as a ub4, so reject lengths beyond u32.
    let define_count =
        u32::try_from(define_columns.len()).map_err(|_| ProtocolError::InvalidPacketLength {
            length: define_columns.len(),
            minimum: 0,
        })?;
    let mut writer = TtcWriter::new();
    writer.write_function_code_with_seq(TNS_FUNC_EXECUTE, seq_num);
    writer.write_ub8(0);
    // Execute options: this request only defines columns and is not PL/SQL.
    writer.write_ub4(TNS_EXEC_OPTION_DEFINE | TNS_EXEC_OPTION_NOT_PLSQL);
    writer.write_ub4(cursor_id);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(1);
    writer.write_ub4(13);
    writer.write_u8(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    // First occurrence of the requested array size.
    writer.write_ub4(arraysize);
    writer.write_ub4(TNS_MAX_LONG_LENGTH);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_u8(0);
    writer.write_u8(0);
    writer.write_u8(0);
    writer.write_u8(0);
    // Marker byte followed by the number of define records that trail the
    // fixed part of the message.
    writer.write_u8(1);
    writer.write_ub4(define_count);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_u8(1);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    // Second occurrence of the requested array size.
    writer.write_ub4(arraysize);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(1);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    // One define-metadata record per column, in column order.
    for metadata in define_columns {
        write_define_column_metadata(&mut writer, metadata);
    }
    Ok(writer.into_bytes())
}
114
115pub(crate) fn write_define_column_metadata(writer: &mut TtcWriter, metadata: &ColumnMetadata) {
116    // reference base.pyx: VECTOR (and JSON) columns advertise a LOB-prefetch
117    // buffer so the server streams the image inline rather than returning a
118    // bare temp-LOB locator
119    let (mut buffer_size, cont_flags, lob_prefetch_length) = match metadata.ora_type_num {
120        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB => (metadata.buffer_size, TNS_LOB_PREFETCH_FLAG, 0),
121        ORA_TYPE_NUM_VECTOR => (
122            TNS_VECTOR_MAX_LENGTH,
123            TNS_LOB_PREFETCH_FLAG,
124            TNS_VECTOR_MAX_LENGTH,
125        ),
126        ORA_TYPE_NUM_JSON => (
127            TNS_JSON_MAX_LENGTH,
128            TNS_LOB_PREFETCH_FLAG,
129            TNS_JSON_MAX_LENGTH,
130        ),
131        _ => (metadata.buffer_size, 0, 0),
132    };
133    buffer_size = buffer_size.max(1);
134    writer.write_u8(metadata.ora_type_num);
135    writer.write_u8(TNS_BIND_USE_INDICATORS);
136    writer.write_u8(0);
137    writer.write_u8(0);
138    writer.write_ub4(buffer_size);
139    writer.write_ub4(0);
140    writer.write_ub8(cont_flags);
141    writer.write_ub4(0);
142    writer.write_ub2(0);
143    if metadata.csfrm != 0 {
144        writer.write_ub2(TNS_CHARSET_UTF8);
145    } else {
146        writer.write_ub2(0);
147    }
148    writer.write_u8(metadata.csfrm);
149    writer.write_ub4(lob_prefetch_length);
150    writer.write_ub4(0);
151}
152
153pub fn parse_query_response(
154    payload: &[u8],
155    capabilities: ClientCapabilities,
156) -> Result<QueryResult> {
157    parse_query_response_with_previous(payload, capabilities, None)
158}
159
160pub fn parse_query_response_with_limits(
161    payload: &[u8],
162    capabilities: ClientCapabilities,
163    limits: ProtocolLimits,
164) -> Result<QueryResult> {
165    parse_query_response_with_context_binds_options_and_limits(
166        payload,
167        capabilities,
168        &[],
169        None,
170        &[],
171        &[],
172        false,
173        ExecuteOptions::default(),
174        limits,
175    )
176}
177
178pub fn parse_query_response_with_binds(
179    payload: &[u8],
180    capabilities: ClientCapabilities,
181    binds: &[BindValue],
182) -> Result<QueryResult> {
183    parse_query_response_with_binds_and_options(
184        payload,
185        capabilities,
186        binds,
187        ExecuteOptions::default(),
188    )
189}
190
191pub fn parse_query_response_with_binds_and_options(
192    payload: &[u8],
193    capabilities: ClientCapabilities,
194    binds: &[BindValue],
195    exec_options: ExecuteOptions,
196) -> Result<QueryResult> {
197    parse_query_response_with_binds_options_and_columns(
198        payload,
199        capabilities,
200        binds,
201        exec_options,
202        &[],
203    )
204}
205
206/// `known_columns` carries the fetch metadata of a re-executed statement
207/// whose response does not repeat the describe information (reference keeps
208/// the statement's fetch vars across executions).
209pub fn parse_query_response_with_binds_options_and_columns(
210    payload: &[u8],
211    capabilities: ClientCapabilities,
212    binds: &[BindValue],
213    exec_options: ExecuteOptions,
214    known_columns: &[ColumnMetadata],
215) -> Result<QueryResult> {
216    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
217    let output_bind_indexes = binds
218        .iter()
219        .enumerate()
220        .filter_map(|(index, value)| value.is_return_output().then_some(index))
221        .collect::<Vec<_>>();
222    parse_query_response_with_context_binds_and_options(
223        payload,
224        capabilities,
225        known_columns,
226        None,
227        &bind_columns,
228        &output_bind_indexes,
229        false,
230        exec_options,
231    )
232}
233
234pub fn parse_query_response_with_binds_options_columns_and_limits(
235    payload: &[u8],
236    capabilities: ClientCapabilities,
237    binds: &[BindValue],
238    exec_options: ExecuteOptions,
239    known_columns: &[ColumnMetadata],
240    limits: ProtocolLimits,
241) -> Result<QueryResult> {
242    limits.check_binds(binds.len())?;
243    let bind_columns = binds.iter().map(bind_column_metadata).collect::<Vec<_>>();
244    let output_bind_indexes = binds
245        .iter()
246        .enumerate()
247        .filter_map(|(index, value)| value.is_return_output().then_some(index))
248        .collect::<Vec<_>>();
249    parse_query_response_with_context_binds_options_and_limits(
250        payload,
251        capabilities,
252        known_columns,
253        None,
254        &bind_columns,
255        &output_bind_indexes,
256        false,
257        exec_options,
258        limits,
259    )
260}
261
262pub fn parse_query_response_with_previous(
263    payload: &[u8],
264    capabilities: ClientCapabilities,
265    previous_row: Option<&[Option<QueryValue>]>,
266) -> Result<QueryResult> {
267    parse_query_response_with_context(payload, capabilities, &[], previous_row)
268}
269
270pub fn parse_query_response_with_context(
271    payload: &[u8],
272    capabilities: ClientCapabilities,
273    previous_columns: &[ColumnMetadata],
274    previous_row: Option<&[Option<QueryValue>]>,
275) -> Result<QueryResult> {
276    parse_query_response_with_context_and_binds(
277        payload,
278        capabilities,
279        previous_columns,
280        previous_row,
281        &[],
282        &[],
283        false,
284        ProtocolLimits::DEFAULT,
285    )
286}
287
288pub fn parse_fetch_response_with_context(
289    payload: &[u8],
290    capabilities: ClientCapabilities,
291    previous_columns: &[ColumnMetadata],
292    previous_row: Option<&[Option<QueryValue>]>,
293) -> Result<QueryResult> {
294    parse_fetch_response_with_context_and_limits(
295        payload,
296        capabilities,
297        previous_columns,
298        previous_row,
299        ProtocolLimits::DEFAULT,
300    )
301}
302
303pub fn parse_fetch_response_with_context_and_limits(
304    payload: &[u8],
305    capabilities: ClientCapabilities,
306    previous_columns: &[ColumnMetadata],
307    previous_row: Option<&[Option<QueryValue>]>,
308    limits: ProtocolLimits,
309) -> Result<QueryResult> {
310    parse_query_response_with_context_and_binds(
311        payload,
312        capabilities,
313        previous_columns,
314        previous_row,
315        &[],
316        &[],
317        true,
318        limits,
319    )
320}
321
322#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
323pub(crate) fn parse_query_response_with_context_and_binds(
324    payload: &[u8],
325    capabilities: ClientCapabilities,
326    previous_columns: &[ColumnMetadata],
327    previous_row: Option<&[Option<QueryValue>]>,
328    bind_columns: &[ColumnMetadata],
329    output_bind_indexes: &[usize],
330    fetch_long_status: bool,
331    limits: ProtocolLimits,
332) -> Result<QueryResult> {
333    parse_query_response_with_context_binds_options_and_limits(
334        payload,
335        capabilities,
336        previous_columns,
337        previous_row,
338        bind_columns,
339        output_bind_indexes,
340        fetch_long_status,
341        ExecuteOptions::default(),
342        limits,
343    )
344}
345
346#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
347pub(crate) fn parse_query_response_with_context_binds_and_options(
348    payload: &[u8],
349    capabilities: ClientCapabilities,
350    previous_columns: &[ColumnMetadata],
351    previous_row: Option<&[Option<QueryValue>]>,
352    bind_columns: &[ColumnMetadata],
353    output_bind_indexes: &[usize],
354    fetch_long_status: bool,
355    exec_options: ExecuteOptions,
356) -> Result<QueryResult> {
357    parse_query_response_with_context_binds_options_and_limits(
358        payload,
359        capabilities,
360        previous_columns,
361        previous_row,
362        bind_columns,
363        output_bind_indexes,
364        fetch_long_status,
365        exec_options,
366        ProtocolLimits::DEFAULT,
367    )
368}
369
/// Core response parser: walks `payload` message by message and accumulates
/// everything into a [`QueryResult`].
///
/// All the public `parse_*_response*` wrappers in this file end up here.
///
/// * `previous_columns` seeds `result.columns`; a describe-info message in the
///   response replaces them.
/// * `previous_row` is forwarded to the row parser, which falls back to it for
///   duplicate-column lookups when this response has no earlier row.
/// * `bind_columns` / `output_bind_indexes` describe the statement's binds and
///   which of them were flagged as return outputs by the caller. They are only
///   consulted for row data that arrives while `result.columns` is empty.
/// * `fetch_long_status` is forwarded to the row parser.
/// * `exec_options.arraydmlrowcounts` controls whether row counts are read
///   from, and recorded for, PARAMETER messages.
/// * `limits` bounds the reader created over `payload`.
///
/// Parsing stops when the payload is exhausted or a FLUSH_OUT_BINDS /
/// END_OF_RESPONSE message is seen. `more_rows` starts out `true` and is
/// cleared only by a no-data-found error on a result that has columns.
///
/// # Errors
///
/// * `ProtocolError::ServerErrorInfo` for a non-zero server error other than
///   no-data-found-on-a-query and the array-DML-errors code.
/// * `ProtocolError::ServerError` / `ProtocolError::UnknownMessageType` for an
///   unrecognised message type (the former when an embedded server error can
///   be located near the offending byte).
/// * Any error propagated from the reader or the per-message parsers.
#[allow(clippy::too_many_arguments)] // mirrors the reference message attribute set
pub(crate) fn parse_query_response_with_context_binds_options_and_limits(
    payload: &[u8],
    capabilities: ClientCapabilities,
    previous_columns: &[ColumnMetadata],
    previous_row: Option<&[Option<QueryValue>]>,
    bind_columns: &[ColumnMetadata],
    output_bind_indexes: &[usize],
    fetch_long_status: bool,
    exec_options: ExecuteOptions,
    limits: ProtocolLimits,
) -> Result<QueryResult> {
    let mut reader = TtcReader::with_limits(payload, limits)?;
    let mut result = QueryResult {
        columns: previous_columns.to_vec(),
        more_rows: true,
        ..QueryResult::default()
    };
    // Duplicate-column bit vector for the *next* row-data message; set by a
    // row header or bit-vector message and cleared once a row has used it.
    let mut bit_vector: Option<Vec<u8>> = None;
    // Non-input bind positions announced by the server's IO vector, minus the
    // ones the caller already listed in `output_bind_indexes`.
    let mut out_bind_indexes: Vec<usize> = Vec::new();
    while reader.remaining() > 0 {
        let message_type = reader.read_u8()?;
        match message_type {
            // A zero byte is skipped without further processing.
            0 => {}
            TNS_MSG_TYPE_DESCRIBE_INFO => {
                let _describe_name = reader.read_bytes()?;
                // Keep the old metadata around: `parse_describe_info` clears
                // and refills `result.columns`.
                let previous = std::mem::take(&mut result.columns);
                parse_describe_info(&mut reader, capabilities, &mut result)?;
                // re-executing an open cursor whose underlying types changed:
                // the server re-describes mid-response but still streams the
                // row data in the adjusted (LONG/LONG RAW) form expected by
                // the previous fetch metadata (reference `_adjust_metadata`,
                // impl/thin/messages/base.pyx:820-845, applied during
                // `_process_describe_info`).
                for (index, column) in result.columns.iter_mut().enumerate() {
                    if let Some(prev) = previous.get(index) {
                        adjust_refetch_metadata(prev, column);
                    }
                }
            }
            TNS_MSG_TYPE_ROW_HEADER => {
                bit_vector = parse_row_header(&mut reader)?;
            }
            TNS_MSG_TYPE_ROW_DATA => {
                // Without column metadata the row data belongs to binds:
                // server-announced OUT binds take precedence, then the
                // caller-declared return outputs. Otherwise it is a result row.
                if result.columns.is_empty() && !out_bind_indexes.is_empty() {
                    parse_out_bind_row_data(
                        &mut reader,
                        &mut result,
                        bind_columns,
                        &out_bind_indexes,
                    )?;
                } else if result.columns.is_empty() && !output_bind_indexes.is_empty() {
                    parse_returning_row_data(
                        &mut reader,
                        &mut result,
                        bind_columns,
                        output_bind_indexes,
                    )?;
                } else {
                    parse_row_data(
                        &mut reader,
                        &mut result,
                        bit_vector.as_deref(),
                        previous_row,
                        fetch_long_status,
                    )?;
                }
                // A bit vector applies to a single row-data message only.
                bit_vector = None;
            }
            TNS_MSG_TYPE_BIT_VECTOR => {
                bit_vector = Some(parse_bit_vector(&mut reader, result.columns.len())?);
            }
            TNS_MSG_TYPE_PARAMETER => {
                let params =
                    parse_query_return_parameters(&mut reader, exec_options.arraydmlrowcounts)?;
                // When row counts were requested, always record them; missing
                // counts become the default (empty) value rather than `None`.
                if exec_options.arraydmlrowcounts {
                    result.array_dml_row_counts = Some(params.row_counts.unwrap_or_default());
                }
                // Never overwrite an earlier query id with `None`.
                if params.query_id.is_some() {
                    result.query_id = params.query_id;
                }
            }
            TNS_MSG_TYPE_STATUS => {
                let call_status = reader.read_ub4()?;
                let _seq = reader.read_ub2()?;
                result.txn_in_progress = Some(call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
            }
            TNS_MSG_TYPE_IO_VECTOR => {
                // Drop the positions the caller already handles as return
                // outputs so they are not decoded twice.
                out_bind_indexes = parse_io_vector(&mut reader, bind_columns.len())?
                    .into_iter()
                    .filter(|index| !output_bind_indexes.contains(index))
                    .collect();
            }
            TNS_MSG_TYPE_FLUSH_OUT_BINDS => break,
            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
                if let Some(update) = skip_server_side_piggyback(&mut reader)? {
                    result.sessionless_txn_state = Some(update);
                }
            }
            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
                // reference messages/base.pyx `_process_implicit_result`
                let num_results = reader.read_ub4()?;
                // `num_results` is read straight off the wire (a ub4, up to
                // ~4e9); each resultset consumes at least one byte, so cap the
                // reservation by the bytes left in the payload (BoundedReader).
                // Without this a hostile server forces a multi-gigabyte
                // allocation (OOM) before the truncated read in the loop body
                // fails closed.
                let mut resultsets: Vec<QueryValue> = reader.with_capacity_limited(
                    num_results as usize,
                    1,
                    ProtocolLimits::check_length_prefixed_elements,
                )?;
                for _ in 0..num_results {
                    // Each entry: a length-prefixed blob that is skipped, the
                    // child's describe info, then the child cursor id.
                    let num_bytes = reader.read_u8()?;
                    reader.skip(usize::from(num_bytes))?;
                    let mut child = QueryResult::default();
                    parse_describe_info(&mut reader, capabilities, &mut child)?;
                    let child_cursor_id = u32::from(reader.read_ub2()?);
                    resultsets.push(QueryValue::Cursor(Box::new(CursorValue {
                        columns: child.columns,
                        cursor_id: child_cursor_id,
                    })));
                }
                result.implicit_resultsets = Some(resultsets);
            }
            TNS_MSG_TYPE_END_OF_RESPONSE => break,
            // pipeline responses open with the token of the operation they
            // answer (messages/base.pyx:288-293); callers compare it against
            // the expected token (mismatch -> DPY-2052 at the driver layer)
            TNS_MSG_TYPE_TOKEN => {
                result.token_num = Some(reader.read_ub8()?);
            }
            TNS_MSG_TYPE_ERROR => {
                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
                // The end-of-call ERROR message (number 0 on success) carries
                // the end-of-call status; sample the transaction-in-progress bit
                // (reference protocol.pyx `_process_call_status`).
                result.txn_in_progress =
                    Some(info.call_status & TNS_EOCS_FLAGS_TXN_IN_PROGRESS != 0);
                // A zero cursor id leaves any previously recorded id untouched.
                if info.cursor_id != 0 {
                    result.cursor_id = u32::from(info.cursor_id);
                }
                result.row_count = info.row_count;
                result.compilation_error_warning |= info.compilation_error_warning;
                // Cloned because `info` is consumed by value further down.
                result.last_rowid = info.rowid.clone();
                if info.number == TNS_ERR_NO_DATA_FOUND && !result.columns.is_empty() {
                    // For a result with columns, no-data-found only signals
                    // that the cursor is exhausted.
                    result.more_rows = false;
                } else if info.number == TNS_ERR_ARRAY_DML_ERRORS {
                    // executemany(batcherrors=True): errors are reported via
                    // the batch error arrays instead of raising ORA-24381
                    // (reference messages/base.pyx `_process_error_info`).
                    result.batch_errors = info.batch_errors;
                } else if info.number != 0 {
                    // Genuine server error: hand any row counts gathered so
                    // far over to the error details and bail out.
                    let mut details = info.into_details();
                    details.array_dml_row_counts = result.array_dml_row_counts.take();
                    return Err(ProtocolError::ServerErrorInfo(Box::new(details)));
                }
            }
            _ => {
                // Offset of the unrecognised message-type byte itself.
                let position = reader.position().saturating_sub(1);
                // Prefer surfacing a server error found near the offending
                // byte over a bare "unknown message type".
                if let Some(message) =
                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
                {
                    return Err(ProtocolError::ServerError(message));
                }
                return Err(ProtocolError::UnknownMessageType {
                    message_type,
                    position,
                });
            }
        }
    }
    Ok(result)
}
545
546pub(crate) fn bind_column_metadata(value: &BindValue) -> ColumnMetadata {
547    let (ora_type_num, csfrm, buffer_size) = bind_metadata(value);
548    let object_schema = match value {
549        BindValue::ObjectOutput { schema, .. } | BindValue::ObjectInput { schema, .. } => {
550            Some(schema.clone())
551        }
552        _ => None,
553    };
554    let object_type_name = match value {
555        BindValue::ObjectOutput { type_name, .. } | BindValue::ObjectInput { type_name, .. } => {
556            Some(type_name.clone())
557        }
558        _ => None,
559    };
560    ColumnMetadata {
561        name: String::new(),
562        ora_type_num,
563        csfrm,
564        precision: 0,
565        scale: 0,
566        buffer_size,
567        max_size: buffer_size,
568        nulls_allowed: true,
569        is_json: false,
570        is_oson: false,
571        object_schema,
572        object_type_name,
573        is_array: matches!(value, BindValue::Array { .. }),
574        vector_dimensions: None,
575        vector_format: 0,
576        vector_flags: 0,
577        domain_schema: None,
578        domain_name: None,
579        annotations: None,
580    }
581}
582
583pub(crate) fn parse_io_vector(reader: &mut TtcReader<'_>, bind_count: usize) -> Result<Vec<usize>> {
584    let _flags = reader.read_u8()?;
585    let temp16 = reader.read_ub2()?;
586    let temp32 = reader.read_ub4()?;
587    let num_binds = usize::try_from(temp32)
588        .map_err(|_| ProtocolError::InvalidPacketLength {
589            length: usize::MAX,
590            minimum: 0,
591        })?
592        .checked_mul(256)
593        .and_then(|value| value.checked_add(usize::from(temp16)))
594        .ok_or(ProtocolError::InvalidPacketLength {
595            length: usize::MAX,
596            minimum: 0,
597        })?;
598    let _num_iters_this_time = reader.read_ub4()?;
599    let _uac_buffer_length = reader.read_ub2()?;
600    let fast_fetch_len = reader.read_ub2()?;
601    if fast_fetch_len > 0 {
602        reader.skip(usize::from(fast_fetch_len))?;
603    }
604    let rowid_len = reader.read_ub2()?;
605    if rowid_len > 0 {
606        reader.skip(usize::from(rowid_len))?;
607    }
608    let mut out_indexes = Vec::new();
609    reader.limits().check_binds(num_binds)?;
610    for index in 0..num_binds {
611        let direction = reader.read_u8()?;
612        if index < bind_count && direction != TNS_BIND_DIR_INPUT {
613            out_indexes.push(index);
614        }
615    }
616    Ok(out_indexes)
617}
618
619pub(crate) fn find_embedded_server_error(
620    payload: &[u8],
621    ttc_field_version: u8,
622    position: usize,
623) -> Option<String> {
624    let start = position.saturating_sub(64);
625    for candidate in start..=position {
626        if !matches!(payload.get(candidate).copied(), Some(TNS_MSG_TYPE_ERROR)) {
627            continue;
628        }
629        let mut reader = TtcReader::new(payload.get(candidate + 1..)?);
630        let info = parse_server_error_info(&mut reader, ttc_field_version).ok()?;
631        if info.number != 0 && info.message.starts_with("ORA-") {
632            return Some(info.message);
633        }
634    }
635    None
636}
637
638pub(crate) fn parse_describe_info(
639    reader: &mut TtcReader<'_>,
640    capabilities: ClientCapabilities,
641    result: &mut QueryResult,
642) -> Result<()> {
643    let _max_row_size = reader.read_ub4()?;
644    let num_columns = reader.read_ub4()?;
645    reader.limits().check_columns(num_columns as usize)?;
646    result.columns.clear();
647    if num_columns > 0 {
648        reader.skip(1)?;
649    }
650    for _ in 0..num_columns {
651        result
652            .columns
653            .push(parse_column_metadata(reader, capabilities)?);
654    }
655    let _current_date = reader.read_bytes_with_length()?;
656    let _dcbflag = reader.read_ub4()?;
657    let _dcbmdbz = reader.read_ub4()?;
658    let _dcbmnpr = reader.read_ub4()?;
659    let _dcbmxpr = reader.read_ub4()?;
660    let _dcbqcky = reader.read_bytes_with_length()?;
661    Ok(())
662}
663
664pub(crate) fn parse_column_metadata(
665    reader: &mut TtcReader<'_>,
666    capabilities: ClientCapabilities,
667) -> Result<ColumnMetadata> {
668    let ora_type_num = reader.read_u8()?;
669    reader.skip(1)?;
670    let precision = reader.read_i8()?;
671    let scale = reader.read_i8()?;
672    let buffer_size = reader.read_ub4()?;
673    let _max_array_elements = reader.read_ub4()?;
674    let _cont_flags = reader.read_ub8()?;
675    let _oid = reader.read_bytes_with_length()?;
676    let _version = reader.read_ub2()?;
677    let _charset_id = reader.read_ub2()?;
678    let csfrm = reader.read_u8()?;
679    let mut max_size = reader.read_ub4()?;
680    if ora_type_num == ORA_TYPE_NUM_RAW {
681        max_size = buffer_size;
682    }
683    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_12_2 {
684        let _oaccolid = reader.read_ub4()?;
685    }
686    let nulls_allowed = reader.read_u8()? != 0;
687    reader.skip(1)?;
688    let name = reader.read_string_with_length()?.unwrap_or_default();
689    let object_schema = reader.read_string_with_length()?;
690    let object_type_name = reader.read_string_with_length()?;
691    let _column_position = reader.read_ub2()?;
692    let uds_flags = reader.read_ub4()?;
693    let mut domain_schema = None;
694    let mut domain_name = None;
695    let mut annotations: Option<Vec<(String, String)>> = None;
696    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1 {
697        domain_schema = reader.read_string_with_length()?;
698        domain_name = reader.read_string_with_length()?;
699    }
700    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_3 {
701        let num_annotations = reader.read_ub4()?;
702        if num_annotations > 0 {
703            reader.skip(1)?;
704            let num_annotations = reader.read_ub4()?;
705            reader.skip(1)?;
706            // Bound by remaining bytes (BoundedReader): each annotation reads
707            // at least a length-prefixed key/value, so a ub4 count larger than
708            // the payload is a lie that must not pre-allocate gigabytes.
709            let mut collected: Vec<(String, String)> = reader.with_capacity_limited(
710                num_annotations as usize,
711                1,
712                ProtocolLimits::check_object_elements,
713            )?;
714            for _ in 0..num_annotations {
715                let key = reader.read_string_with_length()?.unwrap_or_default();
716                // A null annotation value is normalized to "" by the reference
717                // driver (python-oracledb base.pyx _process_metadata).
718                let value = reader.read_string_with_length()?.unwrap_or_default();
719                let _flags = reader.read_ub4()?;
720                collected.push((key, value));
721            }
722            let _flags = reader.read_ub4()?;
723            annotations = Some(collected);
724        }
725    }
726    let mut vector_dimensions = None;
727    let mut vector_format = 0u8;
728    let mut vector_flags = 0u8;
729    if capabilities.ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_4 {
730        // reference metadata.pyx: ub4 dimensions, ub1 format, ub1 flags
731        let dims = reader.read_ub4()?;
732        reader.limits().check_vector_dimensions(dims as usize)?;
733        vector_format = reader.read_u8()?;
734        vector_flags = reader.read_u8()?;
735        if ora_type_num == ORA_TYPE_NUM_VECTOR {
736            vector_dimensions = Some(dims);
737        }
738    }
739
740    Ok(ColumnMetadata {
741        name,
742        ora_type_num,
743        csfrm,
744        precision,
745        scale,
746        buffer_size,
747        max_size,
748        nulls_allowed,
749        is_json: uds_flags & TNS_UDS_FLAGS_IS_JSON != 0,
750        is_oson: uds_flags & TNS_UDS_FLAGS_IS_OSON != 0,
751        object_schema,
752        object_type_name,
753        is_array: false,
754        vector_dimensions,
755        vector_format,
756        vector_flags,
757        domain_schema,
758        domain_name,
759        annotations,
760    })
761}
762
763pub(crate) fn parse_row_header(reader: &mut TtcReader<'_>) -> Result<Option<Vec<u8>>> {
764    reader.skip(1)?;
765    let _num_requests = reader.read_ub2()?;
766    let _iteration_number = reader.read_ub4()?;
767    let _num_iters = reader.read_ub4()?;
768    let _buffer_length = reader.read_ub2()?;
769    let num_bytes = reader.read_ub4()?;
770    let bit_vector = if num_bytes > 0 {
771        reader.skip(1)?;
772        Some(reader.read_raw(num_bytes as usize)?.to_vec())
773    } else {
774        None
775    };
776    let _rxhrid = reader.read_bytes_with_length()?;
777    Ok(bit_vector)
778}
779
780pub(crate) fn parse_bit_vector(reader: &mut TtcReader<'_>, num_columns: usize) -> Result<Vec<u8>> {
781    let _num_columns_sent = reader.read_ub2()?;
782    let num_bytes = num_columns.div_ceil(8);
783    Ok(reader.read_raw(num_bytes)?.to_vec())
784}
785
786pub(crate) fn parse_row_data(
787    reader: &mut TtcReader<'_>,
788    result: &mut QueryResult,
789    bit_vector: Option<&[u8]>,
790    previous_row: Option<&[Option<QueryValue>]>,
791    fetch_long_status: bool,
792) -> Result<()> {
793    let mut row = Vec::with_capacity(result.columns.len());
794    for (index, metadata) in result.columns.iter().enumerate() {
795        if is_duplicate_column(bit_vector, index) {
796            let previous = result
797                .rows
798                .last()
799                .map(Vec::as_slice)
800                .or(previous_row)
801                .and_then(|last| last.get(index))
802                .cloned()
803                .ok_or(ProtocolError::TtcDecode(
804                    "duplicate row data without previous row",
805                ))?;
806            row.push(previous);
807            continue;
808        }
809        row.push(parse_column_value(reader, metadata)?);
810        if fetch_long_status
811            && matches!(
812                metadata.ora_type_num,
813                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
814            )
815        {
816            let _null_indicator = reader.read_sb4()?;
817            let _return_code = reader.read_ub4()?;
818        }
819    }
820    result.rows.push(row);
821    Ok(())
822}
823
824pub(crate) fn parse_out_bind_row_data(
825    reader: &mut TtcReader<'_>,
826    result: &mut QueryResult,
827    bind_columns: &[ColumnMetadata],
828    out_bind_indexes: &[usize],
829) -> Result<()> {
830    for index in out_bind_indexes {
831        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
832            "out bind index without bind metadata",
833        ))?;
834        if metadata.is_array {
835            let num_elements = usize::try_from(reader.read_ub4()?).map_err(|_| {
836                ProtocolError::InvalidPacketLength {
837                    length: usize::MAX,
838                    minimum: 0,
839                }
840            })?;
841            reader.limits().check_batch_rows(num_elements)?;
842            // Cap by remaining bytes (BoundedReader): each element consumes
843            // wire data, so a ub4 count cannot legitimately exceed the payload.
844            let mut values: Vec<Option<QueryValue>> =
845                reader.with_capacity_limited(num_elements, 1, ProtocolLimits::check_batch_rows)?;
846            for _ in 0..num_elements {
847                let value = parse_column_value(reader, metadata)?;
848                let actual_num_bytes = reader.read_sb4()?;
849                if actual_num_bytes != 0 && value.is_some() {
850                    return Err(ProtocolError::TtcDecode("truncated array OUT bind value"));
851                }
852                values.push(value);
853            }
854            result
855                .out_values
856                .push((*index, Some(QueryValue::Array(values))));
857            continue;
858        }
859        let value = parse_column_value(reader, metadata)?;
860        let actual_num_bytes = reader.read_sb4()?;
861        if actual_num_bytes != 0 && value.is_some() {
862            return Err(ProtocolError::TtcDecode("truncated OUT bind value"));
863        }
864        result.out_values.push((*index, value));
865    }
866    Ok(())
867}
868
869pub(crate) fn parse_returning_row_data(
870    reader: &mut TtcReader<'_>,
871    result: &mut QueryResult,
872    bind_columns: &[ColumnMetadata],
873    output_bind_indexes: &[usize],
874) -> Result<()> {
875    for index in output_bind_indexes {
876        let metadata = bind_columns.get(*index).ok_or(ProtocolError::TtcDecode(
877            "return bind index without bind metadata",
878        ))?;
879        let num_rows = usize::try_from(reader.read_ub4()?).map_err(|_| {
880            ProtocolError::InvalidPacketLength {
881                length: usize::MAX,
882                minimum: 0,
883            }
884        })?;
885        reader.limits().check_batch_rows(num_rows)?;
886        // Cap by remaining bytes (BoundedReader); see the OOM note above.
887        let mut values: Vec<Option<QueryValue>> =
888            reader.with_capacity_limited(num_rows, 1, ProtocolLimits::check_batch_rows)?;
889        for _ in 0..num_rows {
890            let value = parse_column_value(reader, metadata)?;
891            let actual_num_bytes = reader.read_sb4()?;
892            if actual_num_bytes != 0 && value.is_some() {
893                return Err(ProtocolError::TtcDecode("truncated DML RETURNING value"));
894            }
895            values.push(value);
896        }
897        result.return_values.push((*index, values));
898    }
899    Ok(())
900}
901
/// Report whether column `column_num` is flagged as a duplicate by the row's
/// bit vector.
///
/// Bits are numbered least-significant first within each byte; a **cleared**
/// bit marks the column as a duplicate. Returns `false` when there is no bit
/// vector at all or when the vector is too short to cover the column.
pub(crate) fn is_duplicate_column(bit_vector: Option<&[u8]>, column_num: usize) -> bool {
    let (byte_index, bit_index) = (column_num / 8, column_num % 8);
    match bit_vector.and_then(|bits| bits.get(byte_index)) {
        Some(&byte) => (byte >> bit_index) & 1 == 0,
        None => false,
    }
}
912
913pub(crate) fn parse_column_value(
914    reader: &mut TtcReader<'_>,
915    metadata: &ColumnMetadata,
916) -> Result<Option<QueryValue>> {
917    if metadata.buffer_size == 0
918        && !matches!(
919            metadata.ora_type_num,
920            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
921        )
922    {
923        return Ok(None);
924    }
925    match metadata.ora_type_num {
926        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
927            let Some(bytes) = reader.read_bytes()? else {
928                return Ok(None);
929            };
930            match decode_text_value(&bytes, metadata.csfrm) {
931                Ok(value) => Ok(Some(QueryValue::Text(value))),
932                // preserve the raw bytes so the caller can honor the
933                // configured encoding_errors policy (or raise a Python
934                // UnicodeDecodeError as the reference does)
935                Err(ProtocolError::TtcDecode(_)) => Ok(Some(QueryValue::TextRaw {
936                    bytes,
937                    csfrm: metadata.csfrm,
938                })),
939                Err(err) => Err(err),
940            }
941        }
942        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => Ok(reader.read_bytes()?.map(QueryValue::Raw)),
943        ORA_TYPE_NUM_ROWID => parse_rowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
944        ORA_TYPE_NUM_UROWID => parse_urowid_value(reader).map(|value| value.map(QueryValue::Rowid)),
945        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
946            let Some(bytes) = reader.read_bytes()? else {
947                return Ok(None);
948            };
949            decode_number_value(&bytes).map(Some)
950        }
951        ORA_TYPE_NUM_BINARY_DOUBLE => {
952            let Some(bytes) = reader.read_bytes()? else {
953                return Ok(None);
954            };
955            decode_binary_double(&bytes)
956                .map(|value| Some(QueryValue::BinaryDouble(value.to_string())))
957        }
958        ORA_TYPE_NUM_BINARY_FLOAT => {
959            let Some(bytes) = reader.read_bytes()? else {
960                return Ok(None);
961            };
962            // f64-widened text matches Python float semantics for BINARY_FLOAT
963            decode_binary_float(&bytes)
964                .map(|value| Some(QueryValue::BinaryDouble(f64::from(value).to_string())))
965        }
966        ORA_TYPE_NUM_BOOLEAN => {
967            let Some(bytes) = reader.read_bytes()? else {
968                return Ok(None);
969            };
970            // reference read_bool: last byte == 1 means true; native
971            // DB_TYPE_BOOLEAN surfaces as a Python bool.
972            let is_true = matches!(bytes.last(), Some(&1));
973            Ok(Some(QueryValue::Boolean(is_true)))
974        }
975        ORA_TYPE_NUM_INTERVAL_DS => {
976            let Some(bytes) = reader.read_bytes()? else {
977                return Ok(None);
978            };
979            decode_interval_ds(&bytes).map(Some)
980        }
981        ORA_TYPE_NUM_INTERVAL_YM => {
982            let Some(bytes) = reader.read_bytes()? else {
983                return Ok(None);
984            };
985            decode_interval_ym(&bytes).map(Some)
986        }
987        ORA_TYPE_NUM_DATE
988        | ORA_TYPE_NUM_TIMESTAMP
989        | ORA_TYPE_NUM_TIMESTAMP_LTZ
990        | ORA_TYPE_NUM_TIMESTAMP_TZ => {
991            let Some(bytes) = reader.read_bytes()? else {
992                return Ok(None);
993            };
994            decode_datetime_value(&bytes).map(Some)
995        }
996        ORA_TYPE_NUM_CLOB | ORA_TYPE_NUM_BLOB | ORA_TYPE_NUM_BFILE => {
997            parse_lob_value(reader, metadata)
998        }
999        ORA_TYPE_NUM_VECTOR => parse_vector_value(reader),
1000        ORA_TYPE_NUM_JSON => parse_json_value(reader),
1001        ORA_TYPE_NUM_CURSOR => parse_cursor_value(reader).map(Some),
1002        ORA_TYPE_NUM_OBJECT => parse_object_value(reader, metadata),
1003        _ => Err(ProtocolError::UnsupportedFeature("query column type")),
1004    }
1005}
1006
1007/// A column value decoded in pass 1 of the borrowed row decode. Scalar values
1008/// that borrow the wire buffer are held directly; values that need a small
1009/// owned arena (synthesized `Number` text, or a cold owned [`QueryValue`]) are
1010/// recorded as a deferred handle into the per-row arena and resolved in pass 2
1011/// once the arena is frozen. This two-pass split is what keeps the borrowed
1012/// path sound under `#![forbid(unsafe_code)]`: no `&str`/`&[u8]` is ever held
1013/// into an arena that is still being grown.
1014enum ColumnSlot<'buf> {
1015    /// SQL NULL.
1016    Null,
1017    /// A value that borrows the wire buffer (or is a small `Copy` value).
1018    Wire(QueryValueRef<'buf>),
1019    /// A `NUMBER` whose canonical text lives at `arena[range]` in the per-row
1020    /// number-text arena.
1021    Number {
1022        range: core::ops::Range<usize>,
1023        is_integer: bool,
1024    },
1025    /// A cold / non-borrowable value parked at `owned[index]` in the per-row
1026    /// owned arena.
1027    Owned(usize),
1028}
1029
1030/// Decode one column into a [`ColumnSlot`], borrowing the wire buffer for the
1031/// hot scalar cases and appending to the per-row arenas for the deferred ones.
1032/// Mirrors [`parse_column_value`] type-for-type; the produced owned value (via
1033/// [`QueryValueRef::to_owned_value`]) is identical to the owned path.
1034///
1035/// `digits` is a caller-owned scratch buffer reused across all cells so the
1036/// per-cell `NUMBER` decode allocates nothing of its own (it writes straight
1037/// into `number_arena`). The hot scalar grid (Text/Raw) borrows the wire buffer
1038/// directly with zero allocation.
1039fn parse_column_slot<'buf>(
1040    reader: &mut TtcReader<'buf>,
1041    metadata: &ColumnMetadata,
1042    number_arena: &mut String,
1043    owned_arena: &mut Vec<QueryValue>,
1044    digits: &mut Vec<u8>,
1045) -> Result<ColumnSlot<'buf>> {
1046    // Park an owned QueryValue in the arena and return the deferred slot. Used
1047    // for the cold / non-borrowable variants so the hot grid stays borrowed.
1048    fn park(owned_arena: &mut Vec<QueryValue>, value: Option<QueryValue>) -> ColumnSlot<'static> {
1049        match value {
1050            None => ColumnSlot::Null,
1051            Some(value) => {
1052                owned_arena.push(value);
1053                ColumnSlot::Owned(owned_arena.len() - 1)
1054            }
1055        }
1056    }
1057
1058    if metadata.buffer_size == 0
1059        && !matches!(
1060            metadata.ora_type_num,
1061            ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1062        )
1063    {
1064        return Ok(ColumnSlot::Null);
1065    }
1066    match metadata.ora_type_num {
1067        ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_LONG => {
1068            match reader.read_bytes_borrowed()? {
1069                BorrowedBytes::Null => Ok(ColumnSlot::Null),
1070                // Borrow the wire bytes directly when they are valid UTF-8 and
1071                // not the UTF-16 NCHAR form (which needs re-encoding). Zero copy.
1072                BorrowedBytes::Slice(slice) if metadata.csfrm != CS_FORM_NCHAR => {
1073                    match validate_utf8(slice) {
1074                        Ok(text) => Ok(ColumnSlot::Wire(QueryValueRef::Text(text))),
1075                        Err(_) => Ok(park(
1076                            owned_arena,
1077                            Some(QueryValue::TextRaw {
1078                                bytes: slice.to_vec(),
1079                                csfrm: metadata.csfrm,
1080                            }),
1081                        )),
1082                    }
1083                }
1084                // NCHAR (UTF-16) or chunked long text: fall back to the owned
1085                // decode, which re-encodes to UTF-8 / reassembles chunks.
1086                other => {
1087                    let bytes = other.into_vec();
1088                    let value = match decode_text_value(&bytes, metadata.csfrm) {
1089                        Ok(text) => QueryValue::Text(text),
1090                        Err(ProtocolError::TtcDecode(_)) => QueryValue::TextRaw {
1091                            bytes,
1092                            csfrm: metadata.csfrm,
1093                        },
1094                        Err(err) => return Err(err),
1095                    };
1096                    Ok(park(owned_arena, Some(value)))
1097                }
1098            }
1099        }
1100        ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW => match reader.read_bytes_borrowed()? {
1101            BorrowedBytes::Null => Ok(ColumnSlot::Null),
1102            BorrowedBytes::Slice(slice) => Ok(ColumnSlot::Wire(QueryValueRef::Raw(slice))),
1103            BorrowedBytes::Chunked(bytes) => Ok(park(owned_arena, Some(QueryValue::Raw(bytes)))),
1104        },
1105        ORA_TYPE_NUM_NUMBER | ORA_TYPE_NUM_BINARY_INTEGER => {
1106            // The wire NUMBER is binary; its canonical decimal text is
1107            // synthesized, so it cannot be borrowed from the buffer. We
1108            // synthesize it *directly* into the per-row number arena (reusing the
1109            // `digits` scratch), borrowing from the arena — zero per-cell heap
1110            // allocation for the common in-range NUMBER.
1111            with_small_bytes(reader, |bytes| match bytes {
1112                None => Ok(ColumnSlot::Null),
1113                Some(bytes) => {
1114                    let start = number_arena.len();
1115                    let is_integer = decode_number_text_into(bytes, digits, number_arena)?;
1116                    Ok(ColumnSlot::Number {
1117                        range: start..number_arena.len(),
1118                        is_integer,
1119                    })
1120                }
1121            })
1122        }
1123        ORA_TYPE_NUM_BOOLEAN => with_small_bytes(reader, |bytes| match bytes {
1124            None => Ok(ColumnSlot::Null),
1125            Some(bytes) => Ok(ColumnSlot::Wire(QueryValueRef::Boolean(matches!(
1126                bytes.last(),
1127                Some(&1)
1128            )))),
1129        }),
1130        ORA_TYPE_NUM_INTERVAL_DS => with_small_bytes(reader, |bytes| match bytes {
1131            None => Ok(ColumnSlot::Null),
1132            Some(bytes) => match decode_interval_ds(bytes)? {
1133                QueryValue::IntervalDS {
1134                    days,
1135                    hours,
1136                    minutes,
1137                    seconds,
1138                    fseconds,
1139                } => Ok(ColumnSlot::Wire(QueryValueRef::IntervalDS {
1140                    days,
1141                    hours,
1142                    minutes,
1143                    seconds,
1144                    fseconds,
1145                })),
1146                other => Ok(park(owned_arena, Some(other))),
1147            },
1148        }),
1149        ORA_TYPE_NUM_INTERVAL_YM => with_small_bytes(reader, |bytes| match bytes {
1150            None => Ok(ColumnSlot::Null),
1151            Some(bytes) => match decode_interval_ym(bytes)? {
1152                QueryValue::IntervalYM { years, months } => {
1153                    Ok(ColumnSlot::Wire(QueryValueRef::IntervalYM {
1154                        years,
1155                        months,
1156                    }))
1157                }
1158                other => Ok(park(owned_arena, Some(other))),
1159            },
1160        }),
1161        ORA_TYPE_NUM_DATE
1162        | ORA_TYPE_NUM_TIMESTAMP
1163        | ORA_TYPE_NUM_TIMESTAMP_LTZ
1164        | ORA_TYPE_NUM_TIMESTAMP_TZ => with_small_bytes(reader, |bytes| match bytes {
1165            None => Ok(ColumnSlot::Null),
1166            Some(bytes) => match decode_datetime_value(bytes)? {
1167                QueryValue::DateTime {
1168                    year,
1169                    month,
1170                    day,
1171                    hour,
1172                    minute,
1173                    second,
1174                    nanosecond,
1175                } => Ok(ColumnSlot::Wire(QueryValueRef::DateTime {
1176                    year,
1177                    month,
1178                    day,
1179                    hour,
1180                    minute,
1181                    second,
1182                    nanosecond,
1183                })),
1184                other => Ok(park(owned_arena, Some(other))),
1185            },
1186        }),
1187        // Everything else (Rowid, BinaryDouble/Float, Clob/Blob/Bfile, Vector,
1188        // Json, Cursor, Object, UROWID) goes through the owned decode and is
1189        // parked in the owned arena. These are the cold / non-borrowable cases.
1190        _ => {
1191            let value = parse_column_value(reader, metadata)?;
1192            Ok(park(owned_arena, value))
1193        }
1194    }
1195}
1196
1197/// Read one TTC byte field and hand the body to `f` as a borrowed `&[u8]`
1198/// without allocating in the common contiguous case. `None` is SQL NULL. The
1199/// rare chunked long form is reassembled into a temporary `Vec` (these small
1200/// fixed-size scalar types — number/boolean/interval/datetime — are never sent
1201/// chunked in practice, so this fallback is effectively dead weight).
1202fn with_small_bytes<'buf, T>(
1203    reader: &mut TtcReader<'buf>,
1204    f: impl FnOnce(Option<&[u8]>) -> Result<T>,
1205) -> Result<T> {
1206    match reader.read_bytes_borrowed()? {
1207        BorrowedBytes::Null => f(None),
1208        BorrowedBytes::Slice(slice) => f(Some(slice)),
1209        BorrowedBytes::Chunked(owned) => f(Some(&owned)),
1210    }
1211}
1212
1213impl BorrowedBytes<'_> {
1214    /// Reassemble into an owned `Vec` (zero-copy `Slice` still copies; `Chunked`
1215    /// reuses its already-owned `Vec`). Used by the non-borrowable text fallback.
1216    fn into_vec(self) -> Vec<u8> {
1217        match self {
1218            BorrowedBytes::Null => Vec::new(),
1219            BorrowedBytes::Slice(slice) => slice.to_vec(),
1220            BorrowedBytes::Chunked(owned) => owned,
1221        }
1222    }
1223}
1224
1225/// A decoded fetch batch that **owns** the wire response buffer and column
1226/// metadata, and yields rows of borrowed [`QueryValueRef`] that point straight
1227/// into that buffer. This is the zero-copy fetch fast path: the common scalar
1228/// grid is decoded with no per-cell allocation.
1229///
1230/// ## Soundness
1231///
1232/// The buffer is owned by the batch and outlives every borrowed row: rows are
1233/// only ever surfaced *inside* the [`for_each_row_ref`](Self::for_each_row_ref)
1234/// callback, whose `&[QueryValueRef]` argument cannot escape (its lifetime is
1235/// bound to the call). The borrow checker therefore guarantees no
1236/// `QueryValueRef` can dangle — there is no self-referential struct and no
1237/// `unsafe`. `Number` text and the cold values borrow per-row arenas that are
1238/// fully built (pass 1) before any reference into them is taken (pass 2), so an
1239/// arena is never grown while borrowed.
1240#[derive(Clone, Debug)]
1241pub struct BorrowedRowBatch {
1242    buffer: Vec<u8>,
1243    columns: Vec<ColumnMetadata>,
1244    /// Byte offset into `buffer` where each row's column values begin.
1245    row_starts: Vec<usize>,
1246    /// Per-row duplicate-column bit vector (server row-compression). `None` (or
1247    /// an absent entry) means every column is present on the wire for that row.
1248    /// A zero bit marks a duplicate column whose value repeats the previous
1249    /// row's value and carries no wire bytes (reference `bit_vector`).
1250    row_bit_vectors: Vec<Option<Vec<u8>>>,
1251    /// Whether this batch carried `LONG`/`LONG RAW` status trailers after each
1252    /// such column (the fetch path sets this; the plain execute path does not).
1253    fetch_long_status: bool,
1254    /// The caller's previous (owned) row, used to resolve duplicate columns in
1255    /// the *first* compressed row of the batch (whose duplicates repeat the row
1256    /// that ended the prior page). `None` outside the compressed-fetch case.
1257    previous_row_seed: Option<Vec<Option<QueryValue>>>,
1258}
1259
1260impl BorrowedRowBatch {
1261    /// Construct a batch from an owned wire `buffer`, the `columns` describing
1262    /// each cell, and the per-row start offsets into `buffer`. Use this for
1263    /// batches with no duplicate-column compression and no LONG trailers (the
1264    /// common synthetic / test case); the framing-aware
1265    /// [`parse_query_response_borrowed`] builds the full form.
1266    pub fn new(buffer: Vec<u8>, columns: Vec<ColumnMetadata>, row_starts: Vec<usize>) -> Self {
1267        Self {
1268            buffer,
1269            columns,
1270            row_starts,
1271            row_bit_vectors: Vec::new(),
1272            fetch_long_status: false,
1273            previous_row_seed: None,
1274        }
1275    }
1276
1277    /// Number of rows in the batch.
1278    pub fn row_count(&self) -> usize {
1279        self.row_starts.len()
1280    }
1281
1282    /// The columns describing each cell.
1283    pub fn columns(&self) -> &[ColumnMetadata] {
1284        &self.columns
1285    }
1286
1287    /// The address range of the owned buffer, for tests asserting that borrowed
1288    /// scalar cells truly point into it (zero-copy).
1289    #[cfg(test)]
1290    pub fn buffer_ptr_range(&self) -> core::ops::Range<usize> {
1291        let start = self.buffer.as_ptr() as usize;
1292        start..start + self.buffer.len()
1293    }
1294
1295    /// Decode each row and invoke `callback` with the row's borrowed cells. The
1296    /// `&[Option<QueryValueRef>]` slice borrows the batch buffer and per-row
1297    /// arenas; it is valid only for the duration of the call (it cannot escape).
1298    /// `None` cells are SQL NULL. Returns the first decode/callback error.
1299    ///
1300    /// Generic over the callback's error type `E` (any error a decode failure
1301    /// can convert into, e.g. the driver crate's own error) so callers are not
1302    /// forced through [`ProtocolError`]; a decode failure is surfaced via
1303    /// `E: From<ProtocolError>`.
1304    pub fn for_each_row_ref<F, E>(&self, mut callback: F) -> std::result::Result<(), E>
1305    where
1306        F: FnMut(&[Option<QueryValueRef<'_>>]) -> std::result::Result<(), E>,
1307        E: From<ProtocolError>,
1308    {
1309        // The two arenas are reused across rows (cleared, not reallocated). They
1310        // are mutated only in pass 1; pass 2 borrows them immutably and that
1311        // borrow is confined to a single loop iteration (it ends before the next
1312        // iteration's `clear()`), which is what keeps the borrow checker — and
1313        // soundness — happy.
1314        let mut number_arena = String::new();
1315        let mut owned_arena: Vec<QueryValue> = Vec::new();
1316        // Reusable scratch: `digits` for the NUMBER decode, `slots` for pass 1,
1317        // `row` for the borrowed cells handed to the callback. All cleared and
1318        // reused across rows so the steady-state per-row decode allocates only
1319        // when an arena/scratch genuinely grows (amortized). `slots`/`row`/
1320        // `digits` borrow nothing across iterations beyond the stable buffer.
1321        let mut digits: Vec<u8> = Vec::new();
1322        let mut slots: Vec<Option<ColumnSlot<'_>>> = Vec::with_capacity(self.columns.len());
1323        // Owned snapshot of the previous row, used only to resolve duplicate
1324        // (bit-vector-compressed) columns, which carry no wire bytes. Empty when
1325        // the batch has no bit vectors (the common case), so it costs nothing.
1326        // Seeded from the caller's prior-page row for the first compressed row.
1327        let mut previous_owned: Vec<Option<QueryValue>> =
1328            self.previous_row_seed.clone().unwrap_or_default();
1329        let uses_bit_vectors = !self.row_bit_vectors.is_empty();
1330
1331        for (row_index, &start) in self.row_starts.iter().enumerate() {
1332            number_arena.clear();
1333            owned_arena.clear();
1334            slots.clear();
1335            let bit_vector = self
1336                .row_bit_vectors
1337                .get(row_index)
1338                .and_then(|bv| bv.as_deref());
1339
1340            // Pass 1: decode all columns, growing the per-row arenas. `slots`
1341            // borrows only the buffer (the deferred Number/Owned slots hold a
1342            // range/index into the arenas, never a borrow of them). Duplicate
1343            // columns carry no wire bytes — their owned previous value is parked
1344            // in the owned arena.
1345            let mut reader = TtcReader::new(&self.buffer[start..]);
1346            for (index, metadata) in self.columns.iter().enumerate() {
1347                if is_duplicate_column(bit_vector, index) {
1348                    let previous = previous_owned.get(index).and_then(Option::as_ref);
1349                    match previous {
1350                        None => slots.push(None),
1351                        Some(value) => {
1352                            owned_arena.push(value.clone());
1353                            slots.push(Some(ColumnSlot::Owned(owned_arena.len() - 1)));
1354                        }
1355                    }
1356                    continue;
1357                }
1358                let slot = parse_column_slot(
1359                    &mut reader,
1360                    metadata,
1361                    &mut number_arena,
1362                    &mut owned_arena,
1363                    &mut digits,
1364                )?;
1365                slots.push(match slot {
1366                    ColumnSlot::Null => None,
1367                    other => Some(other),
1368                });
1369                if self.fetch_long_status
1370                    && matches!(
1371                        metadata.ora_type_num,
1372                        ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1373                    )
1374                {
1375                    let _null_indicator = reader.read_sb4()?;
1376                    let _return_code = reader.read_ub4()?;
1377                }
1378            }
1379
1380            // Pass 2: arenas are now frozen — resolve deferred slots into
1381            // borrowed refs. No arena is mutated here, so the borrows are sound.
1382            // `row` is allocated per row: it carries the per-row arena lifetime,
1383            // which (unlike `slots`/`digits`, that borrow only the stable buffer)
1384            // cannot be reused across an arena `clear()`. This is the single
1385            // remaining per-row allocation, versus the owned path's per-row Vec
1386            // *plus* a String per scalar cell.
1387            let row: Vec<Option<QueryValueRef<'_>>> = slots
1388                .iter()
1389                .map(|slot| {
1390                    slot.as_ref().map(|slot| match *slot {
1391                        ColumnSlot::Null => unreachable!("Null slots are stored as None"),
1392                        ColumnSlot::Wire(value) => value,
1393                        ColumnSlot::Number {
1394                            ref range,
1395                            is_integer,
1396                        } => QueryValueRef::Number {
1397                            text: &number_arena[range.clone()],
1398                            is_integer,
1399                        },
1400                        ColumnSlot::Owned(index) => QueryValueRef::Owned(&owned_arena[index]),
1401                    })
1402                })
1403                .collect();
1404
1405            callback(&row)?;
1406
1407            // Snapshot the just-emitted row as owned values for the next row's
1408            // duplicate-column resolution — but only when the batch actually uses
1409            // bit-vector compression, so the zero-copy common path pays nothing.
1410            if uses_bit_vectors {
1411                previous_owned.clear();
1412                previous_owned.extend(row.iter().map(|cell| cell.map(|v| v.to_owned_value())));
1413            }
1414        }
1415        Ok(())
1416    }
1417}
1418
1419/// The borrowed counterpart of a fetched [`QueryResult`]: a [`BorrowedRowBatch`]
1420/// of zero-copy rows plus the response-level fields a caller needs to page and
1421/// finalize the cursor. Produced by [`parse_query_response_borrowed`].
1422#[derive(Clone, Debug)]
1423pub struct BorrowedFetchResult {
1424    /// The decoded rows, borrowing the response buffer.
1425    pub batch: BorrowedRowBatch,
1426    /// Whether the server reports more rows for this cursor.
1427    pub more_rows: bool,
1428    /// Server cursor id (for paging / release).
1429    pub cursor_id: u32,
1430    /// Total affected/processed row count from the end-of-call error message.
1431    pub row_count: u64,
1432}
1433
1434/// Walk a fetch/query response payload and produce a [`BorrowedFetchResult`]
1435/// whose rows borrow `payload` (the caller must keep the owned buffer alive —
1436/// [`BorrowedRowBatch`] owns it). This is the zero-copy companion to
1437/// [`parse_fetch_response_with_context`]: it walks the exact same message
1438/// framing (DESCRIBE_INFO / ROW_HEADER / BIT_VECTOR / ROW_DATA / ERROR /
1439/// END_OF_RESPONSE) but, instead of materializing owned rows, records each
1440/// row's byte offset and bit vector so [`BorrowedRowBatch::for_each_row_ref`]
1441/// can decode them lazily and without per-cell allocation.
1442///
1443/// Scope: the plain query-row case (the fetch path). Out-bind / DML-returning
1444/// rows are not part of a fetch response and are left to the owned path.
1445pub fn parse_query_response_borrowed(
1446    payload: &[u8],
1447    capabilities: ClientCapabilities,
1448    columns: &[ColumnMetadata],
1449    previous_row: Option<&[Option<QueryValue>]>,
1450) -> Result<BorrowedFetchResult> {
1451    parse_query_response_borrowed_with_limits(
1452        payload,
1453        capabilities,
1454        columns,
1455        previous_row,
1456        ProtocolLimits::DEFAULT,
1457    )
1458}
1459
1460pub fn parse_query_response_borrowed_with_limits(
1461    payload: &[u8],
1462    capabilities: ClientCapabilities,
1463    columns: &[ColumnMetadata],
1464    previous_row: Option<&[Option<QueryValue>]>,
1465    limits: ProtocolLimits,
1466) -> Result<BorrowedFetchResult> {
1467    let mut reader = TtcReader::with_limits(payload, limits)?;
1468    reader.limits().check_columns(columns.len())?;
1469    let mut result_columns = columns.to_vec();
1470    let mut more_rows = true;
1471    let mut cursor_id = 0u32;
1472    let mut row_count = 0u64;
1473    let mut row_starts: Vec<usize> = Vec::new();
1474    let mut row_bit_vectors: Vec<Option<Vec<u8>>> = Vec::new();
1475    let mut any_bit_vector = false;
1476    let mut pending_bit_vector: Option<Vec<u8>> = None;
1477    // The fetch path always consumes LONG/LONG RAW status trailers.
1478    let fetch_long_status = true;
1479
1480    while reader.remaining() > 0 {
1481        let message_type = reader.read_u8()?;
1482        match message_type {
1483            0 => {}
1484            TNS_MSG_TYPE_DESCRIBE_INFO => {
1485                let _describe_name = reader.read_bytes()?;
1486                let previous = std::mem::take(&mut result_columns);
1487                let mut described = QueryResult::default();
1488                parse_describe_info(&mut reader, capabilities, &mut described)?;
1489                result_columns = described.columns;
1490                for (index, column) in result_columns.iter_mut().enumerate() {
1491                    if let Some(prev) = previous.get(index) {
1492                        adjust_refetch_metadata(prev, column);
1493                    }
1494                }
1495            }
1496            TNS_MSG_TYPE_ROW_HEADER => {
1497                pending_bit_vector = parse_row_header(&mut reader)?;
1498            }
1499            TNS_MSG_TYPE_BIT_VECTOR => {
1500                pending_bit_vector = Some(parse_bit_vector(&mut reader, result_columns.len())?);
1501            }
1502            TNS_MSG_TYPE_ROW_DATA => {
1503                // Record where this row's column values begin, then advance the
1504                // reader past the row (skipping, not materializing).
1505                reader.limits().check_batch_rows(row_starts.len() + 1)?;
1506                row_starts.push(reader.position());
1507                let bit_vector = pending_bit_vector.take();
1508                any_bit_vector |= bit_vector.is_some();
1509                row_bit_vectors.push(bit_vector.clone());
1510                skip_row_data(
1511                    &mut reader,
1512                    &result_columns,
1513                    bit_vector.as_deref(),
1514                    fetch_long_status,
1515                )?;
1516            }
1517            TNS_MSG_TYPE_PARAMETER => {
1518                let _params = parse_query_return_parameters(&mut reader, false)?;
1519            }
1520            TNS_MSG_TYPE_STATUS => {
1521                let _call_status = reader.read_ub4()?;
1522                let _seq = reader.read_ub2()?;
1523            }
1524            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
1525                let _ = skip_server_side_piggyback(&mut reader)?;
1526            }
1527            TNS_MSG_TYPE_FLUSH_OUT_BINDS | TNS_MSG_TYPE_END_OF_RESPONSE => break,
1528            TNS_MSG_TYPE_TOKEN => {
1529                let _token = reader.read_ub8()?;
1530            }
1531            TNS_MSG_TYPE_IMPLICIT_RESULTSET => {
1532                // Mirror the owned parser's framing walk so the reader advances
1533                // past the implicit-resultset block identically (the borrowed
1534                // fetch API does not surface child cursors, but it must still
1535                // consume the bytes). reference messages/base.pyx
1536                // `_process_implicit_result`.
1537                let num_results = reader.read_ub4()?;
1538                reader
1539                    .limits()
1540                    .check_length_prefixed_elements(num_results as usize)?;
1541                for _ in 0..num_results {
1542                    let num_bytes = reader.read_u8()?;
1543                    reader.skip(usize::from(num_bytes))?;
1544                    let mut child = QueryResult::default();
1545                    parse_describe_info(&mut reader, capabilities, &mut child)?;
1546                    let _child_cursor_id = reader.read_ub2()?;
1547                }
1548            }
1549            TNS_MSG_TYPE_ERROR => {
1550                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
1551                if info.cursor_id != 0 {
1552                    cursor_id = u32::from(info.cursor_id);
1553                }
1554                row_count = info.row_count;
1555                if info.number == TNS_ERR_NO_DATA_FOUND && !result_columns.is_empty() {
1556                    more_rows = false;
1557                } else if info.number != 0 && info.number != TNS_ERR_ARRAY_DML_ERRORS {
1558                    return Err(ProtocolError::ServerErrorInfo(Box::new(
1559                        info.into_details(),
1560                    )));
1561                }
1562            }
1563            _ => {
1564                let position = reader.position().saturating_sub(1);
1565                if let Some(message) =
1566                    find_embedded_server_error(payload, capabilities.ttc_field_version, position)
1567                {
1568                    return Err(ProtocolError::ServerError(message));
1569                }
1570                return Err(ProtocolError::UnknownMessageType {
1571                    message_type,
1572                    position,
1573                });
1574            }
1575        }
1576    }
1577
1578    // If the batch never used duplicate-column compression, drop the per-row
1579    // bit-vector vector so iteration takes the zero-copy fast path (no owned
1580    // previous-row snapshotting).
1581    if !any_bit_vector {
1582        row_bit_vectors.clear();
1583    }
1584
1585    let batch = BorrowedRowBatch {
1586        buffer: payload.to_vec(),
1587        columns: result_columns,
1588        row_starts,
1589        row_bit_vectors,
1590        fetch_long_status,
1591        // Seed the first compressed row's duplicate resolution from the caller's
1592        // prior-page row (only consulted when the batch uses bit vectors).
1593        previous_row_seed: any_bit_vector.then(|| {
1594            previous_row
1595                .map(<[Option<QueryValue>]>::to_vec)
1596                .unwrap_or_default()
1597        }),
1598    };
1599
1600    Ok(BorrowedFetchResult {
1601        batch,
1602        more_rows,
1603        cursor_id,
1604        row_count,
1605    })
1606}
1607
1608/// Advance `reader` past one ROW_DATA row **without materializing owned values**
1609/// — this is the offset-capture pass, so it must allocate nothing for the hot
1610/// scalar grid. Mirrors [`parse_row_data`]'s consumption exactly: duplicate
1611/// (bit-vector) columns carry no wire bytes and are skipped; the hot byte-field
1612/// scalar types are skipped with a zero-allocation length-prefixed skip; the
1613/// rare cold types (LOB / Vector / JSON / Cursor / Object / ROWID), whose wire
1614/// framing is non-trivial, fall back to [`parse_column_value`] (which may
1615/// allocate, but those are uncommon). `LONG`/`LONG RAW` status trailers are
1616/// consumed when `fetch_long_status`.
1617fn skip_row_data(
1618    reader: &mut TtcReader<'_>,
1619    columns: &[ColumnMetadata],
1620    bit_vector: Option<&[u8]>,
1621    fetch_long_status: bool,
1622) -> Result<()> {
1623    for (index, metadata) in columns.iter().enumerate() {
1624        if is_duplicate_column(bit_vector, index) {
1625            continue;
1626        }
1627        let consumed_byte_field = metadata.buffer_size != 0
1628            && matches!(
1629                metadata.ora_type_num,
1630                ORA_TYPE_NUM_VARCHAR
1631                    | ORA_TYPE_NUM_CHAR
1632                    | ORA_TYPE_NUM_LONG
1633                    | ORA_TYPE_NUM_RAW
1634                    | ORA_TYPE_NUM_LONG_RAW
1635                    | ORA_TYPE_NUM_NUMBER
1636                    | ORA_TYPE_NUM_BINARY_INTEGER
1637                    | ORA_TYPE_NUM_BINARY_DOUBLE
1638                    | ORA_TYPE_NUM_BINARY_FLOAT
1639                    | ORA_TYPE_NUM_BOOLEAN
1640                    | ORA_TYPE_NUM_INTERVAL_DS
1641                    | ORA_TYPE_NUM_INTERVAL_YM
1642                    | ORA_TYPE_NUM_DATE
1643                    | ORA_TYPE_NUM_TIMESTAMP
1644                    | ORA_TYPE_NUM_TIMESTAMP_LTZ
1645                    | ORA_TYPE_NUM_TIMESTAMP_TZ
1646            );
1647        if consumed_byte_field {
1648            reader.skip_bytes_field()?;
1649        } else {
1650            // Cold / non-byte-field type, or a zero-buffer-size column: defer to
1651            // the full owned decode purely to advance the reader correctly.
1652            let _ = parse_column_value(reader, metadata)?;
1653        }
1654        if fetch_long_status
1655            && matches!(
1656                metadata.ora_type_num,
1657                ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW
1658            )
1659        {
1660            let _null_indicator = reader.read_sb4()?;
1661            let _return_code = reader.read_ub4()?;
1662        }
1663    }
1664    Ok(())
1665}
1666
1667pub(crate) fn encode_rowid_component(mut value: u32, size: usize, output: &mut String) {
1668    let mut encoded = vec![b'A'; size];
1669    for index in 0..size {
1670        let alphabet_index = usize::try_from(value & 0x3f).unwrap_or(0);
1671        encoded[size - index - 1] = TNS_BASE64_ALPHABET[alphabet_index];
1672        value >>= 6;
1673    }
1674    output.extend(encoded.into_iter().map(char::from));
1675}
1676
1677pub(crate) fn encode_physical_rowid(
1678    rba: u32,
1679    partition_id: u16,
1680    block_num: u32,
1681    slot_num: u16,
1682) -> String {
1683    let mut output = String::with_capacity(ORA_TYPE_SIZE_ROWID as usize);
1684    encode_rowid_component(rba, 6, &mut output);
1685    encode_rowid_component(u32::from(partition_id), 3, &mut output);
1686    encode_rowid_component(block_num, 6, &mut output);
1687    encode_rowid_component(u32::from(slot_num), 3, &mut output);
1688    output
1689}
1690
1691pub(crate) fn parse_rowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1692    let len = reader.read_u8()?;
1693    if len == 0 || len == crate::wire::TNS_NULL_LENGTH_INDICATOR {
1694        return Ok(None);
1695    }
1696    let rba = reader.read_ub4()?;
1697    let partition_id = reader.read_ub2()?;
1698    reader.skip(1)?;
1699    let block_num = reader.read_ub4()?;
1700    let slot_num = reader.read_ub2()?;
1701    Ok(Some(encode_physical_rowid(
1702        rba,
1703        partition_id,
1704        block_num,
1705        slot_num,
1706    )))
1707}
1708
1709pub(crate) fn encode_logical_urowid(bytes: &[u8]) -> String {
1710    let mut input_offset = 1;
1711    let mut input_len = bytes.len().saturating_sub(1);
1712    let mut output = String::with_capacity((bytes.len() / 3) * 4 + 4);
1713    output.push('*');
1714    while input_len > 0 {
1715        let mut pos = bytes[input_offset] >> 2;
1716        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1717
1718        pos = (bytes[input_offset] & 0x03) << 4;
1719        if input_len == 1 {
1720            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1721            break;
1722        }
1723        input_offset += 1;
1724        pos |= (bytes[input_offset] & 0xf0) >> 4;
1725        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1726
1727        pos = (bytes[input_offset] & 0x0f) << 2;
1728        if input_len == 2 {
1729            output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1730            break;
1731        }
1732        input_offset += 1;
1733        pos |= (bytes[input_offset] & 0xc0) >> 6;
1734        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1735
1736        pos = bytes[input_offset] & 0x3f;
1737        output.push(char::from(TNS_BASE64_ALPHABET[usize::from(pos)]));
1738        input_offset += 1;
1739        input_len -= 3;
1740    }
1741    output
1742}
1743
1744pub(crate) fn parse_urowid_value(reader: &mut TtcReader<'_>) -> Result<Option<String>> {
1745    if reader.read_bytes()?.is_none() {
1746        return Ok(None);
1747    }
1748    let Some(bytes) = reader.read_bytes()? else {
1749        return Ok(None);
1750    };
1751    if bytes.len() < 13 {
1752        return Err(ProtocolError::TtcDecode("encoded UROWID too short"));
1753    }
1754    if bytes[0] == 1 {
1755        let rba = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
1756        let partition_id = u16::from_be_bytes([bytes[5], bytes[6]]);
1757        let block_num = u32::from_be_bytes([bytes[7], bytes[8], bytes[9], bytes[10]]);
1758        let slot_num = u16::from_be_bytes([bytes[11], bytes[12]]);
1759        Ok(Some(encode_physical_rowid(
1760            rba,
1761            partition_id,
1762            block_num,
1763            slot_num,
1764        )))
1765    } else {
1766        Ok(Some(encode_logical_urowid(&bytes)))
1767    }
1768}
1769
1770pub(crate) fn parse_lob_value(
1771    reader: &mut TtcReader<'_>,
1772    metadata: &ColumnMetadata,
1773) -> Result<Option<QueryValue>> {
1774    let num_bytes = reader.read_ub4()?;
1775    reader.limits().check_response_bytes(num_bytes as usize)?;
1776    if num_bytes == 0 {
1777        return Ok(None);
1778    }
1779    let (size, chunk_size) = if matches!(metadata.ora_type_num, ORA_TYPE_NUM_BFILE) {
1780        (0, 0)
1781    } else {
1782        (reader.read_ub8()?, reader.read_ub4()?)
1783    };
1784    let Some(locator) = reader.read_bytes()? else {
1785        return Ok(None);
1786    };
1787    Ok(Some(QueryValue::Lob(Box::new(LobValue {
1788        ora_type_num: metadata.ora_type_num,
1789        csfrm: metadata.csfrm,
1790        locator,
1791        size,
1792        chunk_size,
1793    }))))
1794}
1795
1796/// Reads a VECTOR value (reference `ReadBuffer.read_vector` in `packet.pyx`).
1797/// VECTOR is sent as a fully-prefetched LOB: the image data precedes the
1798/// (discarded) LOB locator.
1799pub(crate) fn parse_vector_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
1800    let num_bytes = reader.read_ub4()?;
1801    reader.limits().check_response_bytes(num_bytes as usize)?;
1802    if num_bytes == 0 {
1803        return Ok(None);
1804    }
1805    reader.read_ub8()?; // size (unused)
1806    reader.read_ub4()?; // chunk size (unused)
1807    let Some(data) = reader.read_bytes()? else {
1808        return Ok(None);
1809    };
1810    reader.read_bytes()?; // LOB locator (unused)
1811    if data.is_empty() {
1812        return Ok(None);
1813    }
1814    let vector = crate::vector::decode_vector_with_limits(&data, reader.limits())?;
1815    Ok(Some(QueryValue::Vector(Box::new(vector))))
1816}
1817
1818/// Parses a native JSON (`DB_TYPE_JSON`) column value. Like VECTOR, OSON is sent
1819/// as a fully-prefetched LOB: `num_bytes`, `size`, `chunk_size`, the OSON image,
1820/// then a (discarded) LOB locator (reference packet.pyx `read_oson`).
1821pub(crate) fn parse_json_value(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
1822    let num_bytes = reader.read_ub4()?;
1823    reader.limits().check_response_bytes(num_bytes as usize)?;
1824    if num_bytes == 0 {
1825        return Ok(None);
1826    }
1827    reader.read_ub8()?; // size (unused)
1828    reader.read_ub4()?; // chunk size (unused)
1829    let Some(data) = reader.read_bytes()? else {
1830        return Ok(None);
1831    };
1832    reader.read_bytes()?; // LOB locator (unused)
1833    if data.is_empty() {
1834        return Ok(None);
1835    }
1836    let value = crate::oson::decode_oson_with_limits(&data, reader.limits())?;
1837    Ok(Some(QueryValue::Json(Box::new(value))))
1838}
1839
1840pub(crate) fn parse_object_value(
1841    reader: &mut TtcReader<'_>,
1842    metadata: &ColumnMetadata,
1843) -> Result<Option<QueryValue>> {
1844    let _toid = reader.read_bytes_with_length()?;
1845    let _oid = reader.read_bytes_with_length()?;
1846    let _snapshot = reader.read_bytes_with_length()?;
1847    let _version = reader.read_ub2()?;
1848    let num_bytes = reader.read_ub4()?;
1849    reader.limits().check_response_bytes(num_bytes as usize)?;
1850    reader.skip(2)?;
1851    if num_bytes == 0 {
1852        return Ok(None);
1853    }
1854    let Some(packed_data) = reader.read_bytes()? else {
1855        return Ok(None);
1856    };
1857    Ok(Some(QueryValue::Object(Box::new(ObjectValue {
1858        schema: metadata.object_schema.clone(),
1859        type_name: metadata.object_type_name.clone(),
1860        packed_data,
1861    }))))
1862}
1863
1864pub(crate) fn parse_cursor_value(reader: &mut TtcReader<'_>) -> Result<QueryValue> {
1865    reader.skip(1)?;
1866    let mut result = QueryResult::default();
1867    parse_describe_info(reader, ClientCapabilities::default(), &mut result)?;
1868    let cursor_id = u32::from(reader.read_ub2()?);
1869    Ok(QueryValue::Cursor(Box::new(CursorValue {
1870        columns: result.columns,
1871        cursor_id,
1872    })))
1873}
1874
/// Values extracted from a "return parameters" message by
/// [`parse_query_return_parameters`].
pub(crate) struct QueryReturnParameters {
    /// Per-row counts read from the message tail. Populated only when the
    /// parser is asked for array-DML row counts; `None` otherwise.
    pub row_counts: Option<Vec<u64>>,
    /// CQN registered-query id extracted from the registration-info block
    /// (reference base.pyx:1300-1309); `None` when no block was present.
    pub query_id: Option<u64>,
}
1881
1882pub(crate) fn parse_query_return_parameters(
1883    reader: &mut TtcReader<'_>,
1884    arraydmlrowcounts: bool,
1885) -> Result<QueryReturnParameters> {
1886    let num_params = reader.read_ub2()?;
1887    for _ in 0..num_params {
1888        let _value = reader.read_ub4()?;
1889    }
1890    let num_bytes = reader.read_ub2()?;
1891    if num_bytes > 0 {
1892        reader.skip(usize::from(num_bytes))?;
1893    }
1894    let num_pairs = reader.read_ub2()?;
1895    skip_keyword_value_pairs(reader, num_pairs)?;
1896    // registration info block: the trailing 8 bytes (msb at -4, lsb at -8) are
1897    // the CQN query id when a registration id was sent (reference base.pyx).
1898    let num_bytes = usize::from(reader.read_ub2()?);
1899    let mut query_id = None;
1900    if num_bytes > 0 {
1901        let block = reader.read_raw(num_bytes)?;
1902        if num_bytes >= 8 {
1903            let msb = u32::from_be_bytes([
1904                block[num_bytes - 4],
1905                block[num_bytes - 3],
1906                block[num_bytes - 2],
1907                block[num_bytes - 1],
1908            ]);
1909            let lsb = u32::from_be_bytes([
1910                block[num_bytes - 8],
1911                block[num_bytes - 7],
1912                block[num_bytes - 6],
1913                block[num_bytes - 5],
1914            ]);
1915            query_id = Some((u64::from(msb) << 32) | u64::from(lsb));
1916        }
1917    }
1918    if arraydmlrowcounts {
1919        // reference messages/base.pyx `_process_return_parameters` tail
1920        let num_rows = reader.read_ub4()?;
1921        reader.limits().check_batch_rows(num_rows as usize)?;
1922        // Each ub8 row count consumes at least one byte, so cap the reservation
1923        // by the remaining payload size (BoundedReader).
1924        let mut row_counts: Vec<u64> =
1925            reader.with_capacity_limited(num_rows as usize, 1, ProtocolLimits::check_batch_rows)?;
1926        for _ in 0..num_rows {
1927            row_counts.push(reader.read_ub8()?);
1928        }
1929        return Ok(QueryReturnParameters {
1930            row_counts: Some(row_counts),
1931            query_id,
1932        });
1933    }
1934    Ok(QueryReturnParameters {
1935        row_counts: None,
1936        query_id,
1937    })
1938}
1939
#[cfg(test)]
mod return_parameter_tests {
    use super::*;

    // A return-parameters body with no parameters, no parameter bytes and no
    // keyword/value pairs, followed by an 8-byte registration-info block, must
    // surface the query id assembled from the block's low-word/high-word tail.
    #[test]
    fn registration_info_block_extracts_query_id_from_lsb_msb_tail() {
        // Low word first, then high word; each big-endian.
        let registration_block: [u8; 8] = [
            0x55, 0x66, 0x77, 0x88, // lsb
            0x11, 0x22, 0x33, 0x44, // msb
        ];

        let mut writer = TtcWriter::new();
        writer.write_ub2(0); // num params
        writer.write_ub2(0); // parameter bytes
        writer.write_ub2(0); // keyword/value pairs
        writer.write_ub2(8); // registration-info block bytes
        writer.write_raw(&registration_block);
        let encoded = writer.into_bytes();

        let mut reader = TtcReader::new(&encoded);
        let parsed = parse_query_return_parameters(&mut reader, false).expect("return parameters");

        assert_eq!(parsed.query_id, Some(0x1122_3344_5566_7788));
        assert_eq!(parsed.row_counts, None);
    }
}
1964
// Tests for the borrowed (zero-copy) fetch decode path: UTF-8 validator parity
// with `core::str::from_utf8`, and equivalence between the borrowed row batch
// and the owned decode path.
#[cfg(test)]
mod borrowed_fetch_tests {
    use super::*;
    use crate::thin::codecs::encode_number_text;

    // Isomorphism proof for the `simd-decode` feature (bead rust-oracledb-63o):
    // `validate_utf8` must make the SAME accept/reject decision as the canonical
    // `core::str::from_utf8`, AND return the same `&str` on accept, for every
    // input — whether or not the SIMD validator is compiled in. This guards the
    // hot text path against any divergence in UTF-8 grammar handling.
    #[test]
    fn validate_utf8_matches_core_accept_reject() {
        let cases: &[&[u8]] = &[
            b"",
            b"a",
            b"hello world",
            "VARCHAR2 cell".as_bytes(),
            "中文 mixed \u{1f600}".as_bytes(), // CJK + emoji (4-byte)
            "naïve café".as_bytes(),
            &[0x80],                   // lone continuation byte -> reject
            &[0xC0, 0x80],             // overlong NUL -> reject
            &[0xED, 0xA0, 0x80],       // UTF-16 surrogate -> reject
            &[0xF4, 0x90, 0x80, 0x80], // > U+10FFFF -> reject
            &[0xFF],                   // invalid lead -> reject
            &[0xE2, 0x82],             // truncated 3-byte -> reject
        ];
        for &bytes in cases {
            let core = core::str::from_utf8(bytes);
            let ours = validate_utf8(bytes);
            // First the decision itself must agree...
            assert_eq!(
                core.is_ok(),
                ours.is_ok(),
                "accept/reject diverged for {bytes:02x?}"
            );
            // ...and, when both accept, so must the returned text.
            if let (Ok(a), Ok(b)) = (core, ours) {
                assert_eq!(a, b, "accepted text diverged for {bytes:02x?}");
            }
        }
    }

    // Build a synthetic column metadata for a scalar type; every field not
    // named here keeps its `Default` value.
    fn col(name: &str, ora_type_num: u8, csfrm: u8, buffer_size: u32) -> ColumnMetadata {
        ColumnMetadata {
            name: name.to_string(),
            ora_type_num,
            csfrm,
            buffer_size,
            ..ColumnMetadata::default()
        }
    }

    // Append one row of [Text, Number, Raw, NULL-text] to `writer`, framed as
    // the server would frame the column values (each a
    // `write_bytes_with_length` run that `read_bytes` / `read_bytes_borrowed`
    // consume identically). Nothing is returned; callers derive row offsets
    // themselves.
    fn encode_mixed_row(writer: &mut TtcWriter, text: &str, number: &str, raw: &[u8]) {
        writer.write_bytes_with_length(text.as_bytes()).unwrap();
        let num = encode_number_text(number).unwrap();
        writer.write_bytes_with_length(&num).unwrap();
        writer.write_bytes_with_length(raw).unwrap();
        writer.write_u8(0); // NULL column (length byte 0)
    }

    // The borrowed batch decode must yield, for every cell, a value whose
    // `to_owned_value()` is bit-for-bit the owned-path `QueryValue`, across a
    // mixed Text/Number/Raw/NULL row. And the Text/Raw cells must genuinely
    // borrow the batch buffer (zero-copy), not a fresh allocation.
    #[test]
    fn borrowed_batch_matches_owned_path_for_mixed_row() {
        let columns = vec![
            col("T", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
            col("N", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
            col("R", ORA_TYPE_NUM_RAW, CS_FORM_IMPLICIT, 2000),
            col("Z", ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 4000),
        ];

        let mut writer = TtcWriter::new();
        encode_mixed_row(
            &mut writer,
            "héllo world",
            "-12.5",
            &[0xDE, 0xAD, 0xBE, 0xEF],
        );
        encode_mixed_row(&mut writer, "second", "42", &[0x01]);
        let buffer = writer.into_bytes();
        // Row 0 starts at offset 0; row 1 starts wherever the owned decoder
        // stops after consuming row 0.
        let row_starts = vec![0, {
            // Find the second row's start by replaying the first row's consumption.
            let mut reader = TtcReader::new(&buffer);
            for c in &columns {
                let _ = parse_column_value(&mut reader, c).unwrap();
            }
            reader.position()
        }];

        // Owned path: decode both rows the existing way for the golden values.
        let owned_rows: Vec<Vec<Option<QueryValue>>> = row_starts
            .iter()
            .map(|&start| {
                let mut reader = TtcReader::new(&buffer[start..]);
                columns
                    .iter()
                    .map(|c| parse_column_value(&mut reader, c).unwrap())
                    .collect()
            })
            .collect();

        // Borrowed path: decode through the batch, collecting owned copies and
        // proving the scalar cells borrow the buffer.
        let batch = BorrowedRowBatch::new(buffer.clone(), columns.clone(), row_starts);
        let buf_ptr_range = batch.buffer_ptr_range();

        let mut seen_rows = 0usize;
        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
        batch
            .for_each_row_ref(|row| {
                seen_rows += 1;
                // Text cell borrows the buffer.
                if let Some(QueryValueRef::Text(t)) = row[0] {
                    let p = t.as_ptr() as usize;
                    assert!(
                        buf_ptr_range.contains(&p),
                        "Text cell must borrow the batch buffer (zero-copy)"
                    );
                }
                // Raw cell borrows the buffer.
                if let Some(QueryValueRef::Raw(r)) = row[2] {
                    let p = r.as_ptr() as usize;
                    assert!(
                        buf_ptr_range.contains(&p),
                        "Raw cell must borrow the batch buffer (zero-copy)"
                    );
                }
                // Snapshot the row as owned values for the final comparison.
                borrowed_owned.push(
                    row.iter()
                        .map(|cell| cell.map(|v| v.to_owned_value()))
                        .collect(),
                );
                Ok::<(), ProtocolError>(())
            })
            .unwrap();

        assert_eq!(seen_rows, 2, "batch yields both rows");
        assert_eq!(
            borrowed_owned, owned_rows,
            "borrowed cells to_owned() must equal the owned-path values"
        );
    }

    // The borrowed response parser walks the *same* message framing as the owned
    // `parse_fetch_response_with_context` (ROW_HEADER / BIT_VECTOR / ROW_DATA /
    // END_OF_RESPONSE), but instead of building owned rows it captures each
    // row's byte offset and hands back a `BorrowedRowBatch`. Decoding that batch
    // must reproduce exactly what the owned fetch path produced — duplicate
    // columns (bit vector) and all. Fixture is the same one the owned
    // `fetch_response_decodes_rows_with_previous_cursor_metadata` test uses.
    #[test]
    fn borrowed_response_parse_matches_owned_fetch_path() {
        use hex::FromHex;
        let payload = Vec::from_hex("06020101000205dc0001010101000702c1041d")
            .expect("fixture response should be valid hex");
        let columns = vec![
            col("INTCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
            col("NUMBERCOL", ORA_TYPE_NUM_NUMBER, CS_FORM_IMPLICIT, 22),
        ];
        // Prior-page row handed to both parsers so duplicate columns can be
        // resolved against it.
        let previous_row = vec![
            Some(QueryValue::number_from_text("2", true)),
            Some(QueryValue::number_from_text("0.5", false)),
        ];

        // Owned golden.
        let owned = parse_query_response_with_context(
            &payload,
            ClientCapabilities::default(),
            &columns,
            Some(&previous_row),
        )
        .expect("owned fetch decode");

        // Borrowed parse.
        let borrowed = parse_query_response_borrowed(
            &payload,
            ClientCapabilities::default(),
            &columns,
            Some(&previous_row),
        )
        .expect("borrowed fetch decode");

        // Response-level fields must agree before comparing cell contents.
        assert_eq!(borrowed.more_rows, owned.more_rows);
        assert_eq!(borrowed.cursor_id, owned.cursor_id);
        assert_eq!(borrowed.batch.row_count(), owned.rows.len());

        let mut borrowed_owned: Vec<Vec<Option<QueryValue>>> = Vec::new();
        borrowed
            .batch
            .for_each_row_ref(|row| {
                borrowed_owned.push(
                    row.iter()
                        .map(|cell| cell.map(|v| v.to_owned_value()))
                        .collect(),
                );
                Ok::<(), ProtocolError>(())
            })
            .expect("iterate borrowed rows");

        assert_eq!(
            borrowed_owned, owned.rows,
            "borrowed batch must reproduce the owned fetch rows (incl. duplicate columns)"
        );
    }
}
2175
#[cfg(test)]
mod fuzz_regression_tests {
    use super::*;

    // Shared check: the parser rejected the input with one of the two
    // "fail closed" error kinds (decode failure or resource-limit violation).
    fn assert_fails_closed(err: &ProtocolError) {
        assert!(
            matches!(
                err,
                ProtocolError::TtcDecode(_) | ProtocolError::ResourceLimit { .. }
            ),
            "expected fail-closed protocol error, got {err:?}"
        );
    }

    // Regression (w6-fuzz, query_response target): a TNS_MSG_TYPE_IMPLICIT_RESULTSET
    // message (27) whose ub4 result count was ~620M made the dispatch loop
    // `Vec::with_capacity` several gigabytes of `QueryValue::Cursor` before the
    // truncated read failed, tripping libFuzzer's OOM detector. The parser must
    // now fail closed (truncated payload) without the giant allocation.
    #[test]
    fn fuzz_regression_implicit_resultset_oom() {
        // payload: type=27, ub4 length byte 4, value 0x25000000 (~620M), then EOF
        let payload = [27u8, 4, 37, 0, 0, 0];
        let outcome = parse_query_response(&payload, ClientCapabilities::default());
        let err = outcome.expect_err("oversized implicit-resultset count must fail closed");
        assert_fails_closed(&err);
    }

    // BoundedReader invariant (l2p), query-columns family: a DESCRIBE_INFO
    // message (16) declaring a huge num_columns (ub4 ~620M) with no column
    // metadata bytes following must fail closed, not pre-allocate one
    // ColumnMetadata per declared column. parse_describe_info grows the column
    // Vec via push (no speculative with_capacity), and the first
    // parse_column_metadata read past the end errors.
    #[test]
    fn describe_info_oversized_column_count_fails_closed_not_oom() {
        // type=16 DESCRIBE_INFO; describe_name read_bytes len byte 0 (null);
        // max_row_size ub4 = 0; num_columns ub4 (len byte 4) = 0x25000000
        // (~620M); then EOF before the skip(1)/column records.
        let payload = [16u8, 0, 0, 4, 0x25, 0x00, 0x00, 0x00];
        let outcome = parse_query_response(&payload, ClientCapabilities::default());
        let err = outcome.expect_err("oversized column count must fail closed");
        assert_fails_closed(&err);
    }

    #[test]
    fn describe_info_respects_protocol_column_limit() {
        // type=16 DESCRIBE_INFO; describe_name null; max_row_size=0;
        // num_columns=2. A max_columns=1 policy should fail before any column
        // metadata allocation/parsing.
        let payload = [TNS_MSG_TYPE_DESCRIBE_INFO, 0, 0, 1, 2];
        let single_column_policy = ProtocolLimits {
            max_columns: 1,
            ..ProtocolLimits::DEFAULT
        };
        let outcome = parse_query_response_with_limits(
            &payload,
            ClientCapabilities::default(),
            single_column_policy,
        );
        let err = outcome.expect_err("column count above policy must fail");
        let is_column_limit = matches!(
            err,
            ProtocolError::ResourceLimit {
                limit: "columns",
                observed: 2,
                maximum: 1,
            }
        );
        assert!(is_column_limit, "expected column ResourceLimit, got {err:?}");
    }

    // BoundedReader invariant (l2p), out-bind array family: an array OUT bind
    // whose ub4 num_elements is enormous (~620M) but carries no element bytes
    // must fail closed via with_capacity_bounded + the per-element read, not
    // reserve gigabytes of Option<QueryValue>.
    #[test]
    fn out_bind_array_oversized_element_count_fails_closed_not_oom() {
        let bind_columns = [ColumnMetadata {
            name: "ARR".to_string(),
            ora_type_num: ORA_TYPE_NUM_NUMBER,
            is_array: true,
            ..ColumnMetadata::default()
        }];
        let out_bind_indexes = [0usize];
        // ub4 num_elements: len byte 4, value 0x25000000, then no elements.
        let payload = [4u8, 0x25, 0x00, 0x00, 0x00];
        let mut reader = TtcReader::new(&payload);
        let mut result = QueryResult::default();
        let outcome =
            parse_out_bind_row_data(&mut reader, &mut result, &bind_columns, &out_bind_indexes);
        let err = outcome.expect_err("oversized array OUT bind count must fail closed");
        assert_fails_closed(&err);
    }
}