1#![forbid(unsafe_code)]
2
3use super::*;
17use crate::oson::{decode_oson, encode_oson, OsonValue};
18
19#[derive(Clone, Debug, PartialEq, Eq)]
22pub enum AqPayloadKind {
23 Raw,
25 Json,
27 Object,
29}
30
31#[derive(Clone, Debug)]
33pub struct AqQueueDesc {
34 pub name: String,
35 pub kind: AqPayloadKind,
36 pub payload_toid: Vec<u8>,
39}
40
41impl AqQueueDesc {
42 pub fn new(name: String, kind: AqPayloadKind, object_oid: Option<Vec<u8>>) -> Self {
45 let payload_toid = match kind {
46 AqPayloadKind::Raw => raw_payload_toid(),
47 AqPayloadKind::Json => json_payload_toid(),
48 AqPayloadKind::Object => object_oid.unwrap_or_default(),
49 };
50 Self {
51 name,
52 kind,
53 payload_toid,
54 }
55 }
56}
57
58fn raw_payload_toid() -> Vec<u8> {
59 let mut toid = vec![0u8; 15];
60 toid.push(0x17);
61 toid
62}
63
64fn json_payload_toid() -> Vec<u8> {
65 let mut toid = vec![0u8; 15];
66 toid.push(0x47);
67 toid
68}
69
70#[derive(Clone, Debug)]
72pub enum AqPayloadValue {
73 Raw(Vec<u8>),
75 Json(OsonValue),
77 Object { oid: Vec<u8>, image: Vec<u8> },
79}
80
81#[derive(Clone, Debug)]
84pub struct AqMsgProps {
85 pub priority: i32,
86 pub delay: i32,
87 pub expiration: i32,
88 pub correlation: Option<String>,
89 pub exception_queue: Option<String>,
90 pub state: i32,
91 pub enq_txn_id: Option<Vec<u8>>,
92 pub recipients: Option<Vec<String>>,
94 pub payload: Option<AqPayloadValue>,
97}
98
99impl Default for AqMsgProps {
100 fn default() -> Self {
101 Self {
102 priority: 0,
103 delay: 0,
104 expiration: -1,
105 correlation: None,
106 exception_queue: None,
107 state: 0,
108 enq_txn_id: None,
109 recipients: Some(Vec::new()),
113 payload: None,
114 }
115 }
116}
117
118#[derive(Clone, Debug)]
121pub struct AqEnqOptions {
122 pub visibility: u32,
123 pub delivery_mode: u16,
124}
125
126impl Default for AqEnqOptions {
127 fn default() -> Self {
128 Self {
129 visibility: 2,
130 delivery_mode: TNS_AQ_MSG_PERSISTENT,
131 }
132 }
133}
134
135#[derive(Clone, Debug)]
139pub struct AqDeqOptions {
140 pub condition: Option<String>,
141 pub consumer_name: Option<String>,
142 pub correlation: Option<String>,
143 pub delivery_mode: u16,
144 pub mode: i32,
145 pub msgid: Option<Vec<u8>>,
146 pub navigation: i32,
147 pub visibility: i32,
148 pub wait: u32,
149}
150
151impl Default for AqDeqOptions {
152 fn default() -> Self {
153 Self {
154 condition: None,
155 consumer_name: None,
156 correlation: None,
157 delivery_mode: TNS_AQ_MSG_PERSISTENT,
158 mode: 3,
159 msgid: None,
160 navigation: 3,
161 visibility: 2,
162 wait: 0xFFFF_FFFF,
163 }
164 }
165}
166
167#[derive(Clone, Debug, Default)]
170pub struct AqDeqMessage {
171 pub priority: i32,
172 pub delay: i32,
173 pub expiration: i32,
174 pub correlation: Option<String>,
175 pub num_attempts: i32,
176 pub exception_queue: Option<String>,
177 pub state: i32,
178 pub enq_time: Option<QueryValue>,
180 pub delivery_mode: u16,
181 pub msgid: Option<Vec<u8>>,
182 pub payload: Option<AqDeqPayload>,
184}
185
186#[derive(Clone, Debug)]
188pub enum AqDeqPayload {
189 Raw(Vec<u8>),
191 Json(OsonValue),
193 Object(Vec<u8>),
196}
197
198fn write_aq_function_code(
206 writer: &mut TtcWriter,
207 function_code: u8,
208 seq_num: u8,
209 ttc_field_version: u8,
210) {
211 writer.write_function_code_with_seq(function_code, seq_num);
212 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
213 writer.write_ub8(0); }
215}
216
217fn write_value_with_length(writer: &mut TtcWriter, value: Option<&[u8]>) -> Result<()> {
218 match value {
219 None => {
220 writer.write_ub4(0);
221 Ok(())
222 }
223 Some(bytes) => writer.write_bytes_with_two_lengths(Some(bytes)),
224 }
225}
226
227fn write_msg_props(
229 writer: &mut TtcWriter,
230 props: &AqMsgProps,
231 ttc_field_version: u8,
232) -> Result<()> {
233 writer.write_ub4(props.priority as u32);
234 writer.write_ub4(props.delay as u32);
235 writer.write_sb4(props.expiration);
236 write_value_with_length(writer, props.correlation.as_deref().map(str::as_bytes))?;
237 writer.write_ub4(0); write_value_with_length(writer, props.exception_queue.as_deref().map(str::as_bytes))?;
239 writer.write_ub4(props.state as u32);
240 writer.write_ub4(0); write_value_with_length(writer, props.enq_txn_id.as_deref())?;
242 writer.write_ub4(4); writer.write_u8(0x0e); writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_NAME)?;
245 writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS)?;
246 writer.write_keyword_value_pair(None, Some(b"\x00"), TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL)?;
247 writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID)?;
248 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
253 writer.write_ub4(0xFFFF_FFFF); }
255 Ok(())
256}
257
258fn write_recipients(writer: &mut TtcWriter, recipients: &[String]) -> Result<()> {
260 let mut index: u16 = 0;
261 for recipient in recipients {
262 writer.write_keyword_value_pair(Some(recipient.as_bytes()), None, index)?;
263 writer.write_keyword_value_pair(None, None, index + 1)?;
264 writer.write_keyword_value_pair(None, Some(b"\x00"), index + 2)?;
265 index += 3;
266 }
267 Ok(())
268}
269
270fn write_payload(
272 writer: &mut TtcWriter,
273 payload: &AqPayloadValue,
274 supports_oson_long_fnames: bool,
275) -> Result<()> {
276 match payload {
277 AqPayloadValue::Json(value) => {
278 let image = encode_oson(value, supports_oson_long_fnames)?;
281 crate::vector::write_oson_aq_payload(writer, &image)
282 }
283 AqPayloadValue::Object { oid, image } => write_dbobject_bind(writer, oid, image),
284 AqPayloadValue::Raw(bytes) => {
285 writer.write_raw(bytes);
286 Ok(())
287 }
288 }
289}
290
291pub fn build_aq_enq_payload(
298 queue: &AqQueueDesc,
299 props: &AqMsgProps,
300 enq_options: &AqEnqOptions,
301 seq_num: u8,
302 ttc_field_version: u8,
303 supports_oson_long_fnames: bool,
304) -> Result<Vec<u8>> {
305 let payload = props
306 .payload
307 .as_ref()
308 .ok_or(ProtocolError::TtcDecode("AQ enqueue has no payload"))?;
309 let queue_name = queue.name.as_bytes();
310 let mut writer = TtcWriter::new();
311 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ENQ, seq_num, ttc_field_version);
312 writer.write_u8(1); writer.write_ub4(queue_name.len() as u32);
314 write_msg_props(&mut writer, props, ttc_field_version)?;
315 match props.recipients.as_ref() {
316 None => {
317 writer.write_u8(0); writer.write_ub4(0); }
320 Some(recipients) => {
321 writer.write_u8(1);
322 writer.write_ub4(3 * recipients.len() as u32);
323 }
324 }
325 writer.write_ub4(enq_options.visibility);
326 writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_u8(1); writer.write_ub4(16); writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
332 match queue.kind {
333 AqPayloadKind::Json => {
334 writer.write_u8(0); writer.write_u8(0); writer.write_ub4(0); }
338 AqPayloadKind::Object => {
339 writer.write_u8(1); writer.write_u8(0); writer.write_ub4(0); }
343 AqPayloadKind::Raw => {
344 let raw_len = match payload {
345 AqPayloadValue::Raw(bytes) => bytes.len() as u32,
346 _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
347 };
348 writer.write_u8(0); writer.write_u8(1); writer.write_ub4(raw_len);
351 }
352 }
353 writer.write_u8(1); writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
355 let mut enq_flags = 0u32;
356 if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
357 enq_flags |= TNS_KPD_AQ_BUFMSG;
358 }
359 writer.write_ub4(enq_flags); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_ub4(0); writer.write_u8(0); writer.write_u8(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
378 writer.write_u8(u8::from(queue.kind == AqPayloadKind::Json));
380 }
381
382 writer.write_bytes_with_length(queue_name)?;
383 if let Some(recipients) = props.recipients.as_ref() {
384 write_recipients(&mut writer, recipients)?;
385 }
386 writer.write_raw(&queue.payload_toid);
387 write_payload(&mut writer, payload, supports_oson_long_fnames)?;
388 Ok(writer.into_bytes())
389}
390
391pub fn parse_aq_enq_response(
394 payload: &[u8],
395 capabilities: ClientCapabilities,
396) -> Result<Option<Vec<u8>>> {
397 let mut reader = TtcReader::new(payload);
398 let mut msgid: Option<Vec<u8>> = None;
399 while reader.remaining() > 0 {
400 let message_type = reader.read_u8()?;
401 match message_type {
402 0 => {}
403 TNS_MSG_TYPE_PARAMETER => {
404 let id = reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec();
405 let _ext_len = reader.read_ub2()?;
406 msgid = Some(id);
407 }
408 TNS_MSG_TYPE_STATUS => {
409 let _call_status = reader.read_ub4()?;
410 let _seq = reader.read_ub2()?;
411 }
412 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
413 let _ = skip_server_side_piggyback(&mut reader)?;
414 }
415 TNS_MSG_TYPE_END_OF_RESPONSE => break,
416 TNS_MSG_TYPE_ERROR => {
417 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
418 if info.number != 0 {
419 return Err(ProtocolError::ServerErrorInfo(Box::new(
420 info.into_details(),
421 )));
422 }
423 }
424 _ => {
425 return Err(ProtocolError::UnknownMessageType {
426 message_type,
427 position: reader.position().saturating_sub(1),
428 })
429 }
430 }
431 }
432 Ok(msgid)
433}
434
435pub fn build_aq_deq_payload(
442 queue: &AqQueueDesc,
443 deq_options: &AqDeqOptions,
444 seq_num: u8,
445 ttc_field_version: u8,
446) -> Result<Vec<u8>> {
447 let queue_name = queue.name.as_bytes();
448 let mut writer = TtcWriter::new();
449 write_aq_function_code(&mut writer, TNS_FUNC_AQ_DEQ, seq_num, ttc_field_version);
450 writer.write_u8(1); writer.write_ub4(queue_name.len() as u32);
452 writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); let consumer_name = deq_options
457 .consumer_name
458 .as_ref()
459 .filter(|name| !name.is_empty());
460 match consumer_name {
461 Some(name) => {
462 writer.write_u8(1);
463 writer.write_ub4(name.len() as u32);
464 }
465 None => {
466 writer.write_u8(0);
467 writer.write_ub4(0);
468 }
469 }
470 writer.write_sb4(deq_options.mode);
471 writer.write_sb4(deq_options.navigation);
472 writer.write_sb4(deq_options.visibility);
473 writer.write_sb4(deq_options.wait as i32);
474 let msgid = deq_options.msgid.as_ref().filter(|id| !id.is_empty());
475 match msgid {
476 Some(_) => {
477 writer.write_u8(1);
478 writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
479 }
480 None => {
481 writer.write_u8(0);
482 writer.write_ub4(0);
483 }
484 }
485 let correlation = deq_options.correlation.as_ref().filter(|c| !c.is_empty());
486 match correlation {
487 Some(c) => {
488 writer.write_u8(1);
489 writer.write_ub4(c.len() as u32);
490 }
491 None => {
492 writer.write_u8(0);
493 writer.write_ub4(0);
494 }
495 }
496 writer.write_u8(1); writer.write_ub4(16); writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
499 writer.write_u8(1); writer.write_u8(1); writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
502 let mut deq_flags = 0u32;
503 match deq_options.delivery_mode {
504 TNS_AQ_MSG_BUFFERED => deq_flags |= TNS_KPD_AQ_BUFMSG,
505 TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => deq_flags |= TNS_KPD_AQ_EITHER,
506 _ => {}
507 }
508 writer.write_ub4(deq_flags);
509 let condition = deq_options.condition.as_ref().filter(|c| !c.is_empty());
510 match condition {
511 Some(c) => {
512 writer.write_u8(1);
513 writer.write_ub4(c.len() as u32);
514 }
515 None => {
516 writer.write_u8(0);
517 writer.write_ub4(0);
518 }
519 }
520 writer.write_u8(0); writer.write_ub4(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
523 writer.write_u8(0); }
525 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
526 writer.write_ub4(0xFFFF_FFFF); }
528
529 writer.write_bytes_with_length(queue_name)?;
530 if let Some(name) = consumer_name {
531 writer.write_bytes_with_length(name.as_bytes())?;
532 }
533 if let Some(id) = msgid {
534 let mut id = id.clone();
535 id.truncate(16);
536 if id.len() < 16 {
537 id.resize(16, 0);
538 }
539 writer.write_raw(&id);
540 }
541 if let Some(c) = correlation {
542 writer.write_bytes_with_length(c.as_bytes())?;
543 }
544 writer.write_raw(&queue.payload_toid);
545 if let Some(c) = condition {
546 writer.write_bytes_with_length(c.as_bytes())?;
547 }
548 Ok(writer.into_bytes())
549}
550
551#[derive(Clone, Debug, Default)]
553pub struct AqDeqResult {
554 pub message: Option<AqDeqMessage>,
556}
557
558pub fn parse_aq_deq_response(
561 payload: &[u8],
562 capabilities: ClientCapabilities,
563 kind: &AqPayloadKind,
564) -> Result<AqDeqResult> {
565 let mut reader = TtcReader::new(payload);
566 let mut result = AqDeqResult::default();
567 let mut no_msg_found = false;
568 while reader.remaining() > 0 {
569 let message_type = reader.read_u8()?;
570 match message_type {
571 0 => {}
572 TNS_MSG_TYPE_PARAMETER => {
573 let num_bytes = reader.read_ub4()?;
574 if num_bytes > 0 {
575 let mut message = AqDeqMessage::default();
576 process_msg_props(&mut reader, &mut message, capabilities.ttc_field_version)?;
577 process_recipients(&mut reader)?;
578 message.payload = process_payload(&mut reader, kind)?;
579 message.msgid = Some(process_msg_id(&mut reader)?);
580 result.message = Some(message);
581 }
582 }
583 TNS_MSG_TYPE_STATUS => {
584 let _call_status = reader.read_ub4()?;
585 let _seq = reader.read_ub2()?;
586 }
587 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
588 let _ = skip_server_side_piggyback(&mut reader)?;
589 }
590 TNS_MSG_TYPE_END_OF_RESPONSE => break,
591 TNS_MSG_TYPE_ERROR => {
592 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
593 if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
594 no_msg_found = true;
595 } else if info.number != 0 {
596 return Err(ProtocolError::ServerErrorInfo(Box::new(
597 info.into_details(),
598 )));
599 }
600 }
601 _ => {
602 return Err(ProtocolError::UnknownMessageType {
603 message_type,
604 position: reader.position().saturating_sub(1),
605 })
606 }
607 }
608 }
609 if no_msg_found {
610 result.message = None;
611 }
612 Ok(result)
613}
614
615pub fn build_aq_array_enq_payload(
622 queue: &AqQueueDesc,
623 props_list: &[AqMsgProps],
624 enq_options: &AqEnqOptions,
625 seq_num: u8,
626 ttc_field_version: u8,
627 supports_oson_long_fnames: bool,
628) -> Result<Vec<u8>> {
629 let num_iters = props_list.len() as u32;
630 let queue_name = queue.name.as_bytes();
631 let mut writer = TtcWriter::new();
632 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
633 writer.write_u8(0); writer.write_ub4(0); writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
636 writer.write_u8(1); writer.write_u8(0); writer.write_sb4(TNS_AQ_ARRAY_ENQ);
639 writer.write_u8(1); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
641 writer.write_ub4(0xFFFF); }
643 writer.write_ub4(num_iters);
644
645 let mut flags = 0u32;
646 if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
647 flags |= TNS_KPD_AQ_BUFMSG;
648 }
649 writer.write_ub4(0); writer.write_u8(TNS_MSG_TYPE_ROW_HEADER);
651 writer.write_bytes_with_two_lengths(Some(queue_name))?;
652 writer.write_raw(&queue.payload_toid);
653 writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
654 writer.write_ub4(flags);
655 for props in props_list {
656 let payload = props
657 .payload
658 .as_ref()
659 .ok_or(ProtocolError::TtcDecode("AQ array enqueue has no payload"))?;
660 writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
661 writer.write_ub4(flags); write_msg_props(&mut writer, props, ttc_field_version)?;
663 match props.recipients.as_ref() {
664 None => writer.write_ub4(0),
665 Some(recipients) => {
666 writer.write_ub4(3 * recipients.len() as u32);
667 write_recipients(&mut writer, recipients)?;
668 }
669 }
670 writer.write_sb4(enq_options.visibility as i32);
671 writer.write_ub4(0); writer.write_sb4(0); if matches!(queue.kind, AqPayloadKind::Raw) {
674 let raw_len = match payload {
675 AqPayloadValue::Raw(bytes) => bytes.len() as u32,
676 _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
677 };
678 writer.write_ub4(raw_len);
679 }
680 write_payload(&mut writer, payload, supports_oson_long_fnames)?;
681 }
682 writer.write_u8(TNS_MSG_TYPE_STATUS);
683 Ok(writer.into_bytes())
684}
685
686pub fn build_aq_array_deq_payload(
689 queue: &AqQueueDesc,
690 deq_options: &AqDeqOptions,
691 num_iters: u32,
692 seq_num: u8,
693 ttc_field_version: u8,
694) -> Result<Vec<u8>> {
695 let queue_name = queue.name.as_bytes();
696 let mut writer = TtcWriter::new();
697 write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);
698 writer.write_u8(1); writer.write_ub4(num_iters);
700 writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
701 writer.write_u8(1); writer.write_u8(1); writer.write_sb4(TNS_AQ_ARRAY_DEQ);
704 writer.write_u8(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
706 writer.write_ub4(0xFFFF); }
708
709 let mut flags = 0u32;
710 match deq_options.delivery_mode {
711 TNS_AQ_MSG_BUFFERED => flags |= TNS_KPD_AQ_BUFMSG,
712 TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => flags |= TNS_KPD_AQ_EITHER,
713 _ => {}
714 }
715 let consumer_name = deq_options
716 .consumer_name
717 .as_ref()
718 .filter(|name| !name.is_empty())
719 .map(|name| name.as_bytes());
720 let correlation = deq_options
721 .correlation
722 .as_ref()
723 .filter(|c| !c.is_empty())
724 .map(|c| c.as_bytes());
725 let condition = deq_options
726 .condition
727 .as_ref()
728 .filter(|c| !c.is_empty())
729 .map(|c| c.as_bytes());
730 let props = AqMsgProps::default();
731 for _ in 0..num_iters {
732 writer.write_bytes_with_two_lengths(Some(queue_name))?;
733 write_msg_props(&mut writer, &props, ttc_field_version)?;
734 writer.write_ub4(0); write_value_with_length(&mut writer, consumer_name)?;
736 writer.write_sb4(deq_options.mode);
737 writer.write_sb4(deq_options.navigation);
738 writer.write_sb4(deq_options.visibility);
739 writer.write_sb4(deq_options.wait as i32);
740 write_value_with_length(&mut writer, deq_options.msgid.as_deref())?;
741 write_value_with_length(&mut writer, correlation)?;
742 write_value_with_length(&mut writer, condition)?;
743 writer.write_ub4(0); writer.write_ub4(0); writer.write_sb4(0); writer.write_bytes_with_two_lengths(Some(&queue.payload_toid))?;
747 writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
748 writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0);
751 writer.write_ub4(flags);
752 writer.write_ub4(0); writer.write_ub4(0); }
755 Ok(writer.into_bytes())
756}
757
758#[derive(Clone, Debug, Default)]
761pub struct AqArrayResult {
762 pub enq_msgids: Vec<Vec<u8>>,
764 pub deq_messages: Vec<AqDeqMessage>,
766}
767
768pub fn parse_aq_array_response(
774 payload: &[u8],
775 capabilities: ClientCapabilities,
776 operation: i32,
777 props_count: u32,
778 kind: &AqPayloadKind,
779) -> Result<AqArrayResult> {
780 let mut reader = TtcReader::new(payload);
781 let mut result = AqArrayResult::default();
782 let mut messages: Vec<AqDeqMessage> = Vec::new();
783 let mut enq_msgid_blob: Option<Vec<u8>> = None;
784 let mut response_num_iters: u32 = 0;
785 let mut no_msg_found = false;
786 while reader.remaining() > 0 {
787 let message_type = reader.read_u8()?;
788 match message_type {
789 0 => {}
790 TNS_MSG_TYPE_PARAMETER => {
791 let num_iters = reader.read_ub4()?;
792 response_num_iters = num_iters;
793 for i in 0..num_iters {
794 let mut message = AqDeqMessage::default();
795 let props_len = reader.read_ub2()?;
796 if props_len > 0 {
797 reader.read_u8()?; process_msg_props(
799 &mut reader,
800 &mut message,
801 capabilities.ttc_field_version,
802 )?;
803 }
804 process_recipients(&mut reader)?;
805 let payload_len = reader.read_ub2()?;
806 if payload_len > 0 {
807 message.payload = process_payload(&mut reader, kind)?;
808 }
809 let msgid = reader.read_bytes_with_length()?.unwrap_or_default();
810 if operation == TNS_AQ_ARRAY_ENQ {
811 enq_msgid_blob = Some(msgid);
812 } else {
813 message.msgid = Some(msgid);
814 }
815 let ext_len = reader.read_ub2()?;
816 if ext_len > 0 {
817 return Err(ProtocolError::UnsupportedFeature("AQ array extensions"));
818 }
819 let _output_ack = reader.read_ub2()?;
820 if operation != TNS_AQ_ARRAY_ENQ {
821 let _ = i;
822 messages.push(message);
823 }
824 }
825 if operation == TNS_AQ_ARRAY_ENQ {
826 response_num_iters = reader.read_ub4()?;
827 }
828 }
829 TNS_MSG_TYPE_STATUS => {
830 let _call_status = reader.read_ub4()?;
831 let _seq = reader.read_ub2()?;
832 }
833 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
834 let _ = skip_server_side_piggyback(&mut reader)?;
835 }
836 TNS_MSG_TYPE_END_OF_RESPONSE => break,
837 TNS_MSG_TYPE_ERROR => {
838 let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
839 if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
840 no_msg_found = true;
841 } else if info.number != 0 {
842 return Err(ProtocolError::ServerErrorInfo(Box::new(
843 info.into_details(),
844 )));
845 }
846 }
847 _ => {
848 return Err(ProtocolError::UnknownMessageType {
849 message_type,
850 position: reader.position().saturating_sub(1),
851 })
852 }
853 }
854 }
855 if operation == TNS_AQ_ARRAY_ENQ {
856 if let Some(blob) = enq_msgid_blob {
857 let count = props_count as usize;
858 result.enq_msgids = (0..count)
859 .map(|j| {
860 let start = j * 16;
861 let end = start + 16;
862 blob.get(start..end).map(<[u8]>::to_vec).unwrap_or_default()
863 })
864 .collect();
865 }
866 } else if no_msg_found {
867 result.deq_messages = Vec::new();
868 } else {
869 let keep = response_num_iters as usize;
870 messages.truncate(keep);
871 result.deq_messages = messages;
872 }
873 Ok(result)
874}
875
876fn process_msg_props(
881 reader: &mut TtcReader<'_>,
882 message: &mut AqDeqMessage,
883 ttc_field_version: u8,
884) -> Result<()> {
885 message.priority = reader.read_sb4()?;
886 message.delay = reader.read_sb4()?;
887 message.expiration = reader.read_sb4()?;
888 message.correlation = reader.read_string_with_length()?;
889 message.num_attempts = reader.read_sb4()?;
890 message.exception_queue = reader.read_string_with_length()?;
891 message.state = reader.read_sb4()?;
892 message.enq_time = process_date(reader)?;
893 let _enq_txn_id = reader.read_bytes_with_length()?;
894 process_extensions(reader)?;
895 let user_props = reader.read_ub4()?;
896 if user_props > 0 {
897 return Err(ProtocolError::UnsupportedFeature("AQ user properties"));
898 }
899 let _csn = reader.read_ub4()?;
900 let _dsn = reader.read_ub4()?;
901 let flags = reader.read_ub4()?;
902 message.delivery_mode = if flags == TNS_KPD_AQ_BUFMSG {
903 TNS_AQ_MSG_BUFFERED
904 } else {
905 TNS_AQ_MSG_PERSISTENT
906 };
907 if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
908 let _shard = reader.read_ub4()?;
909 }
910 Ok(())
911}
912
913fn process_date(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
917 let num_bytes = reader.read_ub4()?;
918 if num_bytes == 0 {
919 return Ok(None);
920 }
921 let len = usize::from(reader.read_u8()?);
922 if len == 0 {
923 return Ok(None);
924 }
925 let bytes = reader.read_raw(len)?;
926 Ok(Some(decode_datetime_value(bytes)?))
927}
928
929fn process_extensions(reader: &mut TtcReader<'_>) -> Result<()> {
930 let num_extensions = reader.read_ub4()?;
931 if num_extensions > 0 {
932 reader.read_u8()?; for _ in 0..num_extensions {
934 let _text = reader.read_bytes_with_length()?;
935 let _binary = reader.read_bytes_with_length()?;
936 let _keyword = reader.read_ub2()?;
937 }
938 }
939 Ok(())
940}
941
942fn process_recipients(reader: &mut TtcReader<'_>) -> Result<()> {
943 let count = reader.read_ub4()?;
944 if count > 0 {
945 return Err(ProtocolError::UnsupportedFeature(
946 "AQ recipients on dequeue",
947 ));
948 }
949 Ok(())
950}
951
952fn process_msg_id(reader: &mut TtcReader<'_>) -> Result<Vec<u8>> {
953 Ok(reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec())
954}
955
956fn process_payload(
958 reader: &mut TtcReader<'_>,
959 kind: &AqPayloadKind,
960) -> Result<Option<AqDeqPayload>> {
961 if matches!(kind, AqPayloadKind::Object) {
962 let _toid = reader.read_bytes_with_length()?;
966 let _oid = reader.read_bytes_with_length()?;
967 let _snapshot = reader.read_bytes_with_length()?;
968 let _version = reader.read_ub2()?;
969 let image_length = reader.read_ub4()?;
970 let _flags = reader.read_ub2()?;
971 if image_length == 0 {
972 return Ok(None);
973 }
974 let image = reader
975 .read_bytes()?
976 .ok_or(ProtocolError::TtcDecode("AQ object payload missing"))?;
977 return Ok(Some(AqDeqPayload::Object(image)));
978 }
979 let _toid = reader.read_bytes_with_length()?;
981 let _oid = reader.read_bytes_with_length()?;
982 let _snapshot = reader.read_bytes_with_length()?;
983 let _version = reader.read_ub2()?;
984 let image_length = reader.read_ub4()? as usize;
985 let _flags = reader.read_ub2()?;
986 if image_length > 0 {
987 let raw = reader
989 .read_bytes()?
990 .ok_or(ProtocolError::TtcDecode("AQ payload missing"))?;
991 let end = image_length.min(raw.len());
992 let start = 4.min(end);
993 let payload = raw.get(start..end).unwrap_or_default().to_vec();
994 if matches!(kind, AqPayloadKind::Json) {
995 let value = decode_oson(&payload)?;
996 return Ok(Some(AqDeqPayload::Json(value)));
997 }
998 return Ok(Some(AqDeqPayload::Raw(payload)));
999 }
1000 if matches!(kind, AqPayloadKind::Raw) {
1001 return Ok(Some(AqDeqPayload::Raw(Vec::new())));
1002 }
1003 Ok(None)
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008 use super::*;
1009
1010 const FV: u8 = 24;
1013
1014 fn caps() -> ClientCapabilities {
1015 ClientCapabilities {
1016 ttc_field_version: FV,
1017 max_string_size: 32767,
1018 charset_id: 873,
1019 }
1020 }
1021
1022 const GOLDEN_RAW_ENQ: &[u8] = &[
1027 0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x02, 0x00, 0x81, 0x01, 0x01, 0x05, 0x05,
1028 0x43, 0x4f, 0x52, 0x52, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00,
1029 0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01, 0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00,
1030 0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02,
1031 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x00, 0x01, 0x01, 0x11, 0x01, 0x01, 0x10,
1032 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1033 0x00, 0x00, 0x00, 0x00, 0x0e, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51,
1034 0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1035 0x00, 0x00, 0x00, 0x00, 0x17, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x20, 0x72, 0x61, 0x77,
1036 0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x31,
1037 ];
1038
1039 const GOLDEN_RAW_DEQ: &[u8] = &[
1042 0x03, 0x7a, 0x06, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x01, 0x03,
1043 0x01, 0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x01,
1044 0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x0e,
1045 0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00,
1046 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17,
1047 ];
1048
1049 #[test]
1050 fn raw_enqueue_request_matches_golden() {
1051 let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
1052 let props = AqMsgProps {
1053 priority: 2,
1054 correlation: Some("CORR1".to_string()),
1055 payload: Some(AqPayloadValue::Raw(b"sample raw data 1".to_vec())),
1056 ..AqMsgProps::default()
1057 };
1058 let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, false)
1059 .expect("build enqueue");
1060 assert_eq!(bytes, GOLDEN_RAW_ENQ);
1061 }
1062
1063 #[test]
1064 fn raw_dequeue_request_matches_golden() {
1065 let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
1066 let deq = AqDeqOptions {
1067 wait: 0,
1068 navigation: 1,
1069 ..AqDeqOptions::default()
1070 };
1071 let bytes = build_aq_deq_payload(&queue, &deq, 6, FV).expect("build dequeue");
1072 assert_eq!(bytes, GOLDEN_RAW_DEQ);
1073 }
1074
1075 #[test]
1076 fn empty_queue_dequeue_yields_no_message() {
1077 let caps = caps();
1081 let res = parse_aq_deq_response(&[], caps, &AqPayloadKind::Raw).expect("parse");
1082 assert!(res.message.is_none());
1083 }
1084
1085 const GOLDEN_JSON_ENQ: &[u8] = &[
1089 0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0f, 0x00, 0x00, 0x81, 0x01, 0x00, 0x00, 0x00, 0x00,
1090 0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01,
1091 0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00, 0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff,
1092 0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01,
1093 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1094 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0f, 0x54, 0x45, 0x53, 0x54,
1095 0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00,
1096 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x47, 0x01, 0x28, 0x00,
1097 0x26, 0x00, 0x04, 0x61, 0x08, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1098 0x00, 0x43, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
1099 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x43, 0xff, 0x4a, 0x5a, 0x01, 0x21,
1100 0x02, 0x03, 0x00, 0x0e, 0x00, 0x1f, 0x00, 0x00, 0x42, 0x9c, 0xe6, 0x00, 0x09, 0x00, 0x05,
1101 0x00, 0x00, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x03, 0x61, 0x67, 0x65, 0x04, 0x63, 0x69, 0x74,
1102 0x79, 0xa4, 0x03, 0x03, 0x02, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x17, 0x00,
1103 0x00, 0x00, 0x1b, 0x33, 0x04, 0x4a, 0x6f, 0x68, 0x6e, 0x34, 0x02, 0xc1, 0x1f, 0x33, 0x02,
1104 0x4e, 0x59,
1105 ];
1106
1107 #[test]
1108 fn json_enqueue_request_matches_golden() {
1109 let queue = AqQueueDesc::new("TEST_JSON_QUEUE".to_string(), AqPayloadKind::Json, None);
1110 let value = OsonValue::Object(vec![
1112 ("name".to_string(), OsonValue::String("John".to_string())),
1113 ("age".to_string(), OsonValue::Number("30".to_string())),
1114 ("city".to_string(), OsonValue::String("NY".to_string())),
1115 ]);
1116 let props = AqMsgProps {
1117 payload: Some(AqPayloadValue::Json(value)),
1118 ..AqMsgProps::default()
1119 };
1120 let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, true)
1122 .expect("build json enqueue");
1123 assert_eq!(bytes, GOLDEN_JSON_ENQ);
1124 }
1125}