1use std::net::{IpAddr, TcpStream};
24
25use crate::connection::Connection;
26use crate::error::{Error, Result};
27use crate::wire::consts::op;
28use crate::wire::response::{read_op, read_response};
29use crate::wire::stream::{FbStream, op_name, op_packet};
30
/// Handle for a set of database events registered through
/// `Connection::listen_events`.
///
/// Notifications are read from a dedicated auxiliary stream; re-queueing and
/// cancellation go through the `Connection` passed to `wait` / `cancel`.
pub struct EventListener {
    /// Auxiliary stream, opened by `listen_events` against the address taken
    /// from the `op_connect_request` response.
    aux: FbStream,
    /// Identifier obtained from `Connection::next_event_id`; sent with every
    /// `op_que_events` and `op_cancel_events` packet for this listener.
    event_id: i32,
    /// Registered event names, in the order the caller supplied them.
    names: Vec<String>,
    /// Last known count per event, parallel to `names` (same index).
    counts: Vec<u32>,
}
40
41impl Connection {
42 pub fn listen_events(&mut self, names: &[&str]) -> Result<EventListener> {
51 if names.is_empty() {
52 return Err(Error::protocol("listen_events exige ao menos um evento"));
53 }
54 if let Some(long) = names.iter().find(|n| n.len() > u8::MAX as usize) {
56 return Err(Error::conversion(format!(
57 "nome de evento excede 255 bytes: {:.32}…",
58 long
59 )));
60 }
61 let mut w = op_packet(op::CONNECT_REQUEST);
63 w.put_i32(1); w.put_i32(self.db_handle());
65 w.put_i32(0); self.io().send(&w)?;
67 let resp = read_response(self.io())?;
68 let (ip, port) = parse_aux_addr(&resp.data, self.io().peer_ip())?;
69
70 let sock = TcpStream::connect((ip, port))?;
72 let aux = FbStream::new(sock);
73
74 let names: Vec<String> = names.iter().map(|s| s.to_string()).collect();
76 let counts = vec![0u32; names.len()];
77 let event_id = self.next_event_id();
78 que_events(self, &names, &counts, event_id)?;
79
80 Ok(EventListener {
81 aux,
82 event_id,
83 names,
84 counts,
85 })
86 }
87}
88
89impl EventListener {
90 pub fn names(&self) -> &[String] {
92 &self.names
93 }
94
95 pub fn wait(&mut self, conn: &mut Connection) -> Result<Vec<String>> {
99 loop {
100 let code = read_op(&mut self.aux)?;
102 if code != op::EVENT {
103 return Err(Error::protocol(format!(
104 "esperava op_event no canal auxiliar, veio {} ({code})",
105 op_name(code)
106 )));
107 }
108 let _db = self.aux.read_i32()?;
109 let epb = self.aux.read_bytes()?;
110 let _ast = self.aux.read_i32()?;
111 let _rid = self.aux.read_i32()?;
112
113 let new_counts = parse_epb_counts(&epb, &self.names);
114 let fired: Vec<String> = self
115 .names
116 .iter()
117 .enumerate()
118 .filter(|(i, _)| new_counts[*i] > self.counts[*i])
119 .map(|(_, n)| n.clone())
120 .collect();
121 self.counts = new_counts;
122
123 que_events(conn, &self.names, &self.counts, self.event_id)?;
125
126 if !fired.is_empty() {
127 return Ok(fired);
128 }
129 }
131 }
132
133 pub fn cancel(self, conn: &mut Connection) -> Result<()> {
136 let mut w = op_packet(op::CANCEL_EVENTS);
137 w.put_i32(conn.db_handle());
138 w.put_i32(self.event_id);
139 conn.io().send(&w)?;
140 read_response(conn.io())?;
141 Ok(())
142 }
143}
144
145fn que_events(
147 conn: &mut Connection,
148 names: &[String],
149 counts: &[u32],
150 event_id: i32,
151) -> Result<()> {
152 let epb = build_epb(names, counts);
153 let mut w = op_packet(op::QUE_EVENTS);
154 w.put_i32(conn.db_handle());
155 w.put_bytes(&epb); w.put_i32(0); w.put_i32(0); w.put_i32(event_id);
159 conn.io().send(&w)?;
160 read_response(conn.io())?;
161 Ok(())
162}
163
/// Serializes `names` and `counts` into an event buffer.
///
/// Layout: one leading byte with value 1, then for each `(name, count)` pair
/// a one-byte name length, the name bytes, and the count as four
/// little-endian bytes. Pairs are formed positionally; if the slices differ
/// in length the extra entries of the longer one are ignored.
///
/// The length byte is a plain truncating cast, so names must be at most 255
/// bytes long; `listen_events` checks this before calling.
fn build_epb(names: &[String], counts: &[u32]) -> Vec<u8> {
    // One length byte plus four count bytes per entry, plus the name itself.
    let payload: usize = names.iter().map(|name| name.len() + 5).sum();
    let mut buffer = Vec::with_capacity(1 + payload);
    buffer.push(1u8);

    for (label, tally) in names.iter().zip(counts.iter()) {
        let raw = label.as_bytes();
        buffer.push(raw.len() as u8);
        buffer.extend_from_slice(raw);
        buffer.extend_from_slice(&tally.to_le_bytes());
    }
    buffer
}
175
/// Extracts the per-event counts from an event buffer.
///
/// Returns one count per entry of `names`, in the same order. The first byte
/// of `epb` is skipped without inspection; each following entry is a one-byte
/// name length, the name bytes, and a four-byte little-endian count.
///
/// Entries whose name is not in `names` are ignored, names absent from the
/// buffer keep a count of 0, and parsing stops silently at the first entry
/// that does not fit in the remaining bytes.
fn parse_epb_counts(epb: &[u8], names: &[String]) -> Vec<u32> {
    let mut counts = vec![0u32; names.len()];
    // Everything after the leading byte; empty when the buffer is empty.
    let mut rest: &[u8] = epb.get(1..).unwrap_or(&[]);

    while let Some((&len_byte, tail)) = rest.split_first() {
        let name_len = usize::from(len_byte);
        if tail.len() < name_len + 4 {
            // Truncated entry: keep what was parsed so far.
            break;
        }
        let (name, tail) = tail.split_at(name_len);
        let (raw, tail) = tail.split_at(4);
        let count = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]);

        if let Some(slot) = names.iter().position(|n| n.as_bytes() == name) {
            counts[slot] = count;
        }
        rest = tail;
    }
    counts
}
196
197fn parse_aux_addr(data: &[u8], fallback: Option<IpAddr>) -> Result<(IpAddr, u16)> {
201 if data.len() < 8 {
202 return Err(Error::protocol(
203 "resposta de op_connect_request sem sockaddr",
204 ));
205 }
206 let port = u16::from_be_bytes([data[2], data[3]]);
207 let ip = IpAddr::from([data[4], data[5], data[6], data[7]]);
208 let ip = if ip.is_unspecified() {
209 fallback.ok_or_else(|| Error::protocol("canal auxiliar sem endereço utilizável"))?
210 } else {
211 ip
212 };
213 Ok((ip, port))
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Counts written by `build_epb` are read back unchanged by
    /// `parse_epb_counts`, and the buffer starts with the byte 1.
    #[test]
    fn epb_roundtrip() {
        let names: Vec<String> = ["abc", "evento2"].iter().map(|s| s.to_string()).collect();
        let counts = [0u32, 5];

        let encoded = build_epb(&names, &counts);
        assert_eq!(encoded[0], 1);

        let decoded = parse_epb_counts(&encoded, &names);
        assert_eq!(decoded, vec![0, 5]);
    }

    /// A 16-byte socket address with family bytes `02 00`, port 0x8e81 and
    /// address 127.0.0.1 is decoded without needing a fallback.
    #[test]
    fn parse_aux_addr_from_sockaddr() {
        let mut sockaddr = [0u8; 16];
        sockaddr[..8].copy_from_slice(&[0x02, 0x00, 0x8e, 0x81, 0x7f, 0x00, 0x00, 0x01]);

        let (ip, port) = parse_aux_addr(&sockaddr, None).unwrap();
        assert_eq!(port, 0x8e81);
        assert_eq!(ip, IpAddr::from([127, 0, 0, 1]));
    }
}