1#![forbid(unsafe_code)]
2
3use super::*;
18use crate::wire::{ProtocolLimits, TtcReader, TtcWriter};
19
/// Values extracted from the server's reply to a subscribe call.
///
/// Produced by `parse_subscribe_response_with_limits`; any field the reply
/// did not carry keeps its `Default` value.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct SubscribeResult {
    /// Registration identifier read from the returned parameters.
    pub registration_id: u64,
    /// Client identifier bytes; only decoded when the TTC field version is at
    /// least `TNS_CCAP_FIELD_VERSION_12_1`, otherwise left as `None`.
    pub client_id: Option<Vec<u8>>,
}
28
/// A single row entry attached to a table change, as decoded by `process_rows`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct MsgRow {
    /// Operation bits reported for this row.
    pub operation: u32,
    /// Row identifier text; decoding fails if the wire bytes are not UTF-8.
    pub rowid: String,
}
35
36#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct MsgTable {
39 pub operation: u32,
40 pub name: String,
41 pub rows: Vec<MsgRow>,
42}
43
44#[derive(Clone, Debug, PartialEq, Eq)]
46pub struct MsgQuery {
47 pub id: u64,
48 pub operation: u32,
49 pub tables: Vec<MsgTable>,
50}
51
52#[derive(Clone, Debug, PartialEq, Eq)]
54pub struct NotificationMessage {
55 pub msg_type: u32,
57 pub dbname: Option<String>,
58 pub txid: Option<Vec<u8>>,
60 pub registered: bool,
61 pub queue_name: Option<String>,
62 pub consumer_name: Option<String>,
63 pub msgid: Option<Vec<u8>>,
64 pub tables: Vec<MsgTable>,
65 pub queries: Vec<MsgQuery>,
66}
67
68#[derive(Clone, Debug, PartialEq, Eq)]
70pub enum NotificationRecord {
71 Message {
74 message: NotificationMessage,
75 end_of_response: bool,
76 },
77 Stop,
79}
80
81fn write_function_code_token(w: &mut TtcWriter, function_code: u8, seq_num: u8, field_version: u8) {
86 w.write_function_code_with_seq(function_code, seq_num);
87 if field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
88 w.write_ub8(0);
89 }
90}
91
92#[allow(clippy::too_many_arguments)]
98pub fn build_subscribe_payload_with_seq(
99 seq_num: u8,
100 opcode: u8,
101 username: Option<&str>,
102 client_id: Option<&[u8]>,
103 namespace: u32,
104 name: Option<&str>,
105 public_qos: u32,
106 operations: u32,
107 timeout: u32,
108 grouping_class: u8,
109 grouping_value: u32,
110 grouping_type: u8,
111 registration_id: u64,
112 field_version: u8,
113) -> Result<Vec<u8>> {
114 let mut qos = TNS_SUBSCR_QOS_SECURE;
116 if public_qos & SUBSCR_QOS_RELIABLE != 0 {
117 qos |= TNS_SUBSCR_QOS_RELIABLE;
118 }
119 if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
120 qos |= TNS_SUBSCR_QOS_PURGE_ON_NTFN;
121 }
122 let mut flags = operations;
124 if public_qos & SUBSCR_QOS_QUERY != 0 {
125 flags |= TNS_SUBSCR_FLAGS_QUERY;
126 }
127 if public_qos & SUBSCR_QOS_ROWIDS != 0 {
128 flags |= TNS_SUBSCR_FLAGS_INCLUDE_ROWIDS;
129 }
130 let grouping_type = if grouping_class == 0 {
132 0
133 } else {
134 grouping_type
135 };
136
137 let username_bytes = username.map(str::as_bytes);
138
139 let mut w = TtcWriter::new();
140 write_function_code_token(&mut w, TNS_FUNC_SUBSCRIBE, seq_num, field_version);
141 w.write_u8(opcode);
142 w.write_ub4(TNS_SUBSCR_MODE_CLIENT_INITIATED);
143 match username_bytes {
144 Some(bytes) => {
145 w.write_u8(1); w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
147 }
148 None => {
149 w.write_u8(0);
150 w.write_ub4(0);
151 }
152 }
153 match client_id {
154 Some(bytes) => {
155 w.write_u8(1); w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
157 }
158 None => {
159 w.write_u8(0);
160 w.write_ub4(0);
161 }
162 }
163 w.write_u8(1); w.write_ub4(1); w.write_ub2(1); w.write_ub2(6); w.write_u8(0); w.write_u8(1); w.write_u8(0); w.write_u8(1); if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
172 w.write_u8(1); w.write_u8(1); w.write_u8(1); w.write_u8(1); w.write_u8(1); w.write_ub4(TNS_SUBSCR_CLIENT_ID_LEN);
178 w.write_u8(1); }
180 if let Some(bytes) = username_bytes {
181 w.write_bytes_with_length(bytes)?;
182 }
183 if let Some(bytes) = client_id {
184 w.write_bytes_with_length(bytes)?;
185 }
186 w.write_ub4(namespace);
187 match name {
188 Some(name) => w.write_bytes_with_two_lengths(Some(name.as_bytes()))?,
189 None => w.write_ub4(0),
190 }
191 w.write_ub4(0); w.write_ub4(0); w.write_ub4(qos);
194 w.write_ub4(0); w.write_ub4(timeout);
196 w.write_ub4(0); w.write_ub4(flags);
198 w.write_ub4(0); w.write_ub4(0); w.write_u8(grouping_class);
201 w.write_ub4(grouping_value);
202 w.write_u8(grouping_type);
203 w.write_ub4(0); w.write_ub4(0);
207 w.write_ub8(registration_id);
208 Ok(w.into_bytes())
209}
210
211pub fn parse_subscribe_response(
215 payload: &[u8],
216 capabilities: ClientCapabilities,
217) -> Result<SubscribeResult> {
218 parse_subscribe_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
219}
220
221pub fn parse_subscribe_response_with_limits(
222 payload: &[u8],
223 capabilities: ClientCapabilities,
224 limits: ProtocolLimits,
225) -> Result<SubscribeResult> {
226 let mut reader = TtcReader::with_limits(payload, limits)?;
227 let mut result = SubscribeResult::default();
228 let field_version = capabilities.ttc_field_version;
229 while reader.remaining() > 0 {
230 let message_type = reader.read_u8()?;
231 match message_type {
232 0 => {}
233 TNS_MSG_TYPE_PARAMETER => {
234 parse_subscribe_return_parameters(&mut reader, field_version, &mut result)?;
235 }
236 TNS_MSG_TYPE_STATUS => {
237 let _call_status = reader.read_ub4()?;
238 let _seq = reader.read_ub2()?;
239 }
240 TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
241 let _ = skip_server_side_piggyback(&mut reader)?;
242 }
243 TNS_MSG_TYPE_END_OF_RESPONSE => break,
244 TNS_MSG_TYPE_ERROR => {
245 let info = parse_server_error_info(&mut reader, field_version)?;
246 if info.number != 0 {
247 return Err(ProtocolError::ServerError(info.message));
248 }
249 }
250 _ => {
251 return Err(ProtocolError::UnknownMessageType {
252 message_type,
253 position: reader.position().saturating_sub(1),
254 })
255 }
256 }
257 }
258 Ok(result)
259}
260
261fn parse_subscribe_return_parameters(
262 reader: &mut TtcReader<'_>,
263 field_version: u8,
264 result: &mut SubscribeResult,
265) -> Result<()> {
266 let num_values = reader.read_ub4()?; for _ in 0..num_values {
268 let _ = reader.read_ub4()?;
269 }
270 for _ in 0..num_values {
271 let _ = reader.read_ub4()?; }
273 let num_values = reader.read_ub4()?; for _ in 0..num_values {
275 result.registration_id = reader.read_ub8()?;
276 if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
277 let _subscriber_name = reader.read_bytes_with_length()?;
278 }
279 }
280 if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
281 let num_instances = reader.read_ub4()?;
282 for _ in 0..num_instances {
283 let _ = reader.read_bytes_with_length()?;
284 }
285 let num_listeners = reader.read_ub4()?;
286 for _ in 0..num_listeners {
287 let _ = reader.read_bytes_with_length()?;
288 }
289 result.client_id = reader.read_bytes_with_length()?;
290 }
291 Ok(())
292}
293
294pub fn build_notify_payload_with_seq(
298 seq_num: u8,
299 client_id: &[u8],
300 field_version: u8,
301) -> Result<Vec<u8>> {
302 let mut w = TtcWriter::new();
303 write_function_code_token(&mut w, TNS_FUNC_NOTIFY, seq_num, field_version);
304 w.write_ub4(u32::try_from(client_id.len()).unwrap_or(u32::MAX));
305 w.write_bytes_with_length(client_id)?;
306 w.write_u8(TNS_INIT_KPNDRREQ);
307 w.write_ub4(0);
308 Ok(w.into_bytes())
309}
310
311pub fn parse_notification_stream(
319 payload: &[u8],
320 namespace: u32,
321 public_qos: u32,
322 db_name: Option<&str>,
323) -> Result<Vec<NotificationRecord>> {
324 parse_notification_stream_with_limits(
325 payload,
326 namespace,
327 public_qos,
328 db_name,
329 ProtocolLimits::DEFAULT,
330 )
331}
332
333pub fn parse_notification_stream_with_limits(
334 payload: &[u8],
335 namespace: u32,
336 public_qos: u32,
337 db_name: Option<&str>,
338 limits: ProtocolLimits,
339) -> Result<Vec<NotificationRecord>> {
340 let mut reader = TtcReader::with_limits(payload, limits)?;
341 let message_type = reader.read_u8()?; if message_type != TNS_MSG_TYPE_OAC {
343 return Err(ProtocolError::UnknownMessageType {
344 message_type,
345 position: reader.position().saturating_sub(1),
346 });
347 }
348 let mut records = Vec::new();
349 while reader.remaining() > 0 {
350 let record =
351 parse_oac_record_with_limits(&mut reader, namespace, public_qos, db_name, limits)?;
352 let end = match &record {
353 NotificationRecord::Stop => true,
354 NotificationRecord::Message {
355 end_of_response, ..
356 } => *end_of_response,
357 };
358 records.push(record);
359 if end {
360 break;
361 }
362 }
363 Ok(records)
364}
365
366pub fn check_notification_header(bytes: &[u8]) -> Result<usize> {
370 check_notification_header_with_limits(bytes, ProtocolLimits::DEFAULT)
371}
372
373pub fn check_notification_header_with_limits(
374 bytes: &[u8],
375 limits: ProtocolLimits,
376) -> Result<usize> {
377 let mut reader = TtcReader::with_limits(bytes, limits)?;
378 let message_type = reader.read_u8()?;
379 if message_type != TNS_MSG_TYPE_OAC {
380 return Err(ProtocolError::UnknownMessageType {
381 message_type,
382 position: 0,
383 });
384 }
385 Ok(reader.position())
386}
387
388pub fn try_parse_oac_record(
394 bytes: &[u8],
395 namespace: u32,
396 public_qos: u32,
397 db_name: Option<&str>,
398) -> Result<Option<(NotificationRecord, usize)>> {
399 try_parse_oac_record_with_limits(
400 bytes,
401 namespace,
402 public_qos,
403 db_name,
404 ProtocolLimits::DEFAULT,
405 )
406}
407
408pub fn try_parse_oac_record_with_limits(
409 bytes: &[u8],
410 namespace: u32,
411 public_qos: u32,
412 db_name: Option<&str>,
413 limits: ProtocolLimits,
414) -> Result<Option<(NotificationRecord, usize)>> {
415 let mut reader = TtcReader::with_limits(bytes, limits)?;
416 match parse_oac_record_with_limits(&mut reader, namespace, public_qos, db_name, limits) {
417 Ok(record) => Ok(Some((record, reader.position()))),
418 Err(_) => Ok(None),
422 }
423}
424
425pub fn parse_oac_record(
428 reader: &mut TtcReader<'_>,
429 namespace: u32,
430 public_qos: u32,
431 db_name: Option<&str>,
432) -> Result<NotificationRecord> {
433 parse_oac_record_with_limits(reader, namespace, public_qos, db_name, reader.limits())
434}
435
436pub fn parse_oac_record_with_limits(
437 reader: &mut TtcReader<'_>,
438 namespace: u32,
439 public_qos: u32,
440 db_name: Option<&str>,
441 limits: ProtocolLimits,
442) -> Result<NotificationRecord> {
443 let message_type = reader.read_ub4()?;
444 if message_type == TNS_SUBSCR_STOP_NOTIF {
445 return Ok(NotificationRecord::Stop);
446 }
447 let _error_code = reader.read_ub4()?;
448 let _registration_id = reader.read_ub4()?;
449 let queue_name = reader.read_string_with_length()?;
450 let consumer_name = reader.read_string_with_length()?;
451 let msgid = reader.read_bytes_with_length()?;
452 let num_props = reader.read_ub4()?;
453 if num_props > 0 {
454 let _ = reader.read_u8()?;
458 skip_msg_props(reader, num_props)?;
459 }
460 skip_bytes_with_length(reader)?; let mut payload: Option<Vec<u8>> = None;
463 if namespace != TNS_SUBSCR_NAMESPACE_AQ {
464 let _payload_type = reader.read_ub4()?;
465 let _payload_flags = reader.read_ub4()?;
466 let _chunk_number = reader.read_ub4()?;
467 payload = reader.read_bytes_with_length()?;
468 skip_bytes_with_length(reader)?; }
470
471 let mut message = NotificationMessage {
472 msg_type: 0,
473 dbname: db_name.map(str::to_string),
474 txid: None,
475 registered: false,
476 queue_name,
477 consumer_name,
478 msgid,
479 tables: Vec::new(),
480 queries: Vec::new(),
481 };
482 let end_of_response = process_notification_payload(
483 payload.as_deref(),
484 namespace,
485 public_qos,
486 limits,
487 &mut message,
488 )?;
489 Ok(NotificationRecord::Message {
490 message,
491 end_of_response,
492 })
493}
494
495fn process_notification_payload(
498 payload: Option<&[u8]>,
499 namespace: u32,
500 public_qos: u32,
501 limits: ProtocolLimits,
502 message: &mut NotificationMessage,
503) -> Result<bool> {
504 if namespace == TNS_SUBSCR_NAMESPACE_AQ {
505 message.msg_type = EVENT_AQ;
506 return Ok(false);
507 }
508 let Some(payload) = payload else {
509 message.msg_type = EVENT_DEREG;
511 return Ok(true);
512 };
513 let mut end_of_response = false;
514 if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
515 message.registered = false;
516 end_of_response = true;
517 } else {
518 message.registered = true;
519 }
520 let mut cur = ByteCursor::with_limits(payload, limits)?;
522 let _version = cur.u16be()?;
523 let _registration_id = cur.u32be()?;
524 let event_type = cur.u32be()?;
525 message.msg_type = event_type;
526 let dbname_len = cur.u16be()? as usize;
527 let dbname = cur.raw(dbname_len)?;
528 message.dbname = Some(
529 String::from_utf8(dbname.to_vec())
530 .map_err(|_| ProtocolError::TtcDecode("notification dbname not UTF-8"))?,
531 );
532 cur.skip(14)?; if event_type == EVENT_OBJCHANGE {
534 message.tables = process_tables(&mut cur)?;
535 } else if event_type == EVENT_QUERYCHANGE {
536 message.queries = process_queries(&mut cur)?;
537 }
538 Ok(end_of_response)
539}
540
541fn process_tables(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgTable>> {
542 let num_tables = cur.u16be()?;
543 let mut tables: Vec<MsgTable> = cur.with_capacity_limited(
547 num_tables as usize,
548 6,
549 ProtocolLimits::check_length_prefixed_elements,
550 )?;
551 for _ in 0..num_tables {
552 let operation = cur.u32be()?;
553 let name_len = cur.u16be()? as usize;
554 let name = String::from_utf8(cur.raw(name_len)?.to_vec())
555 .map_err(|_| ProtocolError::TtcDecode("table name not UTF-8"))?;
556 let _object_num = cur.u32be()?;
557 let rows = if operation & OPCODE_ALLROWS == 0 {
558 process_rows(cur)?
559 } else {
560 Vec::new()
561 };
562 tables.push(MsgTable {
563 operation,
564 name,
565 rows,
566 });
567 }
568 Ok(tables)
569}
570
571fn process_rows(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgRow>> {
572 let num_rows = cur.u16be()?;
573 let mut rows: Vec<MsgRow> = cur.with_capacity_limited(
576 num_rows as usize,
577 6,
578 ProtocolLimits::check_length_prefixed_elements,
579 )?;
580 for _ in 0..num_rows {
581 let operation = cur.u32be()?;
582 let rowid_len = cur.u16be()? as usize;
583 let rowid = String::from_utf8(cur.raw(rowid_len)?.to_vec())
584 .map_err(|_| ProtocolError::TtcDecode("rowid not UTF-8"))?;
585 rows.push(MsgRow { operation, rowid });
586 }
587 Ok(rows)
588}
589
590fn process_queries(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgQuery>> {
591 let num_queries = cur.u16be()?;
592 let mut queries: Vec<MsgQuery> = cur.with_capacity_limited(
595 num_queries as usize,
596 12,
597 ProtocolLimits::check_length_prefixed_elements,
598 )?;
599 for _ in 0..num_queries {
600 let id_lsb = u64::from(cur.u32be()?);
601 let id_msb = u64::from(cur.u32be()?);
602 let id = (id_msb << 32) | id_lsb;
603 let operation = cur.u32be()?;
604 let tables = process_tables(cur)?;
605 queries.push(MsgQuery {
606 id,
607 operation,
608 tables,
609 });
610 }
611 Ok(queries)
612}
613
614fn skip_msg_props(reader: &mut TtcReader<'_>, num_props: u32) -> Result<()> {
617 for _ in 0..num_props {
618 skip_bytes_with_length(reader)?; skip_bytes_with_length(reader)?; }
621 Ok(())
622}
623
624fn skip_bytes_with_length(reader: &mut TtcReader<'_>) -> Result<()> {
625 let _ = reader.read_bytes_with_length()?;
626 Ok(())
627}
628
629struct ByteCursor<'a> {
631 bytes: &'a [u8],
632 pos: usize,
633 limits: ProtocolLimits,
634}
635
636impl<'a> ByteCursor<'a> {
637 #[cfg(test)]
638 fn new(bytes: &'a [u8]) -> Self {
639 Self {
640 bytes,
641 pos: 0,
642 limits: ProtocolLimits::DEFAULT,
643 }
644 }
645
646 fn with_limits(bytes: &'a [u8], limits: ProtocolLimits) -> Result<Self> {
647 let limits = limits.validate()?;
648 limits.check_response_bytes(bytes.len())?;
649 Ok(Self {
650 bytes,
651 pos: 0,
652 limits,
653 })
654 }
655
656 fn raw(&mut self, n: usize) -> Result<&'a [u8]> {
657 let end = self
658 .pos
659 .checked_add(n)
660 .ok_or(ProtocolError::TtcDecode("notification payload overflow"))?;
661 let slice = self
662 .bytes
663 .get(self.pos..end)
664 .ok_or(ProtocolError::TtcDecode("notification payload truncated"))?;
665 self.pos = end;
666 Ok(slice)
667 }
668
669 fn skip(&mut self, n: usize) -> Result<()> {
670 let _ = self.raw(n)?;
671 Ok(())
672 }
673
674 fn u16be(&mut self) -> Result<u16> {
675 let bytes = self.raw(2)?;
676 Ok(u16::from_be_bytes([bytes[0], bytes[1]]))
677 }
678
679 fn u32be(&mut self) -> Result<u32> {
680 let bytes = self.raw(4)?;
681 Ok(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
682 }
683}
684
685impl crate::wire::BoundedReader for ByteCursor<'_> {
686 fn remaining(&self) -> usize {
687 self.bytes.len().saturating_sub(self.pos)
688 }
689
690 fn protocol_limits(&self) -> ProtocolLimits {
691 self.limits
692 }
693}
694
#[cfg(test)]
mod tests {
    use super::*;

    /// A table count far beyond the bytes available must be rejected with a
    /// decode error, and the bounded reservation must stay tiny.
    #[test]
    fn cqn_oversized_table_count_fails_closed_not_oom() {
        let wire = [0xFFu8, 0xFF];

        let mut cursor = ByteCursor::new(&wire);
        let err = process_tables(&mut cursor).expect_err("oversized table count must fail closed");
        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");

        let untouched = ByteCursor::new(&wire);
        let reserved: Vec<MsgTable> = untouched.with_capacity_bounded(0xFFFF, 6);
        assert!(
            reserved.capacity() <= 1,
            "reservation capped by remaining bytes"
        );
    }

    /// A table count above the configured element limit is reported as a
    /// resource-limit violation.
    #[test]
    fn cqn_table_count_respects_protocol_element_limit() {
        let wire = [0x00u8, 0x02];
        let tight_limits = ProtocolLimits {
            max_length_prefixed_elements: 1,
            ..ProtocolLimits::DEFAULT
        };
        let mut cursor = ByteCursor::with_limits(&wire, tight_limits).expect("valid limits");
        let err = process_tables(&mut cursor).expect_err("table count above policy must fail");
        let is_expected_limit_error = matches!(
            err,
            ProtocolError::ResourceLimit {
                limit: "length_prefixed_elements",
                observed: 2,
                maximum: 1,
            }
        );
        assert!(is_expected_limit_error, "got {err:?}");
    }

    /// Capabilities with TTC field version 24, used by the response decoding test.
    fn caps_12_1() -> ClientCapabilities {
        ClientCapabilities {
            ttc_field_version: 24,
            ..ClientCapabilities::default()
        }
    }

    /// The register request must match the captured golden bytes exactly.
    #[test]
    fn subscribe_register_payload_matches_golden() {
        let golden: &[u8] = &[
            0x03, 0x7D, 0x03, 0x00, 0x01, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x00, 0x00, 0x01, 0x01,
            0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
            0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73, 0x74,
            0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];
        let built = build_subscribe_payload_with_seq(
            0x03,
            TNS_SUBSCR_OP_REGISTER,
            Some("pythontest"),
            None,
            TNS_SUBSCR_NAMESPACE_DBCHANGE,
            None,
            SUBSCR_QOS_ROWIDS,
            0,
            10,
            0,
            0,
            0,
            0,
            24,
        )
        .expect("subscribe payload");
        assert_eq!(built, golden);
    }

    /// The unregister request must match the captured golden bytes exactly.
    #[test]
    fn subscribe_unregister_payload_matches_golden() {
        let golden: &[u8] = &[
            0x03, 0x7D, 0x0A, 0x00, 0x02, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x01, 0x01, 0x0A, 0x01,
            0x01, 0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01,
            0x01, 0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73,
            0x74, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33, 0x30, 0x31, 0x01, 0x02,
            0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x2E,
        ];
        let built = build_subscribe_payload_with_seq(
            0x0A,
            TNS_SUBSCR_OP_UNREGISTER,
            Some("pythontest"),
            Some(b"OCI:EP:301"),
            TNS_SUBSCR_NAMESPACE_DBCHANGE,
            None,
            SUBSCR_QOS_ROWIDS,
            0,
            10,
            0,
            0,
            0,
            302,
            24,
        )
        .expect("unsubscribe payload");
        assert_eq!(built, golden);
    }

    /// The notify request must match the captured golden bytes exactly.
    #[test]
    fn notify_payload_matches_golden() {
        let golden: &[u8] = &[
            0x03, 0xBB, 0x03, 0x00, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A,
            0x33, 0x30, 0x31, 0x01, 0x00,
        ];
        let built =
            build_notify_payload_with_seq(0x03, b"OCI:EP:301", 24).expect("notify payload");
        assert_eq!(built, golden);
    }

    /// A captured subscribe reply yields the expected registration and client ids.
    #[test]
    fn subscribe_response_decodes_registration_and_client_id() {
        let reply: &[u8] = &[
            0x08, 0x01, 0x01, 0x00, 0x02, 0x01, 0x2E, 0x01, 0x01, 0x02, 0x01, 0x2E, 0x00, 0x00,
            0x01, 0x01, 0x01, 0x36, 0x36, 0x28, 0x41, 0x44, 0x44, 0x52, 0x45, 0x53, 0x53, 0x3D,
            0x28, 0x50, 0x52, 0x4F, 0x54, 0x4F, 0x43, 0x4F, 0x4C, 0x3D, 0x54, 0x43, 0x50, 0x29,
            0x28, 0x48, 0x4F, 0x53, 0x54, 0x3D, 0x32, 0x39, 0x30, 0x61, 0x63, 0x30, 0x33, 0x30,
            0x30, 0x33, 0x38, 0x37, 0x29, 0x28, 0x50, 0x4F, 0x52, 0x54, 0x3D, 0x31, 0x35, 0x32,
            0x31, 0x29, 0x29, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33,
            0x30, 0x31, 0x09, 0x01, 0x01, 0x02, 0xDD, 0x48, 0x1D,
        ];
        let decoded = parse_subscribe_response(reply, caps_12_1()).expect("subscribe response");
        assert_eq!(decoded.registration_id, 302);
        assert_eq!(decoded.client_id.as_deref(), Some(&b"OCI:EP:301"[..]));
    }

    /// Captured notification stream containing five table-change records.
    const NOTIF_STREAM: &[u8] = &[
        0x0d, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x02, 0xa4, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x01, 0x00, 0x10, 0x00, 0xd2, 0x03,
        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x9b, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00,
        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
        0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02,
        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00,
        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
        0x50, 0x44, 0x42, 0x31, 0x03, 0x00, 0x19, 0x00, 0x98, 0x04, 0x00, 0x00, 0x0b, 0x00, 0x00,
        0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04,
        0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a,
        0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0x00, 0x89, 0x00,
        0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x05,
        0x00, 0x06, 0x00, 0xa9, 0x04, 0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x44, 0x32, 0x00, 0x01,
        0x00, 0x00, 0x00, 0x02, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53,
        0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45,
        0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41,
        0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42,
        0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0xa5, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x02, 0x00, 0x09, 0x00, 0x7d, 0x04,
        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00,
        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
        0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42, 0x00, 0x01, 0x03, 0x00, 0x02,
        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x46, 0x46, 0x00,
        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
        0x50, 0x44, 0x42, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfe, 0x7f, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
    ];

    /// The captured stream decodes into five object-change messages with the
    /// expected table operations, row operations and row ids.
    #[test]
    fn notification_stream_decodes_dml_events() {
        let records = parse_notification_stream(
            NOTIF_STREAM,
            TNS_SUBSCR_NAMESPACE_DBCHANGE,
            SUBSCR_QOS_ROWIDS,
            Some("FREEPDB1"),
        )
        .expect("notification stream");

        let mut messages: Vec<&NotificationMessage> = Vec::new();
        for record in &records {
            if let NotificationRecord::Message { message, .. } = record {
                messages.push(message);
            }
        }
        assert_eq!(messages.len(), 5);

        let mut table_ops: Vec<u32> = Vec::new();
        for message in &messages {
            table_ops.push(message.tables[0].operation);
        }
        assert_eq!(table_ops, vec![2, 4, 2, 8, 17]);

        let mut row_ops = Vec::new();
        let mut rowids = Vec::new();
        for message in &messages {
            assert_eq!(message.msg_type, EVENT_OBJCHANGE);
            assert_eq!(message.dbname.as_deref(), Some("FREEPDB1"));
            assert!(message.registered);
            assert!(message.txid.is_none());
            let first_table = &message.tables[0];
            row_ops.extend(first_table.rows.iter().map(|row| row.operation));
            rowids.extend(first_table.rows.iter().map(|row| row.rowid.clone()));
        }
        assert_eq!(row_ops, vec![2, 4, 2, 8]);
        assert_eq!(
            rowids,
            vec![
                "AAASjMAAYAAAJO3AAA",
                "AAASjMAAYAAAJO3AAA",
                "AAASjMAAYAAAJO3AAB",
                "AAASjMAAYAAAJO3AAB",
            ]
        );
        // The last table's operation has the all-rows bit, so no rows are listed.
        assert!(messages[4].tables[0].rows.is_empty());
    }
}