firebird_wire/wire/
stream.rs1#![allow(missing_docs)]
13
14use std::io::{Read, Write};
15use std::net::TcpStream;
16
17use crate::error::{Error, Result};
18use crate::wire::consts::{INFO_END, op};
19use crate::wire::xdr::{XdrWriter, pad4};
20
21pub trait Cipher: Send {
26 fn process(&mut self, data: &mut [u8]);
27}
28
29pub struct FbStream {
31 sock: TcpStream,
32 rbuf: Vec<u8>,
34 rpos: usize,
35 wbuf: Vec<u8>,
37 read_cipher: Option<Box<dyn Cipher>>,
38 write_cipher: Option<Box<dyn Cipher>>,
39 broken: bool,
43}
44
45impl FbStream {
46 pub fn new(sock: TcpStream) -> Self {
47 let _ = sock.set_nodelay(true);
48 FbStream {
49 sock,
50 rbuf: Vec::with_capacity(8192),
51 rpos: 0,
52 wbuf: Vec::with_capacity(1024),
53 read_cipher: None,
54 write_cipher: None,
55 broken: false,
56 }
57 }
58
59 pub fn mark_broken(&mut self) {
61 self.broken = true;
62 }
63
64 pub fn is_broken(&self) -> bool {
66 self.broken
67 }
68
69 pub fn enable_encryption(&mut self, read: Box<dyn Cipher>, write: Box<dyn Cipher>) {
72 self.read_cipher = Some(read);
73 self.write_cipher = Some(write);
74 }
75
76 pub fn is_encrypted(&self) -> bool {
77 self.read_cipher.is_some()
78 }
79
80 pub fn peer_ip(&self) -> Option<std::net::IpAddr> {
83 self.sock.peer_addr().ok().map(|a| a.ip())
84 }
85
86 pub fn enqueue(&mut self, w: &XdrWriter) {
91 self.wbuf.extend_from_slice(w.as_slice());
92 }
93
94 pub fn flush(&mut self) -> Result<()> {
96 if self.wbuf.is_empty() {
97 return Ok(());
98 }
99 if let Some(c) = self.write_cipher.as_mut() {
100 c.process(&mut self.wbuf);
101 }
102 if let Err(e) = self.sock.write_all(&self.wbuf) {
103 self.broken = true;
104 return Err(e.into());
105 }
106 if let Err(e) = self.sock.flush() {
107 self.broken = true;
108 return Err(e.into());
109 }
110 self.wbuf.clear();
111 Ok(())
112 }
113
114 pub fn send(&mut self, w: &XdrWriter) -> Result<()> {
116 self.enqueue(w);
117 self.flush()
118 }
119
120 fn fill(&mut self, n: usize) -> Result<()> {
125 if self.rpos > 0 && self.rpos == self.rbuf.len() {
127 self.rbuf.clear();
128 self.rpos = 0;
129 } else if self.rpos > 16 * 1024 {
130 self.rbuf.drain(..self.rpos);
131 self.rpos = 0;
132 }
133
134 while self.rbuf.len() - self.rpos < n {
135 let mut chunk = [0u8; 8192];
136 let got = match self.sock.read(&mut chunk) {
137 Ok(n) => n,
138 Err(e) => {
139 self.broken = true;
140 return Err(e.into());
141 }
142 };
143 if got == 0 {
144 self.broken = true;
145 return Err(Error::Closed);
146 }
147 let slice = &mut chunk[..got];
148 if let Some(c) = self.read_cipher.as_mut() {
149 c.process(slice);
150 }
151 self.rbuf.extend_from_slice(slice);
152 }
153 Ok(())
154 }
155
156 pub fn read_raw(&mut self, n: usize) -> Result<Vec<u8>> {
158 self.fill(n)?;
159 let start = self.rpos;
160 self.rpos += n;
161 Ok(self.rbuf[start..start + n].to_vec())
162 }
163
164 pub fn read_i32(&mut self) -> Result<i32> {
165 self.fill(4)?;
166 let b = &self.rbuf[self.rpos..self.rpos + 4];
167 let v = i32::from_be_bytes(b.try_into().unwrap());
168 self.rpos += 4;
169 Ok(v)
170 }
171
172 pub fn read_i64(&mut self) -> Result<i64> {
173 self.fill(8)?;
174 let b = &self.rbuf[self.rpos..self.rpos + 8];
175 let v = i64::from_be_bytes(b.try_into().unwrap());
176 self.rpos += 8;
177 Ok(v)
178 }
179
180 pub fn read_f64(&mut self) -> Result<f64> {
181 Ok(f64::from_bits(self.read_i64()? as u64))
182 }
183
184 pub fn read_pad(&mut self, data_len: usize) -> Result<()> {
188 let pad = pad4(data_len) - data_len;
189 if pad > 0 {
190 let _ = self.read_raw(pad)?;
191 }
192 Ok(())
193 }
194
195 pub fn read_bytes(&mut self) -> Result<Vec<u8>> {
197 let len = self.read_i32()? as usize;
198 let data = self.read_raw(len)?;
199 self.read_pad(len)?;
200 Ok(data)
201 }
202
203 pub fn read_quad(&mut self) -> Result<u64> {
205 Ok(self.read_i64()? as u64)
206 }
207}
208
209pub fn info_payload(buf: &[u8]) -> Result<&[u8]> {
212 match buf.last() {
213 Some(&INFO_END) => Ok(&buf[..buf.len() - 1]),
214 Some(&crate::wire::consts::INFO_TRUNCATED) => {
215 Err(Error::protocol("info response truncated; buffer too small"))
216 }
217 _ => Ok(buf),
218 }
219}
220
221pub fn op_packet(opcode: i32) -> XdrWriter {
223 let mut w = XdrWriter::new();
224 w.put_i32(opcode);
225 w
226}
227
228pub fn op_name(code: i32) -> &'static str {
230 match code {
231 op::RESPONSE => "op_response",
232 op::ACCEPT => "op_accept",
233 op::ACCEPT_DATA => "op_accept_data",
234 op::COND_ACCEPT => "op_cond_accept",
235 op::REJECT => "op_reject",
236 op::DISCONNECT => "op_disconnect",
237 op::FETCH_RESPONSE => "op_fetch_response",
238 op::SQL_RESPONSE => "op_sql_response",
239 op::CONT_AUTH => "op_cont_auth",
240 op::CRYPT => "op_crypt",
241 op::CRYPT_KEY_CALLBACK => "op_crypt_key_callback",
242 op::BATCH_CS => "op_batch_cs",
243 op::TRUSTED_AUTH => "op_trusted_auth",
244 _ => "op_<other>",
245 }
246}