#![forbid(unsafe_code)]
use super::*;
use crate::oson::{decode_oson_with_limits, encode_oson, OsonValue};
use crate::wire::ProtocolLimits;
/// Payload family carried by an AQ queue.
///
/// The kind decides which payload type id a queue descriptor stores and how
/// payloads are encoded on enqueue and decoded on dequeue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AqPayloadKind {
    /// Opaque byte payloads.
    Raw,
    /// JSON payloads, exchanged as OSON images.
    Json,
    /// Database object payloads, identified by a caller-supplied type OID.
    Object,
}
/// Describes the queue targeted by an AQ request.
#[derive(Debug, Clone)]
pub struct AqQueueDesc {
    /// Queue name; its UTF-8 bytes are written into every request.
    pub name: String,
    /// Payload family the queue carries.
    pub kind: AqPayloadKind,
    /// Payload type id (TOID) bytes written into every request for this queue.
    pub payload_toid: Vec<u8>,
}
impl AqQueueDesc {
    /// Builds a queue descriptor, deriving the payload TOID from `kind`.
    ///
    /// `object_oid` is consulted only for [`AqPayloadKind::Object`] queues, where
    /// a missing OID yields an empty TOID. RAW and JSON queues use their fixed
    /// built-in TOIDs and ignore `object_oid`.
    pub fn new(name: String, kind: AqPayloadKind, object_oid: Option<Vec<u8>>) -> Self {
        let payload_toid = match &kind {
            AqPayloadKind::Object => object_oid.unwrap_or_default(),
            AqPayloadKind::Json => json_payload_toid(),
            AqPayloadKind::Raw => raw_payload_toid(),
        };
        Self {
            name,
            kind,
            payload_toid,
        }
    }
}
/// Returns the 16-byte payload TOID used for RAW queues: fifteen zero bytes
/// followed by `0x17`.
fn raw_payload_toid() -> Vec<u8> {
    let mut toid = [0u8; 16];
    toid[15] = 0x17;
    toid.to_vec()
}
/// Returns the 16-byte payload TOID used for JSON queues: fifteen zero bytes
/// followed by `0x47`.
fn json_payload_toid() -> Vec<u8> {
    let mut toid = [0u8; 16];
    toid[15] = 0x47;
    toid.to_vec()
}
/// Payload supplied by the caller for an enqueue.
#[derive(Debug, Clone)]
pub enum AqPayloadValue {
    /// Bytes written to the request unchanged.
    Raw(Vec<u8>),
    /// JSON value; it is OSON-encoded when the request is built.
    Json(OsonValue),
    /// Database object, given as its type OID plus an already-encoded image.
    Object { oid: Vec<u8>, image: Vec<u8> },
}
/// Message properties sent with an enqueue.
#[derive(Debug, Clone)]
pub struct AqMsgProps {
    /// Message priority (written as an unsigned 4-byte value).
    pub priority: i32,
    /// Delay value (written as an unsigned 4-byte value).
    pub delay: i32,
    /// Expiration value (written signed; the default is `-1`).
    pub expiration: i32,
    /// Optional correlation identifier.
    pub correlation: Option<String>,
    /// Optional exception queue name.
    pub exception_queue: Option<String>,
    /// Message state code.
    pub state: i32,
    /// Optional enqueue transaction id bytes.
    pub enq_txn_id: Option<Vec<u8>>,
    /// Optional recipient list; each recipient is emitted as three
    /// keyword/value pairs.
    pub recipients: Option<Vec<String>>,
    /// Payload to enqueue; the enqueue builders fail when this is `None`.
    pub payload: Option<AqPayloadValue>,
}
impl Default for AqMsgProps {
    /// Defaults: zero priority, delay and state, expiration `-1`, an empty (but
    /// present) recipient list, and every other optional field unset.
    fn default() -> Self {
        Self {
            // Scalar properties.
            priority: 0,
            delay: 0,
            expiration: -1,
            state: 0,
            // Optional values that start out absent.
            correlation: None,
            exception_queue: None,
            enq_txn_id: None,
            payload: None,
            // The recipient list is present-but-empty rather than `None`.
            recipients: Some(Vec::new()),
        }
    }
}
/// Options applied to an enqueue request.
#[derive(Debug, Clone)]
pub struct AqEnqOptions {
    /// Visibility code, written into the request as-is.
    pub visibility: u32,
    /// Delivery mode code; the buffered mode sets the buffered-message flag in
    /// the request.
    pub delivery_mode: u16,
}
impl Default for AqEnqOptions {
    /// Defaults to visibility code `2` and persistent delivery.
    fn default() -> Self {
        let delivery_mode = TNS_AQ_MSG_PERSISTENT;
        Self {
            visibility: 2,
            delivery_mode,
        }
    }
}
/// Options applied to a dequeue request.
#[derive(Debug, Clone)]
pub struct AqDeqOptions {
    /// Optional dequeue condition.
    pub condition: Option<String>,
    /// Optional consumer name.
    pub consumer_name: Option<String>,
    /// Optional correlation identifier to match.
    pub correlation: Option<String>,
    /// Delivery mode code; selects the buffered / either flag bits.
    pub delivery_mode: u16,
    /// Dequeue mode code.
    pub mode: i32,
    /// Optional id of the specific message to dequeue.
    pub msgid: Option<Vec<u8>>,
    /// Navigation code.
    pub navigation: i32,
    /// Visibility code.
    pub visibility: i32,
    /// Wait value; it is reinterpreted as a signed 32-bit value when written,
    /// so `0xFFFF_FFFF` goes out as `-1`.
    pub wait: u32,
}
impl Default for AqDeqOptions {
    /// Defaults: no filters, persistent delivery, mode `3`, navigation `3`,
    /// visibility `2`, and a wait of `0xFFFF_FFFF`.
    fn default() -> Self {
        Self {
            // Scalar option codes.
            delivery_mode: TNS_AQ_MSG_PERSISTENT,
            mode: 3,
            navigation: 3,
            visibility: 2,
            wait: 0xFFFF_FFFF,
            // Optional filters, all unset.
            condition: None,
            consumer_name: None,
            correlation: None,
            msgid: None,
        }
    }
}
/// A message decoded from a dequeue response.
#[derive(Debug, Clone, Default)]
pub struct AqDeqMessage {
    /// Message priority.
    pub priority: i32,
    /// Delay value.
    pub delay: i32,
    /// Expiration value.
    pub expiration: i32,
    /// Correlation identifier, if the server sent one.
    pub correlation: Option<String>,
    /// Number of dequeue attempts reported by the server.
    pub num_attempts: i32,
    /// Exception queue name, if the server sent one.
    pub exception_queue: Option<String>,
    /// Message state code.
    pub state: i32,
    /// Enqueue time, when the server included a date value.
    pub enq_time: Option<QueryValue>,
    /// Delivery mode derived from the message flags.
    pub delivery_mode: u16,
    /// Message id bytes.
    pub msgid: Option<Vec<u8>>,
    /// Decoded payload, if any.
    pub payload: Option<AqDeqPayload>,
}
/// Payload decoded from a dequeue response.
#[derive(Debug, Clone)]
pub enum AqDeqPayload {
    /// Bytes of a RAW payload.
    Raw(Vec<u8>),
    /// JSON payload decoded from its OSON image.
    Json(OsonValue),
    /// Undecoded image bytes of a database object payload.
    Object(Vec<u8>),
}
/// Writes the function-code header shared by all AQ requests.
///
/// For field versions at or above `TNS_CCAP_FIELD_VERSION_23_1_EXT_1` an extra
/// zero `ub8` follows the function code and sequence number.
fn write_aq_function_code(
    writer: &mut TtcWriter,
    function_code: u8,
    seq_num: u8,
    ttc_field_version: u8,
) {
    writer.write_function_code_with_seq(function_code, seq_num);
    let has_trailing_ub8 = ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1;
    if has_trailing_ub8 {
        writer.write_ub8(0);
    }
}
/// Writes an optional byte value: a single zero `ub4` when absent, otherwise the
/// value through `write_bytes_with_two_lengths`.
fn write_value_with_length(writer: &mut TtcWriter, value: Option<&[u8]>) -> Result<()> {
    if let Some(bytes) = value {
        return writer.write_bytes_with_two_lengths(Some(bytes));
    }
    writer.write_ub4(0);
    Ok(())
}
/// Serialises a message-properties block.
///
/// The field order mirrors what `process_msg_props` reads back. Slots the client
/// does not populate are written as zero.
///
/// # Errors
/// Propagates any error from the length-prefixed and keyword/value writes.
fn write_msg_props(
    writer: &mut TtcWriter,
    props: &AqMsgProps,
    ttc_field_version: u8,
) -> Result<()> {
    let correlation = props.correlation.as_deref().map(str::as_bytes);
    let exception_queue = props.exception_queue.as_deref().map(str::as_bytes);
    let enq_txn_id = props.enq_txn_id.as_deref();

    writer.write_ub4(props.priority as u32);
    writer.write_ub4(props.delay as u32);
    writer.write_sb4(props.expiration);
    write_value_with_length(writer, correlation)?;
    // Zero in the slot where the reader expects the attempt count.
    writer.write_ub4(0);
    write_value_with_length(writer, exception_queue)?;
    writer.write_ub4(props.state as u32);
    // Zero in the slot where the reader expects the enqueue-time length.
    writer.write_ub4(0);
    write_value_with_length(writer, enq_txn_id)?;

    // Extension section: a count of four, one marker byte, then four
    // keyword/value pairs.
    writer.write_ub4(4);
    writer.write_u8(0x0e);
    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_NAME)?;
    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS)?;
    writer.write_keyword_value_pair(None, Some(b"\x00"), TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL)?;
    writer.write_keyword_value_pair(None, None, TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID)?;

    // Four zero slots; the reader consumes them as user-property count, two
    // sequence numbers and flags.
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    writer.write_ub4(0);

    // One more ub4 from field version 21.1 on (the reader treats it as a shard value).
    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
        writer.write_ub4(0xFFFF_FFFF);
    }
    Ok(())
}
/// Writes the recipient list as three keyword/value pairs per recipient.
///
/// Keyword indices run consecutively across the list. For each recipient the
/// first pair carries the name as text, the second is empty, and the third
/// carries a single zero byte as its binary value.
fn write_recipients(writer: &mut TtcWriter, recipients: &[String]) -> Result<()> {
    let mut keyword: u16 = 0;
    for name in recipients.iter().map(String::as_bytes) {
        writer.write_keyword_value_pair(Some(name), None, keyword)?;
        writer.write_keyword_value_pair(None, None, keyword + 1)?;
        writer.write_keyword_value_pair(None, Some(b"\x00"), keyword + 2)?;
        keyword += 3;
    }
    Ok(())
}
/// Writes the enqueue payload in the encoding its variant requires.
///
/// * `Raw` bytes are appended unchanged.
/// * `Object` payloads go through `write_dbobject_bind`.
/// * `Json` values are OSON-encoded, then written with
///   `crate::vector::write_oson_aq_payload`.
///
/// # Errors
/// Propagates OSON encoding failures and writer errors.
fn write_payload(
    writer: &mut TtcWriter,
    payload: &AqPayloadValue,
    supports_oson_long_fnames: bool,
) -> Result<()> {
    match payload {
        AqPayloadValue::Raw(bytes) => {
            writer.write_raw(bytes);
            Ok(())
        }
        AqPayloadValue::Object { oid, image } => write_dbobject_bind(writer, oid, image),
        AqPayloadValue::Json(value) => {
            let image = encode_oson(value, supports_oson_long_fnames)?;
            crate::vector::write_oson_aq_payload(writer, &image)
        }
    }
}
/// Builds the TTC request body for a single-message AQ enqueue.
///
/// Layout: function-code header, a fixed header of presence bytes, lengths and
/// flags, then the variable-length values (queue name, recipients, payload TOID
/// and the payload itself).
///
/// # Errors
/// * `ProtocolError::TtcDecode` when `props.payload` is `None`, or when the
///   queue is RAW but the payload is not `AqPayloadValue::Raw`.
/// * Any error propagated from the writer or payload encoders.
pub fn build_aq_enq_payload(
    queue: &AqQueueDesc,
    props: &AqMsgProps,
    enq_options: &AqEnqOptions,
    seq_num: u8,
    ttc_field_version: u8,
    supports_oson_long_fnames: bool,
) -> Result<Vec<u8>> {
    let payload = match props.payload.as_ref() {
        Some(payload) => payload,
        None => return Err(ProtocolError::TtcDecode("AQ enqueue has no payload")),
    };
    let name_bytes = queue.name.as_bytes();
    let mut writer = TtcWriter::new();
    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ENQ, seq_num, ttc_field_version);

    // Queue name: presence byte and length now, the bytes in the trailing section.
    writer.write_u8(1);
    writer.write_ub4(name_bytes.len() as u32);
    write_msg_props(&mut writer, props, ttc_field_version)?;

    // Recipients: presence byte and the number of keyword/value pairs (three
    // per recipient).
    if let Some(recipients) = props.recipients.as_ref() {
        writer.write_u8(1);
        writer.write_ub4(3 * recipients.len() as u32);
    } else {
        writer.write_u8(0);
        writer.write_ub4(0);
    }

    writer.write_ub4(enq_options.visibility);
    // Fixed slots whose meaning is not visible in this file.
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_ub4(0);
    // Presumably announces the 16-byte payload TOID written in the trailing section.
    writer.write_u8(1);
    writer.write_ub4(16);
    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);

    // Per-kind payload descriptor: two flag bytes and, for RAW, the payload length.
    let (first_flag, second_flag, raw_len) = match queue.kind {
        AqPayloadKind::Json => (0u8, 0u8, 0u32),
        AqPayloadKind::Object => (1, 0, 0),
        AqPayloadKind::Raw => match payload {
            AqPayloadValue::Raw(bytes) => (0, 1, bytes.len() as u32),
            _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
        },
    };
    writer.write_u8(first_flag);
    writer.write_u8(second_flag);
    writer.write_ub4(raw_len);

    // Slot for the message id the server returns.
    writer.write_u8(1);
    writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);

    let enq_flags = if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
        TNS_KPD_AQ_BUFMSG
    } else {
        0
    };
    writer.write_ub4(enq_flags);

    // Run of unused zero slots; widths and order are part of the wire format.
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_u8(0);
    writer.write_u8(0);

    // From field version 20.1 on, a byte marks JSON payloads.
    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
        let is_json = matches!(queue.kind, AqPayloadKind::Json);
        writer.write_u8(u8::from(is_json));
    }

    // Trailing section: the values announced above, in header order.
    writer.write_bytes_with_length(name_bytes)?;
    if let Some(recipients) = props.recipients.as_ref() {
        write_recipients(&mut writer, recipients)?;
    }
    writer.write_raw(&queue.payload_toid);
    write_payload(&mut writer, payload, supports_oson_long_fnames)?;
    Ok(writer.into_bytes())
}
/// Parses an enqueue response using `ProtocolLimits::DEFAULT`.
///
/// See [`parse_aq_enq_response_with_limits`] for the return value and errors.
pub fn parse_aq_enq_response(
    payload: &[u8],
    capabilities: ClientCapabilities,
) -> Result<Option<Vec<u8>>> {
    let limits = ProtocolLimits::DEFAULT;
    parse_aq_enq_response_with_limits(payload, capabilities, limits)
}
/// Parses an enqueue response and returns the id assigned to the message.
///
/// Messages are consumed until the payload is exhausted or an end-of-response
/// marker is seen. Zero bytes between messages are skipped.
///
/// Returns `Ok(None)` when the response carries no parameter message.
///
/// # Errors
/// * `ProtocolError::ServerErrorInfo` when the server reports a non-zero error.
/// * `ProtocolError::UnknownMessageType` for an unrecognised message type.
/// * Any reader error (truncated input, limit violations).
pub fn parse_aq_enq_response_with_limits(
    payload: &[u8],
    capabilities: ClientCapabilities,
    limits: ProtocolLimits,
) -> Result<Option<Vec<u8>>> {
    let mut reader = TtcReader::with_limits(payload, limits)?;
    let field_version = capabilities.ttc_field_version;
    let mut enqueued_id: Option<Vec<u8>> = None;
    while reader.remaining() > 0 {
        let message_type = reader.read_u8()?;
        match message_type {
            0 => {}
            TNS_MSG_TYPE_PARAMETER => {
                // Fixed-size message id, followed by a ub2 that is read and ignored.
                let raw_id = reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?.to_vec();
                let _ext_len = reader.read_ub2()?;
                enqueued_id = Some(raw_id);
            }
            TNS_MSG_TYPE_STATUS => {
                let _call_status = reader.read_ub4()?;
                let _seq = reader.read_ub2()?;
            }
            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
                let _ = skip_server_side_piggyback(&mut reader)?;
            }
            TNS_MSG_TYPE_END_OF_RESPONSE => break,
            TNS_MSG_TYPE_ERROR => {
                let info = parse_server_error_info(&mut reader, field_version)?;
                // An error message with number zero is not a failure.
                if info.number != 0 {
                    let details = Box::new(info.into_details());
                    return Err(ProtocolError::ServerErrorInfo(details));
                }
            }
            _ => {
                // Report the offset of the type byte that was just consumed.
                let position = reader.position().saturating_sub(1);
                return Err(ProtocolError::UnknownMessageType {
                    message_type,
                    position,
                });
            }
        }
    }
    Ok(enqueued_id)
}
pub fn build_aq_deq_payload(
queue: &AqQueueDesc,
deq_options: &AqDeqOptions,
seq_num: u8,
ttc_field_version: u8,
) -> Result<Vec<u8>> {
let queue_name = queue.name.as_bytes();
let mut writer = TtcWriter::new();
write_aq_function_code(&mut writer, TNS_FUNC_AQ_DEQ, seq_num, ttc_field_version);
writer.write_u8(1); writer.write_ub4(queue_name.len() as u32);
writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); writer.write_u8(1); let consumer_name = deq_options
.consumer_name
.as_ref()
.filter(|name| !name.is_empty());
match consumer_name {
Some(name) => {
writer.write_u8(1);
writer.write_ub4(name.len() as u32);
}
None => {
writer.write_u8(0);
writer.write_ub4(0);
}
}
writer.write_sb4(deq_options.mode);
writer.write_sb4(deq_options.navigation);
writer.write_sb4(deq_options.visibility);
writer.write_sb4(deq_options.wait as i32);
let msgid = deq_options.msgid.as_ref().filter(|id| !id.is_empty());
match msgid {
Some(_) => {
writer.write_u8(1);
writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
}
None => {
writer.write_u8(0);
writer.write_ub4(0);
}
}
let correlation = deq_options.correlation.as_ref().filter(|c| !c.is_empty());
match correlation {
Some(c) => {
writer.write_u8(1);
writer.write_ub4(c.len() as u32);
}
None => {
writer.write_u8(0);
writer.write_ub4(0);
}
}
writer.write_u8(1); writer.write_ub4(16); writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
writer.write_u8(1); writer.write_u8(1); writer.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH as u32);
let mut deq_flags = 0u32;
match deq_options.delivery_mode {
TNS_AQ_MSG_BUFFERED => deq_flags |= TNS_KPD_AQ_BUFMSG,
TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => deq_flags |= TNS_KPD_AQ_EITHER,
_ => {}
}
writer.write_ub4(deq_flags);
let condition = deq_options.condition.as_ref().filter(|c| !c.is_empty());
match condition {
Some(c) => {
writer.write_u8(1);
writer.write_ub4(c.len() as u32);
}
None => {
writer.write_u8(0);
writer.write_ub4(0);
}
}
writer.write_u8(0); writer.write_ub4(0); if ttc_field_version >= TNS_CCAP_FIELD_VERSION_20_1 {
writer.write_u8(0); }
if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
writer.write_ub4(0xFFFF_FFFF); }
writer.write_bytes_with_length(queue_name)?;
if let Some(name) = consumer_name {
writer.write_bytes_with_length(name.as_bytes())?;
}
if let Some(id) = msgid {
let mut id = id.clone();
id.truncate(16);
if id.len() < 16 {
id.resize(16, 0);
}
writer.write_raw(&id);
}
if let Some(c) = correlation {
writer.write_bytes_with_length(c.as_bytes())?;
}
writer.write_raw(&queue.payload_toid);
if let Some(c) = condition {
writer.write_bytes_with_length(c.as_bytes())?;
}
Ok(writer.into_bytes())
}
/// Outcome of a single-message dequeue.
#[derive(Debug, Clone, Default)]
pub struct AqDeqResult {
    /// The dequeued message, or `None` when nothing was dequeued.
    pub message: Option<AqDeqMessage>,
}
/// Parses a dequeue response using `ProtocolLimits::DEFAULT`.
///
/// See [`parse_aq_deq_response_with_limits`] for the return value and errors.
pub fn parse_aq_deq_response(
    payload: &[u8],
    capabilities: ClientCapabilities,
    kind: &AqPayloadKind,
) -> Result<AqDeqResult> {
    let limits = ProtocolLimits::DEFAULT;
    parse_aq_deq_response_with_limits(payload, capabilities, kind, limits)
}
pub fn parse_aq_deq_response_with_limits(
payload: &[u8],
capabilities: ClientCapabilities,
kind: &AqPayloadKind,
limits: ProtocolLimits,
) -> Result<AqDeqResult> {
let mut reader = TtcReader::with_limits(payload, limits)?;
let mut result = AqDeqResult::default();
let mut no_msg_found = false;
while reader.remaining() > 0 {
let message_type = reader.read_u8()?;
match message_type {
0 => {}
TNS_MSG_TYPE_PARAMETER => {
let num_bytes = reader.read_ub4()?;
if num_bytes > 0 {
let mut message = AqDeqMessage::default();
process_msg_props(&mut reader, &mut message, capabilities.ttc_field_version)?;
process_recipients(&mut reader)?;
message.payload = process_payload(&mut reader, kind)?;
message.msgid = Some(process_msg_id(&mut reader)?);
result.message = Some(message);
}
}
TNS_MSG_TYPE_STATUS => {
let _call_status = reader.read_ub4()?;
let _seq = reader.read_ub2()?;
}
TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
let _ = skip_server_side_piggyback(&mut reader)?;
}
TNS_MSG_TYPE_END_OF_RESPONSE => break,
TNS_MSG_TYPE_ERROR => {
let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
no_msg_found = true;
} else if info.number != 0 {
return Err(ProtocolError::ServerErrorInfo(Box::new(
info.into_details(),
)));
}
}
_ => {
return Err(ProtocolError::UnknownMessageType {
message_type,
position: reader.position().saturating_sub(1),
})
}
}
}
if no_msg_found {
result.message = None;
}
Ok(result)
}
/// Builds the TTC request body for an array (multi-message) AQ enqueue.
///
/// Layout: function-code header, array-call header, one row header describing
/// the queue, one row-data record per entry in `props_list`, and a closing
/// status byte.
///
/// # Errors
/// * `ProtocolError::TtcDecode` when any entry has no payload, or when the queue
///   is RAW and an entry's payload is not `AqPayloadValue::Raw`.
/// * Any error propagated from the writer or payload encoders.
pub fn build_aq_array_enq_payload(
    queue: &AqQueueDesc,
    props_list: &[AqMsgProps],
    enq_options: &AqEnqOptions,
    seq_num: u8,
    ttc_field_version: u8,
    supports_oson_long_fnames: bool,
) -> Result<Vec<u8>> {
    let message_count = props_list.len() as u32;
    let name_bytes = queue.name.as_bytes();
    let flags: u32 = if enq_options.delivery_mode == TNS_AQ_MSG_BUFFERED {
        TNS_KPD_AQ_BUFMSG
    } else {
        0
    };
    let raw_queue = matches!(queue.kind, AqPayloadKind::Raw);

    let mut writer = TtcWriter::new();
    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);

    // Array-call header.
    writer.write_u8(0);
    writer.write_ub4(0);
    writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
    writer.write_u8(1);
    writer.write_u8(0);
    writer.write_sb4(TNS_AQ_ARRAY_ENQ);
    writer.write_u8(1);
    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
        writer.write_ub4(0xFFFF);
    }
    writer.write_ub4(message_count);

    // Row header: queue name, payload TOID, message version and flags.
    writer.write_ub4(0);
    writer.write_u8(TNS_MSG_TYPE_ROW_HEADER);
    writer.write_bytes_with_two_lengths(Some(name_bytes))?;
    writer.write_raw(&queue.payload_toid);
    writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
    writer.write_ub4(flags);

    // One row-data record per message.
    for props in props_list {
        let payload = match props.payload.as_ref() {
            Some(payload) => payload,
            None => return Err(ProtocolError::TtcDecode("AQ array enqueue has no payload")),
        };
        writer.write_u8(TNS_MSG_TYPE_ROW_DATA);
        writer.write_ub4(flags);
        write_msg_props(&mut writer, props, ttc_field_version)?;
        // Unlike the single enqueue, recipients are written inline after their count.
        if let Some(recipients) = props.recipients.as_ref() {
            writer.write_ub4(3 * recipients.len() as u32);
            write_recipients(&mut writer, recipients)?;
        } else {
            writer.write_ub4(0);
        }
        writer.write_sb4(enq_options.visibility as i32);
        writer.write_ub4(0);
        writer.write_sb4(0);
        // RAW queues prefix the payload with its length.
        if raw_queue {
            let raw_len = match payload {
                AqPayloadValue::Raw(bytes) => bytes.len() as u32,
                _ => return Err(ProtocolError::TtcDecode("RAW queue requires RAW payload")),
            };
            writer.write_ub4(raw_len);
        }
        write_payload(&mut writer, payload, supports_oson_long_fnames)?;
    }
    writer.write_u8(TNS_MSG_TYPE_STATUS);
    Ok(writer.into_bytes())
}
/// Builds the TTC request body for an array (multi-message) AQ dequeue.
///
/// Layout: function-code header, array-call header, then `num_iters` identical
/// records. Each record holds the queue name, a default message-properties
/// block, the dequeue options and the payload TOID.
///
/// Empty `consumer_name`, `msgid`, `correlation` and `condition` values are all
/// treated as absent, matching `build_aq_deq_payload`. Before this change only
/// `msgid` skipped the check, so an empty id was handed to the length-prefixed
/// writer as a present value.
///
/// # Errors
/// Propagates any error from the writer's length-prefixed writes.
pub fn build_aq_array_deq_payload(
    queue: &AqQueueDesc,
    deq_options: &AqDeqOptions,
    num_iters: u32,
    seq_num: u8,
    ttc_field_version: u8,
) -> Result<Vec<u8>> {
    let queue_name = queue.name.as_bytes();
    let mut writer = TtcWriter::new();
    write_aq_function_code(&mut writer, TNS_FUNC_AQ_ARRAY, seq_num, ttc_field_version);

    // Array-call header.
    writer.write_u8(1);
    writer.write_ub4(num_iters);
    writer.write_ub4(TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID);
    writer.write_u8(1);
    writer.write_u8(1);
    writer.write_sb4(TNS_AQ_ARRAY_DEQ);
    writer.write_u8(0);
    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
        writer.write_ub4(0xFFFF);
    }

    let flags: u32 = match deq_options.delivery_mode {
        TNS_AQ_MSG_BUFFERED => TNS_KPD_AQ_BUFMSG,
        TNS_AQ_MSG_PERSISTENT_OR_BUFFERED => TNS_KPD_AQ_EITHER,
        _ => 0,
    };

    // Empty option values are normalised to "absent" once, outside the loop.
    let consumer_name = deq_options
        .consumer_name
        .as_ref()
        .filter(|name| !name.is_empty())
        .map(|name| name.as_bytes());
    let msgid = deq_options.msgid.as_deref().filter(|id| !id.is_empty());
    let correlation = deq_options
        .correlation
        .as_ref()
        .filter(|c| !c.is_empty())
        .map(|c| c.as_bytes());
    let condition = deq_options
        .condition
        .as_ref()
        .filter(|c| !c.is_empty())
        .map(|c| c.as_bytes());

    // Every record carries the same default message-properties block.
    let props = AqMsgProps::default();
    for _ in 0..num_iters {
        writer.write_bytes_with_two_lengths(Some(queue_name))?;
        write_msg_props(&mut writer, &props, ttc_field_version)?;
        writer.write_ub4(0);
        write_value_with_length(&mut writer, consumer_name)?;
        writer.write_sb4(deq_options.mode);
        writer.write_sb4(deq_options.navigation);
        writer.write_sb4(deq_options.visibility);
        // The unsigned wait is reinterpreted as signed: 0xFFFF_FFFF goes out as -1.
        writer.write_sb4(deq_options.wait as i32);
        write_value_with_length(&mut writer, msgid)?;
        write_value_with_length(&mut writer, correlation)?;
        write_value_with_length(&mut writer, condition)?;
        writer.write_ub4(0);
        writer.write_ub4(0);
        writer.write_sb4(0);
        writer.write_bytes_with_two_lengths(Some(&queue.payload_toid))?;
        writer.write_ub2(TNS_AQ_MESSAGE_VERSION);
        writer.write_ub4(0);
        writer.write_ub4(0);
        writer.write_ub4(0);
        writer.write_ub4(flags);
        writer.write_ub4(0);
        writer.write_ub4(0);
    }
    Ok(writer.into_bytes())
}
/// Outcome of an array enqueue or dequeue.
#[derive(Debug, Clone, Default)]
pub struct AqArrayResult {
    /// Message ids returned by an array enqueue, one entry per submitted message.
    pub enq_msgids: Vec<Vec<u8>>,
    /// Messages returned by an array dequeue.
    pub deq_messages: Vec<AqDeqMessage>,
}
/// Parses an array enqueue/dequeue response using `ProtocolLimits::DEFAULT`.
///
/// See [`parse_aq_array_response_with_limits`] for the return value and errors.
pub fn parse_aq_array_response(
    payload: &[u8],
    capabilities: ClientCapabilities,
    operation: i32,
    props_count: u32,
    kind: &AqPayloadKind,
) -> Result<AqArrayResult> {
    let limits = ProtocolLimits::DEFAULT;
    parse_aq_array_response_with_limits(payload, capabilities, operation, props_count, kind, limits)
}
pub fn parse_aq_array_response_with_limits(
payload: &[u8],
capabilities: ClientCapabilities,
operation: i32,
props_count: u32,
kind: &AqPayloadKind,
limits: ProtocolLimits,
) -> Result<AqArrayResult> {
limits.check_batch_rows(props_count as usize)?;
let mut reader = TtcReader::with_limits(payload, limits)?;
let mut result = AqArrayResult::default();
let mut messages: Vec<AqDeqMessage> = Vec::new();
let mut enq_msgid_blob: Option<Vec<u8>> = None;
let mut response_num_iters: u32 = 0;
let mut no_msg_found = false;
while reader.remaining() > 0 {
let message_type = reader.read_u8()?;
match message_type {
0 => {}
TNS_MSG_TYPE_PARAMETER => {
let num_iters = reader.read_ub4()?;
reader.limits().check_batch_rows(num_iters as usize)?;
response_num_iters = num_iters;
for i in 0..num_iters {
let mut message = AqDeqMessage::default();
let props_len = reader.read_ub2()?;
if props_len > 0 {
reader.read_u8()?; process_msg_props(
&mut reader,
&mut message,
capabilities.ttc_field_version,
)?;
}
process_recipients(&mut reader)?;
let payload_len = reader.read_ub2()?;
if payload_len > 0 {
message.payload = process_payload(&mut reader, kind)?;
}
let msgid = reader.read_bytes_with_length()?.unwrap_or_default();
if operation == TNS_AQ_ARRAY_ENQ {
enq_msgid_blob = Some(msgid);
} else {
message.msgid = Some(msgid);
}
let ext_len = reader.read_ub2()?;
if ext_len > 0 {
return Err(ProtocolError::UnsupportedFeature("AQ array extensions"));
}
let _output_ack = reader.read_ub2()?;
if operation != TNS_AQ_ARRAY_ENQ {
let _ = i;
messages.push(message);
}
}
if operation == TNS_AQ_ARRAY_ENQ {
response_num_iters = reader.read_ub4()?;
reader
.limits()
.check_batch_rows(response_num_iters as usize)?;
}
}
TNS_MSG_TYPE_STATUS => {
let _call_status = reader.read_ub4()?;
let _seq = reader.read_ub2()?;
}
TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
let _ = skip_server_side_piggyback(&mut reader)?;
}
TNS_MSG_TYPE_END_OF_RESPONSE => break,
TNS_MSG_TYPE_ERROR => {
let info = parse_server_error_info(&mut reader, capabilities.ttc_field_version)?;
if info.number == TNS_ERR_NO_MESSAGES_FOUND as u32 {
no_msg_found = true;
} else if info.number != 0 {
return Err(ProtocolError::ServerErrorInfo(Box::new(
info.into_details(),
)));
}
}
_ => {
return Err(ProtocolError::UnknownMessageType {
message_type,
position: reader.position().saturating_sub(1),
})
}
}
}
if operation == TNS_AQ_ARRAY_ENQ {
if let Some(blob) = enq_msgid_blob {
let count = props_count as usize;
result.enq_msgids = (0..count)
.map(|j| {
let start = j * 16;
let end = start + 16;
blob.get(start..end).map(<[u8]>::to_vec).unwrap_or_default()
})
.collect();
}
} else if no_msg_found {
result.deq_messages = Vec::new();
} else {
let keep = response_num_iters as usize;
messages.truncate(keep);
result.deq_messages = messages;
}
Ok(result)
}
/// Reads a message-properties block from a dequeue response into `message`.
///
/// The enqueue transaction id, the extensions, two sequence values and (from
/// field version 21.1 on) a shard value are consumed but not stored.
///
/// # Errors
/// * `ProtocolError::UnsupportedFeature` when the block carries user properties.
/// * Any reader or date-decoding error.
fn process_msg_props(
    reader: &mut TtcReader<'_>,
    message: &mut AqDeqMessage,
    ttc_field_version: u8,
) -> Result<()> {
    message.priority = reader.read_sb4()?;
    message.delay = reader.read_sb4()?;
    message.expiration = reader.read_sb4()?;
    message.correlation = reader.read_string_with_length()?;
    message.num_attempts = reader.read_sb4()?;
    message.exception_queue = reader.read_string_with_length()?;
    message.state = reader.read_sb4()?;
    message.enq_time = process_date(reader)?;
    let _enq_txn_id = reader.read_bytes_with_length()?;
    process_extensions(reader)?;

    if reader.read_ub4()? > 0 {
        return Err(ProtocolError::UnsupportedFeature("AQ user properties"));
    }
    let _csn = reader.read_ub4()?;
    let _dsn = reader.read_ub4()?;

    // Only an exact match on the buffered flag selects buffered delivery.
    let buffered = reader.read_ub4()? == TNS_KPD_AQ_BUFMSG;
    message.delivery_mode = if buffered {
        TNS_AQ_MSG_BUFFERED
    } else {
        TNS_AQ_MSG_PERSISTENT
    };

    if ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1 {
        let _shard = reader.read_ub4()?;
    }
    Ok(())
}
/// Reads an optional date value.
///
/// Returns `Ok(None)` when either the leading `ub4` or the following one-byte
/// length is zero. Otherwise that many raw bytes are decoded with
/// `decode_datetime_value`.
fn process_date(reader: &mut TtcReader<'_>) -> Result<Option<QueryValue>> {
    if reader.read_ub4()? == 0 {
        return Ok(None);
    }
    match usize::from(reader.read_u8()?) {
        0 => Ok(None),
        len => {
            let encoded = reader.read_raw(len)?;
            Ok(Some(decode_datetime_value(encoded)?))
        }
    }
}
/// Consumes and discards the extension list of a message-properties block.
///
/// A non-zero count is followed by one byte (read and ignored), then `count`
/// entries of text value, binary value and `ub2` keyword.
fn process_extensions(reader: &mut TtcReader<'_>) -> Result<()> {
    let count = reader.read_ub4()?;
    if count == 0 {
        return Ok(());
    }
    reader.read_u8()?;
    for _ in 0..count {
        let _text_value = reader.read_bytes_with_length()?;
        let _binary_value = reader.read_bytes_with_length()?;
        let _keyword_id = reader.read_ub2()?;
    }
    Ok(())
}
/// Reads the recipient count of a dequeued message.
///
/// Recipients are not supported on dequeue: any non-zero count fails with
/// `ProtocolError::UnsupportedFeature`.
fn process_recipients(reader: &mut TtcReader<'_>) -> Result<()> {
    match reader.read_ub4()? {
        0 => Ok(()),
        _ => Err(ProtocolError::UnsupportedFeature(
            "AQ recipients on dequeue",
        )),
    }
}
/// Reads a fixed-size message id (`TNS_AQ_MESSAGE_ID_LENGTH` raw bytes) and
/// returns an owned copy.
fn process_msg_id(reader: &mut TtcReader<'_>) -> Result<Vec<u8>> {
    let raw_id = reader.read_raw(TNS_AQ_MESSAGE_ID_LENGTH)?;
    Ok(raw_id.to_vec())
}
/// Reads the payload section of a dequeued message.
///
/// Every payload kind shares one envelope: TOID, OID, snapshot, a `ub2` version,
/// a `ub4` image length (checked against the response-size limit) and `ub2`
/// flags. The envelope is parsed once and the image is then interpreted per `kind`:
///
/// * `Object`: the image bytes are returned as-is, or `None` for a zero length.
/// * `Raw` / `Json`: the first four bytes of the image are skipped and the rest,
///   up to the declared length, is the payload. JSON payloads are OSON-decoded.
///   A zero length yields an empty RAW payload, or `None` for JSON.
///
/// # Errors
/// * `ProtocolError::TtcDecode` when a non-zero length is declared but the image
///   is missing, or when a RAW/JSON image is shorter than declared.
/// * Any reader, limit or OSON-decoding error.
fn process_payload(
    reader: &mut TtcReader<'_>,
    kind: &AqPayloadKind,
) -> Result<Option<AqDeqPayload>> {
    // Shared envelope; only the image length is used afterwards.
    let _toid = reader.read_bytes_with_length()?;
    let _oid = reader.read_bytes_with_length()?;
    let _snapshot = reader.read_bytes_with_length()?;
    let _version = reader.read_ub2()?;
    let image_length = reader.read_ub4()? as usize;
    reader.limits().check_response_bytes(image_length)?;
    let _flags = reader.read_ub2()?;

    if image_length == 0 {
        // No image follows; only RAW queues report an (empty) payload.
        return Ok(match kind {
            AqPayloadKind::Raw => Some(AqDeqPayload::Raw(Vec::new())),
            AqPayloadKind::Json | AqPayloadKind::Object => None,
        });
    }

    let image = reader.read_bytes()?;
    match kind {
        AqPayloadKind::Object => {
            let image = image.ok_or(ProtocolError::TtcDecode("AQ object payload missing"))?;
            Ok(Some(AqDeqPayload::Object(image)))
        }
        AqPayloadKind::Raw | AqPayloadKind::Json => {
            let raw = image.ok_or(ProtocolError::TtcDecode("AQ payload missing"))?;
            if raw.len() < image_length {
                return Err(ProtocolError::TtcDecode("AQ payload shorter than declared"));
            }
            // Skip the four-byte prefix, clamped for images shorter than four bytes.
            let start = image_length.min(4);
            let payload = raw.get(start..image_length).unwrap_or_default().to_vec();
            if matches!(kind, AqPayloadKind::Json) {
                let value = decode_oson_with_limits(&payload, reader.limits())?;
                Ok(Some(AqDeqPayload::Json(value)))
            } else {
                Ok(Some(AqDeqPayload::Raw(payload)))
            }
        }
    }
}
// Unit tests for the AQ request builders and response parsers. Requests are
// compared byte-for-byte against golden captures; responses are synthesised
// with `TtcWriter`.
#[cfg(test)]
mod tests {
use super::*;
// TTC field version used by every test in this module.
const FV: u8 = 24;
// Client capabilities fixture built around `FV`.
fn caps() -> ClientCapabilities {
ClientCapabilities {
ttc_field_version: FV,
max_string_size: 32767,
charset_id: 873,
}
}
// Expected request bytes for `raw_enqueue_request_matches_golden`: queue
// "TEST_RAW_QUEUE", priority 2, correlation "CORR1", payload "sample raw data 1".
const GOLDEN_RAW_ENQ: &[u8] = &[
0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x02, 0x00, 0x81, 0x01, 0x01, 0x05, 0x05,
0x43, 0x4f, 0x52, 0x52, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00,
0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01, 0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00,
0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02,
0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x00, 0x01, 0x01, 0x11, 0x01, 0x01, 0x10,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x0e, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51,
0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x17, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x20, 0x72, 0x61, 0x77,
0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x31,
];
// Expected request bytes for `raw_dequeue_request_matches_golden`: queue
// "TEST_RAW_QUEUE" with wait 0 and navigation 1.
const GOLDEN_RAW_DEQ: &[u8] = &[
0x03, 0x7a, 0x06, 0x00, 0x01, 0x01, 0x0e, 0x01, 0x01, 0x01, 0x01, 0x00, 0x00, 0x01, 0x03,
0x01, 0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01, 0x01,
0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x0e,
0x54, 0x45, 0x53, 0x54, 0x5f, 0x52, 0x41, 0x57, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17,
];
// A RAW enqueue request must serialise to exactly GOLDEN_RAW_ENQ.
#[test]
fn raw_enqueue_request_matches_golden() {
let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
let props = AqMsgProps {
priority: 2,
correlation: Some("CORR1".to_string()),
payload: Some(AqPayloadValue::Raw(b"sample raw data 1".to_vec())),
..AqMsgProps::default()
};
let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, false)
.expect("build enqueue");
assert_eq!(bytes, GOLDEN_RAW_ENQ);
}
// A RAW dequeue request must serialise to exactly GOLDEN_RAW_DEQ.
#[test]
fn raw_dequeue_request_matches_golden() {
let queue = AqQueueDesc::new("TEST_RAW_QUEUE".to_string(), AqPayloadKind::Raw, None);
let deq = AqDeqOptions {
wait: 0,
navigation: 1,
..AqDeqOptions::default()
};
let bytes = build_aq_deq_payload(&queue, &deq, 6, FV).expect("build dequeue");
assert_eq!(bytes, GOLDEN_RAW_DEQ);
}
// An empty response body parses successfully and carries no message.
#[test]
fn empty_queue_dequeue_yields_no_message() {
let caps = caps();
let res = parse_aq_deq_response(&[], caps, &AqPayloadKind::Raw).expect("parse");
assert!(res.message.is_none());
}
// Synthesises a dequeue response: a parameter message with default message
// properties, a payload envelope declaring `image_length`, the supplied image
// bytes, an all-zero message id, and an end-of-response marker.
// NOTE(review): the four zero ub4 values below presumably line up with the
// recipient count and the empty TOID / OID / snapshot fields that
// `process_recipients` and `process_payload` read; confirm against the reader.
fn deq_response_with_raw_image(image_length: u32, raw_image: &[u8]) -> Vec<u8> {
let mut writer = TtcWriter::new();
writer.write_u8(TNS_MSG_TYPE_PARAMETER);
writer.write_ub4(1);
write_msg_props(&mut writer, &AqMsgProps::default(), FV).expect("write message props");
writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_ub4(0); writer.write_ub2(0); writer.write_ub4(image_length);
writer.write_ub2(0); writer
.write_bytes_with_length(raw_image)
.expect("write raw image field");
writer.write_raw(&[0u8; TNS_AQ_MESSAGE_ID_LENGTH]);
writer.write_u8(TNS_MSG_TYPE_END_OF_RESPONSE);
writer.into_bytes()
}
// Declared image length (8) exceeds the six bytes supplied: RAW parsing must fail.
#[test]
fn raw_dequeue_rejects_declared_image_length_shortfall() {
let response = deq_response_with_raw_image(8, &[0, 0, 0, 0, b'A', b'B']);
let err = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Raw)
.expect_err("short RAW image must fail");
assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
}
// Same shortfall for a JSON queue: it must fail before any OSON decoding is attempted.
#[test]
fn json_dequeue_rejects_declared_image_length_shortfall() {
let response = deq_response_with_raw_image(8, &[0, 0, 0, 0, b'A', b'B']);
let err = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Json)
.expect_err("short JSON image must fail");
assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
}
// Declared length equals the supplied length: the four-byte prefix is skipped
// and the remaining bytes ("AB") are the payload.
#[test]
fn raw_dequeue_accepts_exact_declared_image_length() {
let response = deq_response_with_raw_image(6, &[0, 0, 0, 0, b'A', b'B']);
let res = parse_aq_deq_response(&response, caps(), &AqPayloadKind::Raw)
.expect("exact RAW image should parse");
let message = res.message.expect("message present");
match message.payload {
Some(AqDeqPayload::Raw(payload)) => assert_eq!(payload, vec![b'A', b'B']),
other => panic!("unexpected payload {other:?}"),
}
}
// The caller-supplied `props_count` is checked against `max_batch_rows` before
// any bytes of the response are parsed.
#[test]
fn aq_array_response_respects_protocol_batch_limit() {
let limits = ProtocolLimits {
max_batch_rows: 1,
..ProtocolLimits::DEFAULT
};
let err = parse_aq_array_response_with_limits(
&[],
caps(),
TNS_AQ_ARRAY_ENQ,
2,
&AqPayloadKind::Raw,
limits,
)
.expect_err("client-side AQ batch count above policy must fail");
assert!(
matches!(
err,
ProtocolError::ResourceLimit {
limit: "batch_rows",
observed: 2,
maximum: 1,
}
),
"got {err:?}"
);
}
// Expected request bytes for `json_enqueue_request_matches_golden`: queue
// "TEST_JSON_QUEUE" carrying the object {name: "John", age: 30, city: "NY"}.
const GOLDEN_JSON_ENQ: &[u8] = &[
0x03, 0x79, 0x04, 0x00, 0x01, 0x01, 0x0f, 0x00, 0x00, 0x81, 0x01, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01, 0x04, 0x0e, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x01, 0x41, 0x00, 0x01,
0x01, 0x01, 0x00, 0x01, 0x42, 0x00, 0x00, 0x01, 0x45, 0x00, 0x00, 0x00, 0x00, 0x04, 0xff,
0xff, 0xff, 0xff, 0x01, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x01, 0x01,
0x00, 0x00, 0x00, 0x01, 0x01, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0f, 0x54, 0x45, 0x53, 0x54,
0x5f, 0x4a, 0x53, 0x4f, 0x4e, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x47, 0x01, 0x28, 0x00,
0x26, 0x00, 0x04, 0x61, 0x08, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x43, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x43, 0xff, 0x4a, 0x5a, 0x01, 0x21,
0x02, 0x03, 0x00, 0x0e, 0x00, 0x1f, 0x00, 0x00, 0x42, 0x9c, 0xe6, 0x00, 0x09, 0x00, 0x05,
0x00, 0x00, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x03, 0x61, 0x67, 0x65, 0x04, 0x63, 0x69, 0x74,
0x79, 0xa4, 0x03, 0x03, 0x02, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x17, 0x00,
0x00, 0x00, 0x1b, 0x33, 0x04, 0x4a, 0x6f, 0x68, 0x6e, 0x34, 0x02, 0xc1, 0x1f, 0x33, 0x02,
0x4e, 0x59,
];
// A JSON enqueue request (long OSON field names enabled) must serialise to
// exactly GOLDEN_JSON_ENQ.
#[test]
fn json_enqueue_request_matches_golden() {
let queue = AqQueueDesc::new("TEST_JSON_QUEUE".to_string(), AqPayloadKind::Json, None);
let value = OsonValue::Object(vec![
("name".to_string(), OsonValue::String("John".to_string())),
("age".to_string(), OsonValue::Number("30".to_string())),
("city".to_string(), OsonValue::String("NY".to_string())),
]);
let props = AqMsgProps {
payload: Some(AqPayloadValue::Json(value)),
..AqMsgProps::default()
};
let bytes = build_aq_enq_payload(&queue, &props, &AqEnqOptions::default(), 4, FV, true)
.expect("build json enqueue");
assert_eq!(bytes, GOLDEN_JSON_ENQ);
}
}