Skip to main content

oracledb_protocol/thin/
execute.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4
5pub fn build_execute_query_payload(sql: &str, prefetch_rows: u32) -> Result<Vec<u8>> {
6    build_execute_query_payload_with_seq(sql, prefetch_rows, 1)
7}
8
9pub fn build_execute_query_payload_with_seq(
10    sql: &str,
11    prefetch_rows: u32,
12    seq_num: u8,
13) -> Result<Vec<u8>> {
14    build_execute_payload_with_seq(sql, prefetch_rows, seq_num, true)
15}
16
17pub fn build_execute_payload_with_seq(
18    sql: &str,
19    prefetch_rows: u32,
20    seq_num: u8,
21    is_query: bool,
22) -> Result<Vec<u8>> {
23    build_execute_payload_with_binds_with_seq(sql, prefetch_rows, seq_num, is_query, &[])
24}
25
26pub fn build_execute_payload_with_binds_with_seq(
27    sql: &str,
28    prefetch_rows: u32,
29    seq_num: u8,
30    is_query: bool,
31    binds: &[BindValue],
32) -> Result<Vec<u8>> {
33    let bind_rows = if binds.is_empty() {
34        Vec::new()
35    } else {
36        vec![binds.to_vec()]
37    };
38    build_execute_payload_with_bind_rows_with_seq(sql, prefetch_rows, seq_num, is_query, &bind_rows)
39}
40
41pub fn build_execute_payload_with_bind_rows_with_seq(
42    sql: &str,
43    prefetch_rows: u32,
44    seq_num: u8,
45    is_query: bool,
46    bind_rows: &[Vec<BindValue>],
47) -> Result<Vec<u8>> {
48    build_execute_payload_with_bind_rows_and_options_with_seq(
49        sql,
50        prefetch_rows,
51        seq_num,
52        is_query,
53        bind_rows,
54        ExecuteOptions::default(),
55    )
56}
57
58/// Execute message with an explicit pipeline token; pipelined operations
59/// carry tokens 1..N (impl/thin/connection.pyx `_create_messages_for_pipeline`),
60/// everything else carries 0.
61pub fn build_execute_payload_with_bind_rows_with_seq_and_token(
62    sql: &str,
63    prefetch_rows: u32,
64    seq_num: u8,
65    is_query: bool,
66    bind_rows: &[Vec<BindValue>],
67    token_num: u64,
68) -> Result<Vec<u8>> {
69    build_execute_payload_with_bind_rows_and_options_with_seq(
70        sql,
71        prefetch_rows,
72        seq_num,
73        is_query,
74        bind_rows,
75        ExecuteOptions {
76            token_num,
77            ..ExecuteOptions::default()
78        },
79    )
80}
81
82/// Builds a close-cursors piggyback message (reference
83/// `_write_close_cursors_piggyback` + `write_cursors_to_close`); it is
84/// prepended to the next regular message in the same data packet and
85/// consumes a TTC sequence number of its own.
86pub fn build_close_cursors_piggyback(cursor_ids: &[u32], seq_num: u8) -> Vec<u8> {
87    let mut writer = TtcWriter::new();
88    writer.write_u8(TNS_MSG_TYPE_PIGGYBACK);
89    writer.write_u8(TNS_FUNC_CLOSE_CURSORS);
90    writer.write_u8(seq_num);
91    writer.write_ub8(0); // token number (23.1 ext 1+)
92    writer.write_u8(1); // pointer
93    writer.write_ub4(u32::try_from(cursor_ids.len()).unwrap_or(u32::MAX));
94    for cursor_id in cursor_ids {
95        writer.write_ub4(*cursor_id);
96    }
97    writer.into_bytes()
98}
99
100pub fn build_execute_payload_with_bind_rows_and_options_with_seq(
101    sql: &str,
102    prefetch_rows: u32,
103    seq_num: u8,
104    is_query: bool,
105    bind_rows: &[Vec<BindValue>],
106    exec_options: ExecuteOptions,
107) -> Result<Vec<u8>> {
108    let sql_bytes = sql.as_bytes();
109    let sql_len =
110        u32::try_from(sql_bytes.len()).map_err(|_| ProtocolError::InvalidPacketLength {
111            length: sql_bytes.len(),
112            minimum: 0,
113        })?;
114    let bind_count = bind_rows.first().map_or(0, Vec::len);
115    for row in bind_rows {
116        if row.len() != bind_count {
117            return Err(ProtocolError::TtcDecode("inconsistent bind row width"));
118        }
119    }
120    let bind_count = u32::try_from(bind_count).map_err(|_| ProtocolError::InvalidPacketLength {
121        length: bind_count,
122        minimum: 0,
123    })?;
124    let bind_row_count =
125        u32::try_from(bind_rows.len()).map_err(|_| ProtocolError::InvalidPacketLength {
126            length: bind_rows.len(),
127            minimum: 0,
128        })?;
129    // Preallocate the writer so the small per-field `write_*` pushes do not grow
130    // the backing `Vec` through several doublings (each a heap allocation). The
131    // fixed message header + the inline SQL bytes dominate the no-bind/small-bind
132    // common case (e.g. `select 1 from dual` is 87 bytes total); bind columns add
133    // their own bytes and may still grow the buffer, but the hot small-statement
134    // path now builds in a single allocation. The written bytes are unchanged —
135    // this is a pure allocation optimization (see `TtcWriter::with_capacity`).
136    let writer_capacity = 96 + sql_bytes.len();
137    let mut writer = TtcWriter::with_capacity(writer_capacity);
138    writer.write_function_code_with_seq(TNS_FUNC_EXECUTE, seq_num);
139    writer.write_ub8(exec_options.token_num);
140
141    let is_plsql = statement_is_plsql(sql);
142    let parse_only = exec_options.parse_only;
143    // a fresh parse is required when the statement has no open server cursor
144    // or is DDL (reference execute.pyx:88-89)
145    let needs_parse = exec_options.cursor_id == 0 || crate::sql::statement_is_ddl(sql);
146    // a scroll request only repositions the open cursor and fetches; the
147    // EXECUTE/BIND options are suppressed (reference execute.pyx:82-84,105)
148    let scroll_operation = exec_options.scroll_operation;
149    let mut options = 0;
150    if needs_parse {
151        options |= TNS_EXEC_OPTION_PARSE;
152    }
153    if !parse_only && !scroll_operation {
154        options |= TNS_EXEC_OPTION_EXECUTE;
155    }
156    if is_query {
157        if parse_only {
158            options |= TNS_EXEC_OPTION_DESCRIBE;
159        } else if !exec_options.no_prefetch {
160            // reference execute.pyx:99 gates FETCH on `not stmt._no_prefetch`;
161            // a no-prefetch statement (VECTOR columns) leaves the rows to be
162            // retrieved by the follow-up define-fetch instead.
163            options |= TNS_EXEC_OPTION_FETCH;
164        }
165    }
166    if bind_count > 0 && !scroll_operation {
167        options |= TNS_EXEC_OPTION_BIND;
168    }
169    if is_plsql {
170        if bind_count > 0 {
171            options |= TNS_EXEC_OPTION_PLSQL_BIND;
172        }
173    } else if !parse_only {
174        options |= TNS_EXEC_OPTION_NOT_PLSQL;
175    }
176    if exec_options.batcherrors {
177        options |= TNS_EXEC_OPTION_BATCH_ERRORS;
178    }
179    let num_iters = if is_query && !parse_only {
180        prefetch_rows
181    } else {
182        1
183    };
184    // al8i4[1]: queries report 0 on first execute and the iteration count on
185    // re-execute of an open cursor (execute.pyx:187-193)
186    let exec_count = if parse_only {
187        0
188    } else if is_query {
189        if exec_options.cursor_id == 0 {
190            0
191        } else {
192            num_iters
193        }
194    } else {
195        bind_row_count.max(1)
196    };
197    let query_flag = u32::from(is_query);
198    // reference sets the implicit-resultset flag on every full execute with
199    // SQL (execute.pyx:81-82); anonymous PL/SQL blocks need it for
200    // dbms_sql.return_result (ORA-29481 otherwise)
201    let mut exec_flags = if parse_only {
202        0
203    } else {
204        TNS_EXEC_FLAGS_IMPLICIT_RESULTSET
205    };
206    if exec_options.arraydmlrowcounts {
207        exec_flags |= TNS_EXEC_FLAGS_DML_ROWCOUNTS;
208    }
209    // scrollable cursors keep the result set open across fetches and avoid the
210    // server cancelling on end-of-fetch (reference execute.pyx:85-87)
211    if exec_options.scrollable && !parse_only {
212        exec_flags |= TNS_EXEC_FLAGS_SCROLLABLE;
213        exec_flags |= TNS_EXEC_FLAGS_NO_CANCEL_ON_EOF;
214    }
215    writer.write_ub4(options);
216    writer.write_ub4(exec_options.cursor_id);
217    if needs_parse {
218        writer.write_u8(1); // pointer (cursor id)
219        writer.write_ub4(sql_len);
220    } else {
221        writer.write_u8(0); // pointer (cursor id)
222        writer.write_ub4(0);
223    }
224    writer.write_u8(1);
225    writer.write_ub4(13);
226    writer.write_u8(0);
227    writer.write_u8(0);
228    writer.write_ub4(0);
229    writer.write_ub4(num_iters);
230    writer.write_ub4(TNS_MAX_LONG_LENGTH);
231    if bind_count == 0 {
232        writer.write_u8(0);
233        writer.write_ub4(0);
234    } else {
235        writer.write_u8(1);
236        writer.write_ub4(bind_count);
237    }
238    // CQN registration id (registerquery) split lsb/msb across the al8i4 slots
239    // (reference execute.pyx:116-119,156,163). Zero for ordinary executes.
240    let registration_id_lsb = (exec_options.registration_id & 0xffff_ffff) as u32;
241    let registration_id_msb = ((exec_options.registration_id >> 32) & 0xffff_ffff) as u32;
242    writer.write_u8(0);
243    writer.write_u8(0);
244    writer.write_u8(0);
245    writer.write_u8(0);
246    writer.write_u8(0);
247    writer.write_u8(0);
248    writer.write_ub4(0);
249    writer.write_ub4(registration_id_lsb); // registration id (lsb)
250    writer.write_u8(0); // pointer (al8objlist)
251    writer.write_u8(1); // pointer (al8objlen)
252    writer.write_u8(0); // pointer (al8blv)
253    writer.write_ub4(0); // al8blvl
254    writer.write_u8(0); // pointer (al8dnam)
255    writer.write_ub4(0); // al8dnaml
256    writer.write_ub4(registration_id_msb); // registration id (msb)
257    if exec_options.arraydmlrowcounts {
258        writer.write_u8(1); // pointer (al8pidmlrc)
259        writer.write_ub4(exec_count); // al8pidmlrcbl
260        writer.write_u8(1); // pointer (al8pidmlrcl)
261    } else {
262        writer.write_u8(0); // pointer (al8pidmlrc)
263        writer.write_ub4(0); // al8pidmlrcbl
264        writer.write_u8(0); // pointer (al8pidmlrcl)
265    }
266    writer.write_u8(0); // pointer (al8sqlsig)
267    writer.write_ub4(0); // SQL signature length
268    writer.write_u8(0); // pointer (SQL ID)
269    writer.write_ub4(0); // allocated size of SQL ID
270    writer.write_u8(0); // pointer (length of SQL ID)
271    writer.write_u8(0); // pointer (chunk ids)
272    writer.write_ub4(0); // number of chunk ids
273
274    if needs_parse {
275        writer.write_bytes_with_length(sql_bytes)?;
276        writer.write_ub4(1); // al8i4[0] parse
277    } else {
278        writer.write_ub4(0); // al8i4[0] parse
279    }
280    writer.write_ub4(exec_count);
281    writer.write_ub4(0);
282    writer.write_ub4(0);
283    writer.write_ub4(0);
284    writer.write_ub4(0);
285    writer.write_ub4(0);
286    writer.write_ub4(query_flag); // al8i4[7] is query
287    writer.write_ub4(0); // al8i4[8]
288    writer.write_ub4(exec_flags); // al8i4[9] execute flags
289    writer.write_ub4(exec_options.fetch_orientation); // al8i4[10] fetch orientation
290    writer.write_ub4(exec_options.fetch_pos); // al8i4[11] fetch pos
291    writer.write_ub4(0); // al8i4[12]
292                         // a scroll request carries no bind parameters (reference suppresses the
293                         // BIND option and never writes bind params for scroll_operation)
294    if !bind_rows.is_empty() && !scroll_operation {
295        write_bind_params(
296            &mut writer,
297            bind_rows,
298            is_plsql,
299            exec_options.max_string_size,
300        )?;
301    }
302    Ok(writer.into_bytes())
303}
304
305pub(crate) fn write_bind_params(
306    writer: &mut TtcWriter,
307    bind_rows: &[Vec<BindValue>],
308    is_plsql: bool,
309    max_string_size: u32,
310) -> Result<()> {
311    let Some(first_row) = bind_rows.first() else {
312        return Ok(());
313    };
314    let mut bind_metadata = Vec::with_capacity(first_row.len());
315    for index in 0..first_row.len() {
316        bind_metadata.push(write_bind_metadata_for_rows(writer, bind_rows, index)?);
317    }
318    for row in bind_rows {
319        if !is_plsql && row.iter().all(BindValue::is_output_only) {
320            continue;
321        }
322        writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
323        for index in bind_row_value_order(row, &bind_metadata, is_plsql, max_string_size) {
324            let value = &row[index];
325            let (_ora_type_num, csfrm, _buffer_size) = bind_metadata
326                .get(index)
327                .copied()
328                .unwrap_or((ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 1));
329            write_bind_value(writer, value, csfrm)?;
330        }
331    }
332    Ok(())
333}
334
335pub(crate) fn bind_row_value_order(
336    row: &[BindValue],
337    bind_metadata: &[(u8, u8, u32)],
338    is_plsql: bool,
339    max_string_size: u32,
340) -> Vec<usize> {
341    let mut non_long = Vec::with_capacity(row.len());
342    let mut long = Vec::new();
343    for (index, value) in row.iter().enumerate() {
344        if !is_plsql && value.is_output_only() {
345            continue;
346        }
347        // non-LONG values are written first followed by any LONG values; a
348        // value is "long" when its buffer size exceeds the maximum string
349        // size (reference messages/base.pyx:1529-1565 keys this off
350        // `metadata.buffer_size > buf._caps.max_string_size`)
351        if !is_plsql
352            && bind_metadata
353                .get(index)
354                .is_some_and(|(ora_type_num, _, buffer_size)| {
355                    matches!(*ora_type_num, ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW)
356                        || *buffer_size > max_string_size
357                })
358        {
359            long.push(index);
360        } else {
361            non_long.push(index);
362        }
363    }
364    non_long.extend(long);
365    non_long
366}
367
368pub(crate) fn write_bind_metadata_for_rows(
369    writer: &mut TtcWriter,
370    bind_rows: &[Vec<BindValue>],
371    index: usize,
372) -> Result<(u8, u8, u32)> {
373    let Some(first_row) = bind_rows.first() else {
374        return Ok((ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 1));
375    };
376    let Some(first_value) = first_row.get(index) else {
377        return Ok((ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 1));
378    };
379    let mut metadata_value = first_value;
380    let (mut ora_type_num, mut csfrm, mut buffer_size) = bind_metadata(first_value);
381    let mut needs_type_inference = matches!(first_value, BindValue::Null);
382    for row in bind_rows.iter().skip(1) {
383        let Some(value) = row.get(index) else {
384            continue;
385        };
386        if needs_type_inference {
387            if matches!(value, BindValue::Null) {
388                continue;
389            }
390            metadata_value = value;
391            (ora_type_num, csfrm, buffer_size) = bind_metadata(value);
392            needs_type_inference = false;
393            continue;
394        }
395        let (row_ora_type_num, row_csfrm, row_buffer_size) = bind_metadata(value);
396        if row_csfrm == csfrm && bind_metadata_types_are_compatible(ora_type_num, row_ora_type_num)
397        {
398            ora_type_num = promoted_bind_metadata_type(ora_type_num, row_ora_type_num);
399            buffer_size = buffer_size.max(row_buffer_size);
400        }
401    }
402    write_bind_metadata_with_type(writer, metadata_value, ora_type_num, csfrm, buffer_size)?;
403    Ok((ora_type_num, csfrm, buffer_size))
404}
405
406pub(crate) fn bind_metadata_types_are_compatible(left: u8, right: u8) -> bool {
407    left == right
408        || (matches!(
409            left,
410            ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_LONG
411        ) && matches!(
412            right,
413            ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_LONG
414        ))
415        || (matches!(left, ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW)
416            && matches!(right, ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW))
417}
418
419pub(crate) fn promoted_bind_metadata_type(left: u8, right: u8) -> u8 {
420    if matches!(left, ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW) {
421        left
422    } else if matches!(right, ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW) {
423        right
424    } else {
425        left
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// Offset of the first occurrence of `needle` in `haystack`, if any.
    fn position_of(haystack: &[u8], needle: &[u8]) -> Option<usize> {
        haystack
            .windows(needle.len())
            .position(|window| window == needle)
    }

    /// Offset of the first occurrence of `needle` in `haystack`; panics with
    /// `label` in the message when the bytes are absent.
    fn find_subslice(haystack: &[u8], needle: &[u8], label: &str) -> usize {
        match position_of(haystack, needle) {
            Some(offset) => offset,
            None => panic!("{label} not found in execute payload"),
        }
    }

    /// The 64-bit registration id must appear as two halves, low half first.
    #[test]
    fn register_query_execute_payload_splits_registration_id() {
        let registration_id = 0x1122_3344_5566_7788_u64;
        let exec_options = ExecuteOptions {
            registration_id,
            ..ExecuteOptions::default()
        };
        let payload = build_execute_payload_with_bind_rows_and_options_with_seq(
            "select * from rust_register_query_t",
            0,
            7,
            true,
            &[],
            exec_options,
        )
        .expect("execute payload");

        let lsb = [0x55, 0x66, 0x77, 0x88];
        let msb = [0x11, 0x22, 0x33, 0x44];
        let lsb_pos = position_of(&payload, &lsb).expect("registration id lsb is encoded");
        let msb_pos = position_of(&payload, &msb).expect("registration id msb is encoded");

        assert!(
            lsb_pos < msb_pos,
            "execute payload writes registration id lsb before msb"
        );
    }

    /// Values larger than the negotiated maximum string size move to the
    /// long section of a non-PL/SQL bind row; PL/SQL rows keep their order.
    #[test]
    fn long_bind_split_uses_negotiated_max_string_size() {
        const INSERT_SQL: &str = "insert into t values (:1, :2, :3, :4)";

        let row = vec![
            BindValue::Raw(vec![b'A'; 3_999]),
            BindValue::Raw(vec![b'B'; 4_001]),
            BindValue::Raw(vec![b'C'; 32_767]),
            BindValue::Text("ZMARK".to_string()),
        ];
        let metadata: Vec<_> = row.iter().map(bind_metadata).collect();

        let standard_order = bind_row_value_order(&row, &metadata, false, 4_000);
        assert_eq!(
            standard_order,
            vec![0, 3, 1, 2],
            "STANDARD max_string_size=4000 writes >4000-byte binds in the long section"
        );
        let extended_order = bind_row_value_order(&row, &metadata, false, 32_767);
        assert_eq!(
            extended_order,
            vec![0, 1, 2, 3],
            "32K-capable connections keep <=32767-byte binds in ordinary order"
        );
        let plsql_order = bind_row_value_order(&row, &metadata, true, 4_000);
        assert_eq!(
            plsql_order,
            vec![0, 1, 2, 3],
            "PL/SQL bind order is unchanged"
        );

        let standard_options = ExecuteOptions::default().with_max_string_size(4_000);
        let standard = build_execute_payload_with_bind_rows_and_options_with_seq(
            INSERT_SQL,
            1,
            7,
            false,
            &[row.clone()],
            standard_options,
        )
        .expect("STANDARD execute payload");
        let standard_a = find_subslice(&standard, &[b'A'; 32], "STANDARD A value");
        let standard_b = find_subslice(&standard, &[b'B'; 32], "STANDARD B value");
        let standard_c = find_subslice(&standard, &[b'C'; 32], "STANDARD C value");
        let standard_z = find_subslice(&standard, b"ZMARK", "STANDARD raw marker");
        let standard_in_order =
            standard_a < standard_z && standard_z < standard_b && standard_b < standard_c;
        assert!(
            standard_in_order,
            "STANDARD payload must write non-long values before >4000-byte long values"
        );

        let extended_options = ExecuteOptions::default().with_max_string_size(32_767);
        let extended = build_execute_payload_with_bind_rows_and_options_with_seq(
            INSERT_SQL,
            1,
            8,
            false,
            &[row],
            extended_options,
        )
        .expect("32K execute payload");
        let extended_a = find_subslice(&extended, &[b'A'; 32], "32K A value");
        let extended_b = find_subslice(&extended, &[b'B'; 32], "32K B value");
        let extended_c = find_subslice(&extended, &[b'C'; 32], "32K C value");
        let extended_z = find_subslice(&extended, b"ZMARK", "32K raw marker");
        let extended_in_order =
            extended_a < extended_b && extended_b < extended_c && extended_c < extended_z;
        assert!(
            extended_in_order,
            "32K payload must keep <=32767-byte values in bind order"
        );
    }
}