Skip to main content

oracledb_protocol/thin/
aq.rs

1#![forbid(unsafe_code)]
2
3//! Sans-io codecs for Oracle Advanced Queuing (AQ) enqueue/dequeue operations.
4//!
5//! Mirrors the reference thin driver's `impl/thin/messages/aq_*.pyx`:
6//! - [`build_aq_enq_payload`] / [`parse_aq_enq_response`]  — FUNC 121 (single enqueue)
7//! - [`build_aq_deq_payload`] / [`parse_aq_deq_response`]  — FUNC 122 (single dequeue)
8//! - [`build_aq_array_enq_payload`] / [`build_aq_array_deq_payload`]
9//!   / [`parse_aq_array_response`]                          — FUNC 145 (bulk enqueue/dequeue)
10//!
11//! The message-property / payload codecs are shared between all three so the
12//! wire encoding is byte-identical to python-oracledb (golden traces under
13//! `tests/golden/aq_*.txt`). Object payloads reuse the `dbobject` codec and JSON
14//! payloads reuse [`crate::oson`].
15
16use super::*;
17use crate::oson::{decode_oson_with_limits, encode_oson, OsonValue};
18use crate::wire::ProtocolLimits;
19
20/// Payload classification for a queue. Determines the TOID sentinel and the
21/// payload-encoding branch taken during enqueue/dequeue.
22#[derive(Clone, Debug, PartialEq, Eq)]
23pub enum AqPayloadKind {
24    /// RAW / bytes payload. TOID sentinel `bytes([0]*15 + [0x17])`.
25    Raw,
26    /// JSON payload (OSON). TOID sentinel `bytes([0]*15 + [0x47])`.
27    Json,
28    /// Object payload of a named type. TOID is the type's OID.
29    Object,
30}
31
32/// Static description of an AQ queue, used by both enqueue and dequeue.
33#[derive(Clone, Debug)]
34pub struct AqQueueDesc {
35    pub name: String,
36    pub kind: AqPayloadKind,
37    /// TOID of the payload: 16-byte sentinel for RAW/JSON, the type OID for
38    /// object queues.
39    pub payload_toid: Vec<u8>,
40}
41
42impl AqQueueDesc {
43    /// Builds the queue descriptor, deriving the payload TOID from the kind.
44    /// For object queues `object_oid` must carry the type's OID.
45    pub fn new(name: String, kind: AqPayloadKind, object_oid: Option<Vec<u8>>) -> Self {
46        let payload_toid = match kind {
47            AqPayloadKind::Raw => raw_payload_toid(),
48            AqPayloadKind::Json => json_payload_toid(),
49            AqPayloadKind::Object => object_oid.unwrap_or_default(),
50        };
51        Self {
52            name,
53            kind,
54            payload_toid,
55        }
56    }
57}
58
59fn raw_payload_toid() -> Vec<u8> {
60    let mut toid = vec![0u8; 15];
61    toid.push(0x17);
62    toid
63}
64
65fn json_payload_toid() -> Vec<u8> {
66    let mut toid = vec![0u8; 15];
67    toid.push(0x47);
68    toid
69}
70
71/// The payload value carried by a message being enqueued.
72#[derive(Clone, Debug)]
73pub enum AqPayloadValue {
74    /// RAW bytes (already UTF-8 encoded if originally a string).
75    Raw(Vec<u8>),
76    /// JSON value encoded as OSON.
77    Json(OsonValue),
78    /// Pre-packed object image (the body produced by `DbObjectImpl::pack_image`).
79    Object { oid: Vec<u8>, image: Vec<u8> },
80}
81
82/// Mutable message properties (reference `ThinMsgPropsImpl`). Defaults match
83/// the reference: `delay=0`, `expiration=-1`, `priority=0`, `state=0`.
84#[derive(Clone, Debug)]
85pub struct AqMsgProps {
86    pub priority: i32,
87    pub delay: i32,
88    pub expiration: i32,
89    pub correlation: Option<String>,
90    pub exception_queue: Option<String>,
91    pub state: i32,
92    pub enq_txn_id: Option<Vec<u8>>,
93    /// Recipient names (multi-consumer enqueue). `None` => no recipient list.
94    pub recipients: Option<Vec<String>>,
95    /// Payload to enqueue. Required for enqueue; ignored for dequeue requests
96    /// (where defaults are written).
97    pub payload: Option<AqPayloadValue>,
98}
99
100impl Default for AqMsgProps {
101    fn default() -> Self {
102        Self {
103            priority: 0,
104            delay: 0,
105            expiration: -1,
106            correlation: None,
107            exception_queue: None,
108            state: 0,
109            enq_txn_id: None,
110            // Reference `ThinMsgPropsImpl.__init__` defaults recipients to an
111            // empty list (not None): an empty list still writes pointer=1 with
112            // a zero count, whereas None writes pointer=0.
113            recipients: Some(Vec::new()),
114            payload: None,
115        }
116    }
117}
118
119/// Enqueue options (reference `ThinEnqOptionsImpl`). `visibility` defaults to
120/// `ENQ_ON_COMMIT (2)`, `delivery_mode` to `PERSISTENT (1)`.
121#[derive(Clone, Debug)]
122pub struct AqEnqOptions {
123    pub visibility: u32,
124    pub delivery_mode: u16,
125}
126
127impl Default for AqEnqOptions {
128    fn default() -> Self {
129        Self {
130            visibility: 2,
131            delivery_mode: TNS_AQ_MSG_PERSISTENT,
132        }
133    }
134}
135
136/// Dequeue options (reference `ThinDeqOptionsImpl`). Defaults: `mode=REMOVE(3)`,
137/// `navigation=NEXT_MSG(3)`, `visibility=ON_COMMIT(2)`, `wait=WAIT_FOREVER`,
138/// `delivery_mode=PERSISTENT(1)`.
139#[derive(Clone, Debug)]
140pub struct AqDeqOptions {
141    pub condition: Option<String>,
142    pub consumer_name: Option<String>,
143    pub correlation: Option<String>,
144    pub delivery_mode: u16,
145    pub mode: i32,
146    pub msgid: Option<Vec<u8>>,
147    pub navigation: i32,
148    pub visibility: i32,
149    pub wait: u32,
150}
151
152impl Default for AqDeqOptions {
153    fn default() -> Self {
154        Self {
155            condition: None,
156            consumer_name: None,
157            correlation: None,
158            delivery_mode: TNS_AQ_MSG_PERSISTENT,
159            mode: 3,
160            msgid: None,
161            navigation: 3,
162            visibility: 2,
163            wait: 0xFFFF_FFFF,
164        }
165    }
166}
167
168/// A message returned by dequeue (reference fields read by `_process_msg_props`
169/// / `_process_payload`).
170#[derive(Clone, Debug, Default)]
171pub struct AqDeqMessage {
172    pub priority: i32,
173    pub delay: i32,
174    pub expiration: i32,
175    pub correlation: Option<String>,
176    pub num_attempts: i32,
177    pub exception_queue: Option<String>,
178    pub state: i32,
179    /// Oracle enqueue time decoded to a naive datetime, or `None`.
180    pub enq_time: Option<QueryValue>,
181    pub delivery_mode: u16,
182    pub msgid: Option<Vec<u8>>,
183    /// Decoded payload. `None` for an empty-payload message.
184    pub payload: Option<AqDeqPayload>,
185}
186
187/// A decoded dequeue payload.
188#[derive(Clone, Debug)]
189pub enum AqDeqPayload {
190    /// RAW bytes (may be empty for `DEQ_REMOVE_NODATA`).
191    Raw(Vec<u8>),
192    /// JSON decoded from OSON.
193    Json(OsonValue),
194    /// Object payload: the raw packed image (unpacked by the shim against the
195    /// queue's payload type).
196    Object(Vec<u8>),
197}
198
199// ---------------------------------------------------------------------------
200// Shared message-property and payload codecs (reference aq_base.pyx).
201// ---------------------------------------------------------------------------
202
203/// Writes the TTC function-code preamble: message-type/function/seq plus the
204/// `token_num` ub8 the server expects when the negotiated field version is at
205/// least `23.1 EXT 1` (reference `Message._write_function_code`).
206fn write_aq_function_code(
207    writer: &mut TtcWriter,
208    function_code: u8,
209    seq_num: u8,
210    ttc_field_version: u8,
211) {
212    writer.write_function_code_with_seq(function_code, seq_num);
213    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
214        writer.write_ub8(0); // token_num
215    }
216}
217
218fn write_value_with_length(writer: &mut TtcWriter, value: Option<&[u8]>) -> Result<()> {
219    match value {
220        None => {
221            writer.write_ub4(0);
222            Ok(())
223        }
224        Some(bytes) => writer.write_bytes_with_two_lengths(Some(bytes)),
225    }
226}
227
228/// Writes the AQ message-property block (`_write_msg_props`).
229fn write_msg_props(
230    writer: &mut TtcWriter,
231    props: &AqMsgProps,
232    ttc_field_version: u8,
233) -> Result<()> {
234    writer.write_ub4(props.priority as u32);
235    writer.write_ub4(props.delay as u32);
236    writer.write_sb4(props.expiration);
237    write_value_with_length(writer, props.correlation.as_deref().map(str::as_bytes))?;
238    writer.write_ub4(0); // number of attempts
239    write_value_with_length(writer, props.exception_queue.as_deref().map(str::as_bytes))?;
240    writer.write_ub4(props.state as u32);
241    writer.write_ub4(0); // enqueue time length
242    write_value_with_length(writer, props.enq_txn_id.as_deref())?;
243    writer.write_ub4(4); // number of extensions
244    writer.write_u8(0x0e); // unknown extra byte
245    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_NAME)?;
246    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS)?;
247    writer.write_keyword_value_pair(None, Some(b"\x00"), TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL)?;
248    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID)?;
249    writer.write_ub4(0); // user property
250    writer.write_ub4(0); // cscn
251    writer.write_ub4(0); // dscn
252    writer.write_ub4(0); // flags
253    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
254        writer.write_ub4(0xFFFF_FFFF); // shard id
255    }
256    Ok(())
257}
258
259/// Writes the recipient-list key/value pairs (`_write_recipients`).
260fn write_recipients(writer: &mut TtcWriter, recipients: &[String]) -> Result<()> {
261    let mut index: u16 = 0;
262    for recipient in recipients {
263        writer.write_keyword_value_pair(Some(recipient.as_bytes()), None, index)?;
264        writer.write_keyword_value_pair(None, None, index + 1)?;
265        writer.write_keyword_value_pair(None, Some(b"\x00"), index + 2)?;
266        index += 3;
267    }
268    Ok(())
269}
270
271/// Writes the message payload (`_write_payload`).
272fn write_payload(
273    writer: &mut TtcWriter,
274    payload: &AqPayloadValue,
275    supports_oson_long_fnames: bool,
276) -> Result<()> {
277    match payload {
278        AqPayloadValue::Json(value) => {
279            // write_oson(..., write_length=False): a QLocator (no chunk-length
280            // prefix) followed by the OSON image as a length-prefixed chunk.
281            let image = encode_oson(value, supports_oson_long_fnames)?;
282            crate::vector::write_oson_aq_payload(writer, &image)
283        }
284        AqPayloadValue::Object { oid, image } => write_dbobject_bind(writer, oid, image),
285        AqPayloadValue::Raw(bytes) => {
286            writer.write_raw(bytes);
287            Ok(())
288        }
289    }
290}
291
292// ---------------------------------------------------------------------------
293// FUNC 121 — single enqueue
294// ---------------------------------------------------------------------------
295
296/// Builds the AQ enqueue (FUNC 121) request payload (reference
297/// `AqEnqMessage._write_message`).
298pub fn build_aq_enq_payload(
299    queue: &AqQueueDesc,
300    props: &AqMsgProps,
301    enq_options: &AqEnqOptions,
302    seq_num: u8,
303    ttc_field_version: u8,
304    supports_oson_long_fnames: bool,
305) -> Result<Vec<u8>> {
306    let payload = props
307        .payload
308        .as_ref()
309        .ok_or(ProtocolError::TtcDecode("AQ enqueue has no payload"))?;
310    let queue_name = queue.name.as_bytes();
311    let mut writer = TtcWriter::new();
312    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ENQ, seq_num, ttc_field_version);
313    writer.write_u8(1); // queue name (pointer)
314    writer.write_ub4(queue_name.len() as u32);
315    write_msg_props(&mut writer, props, ttc_field_version)?;
316    match props.recipients.as_ref() {
317        None => {
318            writer.write_u8(0); // recipients (pointer)
319            writer.write_ub4(0); // number of key/value pairs
320        }
321        Some(recipients) => {
322            writer.write_u8(1);
323            writer.write_ub4(3 * recipients.len() as u32);
324        }
325    }
326    writer.write_ub4(enq_options.visibility);
327    writer.write_u8(0); // relative message id (pointer)
328    writer.write_ub4(0); // relative message length
329    writer.write_ub4(0); // sequence deviation
330    writer.write_u8(1); // TOID of payload (pointer)
331    writer.write_ub4(16); // TOID of payload length
332    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
333    match queue.kind {
334        AqPayloadKind::Json => {
335            writer.write_u8(0); // payload (pointer)
336            writer.write_u8(0); // RAW payload (pointer)
337            writer.write_ub4(0); // RAW payload length
338        }
339        AqPayloadKind::Object => {
340            writer.write_u8(1); // payload (pointer)
341            writer.write_u8(0); // RAW payload (pointer)
342            writer.write_ub4(0); // RAW payload length
343        }
344        AqPayloadKind::Raw => {
345            let raw_len = match payload {
346                AqPayloadValue::Raw(bytes) => bytes.len() as u32,
347                _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
348            };
349            writer.write_u8(0); // payload (pointer)
350            writer.write_u8(1); // RAW payload (pointer)
351            writer.write_ub4(raw_len);
352        }
353    }
354    writer.write_u8(1); // return message id (pointer)
355    writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
356    let mut enq_flags = 0u32;
357    if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
358        enq_flags |= TNS_KPD_AQ_BUFMSG;
359    }
360    writer.write_ub4(enq_flags); // enqueue flags
361    writer.write_u8(0); // extensions 1 (pointer)
362    writer.write_ub4(0); // number of extensions 1
363    writer.write_u8(0); // extensions 2 (pointer)
364    writer.write_ub4(0); // number of extensions 2
365    writer.write_u8(0); // source sequence number
366    writer.write_ub4(0); // source sequence length
367    writer.write_u8(0); // max sequence number
368    writer.write_ub4(0); // max sequence length
369    writer.write_u8(0); // output ack length
370    writer.write_u8(0); // correlation (pointer)
371    writer.write_ub4(0); // correlation length
372    writer.write_u8(0); // sender name (pointer)
373    writer.write_ub4(0); // sender name length
374    writer.write_u8(0); // sender address (pointer)
375    writer.write_ub4(0); // sender address length
376    writer.write_u8(0); // sender charset id (pointer)
377    writer.write_u8(0); // sender ncharset id (pointer)
378    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
379        // JSON payload (pointer)
380        writer.write_u8(u8::from(queue.kind == AqPayloadKind::Json));
381    }
382
383    writer.write_bytes_with_length(queue_name)?;
384    if let Some(recipients) = props.recipients.as_ref() {
385        write_recipients(&mut writer, recipients)?;
386    }
387    writer.write_raw(&queue.payload_toid);
388    write_payload(&mut writer, payload, supports_oson_long_fnames)?;
389    Ok(writer.into_bytes())
390}
391
392/// Parses an AQ enqueue (FUNC 121) response, returning the assigned 16-byte
393/// message id (reference `AqEnqMessage._process_return_parameters`).
394pub fn parse_aq_enq_response(
395    payload: &[u8],
396    capabilities: ClientCapabilities,
397) -> Result<Option<Vec<u8>>> {
398    parse_aq_enq_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
399}
400
401pub fn parse_aq_enq_response_with_limits(
402    payload: &[u8],
403    capabilities: ClientCapabilities,
404    limits: ProtocolLimits,
405) -> Result<Option<Vec<u8>>> {
406    let mut reader = TtcReader::with_limits(payload, limits)?;
407    let mut msgid: Option<Vec<u8>> = None;
408    while reader.remaining() > 0 {
409        let message_type = reader.read_u8()?;
410        match message_type {
411            0 => {}
412            TNS_MSG_TYPE_PARAMETER => {
413                let id = reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec();
414                let _ext_len = reader.read_ub2()?;
415                msgid = Some(id);
416            }
417            TNS_MSG_TYPE_STATUS => {
418                let _call_status = reader.read_ub4()?;
419                let _seq = reader.read_ub2()?;
420            }
421            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
422                let _ = skip_server_side_piggyback(&mut reader)?;
423            }
424            TNS_MSG_TYPE_END_OF_RESPONSE => break,
425            TNS_MSG_TYPE_ERROR => {
426                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
427                if info.number != 0 {
428                    return Err(ProtocolError::ServerErrorInfo(Box::new(
429                        info.into_details(),
430                    )));
431                }
432            }
433            _ => {
434                return Err(ProtocolError::UnknownMessageType {
435                    message_type,
436                    position: reader.position().saturating_sub(1),
437                })
438            }
439        }
440    }
441    Ok(msgid)
442}
443
444// ---------------------------------------------------------------------------
445// FUNC 122 — single dequeue
446// ---------------------------------------------------------------------------
447
448/// Builds the AQ dequeue (FUNC 122) request payload (reference
449/// `AqDeqMessage._write_message`).
450pub fn build_aq_deq_payload(
451    queue: &AqQueueDesc,
452    deq_options: &AqDeqOptions,
453    seq_num: u8,
454    ttc_field_version: u8,
455) -> Result<Vec<u8>> {
456    let queue_name = queue.name.as_bytes();
457    let mut writer = TtcWriter::new();
458    write_aq_function_code(&mut writer, TNS_FUNC_AQ_DEQ, seq_num, ttc_field_version);
459    writer.write_u8(1); // queue name (pointer)
460    writer.write_ub4(queue_name.len() as u32);
461    writer.write_u8(1); // message properties
462    writer.write_u8(1); // msg props length
463    writer.write_u8(1); // recipient list
464    writer.write_u8(1); // recipient list length
465    let consumer_name = deq_options
466        .consumer_name
467        .as_ref()
468        .filter(|name| !name.is_empty());
469    match consumer_name {
470        Some(name) => {
471            writer.write_u8(1);
472            writer.write_ub4(name.len() as u32);
473        }
474        None => {
475            writer.write_u8(0);
476            writer.write_ub4(0);
477        }
478    }
479    writer.write_sb4(deq_options.mode);
480    writer.write_sb4(deq_options.navigation);
481    writer.write_sb4(deq_options.visibility);
482    writer.write_sb4(deq_options.wait as i32);
483    let msgid = deq_options.msgid.as_ref().filter(|id| !id.is_empty());
484    match msgid {
485        Some(_) => {
486            writer.write_u8(1);
487            writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
488        }
489        None => {
490            writer.write_u8(0);
491            writer.write_ub4(0);
492        }
493    }
494    let correlation = deq_options.correlation.as_ref().filter(|c| !c.is_empty());
495    match correlation {
496        Some(c) => {
497            writer.write_u8(1);
498            writer.write_ub4(c.len() as u32);
499        }
500        None => {
501            writer.write_u8(0);
502            writer.write_ub4(0);
503        }
504    }
505    writer.write_u8(1); // toid of payload
506    writer.write_ub4(16); // toid length
507    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
508    writer.write_u8(1); // payload
509    writer.write_u8(1); // return msg id
510    writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
511    let mut deq_flags = 0u32;
512    match deq_options.delivery_mode {
513        TNS_AQ_MSG_BUFFERED => deq_flags |= TNS_KPD_AQ_BUFMSG,
514        TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => deq_flags |= TNS_KPD_AQ_EITHER,
515        _ => {}
516    }
517    writer.write_ub4(deq_flags);
518    let condition = deq_options.condition.as_ref().filter(|c| !c.is_empty());
519    match condition {
520        Some(c) => {
521            writer.write_u8(1);
522            writer.write_ub4(c.len() as u32);
523        }
524        None => {
525            writer.write_u8(0);
526            writer.write_ub4(0);
527        }
528    }
529    writer.write_u8(0); // extensions
530    writer.write_ub4(0); // number of extensions
531    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
532        writer.write_u8(0); // JSON payload
533    }
534    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
535        writer.write_ub4(0xFFFF_FFFF); // shard id (-1)
536    }
537
538    writer.write_bytes_with_length(queue_name)?;
539    if let Some(name) = consumer_name {
540        writer.write_bytes_with_length(name.as_bytes())?;
541    }
542    if let Some(id) = msgid {
543        let mut id = id.clone();
544        id.truncate(16);
545        if id.len() < 16 {
546            id.resize(16, 0);
547        }
548        writer.write_raw(&id);
549    }
550    if let Some(c) = correlation {
551        writer.write_bytes_with_length(c.as_bytes())?;
552    }
553    writer.write_raw(&queue.payload_toid);
554    if let Some(c) = condition {
555        writer.write_bytes_with_length(c.as_bytes())?;
556    }
557    Ok(writer.into_bytes())
558}
559
560/// Outcome of a single dequeue.
561#[derive(Clone, Debug, Default)]
562pub struct AqDeqResult {
563    /// The dequeued message, or `None` when the queue was empty (ORA-25228).
564    pub message: Option<AqDeqMessage>,
565}
566
567/// Parses an AQ dequeue (FUNC 122) response (reference
568/// `AqDeqMessage._process_return_parameters`).
569pub fn parse_aq_deq_response(
570    payload: &[u8],
571    capabilities: ClientCapabilities,
572    kind: &AqPayloadKind,
573) -> Result<AqDeqResult> {
574    parse_aq_deq_response_with_limits(payload, capabilities, kind, ProtocolLimits::DEFAULT)
575}
576
577pub fn parse_aq_deq_response_with_limits(
578    payload: &[u8],
579    capabilities: ClientCapabilities,
580    kind: &AqPayloadKind,
581    limits: ProtocolLimits,
582) -> Result<AqDeqResult> {
583    let mut reader = TtcReader::with_limits(payload, limits)?;
584    let mut result = AqDeqResult::default();
585    let mut no_msg_found = false;
586    while reader.remaining() > 0 {
587        let message_type = reader.read_u8()?;
588        match message_type {
589            0 => {}
590            TNS_MSG_TYPE_PARAMETER => {
591                let num_bytes = reader.read_ub4()?;
592                if num_bytes > 0 {
593                    let mut message = AqDeqMessage::default();
594                    process_msg_props(&mut reader, &mut message, capabilities.ttc_field_version)?;
595                    process_recipients(&mut reader)?;
596                    message.payload = process_payload(&mut reader, kind)?;
597                    message.msgid = Some(process_msg_id(&mut reader)?);
598                    result.message = Some(message);
599                }
600            }
601            TNS_MSG_TYPE_STATUS => {
602                let _call_status = reader.read_ub4()?;
603                let _seq = reader.read_ub2()?;
604            }
605            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
606                let _ = skip_server_side_piggyback(&mut reader)?;
607            }
608            TNS_MSG_TYPE_END_OF_RESPONSE => break,
609            TNS_MSG_TYPE_ERROR => {
610                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
611                if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
612                    no_msg_found = true;
613                } else if info.number != 0 {
614                    return Err(ProtocolError::ServerErrorInfo(Box::new(
615                        info.into_details(),
616                    )));
617                }
618            }
619            _ => {
620                return Err(ProtocolError::UnknownMessageType {
621                    message_type,
622                    position: reader.position().saturating_sub(1),
623                })
624            }
625        }
626    }
627    if no_msg_found {
628        result.message = None;
629    }
630    Ok(result)
631}
632
633// ---------------------------------------------------------------------------
634// FUNC 145 — bulk enqueue / dequeue
635// ---------------------------------------------------------------------------
636
637/// Builds the AQ array enqueue (FUNC 145, op=ENQ) request payload (reference
638/// `AqArrayMessage._write_message` + `_write_array_enq`).
639pub fn build_aq_array_enq_payload(
640    queue: &AqQueueDesc,
641    props_list: &[AqMsgProps],
642    enq_options: &AqEnqOptions,
643    seq_num: u8,
644    ttc_field_version: u8,
645    supports_oson_long_fnames: bool,
646) -> Result<Vec<u8>> {
647    let num_iters = props_list.len() as u32;
648    let queue_name = queue.name.as_bytes();
649    let mut writer = TtcWriter::new();
650    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
651    writer.write_u8(0); // input params (pointer)
652    writer.write_ub4(0); // length
653    writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
654    writer.write_u8(1); // output params (pointer)
655    writer.write_u8(0); // length
656    writer.write_sb4(TNS_AQ_ARRAY_ENQ);
657    writer.write_u8(1); // num iters (pointer)
658    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
659        writer.write_ub4(0xFFFF); // shard id
660    }
661    writer.write_ub4(num_iters);
662
663    let mut flags = 0u32;
664    if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
665        flags |= TNS_KPD_AQ_BUFMSG;
666    }
667    writer.write_ub4(0); // rel msgid len
668    writer.write_u8(TNS_MSG_TYPE_ROW_HEADER);
669    writer.write_bytes_with_two_lengths(Some(queue_name))?;
670    writer.write_raw(&queue.payload_toid);
671    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
672    writer.write_ub4(flags);
673    for props in props_list {
674        let payload = props
675            .payload
676            .as_ref()
677            .ok_or(ProtocolError::TtcDecode("AQ array enqueue has no payload"))?;
678        writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
679        writer.write_ub4(flags); // aqi flags
680        write_msg_props(&mut writer, props, ttc_field_version)?;
681        match props.recipients.as_ref() {
682            None => writer.write_ub4(0),
683            Some(recipients) => {
684                writer.write_ub4(3 * recipients.len() as u32);
685                write_recipients(&mut writer, recipients)?;
686            }
687        }
688        writer.write_sb4(enq_options.visibility as i32);
689        writer.write_ub4(0); // relative msg id
690        writer.write_sb4(0); // seq deviation
691        if matches!(queue.kind, AqPayloadKind::Raw) {
692            let raw_len = match payload {
693                AqPayloadValue::Raw(bytes) => bytes.len() as u32,
694                _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
695            };
696            writer.write_ub4(raw_len);
697        }
698        write_payload(&mut writer, payload, supports_oson_long_fnames)?;
699    }
700    writer.write_u8(TNS_MSG_TYPE_STATUS);
701    Ok(writer.into_bytes())
702}
703
704/// Builds the AQ array dequeue (FUNC 145, op=DEQ) request payload (reference
705/// `AqArrayMessage._write_message` + `_write_array_deq`).
706pub fn build_aq_array_deq_payload(
707    queue: &AqQueueDesc,
708    deq_options: &AqDeqOptions,
709    num_iters: u32,
710    seq_num: u8,
711    ttc_field_version: u8,
712) -> Result<Vec<u8>> {
713    let queue_name = queue.name.as_bytes();
714    let mut writer = TtcWriter::new();
715    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
716    writer.write_u8(1); // input params (pointer)
717    writer.write_ub4(num_iters);
718    writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
719    writer.write_u8(1); // output params (pointer)
720    writer.write_u8(1); // length
721    writer.write_sb4(TNS_AQ_ARRAY_DEQ);
722    writer.write_u8(0); // num iters (pointer)
723    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
724        writer.write_ub4(0xFFFF); // shard id
725    }
726
727    let mut flags = 0u32;
728    match deq_options.delivery_mode {
729        TNS_AQ_MSG_BUFFERED => flags |= TNS_KPD_AQ_BUFMSG,
730        TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => flags |= TNS_KPD_AQ_EITHER,
731        _ => {}
732    }
733    let consumer_name = deq_options
734        .consumer_name
735        .as_ref()
736        .filter(|name| !name.is_empty())
737        .map(|name| name.as_bytes());
738    let correlation = deq_options
739        .correlation
740        .as_ref()
741        .filter(|c| !c.is_empty())
742        .map(|c| c.as_bytes());
743    let condition = deq_options
744        .condition
745        .as_ref()
746        .filter(|c| !c.is_empty())
747        .map(|c| c.as_bytes());
748    let props = AqMsgProps::default();
749    for _ in 0..num_iters {
750        writer.write_bytes_with_two_lengths(Some(queue_name))?;
751        write_msg_props(&mut writer, &props, ttc_field_version)?;
752        writer.write_ub4(0); // num recipients
753        write_value_with_length(&mut writer, consumer_name)?;
754        writer.write_sb4(deq_options.mode);
755        writer.write_sb4(deq_options.navigation);
756        writer.write_sb4(deq_options.visibility);
757        writer.write_sb4(deq_options.wait as i32);
758        write_value_with_length(&mut writer, deq_options.msgid.as_deref())?;
759        write_value_with_length(&mut writer, correlation)?;
760        write_value_with_length(&mut writer, condition)?;
761        writer.write_ub4(0); // extensions
762        writer.write_ub4(0); // rel msg id
763        writer.write_sb4(0); // seq deviation
764        writer.write_bytes_with_two_lengths(Some(&queue.payload_toid))?;
765        writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
766        writer.write_ub4(0); // payload length
767        writer.write_ub4(0); // raw pay length
768        writer.write_ub4(0);
769        writer.write_ub4(flags);
770        writer.write_ub4(0); // extensions len
771        writer.write_ub4(0); // source seq len
772    }
773    Ok(writer.into_bytes())
774}
775
776/// Result of a bulk operation: enqueue returns assigned msgids per message;
777/// dequeue returns the dequeued messages (already truncated to `num_iters`).
778#[derive(Clone, Debug, Default)]
779pub struct AqArrayResult {
780    /// For enqueue: assigned msgid per input message, in order.
781    pub enq_msgids: Vec<Vec<u8>>,
782    /// For dequeue: the dequeued messages.
783    pub deq_messages: Vec<AqDeqMessage>,
784}
785
786/// Parses an AQ array (FUNC 145) response for either operation (reference
787/// `AqArrayMessage._process_return_parameters`).
788///
789/// `props_count` is the number of message-property slots prepared client-side
790/// (`num_iters` for enqueue, `max_num_messages` for dequeue).
791pub fn parse_aq_array_response(
792    payload: &[u8],
793    capabilities: ClientCapabilities,
794    operation: i32,
795    props_count: u32,
796    kind: &AqPayloadKind,
797) -> Result<AqArrayResult> {
798    parse_aq_array_response_with_limits(
799        payload,
800        capabilities,
801        operation,
802        props_count,
803        kind,
804        ProtocolLimits::DEFAULT,
805    )
806}
807
808pub fn parse_aq_array_response_with_limits(
809    payload: &[u8],
810    capabilities: ClientCapabilities,
811    operation: i32,
812    props_count: u32,
813    kind: &AqPayloadKind,
814    limits: ProtocolLimits,
815) -> Result<AqArrayResult> {
816    limits.check_batch_rows(props_count as usize)?;
817    let mut reader = TtcReader::with_limits(payload, limits)?;
818    let mut result = AqArrayResult::default();
819    let mut messages: Vec<AqDeqMessage> = Vec::new();
820    let mut enq_msgid_blob: Option<Vec<u8>> = None;
821    let mut response_num_iters: u32 = 0;
822    let mut no_msg_found = false;
823    while reader.remaining() > 0 {
824        let message_type = reader.read_u8()?;
825        match message_type {
826            0 => {}
827            TNS_MSG_TYPE_PARAMETER => {
828                let num_iters = reader.read_ub4()?;
829                reader.limits().check_batch_rows(num_iters as usize)?;
830                response_num_iters = num_iters;
831                for i in 0..num_iters {
832                    let mut message = AqDeqMessage::default();
833                    let props_len = reader.read_ub2()?;
834                    if props_len > 0 {
835                        reader.read_u8()?; // skip_ub1
836                        process_msg_props(
837                            &mut reader,
838                            &mut message,
839                            capabilities.ttc_field_version,
840                        )?;
841                    }
842                    process_recipients(&mut reader)?;
843                    let payload_len = reader.read_ub2()?;
844                    if payload_len > 0 {
845                        message.payload = process_payload(&mut reader, kind)?;
846                    }
847                    let msgid = reader.read_bytes_with_length()?.unwrap_or_default();
848                    if operation == TNS_AQ_ARRAY_ENQ {
849                        enq_msgid_blob = Some(msgid);
850                    } else {
851                        message.msgid = Some(msgid);
852                    }
853                    let ext_len = reader.read_ub2()?;
854                    if ext_len > 0 {
855                        return Err(ProtocolError::UnsupportedFeature("AQ array extensions"));
856                    }
857                    let _output_ack = reader.read_ub2()?;
858                    if operation != TNS_AQ_ARRAY_ENQ {
859                        let _ = i;
860                        messages.push(message);
861                    }
862                }
863                if operation == TNS_AQ_ARRAY_ENQ {
864                    response_num_iters = reader.read_ub4()?;
865                    reader
866                        .limits()
867                        .check_batch_rows(response_num_iters as usize)?;
868                }
869            }
870            TNS_MSG_TYPE_STATUS => {
871                let _call_status = reader.read_ub4()?;
872                let _seq = reader.read_ub2()?;
873            }
874            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
875                let _ = skip_server_side_piggyback(&mut reader)?;
876            }
877            TNS_MSG_TYPE_END_OF_RESPONSE => break,
878            TNS_MSG_TYPE_ERROR => {
879                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
880                if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
881                    no_msg_found = true;
882                } else if info.number != 0 {
883                    return Err(ProtocolError::ServerErrorInfo(Box::new(
884                        info.into_details(),
885                    )));
886                }
887            }
888            _ => {
889                return Err(ProtocolError::UnknownMessageType {
890                    message_type,
891                    position: reader.position().saturating_sub(1),
892                })
893            }
894        }
895    }
896    if operation == TNS_AQ_ARRAY_ENQ {
897        if let Some(blob) = enq_msgid_blob {
898            let count = props_count as usize;
899            result.enq_msgids = (0..count)
900                .map(|j| {
901                    let start = j * 16;
902                    let end = start + 16;
903                    blob.get(start..end).map(<[u8]>::to_vec).unwrap_or_default()
904                })
905                .collect();
906        }
907    } else if no_msg_found {
908        result.deq_messages = Vec::new();
909    } else {
910        let keep = response_num_iters as usize;
911        messages.truncate(keep);
912        result.deq_messages = messages;
913    }
914    Ok(result)
915}
916
917// ---------------------------------------------------------------------------
918// Shared response decoders (reference aq_base.pyx).
919// ---------------------------------------------------------------------------
920
921fn process_msg_props(
922    reader: &mut TtcReader<'_>,
923    message: &mut AqDeqMessage,
924    ttc_field_version: u8,
925) -> Result<()> {
926    message.priority = reader.read_sb4()?;
927    message.delay = reader.read_sb4()?;
928    message.expiration = reader.read_sb4()?;
929    message.correlation = reader.read_string_with_length()?;
930    message.num_attempts = reader.read_sb4()?;
931    message.exception_queue = reader.read_string_with_length()?;
932    message.state = reader.read_sb4()?;
933    message.enq_time = process_date(reader)?;
934    let _enq_txn_id = reader.read_bytes_with_length()?;
935    process_extensions(reader)?;
936    let user_props = reader.read_ub4()?;
937    if user_props > 0 {
938        return Err(ProtocolError::UnsupportedFeature("AQ user properties"));
939    }
940    let _csn = reader.read_ub4()?;
941    let _dsn = reader.read_ub4()?;
942    let flags = reader.read_ub4()?;
943    message.delivery_mode = if flags == TNS_KPD_AQ_BUFMSG {
944        TNS_AQ_MSG_BUFFERED
945    } else {
946        TNS_AQ_MSG_PERSISTENT
947    };
948    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
949        let _shard = reader.read_ub4()?;
950    }
951    Ok(())
952}
953
954/// Reads an Oracle date the way the reference `_process_date` does: a ub4
955/// presence flag, then `read_raw_bytes_and_length` (a single u8 length byte
956/// followed by that many raw date bytes).
957fn process_date(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
958    let num_bytes = reader.read_ub4()?;
959    if num_bytes == 0 {
960        return Ok(None);
961    }
962    let len = usize::from(reader.read_u8()?);
963    if len == 0 {
964        return Ok(None);
965    }
966    let bytes = reader.read_raw(len)?;
967    Ok(Some(decode_datetime_value(bytes)?))
968}
969
970fn process_extensions(reader: &mut TtcReader<'_>) -> Result<()> {
971    let num_extensions = reader.read_ub4()?;
972    if num_extensions > 0 {
973        reader.read_u8()?; // skip_ub1
974        for _ in 0..num_extensions {
975            let _text = reader.read_bytes_with_length()?;
976            let _binary = reader.read_bytes_with_length()?;
977            let _keyword = reader.read_ub2()?;
978        }
979    }
980    Ok(())
981}
982
983fn process_recipients(reader: &mut TtcReader<'_>) -> Result<()> {
984    let count = reader.read_ub4()?;
985    if count > 0 {
986        return Err(ProtocolError::UnsupportedFeature(
987            "AQ recipients on dequeue",
988        ));
989    }
990    Ok(())
991}
992
993fn process_msg_id(reader: &mut TtcReader<'_>) -> Result<Vec<u8>> {
994    Ok(reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec())
995}
996
997/// Decodes the payload of a dequeued message (reference `_process_payload`).
998fn process_payload(
999    reader: &mut TtcReader<'_>,
1000    kind: &AqPayloadKind,
1001) -> Result<Option<AqDeqPayload>> {
1002    if matches!(kind, AqPayloadKind::Object) {
1003        // Object branch (reference `read_dbobject`): TOID/OID/snapshot
1004        // (length-prefixed each), version ub2, image-len ub4, flags ub2, then
1005        // the packed image as a bare length-prefixed chunk (`read_bytes`).
1006        let _toid = reader.read_bytes_with_length()?;
1007        let _oid = reader.read_bytes_with_length()?;
1008        let _snapshot = reader.read_bytes_with_length()?;
1009        let _version = reader.read_ub2()?;
1010        let image_length = reader.read_ub4()?;
1011        reader
1012            .limits()
1013            .check_response_bytes(image_length as usize)?;
1014        let _flags = reader.read_ub2()?;
1015        if image_length == 0 {
1016            return Ok(None);
1017        }
1018        let image = reader
1019            .read_bytes()?
1020            .ok_or(ProtocolError::TtcDecode("AQ object payload missing"))?;
1021        return Ok(Some(AqDeqPayload::Object(image)));
1022    }
1023    // RAW / JSON branch.
1024    let _toid = reader.read_bytes_with_length()?;
1025    let _oid = reader.read_bytes_with_length()?;
1026    let _snapshot = reader.read_bytes_with_length()?;
1027    let _version = reader.read_ub2()?;
1028    let image_length = reader.read_ub4()? as usize;
1029    reader.limits().check_response_bytes(image_length)?;
1030    let _flags = reader.read_ub2()?;
1031    if image_length > 0 {
1032        // reference: payload = read_bytes()[4:image_length]
1033        let raw = reader
1034            .read_bytes()?
1035            .ok_or(ProtocolError::TtcDecode("AQ payload missing"))?;
1036        if raw.len() < image_length {
1037            return Err(ProtocolError::TtcDecode("AQ payload shorter than declared"));
1038        }
1039        let end = image_length;
1040        let start = 4.min(end);
1041        let payload = raw.get(start..end).unwrap_or_default().to_vec();
1042        if matches!(kind, AqPayloadKind::Json) {
1043            let value = decode_oson_with_limits(&payload, reader.limits())?;
1044            return Ok(Some(AqDeqPayload::Json(value)));
1045        }
1046        return Ok(Some(AqDeqPayload::Raw(payload)));
1047    }
1048    if matches!(kind, AqPayloadKind::Raw) {
1049        return Ok(Some(AqDeqPayload::Raw(Vec::new())));
1050    }
1051    Ok(None)
1052}
1053
1054#[cfg(test)]
1055mod tests {
1056    use super::*;
1057
1058    // Field version negotiated against the 23ai container (lane-1523). >= EXT_1
1059    // (18) so token_num is emitted; >= 21_1 (16) so shard id is emitted.
1060    const FV: u8 = 24;
1061
1062    fn caps() -> ClientCapabilities {
1063        ClientCapabilities {
1064            ttc_field_version: FV,
1065            max_string_size: 32767,
1066            charset_id: 873,
1067        }
1068    }
1069
1070    // Golden RAW enqueue request (FUNC 121), captured from python-oracledb 4.0.1
1071    // against lane-1523: msgproperties(payload=b"sample raw data 1",
1072    // correlation="CORR1", priority=2); enqone. TTC payload (offset 10 past the
1073    // packet header), seq_num=4. See tests/golden/aq_raw.txt.
1074    const GOLDEN_RAW_ENQ: &[u8] = &[
1075        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x02, 0x00, 0x81, 0x01, 0x01, 0x05, 0x05,
1076        0x43, 0x4f, 0x52, 0x52, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00,
1077        0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01, 0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00,
1078        0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02,
1079        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x00, 0x01, 0x01, 0x11, 0x01, 0x01, 0x10,
1080        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1081        0x00, 0x00, 0x00, 0x00, 0x0e, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51,
1082        0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1083        0x00, 0x00, 0x00, 0x00, 0x17, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x20, 0x72, 0x61, 0x77,
1084        0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x31,
1085    ];
1086
1087    // Golden RAW dequeue request (FUNC 122): deqoptions wait=NO_WAIT,
1088    // navigation=DEQ_FIRST_MSG; deqone. seq_num=6.
1089    const GOLDEN_RAW_DEQ: &[u8] = &[
1090        0x03, 0x7a, 0x06, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x01, 0x03,
1091        0x01, 0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x01,
1092        0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x0e,
1093        0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00,
1094        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17,
1095    ];
1096
1097    #[test]
1098    fn raw_enqueue_request_matches_golden() {
1099        let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
1100        let props = AqMsgProps {
1101            priority: 2,
1102            correlation: Some("CORR1".to_string()),
1103            payload: Some(AqPayloadValue::Raw(b"sample raw data 1".to_vec())),
1104            ..AqMsgProps::default()
1105        };
1106        let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, false)
1107            .expect("build enqueue");
1108        assert_eq!(bytes, GOLDEN_RAW_ENQ);
1109    }
1110
1111    #[test]
1112    fn raw_dequeue_request_matches_golden() {
1113        let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
1114        let deq = AqDeqOptions {
1115            wait: 0,
1116            navigation: 1,
1117            ..AqDeqOptions::default()
1118        };
1119        let bytes = build_aq_deq_payload(&queue, &deq, 6, FV).expect("build dequeue");
1120        assert_eq!(bytes, GOLDEN_RAW_DEQ);
1121    }
1122
1123    #[test]
1124    fn empty_queue_dequeue_yields_no_message() {
1125        // ORA-25228 is cleared and surfaces as no message. We can't synthesize a
1126        // full error packet trivially here, so just confirm an empty response
1127        // (status-only) yields None without error.
1128        let caps = caps();
1129        let res = parse_aq_deq_response(&[], caps, &AqPayloadKind::Raw).expect("parse");
1130        assert!(res.message.is_none());
1131    }
1132
1133    fn deq_response_with_raw_image(image_length: u32, raw_image: &[u8]) -> Vec<u8> {
1134        let mut writer = TtcWriter::new();
1135        writer.write_u8(TNS_MSG_TYPE_PARAMETER);
1136        writer.write_ub4(1);
1137        write_msg_props(&mut writer, &AqMsgProps::default(), FV).expect("write message props");
1138        writer.write_ub4(0); // recipients
1139        writer.write_ub4(0); // TOID
1140        writer.write_ub4(0); // OID
1141        writer.write_ub4(0); // snapshot
1142        writer.write_ub2(0); // version
1143        writer.write_ub4(image_length);
1144        writer.write_ub2(0); // flags
1145        writer
1146            .write_bytes_with_length(raw_image)
1147            .expect("write raw image field");
1148        writer.write_raw(&[0u8; TNS_AQ_MESSAGE_ID_LENGTH]);
1149        writer.write_u8(TNS_MSG_TYPE_END_OF_RESPONSE);
1150        writer.into_bytes()
1151    }
1152
1153    #[test]
1154    fn raw_dequeue_rejects_declared_image_length_shortfall() {
1155        let response = deq_response_with_raw_image(8, &[0, 0, 0, 0, b'A', b'B']);
1156        let err = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Raw)
1157            .expect_err("short RAW image must fail");
1158        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
1159    }
1160
1161    #[test]
1162    fn json_dequeue_rejects_declared_image_length_shortfall() {
1163        let response = deq_response_with_raw_image(8, &[0, 0, 0, 0, b'A', b'B']);
1164        let err = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Json)
1165            .expect_err("short JSON image must fail");
1166        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
1167    }
1168
1169    #[test]
1170    fn raw_dequeue_accepts_exact_declared_image_length() {
1171        let response = deq_response_with_raw_image(6, &[0, 0, 0, 0, b'A', b'B']);
1172        let res = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Raw)
1173            .expect("exact RAW image should parse");
1174        let message = res.message.expect("message present");
1175        match message.payload {
1176            Some(AqDeqPayload::Raw(payload)) => assert_eq!(payload, vec![b'A', b'B']),
1177            other => panic!("unexpected payload {other:?}"),
1178        }
1179    }
1180
1181    #[test]
1182    fn aq_array_response_respects_protocol_batch_limit() {
1183        let limits = ProtocolLimits {
1184            max_batch_rows: 1,
1185            ..ProtocolLimits::DEFAULT
1186        };
1187        let err = parse_aq_array_response_with_limits(
1188            &[],
1189            caps(),
1190            TNS_AQ_ARRAY_ENQ,
1191            2,
1192            &AqPayloadKind::Raw,
1193            limits,
1194        )
1195        .expect_err("client-side AQ batch count above policy must fail");
1196        assert!(
1197            matches!(
1198                err,
1199                ProtocolError::ResourceLimit {
1200                    limit: "batch_rows",
1201                    observed: 2,
1202                    maximum: 1,
1203                }
1204            ),
1205            "got {err:?}"
1206        );
1207    }
1208
1209    // Golden JSON enqueue (FUNC 121): msgproperties(payload=dict(name="John",
1210    // age=30, city="NY")); enqone against TEST_JSON_QUEUE, seq_num=4. The OSON
1211    // image (after `ff 4a 5a 01`) also exercises encode_oson byte-parity.
1212    const GOLDEN_JSON_ENQ: &[u8] = &[
1213        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0f, 0x00, 0x00, 0x81, 0x01, 0x00, 0x00, 0x00, 0x00,
1214        0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01,
1215        0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00, 0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff,
1216        0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01,
1217        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1218        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0f, 0x54, 0x45, 0x53, 0x54,
1219        0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00,
1220        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x47, 0x01, 0x28, 0x00,
1221        0x26, 0x00, 0x04, 0x61, 0x08, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1222        0x00, 0x43, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1223        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x43, 0xff, 0x4a, 0x5a, 0x01, 0x21,
1224        0x02, 0x03, 0x00, 0x0e, 0x00, 0x1f, 0x00, 0x00, 0x42, 0x9c, 0xe6, 0x00, 0x09, 0x00, 0x05,
1225        0x00, 0x00, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x03, 0x61, 0x67, 0x65, 0x04, 0x63, 0x69, 0x74,
1226        0x79, 0xa4, 0x03, 0x03, 0x02, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x17, 0x00,
1227        0x00, 0x00, 0x1b, 0x33, 0x04, 0x4a, 0x6f, 0x68, 0x6e, 0x34, 0x02, 0xc1, 0x1f, 0x33, 0x02,
1228        0x4e, 0x59,
1229    ];
1230
1231    #[test]
1232    fn json_enqueue_request_matches_golden() {
1233        let queue = AqQueueDesc::new("TEST_JSON_QUEUE".to_string(), AqPayloadKind::Json, None);
1234        // Insertion-ordered object {name, age, city}.
1235        let value = OsonValue::Object(vec![
1236            ("name".to_string(), OsonValue::String("John".to_string())),
1237            ("age".to_string(), OsonValue::Number("30".to_string())),
1238            ("city".to_string(), OsonValue::String("NY".to_string())),
1239        ]);
1240        let props = AqMsgProps {
1241            payload: Some(AqPayloadValue::Json(value)),
1242            ..AqMsgProps::default()
1243        };
1244        // Container negotiates >= 23ai so supports_oson_long_fnames = true.
1245        let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, true)
1246            .expect("build json enqueue");
1247        assert_eq!(bytes, GOLDEN_JSON_ENQ);
1248    }
1249}