use std::net::{IpAddr, TcpStream};
use crate::connection::Connection;
use crate::error::{Error, Result};
use crate::wire::consts::op;
use crate::wire::response::{read_op, read_response};
use crate::wire::stream::{FbStream, op_name, op_packet};
pub struct EventListener {
aux: FbStream,
event_id: i32,
names: Vec<String>,
counts: Vec<u32>,
}
impl Connection {
pub fn listen_events(&mut self, names: &[&str]) -> Result<EventListener> {
if names.is_empty() {
return Err(Error::protocol("listen_events exige ao menos um evento"));
}
if let Some(long) = names.iter().find(|n| n.len() > u8::MAX as usize) {
return Err(Error::conversion(format!(
"nome de evento excede 255 bytes: {:.32}…",
long
)));
}
let mut w = op_packet(op::CONNECT_REQUEST);
w.put_i32(1); w.put_i32(self.db_handle());
w.put_i32(0); self.io().send(&w)?;
let resp = read_response(self.io())?;
let (ip, port) = parse_aux_addr(&resp.data, self.io().peer_ip())?;
let sock = TcpStream::connect((ip, port))?;
let aux = FbStream::new(sock);
let names: Vec<String> = names.iter().map(|s| s.to_string()).collect();
let counts = vec![0u32; names.len()];
let event_id = self.next_event_id();
que_events(self, &names, &counts, event_id)?;
Ok(EventListener {
aux,
event_id,
names,
counts,
})
}
}
impl EventListener {
pub fn names(&self) -> &[String] {
&self.names
}
pub fn wait(&mut self, conn: &mut Connection) -> Result<Vec<String>> {
loop {
let code = read_op(&mut self.aux)?;
if code != op::EVENT {
return Err(Error::protocol(format!(
"esperava op_event no canal auxiliar, veio {} ({code})",
op_name(code)
)));
}
let _db = self.aux.read_i32()?;
let epb = self.aux.read_bytes()?;
let _ast = self.aux.read_i32()?;
let _rid = self.aux.read_i32()?;
let new_counts = parse_epb_counts(&epb, &self.names);
let fired: Vec<String> = self
.names
.iter()
.enumerate()
.filter(|(i, _)| new_counts[*i] > self.counts[*i])
.map(|(_, n)| n.clone())
.collect();
self.counts = new_counts;
que_events(conn, &self.names, &self.counts, self.event_id)?;
if !fired.is_empty() {
return Ok(fired);
}
}
}
pub fn cancel(self, conn: &mut Connection) -> Result<()> {
let mut w = op_packet(op::CANCEL_EVENTS);
w.put_i32(conn.db_handle());
w.put_i32(self.event_id);
conn.io().send(&w)?;
read_response(conn.io())?;
Ok(())
}
}
fn que_events(
conn: &mut Connection,
names: &[String],
counts: &[u32],
event_id: i32,
) -> Result<()> {
let epb = build_epb(names, counts);
let mut w = op_packet(op::QUE_EVENTS);
w.put_i32(conn.db_handle());
w.put_bytes(&epb); w.put_i32(0); w.put_i32(0); w.put_i32(event_id);
conn.io().send(&w)?;
read_response(conn.io())?;
Ok(())
}
fn build_epb(names: &[String], counts: &[u32]) -> Vec<u8> {
let mut epb = vec![1u8]; for (name, &count) in names.iter().zip(counts) {
epb.push(name.len() as u8);
epb.extend_from_slice(name.as_bytes());
epb.extend_from_slice(&count.to_le_bytes());
}
epb
}
fn parse_epb_counts(epb: &[u8], names: &[String]) -> Vec<u32> {
let mut out = vec![0u32; names.len()];
let mut i = 1; while i < epb.len() {
let nlen = epb[i] as usize;
i += 1;
if i + nlen + 4 > epb.len() {
break;
}
let name = &epb[i..i + nlen];
i += nlen;
let count = u32::from_le_bytes(epb[i..i + 4].try_into().unwrap());
i += 4;
if let Some(idx) = names.iter().position(|n| n.as_bytes() == name) {
out[idx] = count;
}
}
out
}
fn parse_aux_addr(data: &[u8], fallback: Option<IpAddr>) -> Result<(IpAddr, u16)> {
if data.len() < 8 {
return Err(Error::protocol(
"resposta de op_connect_request sem sockaddr",
));
}
let port = u16::from_be_bytes([data[2], data[3]]);
let ip = IpAddr::from([data[4], data[5], data[6], data[7]]);
let ip = if ip.is_unspecified() {
fallback.ok_or_else(|| Error::protocol("canal auxiliar sem endereço utilizável"))?
} else {
ip
};
Ok((ip, port))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn epb_roundtrip() {
let names = vec!["abc".to_string(), "evento2".to_string()];
let epb = build_epb(&names, &[0, 5]);
assert_eq!(epb[0], 1);
assert_eq!(parse_epb_counts(&epb, &names), vec![0, 5]);
}
#[test]
fn parse_aux_addr_from_sockaddr() {
let data = [
0x02, 0x00, 0x8e, 0x81, 0x7f, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0, 0, 0,
];
let (ip, port) = parse_aux_addr(&data, None).unwrap();
assert_eq!(port, 0x8e81);
assert_eq!(ip, IpAddr::from([127, 0, 0, 1]));
}
}