1use crate::config::Config;
2use crate::packet::{AuthStatus, Packet, SuccessMessage};
3use std::io::{Read, Write};
4use std::net::TcpStream;
5use std::sync::Arc;
6use std::time::Duration;
7
8#[derive(Clone, Debug)]
9pub struct Connect {
10 pub(crate) stream: Arc<TcpStream>,
12 packet: Packet,
13 auth_status: AuthStatus,
15}
16
17impl Connect {
18
19 pub fn is_valid(&self) -> bool {
21 true
22 }
23
24 pub fn _close(&mut self) {
26 let _ = self.stream.as_ref().write_all(&Packet::pack_terminate());
27 let _ = self.stream.shutdown(std::net::Shutdown::Both);
28 }
29
30 pub fn new(mut config: Config) -> Result<Connect, String> {
31 let stream = match TcpStream::connect(config.url()) {
32 Ok(stream) => stream,
33 Err(e) => return Err(e.to_string()),
34 };
35 stream.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
36 stream.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
37
38 let mut connect = Self {
40 stream: Arc::new(stream),
41 packet: Packet::new(config),
42 auth_status: AuthStatus::None,
43 };
44
45 connect.startup_message()?;
46 connect.sasl_initial_response_message()?;
47
48 Ok(connect)
49 }
50
51 fn read(&mut self) -> Result<Vec<u8>, String> {
52 let mut msg = vec![];
53 loop {
54 let mut response = [0u8; 1024];
55 match self.stream.try_clone().unwrap().read(&mut response) {
56 Ok(e) => {
57 msg.extend(response[..e].to_vec());
58 }
59 Err(e) => return Err(format!("Error reading from stream: {e}")),
60 }
61 if msg.is_empty() {
62 continue;
63 }
64 if let AuthStatus::AuthenticationOk = self.auth_status {
65 if msg.ends_with(&[90, 0, 0, 0, 5, 73]) | msg.ends_with(&[90, 0, 0, 0, 5, 84]) | msg.ends_with(&[90, 0, 0, 0, 5, 69]) {
66 break;
67 }
68 continue;
69 } else {
70 let t = &msg[1..=4];
71 let len = u32::from_be_bytes(t.try_into().unwrap());
72 if msg.len() < (len as usize) {
73 continue;
74 }
75 break;
76 }
77 }
78 Ok(msg)
79 }
80 fn startup_message(&mut self) -> Result<(), String> {
82 self.stream.try_clone().unwrap().write_all(&self.packet.pack_first()).unwrap();
83 let data = self.read()?;
84 self.packet.unpack(data, 0)?;
85 Ok(())
86 }
87 fn sasl_initial_response_message(&mut self) -> Result<(), String> {
89 self.stream.try_clone().unwrap().write_all(&self.packet.pack_auth()).unwrap();
90 let data = self.read()?;
91 self.packet.unpack(data, 0)?;
92 self.stream.try_clone().unwrap().write_all(&self.packet.pack_auth_verify()).unwrap();
93 let data = self.read()?;
94 self.packet.unpack(data, 0)?;
95 self.auth_status = AuthStatus::AuthenticationOk;
96 Ok(())
97 }
98 pub fn query(&mut self, sql: &str) -> Result<SuccessMessage, String> {
100 self.stream.as_ref()
106 .write_all(&self.packet.pack_query(sql))
107 .map_err(|e| format!("query error: {}", e))?;
108
109 let data = self.read()
110 .map_err(|e| format!("query read error: {}", e))?;
111
112 self.packet.unpack(data, 0)
113
114 }
118 pub fn execute(&mut self, sql: &str) -> Result<SuccessMessage, String> {
120 self.stream.as_ref()
129 .write_all(&self.packet.pack_execute(sql))
130 .map_err(|e| format!("execute error: {}", e))?;
131 let data = self.read()?;
132 self.packet.unpack(data, 0)
133 }
134}
135impl Drop for Connect {
136
137 fn drop(&mut self) {
138 crate::pools::DB_POOL.lock().unwrap().push_back(self.clone());
139 }
140
141 }