1#![forbid(unsafe_code)]
2
3use super::*;
17use crate::oson::{decode_oson_with_limits, encode_oson, OsonValue};
18use crate::wire::ProtocolLimits;
19
/// Payload flavour carried by an Advanced Queuing (AQ) queue.
///
/// The kind selects the payload type OID sent with requests and decides how
/// a dequeued payload image is interpreted.
///
/// The enum has no fields, so it is `Copy` and `Hash` in addition to the
/// comparison traits; it can be passed by value and used as a map/set key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum AqPayloadKind {
    /// Opaque byte payload.
    Raw,
    /// JSON payload, carried on the wire as an OSON image.
    Json,
    /// Database object payload identified by a caller-supplied type OID.
    Object,
}
31
32#[derive(Clone, Debug)]
34pub struct AqQueueDesc {
35 pub name: String,
36 pub kind: AqPayloadKind,
37 pub payload_toid: Vec<u8>,
40}
41
42impl AqQueueDesc {
43 pub fn new(name: String, kind: AqPayloadKind, object_oid: Option<Vec<u8>>) -> Self {
46 let payload_toid = match kind {
47 AqPayloadKind::Raw => raw_payload_toid(),
48 AqPayloadKind::Json => json_payload_toid(),
49 AqPayloadKind::Object => object_oid.unwrap_or_default(),
50 };
51 Self {
52 name,
53 kind,
54 payload_toid,
55 }
56 }
57}
58
/// Returns the 16-byte payload type OID used for RAW queues: fifteen zero
/// bytes followed by `0x17`.
fn raw_payload_toid() -> Vec<u8> {
    let mut toid = [0u8; 16];
    toid[15] = 0x17;
    toid.to_vec()
}
64
/// Returns the 16-byte payload type OID used for JSON queues: fifteen zero
/// bytes followed by `0x47`.
fn json_payload_toid() -> Vec<u8> {
    let mut toid = [0u8; 16];
    toid[15] = 0x47;
    toid.to_vec()
}
70
71#[derive(Clone, Debug)]
73pub enum AqPayloadValue {
74 Raw(Vec<u8>),
76 Json(OsonValue),
78 Object { oid: Vec<u8>, image: Vec<u8> },
80}
81
82#[derive(Clone, Debug)]
85pub struct AqMsgProps {
86 pub priority: i32,
87 pub delay: i32,
88 pub expiration: i32,
89 pub correlation: Option<String>,
90 pub exception_queue: Option<String>,
91 pub state: i32,
92 pub enq_txn_id: Option<Vec<u8>>,
93 pub recipients: Option<Vec<String>>,
95 pub payload: Option<AqPayloadValue>,
98}
99
100impl Default for AqMsgProps {
101 fn default() -> Self {
102 Self {
103 priority: 0,
104 delay: 0,
105 expiration: -1,
106 correlation: None,
107 exception_queue: None,
108 state: 0,
109 enq_txn_id: None,
110 recipients: Some(Vec::new()),
114 payload: None,
115 }
116 }
117}
118
/// Options that apply to an enqueue call.
#[derive(Debug, Clone)]
pub struct AqEnqOptions {
    /// Visibility value written into the enqueue request.
    pub visibility: u32,
    /// Delivery mode; the buffered mode sets the buffered-message flag.
    pub delivery_mode: u16,
}
126
127impl Default for AqEnqOptions {
128 fn default() -> Self {
129 Self {
130 visibility: 2,
131 delivery_mode: TNS_AQ_MSG_PERSISTENT,
132 }
133 }
134}
135
/// Options that apply to a dequeue call.
///
/// Empty strings in `condition`, `consumer_name` and `correlation` are
/// treated like `None` by the request builders.
#[derive(Debug, Clone)]
pub struct AqDeqOptions {
    /// Optional dequeue condition.
    pub condition: Option<String>,
    /// Optional consumer name.
    pub consumer_name: Option<String>,
    /// Optional correlation identifier to match.
    pub correlation: Option<String>,
    /// Delivery mode; selects the buffered / either flag bits.
    pub delivery_mode: u16,
    /// Dequeue mode, written as a signed 32-bit value.
    pub mode: i32,
    /// Optional id of the message to dequeue.
    pub msgid: Option<Vec<u8>>,
    /// Navigation setting, written as a signed 32-bit value.
    pub navigation: i32,
    /// Visibility setting, written as a signed 32-bit value.
    pub visibility: i32,
    /// Wait setting; reinterpreted as `i32` when written, so `0xFFFF_FFFF`
    /// goes out as `-1`.
    pub wait: u32,
}
151
152impl Default for AqDeqOptions {
153 fn default() -> Self {
154 Self {
155 condition: None,
156 consumer_name: None,
157 correlation: None,
158 delivery_mode: TNS_AQ_MSG_PERSISTENT,
159 mode: 3,
160 msgid: None,
161 navigation: 3,
162 visibility: 2,
163 wait: 0xFFFF_FFFF,
164 }
165 }
166}
167
168#[derive(Clone, Debug, Default)]
171pub struct AqDeqMessage {
172 pub priority: i32,
173 pub delay: i32,
174 pub expiration: i32,
175 pub correlation: Option<String>,
176 pub num_attempts: i32,
177 pub exception_queue: Option<String>,
178 pub state: i32,
179 pub enq_time: Option<QueryValue>,
181 pub delivery_mode: u16,
182 pub msgid: Option<Vec<u8>>,
183 pub payload: Option<AqDeqPayload>,
185}
186
187#[derive(Clone, Debug)]
189pub enum AqDeqPayload {
190 Raw(Vec<u8>),
192 Json(OsonValue),
194 Object(Vec<u8>),
197}
198
199fn write_aq_function_code(
207 writer: &mut TtcWriter,
208 function_code: u8,
209 seq_num: u8,
210 ttc_field_version: u8,
211) {
212 writer.write_function_code_with_seq(function_code, seq_num);
213 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
214 writer.write_ub8(0); }
216}
217
218fn write_value_with_length(writer: &mut TtcWriter, value: Option<&[u8]>) -> Result<()> {
219 match value {
220 None => {
221 writer.write_ub4(0);
222 Ok(())
223 }
224 Some(bytes) => writer.write_bytes_with_two_lengths(Some(bytes)),
225 }
226}
227
228fn write_msg_props(
230 writer: &mut TtcWriter,
231 props: &AqMsgProps,
232 ttc_field_version: u8,
233) -> Result<()> {
234 writer.write_ub4(props.priority as u32);
235 writer.write_ub4(props.delay as u32);
236 writer.write_sb4(props.expiration);
237 write_value_with_length(writer, props.correlation.as_deref().map(str::as_bytes))?;
238 writer.write_ub4(0); write_value_with_length(writer, props.exception_queue.as_deref().map(str::as_bytes))?;
240 writer.write_ub4(props.state as u32);
241 writer.write_ub4(0); write_value_with_length(writer, props.enq_txn_id.as_deref())?;
243 writer.write_ub4(4); writer.write_u8(0x0e); writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_NAME)?;
246 writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS)?;
247 writer.write_keyword_value_pair(None, Some(b"\x00"), TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL)?;
248 writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID)?;
249 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
254 writer.write_ub4(0xFFFF_FFFF); }
256 Ok(())
257}
258
259fn write_recipients(writer: &mut TtcWriter, recipients: &[String]) -> Result<()> {
261 let mut index: u16 = 0;
262 for recipient in recipients {
263 writer.write_keyword_value_pair(Some(recipient.as_bytes()), None, index)?;
264 writer.write_keyword_value_pair(None, None, index + 1)?;
265 writer.write_keyword_value_pair(None, Some(b"\x00"), index + 2)?;
266 index += 3;
267 }
268 Ok(())
269}
270
271fn write_payload(
273 writer: &mut TtcWriter,
274 payload: &AqPayloadValue,
275 supports_oson_long_fnames: bool,
276) -> Result<()> {
277 match payload {
278 AqPayloadValue::Json(value) => {
279 let image = encode_oson(value, supports_oson_long_fnames)?;
282 crate::vector::write_oson_aq_payload(writer, &image)
283 }
284 AqPayloadValue::Object { oid, image } => write_dbobject_bind(writer, oid, image),
285 AqPayloadValue::Raw(bytes) => {
286 writer.write_raw(bytes);
287 Ok(())
288 }
289 }
290}
291
292pub fn build_aq_enq_payload(
299 queue: &AqQueueDesc,
300 props: &AqMsgProps,
301 enq_options: &AqEnqOptions,
302 seq_num: u8,
303 ttc_field_version: u8,
304 supports_oson_long_fnames: bool,
305) -> Result<Vec<u8>> {
306 let payload = props
307 .payload
308 .as_ref()
309 .ok_or(ProtocolError::TtcDecode("AQ enqueue has no payload"))?;
310 let queue_name = queue.name.as_bytes();
311 let mut writer = TtcWriter::new();
312 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ENQ, seq_num, ttc_field_version);
313 writer.write_u8(1); writer.write_ub4(queue_name.len() as u32);
315 write_msg_props(&mut writer, props, ttc_field_version)?;
316 match props.recipients.as_ref() {
317 None => {
318 writer.write_u8(0); writer.write_ub4(0); }
321 Some(recipients) => {
322 writer.write_u8(1);
323 writer.write_ub4(3 * recipients.len() as u32);
324 }
325 }
326 writer.write_ub4(enq_options.visibility);
327 writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_u8(1); writer.write_ub4(16); writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
333 match queue.kind {
334 AqPayloadKind::Json => {
335 writer.write_u8(0); writer.write_u8(0); writer.write_ub4(0); }
339 AqPayloadKind::Object => {
340 writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); }
344 AqPayloadKind::Raw => {
345 let raw_len = match payload {
346 AqPayloadValue::Raw(bytes) => bytes.len() as u32,
347 _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
348 };
349 writer.write_u8(0); writer.write_u8(1); writer.write_ub4(raw_len);
352 }
353 }
354 writer.write_u8(1); writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
356 let mut enq_flags = 0u32;
357 if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
358 enq_flags |= TNS_KPD_AQ_BUFMSG;
359 }
360 writer.write_ub4(enq_flags); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_u8(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
379 writer.write_u8(u8::from(queue.kind == AqPayloadKind::Json));
381 }
382
383 writer.write_bytes_with_length(queue_name)?;
384 if let Some(recipients) = props.recipients.as_ref() {
385 write_recipients(&mut writer, recipients)?;
386 }
387 writer.write_raw(&queue.payload_toid);
388 write_payload(&mut writer, payload, supports_oson_long_fnames)?;
389 Ok(writer.into_bytes())
390}
391
392pub fn parse_aq_enq_response(
395 payload: &[u8],
396 capabilities: ClientCapabilities,
397) -> Result<Option<Vec<u8>>> {
398 parse_aq_enq_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
399}
400
401pub fn parse_aq_enq_response_with_limits(
402 payload: &[u8],
403 capabilities: ClientCapabilities,
404 limits: ProtocolLimits,
405) -> Result<Option<Vec<u8>>> {
406 let mut reader = TtcReader::with_limits(payload, limits)?;
407 let mut msgid: Option<Vec<u8>> = None;
408 while reader.remaining() > 0 {
409 let message_type = reader.read_u8()?;
410 match message_type {
411 0 => {}
412 TNS_MSG_TYPE_PARAMETER => {
413 let id = reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec();
414 let _ext_len = reader.read_ub2()?;
415 msgid = Some(id);
416 }
417 TNS_MSG_TYPE_STATUS => {
418 let _call_status = reader.read_ub4()?;
419 let _seq = reader.read_ub2()?;
420 }
421 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
422 let _ = skip_server_side_piggyback(&mut reader)?;
423 }
424 TNS_MSG_TYPE_END_OF_RESPONSE => break,
425 TNS_MSG_TYPE_ERROR => {
426 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
427 if info.number != 0 {
428 return Err(ProtocolError::ServerErrorInfo(Box::new(
429 info.into_details(),
430 )));
431 }
432 }
433 _ => {
434 return Err(ProtocolError::UnknownMessageType {
435 message_type,
436 position: reader.position().saturating_sub(1),
437 })
438 }
439 }
440 }
441 Ok(msgid)
442}
443
444pub fn build_aq_deq_payload(
451 queue: &AqQueueDesc,
452 deq_options: &AqDeqOptions,
453 seq_num: u8,
454 ttc_field_version: u8,
455) -> Result<Vec<u8>> {
456 let queue_name = queue.name.as_bytes();
457 let mut writer = TtcWriter::new();
458 write_aq_function_code(&mut writer, TNS_FUNC_AQ_DEQ, seq_num, ttc_field_version);
459 writer.write_u8(1); writer.write_ub4(queue_name.len() as u32);
461 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); let consumer_name = deq_options
466 .consumer_name
467 .as_ref()
468 .filter(|name| !name.is_empty());
469 match consumer_name {
470 Some(name) => {
471 writer.write_u8(1);
472 writer.write_ub4(name.len() as u32);
473 }
474 None => {
475 writer.write_u8(0);
476 writer.write_ub4(0);
477 }
478 }
479 writer.write_sb4(deq_options.mode);
480 writer.write_sb4(deq_options.navigation);
481 writer.write_sb4(deq_options.visibility);
482 writer.write_sb4(deq_options.wait as i32);
483 let msgid = deq_options.msgid.as_ref().filter(|id| !id.is_empty());
484 match msgid {
485 Some(_) => {
486 writer.write_u8(1);
487 writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
488 }
489 None => {
490 writer.write_u8(0);
491 writer.write_ub4(0);
492 }
493 }
494 let correlation = deq_options.correlation.as_ref().filter(|c| !c.is_empty());
495 match correlation {
496 Some(c) => {
497 writer.write_u8(1);
498 writer.write_ub4(c.len() as u32);
499 }
500 None => {
501 writer.write_u8(0);
502 writer.write_ub4(0);
503 }
504 }
505 writer.write_u8(1); writer.write_ub4(16); writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
508 writer.write_u8(1); writer.write_u8(1); writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
511 let mut deq_flags = 0u32;
512 match deq_options.delivery_mode {
513 TNS_AQ_MSG_BUFFERED => deq_flags |= TNS_KPD_AQ_BUFMSG,
514 TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => deq_flags |= TNS_KPD_AQ_EITHER,
515 _ => {}
516 }
517 writer.write_ub4(deq_flags);
518 let condition = deq_options.condition.as_ref().filter(|c| !c.is_empty());
519 match condition {
520 Some(c) => {
521 writer.write_u8(1);
522 writer.write_ub4(c.len() as u32);
523 }
524 None => {
525 writer.write_u8(0);
526 writer.write_ub4(0);
527 }
528 }
529 writer.write_u8(0); writer.write_ub4(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
532 writer.write_u8(0); }
534 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
535 writer.write_ub4(0xFFFF_FFFF); }
537
538 writer.write_bytes_with_length(queue_name)?;
539 if let Some(name) = consumer_name {
540 writer.write_bytes_with_length(name.as_bytes())?;
541 }
542 if let Some(id) = msgid {
543 let mut id = id.clone();
544 id.truncate(16);
545 if id.len() < 16 {
546 id.resize(16, 0);
547 }
548 writer.write_raw(&id);
549 }
550 if let Some(c) = correlation {
551 writer.write_bytes_with_length(c.as_bytes())?;
552 }
553 writer.write_raw(&queue.payload_toid);
554 if let Some(c) = condition {
555 writer.write_bytes_with_length(c.as_bytes())?;
556 }
557 Ok(writer.into_bytes())
558}
559
560#[derive(Clone, Debug, Default)]
562pub struct AqDeqResult {
563 pub message: Option<AqDeqMessage>,
565}
566
567pub fn parse_aq_deq_response(
570 payload: &[u8],
571 capabilities: ClientCapabilities,
572 kind: &AqPayloadKind,
573) -> Result<AqDeqResult> {
574 parse_aq_deq_response_with_limits(payload, capabilities, kind, ProtocolLimits::DEFAULT)
575}
576
577pub fn parse_aq_deq_response_with_limits(
578 payload: &[u8],
579 capabilities: ClientCapabilities,
580 kind: &AqPayloadKind,
581 limits: ProtocolLimits,
582) -> Result<AqDeqResult> {
583 let mut reader = TtcReader::with_limits(payload, limits)?;
584 let mut result = AqDeqResult::default();
585 let mut no_msg_found = false;
586 while reader.remaining() > 0 {
587 let message_type = reader.read_u8()?;
588 match message_type {
589 0 => {}
590 TNS_MSG_TYPE_PARAMETER => {
591 let num_bytes = reader.read_ub4()?;
592 if num_bytes > 0 {
593 let mut message = AqDeqMessage::default();
594 process_msg_props(&mut reader, &mut message, capabilities.ttc_field_version)?;
595 process_recipients(&mut reader)?;
596 message.payload = process_payload(&mut reader, kind)?;
597 message.msgid = Some(process_msg_id(&mut reader)?);
598 result.message = Some(message);
599 }
600 }
601 TNS_MSG_TYPE_STATUS => {
602 let _call_status = reader.read_ub4()?;
603 let _seq = reader.read_ub2()?;
604 }
605 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
606 let _ = skip_server_side_piggyback(&mut reader)?;
607 }
608 TNS_MSG_TYPE_END_OF_RESPONSE => break,
609 TNS_MSG_TYPE_ERROR => {
610 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
611 if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
612 no_msg_found = true;
613 } else if info.number != 0 {
614 return Err(ProtocolError::ServerErrorInfo(Box::new(
615 info.into_details(),
616 )));
617 }
618 }
619 _ => {
620 return Err(ProtocolError::UnknownMessageType {
621 message_type,
622 position: reader.position().saturating_sub(1),
623 })
624 }
625 }
626 }
627 if no_msg_found {
628 result.message = None;
629 }
630 Ok(result)
631}
632
633pub fn build_aq_array_enq_payload(
640 queue: &AqQueueDesc,
641 props_list: &[AqMsgProps],
642 enq_options: &AqEnqOptions,
643 seq_num: u8,
644 ttc_field_version: u8,
645 supports_oson_long_fnames: bool,
646) -> Result<Vec<u8>> {
647 let num_iters = props_list.len() as u32;
648 let queue_name = queue.name.as_bytes();
649 let mut writer = TtcWriter::new();
650 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
651 writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
654 writer.write_u8(1); writer.write_u8(0); writer.write_sb4(TNS_AQ_ARRAY_ENQ);
657 writer.write_u8(1); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
659 writer.write_ub4(0xFFFF); }
661 writer.write_ub4(num_iters);
662
663 let mut flags = 0u32;
664 if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
665 flags |= TNS_KPD_AQ_BUFMSG;
666 }
667 writer.write_ub4(0); writer.write_u8(TNS_MSG_TYPE_ROW_HEADER);
669 writer.write_bytes_with_two_lengths(Some(queue_name))?;
670 writer.write_raw(&queue.payload_toid);
671 writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
672 writer.write_ub4(flags);
673 for props in props_list {
674 let payload = props
675 .payload
676 .as_ref()
677 .ok_or(ProtocolError::TtcDecode("AQ array enqueue has no payload"))?;
678 writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
679 writer.write_ub4(flags); write_msg_props(&mut writer, props, ttc_field_version)?;
681 match props.recipients.as_ref() {
682 None => writer.write_ub4(0),
683 Some(recipients) => {
684 writer.write_ub4(3 * recipients.len() as u32);
685 write_recipients(&mut writer, recipients)?;
686 }
687 }
688 writer.write_sb4(enq_options.visibility as i32);
689 writer.write_ub4(0); writer.write_sb4(0); if matches!(queue.kind, AqPayloadKind::Raw) {
692 let raw_len = match payload {
693 AqPayloadValue::Raw(bytes) => bytes.len() as u32,
694 _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
695 };
696 writer.write_ub4(raw_len);
697 }
698 write_payload(&mut writer, payload, supports_oson_long_fnames)?;
699 }
700 writer.write_u8(TNS_MSG_TYPE_STATUS);
701 Ok(writer.into_bytes())
702}
703
704pub fn build_aq_array_deq_payload(
707 queue: &AqQueueDesc,
708 deq_options: &AqDeqOptions,
709 num_iters: u32,
710 seq_num: u8,
711 ttc_field_version: u8,
712) -> Result<Vec<u8>> {
713 let queue_name = queue.name.as_bytes();
714 let mut writer = TtcWriter::new();
715 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
716 writer.write_u8(1); writer.write_ub4(num_iters);
718 writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
719 writer.write_u8(1); writer.write_u8(1); writer.write_sb4(TNS_AQ_ARRAY_DEQ);
722 writer.write_u8(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
724 writer.write_ub4(0xFFFF); }
726
727 let mut flags = 0u32;
728 match deq_options.delivery_mode {
729 TNS_AQ_MSG_BUFFERED => flags |= TNS_KPD_AQ_BUFMSG,
730 TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => flags |= TNS_KPD_AQ_EITHER,
731 _ => {}
732 }
733 let consumer_name = deq_options
734 .consumer_name
735 .as_ref()
736 .filter(|name| !name.is_empty())
737 .map(|name| name.as_bytes());
738 let correlation = deq_options
739 .correlation
740 .as_ref()
741 .filter(|c| !c.is_empty())
742 .map(|c| c.as_bytes());
743 let condition = deq_options
744 .condition
745 .as_ref()
746 .filter(|c| !c.is_empty())
747 .map(|c| c.as_bytes());
748 let props = AqMsgProps::default();
749 for _ in 0..num_iters {
750 writer.write_bytes_with_two_lengths(Some(queue_name))?;
751 write_msg_props(&mut writer, &props, ttc_field_version)?;
752 writer.write_ub4(0); write_value_with_length(&mut writer, consumer_name)?;
754 writer.write_sb4(deq_options.mode);
755 writer.write_sb4(deq_options.navigation);
756 writer.write_sb4(deq_options.visibility);
757 writer.write_sb4(deq_options.wait as i32);
758 write_value_with_length(&mut writer, deq_options.msgid.as_deref())?;
759 write_value_with_length(&mut writer, correlation)?;
760 write_value_with_length(&mut writer, condition)?;
761 writer.write_ub4(0); writer.write_ub4(0); writer.write_sb4(0); writer.write_bytes_with_two_lengths(Some(&queue.payload_toid))?;
765 writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
766 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0);
769 writer.write_ub4(flags);
770 writer.write_ub4(0); writer.write_ub4(0); }
773 Ok(writer.into_bytes())
774}
775
776#[derive(Clone, Debug, Default)]
779pub struct AqArrayResult {
780 pub enq_msgids: Vec<Vec<u8>>,
782 pub deq_messages: Vec<AqDeqMessage>,
784}
785
786pub fn parse_aq_array_response(
792 payload: &[u8],
793 capabilities: ClientCapabilities,
794 operation: i32,
795 props_count: u32,
796 kind: &AqPayloadKind,
797) -> Result<AqArrayResult> {
798 parse_aq_array_response_with_limits(
799 payload,
800 capabilities,
801 operation,
802 props_count,
803 kind,
804 ProtocolLimits::DEFAULT,
805 )
806}
807
808pub fn parse_aq_array_response_with_limits(
809 payload: &[u8],
810 capabilities: ClientCapabilities,
811 operation: i32,
812 props_count: u32,
813 kind: &AqPayloadKind,
814 limits: ProtocolLimits,
815) -> Result<AqArrayResult> {
816 limits.check_batch_rows(props_count as usize)?;
817 let mut reader = TtcReader::with_limits(payload, limits)?;
818 let mut result = AqArrayResult::default();
819 let mut messages: Vec<AqDeqMessage> = Vec::new();
820 let mut enq_msgid_blob: Option<Vec<u8>> = None;
821 let mut response_num_iters: u32 = 0;
822 let mut no_msg_found = false;
823 while reader.remaining() > 0 {
824 let message_type = reader.read_u8()?;
825 match message_type {
826 0 => {}
827 TNS_MSG_TYPE_PARAMETER => {
828 let num_iters = reader.read_ub4()?;
829 reader.limits().check_batch_rows(num_iters as usize)?;
830 response_num_iters = num_iters;
831 for i in 0..num_iters {
832 let mut message = AqDeqMessage::default();
833 let props_len = reader.read_ub2()?;
834 if props_len > 0 {
835 reader.read_u8()?; process_msg_props(
837 &mut reader,
838 &mut message,
839 capabilities.ttc_field_version,
840 )?;
841 }
842 process_recipients(&mut reader)?;
843 let payload_len = reader.read_ub2()?;
844 if payload_len > 0 {
845 message.payload = process_payload(&mut reader, kind)?;
846 }
847 let msgid = reader.read_bytes_with_length()?.unwrap_or_default();
848 if operation == TNS_AQ_ARRAY_ENQ {
849 enq_msgid_blob = Some(msgid);
850 } else {
851 message.msgid = Some(msgid);
852 }
853 let ext_len = reader.read_ub2()?;
854 if ext_len > 0 {
855 return Err(ProtocolError::UnsupportedFeature("AQ array extensions"));
856 }
857 let _output_ack = reader.read_ub2()?;
858 if operation != TNS_AQ_ARRAY_ENQ {
859 let _ = i;
860 messages.push(message);
861 }
862 }
863 if operation == TNS_AQ_ARRAY_ENQ {
864 response_num_iters = reader.read_ub4()?;
865 reader
866 .limits()
867 .check_batch_rows(response_num_iters as usize)?;
868 }
869 }
870 TNS_MSG_TYPE_STATUS => {
871 let _call_status = reader.read_ub4()?;
872 let _seq = reader.read_ub2()?;
873 }
874 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
875 let _ = skip_server_side_piggyback(&mut reader)?;
876 }
877 TNS_MSG_TYPE_END_OF_RESPONSE => break,
878 TNS_MSG_TYPE_ERROR => {
879 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
880 if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
881 no_msg_found = true;
882 } else if info.number != 0 {
883 return Err(ProtocolError::ServerErrorInfo(Box::new(
884 info.into_details(),
885 )));
886 }
887 }
888 _ => {
889 return Err(ProtocolError::UnknownMessageType {
890 message_type,
891 position: reader.position().saturating_sub(1),
892 })
893 }
894 }
895 }
896 if operation == TNS_AQ_ARRAY_ENQ {
897 if let Some(blob) = enq_msgid_blob {
898 let count = props_count as usize;
899 result.enq_msgids = (0..count)
900 .map(|j| {
901 let start = j * 16;
902 let end = start + 16;
903 blob.get(start..end).map(<[u8]>::to_vec).unwrap_or_default()
904 })
905 .collect();
906 }
907 } else if no_msg_found {
908 result.deq_messages = Vec::new();
909 } else {
910 let keep = response_num_iters as usize;
911 messages.truncate(keep);
912 result.deq_messages = messages;
913 }
914 Ok(result)
915}
916
917fn process_msg_props(
922 reader: &mut TtcReader<'_>,
923 message: &mut AqDeqMessage,
924 ttc_field_version: u8,
925) -> Result<()> {
926 message.priority = reader.read_sb4()?;
927 message.delay = reader.read_sb4()?;
928 message.expiration = reader.read_sb4()?;
929 message.correlation = reader.read_string_with_length()?;
930 message.num_attempts = reader.read_sb4()?;
931 message.exception_queue = reader.read_string_with_length()?;
932 message.state = reader.read_sb4()?;
933 message.enq_time = process_date(reader)?;
934 let _enq_txn_id = reader.read_bytes_with_length()?;
935 process_extensions(reader)?;
936 let user_props = reader.read_ub4()?;
937 if user_props > 0 {
938 return Err(ProtocolError::UnsupportedFeature("AQ user properties"));
939 }
940 let _csn = reader.read_ub4()?;
941 let _dsn = reader.read_ub4()?;
942 let flags = reader.read_ub4()?;
943 message.delivery_mode = if flags == TNS_KPD_AQ_BUFMSG {
944 TNS_AQ_MSG_BUFFERED
945 } else {
946 TNS_AQ_MSG_PERSISTENT
947 };
948 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
949 let _shard = reader.read_ub4()?;
950 }
951 Ok(())
952}
953
954fn process_date(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
958 let num_bytes = reader.read_ub4()?;
959 if num_bytes == 0 {
960 return Ok(None);
961 }
962 let len = usize::from(reader.read_u8()?);
963 if len == 0 {
964 return Ok(None);
965 }
966 let bytes = reader.read_raw(len)?;
967 Ok(Some(decode_datetime_value(bytes)?))
968}
969
970fn process_extensions(reader: &mut TtcReader<'_>) -> Result<()> {
971 let num_extensions = reader.read_ub4()?;
972 if num_extensions > 0 {
973 reader.read_u8()?; for _ in 0..num_extensions {
975 let _text = reader.read_bytes_with_length()?;
976 let _binary = reader.read_bytes_with_length()?;
977 let _keyword = reader.read_ub2()?;
978 }
979 }
980 Ok(())
981}
982
983fn process_recipients(reader: &mut TtcReader<'_>) -> Result<()> {
984 let count = reader.read_ub4()?;
985 if count > 0 {
986 return Err(ProtocolError::UnsupportedFeature(
987 "AQ recipients on dequeue",
988 ));
989 }
990 Ok(())
991}
992
993fn process_msg_id(reader: &mut TtcReader<'_>) -> Result<Vec<u8>> {
994 Ok(reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec())
995}
996
997fn process_payload(
999 reader: &mut TtcReader<'_>,
1000 kind: &AqPayloadKind,
1001) -> Result<Option<AqDeqPayload>> {
1002 if matches!(kind, AqPayloadKind::Object) {
1003 let _toid = reader.read_bytes_with_length()?;
1007 let _oid = reader.read_bytes_with_length()?;
1008 let _snapshot = reader.read_bytes_with_length()?;
1009 let _version = reader.read_ub2()?;
1010 let image_length = reader.read_ub4()?;
1011 reader
1012 .limits()
1013 .check_response_bytes(image_length as usize)?;
1014 let _flags = reader.read_ub2()?;
1015 if image_length == 0 {
1016 return Ok(None);
1017 }
1018 let image = reader
1019 .read_bytes()?
1020 .ok_or(ProtocolError::TtcDecode("AQ object payload missing"))?;
1021 return Ok(Some(AqDeqPayload::Object(image)));
1022 }
1023 let _toid = reader.read_bytes_with_length()?;
1025 let _oid = reader.read_bytes_with_length()?;
1026 let _snapshot = reader.read_bytes_with_length()?;
1027 let _version = reader.read_ub2()?;
1028 let image_length = reader.read_ub4()? as usize;
1029 reader.limits().check_response_bytes(image_length)?;
1030 let _flags = reader.read_ub2()?;
1031 if image_length > 0 {
1032 let raw = reader
1034 .read_bytes()?
1035 .ok_or(ProtocolError::TtcDecode("AQ payload missing"))?;
1036 let end = image_length.min(raw.len());
1037 let start = 4.min(end);
1038 let payload = raw.get(start..end).unwrap_or_default().to_vec();
1039 if matches!(kind, AqPayloadKind::Json) {
1040 let value = decode_oson_with_limits(&payload, reader.limits())?;
1041 return Ok(Some(AqDeqPayload::Json(value)));
1042 }
1043 return Ok(Some(AqDeqPayload::Raw(payload)));
1044 }
1045 if matches!(kind, AqPayloadKind::Raw) {
1046 return Ok(Some(AqDeqPayload::Raw(Vec::new())));
1047 }
1048 Ok(None)
1049}
1050
#[cfg(test)]
mod tests {
    use super::*;

    /// TTC field version used by every test in this module.
    const FV: u8 = 24;

    /// Client capabilities matching `FV`.
    fn caps() -> ClientCapabilities {
        ClientCapabilities {
            ttc_field_version: FV,
            max_string_size: 32767,
            charset_id: 873,
        }
    }

    /// Expected request bytes for enqueuing "sample raw data 1" with
    /// priority 2 and correlation "CORR1" into TEST_RAW_QUEUE.
    const GOLDEN_RAW_ENQ: &[u8] = &[
        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x02, 0x00, 0x81, 0x01, 0x01, 0x05, 0x05,
        0x43, 0x4f, 0x52, 0x52, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00,
        0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01, 0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00,
        0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02,
        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x00, 0x01, 0x01, 0x11, 0x01, 0x01, 0x10,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x0e, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51,
        0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x17, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x20, 0x72, 0x61, 0x77,
        0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x31,
    ];

    /// Expected request bytes for a no-wait dequeue from TEST_RAW_QUEUE with
    /// navigation 1.
    const GOLDEN_RAW_DEQ: &[u8] = &[
        0x03, 0x7a, 0x06, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x01, 0x03,
        0x01, 0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x01,
        0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x0e,
        0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17,
    ];

    #[test]
    fn raw_enqueue_request_matches_golden() {
        let raw_queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
        let message = AqMsgProps {
            priority: 2,
            correlation: Some("CORR1".to_string()),
            payload: Some(AqPayloadValue::Raw(b"sample raw data 1".to_vec())),
            ..AqMsgProps::default()
        };
        let options = AqEnqOptions::default();
        let request = build_aq_enq_payload(&raw_queue, &message, &options, 4, FV, false)
            .expect("build enqueue");
        assert_eq!(request, GOLDEN_RAW_ENQ);
    }

    #[test]
    fn raw_dequeue_request_matches_golden() {
        let raw_queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
        let options = AqDeqOptions {
            wait: 0,
            navigation: 1,
            ..AqDeqOptions::default()
        };
        let request = build_aq_deq_payload(&raw_queue, &options, 6, FV).expect("build dequeue");
        assert_eq!(request, GOLDEN_RAW_DEQ);
    }

    #[test]
    fn empty_queue_dequeue_yields_no_message() {
        // An empty response body must parse to "no message".
        let capabilities = caps();
        let outcome =
            parse_aq_deq_response(&[], capabilities, &AqPayloadKind::Raw).expect("parse");
        assert!(outcome.message.is_none());
    }

    #[test]
    fn aq_array_response_respects_protocol_batch_limit() {
        // Two messages requested while the policy allows one batch row.
        let limits = ProtocolLimits {
            max_batch_rows: 1,
            ..ProtocolLimits::DEFAULT
        };
        let err = parse_aq_array_response_with_limits(
            &[],
            caps(),
            TNS_AQ_ARRAY_ENQ,
            2,
            &AqPayloadKind::Raw,
            limits,
        )
        .expect_err("client-side AQ batch count above policy must fail");
        assert!(
            matches!(
                err,
                ProtocolError::ResourceLimit {
                    limit: "batch_rows",
                    observed: 2,
                    maximum: 1,
                }
            ),
            "got {err:?}"
        );
    }

    /// Expected request bytes for enqueuing the JSON object
    /// `{"name": "John", "age": 30, "city": "NY"}` into TEST_JSON_QUEUE.
    const GOLDEN_JSON_ENQ: &[u8] = &[
        0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0f, 0x00, 0x00, 0x81, 0x01, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01,
        0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00, 0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff,
        0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01,
        0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0f, 0x54, 0x45, 0x53, 0x54,
        0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x47, 0x01, 0x28, 0x00,
        0x26, 0x00, 0x04, 0x61, 0x08, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x43, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x43, 0xff, 0x4a, 0x5a, 0x01, 0x21,
        0x02, 0x03, 0x00, 0x0e, 0x00, 0x1f, 0x00, 0x00, 0x42, 0x9c, 0xe6, 0x00, 0x09, 0x00, 0x05,
        0x00, 0x00, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x03, 0x61, 0x67, 0x65, 0x04, 0x63, 0x69, 0x74,
        0x79, 0xa4, 0x03, 0x03, 0x02, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x17, 0x00,
        0x00, 0x00, 0x1b, 0x33, 0x04, 0x4a, 0x6f, 0x68, 0x6e, 0x34, 0x02, 0xc1, 0x1f, 0x33, 0x02,
        0x4e, 0x59,
    ];

    #[test]
    fn json_enqueue_request_matches_golden() {
        let json_queue =
            AqQueueDesc::new("TEST_JSON_QUEUE".to_string(), AqPayloadKind::Json, None);
        let document = OsonValue::Object(vec![
            ("name".to_string(), OsonValue::String("John".to_string())),
            ("age".to_string(), OsonValue::Number("30".to_string())),
            ("city".to_string(), OsonValue::String("NY".to_string())),
        ]);
        let message = AqMsgProps {
            payload: Some(AqPayloadValue::Json(document)),
            ..AqMsgProps::default()
        };
        let options = AqEnqOptions::default();
        let request = build_aq_enq_payload(&json_queue, &message, &options, 4, FV, true)
            .expect("build json enqueue");
        assert_eq!(request, GOLDEN_JSON_ENQ);
    }
}