1#![forbid(unsafe_code)]
2
3use super::*;
17use crate::oson::{decode_oson_with_limits, encode_oson, OsonValue};
18use crate::wire::ProtocolLimits;
19
/// Payload category of an AQ queue.
///
/// `AqQueueDesc::new` uses the variant to choose the payload TOID, and the
/// request builders and response parsers in this module branch on it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AqPayloadKind {
    /// RAW (byte string) payloads.
    Raw,
    /// JSON payloads; encoded and decoded as OSON by this module.
    Json,
    /// Database object payloads whose type OID is supplied by the caller.
    Object,
}
31
/// Describes the queue an AQ request targets.
#[derive(Clone, Debug)]
pub struct AqQueueDesc {
    /// Queue name; sent on the wire as its raw bytes.
    pub name: String,
    /// Payload category accepted by the queue.
    pub kind: AqPayloadKind,
    /// Payload type OID written verbatim into requests. Fixed for RAW and
    /// JSON queues, caller-supplied for object queues (see `AqQueueDesc::new`).
    pub payload_toid: Vec<u8>,
}
41
42impl AqQueueDesc {
43 pub fn new(name: String, kind: AqPayloadKind, object_oid: Option<Vec<u8>>) -> Self {
46 let payload_toid = match kind {
47 AqPayloadKind::Raw => raw_payload_toid(),
48 AqPayloadKind::Json => json_payload_toid(),
49 AqPayloadKind::Object => object_oid.unwrap_or_default(),
50 };
51 Self {
52 name,
53 kind,
54 payload_toid,
55 }
56 }
57}
58
/// Returns the 16-byte TOID used for RAW queues: fifteen zero bytes
/// followed by `0x17`.
fn raw_payload_toid() -> Vec<u8> {
    let mut toid = [0u8; 16];
    toid[15] = 0x17;
    toid.to_vec()
}
64
/// Returns the 16-byte TOID used for JSON queues: fifteen zero bytes
/// followed by `0x47`.
fn json_payload_toid() -> Vec<u8> {
    let mut toid = [0u8; 16];
    toid[15] = 0x47;
    toid.to_vec()
}
70
/// Payload supplied by the caller for an enqueue.
#[derive(Clone, Debug)]
pub enum AqPayloadValue {
    /// Bytes written to the request unchanged.
    Raw(Vec<u8>),
    /// JSON value; encoded to OSON when the request is built.
    Json(OsonValue),
    /// Database object: `oid` and `image` are handed to `write_dbobject_bind`.
    Object { oid: Vec<u8>, image: Vec<u8> },
}
81
/// Message properties sent with an enqueue.
///
/// `write_msg_props` serialises the scalar fields; the enqueue builders
/// handle `recipients` and `payload` themselves.
#[derive(Clone, Debug)]
pub struct AqMsgProps {
    /// Message priority; cast to `u32` when written.
    pub priority: i32,
    /// Delivery delay; cast to `u32` when written.
    /// NOTE(review): unit is presumably seconds — confirm.
    pub delay: i32,
    /// Expiration; written as a signed value (`Default` uses `-1`).
    pub expiration: i32,
    /// Optional correlation identifier.
    pub correlation: Option<String>,
    /// Optional exception queue name.
    pub exception_queue: Option<String>,
    /// Message state; cast to `u32` when written.
    pub state: i32,
    /// Optional enqueue transaction id bytes.
    pub enq_txn_id: Option<Vec<u8>>,
    /// Recipient names. `None` and `Some(vec![])` are encoded differently by
    /// `build_aq_enq_payload` (presence byte 0 versus 1).
    pub recipients: Option<Vec<String>>,
    /// Payload to enqueue; the enqueue builders fail when this is `None`.
    pub payload: Option<AqPayloadValue>,
}
99
100impl Default for AqMsgProps {
101 fn default() -> Self {
102 Self {
103 priority: 0,
104 delay: 0,
105 expiration: -1,
106 correlation: None,
107 exception_queue: None,
108 state: 0,
109 enq_txn_id: None,
110 recipients: Some(Vec::new()),
114 payload: None,
115 }
116 }
117}
118
/// Options controlling an enqueue.
#[derive(Clone, Debug)]
pub struct AqEnqOptions {
    /// Visibility code written into the request.
    /// NOTE(review): `Default` uses 2, presumably "on commit" — confirm.
    pub visibility: u32,
    /// Delivery mode; `TNS_AQ_MSG_BUFFERED` sets the buffered-message flag.
    pub delivery_mode: u16,
}
126
127impl Default for AqEnqOptions {
128 fn default() -> Self {
129 Self {
130 visibility: 2,
131 delivery_mode: TNS_AQ_MSG_PERSISTENT,
132 }
133 }
134}
135
/// Options controlling a dequeue.
///
/// `build_aq_deq_payload` treats empty strings and an empty `msgid` as
/// absent.
#[derive(Clone, Debug)]
pub struct AqDeqOptions {
    /// Optional dequeue condition text.
    pub condition: Option<String>,
    /// Optional consumer name.
    pub consumer_name: Option<String>,
    /// Optional correlation identifier to match.
    pub correlation: Option<String>,
    /// Delivery mode; buffered and persistent-or-buffered set request flags.
    pub delivery_mode: u16,
    /// Dequeue mode code, written as a signed 32-bit value.
    pub mode: i32,
    /// Optional message id; the single-message builder pads or truncates it
    /// to 16 bytes.
    pub msgid: Option<Vec<u8>>,
    /// Navigation code, written as a signed 32-bit value.
    pub navigation: i32,
    /// Visibility code, written as a signed 32-bit value.
    pub visibility: i32,
    /// Wait value; reinterpreted as `i32` when written, so `0xFFFF_FFFF`
    /// goes out as `-1`.
    pub wait: u32,
}
151
152impl Default for AqDeqOptions {
153 fn default() -> Self {
154 Self {
155 condition: None,
156 consumer_name: None,
157 correlation: None,
158 delivery_mode: TNS_AQ_MSG_PERSISTENT,
159 mode: 3,
160 msgid: None,
161 navigation: 3,
162 visibility: 2,
163 wait: 0xFFFF_FFFF,
164 }
165 }
166}
167
/// A message returned by a dequeue, filled in by `process_msg_props` and the
/// response parsers.
#[derive(Clone, Debug, Default)]
pub struct AqDeqMessage {
    /// Message priority.
    pub priority: i32,
    /// Delivery delay.
    pub delay: i32,
    /// Expiration value.
    pub expiration: i32,
    /// Correlation identifier, if the server sent one.
    pub correlation: Option<String>,
    /// Number of dequeue attempts reported by the server.
    pub num_attempts: i32,
    /// Exception queue name, if any.
    pub exception_queue: Option<String>,
    /// Message state code.
    pub state: i32,
    /// Enqueue time decoded by `decode_datetime_value`, if present.
    pub enq_time: Option<QueryValue>,
    /// `TNS_AQ_MSG_BUFFERED` or `TNS_AQ_MSG_PERSISTENT`, derived from the
    /// server flags.
    pub delivery_mode: u16,
    /// Message id bytes.
    pub msgid: Option<Vec<u8>>,
    /// Decoded payload, if any.
    pub payload: Option<AqDeqPayload>,
}
186
/// Payload of a dequeued message.
#[derive(Clone, Debug)]
pub enum AqDeqPayload {
    /// RAW bytes with the leading 4-byte prefix of the image removed.
    Raw(Vec<u8>),
    /// JSON value decoded from the OSON image.
    Json(OsonValue),
    /// Object image bytes exactly as read from the response.
    Object(Vec<u8>),
}
198
199fn write_aq_function_code(
207 writer: &mut TtcWriter,
208 function_code: u8,
209 seq_num: u8,
210 ttc_field_version: u8,
211) {
212 writer.write_function_code_with_seq(function_code, seq_num);
213 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
214 writer.write_ub8(0); }
216}
217
218fn write_value_with_length(writer: &mut TtcWriter, value: Option<&[u8]>) -> Result<()> {
219 match value {
220 None => {
221 writer.write_ub4(0);
222 Ok(())
223 }
224 Some(bytes) => writer.write_bytes_with_two_lengths(Some(bytes)),
225 }
226}
227
228fn write_msg_props(
230 writer: &mut TtcWriter,
231 props: &AqMsgProps,
232 ttc_field_version: u8,
233) -> Result<()> {
234 writer.write_ub4(props.priority as u32);
235 writer.write_ub4(props.delay as u32);
236 writer.write_sb4(props.expiration);
237 write_value_with_length(writer, props.correlation.as_deref().map(str::as_bytes))?;
238 writer.write_ub4(0); write_value_with_length(writer, props.exception_queue.as_deref().map(str::as_bytes))?;
240 writer.write_ub4(props.state as u32);
241 writer.write_ub4(0); write_value_with_length(writer, props.enq_txn_id.as_deref())?;
243 writer.write_ub4(4); writer.write_u8(0x0e); writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_NAME)?;
246 writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS)?;
247 writer.write_keyword_value_pair(None, Some(b"\x00"), TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL)?;
248 writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID)?;
249 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
254 writer.write_ub4(0xFFFF_FFFF); }
256 Ok(())
257}
258
259fn write_recipients(writer: &mut TtcWriter, recipients: &[String]) -> Result<()> {
261 let mut index: u16 = 0;
262 for recipient in recipients {
263 writer.write_keyword_value_pair(Some(recipient.as_bytes()), None, index)?;
264 writer.write_keyword_value_pair(None, None, index + 1)?;
265 writer.write_keyword_value_pair(None, Some(b"\x00"), index + 2)?;
266 index += 3;
267 }
268 Ok(())
269}
270
271fn write_payload(
273 writer: &mut TtcWriter,
274 payload: &AqPayloadValue,
275 supports_oson_long_fnames: bool,
276) -> Result<()> {
277 match payload {
278 AqPayloadValue::Json(value) => {
279 let image = encode_oson(value, supports_oson_long_fnames)?;
282 crate::vector::write_oson_aq_payload(writer, &image)
283 }
284 AqPayloadValue::Object { oid, image } => write_dbobject_bind(writer, oid, image),
285 AqPayloadValue::Raw(bytes) => {
286 writer.write_raw(bytes);
287 Ok(())
288 }
289 }
290}
291
292pub fn build_aq_enq_payload(
299 queue: &AqQueueDesc,
300 props: &AqMsgProps,
301 enq_options: &AqEnqOptions,
302 seq_num: u8,
303 ttc_field_version: u8,
304 supports_oson_long_fnames: bool,
305) -> Result<Vec<u8>> {
306 let payload = props
307 .payload
308 .as_ref()
309 .ok_or(ProtocolError::TtcDecode("AQ enqueue has no payload"))?;
310 let queue_name = queue.name.as_bytes();
311 let mut writer = TtcWriter::new();
312 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ENQ, seq_num, ttc_field_version);
313 writer.write_u8(1); writer.write_ub4(queue_name.len() as u32);
315 write_msg_props(&mut writer, props, ttc_field_version)?;
316 match props.recipients.as_ref() {
317 None => {
318 writer.write_u8(0); writer.write_ub4(0); }
321 Some(recipients) => {
322 writer.write_u8(1);
323 writer.write_ub4(3 * recipients.len() as u32);
324 }
325 }
326 writer.write_ub4(enq_options.visibility);
327 writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_u8(1); writer.write_ub4(16); writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
333 match queue.kind {
334 AqPayloadKind::Json => {
335 writer.write_u8(0); writer.write_u8(0); writer.write_ub4(0); }
339 AqPayloadKind::Object => {
340 writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); }
344 AqPayloadKind::Raw => {
345 let raw_len = match payload {
346 AqPayloadValue::Raw(bytes) => bytes.len() as u32,
347 _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
348 };
349 writer.write_u8(0); writer.write_u8(1); writer.write_ub4(raw_len);
352 }
353 }
354 writer.write_u8(1); writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
356 let mut enq_flags = 0u32;
357 if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
358 enq_flags |= TNS_KPD_AQ_BUFMSG;
359 }
360 writer.write_ub4(enq_flags); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_u8(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
379 writer.write_u8(u8::from(queue.kind == AqPayloadKind::Json));
381 }
382
383 writer.write_bytes_with_length(queue_name)?;
384 if let Some(recipients) = props.recipients.as_ref() {
385 write_recipients(&mut writer, recipients)?;
386 }
387 writer.write_raw(&queue.payload_toid);
388 write_payload(&mut writer, payload, supports_oson_long_fnames)?;
389 Ok(writer.into_bytes())
390}
391
392pub fn parse_aq_enq_response(
395 payload: &[u8],
396 capabilities: ClientCapabilities,
397) -> Result<Option<Vec<u8>>> {
398 parse_aq_enq_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
399}
400
401pub fn parse_aq_enq_response_with_limits(
402 payload: &[u8],
403 capabilities: ClientCapabilities,
404 limits: ProtocolLimits,
405) -> Result<Option<Vec<u8>>> {
406 let mut reader = TtcReader::with_limits(payload, limits)?;
407 let mut msgid: Option<Vec<u8>> = None;
408 while reader.remaining() > 0 {
409 let message_type = reader.read_u8()?;
410 match message_type {
411 0 => {}
412 TNS_MSG_TYPE_PARAMETER => {
413 let id = reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec();
414 let _ext_len = reader.read_ub2()?;
415 msgid = Some(id);
416 }
417 TNS_MSG_TYPE_STATUS => {
418 let _call_status = reader.read_ub4()?;
419 let _seq = reader.read_ub2()?;
420 }
421 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
422 let _ = skip_server_side_piggyback(&mut reader)?;
423 }
424 TNS_MSG_TYPE_END_OF_RESPONSE => break,
425 TNS_MSG_TYPE_ERROR => {
426 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
427 if info.number != 0 {
428 return Err(ProtocolError::ServerErrorInfo(Box::new(
429 info.into_details(),
430 )));
431 }
432 }
433 _ => {
434 return Err(ProtocolError::UnknownMessageType {
435 message_type,
436 position: reader.position().saturating_sub(1),
437 })
438 }
439 }
440 }
441 Ok(msgid)
442}
443
444pub fn build_aq_deq_payload(
451 queue: &AqQueueDesc,
452 deq_options: &AqDeqOptions,
453 seq_num: u8,
454 ttc_field_version: u8,
455) -> Result<Vec<u8>> {
456 let queue_name = queue.name.as_bytes();
457 let mut writer = TtcWriter::new();
458 write_aq_function_code(&mut writer, TNS_FUNC_AQ_DEQ, seq_num, ttc_field_version);
459 writer.write_u8(1); writer.write_ub4(queue_name.len() as u32);
461 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); let consumer_name = deq_options
466 .consumer_name
467 .as_ref()
468 .filter(|name| !name.is_empty());
469 match consumer_name {
470 Some(name) => {
471 writer.write_u8(1);
472 writer.write_ub4(name.len() as u32);
473 }
474 None => {
475 writer.write_u8(0);
476 writer.write_ub4(0);
477 }
478 }
479 writer.write_sb4(deq_options.mode);
480 writer.write_sb4(deq_options.navigation);
481 writer.write_sb4(deq_options.visibility);
482 writer.write_sb4(deq_options.wait as i32);
483 let msgid = deq_options.msgid.as_ref().filter(|id| !id.is_empty());
484 match msgid {
485 Some(_) => {
486 writer.write_u8(1);
487 writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
488 }
489 None => {
490 writer.write_u8(0);
491 writer.write_ub4(0);
492 }
493 }
494 let correlation = deq_options.correlation.as_ref().filter(|c| !c.is_empty());
495 match correlation {
496 Some(c) => {
497 writer.write_u8(1);
498 writer.write_ub4(c.len() as u32);
499 }
500 None => {
501 writer.write_u8(0);
502 writer.write_ub4(0);
503 }
504 }
505 writer.write_u8(1); writer.write_ub4(16); writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
508 writer.write_u8(1); writer.write_u8(1); writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
511 let mut deq_flags = 0u32;
512 match deq_options.delivery_mode {
513 TNS_AQ_MSG_BUFFERED => deq_flags |= TNS_KPD_AQ_BUFMSG,
514 TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => deq_flags |= TNS_KPD_AQ_EITHER,
515 _ => {}
516 }
517 writer.write_ub4(deq_flags);
518 let condition = deq_options.condition.as_ref().filter(|c| !c.is_empty());
519 match condition {
520 Some(c) => {
521 writer.write_u8(1);
522 writer.write_ub4(c.len() as u32);
523 }
524 None => {
525 writer.write_u8(0);
526 writer.write_ub4(0);
527 }
528 }
529 writer.write_u8(0); writer.write_ub4(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
532 writer.write_u8(0); }
534 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
535 writer.write_ub4(0xFFFF_FFFF); }
537
538 writer.write_bytes_with_length(queue_name)?;
539 if let Some(name) = consumer_name {
540 writer.write_bytes_with_length(name.as_bytes())?;
541 }
542 if let Some(id) = msgid {
543 let mut id = id.clone();
544 id.truncate(16);
545 if id.len() < 16 {
546 id.resize(16, 0);
547 }
548 writer.write_raw(&id);
549 }
550 if let Some(c) = correlation {
551 writer.write_bytes_with_length(c.as_bytes())?;
552 }
553 writer.write_raw(&queue.payload_toid);
554 if let Some(c) = condition {
555 writer.write_bytes_with_length(c.as_bytes())?;
556 }
557 Ok(writer.into_bytes())
558}
559
/// Outcome of a single-message dequeue.
#[derive(Clone, Debug, Default)]
pub struct AqDeqResult {
    /// The dequeued message, or `None` when the response carried no message
    /// or the server reported that no messages were found.
    pub message: Option<AqDeqMessage>,
}
566
567pub fn parse_aq_deq_response(
570 payload: &[u8],
571 capabilities: ClientCapabilities,
572 kind: &AqPayloadKind,
573) -> Result<AqDeqResult> {
574 parse_aq_deq_response_with_limits(payload, capabilities, kind, ProtocolLimits::DEFAULT)
575}
576
577pub fn parse_aq_deq_response_with_limits(
578 payload: &[u8],
579 capabilities: ClientCapabilities,
580 kind: &AqPayloadKind,
581 limits: ProtocolLimits,
582) -> Result<AqDeqResult> {
583 let mut reader = TtcReader::with_limits(payload, limits)?;
584 let mut result = AqDeqResult::default();
585 let mut no_msg_found = false;
586 while reader.remaining() > 0 {
587 let message_type = reader.read_u8()?;
588 match message_type {
589 0 => {}
590 TNS_MSG_TYPE_PARAMETER => {
591 let num_bytes = reader.read_ub4()?;
592 if num_bytes > 0 {
593 let mut message = AqDeqMessage::default();
594 process_msg_props(&mut reader, &mut message, capabilities.ttc_field_version)?;
595 process_recipients(&mut reader)?;
596 message.payload = process_payload(&mut reader, kind)?;
597 message.msgid = Some(process_msg_id(&mut reader)?);
598 result.message = Some(message);
599 }
600 }
601 TNS_MSG_TYPE_STATUS => {
602 let _call_status = reader.read_ub4()?;
603 let _seq = reader.read_ub2()?;
604 }
605 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
606 let _ = skip_server_side_piggyback(&mut reader)?;
607 }
608 TNS_MSG_TYPE_END_OF_RESPONSE => break,
609 TNS_MSG_TYPE_ERROR => {
610 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
611 if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
612 no_msg_found = true;
613 } else if info.number != 0 {
614 return Err(ProtocolError::ServerErrorInfo(Box::new(
615 info.into_details(),
616 )));
617 }
618 }
619 _ => {
620 return Err(ProtocolError::UnknownMessageType {
621 message_type,
622 position: reader.position().saturating_sub(1),
623 })
624 }
625 }
626 }
627 if no_msg_found {
628 result.message = None;
629 }
630 Ok(result)
631}
632
633pub fn build_aq_array_enq_payload(
640 queue: &AqQueueDesc,
641 props_list: &[AqMsgProps],
642 enq_options: &AqEnqOptions,
643 seq_num: u8,
644 ttc_field_version: u8,
645 supports_oson_long_fnames: bool,
646) -> Result<Vec<u8>> {
647 let num_iters = props_list.len() as u32;
648 let queue_name = queue.name.as_bytes();
649 let mut writer = TtcWriter::new();
650 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
651 writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
654 writer.write_u8(1); writer.write_u8(0); writer.write_sb4(TNS_AQ_ARRAY_ENQ);
657 writer.write_u8(1); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
659 writer.write_ub4(0xFFFF); }
661 writer.write_ub4(num_iters);
662
663 let mut flags = 0u32;
664 if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
665 flags |= TNS_KPD_AQ_BUFMSG;
666 }
667 writer.write_ub4(0); writer.write_u8(TNS_MSG_TYPE_ROW_HEADER);
669 writer.write_bytes_with_two_lengths(Some(queue_name))?;
670 writer.write_raw(&queue.payload_toid);
671 writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
672 writer.write_ub4(flags);
673 for props in props_list {
674 let payload = props
675 .payload
676 .as_ref()
677 .ok_or(ProtocolError::TtcDecode("AQ array enqueue has no payload"))?;
678 writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
679 writer.write_ub4(flags); write_msg_props(&mut writer, props, ttc_field_version)?;
681 match props.recipients.as_ref() {
682 None => writer.write_ub4(0),
683 Some(recipients) => {
684 writer.write_ub4(3 * recipients.len() as u32);
685 write_recipients(&mut writer, recipients)?;
686 }
687 }
688 writer.write_sb4(enq_options.visibility as i32);
689 writer.write_ub4(0); writer.write_sb4(0); if matches!(queue.kind, AqPayloadKind::Raw) {
692 let raw_len = match payload {
693 AqPayloadValue::Raw(bytes) => bytes.len() as u32,
694 _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
695 };
696 writer.write_ub4(raw_len);
697 }
698 write_payload(&mut writer, payload, supports_oson_long_fnames)?;
699 }
700 writer.write_u8(TNS_MSG_TYPE_STATUS);
701 Ok(writer.into_bytes())
702}
703
704pub fn build_aq_array_deq_payload(
707 queue: &AqQueueDesc,
708 deq_options: &AqDeqOptions,
709 num_iters: u32,
710 seq_num: u8,
711 ttc_field_version: u8,
712) -> Result<Vec<u8>> {
713 let queue_name = queue.name.as_bytes();
714 let mut writer = TtcWriter::new();
715 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
716 writer.write_u8(1); writer.write_ub4(num_iters);
718 writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
719 writer.write_u8(1); writer.write_u8(1); writer.write_sb4(TNS_AQ_ARRAY_DEQ);
722 writer.write_u8(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
724 writer.write_ub4(0xFFFF); }
726
727 let mut flags = 0u32;
728 match deq_options.delivery_mode {
729 TNS_AQ_MSG_BUFFERED => flags |= TNS_KPD_AQ_BUFMSG,
730 TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => flags |= TNS_KPD_AQ_EITHER,
731 _ => {}
732 }
733 let consumer_name = deq_options
734 .consumer_name
735 .as_ref()
736 .filter(|name| !name.is_empty())
737 .map(|name| name.as_bytes());
738 let correlation = deq_options
739 .correlation
740 .as_ref()
741 .filter(|c| !c.is_empty())
742 .map(|c| c.as_bytes());
743 let condition = deq_options
744 .condition
745 .as_ref()
746 .filter(|c| !c.is_empty())
747 .map(|c| c.as_bytes());
748 let props = AqMsgProps::default();
749 for _ in 0..num_iters {
750 writer.write_bytes_with_two_lengths(Some(queue_name))?;
751 write_msg_props(&mut writer, &props, ttc_field_version)?;
752 writer.write_ub4(0); write_value_with_length(&mut writer, consumer_name)?;
754 writer.write_sb4(deq_options.mode);
755 writer.write_sb4(deq_options.navigation);
756 writer.write_sb4(deq_options.visibility);
757 writer.write_sb4(deq_options.wait as i32);
758 write_value_with_length(&mut writer, deq_options.msgid.as_deref())?;
759 write_value_with_length(&mut writer, correlation)?;
760 write_value_with_length(&mut writer, condition)?;
761 writer.write_ub4(0); writer.write_ub4(0); writer.write_sb4(0); writer.write_bytes_with_two_lengths(Some(&queue.payload_toid))?;
765 writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
766 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0);
769 writer.write_ub4(flags);
770 writer.write_ub4(0); writer.write_ub4(0); }
773 Ok(writer.into_bytes())
774}
775
/// Outcome of an array AQ operation; only the field matching the operation
/// is populated.
#[derive(Clone, Debug, Default)]
pub struct AqArrayResult {
    /// Message ids returned by an array enqueue, one entry per submitted
    /// message. An entry is empty when the response blob was too short.
    pub enq_msgids: Vec<Vec<u8>>,
    /// Messages returned by an array dequeue.
    pub deq_messages: Vec<AqDeqMessage>,
}
785
786pub fn parse_aq_array_response(
792 payload: &[u8],
793 capabilities: ClientCapabilities,
794 operation: i32,
795 props_count: u32,
796 kind: &AqPayloadKind,
797) -> Result<AqArrayResult> {
798 parse_aq_array_response_with_limits(
799 payload,
800 capabilities,
801 operation,
802 props_count,
803 kind,
804 ProtocolLimits::DEFAULT,
805 )
806}
807
808pub fn parse_aq_array_response_with_limits(
809 payload: &[u8],
810 capabilities: ClientCapabilities,
811 operation: i32,
812 props_count: u32,
813 kind: &AqPayloadKind,
814 limits: ProtocolLimits,
815) -> Result<AqArrayResult> {
816 limits.check_batch_rows(props_count as usize)?;
817 let mut reader = TtcReader::with_limits(payload, limits)?;
818 let mut result = AqArrayResult::default();
819 let mut messages: Vec<AqDeqMessage> = Vec::new();
820 let mut enq_msgid_blob: Option<Vec<u8>> = None;
821 let mut response_num_iters: u32 = 0;
822 let mut no_msg_found = false;
823 while reader.remaining() > 0 {
824 let message_type = reader.read_u8()?;
825 match message_type {
826 0 => {}
827 TNS_MSG_TYPE_PARAMETER => {
828 let num_iters = reader.read_ub4()?;
829 reader.limits().check_batch_rows(num_iters as usize)?;
830 response_num_iters = num_iters;
831 for i in 0..num_iters {
832 let mut message = AqDeqMessage::default();
833 let props_len = reader.read_ub2()?;
834 if props_len > 0 {
835 reader.read_u8()?; process_msg_props(
837 &mut reader,
838 &mut message,
839 capabilities.ttc_field_version,
840 )?;
841 }
842 process_recipients(&mut reader)?;
843 let payload_len = reader.read_ub2()?;
844 if payload_len > 0 {
845 message.payload = process_payload(&mut reader, kind)?;
846 }
847 let msgid = reader.read_bytes_with_length()?.unwrap_or_default();
848 if operation == TNS_AQ_ARRAY_ENQ {
849 enq_msgid_blob = Some(msgid);
850 } else {
851 message.msgid = Some(msgid);
852 }
853 let ext_len = reader.read_ub2()?;
854 if ext_len > 0 {
855 return Err(ProtocolError::UnsupportedFeature("AQ array extensions"));
856 }
857 let _output_ack = reader.read_ub2()?;
858 if operation != TNS_AQ_ARRAY_ENQ {
859 let _ = i;
860 messages.push(message);
861 }
862 }
863 if operation == TNS_AQ_ARRAY_ENQ {
864 response_num_iters = reader.read_ub4()?;
865 reader
866 .limits()
867 .check_batch_rows(response_num_iters as usize)?;
868 }
869 }
870 TNS_MSG_TYPE_STATUS => {
871 let _call_status = reader.read_ub4()?;
872 let _seq = reader.read_ub2()?;
873 }
874 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
875 let _ = skip_server_side_piggyback(&mut reader)?;
876 }
877 TNS_MSG_TYPE_END_OF_RESPONSE => break,
878 TNS_MSG_TYPE_ERROR => {
879 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
880 if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
881 no_msg_found = true;
882 } else if info.number != 0 {
883 return Err(ProtocolError::ServerErrorInfo(Box::new(
884 info.into_details(),
885 )));
886 }
887 }
888 _ => {
889 return Err(ProtocolError::UnknownMessageType {
890 message_type,
891 position: reader.position().saturating_sub(1),
892 })
893 }
894 }
895 }
896 if operation == TNS_AQ_ARRAY_ENQ {
897 if let Some(blob) = enq_msgid_blob {
898 let count = props_count as usize;
899 result.enq_msgids = (0..count)
900 .map(|j| {
901 let start = j * 16;
902 let end = start + 16;
903 blob.get(start..end).map(<[u8]>::to_vec).unwrap_or_default()
904 })
905 .collect();
906 }
907 } else if no_msg_found {
908 result.deq_messages = Vec::new();
909 } else {
910 let keep = response_num_iters as usize;
911 messages.truncate(keep);
912 result.deq_messages = messages;
913 }
914 Ok(result)
915}
916
917fn process_msg_props(
922 reader: &mut TtcReader<'_>,
923 message: &mut AqDeqMessage,
924 ttc_field_version: u8,
925) -> Result<()> {
926 message.priority = reader.read_sb4()?;
927 message.delay = reader.read_sb4()?;
928 message.expiration = reader.read_sb4()?;
929 message.correlation = reader.read_string_with_length()?;
930 message.num_attempts = reader.read_sb4()?;
931 message.exception_queue = reader.read_string_with_length()?;
932 message.state = reader.read_sb4()?;
933 message.enq_time = process_date(reader)?;
934 let _enq_txn_id = reader.read_bytes_with_length()?;
935 process_extensions(reader)?;
936 let user_props = reader.read_ub4()?;
937 if user_props > 0 {
938 return Err(ProtocolError::UnsupportedFeature("AQ user properties"));
939 }
940 let _csn = reader.read_ub4()?;
941 let _dsn = reader.read_ub4()?;
942 let flags = reader.read_ub4()?;
943 message.delivery_mode = if flags == TNS_KPD_AQ_BUFMSG {
944 TNS_AQ_MSG_BUFFERED
945 } else {
946 TNS_AQ_MSG_PERSISTENT
947 };
948 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
949 let _shard = reader.read_ub4()?;
950 }
951 Ok(())
952}
953
954fn process_date(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
958 let num_bytes = reader.read_ub4()?;
959 if num_bytes == 0 {
960 return Ok(None);
961 }
962 let len = usize::from(reader.read_u8()?);
963 if len == 0 {
964 return Ok(None);
965 }
966 let bytes = reader.read_raw(len)?;
967 Ok(Some(decode_datetime_value(bytes)?))
968}
969
970fn process_extensions(reader: &mut TtcReader<'_>) -> Result<()> {
971 let num_extensions = reader.read_ub4()?;
972 if num_extensions > 0 {
973 reader.read_u8()?; for _ in 0..num_extensions {
975 let _text = reader.read_bytes_with_length()?;
976 let _binary = reader.read_bytes_with_length()?;
977 let _keyword = reader.read_ub2()?;
978 }
979 }
980 Ok(())
981}
982
983fn process_recipients(reader: &mut TtcReader<'_>) -> Result<()> {
984 let count = reader.read_ub4()?;
985 if count > 0 {
986 return Err(ProtocolError::UnsupportedFeature(
987 "AQ recipients on dequeue",
988 ));
989 }
990 Ok(())
991}
992
993fn process_msg_id(reader: &mut TtcReader<'_>) -> Result<Vec<u8>> {
994 Ok(reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec())
995}
996
997fn process_payload(
999 reader: &mut TtcReader<'_>,
1000 kind: &AqPayloadKind,
1001) -> Result<Option<AqDeqPayload>> {
1002 if matches!(kind, AqPayloadKind::Object) {
1003 let _toid = reader.read_bytes_with_length()?;
1007 let _oid = reader.read_bytes_with_length()?;
1008 let _snapshot = reader.read_bytes_with_length()?;
1009 let _version = reader.read_ub2()?;
1010 let image_length = reader.read_ub4()?;
1011 reader
1012 .limits()
1013 .check_response_bytes(image_length as usize)?;
1014 let _flags = reader.read_ub2()?;
1015 if image_length == 0 {
1016 return Ok(None);
1017 }
1018 let image = reader
1019 .read_bytes()?
1020 .ok_or(ProtocolError::TtcDecode("AQ object payload missing"))?;
1021 return Ok(Some(AqDeqPayload::Object(image)));
1022 }
1023 let _toid = reader.read_bytes_with_length()?;
1025 let _oid = reader.read_bytes_with_length()?;
1026 let _snapshot = reader.read_bytes_with_length()?;
1027 let _version = reader.read_ub2()?;
1028 let image_length = reader.read_ub4()? as usize;
1029 reader.limits().check_response_bytes(image_length)?;
1030 let _flags = reader.read_ub2()?;
1031 if image_length > 0 {
1032 let raw = reader
1034 .read_bytes()?
1035 .ok_or(ProtocolError::TtcDecode("AQ payload missing"))?;
1036 if raw.len() < image_length {
1037 return Err(ProtocolError::TtcDecode("AQ payload shorter than declared"));
1038 }
1039 let end = image_length;
1040 let start = 4.min(end);
1041 let payload = raw.get(start..end).unwrap_or_default().to_vec();
1042 if matches!(kind, AqPayloadKind::Json) {
1043 let value = decode_oson_with_limits(&payload, reader.limits())?;
1044 return Ok(Some(AqDeqPayload::Json(value)));
1045 }
1046 return Ok(Some(AqDeqPayload::Raw(payload)));
1047 }
1048 if matches!(kind, AqPayloadKind::Raw) {
1049 return Ok(Some(AqDeqPayload::Raw(Vec::new())));
1050 }
1051 Ok(None)
1052}
1053
// Unit tests: golden-byte comparisons for the request builders and
// boundary checks for the response parsers.
#[cfg(test)]
mod tests {
    use super::*;

    // TTC field version used by every test in this module.
    const FV: u8 = 24;

    // Client capabilities matching `FV`.
    fn caps() -> ClientCapabilities {
        ClientCapabilities {
            ttc_field_version: FV,
            max_string_size: 32767,
            charset_id: 873,
        }
    }

    // Expected request bytes for the RAW enqueue built in
    // `raw_enqueue_request_matches_golden`.
    const GOLDEN_RAW_ENQ: &[u8] = &[
        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x02, 0x00, 0x81, 0x01, 0x01, 0x05, 0x05,
        0x43, 0x4f, 0x52, 0x52, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00,
        0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01, 0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00,
        0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02,
        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x00, 0x01, 0x01, 0x11, 0x01, 0x01, 0x10,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x0e, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51,
        0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x17, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x20, 0x72, 0x61, 0x77,
        0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x31,
    ];

    // Expected request bytes for the RAW dequeue built in
    // `raw_dequeue_request_matches_golden`.
    const GOLDEN_RAW_DEQ: &[u8] = &[
        0x03, 0x7a, 0x06, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x01, 0x03,
        0x01, 0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x01,
        0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x0e,
        0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17,
    ];

    // A RAW enqueue with priority 2 and correlation "CORR1" must serialise
    // to the golden bytes.
    #[test]
    fn raw_enqueue_request_matches_golden() {
        let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
        let props = AqMsgProps {
            priority: 2,
            correlation: Some("CORR1".to_string()),
            payload: Some(AqPayloadValue::Raw(b"sample raw data 1".to_vec())),
            ..AqMsgProps::default()
        };
        let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, false)
            .expect("build enqueue");
        assert_eq!(bytes, GOLDEN_RAW_ENQ);
    }

    // A RAW dequeue with wait 0 and navigation 1 must serialise to the
    // golden bytes.
    #[test]
    fn raw_dequeue_request_matches_golden() {
        let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
        let deq = AqDeqOptions {
            wait: 0,
            navigation: 1,
            ..AqDeqOptions::default()
        };
        let bytes = build_aq_deq_payload(&queue, &deq, 6, FV).expect("build dequeue");
        assert_eq!(bytes, GOLDEN_RAW_DEQ);
    }

    // An empty response buffer parses successfully and carries no message.
    #[test]
    fn empty_queue_dequeue_yields_no_message() {
        let caps = caps();
        let res = parse_aq_deq_response(&[], caps, &AqPayloadKind::Raw).expect("parse");
        assert!(res.message.is_none());
    }

    // Builds a synthetic single-message dequeue response whose payload
    // header declares `image_length` while the image field actually holds
    // `raw_image`.
    fn deq_response_with_raw_image(image_length: u32, raw_image: &[u8]) -> Vec<u8> {
        let mut writer = TtcWriter::new();
        writer.write_u8(TNS_MSG_TYPE_PARAMETER);
        // Non-zero byte count so the parser reads a message body.
        writer.write_ub4(1);
        write_msg_props(&mut writer, &AqMsgProps::default(), FV).expect("write message props");
        // Read back as: recipient count, then the payload header's TOID,
        // OID, snapshot and version, then the declared image length.
        writer.write_ub4(0);
        writer.write_ub4(0);
        writer.write_ub4(0);
        writer.write_ub4(0);
        writer.write_ub2(0);
        writer.write_ub4(image_length);
        // Payload flags.
        writer.write_ub2(0);
        writer
            .write_bytes_with_length(raw_image)
            .expect("write raw image field");
        // All-zero message id.
        writer.write_raw(&[0u8; TNS_AQ_MESSAGE_ID_LENGTH]);
        writer.write_u8(TNS_MSG_TYPE_END_OF_RESPONSE);
        writer.into_bytes()
    }

    // Declared length 8 with only 6 image bytes must be rejected for RAW.
    #[test]
    fn raw_dequeue_rejects_declared_image_length_shortfall() {
        let response = deq_response_with_raw_image(8, &[0, 0, 0, 0, b'A', b'B']);
        let err = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Raw)
            .expect_err("short RAW image must fail");
        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
    }

    // The same shortfall must be rejected for JSON.
    #[test]
    fn json_dequeue_rejects_declared_image_length_shortfall() {
        let response = deq_response_with_raw_image(8, &[0, 0, 0, 0, b'A', b'B']);
        let err = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Json)
            .expect_err("short JSON image must fail");
        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
    }

    // When the declared and actual lengths agree, the payload is the image
    // minus its 4-byte prefix.
    #[test]
    fn raw_dequeue_accepts_exact_declared_image_length() {
        let response = deq_response_with_raw_image(6, &[0, 0, 0, 0, b'A', b'B']);
        let res = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Raw)
            .expect("exact RAW image should parse");
        let message = res.message.expect("message present");
        match message.payload {
            Some(AqDeqPayload::Raw(payload)) => assert_eq!(payload, vec![b'A', b'B']),
            other => panic!("unexpected payload {other:?}"),
        }
    }

    // A caller-supplied `props_count` above the configured batch limit is
    // rejected before any bytes are read.
    #[test]
    fn aq_array_response_respects_protocol_batch_limit() {
        let limits = ProtocolLimits {
            max_batch_rows: 1,
            ..ProtocolLimits::DEFAULT
        };
        let err = parse_aq_array_response_with_limits(
            &[],
            caps(),
            TNS_AQ_ARRAY_ENQ,
            2,
            &AqPayloadKind::Raw,
            limits,
        )
        .expect_err("client-side AQ batch count above policy must fail");
        assert!(
            matches!(
                err,
                ProtocolError::ResourceLimit {
                    limit: "batch_rows",
                    observed: 2,
                    maximum: 1,
                }
            ),
            "got {err:?}"
        );
    }

    // Expected request bytes for the JSON enqueue built in
    // `json_enqueue_request_matches_golden`.
    const GOLDEN_JSON_ENQ: &[u8] = &[
        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0f, 0x00, 0x00, 0x81, 0x01, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01,
        0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00, 0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff,
        0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01,
        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0f, 0x54, 0x45, 0x53, 0x54,
        0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x47, 0x01, 0x28, 0x00,
        0x26, 0x00, 0x04, 0x61, 0x08, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x43, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x43, 0xff, 0x4a, 0x5a, 0x01, 0x21,
        0x02, 0x03, 0x00, 0x0e, 0x00, 0x1f, 0x00, 0x00, 0x42, 0x9c, 0xe6, 0x00, 0x09, 0x00, 0x05,
        0x00, 0x00, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x03, 0x61, 0x67, 0x65, 0x04, 0x63, 0x69, 0x74,
        0x79, 0xa4, 0x03, 0x03, 0x02, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x17, 0x00,
        0x00, 0x00, 0x1b, 0x33, 0x04, 0x4a, 0x6f, 0x68, 0x6e, 0x34, 0x02, 0xc1, 0x1f, 0x33, 0x02,
        0x4e, 0x59,
    ];

    // A JSON enqueue of a three-field object, with long OSON field names
    // enabled, must serialise to the golden bytes.
    #[test]
    fn json_enqueue_request_matches_golden() {
        let queue = AqQueueDesc::new("TEST_JSON_QUEUE".to_string(), AqPayloadKind::Json, None);
        let value = OsonValue::Object(vec![
            ("name".to_string(), OsonValue::String("John".to_string())),
            ("age".to_string(), OsonValue::Number("30".to_string())),
            ("city".to_string(), OsonValue::String("NY".to_string())),
        ]);
        let props = AqMsgProps {
            payload: Some(AqPayloadValue::Json(value)),
            ..AqMsgProps::default()
        };
        let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, true)
            .expect("build json enqueue");
        assert_eq!(bytes, GOLDEN_JSON_ENQ);
    }
}