1#![forbid(unsafe_code)]
2
3use super::*;
18use crate::wire::{TtcReader, TtcWriter};
19
/// Values extracted from the server's reply to a subscribe call.
///
/// Produced by [`parse_subscribe_response`]. Both fields keep their
/// `Default` value when the reply contains no return-parameter message.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SubscribeResult {
    /// Registration id read from the return parameters.
    pub registration_id: u64,
    /// Client id bytes. Only read when the negotiated field version is at
    /// least `TNS_CCAP_FIELD_VERSION_12_1`; `None` otherwise.
    pub client_id: Option<Vec<u8>>,
}
28
/// One row entry listed under a table in a decoded change notification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MsgRow {
    /// Operation bits for this row, stored exactly as read from the payload.
    pub operation: u32,
    /// Rowid text of the row (decoded from the payload as UTF-8).
    pub rowid: String,
}
35
36#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct MsgTable {
39 pub operation: u32,
40 pub name: String,
41 pub rows: Vec<MsgRow>,
42}
43
44#[derive(Clone, Debug, PartialEq, Eq)]
46pub struct MsgQuery {
47 pub id: u64,
48 pub operation: u32,
49 pub tables: Vec<MsgTable>,
50}
51
52#[derive(Clone, Debug, PartialEq, Eq)]
54pub struct NotificationMessage {
55 pub msg_type: u32,
57 pub dbname: Option<String>,
58 pub txid: Option<Vec<u8>>,
60 pub registered: bool,
61 pub queue_name: Option<String>,
62 pub consumer_name: Option<String>,
63 pub msgid: Option<Vec<u8>>,
64 pub tables: Vec<MsgTable>,
65 pub queries: Vec<MsgQuery>,
66}
67
68#[derive(Clone, Debug, PartialEq, Eq)]
70pub enum NotificationRecord {
71 Message {
74 message: NotificationMessage,
75 end_of_response: bool,
76 },
77 Stop,
79}
80
81fn write_function_code_token(w: &mut TtcWriter, function_code: u8, seq_num: u8, field_version: u8) {
86 w.write_function_code_with_seq(function_code, seq_num);
87 if field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
88 w.write_ub8(0);
89 }
90}
91
92#[allow(clippy::too_many_arguments)]
98pub fn build_subscribe_payload_with_seq(
99 seq_num: u8,
100 opcode: u8,
101 username: Option<&str>,
102 client_id: Option<&[u8]>,
103 namespace: u32,
104 name: Option<&str>,
105 public_qos: u32,
106 operations: u32,
107 timeout: u32,
108 grouping_class: u8,
109 grouping_value: u32,
110 grouping_type: u8,
111 registration_id: u64,
112 field_version: u8,
113) -> Result<Vec<u8>> {
114 let mut qos = TNS_SUBSCR_QOS_SECURE;
116 if public_qos & SUBSCR_QOS_RELIABLE != 0 {
117 qos |= TNS_SUBSCR_QOS_RELIABLE;
118 }
119 if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
120 qos |= TNS_SUBSCR_QOS_PURGE_ON_NTFN;
121 }
122 let mut flags = operations;
124 if public_qos & SUBSCR_QOS_QUERY != 0 {
125 flags |= TNS_SUBSCR_FLAGS_QUERY;
126 }
127 if public_qos & SUBSCR_QOS_ROWIDS != 0 {
128 flags |= TNS_SUBSCR_FLAGS_INCLUDE_ROWIDS;
129 }
130 let grouping_type = if grouping_class == 0 {
132 0
133 } else {
134 grouping_type
135 };
136
137 let username_bytes = username.map(str::as_bytes);
138
139 let mut w = TtcWriter::new();
140 write_function_code_token(&mut w, TNS_FUNC_SUBSCRIBE, seq_num, field_version);
141 w.write_u8(opcode);
142 w.write_ub4(TNS_SUBSCR_MODE_CLIENT_INITIATED);
143 match username_bytes {
144 Some(bytes) => {
145 w.write_u8(1); w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
147 }
148 None => {
149 w.write_u8(0);
150 w.write_ub4(0);
151 }
152 }
153 match client_id {
154 Some(bytes) => {
155 w.write_u8(1); w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
157 }
158 None => {
159 w.write_u8(0);
160 w.write_ub4(0);
161 }
162 }
163 w.write_u8(1); w.write_ub4(1); w.write_ub2(1); w.write_ub2(6); w.write_u8(0); w.write_u8(1); w.write_u8(0); w.write_u8(1); if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
172 w.write_u8(1); w.write_u8(1); w.write_u8(1); w.write_u8(1); w.write_u8(1); w.write_ub4(TNS_SUBSCR_CLIENT_ID_LEN);
178 w.write_u8(1); }
180 if let Some(bytes) = username_bytes {
181 w.write_bytes_with_length(bytes)?;
182 }
183 if let Some(bytes) = client_id {
184 w.write_bytes_with_length(bytes)?;
185 }
186 w.write_ub4(namespace);
187 match name {
188 Some(name) => w.write_bytes_with_two_lengths(Some(name.as_bytes()))?,
189 None => w.write_ub4(0),
190 }
191 w.write_ub4(0); w.write_ub4(0); w.write_ub4(qos);
194 w.write_ub4(0); w.write_ub4(timeout);
196 w.write_ub4(0); w.write_ub4(flags);
198 w.write_ub4(0); w.write_ub4(0); w.write_u8(grouping_class);
201 w.write_ub4(grouping_value);
202 w.write_u8(grouping_type);
203 w.write_ub4(0); w.write_ub4(0);
207 w.write_ub8(registration_id);
208 Ok(w.into_bytes())
209}
210
211pub fn parse_subscribe_response(
215 payload: &[u8],
216 capabilities: ClientCapabilities,
217) -> Result<SubscribeResult> {
218 let mut reader = TtcReader::new(payload);
219 let mut result = SubscribeResult::default();
220 let field_version = capabilities.ttc_field_version;
221 while reader.remaining() > 0 {
222 let message_type = reader.read_u8()?;
223 match message_type {
224 0 => {}
225 TNS_MSG_TYPE_PARAMETER => {
226 parse_subscribe_return_parameters(&mut reader, field_version, &mut result)?;
227 }
228 TNS_MSG_TYPE_STATUS => {
229 let _call_status = reader.read_ub4()?;
230 let _seq = reader.read_ub2()?;
231 }
232 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
233 let _ = skip_server_side_piggyback(&mut reader)?;
234 }
235 TNS_MSG_TYPE_END_OF_RESPONSE => break,
236 TNS_MSG_TYPE_ERROR => {
237 let info = parse_server_error_info(&mut reader, field_version)?;
238 if info.number != 0 {
239 return Err(ProtocolError::ServerError(info.message));
240 }
241 }
242 _ => {
243 return Err(ProtocolError::UnknownMessageType {
244 message_type,
245 position: reader.position().saturating_sub(1),
246 })
247 }
248 }
249 }
250 Ok(result)
251}
252
253fn parse_subscribe_return_parameters(
254 reader: &mut TtcReader<'_>,
255 field_version: u8,
256 result: &mut SubscribeResult,
257) -> Result<()> {
258 let num_values = reader.read_ub4()?; for _ in 0..num_values {
260 let _ = reader.read_ub4()?;
261 }
262 for _ in 0..num_values {
263 let _ = reader.read_ub4()?; }
265 let num_values = reader.read_ub4()?; for _ in 0..num_values {
267 result.registration_id = reader.read_ub8()?;
268 if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
269 let _subscriber_name = reader.read_bytes_with_length()?;
270 }
271 }
272 if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
273 let num_instances = reader.read_ub4()?;
274 for _ in 0..num_instances {
275 let _ = reader.read_bytes_with_length()?;
276 }
277 let num_listeners = reader.read_ub4()?;
278 for _ in 0..num_listeners {
279 let _ = reader.read_bytes_with_length()?;
280 }
281 result.client_id = reader.read_bytes_with_length()?;
282 }
283 Ok(())
284}
285
286pub fn build_notify_payload_with_seq(
290 seq_num: u8,
291 client_id: &[u8],
292 field_version: u8,
293) -> Result<Vec<u8>> {
294 let mut w = TtcWriter::new();
295 write_function_code_token(&mut w, TNS_FUNC_NOTIFY, seq_num, field_version);
296 w.write_ub4(u32::try_from(client_id.len()).unwrap_or(u32::MAX));
297 w.write_bytes_with_length(client_id)?;
298 w.write_u8(TNS_INIT_KPNDRREQ);
299 w.write_ub4(0);
300 Ok(w.into_bytes())
301}
302
303pub fn parse_notification_stream(
311 payload: &[u8],
312 namespace: u32,
313 public_qos: u32,
314 db_name: Option<&str>,
315) -> Result<Vec<NotificationRecord>> {
316 let mut reader = TtcReader::new(payload);
317 let message_type = reader.read_u8()?; if message_type != TNS_MSG_TYPE_OAC {
319 return Err(ProtocolError::UnknownMessageType {
320 message_type,
321 position: reader.position().saturating_sub(1),
322 });
323 }
324 let mut records = Vec::new();
325 while reader.remaining() > 0 {
326 let record = parse_oac_record(&mut reader, namespace, public_qos, db_name)?;
327 let end = match &record {
328 NotificationRecord::Stop => true,
329 NotificationRecord::Message {
330 end_of_response, ..
331 } => *end_of_response,
332 };
333 records.push(record);
334 if end {
335 break;
336 }
337 }
338 Ok(records)
339}
340
341pub fn check_notification_header(bytes: &[u8]) -> Result<usize> {
345 let mut reader = TtcReader::new(bytes);
346 let message_type = reader.read_u8()?;
347 if message_type != TNS_MSG_TYPE_OAC {
348 return Err(ProtocolError::UnknownMessageType {
349 message_type,
350 position: 0,
351 });
352 }
353 Ok(reader.position())
354}
355
356pub fn try_parse_oac_record(
362 bytes: &[u8],
363 namespace: u32,
364 public_qos: u32,
365 db_name: Option<&str>,
366) -> Result<Option<(NotificationRecord, usize)>> {
367 let mut reader = TtcReader::new(bytes);
368 match parse_oac_record(&mut reader, namespace, public_qos, db_name) {
369 Ok(record) => Ok(Some((record, reader.position()))),
370 Err(_) => Ok(None),
374 }
375}
376
377pub fn parse_oac_record(
380 reader: &mut TtcReader<'_>,
381 namespace: u32,
382 public_qos: u32,
383 db_name: Option<&str>,
384) -> Result<NotificationRecord> {
385 let message_type = reader.read_ub4()?;
386 if message_type == TNS_SUBSCR_STOP_NOTIF {
387 return Ok(NotificationRecord::Stop);
388 }
389 let _error_code = reader.read_ub4()?;
390 let _registration_id = reader.read_ub4()?;
391 let queue_name = reader.read_string_with_length()?;
392 let consumer_name = reader.read_string_with_length()?;
393 let msgid = reader.read_bytes_with_length()?;
394 let num_props = reader.read_ub4()?;
395 if num_props > 0 {
396 let _ = reader.read_u8()?;
400 skip_msg_props(reader, num_props)?;
401 }
402 skip_bytes_with_length(reader)?; let mut payload: Option<Vec<u8>> = None;
405 if namespace != TNS_SUBSCR_NAMESPACE_AQ {
406 let _payload_type = reader.read_ub4()?;
407 let _payload_flags = reader.read_ub4()?;
408 let _chunk_number = reader.read_ub4()?;
409 payload = reader.read_bytes_with_length()?;
410 skip_bytes_with_length(reader)?; }
412
413 let mut message = NotificationMessage {
414 msg_type: 0,
415 dbname: db_name.map(str::to_string),
416 txid: None,
417 registered: false,
418 queue_name,
419 consumer_name,
420 msgid,
421 tables: Vec::new(),
422 queries: Vec::new(),
423 };
424 let end_of_response =
425 process_notification_payload(payload.as_deref(), namespace, public_qos, &mut message)?;
426 Ok(NotificationRecord::Message {
427 message,
428 end_of_response,
429 })
430}
431
432fn process_notification_payload(
435 payload: Option<&[u8]>,
436 namespace: u32,
437 public_qos: u32,
438 message: &mut NotificationMessage,
439) -> Result<bool> {
440 if namespace == TNS_SUBSCR_NAMESPACE_AQ {
441 message.msg_type = EVENT_AQ;
442 return Ok(false);
443 }
444 let Some(payload) = payload else {
445 message.msg_type = EVENT_DEREG;
447 return Ok(true);
448 };
449 let mut end_of_response = false;
450 if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
451 message.registered = false;
452 end_of_response = true;
453 } else {
454 message.registered = true;
455 }
456 let mut cur = ByteCursor::new(payload);
458 let _version = cur.u16be()?;
459 let _registration_id = cur.u32be()?;
460 let event_type = cur.u32be()?;
461 message.msg_type = event_type;
462 let dbname_len = cur.u16be()? as usize;
463 let dbname = cur.raw(dbname_len)?;
464 message.dbname = Some(
465 String::from_utf8(dbname.to_vec())
466 .map_err(|_| ProtocolError::TtcDecode("notification dbname not UTF-8"))?,
467 );
468 cur.skip(14)?; if event_type == EVENT_OBJCHANGE {
470 message.tables = process_tables(&mut cur)?;
471 } else if event_type == EVENT_QUERYCHANGE {
472 message.queries = process_queries(&mut cur)?;
473 }
474 Ok(end_of_response)
475}
476
477fn process_tables(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgTable>> {
478 let num_tables = cur.u16be()?;
479 let mut tables: Vec<MsgTable> = cur.with_capacity_bounded(num_tables as usize, 6);
483 for _ in 0..num_tables {
484 let operation = cur.u32be()?;
485 let name_len = cur.u16be()? as usize;
486 let name = String::from_utf8(cur.raw(name_len)?.to_vec())
487 .map_err(|_| ProtocolError::TtcDecode("table name not UTF-8"))?;
488 let _object_num = cur.u32be()?;
489 let rows = if operation & OPCODE_ALLROWS == 0 {
490 process_rows(cur)?
491 } else {
492 Vec::new()
493 };
494 tables.push(MsgTable {
495 operation,
496 name,
497 rows,
498 });
499 }
500 Ok(tables)
501}
502
503fn process_rows(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgRow>> {
504 let num_rows = cur.u16be()?;
505 let mut rows: Vec<MsgRow> = cur.with_capacity_bounded(num_rows as usize, 6);
508 for _ in 0..num_rows {
509 let operation = cur.u32be()?;
510 let rowid_len = cur.u16be()? as usize;
511 let rowid = String::from_utf8(cur.raw(rowid_len)?.to_vec())
512 .map_err(|_| ProtocolError::TtcDecode("rowid not UTF-8"))?;
513 rows.push(MsgRow { operation, rowid });
514 }
515 Ok(rows)
516}
517
518fn process_queries(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgQuery>> {
519 let num_queries = cur.u16be()?;
520 let mut queries: Vec<MsgQuery> = cur.with_capacity_bounded(num_queries as usize, 12);
523 for _ in 0..num_queries {
524 let id_lsb = u64::from(cur.u32be()?);
525 let id_msb = u64::from(cur.u32be()?);
526 let id = (id_msb << 32) | id_lsb;
527 let operation = cur.u32be()?;
528 let tables = process_tables(cur)?;
529 queries.push(MsgQuery {
530 id,
531 operation,
532 tables,
533 });
534 }
535 Ok(queries)
536}
537
538fn skip_msg_props(reader: &mut TtcReader<'_>, num_props: u32) -> Result<()> {
541 for _ in 0..num_props {
542 skip_bytes_with_length(reader)?; skip_bytes_with_length(reader)?; }
545 Ok(())
546}
547
548fn skip_bytes_with_length(reader: &mut TtcReader<'_>) -> Result<()> {
549 let _ = reader.read_bytes_with_length()?;
550 Ok(())
551}
552
/// Forward-only read position over a borrowed byte slice, used to decode the
/// fixed-width big-endian fields of a notification payload.
struct ByteCursor<'a> {
    /// The complete payload being decoded.
    bytes: &'a [u8],
    /// Offset of the next unread byte in `bytes`.
    pos: usize,
}
558
559impl<'a> ByteCursor<'a> {
560 fn new(bytes: &'a [u8]) -> Self {
561 Self { bytes, pos: 0 }
562 }
563
564 fn raw(&mut self, n: usize) -> Result<&'a [u8]> {
565 let end = self
566 .pos
567 .checked_add(n)
568 .ok_or(ProtocolError::TtcDecode("notification payload overflow"))?;
569 let slice = self
570 .bytes
571 .get(self.pos..end)
572 .ok_or(ProtocolError::TtcDecode("notification payload truncated"))?;
573 self.pos = end;
574 Ok(slice)
575 }
576
577 fn skip(&mut self, n: usize) -> Result<()> {
578 let _ = self.raw(n)?;
579 Ok(())
580 }
581
582 fn u16be(&mut self) -> Result<u16> {
583 let bytes = self.raw(2)?;
584 Ok(u16::from_be_bytes([bytes[0], bytes[1]]))
585 }
586
587 fn u32be(&mut self) -> Result<u32> {
588 let bytes = self.raw(4)?;
589 Ok(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
590 }
591}
592
593impl crate::wire::BoundedReader for ByteCursor<'_> {
594 fn remaining(&self) -> usize {
595 self.bytes.len().saturating_sub(self.pos)
596 }
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    /// A table count far larger than the available bytes must produce a decode
    /// error, and the up-front reservation must stay bounded by the input.
    #[test]
    fn cqn_oversized_table_count_fails_closed_not_oom() {
        let input = [0xFFu8, 0xFF];

        let mut decoding_cursor = ByteCursor::new(&input);
        let failure =
            process_tables(&mut decoding_cursor).expect_err("oversized table count must fail closed");
        assert!(
            matches!(failure, ProtocolError::TtcDecode(_)),
            "got {failure:?}"
        );

        let fresh_cursor = ByteCursor::new(&input);
        let reserved: Vec<MsgTable> = fresh_cursor.with_capacity_bounded(0xFFFF, 6);
        assert!(
            reserved.capacity() <= 1,
            "reservation capped by remaining bytes"
        );
    }

    /// Capabilities with TTC field version 24 and defaults for everything else.
    fn caps_12_1() -> ClientCapabilities {
        ClientCapabilities {
            ttc_field_version: 24,
            ..ClientCapabilities::default()
        }
    }

    /// Register request with a username and no client id.
    #[test]
    fn subscribe_register_payload_matches_golden() {
        let golden: &[u8] = &[
            0x03, 0x7D, 0x03, 0x00, 0x01, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x00, 0x00, 0x01, 0x01,
            0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
            0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73, 0x74,
            0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];
        let built = build_subscribe_payload_with_seq(
            0x03,
            TNS_SUBSCR_OP_REGISTER,
            Some("pythontest"),
            None,
            TNS_SUBSCR_NAMESPACE_DBCHANGE,
            None,
            SUBSCR_QOS_ROWIDS,
            0,
            10,
            0,
            0,
            0,
            0,
            24,
        )
        .expect("subscribe payload");
        assert_eq!(built, golden);
    }

    /// Unregister request carrying both a client id and a registration id.
    #[test]
    fn subscribe_unregister_payload_matches_golden() {
        let golden: &[u8] = &[
            0x03, 0x7D, 0x0A, 0x00, 0x02, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x01, 0x01, 0x0A, 0x01,
            0x01, 0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01,
            0x01, 0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73,
            0x74, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33, 0x30, 0x31, 0x01, 0x02,
            0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x2E,
        ];
        let built = build_subscribe_payload_with_seq(
            0x0A,
            TNS_SUBSCR_OP_UNREGISTER,
            Some("pythontest"),
            Some(b"OCI:EP:301"),
            TNS_SUBSCR_NAMESPACE_DBCHANGE,
            None,
            SUBSCR_QOS_ROWIDS,
            0,
            10,
            0,
            0,
            0,
            302,
            24,
        )
        .expect("unsubscribe payload");
        assert_eq!(built, golden);
    }

    /// Notify request for a known client id.
    #[test]
    fn notify_payload_matches_golden() {
        let golden: &[u8] = &[
            0x03, 0xBB, 0x03, 0x00, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A,
            0x33, 0x30, 0x31, 0x01, 0x00,
        ];
        let built =
            build_notify_payload_with_seq(0x03, b"OCI:EP:301", 24).expect("notify payload");
        assert_eq!(built, golden);
    }

    /// A subscribe reply yields registration id 302 and the client id bytes.
    #[test]
    fn subscribe_response_decodes_registration_and_client_id() {
        let reply: &[u8] = &[
            0x08, 0x01, 0x01, 0x00, 0x02, 0x01, 0x2E, 0x01, 0x01, 0x02, 0x01, 0x2E, 0x00, 0x00,
            0x01, 0x01, 0x01, 0x36, 0x36, 0x28, 0x41, 0x44, 0x44, 0x52, 0x45, 0x53, 0x53, 0x3D,
            0x28, 0x50, 0x52, 0x4F, 0x54, 0x4F, 0x43, 0x4F, 0x4C, 0x3D, 0x54, 0x43, 0x50, 0x29,
            0x28, 0x48, 0x4F, 0x53, 0x54, 0x3D, 0x32, 0x39, 0x30, 0x61, 0x63, 0x30, 0x33, 0x30,
            0x30, 0x33, 0x38, 0x37, 0x29, 0x28, 0x50, 0x4F, 0x52, 0x54, 0x3D, 0x31, 0x35, 0x32,
            0x31, 0x29, 0x29, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33,
            0x30, 0x31, 0x09, 0x01, 0x01, 0x02, 0xDD, 0x48, 0x1D,
        ];
        let decoded = parse_subscribe_response(reply, caps_12_1()).expect("subscribe response");
        assert_eq!(decoded.registration_id, 302);
        assert_eq!(decoded.client_id.as_deref(), Some(&b"OCI:EP:301"[..]));
    }

    /// Notification stream holding five object-change records for one table.
    const NOTIF_STREAM: &[u8] = &[
        0x0d, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x02, 0xa4, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x01, 0x00, 0x10, 0x00, 0xd2, 0x03,
        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x9b, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00,
        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
        0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02,
        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00,
        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
        0x50, 0x44, 0x42, 0x31, 0x03, 0x00, 0x19, 0x00, 0x98, 0x04, 0x00, 0x00, 0x0b, 0x00, 0x00,
        0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04,
        0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a,
        0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0x00, 0x89, 0x00,
        0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x05,
        0x00, 0x06, 0x00, 0xa9, 0x04, 0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x44, 0x32, 0x00, 0x01,
        0x00, 0x00, 0x00, 0x02, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53,
        0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45,
        0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41,
        0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42,
        0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0xa5, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x02, 0x00, 0x09, 0x00, 0x7d, 0x04,
        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00,
        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
        0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42, 0x00, 0x01, 0x03, 0x00, 0x02,
        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x46, 0x46, 0x00,
        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
        0x50, 0x44, 0x42, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfe, 0x7f, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
    ];

    /// The stream decodes into five object-change messages with the expected
    /// table operations, row operations and rowids.
    #[test]
    fn notification_stream_decodes_dml_events() {
        let records = parse_notification_stream(
            NOTIF_STREAM,
            TNS_SUBSCR_NAMESPACE_DBCHANGE,
            SUBSCR_QOS_ROWIDS,
            Some("FREEPDB1"),
        )
        .expect("notification stream");

        let messages: Vec<&NotificationMessage> = records
            .iter()
            .filter_map(|record| {
                if let NotificationRecord::Message { message, .. } = record {
                    Some(message)
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(messages.len(), 5);

        let table_ops: Vec<u32> = messages
            .iter()
            .map(|msg| msg.tables[0].operation)
            .collect();
        assert_eq!(table_ops, vec![2, 4, 2, 8, 17]);

        for msg in &messages {
            assert_eq!(msg.msg_type, EVENT_OBJCHANGE);
            assert_eq!(msg.dbname.as_deref(), Some("FREEPDB1"));
            assert!(msg.registered);
            assert!(msg.txid.is_none());
        }

        let all_rows: Vec<&MsgRow> = messages
            .iter()
            .flat_map(|msg| msg.tables[0].rows.iter())
            .collect();
        let row_ops: Vec<u32> = all_rows.iter().map(|row| row.operation).collect();
        let rowids: Vec<&str> = all_rows.iter().map(|row| row.rowid.as_str()).collect();
        assert_eq!(row_ops, vec![2, 4, 2, 8]);
        assert_eq!(
            rowids,
            vec![
                "AAASjMAAYAAAJO3AAA",
                "AAASjMAAYAAAJO3AAA",
                "AAASjMAAYAAAJO3AAB",
                "AAASjMAAYAAAJO3AAB",
            ]
        );

        // The last record's table operation is 17; no rows were decoded for it.
        assert!(messages[4].tables[0].rows.is_empty());
    }
}