Skip to main content

oracledb_protocol/thin/
subscr.rs

1#![forbid(unsafe_code)]
2
3//! CQN / Continuous Query Notification wire codecs (sans-io).
4//!
5//! Ports the reference thin subscription messages:
6//! - `impl/thin/messages/subscribe.pyx` — FUNC 125 register/unregister payload
7//!   and the `_process_return_parameters` response decode.
8//! - `impl/thin/messages/notification.pyx` — FUNC 187 NOTIFY payload, the OAC
9//!   record loop (`_process_oac`) and the big-endian inner CQN payload decoder
10//!   (`_process_notification_payload` / `_process_tables` / `_process_rows` /
11//!   `_process_queries`).
12//!
13//! Only the byte<->struct translation lives here; the second ("emon")
14//! connection, the background receive loop and the Python callback invocation
15//! live in the driver and pyshim crates.
16
17use super::*;
18use crate::wire::{TtcReader, TtcWriter};
19
20/// Result of decoding the SUBSCRIBE (register) response.
21#[derive(Clone, Debug, Default, PartialEq, Eq)]
22pub struct SubscribeResult {
23    /// `USER_CHANGE_NOTIFICATION_REGS.REGID` — exposed as `Subscription.id`.
24    pub registration_id: u64,
25    /// EMON client id (e.g. `b"OCI:EP:301"`) echoed back in the NOTIFY message.
26    pub client_id: Option<Vec<u8>>,
27}
28
29/// One row changed inside a table notification.
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub struct MsgRow {
32    pub operation: u32,
33    pub rowid: String,
34}
35
36/// One table changed inside an OBJCHANGE / QUERYCHANGE notification.
37#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct MsgTable {
39    pub operation: u32,
40    pub name: String,
41    pub rows: Vec<MsgRow>,
42}
43
44/// One query whose result set changed (QUERYCHANGE notifications).
45#[derive(Clone, Debug, PartialEq, Eq)]
46pub struct MsgQuery {
47    pub id: u64,
48    pub operation: u32,
49    pub tables: Vec<MsgTable>,
50}
51
52/// A single decoded OAC notification record handed to the user callback.
53#[derive(Clone, Debug, PartialEq, Eq)]
54pub struct NotificationMessage {
55    /// `EVENT_*` value placed on `Message.type`.
56    pub msg_type: u32,
57    pub dbname: Option<String>,
58    /// Thin never decodes the transaction id (14 bytes skipped); always `None`.
59    pub txid: Option<Vec<u8>>,
60    pub registered: bool,
61    pub queue_name: Option<String>,
62    pub consumer_name: Option<String>,
63    pub msgid: Option<Vec<u8>>,
64    pub tables: Vec<MsgTable>,
65    pub queries: Vec<MsgQuery>,
66}
67
68/// Outcome of decoding one OAC record from the notification stream.
69#[derive(Clone, Debug, PartialEq, Eq)]
70pub enum NotificationRecord {
71    /// A record to deliver to the callback. `end_of_response` mirrors the
72    /// reference flag (DEREG / DEREG_NFY terminate the loop after delivery).
73    Message {
74        message: NotificationMessage,
75        end_of_response: bool,
76    },
77    /// `TNS_SUBSCR_STOP_NOTIF` — the stream is finished; no callback fires.
78    Stop,
79}
80
81/// Writes the function-code header plus the pipeline `token_num` (always 0 for
82/// these messages) when the negotiated caps include it, mirroring
83/// `messages/base.pyx::_write_function_code` (writes `ub8 token_num` for
84/// `ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1`).
85fn write_function_code_token(w: &mut TtcWriter, function_code: u8, seq_num: u8, field_version: u8) {
86    w.write_function_code_with_seq(function_code, seq_num);
87    if field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
88        w.write_ub8(0);
89    }
90}
91
92/// Build the SUBSCRIBE (FUNC 125) payload for register (`opcode = 1`) or
93/// unregister (`opcode = 2`). Ports `subscribe.pyx::_write_message`.
94///
95/// `qos`/`operations` are the *public* `SUBSCR_QOS_*` / `OPCODE_*` values; this
96/// function performs the qos/flags derivation (`subscribe.pyx:82-93`).
97#[allow(clippy::too_many_arguments)]
98pub fn build_subscribe_payload_with_seq(
99    seq_num: u8,
100    opcode: u8,
101    username: Option<&str>,
102    client_id: Option<&[u8]>,
103    namespace: u32,
104    name: Option<&str>,
105    public_qos: u32,
106    operations: u32,
107    timeout: u32,
108    grouping_class: u8,
109    grouping_value: u32,
110    grouping_type: u8,
111    registration_id: u64,
112    field_version: u8,
113) -> Result<Vec<u8>> {
114    // derive the wire qos flags
115    let mut qos = TNS_SUBSCR_QOS_SECURE;
116    if public_qos & SUBSCR_QOS_RELIABLE != 0 {
117        qos |= TNS_SUBSCR_QOS_RELIABLE;
118    }
119    if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
120        qos |= TNS_SUBSCR_QOS_PURGE_ON_NTFN;
121    }
122    // derive the wire operation flags
123    let mut flags = operations;
124    if public_qos & SUBSCR_QOS_QUERY != 0 {
125        flags |= TNS_SUBSCR_FLAGS_QUERY;
126    }
127    if public_qos & SUBSCR_QOS_ROWIDS != 0 {
128        flags |= TNS_SUBSCR_FLAGS_INCLUDE_ROWIDS;
129    }
130    // grouping_type can only be sent when a grouping class is set
131    let grouping_type = if grouping_class == 0 {
132        0
133    } else {
134        grouping_type
135    };
136
137    let username_bytes = username.map(str::as_bytes);
138
139    let mut w = TtcWriter::new();
140    write_function_code_token(&mut w, TNS_FUNC_SUBSCRIBE, seq_num, field_version);
141    w.write_u8(opcode);
142    w.write_ub4(TNS_SUBSCR_MODE_CLIENT_INITIATED);
143    match username_bytes {
144        Some(bytes) => {
145            w.write_u8(1); // pointer (username)
146            w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
147        }
148        None => {
149            w.write_u8(0);
150            w.write_ub4(0);
151        }
152    }
153    match client_id {
154        Some(bytes) => {
155            w.write_u8(1); // pointer (location)
156            w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
157        }
158        None => {
159            w.write_u8(0);
160            w.write_ub4(0);
161        }
162    }
163    w.write_u8(1); // pointer (registration)
164    w.write_ub4(1); // num registrations
165    w.write_ub2(1); // raw presentation
166    w.write_ub2(6); // version for client notification
167    w.write_u8(0); // pointer (namespace out attrs)
168    w.write_u8(1); // pointer (num elements in array)
169    w.write_u8(0); // pointer (generic out attrs)
170    w.write_u8(1); // pointer (num elements in array)
171    if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
172        w.write_u8(1); // kpninst
173        w.write_u8(1); // kpninstl
174        w.write_u8(1); // kpngcret
175        w.write_u8(1); // kpngcretl
176        w.write_u8(1); // client id
177        w.write_ub4(TNS_SUBSCR_CLIENT_ID_LEN);
178        w.write_u8(1); // client id length
179    }
180    if let Some(bytes) = username_bytes {
181        w.write_bytes_with_length(bytes)?;
182    }
183    if let Some(bytes) = client_id {
184        w.write_bytes_with_length(bytes)?;
185    }
186    w.write_ub4(namespace);
187    match name {
188        Some(name) => w.write_bytes_with_two_lengths(Some(name.as_bytes()))?,
189        None => w.write_ub4(0),
190    }
191    w.write_ub4(0); // context length
192    w.write_ub4(0); // payload type
193    w.write_ub4(qos);
194    w.write_ub4(0); // payload callback length (JMS)
195    w.write_ub4(timeout);
196    w.write_ub4(0); // kpdnsd
197    w.write_ub4(flags);
198    w.write_ub4(0); // change lag between notifications
199    w.write_ub4(0); // change registration id
200    w.write_u8(grouping_class);
201    w.write_ub4(grouping_value);
202    w.write_u8(grouping_type);
203    w.write_ub4(0); // grouping class start time
204                    // grouping repeat count: write_sb4(0); for the constant 0 this is the same
205                    // single 0x00 byte the unsigned encoder emits.
206    w.write_ub4(0);
207    w.write_ub8(registration_id);
208    Ok(w.into_bytes())
209}
210
211/// Decode the SUBSCRIBE (register) response. Ports
212/// `subscribe.pyx::_process_return_parameters`, dispatched on the
213/// `TNS_MSG_TYPE_PARAMETER` message inside the standard function response loop.
214pub fn parse_subscribe_response(
215    payload: &[u8],
216    capabilities: ClientCapabilities,
217) -> Result<SubscribeResult> {
218    let mut reader = TtcReader::new(payload);
219    let mut result = SubscribeResult::default();
220    let field_version = capabilities.ttc_field_version;
221    while reader.remaining() > 0 {
222        let message_type = reader.read_u8()?;
223        match message_type {
224            0 => {}
225            TNS_MSG_TYPE_PARAMETER => {
226                parse_subscribe_return_parameters(&mut reader, field_version, &mut result)?;
227            }
228            TNS_MSG_TYPE_STATUS => {
229                let _call_status = reader.read_ub4()?;
230                let _seq = reader.read_ub2()?;
231            }
232            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
233                let _ = skip_server_side_piggyback(&mut reader)?;
234            }
235            TNS_MSG_TYPE_END_OF_RESPONSE => break,
236            TNS_MSG_TYPE_ERROR => {
237                let info = parse_server_error_info(&mut reader, field_version)?;
238                if info.number != 0 {
239                    return Err(ProtocolError::ServerError(info.message));
240                }
241            }
242            _ => {
243                return Err(ProtocolError::UnknownMessageType {
244                    message_type,
245                    position: reader.position().saturating_sub(1),
246                })
247            }
248        }
249    }
250    Ok(result)
251}
252
253fn parse_subscribe_return_parameters(
254    reader: &mut TtcReader<'_>,
255    field_version: u8,
256    result: &mut SubscribeResult,
257) -> Result<()> {
258    let num_values = reader.read_ub4()?; // out parameters (kpnrl)
259    for _ in 0..num_values {
260        let _ = reader.read_ub4()?;
261    }
262    for _ in 0..num_values {
263        let _ = reader.read_ub4()?; // registration id (short)
264    }
265    let num_values = reader.read_ub4()?; // out parameters (kpngrl)
266    for _ in 0..num_values {
267        result.registration_id = reader.read_ub8()?;
268        if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
269            let _subscriber_name = reader.read_bytes_with_length()?;
270        }
271    }
272    if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
273        let num_instances = reader.read_ub4()?;
274        for _ in 0..num_instances {
275            let _ = reader.read_bytes_with_length()?;
276        }
277        let num_listeners = reader.read_ub4()?;
278        for _ in 0..num_listeners {
279            let _ = reader.read_bytes_with_length()?;
280        }
281        result.client_id = reader.read_bytes_with_length()?;
282    }
283    Ok(())
284}
285
286/// Build the NOTIFY (FUNC 187) payload sent on the emon connection. Ports
287/// `notification.pyx::_write_message`. The caller must transmit this packet
288/// with the `TNS_DATA_FLAGS_END_OF_REQUEST` data flag set.
289pub fn build_notify_payload_with_seq(
290    seq_num: u8,
291    client_id: &[u8],
292    field_version: u8,
293) -> Result<Vec<u8>> {
294    let mut w = TtcWriter::new();
295    write_function_code_token(&mut w, TNS_FUNC_NOTIFY, seq_num, field_version);
296    w.write_ub4(u32::try_from(client_id.len()).unwrap_or(u32::MAX));
297    w.write_bytes_with_length(client_id)?;
298    w.write_u8(TNS_INIT_KPNDRREQ);
299    w.write_ub4(0);
300    Ok(w.into_bytes())
301}
302
303/// Decode every OAC record in a notification stream. The reference reads one
304/// leading `message_type` byte (`TNS_MSG_TYPE_OAC`) then loops `_process_oac`
305/// until `end_of_response`; the driver chains network packets into `payload`
306/// so this operates on the full concatenated TTC stream.
307///
308/// Returns the decoded records in order. A trailing [`NotificationRecord::Stop`]
309/// (or a record whose `end_of_response` is set) marks the end of the stream.
310pub fn parse_notification_stream(
311    payload: &[u8],
312    namespace: u32,
313    public_qos: u32,
314    db_name: Option<&str>,
315) -> Result<Vec<NotificationRecord>> {
316    let mut reader = TtcReader::new(payload);
317    let message_type = reader.read_u8()?; // outer process(): read_ub1(message_type)
318    if message_type != TNS_MSG_TYPE_OAC {
319        return Err(ProtocolError::UnknownMessageType {
320            message_type,
321            position: reader.position().saturating_sub(1),
322        });
323    }
324    let mut records = Vec::new();
325    while reader.remaining() > 0 {
326        let record = parse_oac_record(&mut reader, namespace, public_qos, db_name)?;
327        let end = match &record {
328            NotificationRecord::Stop => true,
329            NotificationRecord::Message {
330                end_of_response, ..
331            } => *end_of_response,
332        };
333        records.push(record);
334        if end {
335            break;
336        }
337    }
338    Ok(records)
339}
340
341/// Consume the leading `TNS_MSG_TYPE_OAC` byte that precedes the OAC record
342/// stream (`process()` reads it once before delivering any record). Returns the
343/// number of bytes consumed (1) or an error if the byte is not OAC.
344pub fn check_notification_header(bytes: &[u8]) -> Result<usize> {
345    let mut reader = TtcReader::new(bytes);
346    let message_type = reader.read_u8()?;
347    if message_type != TNS_MSG_TYPE_OAC {
348        return Err(ProtocolError::UnknownMessageType {
349            message_type,
350            position: 0,
351        });
352    }
353    Ok(reader.position())
354}
355
356/// Attempt to decode exactly one OAC record from the front of `bytes`. Returns
357/// the decoded record and the number of bytes consumed, or `Ok(None)` when the
358/// buffer does not yet hold a complete record (the caller must read more data
359/// from the EMON socket and retry — mirroring the reference `ReadBuffer`
360/// chaining packets within a single `process()` call).
361pub fn try_parse_oac_record(
362    bytes: &[u8],
363    namespace: u32,
364    public_qos: u32,
365    db_name: Option<&str>,
366) -> Result<Option<(NotificationRecord, usize)>> {
367    let mut reader = TtcReader::new(bytes);
368    match parse_oac_record(&mut reader, namespace, public_qos, db_name) {
369        Ok(record) => Ok(Some((record, reader.position()))),
370        // The server only emits well-formed records; a decode failure while the
371        // stream is still being chained means the buffer is short, so signal
372        // "need more bytes" rather than treating it as corruption.
373        Err(_) => Ok(None),
374    }
375}
376
377/// Decode a single OAC record. Ports `notification.pyx::_process_oac` plus the
378/// inner payload decode.
379pub fn parse_oac_record(
380    reader: &mut TtcReader<'_>,
381    namespace: u32,
382    public_qos: u32,
383    db_name: Option<&str>,
384) -> Result<NotificationRecord> {
385    let message_type = reader.read_ub4()?;
386    if message_type == TNS_SUBSCR_STOP_NOTIF {
387        return Ok(NotificationRecord::Stop);
388    }
389    let _error_code = reader.read_ub4()?;
390    let _registration_id = reader.read_ub4()?;
391    let queue_name = reader.read_string_with_length()?;
392    let consumer_name = reader.read_string_with_length()?;
393    let msgid = reader.read_bytes_with_length()?;
394    let num_props = reader.read_ub4()?;
395    if num_props > 0 {
396        // AQ message properties path: skip the invalid-length byte then the
397        // property records. The CQN tests never exercise this branch (AQ uses
398        // num_props == 0); skip conservatively so the stream stays aligned.
399        let _ = reader.read_u8()?;
400        skip_msg_props(reader, num_props)?;
401    }
402    skip_bytes_with_length(reader)?; // JMS message properties
403
404    let mut payload: Option<Vec<u8>> = None;
405    if namespace != TNS_SUBSCR_NAMESPACE_AQ {
406        let _payload_type = reader.read_ub4()?;
407        let _payload_flags = reader.read_ub4()?;
408        let _chunk_number = reader.read_ub4()?;
409        payload = reader.read_bytes_with_length()?;
410        skip_bytes_with_length(reader)?; // DbObject / JSON payload
411    }
412
413    let mut message = NotificationMessage {
414        msg_type: 0,
415        dbname: db_name.map(str::to_string),
416        txid: None,
417        registered: false,
418        queue_name,
419        consumer_name,
420        msgid,
421        tables: Vec::new(),
422        queries: Vec::new(),
423    };
424    let end_of_response =
425        process_notification_payload(payload.as_deref(), namespace, public_qos, &mut message)?;
426    Ok(NotificationRecord::Message {
427        message,
428        end_of_response,
429    })
430}
431
432/// Ports `_process_notification_payload`. Returns the resulting
433/// `end_of_response` flag.
434fn process_notification_payload(
435    payload: Option<&[u8]>,
436    namespace: u32,
437    public_qos: u32,
438    message: &mut NotificationMessage,
439) -> Result<bool> {
440    if namespace == TNS_SUBSCR_NAMESPACE_AQ {
441        message.msg_type = EVENT_AQ;
442        return Ok(false);
443    }
444    let Some(payload) = payload else {
445        // empty payload => registration discarded
446        message.msg_type = EVENT_DEREG;
447        return Ok(true);
448    };
449    let mut end_of_response = false;
450    if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
451        message.registered = false;
452        end_of_response = true;
453    } else {
454        message.registered = true;
455    }
456    // inner payload is a plain big-endian byte cursor
457    let mut cur = ByteCursor::new(payload);
458    let _version = cur.u16be()?;
459    let _registration_id = cur.u32be()?;
460    let event_type = cur.u32be()?;
461    message.msg_type = event_type;
462    let dbname_len = cur.u16be()? as usize;
463    let dbname = cur.raw(dbname_len)?;
464    message.dbname = Some(
465        String::from_utf8(dbname.to_vec())
466            .map_err(|_| ProtocolError::TtcDecode("notification dbname not UTF-8"))?,
467    );
468    cur.skip(14)?; // transaction id + SCN (txid intentionally left None)
469    if event_type == EVENT_OBJCHANGE {
470        message.tables = process_tables(&mut cur)?;
471    } else if event_type == EVENT_QUERYCHANGE {
472        message.queries = process_queries(&mut cur)?;
473    }
474    Ok(end_of_response)
475}
476
477fn process_tables(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgTable>> {
478    let num_tables = cur.u16be()?;
479    // Each table record reads at least a u32 operation + u16 name length (6
480    // bytes) before its name, so cap the reservation by the buffer
481    // (BoundedReader); the loop still fails closed on truncation.
482    let mut tables: Vec<MsgTable> = cur.with_capacity_bounded(num_tables as usize, 6);
483    for _ in 0..num_tables {
484        let operation = cur.u32be()?;
485        let name_len = cur.u16be()? as usize;
486        let name = String::from_utf8(cur.raw(name_len)?.to_vec())
487            .map_err(|_| ProtocolError::TtcDecode("table name not UTF-8"))?;
488        let _object_num = cur.u32be()?;
489        let rows = if operation & OPCODE_ALLROWS == 0 {
490            process_rows(cur)?
491        } else {
492            Vec::new()
493        };
494        tables.push(MsgTable {
495            operation,
496            name,
497            rows,
498        });
499    }
500    Ok(tables)
501}
502
503fn process_rows(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgRow>> {
504    let num_rows = cur.u16be()?;
505    // Each row record reads at least a u32 operation + u16 rowid length (6
506    // bytes); bound the reservation by the buffer (BoundedReader).
507    let mut rows: Vec<MsgRow> = cur.with_capacity_bounded(num_rows as usize, 6);
508    for _ in 0..num_rows {
509        let operation = cur.u32be()?;
510        let rowid_len = cur.u16be()? as usize;
511        let rowid = String::from_utf8(cur.raw(rowid_len)?.to_vec())
512            .map_err(|_| ProtocolError::TtcDecode("rowid not UTF-8"))?;
513        rows.push(MsgRow { operation, rowid });
514    }
515    Ok(rows)
516}
517
518fn process_queries(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgQuery>> {
519    let num_queries = cur.u16be()?;
520    // Each query record reads at least three u32s (12 bytes) before its nested
521    // tables; bound the reservation by the buffer (BoundedReader).
522    let mut queries: Vec<MsgQuery> = cur.with_capacity_bounded(num_queries as usize, 12);
523    for _ in 0..num_queries {
524        let id_lsb = u64::from(cur.u32be()?);
525        let id_msb = u64::from(cur.u32be()?);
526        let id = (id_msb << 32) | id_lsb;
527        let operation = cur.u32be()?;
528        let tables = process_tables(cur)?;
529        queries.push(MsgQuery {
530            id,
531            operation,
532            tables,
533        });
534    }
535    Ok(queries)
536}
537
538/// Skip AQ message-property records (`_process_msg_props`). The CQN tests never
539/// reach this branch; this keeps the parser aligned should the server send it.
540fn skip_msg_props(reader: &mut TtcReader<'_>, num_props: u32) -> Result<()> {
541    for _ in 0..num_props {
542        skip_bytes_with_length(reader)?; // name
543        skip_bytes_with_length(reader)?; // value
544    }
545    Ok(())
546}
547
548fn skip_bytes_with_length(reader: &mut TtcReader<'_>) -> Result<()> {
549    let _ = reader.read_bytes_with_length()?;
550    Ok(())
551}
552
553/// A plain big-endian cursor over the inner CQN payload bytes (no TTC chunking).
554struct ByteCursor<'a> {
555    bytes: &'a [u8],
556    pos: usize,
557}
558
559impl<'a> ByteCursor<'a> {
560    fn new(bytes: &'a [u8]) -> Self {
561        Self { bytes, pos: 0 }
562    }
563
564    fn raw(&mut self, n: usize) -> Result<&'a [u8]> {
565        let end = self
566            .pos
567            .checked_add(n)
568            .ok_or(ProtocolError::TtcDecode("notification payload overflow"))?;
569        let slice = self
570            .bytes
571            .get(self.pos..end)
572            .ok_or(ProtocolError::TtcDecode("notification payload truncated"))?;
573        self.pos = end;
574        Ok(slice)
575    }
576
577    fn skip(&mut self, n: usize) -> Result<()> {
578        let _ = self.raw(n)?;
579        Ok(())
580    }
581
582    fn u16be(&mut self) -> Result<u16> {
583        let bytes = self.raw(2)?;
584        Ok(u16::from_be_bytes([bytes[0], bytes[1]]))
585    }
586
587    fn u32be(&mut self) -> Result<u32> {
588        let bytes = self.raw(4)?;
589        Ok(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
590    }
591}
592
593impl crate::wire::BoundedReader for ByteCursor<'_> {
594    fn remaining(&self) -> usize {
595        self.bytes.len().saturating_sub(self.pos)
596    }
597}
598
599#[cfg(test)]
600mod tests {
601    use super::*;
602
603    // BoundedReader invariant (l2p), CQN array family: a notification table
604    // record declaring the maximum num_tables (0xFFFF) but carrying no table
605    // bytes must fail closed via the bounded reservation + the per-record read,
606    // not pre-allocate 65535 MsgTable structs from the count. (num_tables is a
607    // u16 so this was never a multi-GB OOM, but routing it through the bound
608    // keeps the whole class uniform and regression-proof.)
609    #[test]
610    fn cqn_oversized_table_count_fails_closed_not_oom() {
611        // num_tables = 0xFFFF (u16), then nothing.
612        let bytes = [0xFFu8, 0xFF];
613        let mut cur = ByteCursor::new(&bytes);
614        let err = process_tables(&mut cur).expect_err("oversized table count must fail closed");
615        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
616        // The pre-allocation never exceeds remaining()/6 even for the max count.
617        let cur2 = ByteCursor::new(&bytes);
618        let v: Vec<MsgTable> = cur2.with_capacity_bounded(0xFFFF, 6);
619        assert!(v.capacity() <= 1, "reservation capped by remaining bytes");
620    }
621
622    fn caps_12_1() -> ClientCapabilities {
623        ClientCapabilities {
624            ttc_field_version: 24,
625            ..ClientCapabilities::default()
626        }
627    }
628
629    #[test]
630    fn subscribe_register_payload_matches_golden() {
631        // Golden /tmp/cqn_trace.txt line 2421 (op 8, socket 5), payload after
632        // the 2-byte data flags. seq byte is 0x03 in the capture.
633        let payload = build_subscribe_payload_with_seq(
634            0x03,
635            TNS_SUBSCR_OP_REGISTER,
636            Some("pythontest"),
637            None,
638            TNS_SUBSCR_NAMESPACE_DBCHANGE,
639            None,
640            SUBSCR_QOS_ROWIDS,
641            0, // OPCODE_ALLOPS
642            10,
643            0,
644            0,
645            0,
646            0,
647            24,
648        )
649        .expect("subscribe payload");
650        // real capture TTC payload (token byte 0x00 follows the seq):
651        // 03 7d 03 00 01 01 04 01 01 0a 00 00 01 01 01 01 01 01 06 00 ...
652        let expected: &[u8] = &[
653            0x03, 0x7D, 0x03, 0x00, 0x01, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x00, 0x00, 0x01, 0x01,
654            0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
655            0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73, 0x74,
656            0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00,
657            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
658        ];
659        assert_eq!(payload, expected);
660    }
661
662    #[test]
663    fn subscribe_unregister_payload_matches_golden() {
664        // Golden /tmp/cqn_trace.txt line 4029 (op 22, socket 5). seq 0x0A,
665        // opcode 2, client_id now set to "OCI:EP:301", reg id 302 in the tail.
666        let payload = build_subscribe_payload_with_seq(
667            0x0A,
668            TNS_SUBSCR_OP_UNREGISTER,
669            Some("pythontest"),
670            Some(b"OCI:EP:301"),
671            TNS_SUBSCR_NAMESPACE_DBCHANGE,
672            None,
673            SUBSCR_QOS_ROWIDS,
674            0,
675            10,
676            0,
677            0,
678            0,
679            302,
680            24,
681        )
682        .expect("unsubscribe payload");
683        let expected: &[u8] = &[
684            0x03, 0x7D, 0x0A, 0x00, 0x02, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x01, 0x01, 0x0A, 0x01,
685            0x01, 0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01,
686            0x01, 0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73,
687            0x74, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33, 0x30, 0x31, 0x01, 0x02,
688            0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00, 0x00, 0x00,
689            0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x2E,
690        ];
691        assert_eq!(payload, expected);
692    }
693
694    #[test]
695    fn notify_payload_matches_golden() {
696        // Golden /tmp/cqn_trace.txt line 3647 (op 8, socket 6) after data flags.
697        let payload =
698            build_notify_payload_with_seq(0x03, b"OCI:EP:301", 24).expect("notify payload");
699        // 03 bb 03 00 01 0a 0a OCI:EP:301 01 00  (token 0x00 after seq)
700        let want: &[u8] = &[
701            0x03, 0xBB, 0x03, 0x00, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A,
702            0x33, 0x30, 0x31, 0x01, 0x00,
703        ];
704        assert_eq!(payload, want);
705    }
706
707    #[test]
708    fn subscribe_response_decodes_registration_and_client_id() {
709        // Golden /tmp/cqn_trace.txt line 2433 (op 9, socket 5) after data flags.
710        let payload: &[u8] = &[
711            0x08, 0x01, 0x01, 0x00, 0x02, 0x01, 0x2E, 0x01, 0x01, 0x02, 0x01, 0x2E, 0x00, 0x00,
712            0x01, 0x01, 0x01, 0x36, 0x36, 0x28, 0x41, 0x44, 0x44, 0x52, 0x45, 0x53, 0x53, 0x3D,
713            0x28, 0x50, 0x52, 0x4F, 0x54, 0x4F, 0x43, 0x4F, 0x4C, 0x3D, 0x54, 0x43, 0x50, 0x29,
714            0x28, 0x48, 0x4F, 0x53, 0x54, 0x3D, 0x32, 0x39, 0x30, 0x61, 0x63, 0x30, 0x33, 0x30,
715            0x30, 0x33, 0x38, 0x37, 0x29, 0x28, 0x50, 0x4F, 0x52, 0x54, 0x3D, 0x31, 0x35, 0x32,
716            0x31, 0x29, 0x29, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33,
717            0x30, 0x31, 0x09, 0x01, 0x01, 0x02, 0xDD, 0x48, 0x1D,
718        ];
719        let result = parse_subscribe_response(payload, caps_12_1()).expect("subscribe response");
720        assert_eq!(result.registration_id, 302);
721        assert_eq!(result.client_id.as_deref(), Some(&b"OCI:EP:301"[..]));
722    }
723
724    /// The full real notification stream captured on the emon socket
725    /// (`/tmp/cqn_notif_stream.bin`): the leading OAC ack byte plus five OAC
726    /// records (insert / update / insert / delete / truncate).
727    const NOTIF_STREAM: &[u8] = &[
728        0x0d, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
729        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x02, 0xa4, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
730        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x01, 0x00, 0x10, 0x00, 0xd2, 0x03,
731        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x9b, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00,
732        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
733        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
734        0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
735        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02,
736        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00,
737        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
738        0x50, 0x44, 0x42, 0x31, 0x03, 0x00, 0x19, 0x00, 0x98, 0x04, 0x00, 0x00, 0x0b, 0x00, 0x00,
739        0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
740        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
741        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04,
742        0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a,
743        0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00,
744        0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0x00, 0x89, 0x00,
745        0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x05,
746        0x00, 0x06, 0x00, 0xa9, 0x04, 0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x44, 0x32, 0x00, 0x01,
747        0x00, 0x00, 0x00, 0x02, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53,
748        0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45,
749        0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41,
750        0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42,
751        0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
752        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0xa5, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
753        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x02, 0x00, 0x09, 0x00, 0x7d, 0x04,
754        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00,
755        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
756        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
757        0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
758        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42, 0x00, 0x01, 0x03, 0x00, 0x02,
759        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x46, 0x46, 0x00,
760        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
761        0x50, 0x44, 0x42, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfe, 0x7f, 0x00,
762        0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
763        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
764        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
765    ];
766
767    #[test]
768    fn notification_stream_decodes_dml_events() {
769        let records = parse_notification_stream(
770            NOTIF_STREAM,
771            TNS_SUBSCR_NAMESPACE_DBCHANGE,
772            SUBSCR_QOS_ROWIDS,
773            Some("FREEPDB1"),
774        )
775        .expect("notification stream");
776        let messages: Vec<&NotificationMessage> = records
777            .iter()
778            .filter_map(|r| match r {
779                NotificationRecord::Message { message, .. } => Some(message),
780                NotificationRecord::Stop => None,
781            })
782            .collect();
783        assert_eq!(messages.len(), 5);
784
785        let table_ops: Vec<u32> = messages.iter().map(|m| m.tables[0].operation).collect();
786        assert_eq!(table_ops, vec![2, 4, 2, 8, 17]);
787
788        let mut row_ops = Vec::new();
789        let mut rowids = Vec::new();
790        for m in &messages {
791            assert_eq!(m.msg_type, EVENT_OBJCHANGE);
792            assert_eq!(m.dbname.as_deref(), Some("FREEPDB1"));
793            assert!(m.registered);
794            assert!(m.txid.is_none());
795            for row in &m.tables[0].rows {
796                row_ops.push(row.operation);
797                rowids.push(row.rowid.clone());
798            }
799        }
800        assert_eq!(row_ops, vec![2, 4, 2, 8]);
801        assert_eq!(
802            rowids,
803            vec![
804                "AAASjMAAYAAAJO3AAA",
805                "AAASjMAAYAAAJO3AAA",
806                "AAASjMAAYAAAJO3AAB",
807                "AAASjMAAYAAAJO3AAB",
808            ]
809        );
810        // the truncate record carries the ALLROWS bit, so no rows are present
811        assert!(messages[4].tables[0].rows.is_empty());
812    }
813}