Skip to main content

oracledb_protocol/thin/
execute.rs

1#![forbid(unsafe_code)]
2
3use super::*;
4
5pub fn build_execute_query_payload(sql: &str, prefetch_rows: u32) -> Result<Vec<u8>> {
6    build_execute_query_payload_with_seq(sql, prefetch_rows, 1)
7}
8
9pub fn build_execute_query_payload_with_seq(
10    sql: &str,
11    prefetch_rows: u32,
12    seq_num: u8,
13) -> Result<Vec<u8>> {
14    build_execute_payload_with_seq(sql, prefetch_rows, seq_num, true)
15}
16
17pub fn build_execute_payload_with_seq(
18    sql: &str,
19    prefetch_rows: u32,
20    seq_num: u8,
21    is_query: bool,
22) -> Result<Vec<u8>> {
23    build_execute_payload_with_binds_with_seq(sql, prefetch_rows, seq_num, is_query, &[])
24}
25
26pub fn build_execute_payload_with_binds_with_seq(
27    sql: &str,
28    prefetch_rows: u32,
29    seq_num: u8,
30    is_query: bool,
31    binds: &[BindValue],
32) -> Result<Vec<u8>> {
33    let bind_rows = if binds.is_empty() {
34        Vec::new()
35    } else {
36        vec![binds.to_vec()]
37    };
38    build_execute_payload_with_bind_rows_with_seq(sql, prefetch_rows, seq_num, is_query, &bind_rows)
39}
40
41pub fn build_execute_payload_with_bind_rows_with_seq(
42    sql: &str,
43    prefetch_rows: u32,
44    seq_num: u8,
45    is_query: bool,
46    bind_rows: &[Vec<BindValue>],
47) -> Result<Vec<u8>> {
48    build_execute_payload_with_bind_rows_and_options_with_seq(
49        sql,
50        prefetch_rows,
51        seq_num,
52        is_query,
53        bind_rows,
54        ExecuteOptions::default(),
55    )
56}
57
58/// Execute message with an explicit pipeline token; pipelined operations
59/// carry tokens 1..N (impl/thin/connection.pyx `_create_messages_for_pipeline`),
60/// everything else carries 0.
61pub fn build_execute_payload_with_bind_rows_with_seq_and_token(
62    sql: &str,
63    prefetch_rows: u32,
64    seq_num: u8,
65    is_query: bool,
66    bind_rows: &[Vec<BindValue>],
67    token_num: u64,
68) -> Result<Vec<u8>> {
69    build_execute_payload_with_bind_rows_and_options_with_seq(
70        sql,
71        prefetch_rows,
72        seq_num,
73        is_query,
74        bind_rows,
75        ExecuteOptions {
76            token_num,
77            ..ExecuteOptions::default()
78        },
79    )
80}
81
82/// Builds a close-cursors piggyback message (reference
83/// `_write_close_cursors_piggyback` + `write_cursors_to_close`); it is
84/// prepended to the next regular message in the same data packet and
85/// consumes a TTC sequence number of its own.
86pub fn build_close_cursors_piggyback(cursor_ids: &[u32], seq_num: u8) -> Vec<u8> {
87    let mut writer = TtcWriter::new();
88    writer.write_u8(TNS_MSG_TYPE_PIGGYBACK);
89    writer.write_u8(TNS_FUNC_CLOSE_CURSORS);
90    writer.write_u8(seq_num);
91    writer.write_ub8(0); // token number (23.1 ext 1+)
92    writer.write_u8(1); // pointer
93    writer.write_ub4(u32::try_from(cursor_ids.len()).unwrap_or(u32::MAX));
94    for cursor_id in cursor_ids {
95        writer.write_ub4(*cursor_id);
96    }
97    writer.into_bytes()
98}
99
100pub fn build_execute_payload_with_bind_rows_and_options_with_seq(
101    sql: &str,
102    prefetch_rows: u32,
103    seq_num: u8,
104    is_query: bool,
105    bind_rows: &[Vec<BindValue>],
106    exec_options: ExecuteOptions,
107) -> Result<Vec<u8>> {
108    let sql_bytes = sql.as_bytes();
109    let sql_len =
110        u32::try_from(sql_bytes.len()).map_err(|_| ProtocolError::InvalidPacketLength {
111            length: sql_bytes.len(),
112            minimum: 0,
113        })?;
114    let bind_count = bind_rows.first().map_or(0, Vec::len);
115    for row in bind_rows {
116        if row.len() != bind_count {
117            return Err(ProtocolError::TtcDecode("inconsistent bind row width"));
118        }
119    }
120    let bind_count = u32::try_from(bind_count).map_err(|_| ProtocolError::InvalidPacketLength {
121        length: bind_count,
122        minimum: 0,
123    })?;
124    let bind_row_count =
125        u32::try_from(bind_rows.len()).map_err(|_| ProtocolError::InvalidPacketLength {
126            length: bind_rows.len(),
127            minimum: 0,
128        })?;
129    // Preallocate the writer so the small per-field `write_*` pushes do not grow
130    // the backing `Vec` through several doublings (each a heap allocation). The
131    // fixed message header + the inline SQL bytes dominate the no-bind/small-bind
132    // common case (e.g. `select 1 from dual` is 87 bytes total); bind columns add
133    // their own bytes and may still grow the buffer, but the hot small-statement
134    // path now builds in a single allocation. The written bytes are unchanged —
135    // this is a pure allocation optimization (see `TtcWriter::with_capacity`).
136    let writer_capacity = 96 + sql_bytes.len();
137    let mut writer = TtcWriter::with_capacity(writer_capacity);
138    writer.write_function_code_with_seq(TNS_FUNC_EXECUTE, seq_num);
139    writer.write_ub8(exec_options.token_num);
140
141    let is_plsql = statement_is_plsql(sql);
142    let parse_only = exec_options.parse_only;
143    // a fresh parse is required when the statement has no open server cursor
144    // or is DDL (reference execute.pyx:88-89)
145    let needs_parse = exec_options.cursor_id == 0 || crate::sql::statement_is_ddl(sql);
146    // a scroll request only repositions the open cursor and fetches; the
147    // EXECUTE/BIND options are suppressed (reference execute.pyx:82-84,105)
148    let scroll_operation = exec_options.scroll_operation;
149    let mut options = 0;
150    if needs_parse {
151        options |= TNS_EXEC_OPTION_PARSE;
152    }
153    if !parse_only && !scroll_operation {
154        options |= TNS_EXEC_OPTION_EXECUTE;
155    }
156    if is_query {
157        if parse_only {
158            options |= TNS_EXEC_OPTION_DESCRIBE;
159        } else if !exec_options.no_prefetch {
160            // reference execute.pyx:99 gates FETCH on `not stmt._no_prefetch`;
161            // a no-prefetch statement (VECTOR columns) leaves the rows to be
162            // retrieved by the follow-up define-fetch instead.
163            options |= TNS_EXEC_OPTION_FETCH;
164        }
165    }
166    if bind_count > 0 && !scroll_operation {
167        options |= TNS_EXEC_OPTION_BIND;
168    }
169    if is_plsql {
170        if bind_count > 0 {
171            options |= TNS_EXEC_OPTION_PLSQL_BIND;
172        }
173    } else if !parse_only {
174        options |= TNS_EXEC_OPTION_NOT_PLSQL;
175    }
176    if exec_options.batcherrors {
177        options |= TNS_EXEC_OPTION_BATCH_ERRORS;
178    }
179    let num_iters = if is_query && !parse_only {
180        prefetch_rows
181    } else {
182        1
183    };
184    // al8i4[1]: queries report 0 on first execute and the iteration count on
185    // re-execute of an open cursor (execute.pyx:187-193)
186    let exec_count = if parse_only {
187        0
188    } else if is_query {
189        if exec_options.cursor_id == 0 {
190            0
191        } else {
192            num_iters
193        }
194    } else {
195        bind_row_count.max(1)
196    };
197    let query_flag = u32::from(is_query);
198    // reference sets the implicit-resultset flag on every full execute with
199    // SQL (execute.pyx:81-82); anonymous PL/SQL blocks need it for
200    // dbms_sql.return_result (ORA-29481 otherwise)
201    let mut exec_flags = if parse_only {
202        0
203    } else {
204        TNS_EXEC_FLAGS_IMPLICIT_RESULTSET
205    };
206    if exec_options.arraydmlrowcounts {
207        exec_flags |= TNS_EXEC_FLAGS_DML_ROWCOUNTS;
208    }
209    // scrollable cursors keep the result set open across fetches and avoid the
210    // server cancelling on end-of-fetch (reference execute.pyx:85-87)
211    if exec_options.scrollable && !parse_only {
212        exec_flags |= TNS_EXEC_FLAGS_SCROLLABLE;
213        exec_flags |= TNS_EXEC_FLAGS_NO_CANCEL_ON_EOF;
214    }
215    writer.write_ub4(options);
216    writer.write_ub4(exec_options.cursor_id);
217    if needs_parse {
218        writer.write_u8(1); // pointer (cursor id)
219        writer.write_ub4(sql_len);
220    } else {
221        writer.write_u8(0); // pointer (cursor id)
222        writer.write_ub4(0);
223    }
224    writer.write_u8(1);
225    writer.write_ub4(13);
226    writer.write_u8(0);
227    writer.write_u8(0);
228    writer.write_ub4(0);
229    writer.write_ub4(num_iters);
230    writer.write_ub4(TNS_MAX_LONG_LENGTH);
231    if bind_count == 0 {
232        writer.write_u8(0);
233        writer.write_ub4(0);
234    } else {
235        writer.write_u8(1);
236        writer.write_ub4(bind_count);
237    }
238    // CQN registration id (registerquery) split lsb/msb across the al8i4 slots
239    // (reference execute.pyx:116-119,156,163). Zero for ordinary executes.
240    let registration_id_lsb = (exec_options.registration_id & 0xffff_ffff) as u32;
241    let registration_id_msb = ((exec_options.registration_id >> 32) & 0xffff_ffff) as u32;
242    writer.write_u8(0);
243    writer.write_u8(0);
244    writer.write_u8(0);
245    writer.write_u8(0);
246    writer.write_u8(0);
247    writer.write_u8(0);
248    writer.write_ub4(0);
249    writer.write_ub4(registration_id_lsb); // registration id (lsb)
250    writer.write_u8(0); // pointer (al8objlist)
251    writer.write_u8(1); // pointer (al8objlen)
252    writer.write_u8(0); // pointer (al8blv)
253    writer.write_ub4(0); // al8blvl
254    writer.write_u8(0); // pointer (al8dnam)
255    writer.write_ub4(0); // al8dnaml
256    writer.write_ub4(registration_id_msb); // registration id (msb)
257    if exec_options.arraydmlrowcounts {
258        writer.write_u8(1); // pointer (al8pidmlrc)
259        writer.write_ub4(exec_count); // al8pidmlrcbl
260        writer.write_u8(1); // pointer (al8pidmlrcl)
261    } else {
262        writer.write_u8(0); // pointer (al8pidmlrc)
263        writer.write_ub4(0); // al8pidmlrcbl
264        writer.write_u8(0); // pointer (al8pidmlrcl)
265    }
266    writer.write_u8(0); // pointer (al8sqlsig)
267    writer.write_ub4(0); // SQL signature length
268    writer.write_u8(0); // pointer (SQL ID)
269    writer.write_ub4(0); // allocated size of SQL ID
270    writer.write_u8(0); // pointer (length of SQL ID)
271    writer.write_u8(0); // pointer (chunk ids)
272    writer.write_ub4(0); // number of chunk ids
273
274    if needs_parse {
275        writer.write_bytes_with_length(sql_bytes)?;
276        writer.write_ub4(1); // al8i4[0] parse
277    } else {
278        writer.write_ub4(0); // al8i4[0] parse
279    }
280    writer.write_ub4(exec_count);
281    writer.write_ub4(0);
282    writer.write_ub4(0);
283    writer.write_ub4(0);
284    writer.write_ub4(0);
285    writer.write_ub4(0);
286    writer.write_ub4(query_flag); // al8i4[7] is query
287    writer.write_ub4(0); // al8i4[8]
288    writer.write_ub4(exec_flags); // al8i4[9] execute flags
289    writer.write_ub4(exec_options.fetch_orientation); // al8i4[10] fetch orientation
290    writer.write_ub4(exec_options.fetch_pos); // al8i4[11] fetch pos
291    writer.write_ub4(0); // al8i4[12]
292                         // a scroll request carries no bind parameters (reference suppresses the
293                         // BIND option and never writes bind params for scroll_operation)
294    if !bind_rows.is_empty() && !scroll_operation {
295        write_bind_params(&mut writer, bind_rows, is_plsql)?;
296    }
297    Ok(writer.into_bytes())
298}
299
300pub(crate) fn write_bind_params(
301    writer: &mut TtcWriter,
302    bind_rows: &[Vec<BindValue>],
303    is_plsql: bool,
304) -> Result<()> {
305    let Some(first_row) = bind_rows.first() else {
306        return Ok(());
307    };
308    let mut bind_metadata = Vec::with_capacity(first_row.len());
309    for index in 0..first_row.len() {
310        bind_metadata.push(write_bind_metadata_for_rows(writer, bind_rows, index)?);
311    }
312    for row in bind_rows {
313        if !is_plsql && row.iter().all(BindValue::is_output_only) {
314            continue;
315        }
316        writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
317        for index in bind_row_value_order(row, &bind_metadata, is_plsql) {
318            let value = &row[index];
319            let (_ora_type_num, csfrm, _buffer_size) = bind_metadata
320                .get(index)
321                .copied()
322                .unwrap_or((ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 1));
323            write_bind_value(writer, value, csfrm)?;
324        }
325    }
326    Ok(())
327}
328
329pub(crate) fn bind_row_value_order(
330    row: &[BindValue],
331    bind_metadata: &[(u8, u8, u32)],
332    is_plsql: bool,
333) -> Vec<usize> {
334    let mut non_long = Vec::with_capacity(row.len());
335    let mut long = Vec::new();
336    for (index, value) in row.iter().enumerate() {
337        if !is_plsql && value.is_output_only() {
338            continue;
339        }
340        // non-LONG values are written first followed by any LONG values; a
341        // value is "long" when its buffer size exceeds the maximum string
342        // size (reference messages/base.pyx:1529-1565 keys this off
343        // `metadata.buffer_size > buf._caps.max_string_size`)
344        if !is_plsql
345            && bind_metadata
346                .get(index)
347                .is_some_and(|(ora_type_num, _, buffer_size)| {
348                    matches!(*ora_type_num, ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW)
349                        || *buffer_size > 32_767
350                })
351        {
352            long.push(index);
353        } else {
354            non_long.push(index);
355        }
356    }
357    non_long.extend(long);
358    non_long
359}
360
361pub(crate) fn write_bind_metadata_for_rows(
362    writer: &mut TtcWriter,
363    bind_rows: &[Vec<BindValue>],
364    index: usize,
365) -> Result<(u8, u8, u32)> {
366    let Some(first_row) = bind_rows.first() else {
367        return Ok((ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 1));
368    };
369    let Some(first_value) = first_row.get(index) else {
370        return Ok((ORA_TYPE_NUM_VARCHAR, CS_FORM_IMPLICIT, 1));
371    };
372    let mut metadata_value = first_value;
373    let (mut ora_type_num, mut csfrm, mut buffer_size) = bind_metadata(first_value);
374    let mut needs_type_inference = matches!(first_value, BindValue::Null);
375    for row in bind_rows.iter().skip(1) {
376        let Some(value) = row.get(index) else {
377            continue;
378        };
379        if needs_type_inference {
380            if matches!(value, BindValue::Null) {
381                continue;
382            }
383            metadata_value = value;
384            (ora_type_num, csfrm, buffer_size) = bind_metadata(value);
385            needs_type_inference = false;
386            continue;
387        }
388        let (row_ora_type_num, row_csfrm, row_buffer_size) = bind_metadata(value);
389        if row_csfrm == csfrm && bind_metadata_types_are_compatible(ora_type_num, row_ora_type_num)
390        {
391            ora_type_num = promoted_bind_metadata_type(ora_type_num, row_ora_type_num);
392            buffer_size = buffer_size.max(row_buffer_size);
393        }
394    }
395    write_bind_metadata_with_type(writer, metadata_value, ora_type_num, csfrm, buffer_size)?;
396    Ok((ora_type_num, csfrm, buffer_size))
397}
398
399pub(crate) fn bind_metadata_types_are_compatible(left: u8, right: u8) -> bool {
400    left == right
401        || (matches!(
402            left,
403            ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_LONG
404        ) && matches!(
405            right,
406            ORA_TYPE_NUM_CHAR | ORA_TYPE_NUM_VARCHAR | ORA_TYPE_NUM_LONG
407        ))
408        || (matches!(left, ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW)
409            && matches!(right, ORA_TYPE_NUM_RAW | ORA_TYPE_NUM_LONG_RAW))
410}
411
412pub(crate) fn promoted_bind_metadata_type(left: u8, right: u8) -> u8 {
413    if matches!(left, ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW) {
414        left
415    } else if matches!(right, ORA_TYPE_NUM_LONG | ORA_TYPE_NUM_LONG_RAW) {
416        right
417    } else {
418        left
419    }
420}