Skip to main content

oracledb_protocol/thin/
aq.rs

1#![forbid(unsafe_code)]
2
3//! Sans-io codecs for Oracle Advanced Queuing (AQ) enqueue/dequeue operations.
4//!
5//! Mirrors the reference thin driver's `impl/thin/messages/aq_*.pyx`:
6//! - [`build_aq_enq_payload`] / [`parse_aq_enq_response`]  — FUNC 121 (single enqueue)
7//! - [`build_aq_deq_payload`] / [`parse_aq_deq_response`]  — FUNC 122 (single dequeue)
8//! - [`build_aq_array_enq_payload`] / [`build_aq_array_deq_payload`]
9//!   / [`parse_aq_array_response`]                          — FUNC 145 (bulk enqueue/dequeue)
10//!
11//! The message-property / payload codecs are shared between all three so the
12//! wire encoding is byte-identical to python-oracledb (golden traces under
13//! `tests/golden/aq_*.txt`). Object payloads reuse [`super::dbobject`] and JSON
14//! payloads reuse [`crate::oson`].
15
16use super::*;
17use crate::oson::{decode_oson, encode_oson, OsonValue};
18
19/// Payload classification for a queue. Determines the TOID sentinel and the
20/// payload-encoding branch taken during enqueue/dequeue.
21#[derive(Clone, Debug, PartialEq, Eq)]
22pub enum AqPayloadKind {
23    /// RAW / bytes payload. TOID sentinel `bytes([0]*15 + [0x17])`.
24    Raw,
25    /// JSON payload (OSON). TOID sentinel `bytes([0]*15 + [0x47])`.
26    Json,
27    /// Object payload of a named type. TOID is the type's OID.
28    Object,
29}
30
31/// Static description of an AQ queue, used by both enqueue and dequeue.
32#[derive(Clone, Debug)]
33pub struct AqQueueDesc {
34    pub name: String,
35    pub kind: AqPayloadKind,
36    /// TOID of the payload: 16-byte sentinel for RAW/JSON, the type OID for
37    /// object queues.
38    pub payload_toid: Vec<u8>,
39}
40
41impl AqQueueDesc {
42    /// Builds the queue descriptor, deriving the payload TOID from the kind.
43    /// For object queues `object_oid` must carry the type's OID.
44    pub fn new(name: String, kind: AqPayloadKind, object_oid: Option<Vec<u8>>) -> Self {
45        let payload_toid = match kind {
46            AqPayloadKind::Raw => raw_payload_toid(),
47            AqPayloadKind::Json => json_payload_toid(),
48            AqPayloadKind::Object => object_oid.unwrap_or_default(),
49        };
50        Self {
51            name,
52            kind,
53            payload_toid,
54        }
55    }
56}
57
58fn raw_payload_toid() -> Vec<u8> {
59    let mut toid = vec![0u8; 15];
60    toid.push(0x17);
61    toid
62}
63
64fn json_payload_toid() -> Vec<u8> {
65    let mut toid = vec![0u8; 15];
66    toid.push(0x47);
67    toid
68}
69
70/// The payload value carried by a message being enqueued.
71#[derive(Clone, Debug)]
72pub enum AqPayloadValue {
73    /// RAW bytes (already UTF-8 encoded if originally a string).
74    Raw(Vec<u8>),
75    /// JSON value encoded as OSON.
76    Json(OsonValue),
77    /// Pre-packed object image (the body produced by `DbObjectImpl::pack_image`).
78    Object { oid: Vec<u8>, image: Vec<u8> },
79}
80
81/// Mutable message properties (reference `ThinMsgPropsImpl`). Defaults match
82/// the reference: `delay=0`, `expiration=-1`, `priority=0`, `state=0`.
83#[derive(Clone, Debug)]
84pub struct AqMsgProps {
85    pub priority: i32,
86    pub delay: i32,
87    pub expiration: i32,
88    pub correlation: Option<String>,
89    pub exception_queue: Option<String>,
90    pub state: i32,
91    pub enq_txn_id: Option<Vec<u8>>,
92    /// Recipient names (multi-consumer enqueue). `None` => no recipient list.
93    pub recipients: Option<Vec<String>>,
94    /// Payload to enqueue. Required for enqueue; ignored for dequeue requests
95    /// (where defaults are written).
96    pub payload: Option<AqPayloadValue>,
97}
98
99impl Default for AqMsgProps {
100    fn default() -> Self {
101        Self {
102            priority: 0,
103            delay: 0,
104            expiration: -1,
105            correlation: None,
106            exception_queue: None,
107            state: 0,
108            enq_txn_id: None,
109            // Reference `ThinMsgPropsImpl.__init__` defaults recipients to an
110            // empty list (not None): an empty list still writes pointer=1 with
111            // a zero count, whereas None writes pointer=0.
112            recipients: Some(Vec::new()),
113            payload: None,
114        }
115    }
116}
117
118/// Enqueue options (reference `ThinEnqOptionsImpl`). `visibility` defaults to
119/// `ENQ_ON_COMMIT (2)`, `delivery_mode` to `PERSISTENT (1)`.
120#[derive(Clone, Debug)]
121pub struct AqEnqOptions {
122    pub visibility: u32,
123    pub delivery_mode: u16,
124}
125
126impl Default for AqEnqOptions {
127    fn default() -> Self {
128        Self {
129            visibility: 2,
130            delivery_mode: TNS_AQ_MSG_PERSISTENT,
131        }
132    }
133}
134
135/// Dequeue options (reference `ThinDeqOptionsImpl`). Defaults: `mode=REMOVE(3)`,
136/// `navigation=NEXT_MSG(3)`, `visibility=ON_COMMIT(2)`, `wait=WAIT_FOREVER`,
137/// `delivery_mode=PERSISTENT(1)`.
138#[derive(Clone, Debug)]
139pub struct AqDeqOptions {
140    pub condition: Option<String>,
141    pub consumer_name: Option<String>,
142    pub correlation: Option<String>,
143    pub delivery_mode: u16,
144    pub mode: i32,
145    pub msgid: Option<Vec<u8>>,
146    pub navigation: i32,
147    pub visibility: i32,
148    pub wait: u32,
149}
150
151impl Default for AqDeqOptions {
152    fn default() -> Self {
153        Self {
154            condition: None,
155            consumer_name: None,
156            correlation: None,
157            delivery_mode: TNS_AQ_MSG_PERSISTENT,
158            mode: 3,
159            msgid: None,
160            navigation: 3,
161            visibility: 2,
162            wait: 0xFFFF_FFFF,
163        }
164    }
165}
166
167/// A message returned by dequeue (reference fields read by `_process_msg_props`
168/// / `_process_payload`).
169#[derive(Clone, Debug, Default)]
170pub struct AqDeqMessage {
171    pub priority: i32,
172    pub delay: i32,
173    pub expiration: i32,
174    pub correlation: Option<String>,
175    pub num_attempts: i32,
176    pub exception_queue: Option<String>,
177    pub state: i32,
178    /// Oracle enqueue time decoded to a naive datetime, or `None`.
179    pub enq_time: Option<QueryValue>,
180    pub delivery_mode: u16,
181    pub msgid: Option<Vec<u8>>,
182    /// Decoded payload. `None` for an empty-payload message.
183    pub payload: Option<AqDeqPayload>,
184}
185
186/// A decoded dequeue payload.
187#[derive(Clone, Debug)]
188pub enum AqDeqPayload {
189    /// RAW bytes (may be empty for `DEQ_REMOVE_NODATA`).
190    Raw(Vec<u8>),
191    /// JSON decoded from OSON.
192    Json(OsonValue),
193    /// Object payload: the raw packed image (unpacked by the shim against the
194    /// queue's payload type).
195    Object(Vec<u8>),
196}
197
198// ---------------------------------------------------------------------------
199// Shared message-property and payload codecs (reference aq_base.pyx).
200// ---------------------------------------------------------------------------
201
202/// Writes the TTC function-code preamble: message-type/function/seq plus the
203/// `token_num` ub8 the server expects when the negotiated field version is at
204/// least `23.1 EXT 1` (reference `Message._write_function_code`).
205fn write_aq_function_code(
206    writer: &mut TtcWriter,
207    function_code: u8,
208    seq_num: u8,
209    ttc_field_version: u8,
210) {
211    writer.write_function_code_with_seq(function_code, seq_num);
212    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
213        writer.write_ub8(0); // token_num
214    }
215}
216
217fn write_value_with_length(writer: &mut TtcWriter, value: Option<&[u8]>) -> Result<()> {
218    match value {
219        None => {
220            writer.write_ub4(0);
221            Ok(())
222        }
223        Some(bytes) => writer.write_bytes_with_two_lengths(Some(bytes)),
224    }
225}
226
227/// Writes the AQ message-property block (`_write_msg_props`).
228fn write_msg_props(
229    writer: &mut TtcWriter,
230    props: &AqMsgProps,
231    ttc_field_version: u8,
232) -> Result<()> {
233    writer.write_ub4(props.priority as u32);
234    writer.write_ub4(props.delay as u32);
235    writer.write_sb4(props.expiration);
236    write_value_with_length(writer, props.correlation.as_deref().map(str::as_bytes))?;
237    writer.write_ub4(0); // number of attempts
238    write_value_with_length(writer, props.exception_queue.as_deref().map(str::as_bytes))?;
239    writer.write_ub4(props.state as u32);
240    writer.write_ub4(0); // enqueue time length
241    write_value_with_length(writer, props.enq_txn_id.as_deref())?;
242    writer.write_ub4(4); // number of extensions
243    writer.write_u8(0x0e); // unknown extra byte
244    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_NAME)?;
245    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS)?;
246    writer.write_keyword_value_pair(None, Some(b"\x00"), TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL)?;
247    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID)?;
248    writer.write_ub4(0); // user property
249    writer.write_ub4(0); // cscn
250    writer.write_ub4(0); // dscn
251    writer.write_ub4(0); // flags
252    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
253        writer.write_ub4(0xFFFF_FFFF); // shard id
254    }
255    Ok(())
256}
257
258/// Writes the recipient-list key/value pairs (`_write_recipients`).
259fn write_recipients(writer: &mut TtcWriter, recipients: &[String]) -> Result<()> {
260    let mut index: u16 = 0;
261    for recipient in recipients {
262        writer.write_keyword_value_pair(Some(recipient.as_bytes()), None, index)?;
263        writer.write_keyword_value_pair(None, None, index + 1)?;
264        writer.write_keyword_value_pair(None, Some(b"\x00"), index + 2)?;
265        index += 3;
266    }
267    Ok(())
268}
269
270/// Writes the message payload (`_write_payload`).
271fn write_payload(
272    writer: &mut TtcWriter,
273    payload: &AqPayloadValue,
274    supports_oson_long_fnames: bool,
275) -> Result<()> {
276    match payload {
277        AqPayloadValue::Json(value) => {
278            // write_oson(..., write_length=False): a QLocator (no chunk-length
279            // prefix) followed by the OSON image as a length-prefixed chunk.
280            let image = encode_oson(value, supports_oson_long_fnames)?;
281            crate::vector::write_oson_aq_payload(writer, &image)
282        }
283        AqPayloadValue::Object { oid, image } => write_dbobject_bind(writer, oid, image),
284        AqPayloadValue::Raw(bytes) => {
285            writer.write_raw(bytes);
286            Ok(())
287        }
288    }
289}
290
291// ---------------------------------------------------------------------------
292// FUNC 121 — single enqueue
293// ---------------------------------------------------------------------------
294
295/// Builds the AQ enqueue (FUNC 121) request payload (reference
296/// `AqEnqMessage._write_message`).
297pub fn build_aq_enq_payload(
298    queue: &AqQueueDesc,
299    props: &AqMsgProps,
300    enq_options: &AqEnqOptions,
301    seq_num: u8,
302    ttc_field_version: u8,
303    supports_oson_long_fnames: bool,
304) -> Result<Vec<u8>> {
305    let payload = props
306        .payload
307        .as_ref()
308        .ok_or(ProtocolError::TtcDecode("AQ enqueue has no payload"))?;
309    let queue_name = queue.name.as_bytes();
310    let mut writer = TtcWriter::new();
311    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ENQ, seq_num, ttc_field_version);
312    writer.write_u8(1); // queue name (pointer)
313    writer.write_ub4(queue_name.len() as u32);
314    write_msg_props(&mut writer, props, ttc_field_version)?;
315    match props.recipients.as_ref() {
316        None => {
317            writer.write_u8(0); // recipients (pointer)
318            writer.write_ub4(0); // number of key/value pairs
319        }
320        Some(recipients) => {
321            writer.write_u8(1);
322            writer.write_ub4(3 * recipients.len() as u32);
323        }
324    }
325    writer.write_ub4(enq_options.visibility);
326    writer.write_u8(0); // relative message id (pointer)
327    writer.write_ub4(0); // relative message length
328    writer.write_ub4(0); // sequence deviation
329    writer.write_u8(1); // TOID of payload (pointer)
330    writer.write_ub4(16); // TOID of payload length
331    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
332    match queue.kind {
333        AqPayloadKind::Json => {
334            writer.write_u8(0); // payload (pointer)
335            writer.write_u8(0); // RAW payload (pointer)
336            writer.write_ub4(0); // RAW payload length
337        }
338        AqPayloadKind::Object => {
339            writer.write_u8(1); // payload (pointer)
340            writer.write_u8(0); // RAW payload (pointer)
341            writer.write_ub4(0); // RAW payload length
342        }
343        AqPayloadKind::Raw => {
344            let raw_len = match payload {
345                AqPayloadValue::Raw(bytes) => bytes.len() as u32,
346                _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
347            };
348            writer.write_u8(0); // payload (pointer)
349            writer.write_u8(1); // RAW payload (pointer)
350            writer.write_ub4(raw_len);
351        }
352    }
353    writer.write_u8(1); // return message id (pointer)
354    writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
355    let mut enq_flags = 0u32;
356    if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
357        enq_flags |= TNS_KPD_AQ_BUFMSG;
358    }
359    writer.write_ub4(enq_flags); // enqueue flags
360    writer.write_u8(0); // extensions 1 (pointer)
361    writer.write_ub4(0); // number of extensions 1
362    writer.write_u8(0); // extensions 2 (pointer)
363    writer.write_ub4(0); // number of extensions 2
364    writer.write_u8(0); // source sequence number
365    writer.write_ub4(0); // source sequence length
366    writer.write_u8(0); // max sequence number
367    writer.write_ub4(0); // max sequence length
368    writer.write_u8(0); // output ack length
369    writer.write_u8(0); // correlation (pointer)
370    writer.write_ub4(0); // correlation length
371    writer.write_u8(0); // sender name (pointer)
372    writer.write_ub4(0); // sender name length
373    writer.write_u8(0); // sender address (pointer)
374    writer.write_ub4(0); // sender address length
375    writer.write_u8(0); // sender charset id (pointer)
376    writer.write_u8(0); // sender ncharset id (pointer)
377    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
378        // JSON payload (pointer)
379        writer.write_u8(u8::from(queue.kind == AqPayloadKind::Json));
380    }
381
382    writer.write_bytes_with_length(queue_name)?;
383    if let Some(recipients) = props.recipients.as_ref() {
384        write_recipients(&mut writer, recipients)?;
385    }
386    writer.write_raw(&queue.payload_toid);
387    write_payload(&mut writer, payload, supports_oson_long_fnames)?;
388    Ok(writer.into_bytes())
389}
390
391/// Parses an AQ enqueue (FUNC 121) response, returning the assigned 16-byte
392/// message id (reference `AqEnqMessage._process_return_parameters`).
393pub fn parse_aq_enq_response(
394    payload: &[u8],
395    capabilities: ClientCapabilities,
396) -> Result<Option<Vec<u8>>> {
397    let mut reader = TtcReader::new(payload);
398    let mut msgid: Option<Vec<u8>> = None;
399    while reader.remaining() > 0 {
400        let message_type = reader.read_u8()?;
401        match message_type {
402            0 => {}
403            TNS_MSG_TYPE_PARAMETER => {
404                let id = reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec();
405                let _ext_len = reader.read_ub2()?;
406                msgid = Some(id);
407            }
408            TNS_MSG_TYPE_STATUS => {
409                let _call_status = reader.read_ub4()?;
410                let _seq = reader.read_ub2()?;
411            }
412            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
413                let _ = skip_server_side_piggyback(&mut reader)?;
414            }
415            TNS_MSG_TYPE_END_OF_RESPONSE => break,
416            TNS_MSG_TYPE_ERROR => {
417                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
418                if info.number != 0 {
419                    return Err(ProtocolError::ServerErrorInfo(Box::new(
420                        info.into_details(),
421                    )));
422                }
423            }
424            _ => {
425                return Err(ProtocolError::UnknownMessageType {
426                    message_type,
427                    position: reader.position().saturating_sub(1),
428                })
429            }
430        }
431    }
432    Ok(msgid)
433}
434
435// ---------------------------------------------------------------------------
436// FUNC 122 — single dequeue
437// ---------------------------------------------------------------------------
438
439/// Builds the AQ dequeue (FUNC 122) request payload (reference
440/// `AqDeqMessage._write_message`).
441pub fn build_aq_deq_payload(
442    queue: &AqQueueDesc,
443    deq_options: &AqDeqOptions,
444    seq_num: u8,
445    ttc_field_version: u8,
446) -> Result<Vec<u8>> {
447    let queue_name = queue.name.as_bytes();
448    let mut writer = TtcWriter::new();
449    write_aq_function_code(&mut writer, TNS_FUNC_AQ_DEQ, seq_num, ttc_field_version);
450    writer.write_u8(1); // queue name (pointer)
451    writer.write_ub4(queue_name.len() as u32);
452    writer.write_u8(1); // message properties
453    writer.write_u8(1); // msg props length
454    writer.write_u8(1); // recipient list
455    writer.write_u8(1); // recipient list length
456    let consumer_name = deq_options
457        .consumer_name
458        .as_ref()
459        .filter(|name| !name.is_empty());
460    match consumer_name {
461        Some(name) => {
462            writer.write_u8(1);
463            writer.write_ub4(name.len() as u32);
464        }
465        None => {
466            writer.write_u8(0);
467            writer.write_ub4(0);
468        }
469    }
470    writer.write_sb4(deq_options.mode);
471    writer.write_sb4(deq_options.navigation);
472    writer.write_sb4(deq_options.visibility);
473    writer.write_sb4(deq_options.wait as i32);
474    let msgid = deq_options.msgid.as_ref().filter(|id| !id.is_empty());
475    match msgid {
476        Some(_) => {
477            writer.write_u8(1);
478            writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
479        }
480        None => {
481            writer.write_u8(0);
482            writer.write_ub4(0);
483        }
484    }
485    let correlation = deq_options.correlation.as_ref().filter(|c| !c.is_empty());
486    match correlation {
487        Some(c) => {
488            writer.write_u8(1);
489            writer.write_ub4(c.len() as u32);
490        }
491        None => {
492            writer.write_u8(0);
493            writer.write_ub4(0);
494        }
495    }
496    writer.write_u8(1); // toid of payload
497    writer.write_ub4(16); // toid length
498    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
499    writer.write_u8(1); // payload
500    writer.write_u8(1); // return msg id
501    writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
502    let mut deq_flags = 0u32;
503    match deq_options.delivery_mode {
504        TNS_AQ_MSG_BUFFERED => deq_flags |= TNS_KPD_AQ_BUFMSG,
505        TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => deq_flags |= TNS_KPD_AQ_EITHER,
506        _ => {}
507    }
508    writer.write_ub4(deq_flags);
509    let condition = deq_options.condition.as_ref().filter(|c| !c.is_empty());
510    match condition {
511        Some(c) => {
512            writer.write_u8(1);
513            writer.write_ub4(c.len() as u32);
514        }
515        None => {
516            writer.write_u8(0);
517            writer.write_ub4(0);
518        }
519    }
520    writer.write_u8(0); // extensions
521    writer.write_ub4(0); // number of extensions
522    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
523        writer.write_u8(0); // JSON payload
524    }
525    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
526        writer.write_ub4(0xFFFF_FFFF); // shard id (-1)
527    }
528
529    writer.write_bytes_with_length(queue_name)?;
530    if let Some(name) = consumer_name {
531        writer.write_bytes_with_length(name.as_bytes())?;
532    }
533    if let Some(id) = msgid {
534        let mut id = id.clone();
535        id.truncate(16);
536        if id.len() < 16 {
537            id.resize(16, 0);
538        }
539        writer.write_raw(&id);
540    }
541    if let Some(c) = correlation {
542        writer.write_bytes_with_length(c.as_bytes())?;
543    }
544    writer.write_raw(&queue.payload_toid);
545    if let Some(c) = condition {
546        writer.write_bytes_with_length(c.as_bytes())?;
547    }
548    Ok(writer.into_bytes())
549}
550
551/// Outcome of a single dequeue.
552#[derive(Clone, Debug, Default)]
553pub struct AqDeqResult {
554    /// The dequeued message, or `None` when the queue was empty (ORA-25228).
555    pub message: Option<AqDeqMessage>,
556}
557
558/// Parses an AQ dequeue (FUNC 122) response (reference
559/// `AqDeqMessage._process_return_parameters`).
560pub fn parse_aq_deq_response(
561    payload: &[u8],
562    capabilities: ClientCapabilities,
563    kind: &AqPayloadKind,
564) -> Result<AqDeqResult> {
565    let mut reader = TtcReader::new(payload);
566    let mut result = AqDeqResult::default();
567    let mut no_msg_found = false;
568    while reader.remaining() > 0 {
569        let message_type = reader.read_u8()?;
570        match message_type {
571            0 => {}
572            TNS_MSG_TYPE_PARAMETER => {
573                let num_bytes = reader.read_ub4()?;
574                if num_bytes > 0 {
575                    let mut message = AqDeqMessage::default();
576                    process_msg_props(&mut reader, &mut message, capabilities.ttc_field_version)?;
577                    process_recipients(&mut reader)?;
578                    message.payload = process_payload(&mut reader, kind)?;
579                    message.msgid = Some(process_msg_id(&mut reader)?);
580                    result.message = Some(message);
581                }
582            }
583            TNS_MSG_TYPE_STATUS => {
584                let _call_status = reader.read_ub4()?;
585                let _seq = reader.read_ub2()?;
586            }
587            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
588                let _ = skip_server_side_piggyback(&mut reader)?;
589            }
590            TNS_MSG_TYPE_END_OF_RESPONSE => break,
591            TNS_MSG_TYPE_ERROR => {
592                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
593                if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
594                    no_msg_found = true;
595                } else if info.number != 0 {
596                    return Err(ProtocolError::ServerErrorInfo(Box::new(
597                        info.into_details(),
598                    )));
599                }
600            }
601            _ => {
602                return Err(ProtocolError::UnknownMessageType {
603                    message_type,
604                    position: reader.position().saturating_sub(1),
605                })
606            }
607        }
608    }
609    if no_msg_found {
610        result.message = None;
611    }
612    Ok(result)
613}
614
615// ---------------------------------------------------------------------------
616// FUNC 145 — bulk enqueue / dequeue
617// ---------------------------------------------------------------------------
618
619/// Builds the AQ array enqueue (FUNC 145, op=ENQ) request payload (reference
620/// `AqArrayMessage._write_message` + `_write_array_enq`).
621pub fn build_aq_array_enq_payload(
622    queue: &AqQueueDesc,
623    props_list: &[AqMsgProps],
624    enq_options: &AqEnqOptions,
625    seq_num: u8,
626    ttc_field_version: u8,
627    supports_oson_long_fnames: bool,
628) -> Result<Vec<u8>> {
629    let num_iters = props_list.len() as u32;
630    let queue_name = queue.name.as_bytes();
631    let mut writer = TtcWriter::new();
632    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
633    writer.write_u8(0); // input params (pointer)
634    writer.write_ub4(0); // length
635    writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
636    writer.write_u8(1); // output params (pointer)
637    writer.write_u8(0); // length
638    writer.write_sb4(TNS_AQ_ARRAY_ENQ);
639    writer.write_u8(1); // num iters (pointer)
640    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
641        writer.write_ub4(0xFFFF); // shard id
642    }
643    writer.write_ub4(num_iters);
644
645    let mut flags = 0u32;
646    if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
647        flags |= TNS_KPD_AQ_BUFMSG;
648    }
649    writer.write_ub4(0); // rel msgid len
650    writer.write_u8(TNS_MSG_TYPE_ROW_HEADER);
651    writer.write_bytes_with_two_lengths(Some(queue_name))?;
652    writer.write_raw(&queue.payload_toid);
653    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
654    writer.write_ub4(flags);
655    for props in props_list {
656        let payload = props
657            .payload
658            .as_ref()
659            .ok_or(ProtocolError::TtcDecode("AQ array enqueue has no payload"))?;
660        writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
661        writer.write_ub4(flags); // aqi flags
662        write_msg_props(&mut writer, props, ttc_field_version)?;
663        match props.recipients.as_ref() {
664            None => writer.write_ub4(0),
665            Some(recipients) => {
666                writer.write_ub4(3 * recipients.len() as u32);
667                write_recipients(&mut writer, recipients)?;
668            }
669        }
670        writer.write_sb4(enq_options.visibility as i32);
671        writer.write_ub4(0); // relative msg id
672        writer.write_sb4(0); // seq deviation
673        if matches!(queue.kind, AqPayloadKind::Raw) {
674            let raw_len = match payload {
675                AqPayloadValue::Raw(bytes) => bytes.len() as u32,
676                _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
677            };
678            writer.write_ub4(raw_len);
679        }
680        write_payload(&mut writer, payload, supports_oson_long_fnames)?;
681    }
682    writer.write_u8(TNS_MSG_TYPE_STATUS);
683    Ok(writer.into_bytes())
684}
685
686/// Builds the AQ array dequeue (FUNC 145, op=DEQ) request payload (reference
687/// `AqArrayMessage._write_message` + `_write_array_deq`).
688pub fn build_aq_array_deq_payload(
689    queue: &AqQueueDesc,
690    deq_options: &AqDeqOptions,
691    num_iters: u32,
692    seq_num: u8,
693    ttc_field_version: u8,
694) -> Result<Vec<u8>> {
695    let queue_name = queue.name.as_bytes();
696    let mut writer = TtcWriter::new();
697    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
698    writer.write_u8(1); // input params (pointer)
699    writer.write_ub4(num_iters);
700    writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
701    writer.write_u8(1); // output params (pointer)
702    writer.write_u8(1); // length
703    writer.write_sb4(TNS_AQ_ARRAY_DEQ);
704    writer.write_u8(0); // num iters (pointer)
705    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
706        writer.write_ub4(0xFFFF); // shard id
707    }
708
709    let mut flags = 0u32;
710    match deq_options.delivery_mode {
711        TNS_AQ_MSG_BUFFERED => flags |= TNS_KPD_AQ_BUFMSG,
712        TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => flags |= TNS_KPD_AQ_EITHER,
713        _ => {}
714    }
715    let consumer_name = deq_options
716        .consumer_name
717        .as_ref()
718        .filter(|name| !name.is_empty())
719        .map(|name| name.as_bytes());
720    let correlation = deq_options
721        .correlation
722        .as_ref()
723        .filter(|c| !c.is_empty())
724        .map(|c| c.as_bytes());
725    let condition = deq_options
726        .condition
727        .as_ref()
728        .filter(|c| !c.is_empty())
729        .map(|c| c.as_bytes());
730    let props = AqMsgProps::default();
731    for _ in 0..num_iters {
732        writer.write_bytes_with_two_lengths(Some(queue_name))?;
733        write_msg_props(&mut writer, &props, ttc_field_version)?;
734        writer.write_ub4(0); // num recipients
735        write_value_with_length(&mut writer, consumer_name)?;
736        writer.write_sb4(deq_options.mode);
737        writer.write_sb4(deq_options.navigation);
738        writer.write_sb4(deq_options.visibility);
739        writer.write_sb4(deq_options.wait as i32);
740        write_value_with_length(&mut writer, deq_options.msgid.as_deref())?;
741        write_value_with_length(&mut writer, correlation)?;
742        write_value_with_length(&mut writer, condition)?;
743        writer.write_ub4(0); // extensions
744        writer.write_ub4(0); // rel msg id
745        writer.write_sb4(0); // seq deviation
746        writer.write_bytes_with_two_lengths(Some(&queue.payload_toid))?;
747        writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
748        writer.write_ub4(0); // payload length
749        writer.write_ub4(0); // raw pay length
750        writer.write_ub4(0);
751        writer.write_ub4(flags);
752        writer.write_ub4(0); // extensions len
753        writer.write_ub4(0); // source seq len
754    }
755    Ok(writer.into_bytes())
756}
757
758/// Result of a bulk operation: enqueue returns assigned msgids per message;
759/// dequeue returns the dequeued messages (already truncated to `num_iters`).
760#[derive(Clone, Debug, Default)]
761pub struct AqArrayResult {
762    /// For enqueue: assigned msgid per input message, in order.
763    pub enq_msgids: Vec<Vec<u8>>,
764    /// For dequeue: the dequeued messages.
765    pub deq_messages: Vec<AqDeqMessage>,
766}
767
768/// Parses an AQ array (FUNC 145) response for either operation (reference
769/// `AqArrayMessage._process_return_parameters`).
770///
771/// `props_count` is the number of message-property slots prepared client-side
772/// (`num_iters` for enqueue, `max_num_messages` for dequeue).
773pub fn parse_aq_array_response(
774    payload: &[u8],
775    capabilities: ClientCapabilities,
776    operation: i32,
777    props_count: u32,
778    kind: &AqPayloadKind,
779) -> Result<AqArrayResult> {
780    let mut reader = TtcReader::new(payload);
781    let mut result = AqArrayResult::default();
782    let mut messages: Vec<AqDeqMessage> = Vec::new();
783    let mut enq_msgid_blob: Option<Vec<u8>> = None;
784    let mut response_num_iters: u32 = 0;
785    let mut no_msg_found = false;
786    while reader.remaining() > 0 {
787        let message_type = reader.read_u8()?;
788        match message_type {
789            0 => {}
790            TNS_MSG_TYPE_PARAMETER => {
791                let num_iters = reader.read_ub4()?;
792                response_num_iters = num_iters;
793                for i in 0..num_iters {
794                    let mut message = AqDeqMessage::default();
795                    let props_len = reader.read_ub2()?;
796                    if props_len > 0 {
797                        reader.read_u8()?; // skip_ub1
798                        process_msg_props(
799                            &mut reader,
800                            &mut message,
801                            capabilities.ttc_field_version,
802                        )?;
803                    }
804                    process_recipients(&mut reader)?;
805                    let payload_len = reader.read_ub2()?;
806                    if payload_len > 0 {
807                        message.payload = process_payload(&mut reader, kind)?;
808                    }
809                    let msgid = reader.read_bytes_with_length()?.unwrap_or_default();
810                    if operation == TNS_AQ_ARRAY_ENQ {
811                        enq_msgid_blob = Some(msgid);
812                    } else {
813                        message.msgid = Some(msgid);
814                    }
815                    let ext_len = reader.read_ub2()?;
816                    if ext_len > 0 {
817                        return Err(ProtocolError::UnsupportedFeature("AQ array extensions"));
818                    }
819                    let _output_ack = reader.read_ub2()?;
820                    if operation != TNS_AQ_ARRAY_ENQ {
821                        let _ = i;
822                        messages.push(message);
823                    }
824                }
825                if operation == TNS_AQ_ARRAY_ENQ {
826                    response_num_iters = reader.read_ub4()?;
827                }
828            }
829            TNS_MSG_TYPE_STATUS => {
830                let _call_status = reader.read_ub4()?;
831                let _seq = reader.read_ub2()?;
832            }
833            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
834                let _ = skip_server_side_piggyback(&mut reader)?;
835            }
836            TNS_MSG_TYPE_END_OF_RESPONSE => break,
837            TNS_MSG_TYPE_ERROR => {
838                let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
839                if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
840                    no_msg_found = true;
841                } else if info.number != 0 {
842                    return Err(ProtocolError::ServerErrorInfo(Box::new(
843                        info.into_details(),
844                    )));
845                }
846            }
847            _ => {
848                return Err(ProtocolError::UnknownMessageType {
849                    message_type,
850                    position: reader.position().saturating_sub(1),
851                })
852            }
853        }
854    }
855    if operation == TNS_AQ_ARRAY_ENQ {
856        if let Some(blob) = enq_msgid_blob {
857            let count = props_count as usize;
858            result.enq_msgids = (0..count)
859                .map(|j| {
860                    let start = j * 16;
861                    let end = start + 16;
862                    blob.get(start..end).map(<[u8]>::to_vec).unwrap_or_default()
863                })
864                .collect();
865        }
866    } else if no_msg_found {
867        result.deq_messages = Vec::new();
868    } else {
869        let keep = response_num_iters as usize;
870        messages.truncate(keep);
871        result.deq_messages = messages;
872    }
873    Ok(result)
874}
875
876// ---------------------------------------------------------------------------
877// Shared response decoders (reference aq_base.pyx).
878// ---------------------------------------------------------------------------
879
880fn process_msg_props(
881    reader: &mut TtcReader<'_>,
882    message: &mut AqDeqMessage,
883    ttc_field_version: u8,
884) -> Result<()> {
885    message.priority = reader.read_sb4()?;
886    message.delay = reader.read_sb4()?;
887    message.expiration = reader.read_sb4()?;
888    message.correlation = reader.read_string_with_length()?;
889    message.num_attempts = reader.read_sb4()?;
890    message.exception_queue = reader.read_string_with_length()?;
891    message.state = reader.read_sb4()?;
892    message.enq_time = process_date(reader)?;
893    let _enq_txn_id = reader.read_bytes_with_length()?;
894    process_extensions(reader)?;
895    let user_props = reader.read_ub4()?;
896    if user_props > 0 {
897        return Err(ProtocolError::UnsupportedFeature("AQ user properties"));
898    }
899    let _csn = reader.read_ub4()?;
900    let _dsn = reader.read_ub4()?;
901    let flags = reader.read_ub4()?;
902    message.delivery_mode = if flags == TNS_KPD_AQ_BUFMSG {
903        TNS_AQ_MSG_BUFFERED
904    } else {
905        TNS_AQ_MSG_PERSISTENT
906    };
907    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
908        let _shard = reader.read_ub4()?;
909    }
910    Ok(())
911}
912
913/// Reads an Oracle date the way the reference `_process_date` does: a ub4
914/// presence flag, then `read_raw_bytes_and_length` (a single u8 length byte
915/// followed by that many raw date bytes).
916fn process_date(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
917    let num_bytes = reader.read_ub4()?;
918    if num_bytes == 0 {
919        return Ok(None);
920    }
921    let len = usize::from(reader.read_u8()?);
922    if len == 0 {
923        return Ok(None);
924    }
925    let bytes = reader.read_raw(len)?;
926    Ok(Some(decode_datetime_value(bytes)?))
927}
928
929fn process_extensions(reader: &mut TtcReader<'_>) -> Result<()> {
930    let num_extensions = reader.read_ub4()?;
931    if num_extensions > 0 {
932        reader.read_u8()?; // skip_ub1
933        for _ in 0..num_extensions {
934            let _text = reader.read_bytes_with_length()?;
935            let _binary = reader.read_bytes_with_length()?;
936            let _keyword = reader.read_ub2()?;
937        }
938    }
939    Ok(())
940}
941
942fn process_recipients(reader: &mut TtcReader<'_>) -> Result<()> {
943    let count = reader.read_ub4()?;
944    if count > 0 {
945        return Err(ProtocolError::UnsupportedFeature(
946            "AQ recipients on dequeue",
947        ));
948    }
949    Ok(())
950}
951
952fn process_msg_id(reader: &mut TtcReader<'_>) -> Result<Vec<u8>> {
953    Ok(reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec())
954}
955
956/// Decodes the payload of a dequeued message (reference `_process_payload`).
957fn process_payload(
958    reader: &mut TtcReader<'_>,
959    kind: &AqPayloadKind,
960) -> Result<Option<AqDeqPayload>> {
961    if matches!(kind, AqPayloadKind::Object) {
962        // Object branch (reference `read_dbobject`): TOID/OID/snapshot
963        // (length-prefixed each), version ub2, image-len ub4, flags ub2, then
964        // the packed image as a bare length-prefixed chunk (`read_bytes`).
965        let _toid = reader.read_bytes_with_length()?;
966        let _oid = reader.read_bytes_with_length()?;
967        let _snapshot = reader.read_bytes_with_length()?;
968        let _version = reader.read_ub2()?;
969        let image_length = reader.read_ub4()?;
970        let _flags = reader.read_ub2()?;
971        if image_length == 0 {
972            return Ok(None);
973        }
974        let image = reader
975            .read_bytes()?
976            .ok_or(ProtocolError::TtcDecode("AQ object payload missing"))?;
977        return Ok(Some(AqDeqPayload::Object(image)));
978    }
979    // RAW / JSON branch.
980    let _toid = reader.read_bytes_with_length()?;
981    let _oid = reader.read_bytes_with_length()?;
982    let _snapshot = reader.read_bytes_with_length()?;
983    let _version = reader.read_ub2()?;
984    let image_length = reader.read_ub4()? as usize;
985    let _flags = reader.read_ub2()?;
986    if image_length > 0 {
987        // reference: payload = read_bytes()[4:image_length]
988        let raw = reader
989            .read_bytes()?
990            .ok_or(ProtocolError::TtcDecode("AQ payload missing"))?;
991        let end = image_length.min(raw.len());
992        let start = 4.min(end);
993        let payload = raw.get(start..end).unwrap_or_default().to_vec();
994        if matches!(kind, AqPayloadKind::Json) {
995            let value = decode_oson(&payload)?;
996            return Ok(Some(AqDeqPayload::Json(value)));
997        }
998        return Ok(Some(AqDeqPayload::Raw(payload)));
999    }
1000    if matches!(kind, AqPayloadKind::Raw) {
1001        return Ok(Some(AqDeqPayload::Raw(Vec::new())));
1002    }
1003    Ok(None)
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008    use super::*;
1009
1010    // Field version negotiated against the 23ai container (lane-1523). >= EXT_1
1011    // (18) so token_num is emitted; >= 21_1 (16) so shard id is emitted.
1012    const FV: u8 = 24;
1013
1014    fn caps() -> ClientCapabilities {
1015        ClientCapabilities {
1016            ttc_field_version: FV,
1017            max_string_size: 32767,
1018            charset_id: 873,
1019        }
1020    }
1021
1022    // Golden RAW enqueue request (FUNC 121), captured from python-oracledb 4.0.1
1023    // against lane-1523: msgproperties(payload=b"sample raw data 1",
1024    // correlation="CORR1", priority=2); enqone. TTC payload (offset 10 past the
1025    // packet header), seq_num=4. See tests/golden/aq_raw.txt.
1026    const GOLDEN_RAW_ENQ: &[u8] = &[
1027        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x02, 0x00, 0x81, 0x01, 0x01, 0x05, 0x05,
1028        0x43, 0x4f, 0x52, 0x52, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00,
1029        0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01, 0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00,
1030        0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02,
1031        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x00, 0x01, 0x01, 0x11, 0x01, 0x01, 0x10,
1032        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1033        0x00, 0x00, 0x00, 0x00, 0x0e, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51,
1034        0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1035        0x00, 0x00, 0x00, 0x00, 0x17, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x20, 0x72, 0x61, 0x77,
1036        0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x31,
1037    ];
1038
1039    // Golden RAW dequeue request (FUNC 122): deqoptions wait=NO_WAIT,
1040    // navigation=DEQ_FIRST_MSG; deqone. seq_num=6.
1041    const GOLDEN_RAW_DEQ: &[u8] = &[
1042        0x03, 0x7a, 0x06, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x01, 0x03,
1043        0x01, 0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x01,
1044        0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x0e,
1045        0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00,
1046        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17,
1047    ];
1048
1049    #[test]
1050    fn raw_enqueue_request_matches_golden() {
1051        let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
1052        let props = AqMsgProps {
1053            priority: 2,
1054            correlation: Some("CORR1".to_string()),
1055            payload: Some(AqPayloadValue::Raw(b"sample raw data 1".to_vec())),
1056            ..AqMsgProps::default()
1057        };
1058        let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, false)
1059            .expect("build enqueue");
1060        assert_eq!(bytes, GOLDEN_RAW_ENQ);
1061    }
1062
1063    #[test]
1064    fn raw_dequeue_request_matches_golden() {
1065        let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
1066        let deq = AqDeqOptions {
1067            wait: 0,
1068            navigation: 1,
1069            ..AqDeqOptions::default()
1070        };
1071        let bytes = build_aq_deq_payload(&queue, &deq, 6, FV).expect("build dequeue");
1072        assert_eq!(bytes, GOLDEN_RAW_DEQ);
1073    }
1074
1075    #[test]
1076    fn empty_queue_dequeue_yields_no_message() {
1077        // ORA-25228 is cleared and surfaces as no message. We can't synthesize a
1078        // full error packet trivially here, so just confirm an empty response
1079        // (status-only) yields None without error.
1080        let caps = caps();
1081        let res = parse_aq_deq_response(&[], caps, &AqPayloadKind::Raw).expect("parse");
1082        assert!(res.message.is_none());
1083    }
1084
1085    // Golden JSON enqueue (FUNC 121): msgproperties(payload=dict(name="John",
1086    // age=30, city="NY")); enqone against TEST_JSON_QUEUE, seq_num=4. The OSON
1087    // image (after `ff 4a 5a 01`) also exercises encode_oson byte-parity.
1088    const GOLDEN_JSON_ENQ: &[u8] = &[
1089        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0f, 0x00, 0x00, 0x81, 0x01, 0x00, 0x00, 0x00, 0x00,
1090        0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01,
1091        0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00, 0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff,
1092        0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01,
1093        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1094        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0f, 0x54, 0x45, 0x53, 0x54,
1095        0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00,
1096        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x47, 0x01, 0x28, 0x00,
1097        0x26, 0x00, 0x04, 0x61, 0x08, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1098        0x00, 0x43, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1099        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x43, 0xff, 0x4a, 0x5a, 0x01, 0x21,
1100        0x02, 0x03, 0x00, 0x0e, 0x00, 0x1f, 0x00, 0x00, 0x42, 0x9c, 0xe6, 0x00, 0x09, 0x00, 0x05,
1101        0x00, 0x00, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x03, 0x61, 0x67, 0x65, 0x04, 0x63, 0x69, 0x74,
1102        0x79, 0xa4, 0x03, 0x03, 0x02, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x17, 0x00,
1103        0x00, 0x00, 0x1b, 0x33, 0x04, 0x4a, 0x6f, 0x68, 0x6e, 0x34, 0x02, 0xc1, 0x1f, 0x33, 0x02,
1104        0x4e, 0x59,
1105    ];
1106
1107    #[test]
1108    fn json_enqueue_request_matches_golden() {
1109        let queue = AqQueueDesc::new("TEST_JSON_QUEUE".to_string(), AqPayloadKind::Json, None);
1110        // Insertion-ordered object {name, age, city}.
1111        let value = OsonValue::Object(vec![
1112            ("name".to_string(), OsonValue::String("John".to_string())),
1113            ("age".to_string(), OsonValue::Number("30".to_string())),
1114            ("city".to_string(), OsonValue::String("NY".to_string())),
1115        ]);
1116        let props = AqMsgProps {
1117            payload: Some(AqPayloadValue::Json(value)),
1118            ..AqMsgProps::default()
1119        };
1120        // Container negotiates >= 23ai so supports_oson_long_fnames = true.
1121        let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, true)
1122            .expect("build json enqueue");
1123        assert_eq!(bytes, GOLDEN_JSON_ENQ);
1124    }
1125}