Skip to main content

firebird_wire/
events.rs

1//! Eventos do banco (`POST_EVENT` / `isc_que_events`).
2//!
3//! Uma conexão registra interesse em eventos nomeados; quando outra conexão faz
4//! `POST_EVENT 'nome'` e commita, o servidor notifica por um
5//! **canal auxiliar** (um segundo socket TCP). O fluxo de wire (decodificado de
6//! um cliente C com `isc_wait_for_event`, sob `strace`):
7//!
8//! 1. `op_connect_request` (53): `op | type(=1, async) | db_handle | partner(=0)`.
9//!    A resposta (`op_response`) traz no `p_resp_data` um `sockaddr_in` de 16
10//!    bytes — `família(2) | porta(2 BE) | ip(4) | zeros(8)` — com a porta do
11//!    canal auxiliar. O cliente abre um novo socket TCP para `(ip do servidor,
12//!    porta)`.
13//! 2. `op_que_events` (48): `op | db_handle | epb(cstring) | ast(4=0) | arg(4=0)
14//!    | event_id(4)`. O EPB é `versão(1) | [namelen(1) | nome | count(4 LE)]…`.
15//! 3. `op_event` (52) chega pelo canal auxiliar quando um evento é postado:
16//!    `op | db_handle | epb(cstring com os counts atualizados) | ast(4) |
17//!    event_id(4)`. Comparando os counts com os anteriores sabemos o que disparou.
18//! 4. `op_cancel_events` (49): `op | db_handle | event_id`.
19//!
20//! Os eventos são *one-shot*: após cada notificação é preciso registrar de novo
21//! (o que [`EventListener::wait`] faz automaticamente).
22
23use std::net::{IpAddr, TcpStream};
24
25use crate::connection::Connection;
26use crate::error::{Error, Result};
27use crate::wire::consts::op;
28use crate::wire::response::{read_op, read_response};
29use crate::wire::stream::{FbStream, op_name, op_packet};
30
31/// Escuta de eventos: o canal auxiliar mais o estado de registro. Criada por
32/// [`Connection::listen_events`].
33pub struct EventListener {
34    aux: FbStream,
35    event_id: i32,
36    names: Vec<String>,
37    /// Último count visto de cada evento (a linha de base do registro atual).
38    counts: Vec<u32>,
39}
40
41impl Connection {
42    /// Registra interesse nos eventos nomeados e abre o canal auxiliar por onde o
43    /// servidor empurra as notificações. Use [`EventListener::wait`] para aguardar.
44    ///
45    /// ```text
46    /// let mut ev = conn.listen_events(&["minha_tabela_mudou"])?;
47    /// let disparados = ev.wait(&mut conn)?;   // bloqueia até um POST
48    /// ev.cancel(&mut conn)?;
49    /// ```
50    pub fn listen_events(&mut self, names: &[&str]) -> Result<EventListener> {
51        if names.is_empty() {
52            return Err(Error::protocol("listen_events exige ao menos um evento"));
53        }
54        // O nome de cada evento vai num clumplet de comprimento de 1 byte no EPB.
55        if let Some(long) = names.iter().find(|n| n.len() > u8::MAX as usize) {
56            return Err(Error::conversion(format!(
57                "nome de evento excede 255 bytes: {:.32}…",
58                long
59            )));
60        }
61        // 1. Pede o canal auxiliar (op_connect_request, tipo async = 1).
62        let mut w = op_packet(op::CONNECT_REQUEST);
63        w.put_i32(1); // p_req_type = async events
64        w.put_i32(self.db_handle());
65        w.put_i32(0); // p_req_partner
66        self.io().send(&w)?;
67        let resp = read_response(self.io())?;
68        let (ip, port) = parse_aux_addr(&resp.data, self.io().peer_ip())?;
69
70        // 2. Conecta o socket auxiliar.
71        let sock = TcpStream::connect((ip, port))?;
72        let aux = FbStream::new(sock);
73
74        // 3. Registra os eventos com a linha de base zerada.
75        let names: Vec<String> = names.iter().map(|s| s.to_string()).collect();
76        let counts = vec![0u32; names.len()];
77        let event_id = self.next_event_id();
78        que_events(self, &names, &counts, event_id)?;
79
80        Ok(EventListener {
81            aux,
82            event_id,
83            names,
84            counts,
85        })
86    }
87}
88
89impl EventListener {
90    /// Os nomes dos eventos registrados.
91    pub fn names(&self) -> &[String] {
92        &self.names
93    }
94
95    /// Bloqueia até ao menos um dos eventos registrados ser postado, devolvendo os
96    /// nomes que dispararam. Re-registra automaticamente para a próxima espera
97    /// (por isso precisa da conexão).
98    pub fn wait(&mut self, conn: &mut Connection) -> Result<Vec<String>> {
99        loop {
100            // Lê um op_event do canal auxiliar (bloqueia até chegar).
101            let code = read_op(&mut self.aux)?;
102            if code != op::EVENT {
103                return Err(Error::protocol(format!(
104                    "esperava op_event no canal auxiliar, veio {} ({code})",
105                    op_name(code)
106                )));
107            }
108            let _db = self.aux.read_i32()?;
109            let epb = self.aux.read_bytes()?;
110            let _ast = self.aux.read_i32()?;
111            let _rid = self.aux.read_i32()?;
112
113            let new_counts = parse_epb_counts(&epb, &self.names);
114            let fired: Vec<String> = self
115                .names
116                .iter()
117                .enumerate()
118                .filter(|(i, _)| new_counts[*i] > self.counts[*i])
119                .map(|(_, n)| n.clone())
120                .collect();
121            self.counts = new_counts;
122
123            // Re-registra (one-shot) com a linha de base atualizada, para a próxima.
124            que_events(conn, &self.names, &self.counts, self.event_id)?;
125
126            if !fired.is_empty() {
127                return Ok(fired);
128            }
129            // Notificação sem incremento (eco do registro): continua esperando.
130        }
131    }
132
133    /// Cancela o registro no servidor (`op_cancel_events`) e fecha o canal
134    /// auxiliar.
135    pub fn cancel(self, conn: &mut Connection) -> Result<()> {
136        let mut w = op_packet(op::CANCEL_EVENTS);
137        w.put_i32(conn.db_handle());
138        w.put_i32(self.event_id);
139        conn.io().send(&w)?;
140        read_response(conn.io())?;
141        Ok(())
142    }
143}
144
145/// Envia `op_que_events` com o EPB dos eventos e a linha de base dos counts.
146fn que_events(
147    conn: &mut Connection,
148    names: &[String],
149    counts: &[u32],
150    event_id: i32,
151) -> Result<()> {
152    let epb = build_epb(names, counts);
153    let mut w = op_packet(op::QUE_EVENTS);
154    w.put_i32(conn.db_handle());
155    w.put_bytes(&epb); // cstring: len(4) + epb + pad
156    w.put_i32(0); // ast (ponteiro do callback; ignorado no wire)
157    w.put_i32(0); // arg
158    w.put_i32(event_id);
159    conn.io().send(&w)?;
160    read_response(conn.io())?;
161    Ok(())
162}
163
164/// Monta o EPB (event parameter block): `versão(1) | [namelen(1) | nome |
165/// count(4 LE)]…`.
166fn build_epb(names: &[String], counts: &[u32]) -> Vec<u8> {
167    let mut epb = vec![1u8]; // EPB_version1
168    for (name, &count) in names.iter().zip(counts) {
169        epb.push(name.len() as u8);
170        epb.extend_from_slice(name.as_bytes());
171        epb.extend_from_slice(&count.to_le_bytes());
172    }
173    epb
174}
175
176/// Extrai os counts do EPB de um `op_event`, na ordem de `names`.
177fn parse_epb_counts(epb: &[u8], names: &[String]) -> Vec<u32> {
178    let mut out = vec![0u32; names.len()];
179    let mut i = 1; // pula o byte de versão
180    while i < epb.len() {
181        let nlen = epb[i] as usize;
182        i += 1;
183        if i + nlen + 4 > epb.len() {
184            break;
185        }
186        let name = &epb[i..i + nlen];
187        i += nlen;
188        let count = u32::from_le_bytes(epb[i..i + 4].try_into().unwrap());
189        i += 4;
190        if let Some(idx) = names.iter().position(|n| n.as_bytes() == name) {
191            out[idx] = count;
192        }
193    }
194    out
195}
196
197/// Decodifica o `sockaddr_in` (16 bytes) da resposta de `op_connect_request`:
198/// `família(2) | porta(2 BE) | ip(4) | zeros(8)`. Se o ip vier zerado, usa o
199/// `fallback` (o ip do servidor da conexão principal).
200fn parse_aux_addr(data: &[u8], fallback: Option<IpAddr>) -> Result<(IpAddr, u16)> {
201    if data.len() < 8 {
202        return Err(Error::protocol(
203            "resposta de op_connect_request sem sockaddr",
204        ));
205    }
206    let port = u16::from_be_bytes([data[2], data[3]]);
207    let ip = IpAddr::from([data[4], data[5], data[6], data[7]]);
208    let ip = if ip.is_unspecified() {
209        fallback.ok_or_else(|| Error::protocol("canal auxiliar sem endereço utilizável"))?
210    } else {
211        ip
212    };
213    Ok((ip, port))
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219
220    #[test]
221    fn epb_roundtrip() {
222        let names = vec!["abc".to_string(), "evento2".to_string()];
223        let epb = build_epb(&names, &[0, 5]);
224        // versão 1, len 3 "abc" 00000000, len 7 "evento2" 05000000
225        assert_eq!(epb[0], 1);
226        assert_eq!(parse_epb_counts(&epb, &names), vec![0, 5]);
227    }
228
229    #[test]
230    fn parse_aux_addr_from_sockaddr() {
231        // família(2 LE) | porta 0x8e81 (BE) | 127.0.0.1 | zeros
232        let data = [
233            0x02, 0x00, 0x8e, 0x81, 0x7f, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0, 0, 0,
234        ];
235        let (ip, port) = parse_aux_addr(&data, None).unwrap();
236        assert_eq!(port, 0x8e81);
237        assert_eq!(ip, IpAddr::from([127, 0, 0, 1]));
238    }
239}