Skip to main content

firebird_wire/wire/
stream.rs

1//! Fluxo (stream) de pacotes enquadrado, opcionalmente criptografado, sobre uma conexão TCP.
2//!
3//! O Firebird não tem um prefixo de comprimento geral para o pacote: cada operação é uma sequência
4//! de campos XDR cujo formato depende do op code. Por isso lemos os campos
5//! sob demanda diretamente do socket em vez de armazenar pacotes inteiros em buffer.
6//!
7//! Após o handshake de wire-crypt (`op_crypt`) cada byte subsequente em ambas
8//! as direções passa por uma [`Cipher`] de fluxo (stream). Como as cifras de fluxo (stream) são
9//! dependentes de posição, a cifra é aplicada aos bytes brutos exatamente uma vez, em
10//! ordem, conforme atravessam o socket.
11
12#![allow(missing_docs)]
13
14use std::io::{Read, Write};
15use std::net::TcpStream;
16
17use crate::error::{Error, Result};
18use crate::wire::consts::{INFO_END, op};
19use crate::wire::xdr::{XdrWriter, pad4};
20
21/// Uma cifra de fluxo (stream) simétrica aplicada ao protocolo de comunicação (wire protocol) após `op_crypt`.
22///
23/// As implementações alteram o buffer no lugar. A mesma posição de byte deve ser
24/// processada exatamente uma vez; a camada de fluxo (stream) garante isso.
25pub trait Cipher: Send {
26    fn process(&mut self, data: &mut [u8]);
27}
28
29/// A conexão enquadrada com um servidor Firebird.
30pub struct FbStream {
31    sock: TcpStream,
32    /// Bytes descriptografados já extraídos do socket mas ainda não consumidos.
33    rbuf: Vec<u8>,
34    rpos: usize,
35    /// Bytes de saida acumulados antes de um descarregamento (flush).
36    wbuf: Vec<u8>,
37    read_cipher: Option<Box<dyn Cipher>>,
38    write_cipher: Option<Box<dyn Cipher>>,
39    /// Verdadeiro após um erro de I/O ou um desync de protocolo: o stream pode
40    /// ter bytes pendentes em estado desconhecido e não deve ser reutilizado (o
41    /// pool descarta conexões assim marcadas em vez de devolvê-las).
42    broken: bool,
43}
44
45impl FbStream {
46    pub fn new(sock: TcpStream) -> Self {
47        let _ = sock.set_nodelay(true);
48        FbStream {
49            sock,
50            rbuf: Vec::with_capacity(8192),
51            rpos: 0,
52            wbuf: Vec::with_capacity(1024),
53            read_cipher: None,
54            write_cipher: None,
55            broken: false,
56        }
57    }
58
59    /// Marca o stream como inutilizável (erro de I/O ou desync de protocolo).
60    pub fn mark_broken(&mut self) {
61        self.broken = true;
62    }
63
64    /// Se o stream sofreu um erro de I/O ou desync e não deve ser reutilizado.
65    pub fn is_broken(&self) -> bool {
66        self.broken
67    }
68
69    /// Instala as cifras de wire negociadas. Chamado uma vez, logo após o handshake
70    /// de crypt; todo o tráfego a partir deste ponto é criptografado.
71    pub fn enable_encryption(&mut self, read: Box<dyn Cipher>, write: Box<dyn Cipher>) {
72        self.read_cipher = Some(read);
73        self.write_cipher = Some(write);
74    }
75
76    pub fn is_encrypted(&self) -> bool {
77        self.read_cipher.is_some()
78    }
79
80    /// O IP do servidor (peer) deste socket. Usado para abrir o canal auxiliar de
81    /// eventos no mesmo host.
82    pub fn peer_ip(&self) -> Option<std::net::IpAddr> {
83        self.sock.peer_addr().ok().map(|a| a.ip())
84    }
85
86    // -- escrita -----------------------------------------------------------
87
88    /// Anexa uma operação construída em XDR ao buffer de envio. Use [`Self::flush`]
89    /// para empurrá-la ao socket. A maioria dos chamadores usa [`Self::send`].
90    pub fn enqueue(&mut self, w: &XdrWriter) {
91        self.wbuf.extend_from_slice(w.as_slice());
92    }
93
94    /// Descarrega (flush) toda a saida em buffer, criptografando se uma cifra estiver instalada.
95    pub fn flush(&mut self) -> Result<()> {
96        if self.wbuf.is_empty() {
97            return Ok(());
98        }
99        if let Some(c) = self.write_cipher.as_mut() {
100            c.process(&mut self.wbuf);
101        }
102        if let Err(e) = self.sock.write_all(&self.wbuf) {
103            self.broken = true;
104            return Err(e.into());
105        }
106        if let Err(e) = self.sock.flush() {
107            self.broken = true;
108            return Err(e.into());
109        }
110        self.wbuf.clear();
111        Ok(())
112    }
113
114    /// Enfileira e imediatamente descarrega (flush) uma operação.
115    pub fn send(&mut self, w: &XdrWriter) -> Result<()> {
116        self.enqueue(w);
117        self.flush()
118    }
119
120    // -- leitura -----------------------------------------------------------
121
122    /// Garante que pelo menos `n` bytes descriptografados estejam disponíveis no cursor de leitura,
123    /// extraindo (e descriptografando) mais do socket conforme necessário.
124    fn fill(&mut self, n: usize) -> Result<()> {
125        // Compacta ocasionalmente para que o buffer não cresça indefinidamente.
126        if self.rpos > 0 && self.rpos == self.rbuf.len() {
127            self.rbuf.clear();
128            self.rpos = 0;
129        } else if self.rpos > 16 * 1024 {
130            self.rbuf.drain(..self.rpos);
131            self.rpos = 0;
132        }
133
134        while self.rbuf.len() - self.rpos < n {
135            let mut chunk = [0u8; 8192];
136            let got = match self.sock.read(&mut chunk) {
137                Ok(n) => n,
138                Err(e) => {
139                    self.broken = true;
140                    return Err(e.into());
141                }
142            };
143            if got == 0 {
144                self.broken = true;
145                return Err(Error::Closed);
146            }
147            let slice = &mut chunk[..got];
148            if let Some(c) = self.read_cipher.as_mut() {
149                c.process(slice);
150            }
151            self.rbuf.extend_from_slice(slice);
152        }
153        Ok(())
154    }
155
156    /// Consome `n` bytes do cursor de leitura (sem preenchimento (padding) XDR).
157    pub fn read_raw(&mut self, n: usize) -> Result<Vec<u8>> {
158        self.fill(n)?;
159        let start = self.rpos;
160        self.rpos += n;
161        Ok(self.rbuf[start..start + n].to_vec())
162    }
163
164    pub fn read_i32(&mut self) -> Result<i32> {
165        self.fill(4)?;
166        let b = &self.rbuf[self.rpos..self.rpos + 4];
167        let v = i32::from_be_bytes(b.try_into().unwrap());
168        self.rpos += 4;
169        Ok(v)
170    }
171
172    pub fn read_i64(&mut self) -> Result<i64> {
173        self.fill(8)?;
174        let b = &self.rbuf[self.rpos..self.rpos + 8];
175        let v = i64::from_be_bytes(b.try_into().unwrap());
176        self.rpos += 8;
177        Ok(v)
178    }
179
180    pub fn read_f64(&mut self) -> Result<f64> {
181        Ok(f64::from_bits(self.read_i64()? as u64))
182    }
183
184    /// Pula o preenchimento (padding) XDR para que o deslocamento absoluto de bytes desde o início do
185    /// fluxo (stream) caia em um limite de 4 bytes. Rastreamos o alinhamento via `data_len`, não `rpos`,
186    /// então o chamador passa o comprimento do campo de dados recém-lido.
187    pub fn read_pad(&mut self, data_len: usize) -> Result<()> {
188        let pad = pad4(data_len) - data_len;
189        if pad > 0 {
190            let _ = self.read_raw(pad)?;
191        }
192        Ok(())
193    }
194
195    /// Lê um buffer opaco com prefixo de comprimento, alinhado em 4 bytes.
196    pub fn read_bytes(&mut self) -> Result<Vec<u8>> {
197        let len = self.read_i32()? as usize;
198        let data = self.read_raw(len)?;
199        self.read_pad(len)?;
200        Ok(data)
201    }
202
203    /// Lê um quad do Firebird (id de blob/transação): duas palavras XDR, alta depois baixa.
204    pub fn read_quad(&mut self) -> Result<u64> {
205        Ok(self.read_i64()? as u64)
206    }
207}
208
209/// Auxiliar: constrói uma verificação do terminador do resultado de uma info-request. Retorna os itens até
210/// (mas excluindo) o byte `isc_info_end`, validando que não está truncado.
211pub fn info_payload(buf: &[u8]) -> Result<&[u8]> {
212    match buf.last() {
213        Some(&INFO_END) => Ok(&buf[..buf.len() - 1]),
214        Some(&crate::wire::consts::INFO_TRUNCATED) => {
215            Err(Error::protocol("info response truncated; buffer too small"))
216        }
217        _ => Ok(buf),
218    }
219}
220
221/// Conveniência para construir o corpo de um pacote de operação única.
222pub fn op_packet(opcode: i32) -> XdrWriter {
223    let mut w = XdrWriter::new();
224    w.put_i32(opcode);
225    w
226}
227
228/// Os nomes dos op codes, para diagnóstico.
229pub fn op_name(code: i32) -> &'static str {
230    match code {
231        op::RESPONSE => "op_response",
232        op::ACCEPT => "op_accept",
233        op::ACCEPT_DATA => "op_accept_data",
234        op::COND_ACCEPT => "op_cond_accept",
235        op::REJECT => "op_reject",
236        op::DISCONNECT => "op_disconnect",
237        op::FETCH_RESPONSE => "op_fetch_response",
238        op::SQL_RESPONSE => "op_sql_response",
239        op::CONT_AUTH => "op_cont_auth",
240        op::CRYPT => "op_crypt",
241        op::CRYPT_KEY_CALLBACK => "op_crypt_key_callback",
242        op::BATCH_CS => "op_batch_cs",
243        op::TRUSTED_AUTH => "op_trusted_auth",
244        _ => "op_<other>",
245    }
246}