Skip to main content

oracledb_protocol/thin/
subscr.rs

1#![forbid(unsafe_code)]
2
3//! CQN / Continuous Query Notification wire codecs (sans-io).
4//!
5//! Ports the reference thin subscription messages:
6//! - `impl/thin/messages/subscribe.pyx` — FUNC 125 register/unregister payload
7//!   and the `_process_return_parameters` response decode.
8//! - `impl/thin/messages/notification.pyx` — FUNC 187 NOTIFY payload, the OAC
9//!   record loop (`_process_oac`) and the big-endian inner CQN payload decoder
10//!   (`_process_notification_payload` / `_process_tables` / `_process_rows` /
11//!   `_process_queries`).
12//!
13//! Only the byte<->struct translation lives here; the second ("emon")
14//! connection, the background receive loop and the Python callback invocation
15//! live in the driver and pyshim crates.
16
17use super::*;
18use crate::wire::{ProtocolLimits, TtcReader, TtcWriter};
19
/// Values extracted from a SUBSCRIBE (register) response.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SubscribeResult {
    /// Registration id (`USER_CHANGE_NOTIFICATION_REGS.REGID`), surfaced to
    /// users as `Subscription.id`.
    pub registration_id: u64,
    /// EMON client id (for example `b"OCI:EP:301"`) that the NOTIFY message
    /// echoes back. Stays `None` unless the decoder reads one.
    pub client_id: Option<Vec<u8>>,
}
28
/// A single changed row reported inside a table notification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MsgRow {
    /// Operation flags read from the row record.
    pub operation: u32,
    /// Rowid text read from the row record.
    pub rowid: String,
}
35
36/// One table changed inside an OBJCHANGE / QUERYCHANGE notification.
37#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct MsgTable {
39    pub operation: u32,
40    pub name: String,
41    pub rows: Vec<MsgRow>,
42}
43
44/// One query whose result set changed (QUERYCHANGE notifications).
45#[derive(Clone, Debug, PartialEq, Eq)]
46pub struct MsgQuery {
47    pub id: u64,
48    pub operation: u32,
49    pub tables: Vec<MsgTable>,
50}
51
52/// A single decoded OAC notification record handed to the user callback.
53#[derive(Clone, Debug, PartialEq, Eq)]
54pub struct NotificationMessage {
55    /// `EVENT_*` value placed on `Message.type`.
56    pub msg_type: u32,
57    pub dbname: Option<String>,
58    /// Thin never decodes the transaction id (14 bytes skipped); always `None`.
59    pub txid: Option<Vec<u8>>,
60    pub registered: bool,
61    pub queue_name: Option<String>,
62    pub consumer_name: Option<String>,
63    pub msgid: Option<Vec<u8>>,
64    pub tables: Vec<MsgTable>,
65    pub queries: Vec<MsgQuery>,
66}
67
68/// Outcome of decoding one OAC record from the notification stream.
69#[derive(Clone, Debug, PartialEq, Eq)]
70pub enum NotificationRecord {
71    /// A record to deliver to the callback. `end_of_response` mirrors the
72    /// reference flag (DEREG / DEREG_NFY terminate the loop after delivery).
73    Message {
74        message: NotificationMessage,
75        end_of_response: bool,
76    },
77    /// `TNS_SUBSCR_STOP_NOTIF` — the stream is finished; no callback fires.
78    Stop,
79}
80
81/// Writes the function-code header plus the pipeline `token_num` (always 0 for
82/// these messages) when the negotiated caps include it, mirroring
83/// `messages/base.pyx::_write_function_code` (writes `ub8 token_num` for
84/// `ttc_field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1`).
85fn write_function_code_token(w: &mut TtcWriter, function_code: u8, seq_num: u8, field_version: u8) {
86    w.write_function_code_with_seq(function_code, seq_num);
87    if field_version >= TNS_CCAP_FIELD_VERSION_23_1_EXT_1 {
88        w.write_ub8(0);
89    }
90}
91
92/// Build the SUBSCRIBE (FUNC 125) payload for register (`opcode = 1`) or
93/// unregister (`opcode = 2`). Ports `subscribe.pyx::_write_message`.
94///
95/// `qos`/`operations` are the *public* `SUBSCR_QOS_*` / `OPCODE_*` values; this
96/// function performs the qos/flags derivation (`subscribe.pyx:82-93`).
97#[allow(clippy::too_many_arguments)]
98pub fn build_subscribe_payload_with_seq(
99    seq_num: u8,
100    opcode: u8,
101    username: Option<&str>,
102    client_id: Option<&[u8]>,
103    namespace: u32,
104    name: Option<&str>,
105    public_qos: u32,
106    operations: u32,
107    timeout: u32,
108    grouping_class: u8,
109    grouping_value: u32,
110    grouping_type: u8,
111    registration_id: u64,
112    field_version: u8,
113) -> Result<Vec<u8>> {
114    // derive the wire qos flags
115    let mut qos = TNS_SUBSCR_QOS_SECURE;
116    if public_qos & SUBSCR_QOS_RELIABLE != 0 {
117        qos |= TNS_SUBSCR_QOS_RELIABLE;
118    }
119    if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
120        qos |= TNS_SUBSCR_QOS_PURGE_ON_NTFN;
121    }
122    // derive the wire operation flags
123    let mut flags = operations;
124    if public_qos & SUBSCR_QOS_QUERY != 0 {
125        flags |= TNS_SUBSCR_FLAGS_QUERY;
126    }
127    if public_qos & SUBSCR_QOS_ROWIDS != 0 {
128        flags |= TNS_SUBSCR_FLAGS_INCLUDE_ROWIDS;
129    }
130    // grouping_type can only be sent when a grouping class is set
131    let grouping_type = if grouping_class == 0 {
132        0
133    } else {
134        grouping_type
135    };
136
137    let username_bytes = username.map(str::as_bytes);
138
139    let mut w = TtcWriter::new();
140    write_function_code_token(&mut w, TNS_FUNC_SUBSCRIBE, seq_num, field_version);
141    w.write_u8(opcode);
142    w.write_ub4(TNS_SUBSCR_MODE_CLIENT_INITIATED);
143    match username_bytes {
144        Some(bytes) => {
145            w.write_u8(1); // pointer (username)
146            w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
147        }
148        None => {
149            w.write_u8(0);
150            w.write_ub4(0);
151        }
152    }
153    match client_id {
154        Some(bytes) => {
155            w.write_u8(1); // pointer (location)
156            w.write_ub4(u32::try_from(bytes.len()).unwrap_or(u32::MAX));
157        }
158        None => {
159            w.write_u8(0);
160            w.write_ub4(0);
161        }
162    }
163    w.write_u8(1); // pointer (registration)
164    w.write_ub4(1); // num registrations
165    w.write_ub2(1); // raw presentation
166    w.write_ub2(6); // version for client notification
167    w.write_u8(0); // pointer (namespace out attrs)
168    w.write_u8(1); // pointer (num elements in array)
169    w.write_u8(0); // pointer (generic out attrs)
170    w.write_u8(1); // pointer (num elements in array)
171    if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
172        w.write_u8(1); // kpninst
173        w.write_u8(1); // kpninstl
174        w.write_u8(1); // kpngcret
175        w.write_u8(1); // kpngcretl
176        w.write_u8(1); // client id
177        w.write_ub4(TNS_SUBSCR_CLIENT_ID_LEN);
178        w.write_u8(1); // client id length
179    }
180    if let Some(bytes) = username_bytes {
181        w.write_bytes_with_length(bytes)?;
182    }
183    if let Some(bytes) = client_id {
184        w.write_bytes_with_length(bytes)?;
185    }
186    w.write_ub4(namespace);
187    match name {
188        Some(name) => w.write_bytes_with_two_lengths(Some(name.as_bytes()))?,
189        None => w.write_ub4(0),
190    }
191    w.write_ub4(0); // context length
192    w.write_ub4(0); // payload type
193    w.write_ub4(qos);
194    w.write_ub4(0); // payload callback length (JMS)
195    w.write_ub4(timeout);
196    w.write_ub4(0); // kpdnsd
197    w.write_ub4(flags);
198    w.write_ub4(0); // change lag between notifications
199    w.write_ub4(0); // change registration id
200    w.write_u8(grouping_class);
201    w.write_ub4(grouping_value);
202    w.write_u8(grouping_type);
203    w.write_ub4(0); // grouping class start time
204                    // grouping repeat count: write_sb4(0); for the constant 0 this is the same
205                    // single 0x00 byte the unsigned encoder emits.
206    w.write_ub4(0);
207    w.write_ub8(registration_id);
208    Ok(w.into_bytes())
209}
210
211/// Decode the SUBSCRIBE (register) response. Ports
212/// `subscribe.pyx::_process_return_parameters`, dispatched on the
213/// `TNS_MSG_TYPE_PARAMETER` message inside the standard function response loop.
214pub fn parse_subscribe_response(
215    payload: &[u8],
216    capabilities: ClientCapabilities,
217) -> Result<SubscribeResult> {
218    parse_subscribe_response_with_limits(payload, capabilities, ProtocolLimits::DEFAULT)
219}
220
221pub fn parse_subscribe_response_with_limits(
222    payload: &[u8],
223    capabilities: ClientCapabilities,
224    limits: ProtocolLimits,
225) -> Result<SubscribeResult> {
226    let mut reader = TtcReader::with_limits(payload, limits)?;
227    let mut result = SubscribeResult::default();
228    let field_version = capabilities.ttc_field_version;
229    while reader.remaining() > 0 {
230        let message_type = reader.read_u8()?;
231        match message_type {
232            0 => {}
233            TNS_MSG_TYPE_PARAMETER => {
234                parse_subscribe_return_parameters(&mut reader, field_version, &mut result)?;
235            }
236            TNS_MSG_TYPE_STATUS => {
237                let _call_status = reader.read_ub4()?;
238                let _seq = reader.read_ub2()?;
239            }
240            TNS_MSG_TYPE_SERVER_SIDE_PIGGYBACK => {
241                let _ = skip_server_side_piggyback(&mut reader)?;
242            }
243            TNS_MSG_TYPE_END_OF_RESPONSE => break,
244            TNS_MSG_TYPE_ERROR => {
245                let info = parse_server_error_info(&mut reader, field_version)?;
246                if info.number != 0 {
247                    return Err(ProtocolError::ServerError(info.message));
248                }
249            }
250            _ => {
251                return Err(ProtocolError::UnknownMessageType {
252                    message_type,
253                    position: reader.position().saturating_sub(1),
254                })
255            }
256        }
257    }
258    Ok(result)
259}
260
261fn parse_subscribe_return_parameters(
262    reader: &mut TtcReader<'_>,
263    field_version: u8,
264    result: &mut SubscribeResult,
265) -> Result<()> {
266    let num_values = reader.read_ub4()?; // out parameters (kpnrl)
267    for _ in 0..num_values {
268        let _ = reader.read_ub4()?;
269    }
270    for _ in 0..num_values {
271        let _ = reader.read_ub4()?; // registration id (short)
272    }
273    let num_values = reader.read_ub4()?; // out parameters (kpngrl)
274    for _ in 0..num_values {
275        result.registration_id = reader.read_ub8()?;
276        if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
277            let _subscriber_name = reader.read_bytes_with_length()?;
278        }
279    }
280    if field_version >= TNS_CCAP_FIELD_VERSION_12_1 {
281        let num_instances = reader.read_ub4()?;
282        for _ in 0..num_instances {
283            let _ = reader.read_bytes_with_length()?;
284        }
285        let num_listeners = reader.read_ub4()?;
286        for _ in 0..num_listeners {
287            let _ = reader.read_bytes_with_length()?;
288        }
289        result.client_id = reader.read_bytes_with_length()?;
290    }
291    Ok(())
292}
293
294/// Build the NOTIFY (FUNC 187) payload sent on the emon connection. Ports
295/// `notification.pyx::_write_message`. The caller must transmit this packet
296/// with the `TNS_DATA_FLAGS_END_OF_REQUEST` data flag set.
297pub fn build_notify_payload_with_seq(
298    seq_num: u8,
299    client_id: &[u8],
300    field_version: u8,
301) -> Result<Vec<u8>> {
302    let mut w = TtcWriter::new();
303    write_function_code_token(&mut w, TNS_FUNC_NOTIFY, seq_num, field_version);
304    w.write_ub4(u32::try_from(client_id.len()).unwrap_or(u32::MAX));
305    w.write_bytes_with_length(client_id)?;
306    w.write_u8(TNS_INIT_KPNDRREQ);
307    w.write_ub4(0);
308    Ok(w.into_bytes())
309}
310
311/// Decode every OAC record in a notification stream. The reference reads one
312/// leading `message_type` byte (`TNS_MSG_TYPE_OAC`) then loops `_process_oac`
313/// until `end_of_response`; the driver chains network packets into `payload`
314/// so this operates on the full concatenated TTC stream.
315///
316/// Returns the decoded records in order. A trailing [`NotificationRecord::Stop`]
317/// (or a record whose `end_of_response` is set) marks the end of the stream.
318pub fn parse_notification_stream(
319    payload: &[u8],
320    namespace: u32,
321    public_qos: u32,
322    db_name: Option<&str>,
323) -> Result<Vec<NotificationRecord>> {
324    parse_notification_stream_with_limits(
325        payload,
326        namespace,
327        public_qos,
328        db_name,
329        ProtocolLimits::DEFAULT,
330    )
331}
332
333pub fn parse_notification_stream_with_limits(
334    payload: &[u8],
335    namespace: u32,
336    public_qos: u32,
337    db_name: Option<&str>,
338    limits: ProtocolLimits,
339) -> Result<Vec<NotificationRecord>> {
340    let mut reader = TtcReader::with_limits(payload, limits)?;
341    let message_type = reader.read_u8()?; // outer process(): read_ub1(message_type)
342    if message_type != TNS_MSG_TYPE_OAC {
343        return Err(ProtocolError::UnknownMessageType {
344            message_type,
345            position: reader.position().saturating_sub(1),
346        });
347    }
348    let mut records = Vec::new();
349    while reader.remaining() > 0 {
350        let record =
351            parse_oac_record_with_limits(&mut reader, namespace, public_qos, db_name, limits)?;
352        let end = match &record {
353            NotificationRecord::Stop => true,
354            NotificationRecord::Message {
355                end_of_response, ..
356            } => *end_of_response,
357        };
358        records.push(record);
359        if end {
360            break;
361        }
362    }
363    Ok(records)
364}
365
366/// Consume the leading `TNS_MSG_TYPE_OAC` byte that precedes the OAC record
367/// stream (`process()` reads it once before delivering any record). Returns the
368/// number of bytes consumed (1) or an error if the byte is not OAC.
369pub fn check_notification_header(bytes: &[u8]) -> Result<usize> {
370    check_notification_header_with_limits(bytes, ProtocolLimits::DEFAULT)
371}
372
373pub fn check_notification_header_with_limits(
374    bytes: &[u8],
375    limits: ProtocolLimits,
376) -> Result<usize> {
377    let mut reader = TtcReader::with_limits(bytes, limits)?;
378    let message_type = reader.read_u8()?;
379    if message_type != TNS_MSG_TYPE_OAC {
380        return Err(ProtocolError::UnknownMessageType {
381            message_type,
382            position: 0,
383        });
384    }
385    Ok(reader.position())
386}
387
388/// Attempt to decode exactly one OAC record from the front of `bytes`. Returns
389/// the decoded record and the number of bytes consumed, or `Ok(None)` when the
390/// buffer does not yet hold a complete record (the caller must read more data
391/// from the EMON socket and retry — mirroring the reference `ReadBuffer`
392/// chaining packets within a single `process()` call).
393pub fn try_parse_oac_record(
394    bytes: &[u8],
395    namespace: u32,
396    public_qos: u32,
397    db_name: Option<&str>,
398) -> Result<Option<(NotificationRecord, usize)>> {
399    try_parse_oac_record_with_limits(
400        bytes,
401        namespace,
402        public_qos,
403        db_name,
404        ProtocolLimits::DEFAULT,
405    )
406}
407
408pub fn try_parse_oac_record_with_limits(
409    bytes: &[u8],
410    namespace: u32,
411    public_qos: u32,
412    db_name: Option<&str>,
413    limits: ProtocolLimits,
414) -> Result<Option<(NotificationRecord, usize)>> {
415    let mut reader = TtcReader::with_limits(bytes, limits)?;
416    match parse_oac_record_with_limits(&mut reader, namespace, public_qos, db_name, limits) {
417        Ok(record) => Ok(Some((record, reader.position()))),
418        // The server only emits well-formed records; a decode failure while the
419        // stream is still being chained means the buffer is short, so signal
420        // "need more bytes" rather than treating it as corruption.
421        Err(_) => Ok(None),
422    }
423}
424
425/// Decode a single OAC record. Ports `notification.pyx::_process_oac` plus the
426/// inner payload decode.
427pub fn parse_oac_record(
428    reader: &mut TtcReader<'_>,
429    namespace: u32,
430    public_qos: u32,
431    db_name: Option<&str>,
432) -> Result<NotificationRecord> {
433    parse_oac_record_with_limits(reader, namespace, public_qos, db_name, reader.limits())
434}
435
436pub fn parse_oac_record_with_limits(
437    reader: &mut TtcReader<'_>,
438    namespace: u32,
439    public_qos: u32,
440    db_name: Option<&str>,
441    limits: ProtocolLimits,
442) -> Result<NotificationRecord> {
443    let message_type = reader.read_ub4()?;
444    if message_type == TNS_SUBSCR_STOP_NOTIF {
445        return Ok(NotificationRecord::Stop);
446    }
447    let _error_code = reader.read_ub4()?;
448    let _registration_id = reader.read_ub4()?;
449    let queue_name = reader.read_string_with_length()?;
450    let consumer_name = reader.read_string_with_length()?;
451    let msgid = reader.read_bytes_with_length()?;
452    let num_props = reader.read_ub4()?;
453    if num_props > 0 {
454        // AQ message properties path: skip the invalid-length byte then the
455        // property records. The CQN tests never exercise this branch (AQ uses
456        // num_props == 0); skip conservatively so the stream stays aligned.
457        let _ = reader.read_u8()?;
458        skip_msg_props(reader, num_props)?;
459    }
460    skip_bytes_with_length(reader)?; // JMS message properties
461
462    let mut payload: Option<Vec<u8>> = None;
463    if namespace != TNS_SUBSCR_NAMESPACE_AQ {
464        let _payload_type = reader.read_ub4()?;
465        let _payload_flags = reader.read_ub4()?;
466        let _chunk_number = reader.read_ub4()?;
467        payload = reader.read_bytes_with_length()?;
468        skip_bytes_with_length(reader)?; // DbObject / JSON payload
469    }
470
471    let mut message = NotificationMessage {
472        msg_type: 0,
473        dbname: db_name.map(str::to_string),
474        txid: None,
475        registered: false,
476        queue_name,
477        consumer_name,
478        msgid,
479        tables: Vec::new(),
480        queries: Vec::new(),
481    };
482    let end_of_response = process_notification_payload(
483        payload.as_deref(),
484        namespace,
485        public_qos,
486        limits,
487        &mut message,
488    )?;
489    Ok(NotificationRecord::Message {
490        message,
491        end_of_response,
492    })
493}
494
495/// Ports `_process_notification_payload`. Returns the resulting
496/// `end_of_response` flag.
497fn process_notification_payload(
498    payload: Option<&[u8]>,
499    namespace: u32,
500    public_qos: u32,
501    limits: ProtocolLimits,
502    message: &mut NotificationMessage,
503) -> Result<bool> {
504    if namespace == TNS_SUBSCR_NAMESPACE_AQ {
505        message.msg_type = EVENT_AQ;
506        return Ok(false);
507    }
508    let Some(payload) = payload else {
509        // empty payload => registration discarded
510        message.msg_type = EVENT_DEREG;
511        return Ok(true);
512    };
513    let mut end_of_response = false;
514    if public_qos & SUBSCR_QOS_DEREG_NFY != 0 {
515        message.registered = false;
516        end_of_response = true;
517    } else {
518        message.registered = true;
519    }
520    // inner payload is a plain big-endian byte cursor
521    let mut cur = ByteCursor::with_limits(payload, limits)?;
522    let _version = cur.u16be()?;
523    let _registration_id = cur.u32be()?;
524    let event_type = cur.u32be()?;
525    message.msg_type = event_type;
526    let dbname_len = cur.u16be()? as usize;
527    let dbname = cur.raw(dbname_len)?;
528    message.dbname = Some(
529        String::from_utf8(dbname.to_vec())
530            .map_err(|_| ProtocolError::TtcDecode("notification dbname not UTF-8"))?,
531    );
532    cur.skip(14)?; // transaction id + SCN (txid intentionally left None)
533    if event_type == EVENT_OBJCHANGE {
534        message.tables = process_tables(&mut cur)?;
535    } else if event_type == EVENT_QUERYCHANGE {
536        message.queries = process_queries(&mut cur)?;
537    }
538    Ok(end_of_response)
539}
540
541fn process_tables(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgTable>> {
542    let num_tables = cur.u16be()?;
543    // Each table record reads at least a u32 operation + u16 name length (6
544    // bytes) before its name, so cap the reservation by the buffer
545    // (BoundedReader); the loop still fails closed on truncation.
546    let mut tables: Vec<MsgTable> = cur.with_capacity_limited(
547        num_tables as usize,
548        6,
549        ProtocolLimits::check_length_prefixed_elements,
550    )?;
551    for _ in 0..num_tables {
552        let operation = cur.u32be()?;
553        let name_len = cur.u16be()? as usize;
554        let name = String::from_utf8(cur.raw(name_len)?.to_vec())
555            .map_err(|_| ProtocolError::TtcDecode("table name not UTF-8"))?;
556        let _object_num = cur.u32be()?;
557        let rows = if operation & OPCODE_ALLROWS == 0 {
558            process_rows(cur)?
559        } else {
560            Vec::new()
561        };
562        tables.push(MsgTable {
563            operation,
564            name,
565            rows,
566        });
567    }
568    Ok(tables)
569}
570
571fn process_rows(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgRow>> {
572    let num_rows = cur.u16be()?;
573    // Each row record reads at least a u32 operation + u16 rowid length (6
574    // bytes); bound the reservation by the buffer (BoundedReader).
575    let mut rows: Vec<MsgRow> = cur.with_capacity_limited(
576        num_rows as usize,
577        6,
578        ProtocolLimits::check_length_prefixed_elements,
579    )?;
580    for _ in 0..num_rows {
581        let operation = cur.u32be()?;
582        let rowid_len = cur.u16be()? as usize;
583        let rowid = String::from_utf8(cur.raw(rowid_len)?.to_vec())
584            .map_err(|_| ProtocolError::TtcDecode("rowid not UTF-8"))?;
585        rows.push(MsgRow { operation, rowid });
586    }
587    Ok(rows)
588}
589
590fn process_queries(cur: &mut ByteCursor<'_>) -> Result<Vec<MsgQuery>> {
591    let num_queries = cur.u16be()?;
592    // Each query record reads at least three u32s (12 bytes) before its nested
593    // tables; bound the reservation by the buffer (BoundedReader).
594    let mut queries: Vec<MsgQuery> = cur.with_capacity_limited(
595        num_queries as usize,
596        12,
597        ProtocolLimits::check_length_prefixed_elements,
598    )?;
599    for _ in 0..num_queries {
600        let id_lsb = u64::from(cur.u32be()?);
601        let id_msb = u64::from(cur.u32be()?);
602        let id = (id_msb << 32) | id_lsb;
603        let operation = cur.u32be()?;
604        let tables = process_tables(cur)?;
605        queries.push(MsgQuery {
606            id,
607            operation,
608            tables,
609        });
610    }
611    Ok(queries)
612}
613
614/// Skip AQ message-property records (`_process_msg_props`). The CQN tests never
615/// reach this branch; this keeps the parser aligned should the server send it.
616fn skip_msg_props(reader: &mut TtcReader<'_>, num_props: u32) -> Result<()> {
617    for _ in 0..num_props {
618        skip_bytes_with_length(reader)?; // name
619        skip_bytes_with_length(reader)?; // value
620    }
621    Ok(())
622}
623
624fn skip_bytes_with_length(reader: &mut TtcReader<'_>) -> Result<()> {
625    let _ = reader.read_bytes_with_length()?;
626    Ok(())
627}
628
629/// A plain big-endian cursor over the inner CQN payload bytes (no TTC chunking).
630struct ByteCursor<'a> {
631    bytes: &'a [u8],
632    pos: usize,
633    limits: ProtocolLimits,
634}
635
636impl<'a> ByteCursor<'a> {
637    #[cfg(test)]
638    fn new(bytes: &'a [u8]) -> Self {
639        Self {
640            bytes,
641            pos: 0,
642            limits: ProtocolLimits::DEFAULT,
643        }
644    }
645
646    fn with_limits(bytes: &'a [u8], limits: ProtocolLimits) -> Result<Self> {
647        let limits = limits.validate()?;
648        limits.check_response_bytes(bytes.len())?;
649        Ok(Self {
650            bytes,
651            pos: 0,
652            limits,
653        })
654    }
655
656    fn raw(&mut self, n: usize) -> Result<&'a [u8]> {
657        let end = self
658            .pos
659            .checked_add(n)
660            .ok_or(ProtocolError::TtcDecode("notification payload overflow"))?;
661        let slice = self
662            .bytes
663            .get(self.pos..end)
664            .ok_or(ProtocolError::TtcDecode("notification payload truncated"))?;
665        self.pos = end;
666        Ok(slice)
667    }
668
669    fn skip(&mut self, n: usize) -> Result<()> {
670        let _ = self.raw(n)?;
671        Ok(())
672    }
673
674    fn u16be(&mut self) -> Result<u16> {
675        let bytes = self.raw(2)?;
676        Ok(u16::from_be_bytes([bytes[0], bytes[1]]))
677    }
678
679    fn u32be(&mut self) -> Result<u32> {
680        let bytes = self.raw(4)?;
681        Ok(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
682    }
683}
684
685impl crate::wire::BoundedReader for ByteCursor<'_> {
686    fn remaining(&self) -> usize {
687        self.bytes.len().saturating_sub(self.pos)
688    }
689
690    fn protocol_limits(&self) -> ProtocolLimits {
691        self.limits
692    }
693}
694
695#[cfg(test)]
696mod tests {
697    use super::*;
698
699    // BoundedReader invariant (l2p), CQN array family: a notification table
700    // record declaring the maximum num_tables (0xFFFF) but carrying no table
701    // bytes must fail closed via the bounded reservation + the per-record read,
702    // not pre-allocate 65535 MsgTable structs from the count. (num_tables is a
703    // u16 so this was never a multi-GB OOM, but routing it through the bound
704    // keeps the whole class uniform and regression-proof.)
705    #[test]
706    fn cqn_oversized_table_count_fails_closed_not_oom() {
707        // num_tables = 0xFFFF (u16), then nothing.
708        let bytes = [0xFFu8, 0xFF];
709        let mut cur = ByteCursor::new(&bytes);
710        let err = process_tables(&mut cur).expect_err("oversized table count must fail closed");
711        assert!(matches!(err, ProtocolError::TtcDecode(_)), "got {err:?}");
712        // The pre-allocation never exceeds remaining()/6 even for the max count.
713        let cur2 = ByteCursor::new(&bytes);
714        let v: Vec<MsgTable> = cur2.with_capacity_bounded(0xFFFF, 6);
715        assert!(v.capacity() <= 1, "reservation capped by remaining bytes");
716    }
717
718    #[test]
719    fn cqn_table_count_respects_protocol_element_limit() {
720        // num_tables = 2. A max_length_prefixed_elements=1 policy rejects the
721        // count before reserving table slots.
722        let bytes = [0x00u8, 0x02];
723        let limits = ProtocolLimits {
724            max_length_prefixed_elements: 1,
725            ..ProtocolLimits::DEFAULT
726        };
727        let mut cur = ByteCursor::with_limits(&bytes, limits).expect("valid limits");
728        let err = process_tables(&mut cur).expect_err("table count above policy must fail");
729        assert!(
730            matches!(
731                err,
732                ProtocolError::ResourceLimit {
733                    limit: "length_prefixed_elements",
734                    observed: 2,
735                    maximum: 1,
736                }
737            ),
738            "got {err:?}"
739        );
740    }
741
742    fn caps_12_1() -> ClientCapabilities {
743        ClientCapabilities {
744            ttc_field_version: 24,
745            ..ClientCapabilities::default()
746        }
747    }
748
749    #[test]
750    fn subscribe_register_payload_matches_golden() {
751        // Golden /tmp/cqn_trace.txt line 2421 (op 8, socket 5), payload after
752        // the 2-byte data flags. seq byte is 0x03 in the capture.
753        let payload = build_subscribe_payload_with_seq(
754            0x03,
755            TNS_SUBSCR_OP_REGISTER,
756            Some("pythontest"),
757            None,
758            TNS_SUBSCR_NAMESPACE_DBCHANGE,
759            None,
760            SUBSCR_QOS_ROWIDS,
761            0, // OPCODE_ALLOPS
762            10,
763            0,
764            0,
765            0,
766            0,
767            24,
768        )
769        .expect("subscribe payload");
770        // real capture TTC payload (token byte 0x00 follows the seq):
771        // 03 7d 03 00 01 01 04 01 01 0a 00 00 01 01 01 01 01 01 06 00 ...
772        let expected: &[u8] = &[
773            0x03, 0x7D, 0x03, 0x00, 0x01, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x00, 0x00, 0x01, 0x01,
774            0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
775            0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73, 0x74,
776            0x01, 0x02, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00,
777            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
778        ];
779        assert_eq!(payload, expected);
780    }
781
782    #[test]
783    fn subscribe_unregister_payload_matches_golden() {
784        // Golden /tmp/cqn_trace.txt line 4029 (op 22, socket 5). seq 0x0A,
785        // opcode 2, client_id now set to "OCI:EP:301", reg id 302 in the tail.
786        let payload = build_subscribe_payload_with_seq(
787            0x0A,
788            TNS_SUBSCR_OP_UNREGISTER,
789            Some("pythontest"),
790            Some(b"OCI:EP:301"),
791            TNS_SUBSCR_NAMESPACE_DBCHANGE,
792            None,
793            SUBSCR_QOS_ROWIDS,
794            0,
795            10,
796            0,
797            0,
798            0,
799            302,
800            24,
801        )
802        .expect("unsubscribe payload");
803        let expected: &[u8] = &[
804            0x03, 0x7D, 0x0A, 0x00, 0x02, 0x01, 0x04, 0x01, 0x01, 0x0A, 0x01, 0x01, 0x0A, 0x01,
805            0x01, 0x01, 0x01, 0x01, 0x01, 0x06, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x01, 0x01,
806            0x01, 0x01, 0x1D, 0x01, 0x0A, 0x70, 0x79, 0x74, 0x68, 0x6F, 0x6E, 0x74, 0x65, 0x73,
807            0x74, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A, 0x33, 0x30, 0x31, 0x01, 0x02,
808            0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x01, 0x0A, 0x00, 0x01, 0x10, 0x00, 0x00, 0x00,
809            0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x2E,
810        ];
811        assert_eq!(payload, expected);
812    }
813
814    #[test]
815    fn notify_payload_matches_golden() {
816        // Golden /tmp/cqn_trace.txt line 3647 (op 8, socket 6) after data flags.
817        let payload =
818            build_notify_payload_with_seq(0x03, b"OCI:EP:301", 24).expect("notify payload");
819        // 03 bb 03 00 01 0a 0a OCI:EP:301 01 00  (token 0x00 after seq)
820        let want: &[u8] = &[
821            0x03, 0xBB, 0x03, 0x00, 0x01, 0x0A, 0x0A, 0x4F, 0x43, 0x49, 0x3A, 0x45, 0x50, 0x3A,
822            0x33, 0x30, 0x31, 0x01, 0x00,
823        ];
824        assert_eq!(payload, want);
825    }
826
#[test]
fn subscribe_response_decodes_registration_and_client_id() {
    // Reference bytes come from /tmp/cqn_trace.txt line 2433 (op 9, socket 5),
    // taken after the data flags. The capture is assembled from its binary
    // framing and the two ASCII runs embedded in it so the text stays legible.
    let address = b"(ADDRESS=(PROTOCOL=TCP)(HOST=290ac0300387)(PORT=1521))";
    let client_id = b"OCI:EP:301";

    // Framing ahead of the address text; the closing 0x36 bytes equal the
    // address length (54).
    let mut payload: Vec<u8> = vec![
        0x08, 0x01, 0x01, 0x00, 0x02, 0x01, 0x2E, 0x01, 0x01, 0x02, 0x01, 0x2E, 0x00, 0x00,
        0x01, 0x01, 0x01, 0x36, 0x36,
    ];
    payload.extend_from_slice(address);
    // Framing between the address and the client id (0x0A = client id length).
    payload.extend_from_slice(&[0x01, 0x0A, 0x0A]);
    payload.extend_from_slice(client_id);
    // Remaining bytes of the captured response.
    payload.extend_from_slice(&[0x09, 0x01, 0x01, 0x02, 0xDD, 0x48, 0x1D]);

    let decoded =
        parse_subscribe_response(payload.as_slice(), caps_12_1()).expect("subscribe response");
    assert_eq!(decoded.registration_id, 302);
    assert_eq!(decoded.client_id.as_deref(), Some(&client_id[..]));
}
843
844    /// Golden capture of the complete notification stream read from the emon
845    /// socket (`/tmp/cqn_notif_stream.bin`): the leading OAC ack byte followed by
846    /// five OAC records (insert / update / insert / delete / truncate).
       ///
       /// The database name `FREEPDB1` and the table name
       /// `PYTHONTEST.TESTTEMPTABLE` appear as plain ASCII in every record; the
       /// first four records additionally carry an 18-character ASCII rowid, while
       /// the final (truncate) record ends right after the table entry.
       ///
       /// The values are a verbatim capture and must stay byte-exact; the test
       /// `notification_stream_decodes_dml_events` decodes this table end to end.
847    const NOTIF_STREAM: &[u8] = &[
848        0x0d, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
849        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x02, 0xa4, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
850        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x01, 0x00, 0x10, 0x00, 0xd2, 0x03,
851        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x9b, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00,
852        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
853        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
854        0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
855        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02,
856        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00,
857        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
858        0x50, 0x44, 0x42, 0x31, 0x03, 0x00, 0x19, 0x00, 0x98, 0x04, 0x00, 0x00, 0x0b, 0x00, 0x00,
859        0x00, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
860        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
861        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04,
862        0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a,
863        0x4f, 0x33, 0x41, 0x41, 0x41, 0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00,
864        0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0x00, 0x89, 0x00,
865        0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x05,
866        0x00, 0x06, 0x00, 0xa9, 0x04, 0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x44, 0x32, 0x00, 0x01,
867        0x00, 0x00, 0x00, 0x02, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53,
868        0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45,
869        0x00, 0x01, 0x1c, 0x4a, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x41, 0x41, 0x41,
870        0x53, 0x6a, 0x4d, 0x41, 0x41, 0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42,
871        0x00, 0x01, 0x03, 0x00, 0x02, 0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
872        0x00, 0x01, 0x60, 0x60, 0x00, 0x01, 0x03, 0xa5, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x06, 0x00,
873        0x08, 0x46, 0x52, 0x45, 0x45, 0x50, 0x44, 0x42, 0x31, 0x02, 0x00, 0x09, 0x00, 0x7d, 0x04,
874        0x00, 0x00, 0xe2, 0x7a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00,
875        0x18, 0x50, 0x59, 0x54, 0x48, 0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53,
876        0x54, 0x54, 0x45, 0x4d, 0x50, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
877        0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x12, 0x41, 0x41, 0x41, 0x53, 0x6a, 0x4d, 0x41, 0x41,
878        0x59, 0x41, 0x41, 0x41, 0x4a, 0x4f, 0x33, 0x41, 0x41, 0x42, 0x00, 0x01, 0x03, 0x00, 0x02,
879        0x01, 0x2e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x46, 0x46, 0x00,
880        0x01, 0x00, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x46, 0x52, 0x45, 0x45,
881        0x50, 0x44, 0x42, 0x31, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfe, 0x7f, 0x00,
882        0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x18, 0x50, 0x59, 0x54, 0x48,
883        0x4f, 0x4e, 0x54, 0x45, 0x53, 0x54, 0x2e, 0x54, 0x45, 0x53, 0x54, 0x54, 0x45, 0x4d, 0x50,
884        0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x01, 0x1c, 0x4a, 0x00,
885    ];
886
#[test]
fn notification_stream_decodes_dml_events() {
    // Decode the whole captured stream in one call.
    let records = parse_notification_stream(
        NOTIF_STREAM,
        TNS_SUBSCR_NAMESPACE_DBCHANGE,
        SUBSCR_QOS_ROWIDS,
        Some("FREEPDB1"),
    )
    .expect("notification stream");

    // Only message records carry data to inspect; stop markers are dropped.
    let mut messages: Vec<&NotificationMessage> = Vec::new();
    for record in records.iter() {
        match record {
            NotificationRecord::Message { message, .. } => messages.push(message),
            NotificationRecord::Stop => {}
        }
    }
    assert_eq!(messages.len(), 5);

    // Table-level operation reported by each of the five messages.
    let mut table_ops: Vec<u32> = Vec::with_capacity(messages.len());
    for message in &messages {
        table_ops.push(message.tables[0].operation);
    }
    assert_eq!(table_ops, vec![2, 4, 2, 8, 17]);

    // Header fields shared by every message in the capture.
    for message in &messages {
        assert_eq!(message.msg_type, EVENT_OBJCHANGE);
        assert_eq!(message.dbname.as_deref(), Some("FREEPDB1"));
        assert!(message.registered);
        assert!(message.txid.is_none());
    }

    // Row-level details of each message's first table, in stream order.
    let (row_ops, rowids): (Vec<_>, Vec<_>) = messages
        .iter()
        .flat_map(|message| message.tables[0].rows.iter())
        .map(|row| (row.operation, row.rowid.clone()))
        .unzip();
    assert_eq!(row_ops, vec![2, 4, 2, 8]);
    assert_eq!(
        rowids,
        vec![
            "AAASjMAAYAAAJO3AAA",
            "AAASjMAAYAAAJO3AAA",
            "AAASjMAAYAAAJO3AAB",
            "AAASjMAAYAAAJO3AAB",
        ]
    );

    // The truncate record has the ALLROWS bit set, so it lists no rows.
    let truncate = messages[4];
    assert!(truncate.tables[0].rows.is_empty());
}
933}