1use std::net::{TcpStream, ToSocketAddrs};
5
6use crate::auth::srp::{SrpClient, SrpHash, parse_server_data};
7use crate::auth::wirecrypt::{WireCryptPlugin, make_ciphers};
8use crate::config::{ConnectConfig, WireCrypt};
9use crate::error::{Error, Result};
10use crate::wire::consts::*;
11use crate::wire::response::{read_op, read_response, read_response_body};
12use crate::wire::stream::{FbStream, op_name, op_packet};
13use crate::wire::xdr::{ParameterBuffer, XdrWriter};
14
15const OFFERED_PROTOCOLS: &[i32] = &[
17 PROTOCOL_VERSION13,
18 PROTOCOL_VERSION15,
19 PROTOCOL_VERSION16,
20 PROTOCOL_VERSION17,
21 PROTOCOL_VERSION18,
22 PROTOCOL_VERSION19,
23];
24
25pub struct Connection {
27 stream: FbStream,
28 db_handle: i32,
29 protocol_version: i32,
30 charset: crate::charset::Charset,
31 event_seq: i32,
33}
34
35impl Connection {
36 pub fn connect(config: &ConnectConfig) -> Result<Connection> {
38 Self::open(config, false)
39 }
40
41 pub fn create_database(config: &ConnectConfig) -> Result<Connection> {
43 Self::open(config, true)
44 }
45
46 fn open(config: &ConnectConfig, create: bool) -> Result<Connection> {
47 Self::open_inner(config, create)
48 }
49
50 fn open_inner(config: &ConnectConfig, create: bool) -> Result<Connection> {
51 let connect_op = if create { op::CREATE } else { op::ATTACH };
52
53 let Handshake {
56 mut stream,
57 protocol_version,
58 auth,
59 } = handshake(config, connect_op, &config.database)?;
60
61 let dpb = build_dpb(config, create, &auth);
63 let mut w = op_packet(connect_op);
64 w.put_i32(0); w.put_str(&config.database);
66 w.put_bytes(&dpb);
67 stream.send(&w)?;
68 let resp = attach_response(&mut stream)?;
69
70 let mut conn = Connection {
71 stream,
72 db_handle: resp.handle,
73 protocol_version,
74 charset: crate::charset::Charset::from_name(&config.charset),
75 event_seq: 0,
76 };
77
78 if config.native_data_types && protocol_version >= 16 {
82 for stmt in [
83 "SET BIND OF INT128 TO NATIVE",
84 "SET BIND OF DECFLOAT TO NATIVE",
85 "SET BIND OF TIME ZONE TO NATIVE",
86 ] {
87 conn.exec_immediate(None, stmt)?;
88 }
89 }
90
91 Ok(conn)
92 }
93
94 pub(crate) fn next_event_id(&mut self) -> i32 {
96 self.event_seq += 1;
97 self.event_seq
98 }
99
100 pub fn charset(&self) -> crate::charset::Charset {
102 self.charset
103 }
104
105 pub fn close(mut self) -> Result<()> {
107 let mut w = op_packet(op::DETACH);
108 w.put_i32(self.db_handle);
109 self.stream.send(&w)?;
110 let _ = read_response(&mut self.stream)?;
111 Ok(())
112 }
113
114 pub fn ping(&mut self) -> Result<()> {
116 let w = op_packet(op::PING);
117 self.stream.send(&w)?;
118 read_response(&mut self.stream)?;
119 Ok(())
120 }
121
122 pub fn protocol_version(&self) -> i32 {
124 self.protocol_version
125 }
126
127 pub fn supports_batch(&self) -> bool {
129 self.protocol_version >= 16
130 }
131
132 pub fn supports_fetch_scroll(&self) -> bool {
134 self.protocol_version >= 17
135 }
136
137 pub fn exec_immediate(
144 &mut self,
145 tx: Option<&crate::transaction::Transaction>,
146 sql: &str,
147 ) -> Result<()> {
148 match tx {
149 Some(t) => self.exec_immediate_inner(t.handle(), sql),
150 None => {
151 let implicit_tx = self.begin()?;
153 let tx_handle = implicit_tx.handle();
154 match self.exec_immediate_inner(tx_handle, sql) {
155 Ok(()) => implicit_tx.commit(self),
156 Err(e) => {
157 let _ = implicit_tx.rollback(self);
158 Err(e)
159 }
160 }
161 }
162 }
163 }
164
165 fn exec_immediate_inner(&mut self, tx_handle: i32, sql: &str) -> Result<()> {
168 let mut w = op_packet(op::EXEC_IMMEDIATE);
169 w.put_i32(tx_handle); w.put_i32(self.db_handle); w.put_i32(3); w.put_str(sql);
173 w.put_bytes(&[]); w.put_i32(0); self.stream.send(&w)?;
176 read_response(&mut self.stream)?;
177 Ok(())
178 }
179
180 pub fn is_encrypted(&self) -> bool {
182 self.stream.is_encrypted()
183 }
184
185 pub fn is_healthy(&self) -> bool {
189 !self.stream.is_broken()
190 }
191
192 pub(crate) fn io(&mut self) -> &mut FbStream {
195 &mut self.stream
196 }
197
198 pub(crate) fn db_handle(&self) -> i32 {
199 self.db_handle
200 }
201}
202
203struct Accept {
205 version: i32,
206 data: Vec<u8>,
208 plugin: String,
210 authenticated: bool,
212 keys: Vec<u8>,
214 cond: bool,
218}
219
220fn read_accept(stream: &mut FbStream) -> Result<Accept> {
221 let code = read_op(stream)?;
222 match code {
223 c if c == op::ACCEPT => {
224 let version = stream.read_i32()?;
225 let _arch = stream.read_i32()?;
226 let _ptype = stream.read_i32()?;
227 Ok(Accept {
228 version,
229 data: Vec::new(),
230 plugin: String::new(),
231 authenticated: true,
232 keys: Vec::new(),
233 cond: false,
234 })
235 }
236 c if c == op::ACCEPT_DATA || c == op::COND_ACCEPT => {
240 let version = stream.read_i32()?;
241 let _arch = stream.read_i32()?;
242 let _ptype = stream.read_i32()?;
243 let data = stream.read_bytes()?;
244 let plugin = String::from_utf8_lossy(&stream.read_bytes()?)
245 .trim()
246 .to_string();
247 let authenticated = stream.read_i32()? != 0;
248 let keys = stream.read_bytes()?;
249 Ok(Accept {
250 version,
251 data,
252 plugin,
253 authenticated,
254 keys,
255 cond: c == op::COND_ACCEPT,
256 })
257 }
258 c if c == op::REJECT => Err(Error::auth("server rejected the connection")),
259 c if c == op::RESPONSE => {
260 crate::wire::response::read_response_body(stream)?.into_result()?;
262 Err(Error::protocol("unexpected op_response during connect"))
263 }
264 other => Err(Error::protocol(format!(
265 "unexpected handshake packet {} ({other})",
266 op_name(other)
267 ))),
268 }
269}
270
271pub(crate) struct AuthData {
273 pub(crate) plugin: String,
274 pub(crate) proof_hex: String,
275 pub(crate) session_key: Vec<u8>,
276}
277
278pub(crate) enum AuthState {
281 Proof(AuthData),
283 Legacy,
285 Done,
287}
288
289pub(crate) struct Handshake {
292 pub(crate) stream: FbStream,
293 pub(crate) protocol_version: i32,
294 pub(crate) auth: AuthState,
295}
296
297pub(crate) fn handshake(
302 config: &ConnectConfig,
303 connect_op: i32,
304 target: &str,
305) -> Result<Handshake> {
306 config.validate()?;
307 let sock = connect_socket(config)?;
308 let mut stream = FbStream::new(sock);
309
310 let mut srp = SrpClient::new(SrpHash::Sha256);
311
312 let pubkey = srp.public_key_hex();
314 let cnct = build_cnct_block(config, &pubkey);
315 dbg_log(&format!("pubkey hex ({} chars)", pubkey.len()));
316 dbg_log(&format!("cnct ({} bytes): {}", cnct.len(), hexdump(&cnct)));
317 let mut w = op_packet(op::CONNECT);
318 w.put_i32(connect_op); w.put_i32(CONNECT_VERSION3);
320 w.put_i32(ARCH_GENERIC);
321 w.put_str(target); w.put_i32(OFFERED_PROTOCOLS.len() as i32);
323 w.put_bytes(&cnct); for (i, &version) in OFFERED_PROTOCOLS.iter().enumerate() {
325 w.put_i32(version);
326 w.put_i32(ARCH_GENERIC);
327 w.put_i32(PTYPE_RPC); w.put_i32(PTYPE_BATCH_SEND); w.put_i32((i + 1) as i32); }
331 stream.send(&w)?;
332 dbg_log("sent op_connect");
333
334 let accept = read_accept(&mut stream)?;
336 let protocol_version = accept.version & 0x7fff;
339 dbg_log(&format!(
340 "accept: proto={protocol_version} plugin={:?} authenticated={} data_len={} keys_len={}",
341 accept.plugin,
342 accept.authenticated,
343 accept.data.len(),
344 accept.keys.len()
345 ));
346
347 let auth = compute_auth(config, &mut srp, &accept)?;
350 let session_key = auth.as_ref().map(|a| a.session_key.clone());
351 dbg_log(&format!("auth computed; have_proof={}", auth.is_some()));
352
353 let auth = match (auth, accept.cond, config.wire_crypt != WireCrypt::Disabled) {
359 (Some(a), true, true) => {
360 let keys = continue_auth(&mut stream, &a)?;
361 negotiate_crypt(&mut stream, config, Some(&a.session_key), &keys)?;
362 AuthState::Done
363 }
364 (Some(a), _, _) => {
365 negotiate_crypt(&mut stream, config, session_key.as_deref(), &accept.keys)?;
366 AuthState::Proof(a)
367 }
368 (None, _, _) => {
369 negotiate_crypt(&mut stream, config, session_key.as_deref(), &accept.keys)?;
370 AuthState::Legacy
371 }
372 };
373 dbg_log(&format!(
374 "crypt negotiated; encrypted={}",
375 stream.is_encrypted()
376 ));
377
378 Ok(Handshake {
379 stream,
380 protocol_version,
381 auth,
382 })
383}
384
385fn connect_socket(config: &ConnectConfig) -> Result<TcpStream> {
386 let addrs: Vec<_> = (config.host.as_str(), config.port)
387 .to_socket_addrs()?
388 .collect();
389 if addrs.is_empty() {
390 return Err(Error::protocol("host resolution returned no addresses"));
391 }
392
393 let mut last_err = None;
394 for addr in addrs {
395 let result = match config.connect_timeout {
396 Some(timeout) => TcpStream::connect_timeout(&addr, timeout),
397 None => TcpStream::connect(addr),
398 };
399 match result {
400 Ok(sock) => return Ok(sock),
401 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => return Err(Error::Timeout),
402 Err(e) => last_err = Some(e),
403 }
404 }
405
406 Err(last_err
407 .unwrap_or_else(|| {
408 std::io::Error::new(std::io::ErrorKind::NotFound, "no socket address resolved")
409 })
410 .into())
411}
412
413fn compute_auth(
416 config: &ConnectConfig,
417 srp: &mut SrpClient,
418 accept: &Accept,
419) -> Result<Option<AuthData>> {
420 if accept.data.is_empty() || accept.authenticated {
421 return Ok(None);
422 }
423
424 let hash = match accept.plugin.as_str() {
425 "Srp256" => SrpHash::Sha256,
426 "Srp" => SrpHash::Sha1,
427 other => return Err(Error::auth(format!("unsupported auth plugin '{other}'"))),
428 };
429 srp.set_hash(hash);
430
431 let (salt, b_pub) = parse_server_data(&accept.data)?;
432 let user = config.normalized_user();
433 let (proof, key) = srp.proof(&user, &config.password, &salt, &b_pub)?;
434
435 Ok(Some(AuthData {
436 plugin: accept.plugin.clone(),
437 proof_hex: crate::auth::srp::to_hex(&proof),
438 session_key: key,
439 }))
440}
441
442fn continue_auth(stream: &mut FbStream, auth: &AuthData) -> Result<Vec<u8>> {
448 let mut w = op_packet(op::CONT_AUTH);
449 w.put_str(&auth.proof_hex);
450 w.put_str(&auth.plugin);
451 w.put_str("Srp256,Srp,Legacy_Auth");
452 w.put_bytes(&[]);
453 stream.send(&w)?;
454 let resp = read_response(stream)?;
455 Ok(resp.data)
456}
457
458pub(crate) fn attach_response(stream: &mut FbStream) -> Result<crate::wire::response::Response> {
463 loop {
464 let code = read_op(stream)?;
465 if code == op::RESPONSE {
466 return read_response_body(stream)?.into_result();
467 } else if code == op::CONT_AUTH {
468 for _ in 0..4 {
471 let _ = stream.read_bytes()?;
472 }
473 } else {
474 return Err(Error::protocol(format!(
475 "unexpected packet after attach: {} ({code})",
476 op_name(code)
477 )));
478 }
479 }
480}
481
482fn negotiate_crypt(
484 stream: &mut FbStream,
485 config: &ConnectConfig,
486 session_key: Option<&[u8]>,
487 keys: &[u8],
488) -> Result<()> {
489 if config.wire_crypt == WireCrypt::Disabled {
490 return Ok(());
491 }
492
493 let key = match session_key {
494 Some(k) => k,
495 None => {
496 if config.wire_crypt == WireCrypt::Required {
497 return Err(Error::auth(
498 "encryption required but no session key was negotiated",
499 ));
500 }
501 return Ok(());
502 }
503 };
504
505 let (plugin, nonce) = if let Some(n) = find_after(keys, b"ChaCha\x00", 12) {
509 (WireCryptPlugin::ChaCha, n)
510 } else if let Some(n) = find_after(keys, b"ChaCha64\x00", 8) {
511 (WireCryptPlugin::ChaCha64, n)
512 } else if contains_subslice(keys, b"Arc4") {
513 (WireCryptPlugin::Arc4, Vec::new())
514 } else {
515 if config.wire_crypt == WireCrypt::Required {
516 return Err(Error::auth("server offers no supported wire-crypt plugin"));
517 }
518 return Ok(()); };
520
521 let mut w = op_packet(op::CRYPT);
522 w.put_str(plugin.name()); w.put_str("Symmetric"); stream.send(&w)?;
525
526 let (rd, wr) = make_ciphers(plugin, key, &nonce);
528 stream.enable_encryption(rd, wr);
529
530 read_response(stream)?;
531 Ok(())
532}
533
534fn find_after(keys: &[u8], marker: &[u8], n: usize) -> Option<Vec<u8>> {
537 let i = keys.windows(marker.len()).position(|w| w == marker)?;
538 let start = i + marker.len();
539 keys.get(start..start + n).map(|s| s.to_vec())
540}
541
542fn wire_crypt_level(wc: WireCrypt) -> i32 {
547 match wc {
548 WireCrypt::Disabled => wire_crypt::DISABLED,
549 WireCrypt::Enabled => wire_crypt::ENABLED,
550 WireCrypt::Required => wire_crypt::REQUIRED,
551 }
552}
553
554fn build_cnct_block(config: &ConnectConfig, public_key_hex: &str) -> Vec<u8> {
557 let mut b = Vec::new();
558 let user = config.normalized_user();
559
560 push_cnct(&mut b, cnct::LOGIN, user.as_bytes());
561 push_cnct(&mut b, cnct::PLUGIN_NAME, b"Srp256");
562 push_cnct(&mut b, cnct::PLUGIN_LIST, b"Srp256,Srp");
563
564 if let Some(os_user) = os_user() {
566 push_cnct(&mut b, cnct::USER, os_user.as_bytes());
567 }
568 if let Some(host) = host_name() {
569 push_cnct(&mut b, cnct::HOST, host.as_bytes());
570 }
571
572 let data = public_key_hex.as_bytes();
575 let mut idx: u8 = 0;
576 let mut off = 0;
577 while off < data.len() {
578 let end = (off + 254).min(data.len());
579 let chunk = &data[off..end];
580 b.push(cnct::SPECIFIC_DATA);
581 b.push((chunk.len() + 1) as u8);
582 b.push(idx);
583 b.extend_from_slice(chunk);
584 idx = idx.wrapping_add(1);
585 off = end;
586 }
587
588 push_cnct(
589 &mut b,
590 cnct::CLIENT_CRYPT,
591 &wire_crypt_level(config.wire_crypt).to_le_bytes(),
592 );
593 b
594}
595
596fn push_cnct(buf: &mut Vec<u8>, tag: u8, value: &[u8]) {
597 debug_assert!(value.len() <= u8::MAX as usize);
598 buf.push(tag);
599 buf.push(value.len() as u8);
600 buf.extend_from_slice(value);
601}
602
603fn build_dpb(config: &ConnectConfig, create: bool, auth: &AuthState) -> Vec<u8> {
605 let mut pb = ParameterBuffer::new(DPB_VERSION1);
606
607 pb.int(dpb::SQL_DIALECT, config.dialect);
608 pb.string(dpb::LC_CTYPE, &config.charset);
609 pb.string(dpb::USER_NAME, &config.normalized_user());
610
611 match auth {
612 AuthState::Proof(a) => {
613 pb.string(dpb::AUTH_PLUGIN_NAME, &a.plugin);
614 pb.string(dpb::AUTH_PLUGIN_LIST, "Srp256,Srp");
615 pb.string(dpb::SPECIFIC_AUTH_DATA, &a.proof_hex);
616 }
617 AuthState::Legacy => {
618 pb.string(dpb::PASSWORD, &config.password);
620 }
621 AuthState::Done => {}
623 }
624
625 if let Some(role) = &config.role {
626 pb.string(dpb::ROLE_NAME, role);
627 }
628 if let Some(tz) = &config.timezone {
629 pb.string(dpb::SESSION_TIME_ZONE, tz);
630 }
631 if let Some(workers) = config.parallel_workers {
632 pb.int(dpb::PARALLEL_WORKERS, workers);
633 }
634 if let Some(t) = config.connect_timeout {
635 pb.int(
636 dpb::CONNECT_TIMEOUT,
637 t.as_secs().clamp(1, i32::MAX as u64) as i32,
638 );
639 }
640 if create && let Some(size) = config.page_size {
641 pb.int(dpb::PAGE_SIZE, size);
642 }
643
644 pb.int(dpb::PROCESS_ID, std::process::id() as i32);
645 pb.string(dpb::PROCESS_NAME, &process_name());
646
647 pb.into_vec()
648}
649
650fn process_name() -> String {
651 std::env::current_exe()
652 .ok()
653 .and_then(|p| p.file_name().map(|s| s.to_string_lossy().into_owned()))
654 .map(|mut s| {
655 s.truncate(255);
656 s
657 })
658 .unwrap_or_else(|| "fdb_driver".to_string())
659}
660
661fn dbg_log(msg: &str) {
662 if std::env::var_os("FB_DEBUG").is_some() {
663 eprintln!("[fdb] {msg}");
664 }
665}
666
667fn hexdump(b: &[u8]) -> String {
668 use std::fmt::Write;
669 let mut s = String::new();
670 for x in b {
671 let _ = write!(s, "{x:02x} ");
672 }
673 s
674}
675
676fn os_user() -> Option<String> {
677 std::env::var("USER")
678 .or_else(|_| std::env::var("USERNAME"))
679 .ok()
680 .map(|mut s| {
681 s.truncate(255);
682 s
683 })
684}
685
686fn host_name() -> Option<String> {
687 std::env::var("HOSTNAME")
688 .ok()
689 .or_else(|| {
690 std::fs::read_to_string("/etc/hostname")
691 .ok()
692 .map(|s| s.trim().to_string())
693 })
694 .filter(|s| !s.is_empty())
695 .map(|mut s| {
696 s.truncate(255);
697 s
698 })
699}
700
701fn contains_subslice(haystack: &[u8], needle: &[u8]) -> bool {
702 if needle.is_empty() || haystack.len() < needle.len() {
703 return false;
704 }
705 haystack.windows(needle.len()).any(|w| w == needle)
706}
707
708pub(crate) fn info_request(opcode: i32, handle: i32, items: &[u8], buffer_len: i32) -> XdrWriter {
711 let mut w = op_packet(opcode);
712 w.put_i32(handle);
713 w.put_i32(0); w.put_bytes(items);
715 w.put_i32(buffer_len);
716 w
717}
718
719#[cfg(test)]
720mod tests {
721 use super::*;
722
723 #[test]
724 fn cnct_block_chunks_public_key() {
725 let cfg = ConnectConfig::new().user("sysdba");
726 let hex = "a".repeat(256);
728 let block = build_cnct_block(&cfg, &hex);
729
730 let mut i = 0;
732 let mut chunks = Vec::new();
733 while i < block.len() {
734 let tag = block[i];
735 let len = block[i + 1] as usize;
736 let val = &block[i + 2..i + 2 + len];
737 if tag == cnct::SPECIFIC_DATA {
738 chunks.push((val[0], val.len() - 1));
739 }
740 i += 2 + len;
741 }
742 assert_eq!(chunks, vec![(0u8, 254usize), (1u8, 2usize)]);
743 }
744
745 #[test]
746 fn dpb_has_dialect_and_charset() {
747 let cfg = ConnectConfig::new().charset("UTF8").dialect(3);
748 let dpb = build_dpb(&cfg, false, &AuthState::Legacy);
749 assert_eq!(dpb[0], DPB_VERSION1);
750 assert!(dpb.windows(1).any(|w| w[0] == dpb::SQL_DIALECT));
752 assert!(contains_subslice(&dpb, b"UTF8"));
754 }
755
756 #[test]
757 fn subslice_search() {
758 assert!(contains_subslice(b"xxArc4yy", b"Arc4"));
759 assert!(!contains_subslice(b"xxChaChayy", b"Arc4"));
760 }
761}