#![forbid(unsafe_code)]
use super::*;
use crate::wire::{ProtocolLimits, TtcReader, TtcWriter};
/// Values extracted from the return parameters of a subscribe response.
///
/// `Default` yields a registration id of `0` and no client id, which is what
/// the response parser starts from before any parameter message is seen.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SubscribeResult {
/// Registration id read (as a `ub8`) from the return parameters.
pub registration_id: u64,
/// Client id bytes read at the end of the return parameters; only read
/// when the field version is at least `TNS_CCAP_FIELD_VERSION_12_1`.
pub client_id: Option<Vec<u8>>,
}
/// One row entry decoded from a table section of a notification payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MsgRow {
/// Operation word decoded as a big-endian `u32` ahead of the rowid.
pub operation: u32,
/// Rowid text; decoding fails if the bytes are not valid UTF-8.
pub rowid: String,
}
/// One table entry decoded from a notification payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MsgTable {
/// Operation word decoded as a big-endian `u32`; when the `OPCODE_ALLROWS`
/// bit is set no row list is decoded for the table.
pub operation: u32,
/// Table name; decoding fails if the bytes are not valid UTF-8.
pub name: String,
/// Row entries for the table (empty when `OPCODE_ALLROWS` is set).
pub rows: Vec<MsgRow>,
}
/// One query entry decoded from a query-change notification payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MsgQuery {
/// Query id assembled from two big-endian `u32` words (low word first on
/// the wire, then the high word).
pub id: u64,
/// Operation word decoded as a big-endian `u32`.
pub operation: u32,
/// Tables listed for this query.
pub tables: Vec<MsgTable>,
}
/// A decoded notification delivered for a subscription.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NotificationMessage {
/// Event type: `EVENT_AQ` for the AQ namespace, `EVENT_DEREG` when the
/// record carries no payload, otherwise the event type read from the payload.
pub msg_type: u32,
/// Database name; starts as the caller-supplied name and is replaced by the
/// name embedded in the payload when a payload is decoded.
pub dbname: Option<String>,
/// Transaction id; the parsers visible in this file always leave it `None`.
pub txid: Option<Vec<u8>>,
/// `true` when a payload was decoded and `SUBSCR_QOS_DEREG_NFY` is not set
/// in the subscription's public QoS; `false` otherwise.
pub registered: bool,
/// Queue name read from the notification record, if present.
pub queue_name: Option<String>,
/// Consumer name read from the notification record, if present.
pub consumer_name: Option<String>,
/// Message id bytes read from the notification record, if present.
pub msgid: Option<Vec<u8>>,
/// Tables decoded for `EVENT_OBJCHANGE` events (empty otherwise).
pub tables: Vec<MsgTable>,
/// Queries decoded for `EVENT_QUERYCHANGE` events (empty otherwise).
pub queries: Vec<MsgQuery>,
}
/// One record parsed from a notification stream.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NotificationRecord {
/// A notification message.
Message {
/// The decoded message.
message: NotificationMessage,
/// `true` when the stream parser should stop after this record.
end_of_response: bool,
},
/// The record type equalled `TNS_SUBSCR_STOP_NOTIF`; nothing else was read.
Stop,
}
/// Writes the function-code header for a call.
///
/// After the function code and sequence number, a zero `ub8` is appended when
/// `field_version` is at least `TNS_CCAP_FIELD_VERSION_23_1_EXT_1`.
fn write_function_code_token(w: &mut TtcWriter, function_code: u8, seq_num: u8, field_version: u8) {
    w.write_function_code_with_seq(function_code, seq_num);

    let needs_extra_ub8 = field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1;
    if needs_extra_ub8 {
        w.write_ub8(0);
    }
}
/// Builds the TTC payload for a subscribe call (`TNS_FUNC_SUBSCRIBE`).
///
/// The bits of `public_qos` are translated into two wire words: `qos`
/// (always includes `TNS_SUBSCR_QOS_SECURE`) and `flags` (seeded from
/// `operations`). `grouping_type` is forced to `0` when `grouping_class` is `0`.
///
/// # Errors
///
/// Propagates any error from the writer while emitting the length-prefixed
/// user name, client id, or subscription name.
// The wire call genuinely carries this many independent values.
#[allow(clippy::too_many_arguments)]
pub fn build_subscribe_payload_with_seq(
    seq_num: u8,
    opcode: u8,
    username: Option<&str>,
    client_id: Option<&[u8]>,
    namespace: u32,
    name: Option<&str>,
    public_qos: u32,
    operations: u32,
    timeout: u32,
    grouping_class: u8,
    grouping_value: u32,
    grouping_type: u8,
    registration_id: u64,
    field_version: u8,
) -> Result<Vec<u8>> {
    let qos_requested = |bit: u32| public_qos & bit != 0;

    // Public QoS bits -> wire QoS word.
    let mut qos = TNS_SUBSCR_QOS_SECURE;
    if qos_requested(SUBSCR_QOS_RELIABLE) {
        qos |= TNS_SUBSCR_QOS_RELIABLE;
    }
    if qos_requested(SUBSCR_QOS_DEREG_NFY) {
        qos |= TNS_SUBSCR_QOS_PURGE_ON_NTFN;
    }

    // Public QoS bits -> wire flags word, on top of the requested operations.
    let mut flags = operations;
    if qos_requested(SUBSCR_QOS_QUERY) {
        flags |= TNS_SUBSCR_FLAGS_QUERY;
    }
    if qos_requested(SUBSCR_QOS_ROWIDS) {
        flags |= TNS_SUBSCR_FLAGS_INCLUDE_ROWIDS;
    }

    // A grouping type is only sent when a grouping class is set.
    let grouping_type = if grouping_class != 0 { grouping_type } else { 0 };
    let username_bytes = username.map(str::as_bytes);

    let mut w = TtcWriter::new();
    write_function_code_token(&mut w, TNS_FUNC_SUBSCRIBE, seq_num, field_version);
    w.write_u8(opcode);
    w.write_ub4(TNS_SUBSCR_MODE_CLIENT_INITIATED);

    // Presence flag + length for the user name, then for the client id.
    // Lengths that do not fit in a u32 are clamped to u32::MAX.
    for value in [username_bytes, client_id] {
        let (present, len) = match value {
            Some(bytes) => (1u8, u32::try_from(bytes.len()).unwrap_or(u32::MAX)),
            None => (0u8, 0u32),
        };
        w.write_u8(present);
        w.write_ub4(len);
    }

    // Fixed header values, emitted in wire order.
    w.write_u8(1);
    w.write_ub4(1);
    w.write_ub2(1);
    w.write_ub2(6);
    w.write_u8(0);
    w.write_u8(1);
    w.write_u8(0);
    w.write_u8(1);
    if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
        for _ in 0..5 {
            w.write_u8(1);
        }
        w.write_ub4(TNS_SUBSCR_CLIENT_ID_LEN);
        w.write_u8(1);
    }

    // The actual bytes announced above.
    if let Some(bytes) = username_bytes {
        w.write_bytes_with_length(bytes)?;
    }
    if let Some(bytes) = client_id {
        w.write_bytes_with_length(bytes)?;
    }

    w.write_ub4(namespace);
    if let Some(name) = name {
        w.write_bytes_with_two_lengths(Some(name.as_bytes()))?;
    } else {
        w.write_ub4(0);
    }

    w.write_ub4(0);
    w.write_ub4(0);
    w.write_ub4(qos);
    w.write_ub4(0);
    w.write_ub4(timeout);
    w.write_ub4(0);
    w.write_ub4(flags);
    w.write_ub4(0);
    w.write_ub4(0);
    w.write_u8(grouping_class);
    w.write_ub4(grouping_value);
    w.write_u8(grouping_type);
    w.write_ub4(0);
    w.write_ub4(0);
    w.write_ub8(registration_id);

    Ok(w.into_bytes())
}
/// Parses a subscribe response using `ProtocolLimits::DEFAULT`.
///
/// See [`parse_subscribe_response_with_limits`] for the decoding rules and
/// error conditions.
pub fn parse_subscribe_response(
    payload: &[u8],
    capabilities: ClientCapabilities,
) -> Result<SubscribeResult> {
    let limits = ProtocolLimits::DEFAULT;
    parse_subscribe_response_with_limits(payload, capabilities, limits)
}
/// Parses a subscribe response under the given protocol limits.
///
/// The payload is consumed one message at a time until it is exhausted or an
/// end-of-response message is seen:
///
/// * a `0` type byte is ignored;
/// * parameter messages populate the returned [`SubscribeResult`];
/// * status messages (a `ub4` and a `ub2`) are read and discarded;
/// * server-side piggybacks are skipped;
/// * error messages with a non-zero error number abort the parse.
///
/// # Errors
///
/// Returns `ProtocolError::ServerError` for a server error with a non-zero
/// number, `ProtocolError::UnknownMessageType` (with the offset of the type
/// byte) for any other message type, and propagates reader errors.
pub fn parse_subscribe_response_with_limits(
    payload: &[u8],
    capabilities: ClientCapabilities,
    limits: ProtocolLimits,
) -> Result<SubscribeResult> {
    let mut reader = TtcReader::with_limits(payload, limits)?;
    let mut result = SubscribeResult::default();
    let field_version = capabilities.ttc_field_version;

    while reader.remaining() > 0 {
        let tag = reader.read_u8()?;
        match tag {
            0 => {}
            TNS_MSG_TYPE_END_OF_RESPONSE => break,
            TNS_MSG_TYPE_PARAMETER => {
                parse_subscribe_return_parameters(&mut reader, field_version, &mut result)?;
            }
            TNS_MSG_TYPE_STATUS => {
                // Call status and sequence number are not needed here.
                reader.read_ub4()?;
                reader.read_ub2()?;
            }
            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
                let _ = skip_server_side_piggyback(&mut reader)?;
            }
            TNS_MSG_TYPE_ERROR => {
                let info = parse_server_error_info(&mut reader, field_version)?;
                // An error message with number 0 is not treated as a failure.
                if info.number != 0 {
                    return Err(ProtocolError::ServerError(info.message));
                }
            }
            _ => {
                // Report the offset of the type byte that was just consumed.
                let position = reader.position().saturating_sub(1);
                return Err(ProtocolError::UnknownMessageType {
                    message_type: tag,
                    position,
                });
            }
        }
    }

    Ok(result)
}
/// Decodes the return parameters of a subscribe call into `result`.
///
/// Layout as consumed here: a count followed by two runs of that many `ub4`
/// values (all discarded); a second count followed by that many registration
/// entries (a `ub8` id, plus a length-prefixed byte string when the field
/// version is at least 12.1). The last registration id read wins. For field
/// version 12.1 and later, two counted lists of length-prefixed byte strings
/// are then skipped and the client id is read.
fn parse_subscribe_return_parameters(
    reader: &mut TtcReader<'_>,
    field_version: u8,
    result: &mut SubscribeResult,
) -> Result<()> {
    let has_12_1_fields = field_version >= TNS_CCAP_FIELD_VERSION_12_1;

    // Two back-to-back runs of `leading_count` ub4 values, all ignored.
    let leading_count = reader.read_ub4()?;
    for _pass in 0..2 {
        for _ in 0..leading_count {
            reader.read_ub4()?;
        }
    }

    let registration_count = reader.read_ub4()?;
    for _ in 0..registration_count {
        result.registration_id = reader.read_ub8()?;
        if has_12_1_fields {
            // Subscriber name: present on the wire but unused.
            reader.read_bytes_with_length()?;
        }
    }

    if has_12_1_fields {
        let instance_count = reader.read_ub4()?;
        for _ in 0..instance_count {
            reader.read_bytes_with_length()?;
        }

        let listener_count = reader.read_ub4()?;
        for _ in 0..listener_count {
            reader.read_bytes_with_length()?;
        }

        result.client_id = reader.read_bytes_with_length()?;
    }

    Ok(())
}
/// Builds the TTC payload for a notify call (`TNS_FUNC_NOTIFY`).
///
/// The client id length is written as a `ub4` (clamped to `u32::MAX` if it
/// does not fit), followed by the length-prefixed client id bytes,
/// `TNS_INIT_KPNDRREQ`, and a zero `ub4`.
///
/// # Errors
///
/// Propagates any error from writing the length-prefixed client id.
pub fn build_notify_payload_with_seq(
    seq_num: u8,
    client_id: &[u8],
    field_version: u8,
) -> Result<Vec<u8>> {
    let client_id_len = u32::try_from(client_id.len()).unwrap_or(u32::MAX);

    let mut w = TtcWriter::new();
    write_function_code_token(&mut w, TNS_FUNC_NOTIFY, seq_num, field_version);
    w.write_ub4(client_id_len);
    w.write_bytes_with_length(client_id)?;
    w.write_u8(TNS_INIT_KPNDRREQ);
    w.write_ub4(0);

    Ok(w.into_bytes())
}
/// Parses a notification stream using `ProtocolLimits::DEFAULT`.
///
/// See [`parse_notification_stream_with_limits`] for details.
pub fn parse_notification_stream(
    payload: &[u8],
    namespace: u32,
    public_qos: u32,
    db_name: Option<&str>,
) -> Result<Vec<NotificationRecord>> {
    let limits = ProtocolLimits::DEFAULT;
    parse_notification_stream_with_limits(payload, namespace, public_qos, db_name, limits)
}
/// Parses a notification stream under the given protocol limits.
///
/// The payload must start with a `TNS_MSG_TYPE_OAC` byte. Records are then
/// parsed until the payload is exhausted, a stop record is seen, or a message
/// reports `end_of_response`; the terminating record is included in the
/// returned list.
///
/// # Errors
///
/// Returns `ProtocolError::UnknownMessageType` when the first byte is not
/// `TNS_MSG_TYPE_OAC`, and propagates any record parsing error.
pub fn parse_notification_stream_with_limits(
    payload: &[u8],
    namespace: u32,
    public_qos: u32,
    db_name: Option<&str>,
    limits: ProtocolLimits,
) -> Result<Vec<NotificationRecord>> {
    let mut reader = TtcReader::with_limits(payload, limits)?;

    let header = reader.read_u8()?;
    if header != TNS_MSG_TYPE_OAC {
        return Err(ProtocolError::UnknownMessageType {
            message_type: header,
            position: reader.position().saturating_sub(1),
        });
    }

    let mut records = Vec::new();
    while reader.remaining() > 0 {
        let record =
            parse_oac_record_with_limits(&mut reader, namespace, public_qos, db_name, limits)?;
        let is_last = matches!(
            record,
            NotificationRecord::Stop
                | NotificationRecord::Message {
                    end_of_response: true,
                    ..
                }
        );
        records.push(record);
        if is_last {
            break;
        }
    }

    Ok(records)
}
/// Validates the notification stream header using `ProtocolLimits::DEFAULT`.
///
/// See [`check_notification_header_with_limits`] for details.
pub fn check_notification_header(bytes: &[u8]) -> Result<usize> {
    let limits = ProtocolLimits::DEFAULT;
    check_notification_header_with_limits(bytes, limits)
}
/// Checks that `bytes` starts with a `TNS_MSG_TYPE_OAC` byte.
///
/// Returns the reader position after the header byte, i.e. the offset at
/// which the first record starts.
///
/// # Errors
///
/// Returns `ProtocolError::UnknownMessageType` (position `0`) when the first
/// byte is anything else, and propagates reader errors.
pub fn check_notification_header_with_limits(
    bytes: &[u8],
    limits: ProtocolLimits,
) -> Result<usize> {
    let mut reader = TtcReader::with_limits(bytes, limits)?;
    let header = reader.read_u8()?;

    if header == TNS_MSG_TYPE_OAC {
        Ok(reader.position())
    } else {
        Err(ProtocolError::UnknownMessageType {
            message_type: header,
            position: 0,
        })
    }
}
/// Attempts to parse one notification record using `ProtocolLimits::DEFAULT`.
///
/// See [`try_parse_oac_record_with_limits`] for details.
pub fn try_parse_oac_record(
    bytes: &[u8],
    namespace: u32,
    public_qos: u32,
    db_name: Option<&str>,
) -> Result<Option<(NotificationRecord, usize)>> {
    let limits = ProtocolLimits::DEFAULT;
    try_parse_oac_record_with_limits(bytes, namespace, public_qos, db_name, limits)
}
/// Attempts to parse one notification record from the start of `bytes`.
///
/// On success returns the record together with the number of bytes consumed.
/// Any failure while parsing the record itself is reported as `Ok(None)`
/// rather than as an error.
///
/// NOTE(review): this also hides errors that are not caused by a short
/// buffer (for example invalid data); presumably callers retry once more
/// bytes arrive, so confirm that is acceptable for those cases.
///
/// # Errors
///
/// Only fails when the reader cannot be constructed for `bytes` under
/// `limits`.
pub fn try_parse_oac_record_with_limits(
    bytes: &[u8],
    namespace: u32,
    public_qos: u32,
    db_name: Option<&str>,
    limits: ProtocolLimits,
) -> Result<Option<(NotificationRecord, usize)>> {
    let mut reader = TtcReader::with_limits(bytes, limits)?;

    // Record-level errors are deliberately discarded here.
    let parsed =
        parse_oac_record_with_limits(&mut reader, namespace, public_qos, db_name, limits).ok();

    Ok(parsed.map(|record| (record, reader.position())))
}
/// Parses one notification record, taking the protocol limits from `reader`.
///
/// See [`parse_oac_record_with_limits`] for details.
pub fn parse_oac_record(
    reader: &mut TtcReader<'_>,
    namespace: u32,
    public_qos: u32,
    db_name: Option<&str>,
) -> Result<NotificationRecord> {
    let limits = reader.limits();
    parse_oac_record_with_limits(reader, namespace, public_qos, db_name, limits)
}
/// Parses one notification record from `reader`.
///
/// A record whose leading `ub4` equals `TNS_SUBSCR_STOP_NOTIF` yields
/// [`NotificationRecord::Stop`] and nothing further is read. Otherwise the
/// record header (queue name, consumer name, message id, message properties)
/// is consumed and, for every namespace except AQ, the embedded payload is
/// read and decoded into the message.
///
/// # Errors
///
/// Propagates reader errors and payload decoding errors.
pub fn parse_oac_record_with_limits(
    reader: &mut TtcReader<'_>,
    namespace: u32,
    public_qos: u32,
    db_name: Option<&str>,
    limits: ProtocolLimits,
) -> Result<NotificationRecord> {
    let record_type = reader.read_ub4()?;
    if record_type == TNS_SUBSCR_STOP_NOTIF {
        return Ok(NotificationRecord::Stop);
    }

    // Two ub4 header values that are read but not used.
    reader.read_ub4()?;
    reader.read_ub4()?;

    let queue_name = reader.read_string_with_length()?;
    let consumer_name = reader.read_string_with_length()?;
    let msgid = reader.read_bytes_with_length()?;

    // Message properties are skipped; a single byte precedes them when any exist.
    let prop_count = reader.read_ub4()?;
    if prop_count > 0 {
        reader.read_u8()?;
        skip_msg_props(reader, prop_count)?;
    }
    skip_bytes_with_length(reader)?;

    let payload: Option<Vec<u8>> = if namespace == TNS_SUBSCR_NAMESPACE_AQ {
        None
    } else {
        // Three ub4 values precede the payload bytes; none are used.
        reader.read_ub4()?;
        reader.read_ub4()?;
        reader.read_ub4()?;
        let body = reader.read_bytes_with_length()?;
        skip_bytes_with_length(reader)?;
        body
    };

    let mut message = NotificationMessage {
        msg_type: 0,
        dbname: db_name.map(str::to_string),
        txid: None,
        registered: false,
        queue_name,
        consumer_name,
        msgid,
        tables: Vec::new(),
        queries: Vec::new(),
    };
    let end_of_response = process_notification_payload(
        payload.as_deref(),
        namespace,
        public_qos,
        limits,
        &mut message,
    )?;

    Ok(NotificationRecord::Message {
        message,
        end_of_response,
    })
}
/// Decodes the payload embedded in a notification record into `message`.
///
/// Returns whether the record ends the response:
///
/// * AQ namespace: `msg_type` becomes `EVENT_AQ`; returns `false`.
/// * No payload: `msg_type` becomes `EVENT_DEREG`; returns `true`.
/// * Otherwise the payload is decoded; the result is `true` exactly when
///   `SUBSCR_QOS_DEREG_NFY` is set in `public_qos`, and `registered` is set to
///   the opposite.
///
/// # Errors
///
/// Fails when the payload is truncated, the database name is not UTF-8, or a
/// protocol limit is exceeded.
fn process_notification_payload(
    payload: Option<&[u8]>,
    namespace: u32,
    public_qos: u32,
    limits: ProtocolLimits,
    message: &mut NotificationMessage,
) -> Result<bool> {
    if namespace == TNS_SUBSCR_NAMESPACE_AQ {
        message.msg_type = EVENT_AQ;
        return Ok(false);
    }
    let Some(payload) = payload else {
        message.msg_type = EVENT_DEREG;
        return Ok(true);
    };

    let dereg_on_notify = public_qos & SUBSCR_QOS_DEREG_NFY != 0;
    message.registered = !dereg_on_notify;
    let end_of_response = dereg_on_notify;

    let mut cur = ByteCursor::with_limits(payload, limits)?;
    // A u16 and a u32 precede the event type; neither is used.
    cur.u16be()?;
    cur.u32be()?;
    let event_type = cur.u32be()?;
    message.msg_type = event_type;

    let dbname_len = usize::from(cur.u16be()?);
    let dbname_bytes = cur.raw(dbname_len)?;
    let dbname = String::from_utf8(dbname_bytes.to_vec())
        .map_err(|_| ProtocolError::TtcDecode("notification dbname not UTF-8"))?;
    message.dbname = Some(dbname);

    // 14 bytes between the database name and the event body are not decoded.
    cur.skip(14)?;

    if event_type == EVENT_OBJCHANGE {
        message.tables = process_tables(&mut cur)?;
    } else if event_type == EVENT_QUERYCHANGE {
        message.queries = process_queries(&mut cur)?;
    }

    Ok(end_of_response)
}
/// Decodes a counted list of table entries from `cur`.
///
/// The list starts with a big-endian `u16` count. Each entry is a `u32`
/// operation, a `u16`-length-prefixed UTF-8 name, and a `u32` that is read
/// but not used. A row list follows unless the operation has the
/// `OPCODE_ALLROWS` bit set.
///
/// # Errors
///
/// Fails when the count is rejected by the capacity/limit check, the data is
/// truncated, or a table name is not UTF-8.
fn process_tables(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgTable>> {
    let table_count = cur.u16be()?;
    let mut tables: Vec<MsgTable> = cur.with_capacity_limited(
        usize::from(table_count),
        6,
        ProtocolLimits::check_length_prefixed_elements,
    )?;

    for _ in 0..table_count {
        let operation = cur.u32be()?;

        let name_len = usize::from(cur.u16be()?);
        let name_bytes = cur.raw(name_len)?.to_vec();
        let name = String::from_utf8(name_bytes)
            .map_err(|_| ProtocolError::TtcDecode("table name not UTF-8"))?;

        // Object number: present on the wire but unused.
        cur.u32be()?;

        let rows = if operation & OPCODE_ALLROWS != 0 {
            Vec::new()
        } else {
            process_rows(cur)?
        };

        tables.push(MsgTable {
            operation,
            name,
            rows,
        });
    }

    Ok(tables)
}
/// Decodes a counted list of row entries from `cur`.
///
/// The list starts with a big-endian `u16` count; each entry is a `u32`
/// operation followed by a `u16`-length-prefixed UTF-8 rowid.
///
/// # Errors
///
/// Fails when the count is rejected by the capacity/limit check, the data is
/// truncated, or a rowid is not UTF-8.
fn process_rows(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgRow>> {
    let row_count = cur.u16be()?;
    let mut rows: Vec<MsgRow> = cur.with_capacity_limited(
        usize::from(row_count),
        6,
        ProtocolLimits::check_length_prefixed_elements,
    )?;

    for _ in 0..row_count {
        let operation = cur.u32be()?;

        let rowid_len = usize::from(cur.u16be()?);
        let rowid_bytes = cur.raw(rowid_len)?.to_vec();
        let rowid = String::from_utf8(rowid_bytes)
            .map_err(|_| ProtocolError::TtcDecode("rowid not UTF-8"))?;

        rows.push(MsgRow { operation, rowid });
    }

    Ok(rows)
}
/// Decodes a counted list of query entries from `cur`.
///
/// The list starts with a big-endian `u16` count. Each entry is a 64-bit id
/// sent as two `u32` words (low word first), a `u32` operation, and a table
/// list.
///
/// # Errors
///
/// Fails when the count is rejected by the capacity/limit check or when any
/// nested decode fails.
fn process_queries(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgQuery>> {
    let query_count = cur.u16be()?;
    let mut queries: Vec<MsgQuery> = cur.with_capacity_limited(
        usize::from(query_count),
        12,
        ProtocolLimits::check_length_prefixed_elements,
    )?;

    for _ in 0..query_count {
        let low_word = cur.u32be()?;
        let high_word = cur.u32be()?;
        let id = (u64::from(high_word) << 32) | u64::from(low_word);

        let operation = cur.u32be()?;
        let tables = process_tables(cur)?;

        queries.push(MsgQuery {
            id,
            operation,
            tables,
        });
    }

    Ok(queries)
}
/// Skips `num_props` message properties, each made of two length-prefixed
/// byte strings.
fn skip_msg_props(reader: &mut TtcReader<'_>, num_props: u32) -> Result<()> {
    for _ in 0..num_props {
        for _part in 0..2 {
            skip_bytes_with_length(reader)?;
        }
    }
    Ok(())
}
/// Reads one length-prefixed byte string from `reader` and discards it.
fn skip_bytes_with_length(reader: &mut TtcReader<'_>) -> Result<()> {
    reader.read_bytes_with_length().map(drop)
}
/// Forward-only cursor over a borrowed notification payload, used to decode
/// fixed-width big-endian integers and raw byte runs.
struct ByteCursor<'a> {
/// The payload being decoded.
bytes: &'a [u8],
/// Offset of the next unread byte in `bytes`.
pos: usize,
/// Protocol limits reported through the `BoundedReader` implementation.
limits: ProtocolLimits,
}
impl<'a> ByteCursor<'a> {
    /// Test-only constructor that uses `ProtocolLimits::DEFAULT` and performs
    /// no validation.
    #[cfg(test)]
    fn new(bytes: &'a [u8]) -> Self {
        let limits = ProtocolLimits::DEFAULT;
        Self {
            bytes,
            pos: 0,
            limits,
        }
    }

    /// Creates a cursor after validating `limits` and checking the payload
    /// size against them.
    fn with_limits(bytes: &'a [u8], limits: ProtocolLimits) -> Result<Self> {
        let limits = limits.validate()?;
        limits.check_response_bytes(bytes.len())?;
        let cursor = Self {
            bytes,
            pos: 0,
            limits,
        };
        Ok(cursor)
    }

    /// Returns the next `n` bytes and advances past them.
    ///
    /// Fails without moving the cursor when fewer than `n` bytes remain or
    /// when the end offset would overflow `usize`.
    fn raw(&mut self, n: usize) -> Result<&'a [u8]> {
        let start = self.pos;
        let Some(end) = start.checked_add(n) else {
            return Err(ProtocolError::TtcDecode("notification payload overflow"));
        };
        match self.bytes.get(start..end) {
            Some(slice) => {
                self.pos = end;
                Ok(slice)
            }
            None => Err(ProtocolError::TtcDecode("notification payload truncated")),
        }
    }

    /// Advances past the next `n` bytes, failing if they are not available.
    fn skip(&mut self, n: usize) -> Result<()> {
        self.raw(n).map(drop)
    }

    /// Reads a big-endian `u16`.
    fn u16be(&mut self) -> Result<u16> {
        let pair = self.raw(2)?;
        Ok((u16::from(pair[0]) << 8) | u16::from(pair[1]))
    }

    /// Reads a big-endian `u32`.
    fn u32be(&mut self) -> Result<u32> {
        let quad = self.raw(4)?;
        let value = quad
            .iter()
            .fold(0u32, |acc, &byte| (acc << 8) | u32::from(byte));
        Ok(value)
    }
}
impl crate::wire::BoundedReader for ByteCursor<'_> {
    /// Number of bytes between the cursor position and the end of the payload.
    fn remaining(&self) -> usize {
        let total = self.bytes.len();
        total.saturating_sub(self.pos)
    }

    /// The limits this cursor was created with.
    fn protocol_limits(&self) -> ProtocolLimits {
        self.limits
    }
}
#[cfg(test)]
mod tests {
use super::*;
// A table count of 0xFFFF with no table data behind it must produce a decode
// error, and the bounded reservation helper must not reserve space for more
// elements than the remaining bytes could hold.
#[test]
fn cqn_oversized_table_count_fails_closed_not_oom() {
// Only the two count bytes are present; no table entries follow.
let bytes = [0xFFu8, 0xFF];
let mut cur = ByteCursor::new(&bytes);
let err = process_tables(&mut cur).expect_err("oversized table count must fail closed");
assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
// Fresh cursor: check the reservation cap directly.
let cur2 = ByteCursor::new(&bytes);
let v: Vec<MsgTable> = cur2.with_capacity_bounded(0xFFFF, 6);
assert!(v.capacity() <= 1, "reservation capped by remaining bytes");
}
// A table count of 2 must be rejected with a resource-limit error when the
// configured maximum number of length-prefixed elements is 1.
#[test]
fn cqn_table_count_respects_protocol_element_limit() {
// Big-endian u16 count = 2.
let bytes = [0x00u8, 0x02];
let limits = ProtocolLimits {
max_length_prefixed_elements: 1,
..ProtocolLimits::DEFAULT
};
let mut cur = ByteCursor::with_limits(&bytes, limits).expect("valid limits");
let err = process_tables(&mut cur).expect_err("table count above policy must fail");
assert!(
matches!(
err,
ProtocolError::ResourceLimit {
limit: "length_prefixed_elements",
observed: 2,
maximum: 1,
}
),
"got {err:?}"
);
}
// Capabilities with TTC field version 24 and defaults for everything else.
// The subscribe-response test below expects a client id to be decoded, which
// the parser only does for field versions >= TNS_CCAP_FIELD_VERSION_12_1.
fn caps_12_1() -> ClientCapabilities {
ClientCapabilities {
ttc_field_version: 24,
..ClientCapabilities::default()
}
}
// Golden-bytes check for a register request: user name "pythontest", no
// client id, DBCHANGE namespace, ROWIDS QoS, timeout 10, field version 24.
#[test]
fn subscribe_register_payload_matches_golden() {
let payload = build_subscribe_payload_with_seq(
0x03,
TNS_SUBSCR_OP_REGISTER,
Some("pythontest"),
None,
TNS_SUBSCR_NAMESPACE_DBCHANGE,
None,
SUBSCR_QOS_ROWIDS,
0, 10,
0,
0,
0,
0,
24,
)
.expect("subscribe payload");
let expected: &[u8] = &[
0x03, 0x7D, 0x03, 0x00, 0x01, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x00, 0x00, 0x01, 0x01,
0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73, 0x74,
0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
assert_eq!(payload, expected);
}
// Golden-bytes check for an unregister request: same subscription settings as
// the register test, plus client id "OCI:EP:301" and registration id 302.
#[test]
fn subscribe_unregister_payload_matches_golden() {
let payload = build_subscribe_payload_with_seq(
0x0A,
TNS_SUBSCR_OP_UNREGISTER,
Some("pythontest"),
Some(b"OCI:EP:301"),
TNS_SUBSCR_NAMESPACE_DBCHANGE,
None,
SUBSCR_QOS_ROWIDS,
0,
10,
0,
0,
0,
302,
24,
)
.expect("unsubscribe payload");
let expected: &[u8] = &[
0x03, 0x7D, 0x0A, 0x00, 0x02, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x01, 0x01, 0x0A, 0x01,
0x01, 0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01,
0x01, 0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73,
0x74, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33, 0x30, 0x31, 0x01, 0x02,
0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x2E,
];
assert_eq!(payload, expected);
}
// Golden-bytes check for a notify request carrying client id "OCI:EP:301"
// with sequence number 3 and field version 24.
#[test]
fn notify_payload_matches_golden() {
let payload =
build_notify_payload_with_seq(0x03, b"OCI:EP:301", 24).expect("notify payload");
let want: &[u8] = &[
0x03, 0xBB, 0x03, 0x00, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A,
0x33, 0x30, 0x31, 0x01, 0x00,
];
assert_eq!(payload, want);
}
// Decodes a canned subscribe response and checks that registration id 302 and
// client id "OCI:EP:301" are extracted.
#[test]
fn subscribe_response_decodes_registration_and_client_id() {
let payload: &[u8] = &[
0x08, 0x01, 0x01, 0x00, 0x02, 0x01, 0x2E, 0x01, 0x01, 0x02, 0x01, 0x2E, 0x00, 0x00,
0x01, 0x01, 0x01, 0x36, 0x36, 0x28, 0x41, 0x44, 0x44, 0x52, 0x45, 0x53, 0x53, 0x3D,
0x28, 0x50, 0x52, 0x4F, 0x54, 0x4F, 0x43, 0x4F, 0x4C, 0x3D, 0x54, 0x43, 0x50, 0x29,
0x28, 0x48, 0x4F, 0x53, 0x54, 0x3D, 0x32, 0x39, 0x30, 0x61, 0x63, 0x30, 0x33, 0x30,
0x30, 0x33, 0x38, 0x37, 0x29, 0x28, 0x50, 0x4F, 0x52, 0x54, 0x3D, 0x31, 0x35, 0x32,
0x31, 0x29, 0x29, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33,
0x30, 0x31, 0x09, 0x01, 0x01, 0x02, 0xDD, 0x48, 0x1D,
];
let result = parse_subscribe_response(payload, caps_12_1()).expect("subscribe response");
assert_eq!(result.registration_id, 302);
assert_eq!(result.client_id.as_deref(), Some(&b"OCI:EP:301"[..]));
}
// Canned notification stream used by `notification_stream_decodes_dml_events`.
// That test expects it to decode into five object-change messages for
// database "FREEPDB1".
const NOTIF_STREAM: &[u8] = &[
0x0d, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x02, 0xa4, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x01, 0x00, 0x10, 0x00, 0xd2, 0x03,
0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x9b, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00,
0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02,
0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00,
0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
0x50, 0x44, 0x42, 0x31, 0x03, 0x00, 0x19, 0x00, 0x98, 0x04, 0x00, 0x00, 0x0b, 0x00, 0x00,
0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04,
0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a,
0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0x00, 0x89, 0x00,
0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x05,
0x00, 0x06, 0x00, 0xa9, 0x04, 0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x44, 0x32, 0x00, 0x01,
0x00, 0x00, 0x00, 0x02, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53,
0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45,
0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41,
0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42,
0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0xa5, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x02, 0x00, 0x09, 0x00, 0x7d, 0x04,
0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00,
0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42, 0x00, 0x01, 0x03, 0x00, 0x02,
0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x46, 0x46, 0x00,
0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
0x50, 0x44, 0x42, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfe, 0x7f, 0x00,
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
];
// Parses the canned stream and checks the decoded messages: five object-change
// events, their table operations, and the row operations / rowids of the first
// four (the fifth carries no rows).
#[test]
fn notification_stream_decodes_dml_events() {
let records = parse_notification_stream(
NOTIF_STREAM,
TNS_SUBSCR_NAMESPACE_DBCHANGE,
SUBSCR_QOS_ROWIDS,
Some("FREEPDB1"),
)
.expect("notification stream");
// Keep only message records; a stop record carries no message.
let messages: Vec<&NotificationMessage> = records
.iter()
.filter_map(|r| match r {
NotificationRecord::Message { message, .. } => Some(message),
NotificationRecord::Stop => None,
})
.collect();
assert_eq!(messages.len(), 5);
let table_ops: Vec<u32> = messages.iter().map(|m| m.tables[0].operation).collect();
assert_eq!(table_ops, vec![2, 4, 2, 8, 17]);
let mut row_ops = Vec::new();
let mut rowids = Vec::new();
for m in &messages {
assert_eq!(m.msg_type, EVENT_OBJCHANGE);
assert_eq!(m.dbname.as_deref(), Some("FREEPDB1"));
assert!(m.registered);
assert!(m.txid.is_none());
for row in &m.tables[0].rows {
row_ops.push(row.operation);
rowids.push(row.rowid.clone());
}
}
assert_eq!(row_ops, vec![2, 4, 2, 8]);
assert_eq!(
rowids,
vec![
"AAASjMAAYAAAJO3AAA",
"AAASjMAAYAAAJO3AAA",
"AAASjMAAYAAAJO3AAB",
"AAASjMAAYAAAJO3AAB",
]
);
// The last message has no row entries in its table.
assert!(messages[4].tables[0].rows.is_empty());
}
}