1use bytes::{Buf, BufMut, BytesMut};
2use md5::{Digest, Md5};
3use std::collections::HashMap;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5use tokio::net::TcpStream;
6use rust_decimal::Decimal;
7use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
8
9use crate::conn::{ConnectionConfig, ConnectionState};
10use crate::error::error_response::ErrorResponse;
11use crate::error::{Result, VerticaError};
12use crate::query::{ColumnInfo, QueryResult};
13use crate::types::vertica_types;
14use crate::types::Value;
15use crate::Row;
16#[derive(Debug)]
18pub enum AsyncConnectionStream {
19    Plain(TcpStream),
21    #[cfg(feature = "tls")]
22    Ssl(tokio_native_tls::TlsStream<TcpStream>),
24}
25
26impl AsyncConnectionStream {
27    pub async fn read_async(&mut self, buf: &mut [u8]) -> Result<usize> {
38        match self {
39            AsyncConnectionStream::Plain(stream) => stream
40                .read(buf)
41                .await
42                .map_err(|e| VerticaError::Connection(format!("Read error: {}", e))),
43            #[cfg(feature = "tls")]
44            AsyncConnectionStream::Ssl(stream) => stream
45                .read(buf)
46                .await
47                .map_err(|e| VerticaError::Connection(format!("SSL read error: {}", e))),
48        }
49    }
50
51    pub async fn write_all_async(&mut self, buf: &[u8]) -> Result<()> {
62        match self {
63            AsyncConnectionStream::Plain(stream) => stream
64                .write_all(buf)
65                .await
66                .map_err(|e| VerticaError::Connection(format!("Write error: {}", e))),
67            #[cfg(feature = "tls")]
68            AsyncConnectionStream::Ssl(stream) => stream
69                .write_all(buf)
70                .await
71                .map_err(|e| VerticaError::Connection(format!("SSL write error: {}", e))),
72        }
73    }
74}
75
76#[derive(Debug)]
78pub struct AsyncConnection {
79    stream: Option<AsyncConnectionStream>,
80    config: ConnectionConfig,
81    state: ConnectionState,
82    parameters: HashMap<String, String>,
83    backend_pid: Option<u32>,
84    secret_key: Option<u32>,
85}
86
87impl AsyncConnection {
88    pub async fn new(config: ConnectionConfig) -> Result<Self> {
90        let mut conn = Self {
91            stream: None,
92            config,
93            state: ConnectionState::Disconnected,
94            parameters: HashMap::new(),
95            backend_pid: None,
96            secret_key: None,
97        };
98        conn.connect().await?;
99        Ok(conn)
100    }
101
102    pub async fn new_from_url(url: &str) -> Result<Self> {
104        let config = ConnectionConfig::from_url(url)?;
105        Self::new(config).await
106    }
107
108    async fn connect(&mut self) -> Result<()> {
110        log::debug!(
111            "[ASYNC_CONNECT] Starting connection process to {}:{} as {}",
112            self.config.host,
113            self.config.port,
114            self.config.username
115        );
116        self.state = ConnectionState::Connecting;
117
118        let mut hosts = vec![format!("{}:{}", self.config.host, self.config.port)];
120        if let Some(ref backup_servers) = self.config.backup_servers {
121            hosts.extend(backup_servers.clone());
122        }
123
124        let mut _last_error = None;
125
126        for host_port in &hosts {
127            log::debug!("[ASYNC_CONNECT] Trying host: {}", host_port);
128
129            match self.connect_to_host(host_port).await {
130                Ok(stream) => {
131                    log::debug!("[ASYNC_CONNECT] Successfully connected to {}", host_port);
132                    self.stream = Some(stream);
133                    self.state = ConnectionState::Handshake;
134
135                    return self.handshake().await.map_err(|e| {
136                        log::debug!("[ASYNC_CONNECT] Handshake failed: {}", e);
137                        e
138                    });
139                }
140                Err(e) => {
141                    log::debug!("[ASYNC_CONNECT] Failed to connect to {}: {}", host_port, e);
142                    _last_error = Some(e);
143                    continue;
144                }
145            }
146        }
147
148        let error_msg = format!(
149            "Failed to connect to any host. Last error: {}",
150            _last_error.unwrap_or_else(|| VerticaError::Connection("Unknown error".to_string()))
151        );
152        log::debug!("[ASYNC_CONNECT] {}", error_msg);
153        Err(VerticaError::Connection(error_msg))
154    }
155
156    async fn connect_to_host(&self, host_port: &str) -> Result<AsyncConnectionStream> {
158        let parts: Vec<&str> = host_port.split(':').collect();
159        if parts.len() != 2 {
160            return Err(VerticaError::Connection(format!(
161                "Invalid host:port format: {}",
162                host_port
163            )));
164        }
165
166        let host = parts[0];
167        let port: u16 = parts[1]
168            .parse()
169            .map_err(|e| VerticaError::Connection(format!("Invalid port number: {}", e)))?;
170
171        let addrs = tokio::net::lookup_host(format!("{}:{}", host, port))
173            .await
174            .map_err(|e| {
175                VerticaError::Connection(format!("Failed to resolve host {}: {}", host, e))
176            })?;
177
178        let mut _last_error = None;
179
180        for addr in addrs {
181            log::debug!("[ASYNC_CONNECT] Trying IP address: {}", addr);
182
183            match tokio::time::timeout(self.config.connect_timeout, TcpStream::connect(addr)).await
184            {
185                Ok(Ok(stream)) => {
186                    stream.set_nodelay(true).map_err(|e| {
188                        VerticaError::Connection(format!("Failed to set TCP_NODELAY: {}", e))
189                    })?;
190
191                    return Ok(AsyncConnectionStream::Plain(stream));
192                }
193                Ok(Err(e)) => {
194                    log::debug!("[ASYNC_CONNECT] Failed to connect to {}: {}", addr, e);
195                    _last_error = Some(e);
196                    continue;
197                }
198                Err(_) => {
199                    log::debug!("[ASYNC_CONNECT] Connection timeout to {}", addr);
200                    _last_error = Some(std::io::Error::new(
201                        std::io::ErrorKind::TimedOut,
202                        "Connection timeout",
203                    ));
204                    continue;
205                }
206            }
207        }
208
209        Err(VerticaError::Connection(format!(
210            "Failed to connect to {}",
211            host_port
212        )))
213    }
214
215    async fn handshake(&mut self) -> Result<()> {
217        log::debug!("[ASYNC_HANDSHAKE] Starting handshake process");
218        let mut buf = BytesMut::with_capacity(1024);
219
220        let username = self.config.username.clone();
221        let database = self.config.database.clone();
222
223        buf.put_i32(0); buf.put_i32(196608); self.write_string_async(&mut buf, "user").await?;
227        self.write_string_async(&mut buf, &username).await?;
228
229        self.write_string_async(&mut buf, "database").await?;
230        self.write_string_async(&mut buf, &database).await?;
231
232        self.write_string_async(&mut buf, "client_encoding").await?;
233        self.write_string_async(&mut buf, "UTF8").await?;
234
235        self.write_string_async(&mut buf, "").await?; let len = buf.len() as i32;
238        buf[0..4].copy_from_slice(&len.to_be_bytes());
239
240        log::debug!("[ASYNC_HANDSHAKE] Sending startup message ({} bytes)", len);
241        self.write_all_async(&buf).await.map_err(|e| {
242            log::debug!("[ASYNC_HANDSHAKE] Failed to send startup message: {}", e);
243            e
244        })?;
245
246        log::debug!("[ASYNC_HANDSHAKE] Startup message sent, starting authentication");
247        self.authenticate().await?;
248        log::debug!("[ASYNC_HANDSHAKE] Handshake completed successfully");
249        Ok(())
250    }
251
252    async fn authenticate(&mut self) -> Result<()> {
254        log::debug!("[ASYNC_AUTH] Starting authentication process");
255        self.state = ConnectionState::Authenticating;
256
257        let mut message_count = 0;
258        loop {
259            message_count += 1;
260            log::debug!("[ASYNC_AUTH] Waiting for message #{}", message_count);
261
262            let mut msg = self.read_message().await.map_err(|e| {
263                log::debug!(
264                    "[ASYNC_AUTH] Failed to read message #{}: {}",
265                    message_count,
266                    e
267                );
268                e
269            })?;
270
271            log::debug!(
272                "[ASYNC_AUTH] Received message type: {}",
273                msg.message_type as char
274            );
275
276            match msg.message_type {
277                b'R' => {
278                    let auth_type = msg.read_i32().map_err(|e| {
280                        log::debug!("[ASYNC_AUTH] Failed to read auth type: {}", e);
281                        e
282                    })?;
283                    log::debug!("[ASYNC_AUTH] Authentication request type: {}", auth_type);
284
285                    match auth_type {
286                        0 => {
287                            log::debug!("[ASYNC_AUTH] Authentication successful");
289                        }
290                        3 => {
291                            log::debug!("[ASYNC_AUTH] Clear text password required");
293                            self.send_password_response().await?;
294                        }
295                        5 => {
296                            log::debug!("[ASYNC_AUTH] MD5 password required");
298                            let salt = msg.read_bytes(4)?;
299                            log::debug!("[ASYNC_AUTH] Salt: {:02x?}", salt);
300                            self.send_md5_password_response(&salt).await?;
301                        }
302                        _ => {
303                            log::error!(
304                                "[ASYNC_AUTH] Unsupported authentication type: {}",
305                                auth_type
306                            );
307                            return Err(VerticaError::Authentication(format!(
308                                "Unsupported authentication type: {}",
309                                auth_type
310                            )));
311                        }
312                    }
313                }
314                b'K' => {
315                    let pid = msg.read_u32()?;
316                    let key = msg.read_u32()?;
317                    self.backend_pid = Some(pid);
318                    self.secret_key = Some(key);
319                    log::debug!("[ASYNC_AUTH] Backend PID: {}, Secret key: {}", pid, key);
320                }
321                b'S' => {
322                    log::debug!(
323                        "[ASYNC_AUTH] Processing parameter message, data length: {}",
324                        msg.data.len()
325                    );
326
327                    if msg.data.is_empty() {
328                        log::warn!("[ASYNC_AUTH] Empty parameter message, skipping");
329                        continue;
330                    }
331
332                    let name = match msg.read_string() {
333                        Ok(name) => name,
334                        Err(e) => {
335                            log::warn!("[ASYNC_AUTH] Failed to read parameter name: {}", e);
336                            continue;
337                        }
338                    };
339
340                    let value = match msg.read_string() {
341                        Ok(value) => value,
342                        Err(e) => {
343                            log::warn!(
344                                "[ASYNC_AUTH] Failed to read parameter value for '{}': {}",
345                                name,
346                                e
347                            );
348                            continue;
349                        }
350                    };
351
352                    self.parameters.insert(name.clone(), value.clone());
353                    log::debug!("[ASYNC_AUTH] Parameter: {} = {}", name, value);
354                }
355                b'Z' => {
356                    self.state = ConnectionState::Ready;
357                    break;
358                }
359                _ => {
360                    return Err(VerticaError::Connection(format!(
361                        "Unexpected message type: {}",
362                        msg.message_type as char
363                    )));
364                }
365            }
366        }
367
368        Ok(())
369    }
370
371    async fn send_password_response(&mut self) -> Result<()> {
373        log::debug!("[ASYNC_AUTH] Sending clear text password");
374        let password = self
375            .config
376            .password
377            .clone()
378            .ok_or_else(|| VerticaError::Authentication("No password provided".to_string()))?;
379
380        let mut buf = BytesMut::with_capacity(1024);
381        buf.put_u8(b'p');
382        buf.put_i32(0); self.write_string_async(&mut buf, &password).await?;
384
385        let len = buf.len() as i32;
386        buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
387
388        self.write_all_async(&buf).await
389    }
390
391    async fn send_md5_password_response(&mut self, salt: &[u8]) -> Result<()> {
393        log::debug!("[ASYNC_AUTH] Sending MD5 password response");
394
395        let user = self.config.username.clone();
396        let password = self
397            .config
398            .password
399            .clone()
400            .ok_or_else(|| VerticaError::Authentication("No password provided".to_string()))?;
401
402        let mut hasher = Md5::new();
404        hasher.update(password.as_bytes());
405        hasher.update(user.as_bytes());
406        let hash1 = hasher.finalize();
407
408        let mut hasher = Md5::new();
409        hasher.update(&hash1);
410        hasher.update(salt);
411        let hash2 = hasher.finalize();
412
413        let md5_response = format!("md5{:x}", hash2);
414
415        let mut buf = BytesMut::with_capacity(1024);
416        buf.put_u8(b'p');
417        buf.put_i32(0); self.write_string_async(&mut buf, &md5_response).await?;
419
420        let len = buf.len() as i32;
421        buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
422
423        self.write_all_async(&buf).await
424    }
425
426    async fn read_message(&mut self) -> Result<AsyncMessage> {
428        let mut header = [0u8; 5];
429        self.read_exact_async(&mut header).await?;
430
431        let message_type = header[0];
432        let len = i32::from_be_bytes([header[1], header[2], header[3], header[4]]) - 4;
433
434        log::debug!(
435            "[ASYNC_READ] Message type: {} ({}), length: {}",
436            message_type as char,
437            message_type,
438            len
439        );
440
441        if len < 0 || len > 1_000_000 {
442            return Err(VerticaError::Connection(format!(
443                "Invalid message length: {}",
444                len
445            )));
446        }
447
448        let mut data = vec![0u8; len as usize];
449        if len > 0 {
450            self.read_exact_async(&mut data).await?;
451        }
452
453        log::debug!(
454            "[ASYNC_READ] Successfully read message, data length: {}",
455            data.len()
456        );
457
458        Ok(AsyncMessage {
459            message_type,
460            data: BytesMut::from(&data[..]),
461        })
462    }
463
464    async fn read_exact_async(&mut self, buf: &mut [u8]) -> Result<()> {
466        let mut read_index = 0;
467        let total_len = buf.len();
468
469        while read_index < total_len {
470            let bytes_read = match &mut self.stream {
471                Some(stream) => stream.read_async(&mut buf[read_index..]).await?,
472                None => return Err(VerticaError::Connection("No active connection".to_string())),
473            };
474
475            if bytes_read == 0 {
476                return Err(VerticaError::Connection(
477                    "Connection closed by server".to_string(),
478                ));
479            }
480
481            read_index += bytes_read;
482            log::debug!(
483                "[ASYNC_READ] Progress: {}/{} bytes ({} bytes this read)",
484                read_index,
485                total_len,
486                bytes_read
487            );
488        }
489        Ok(())
490    }
491
492    async fn write_all_async(&mut self, buf: &[u8]) -> Result<()> {
494        match &mut self.stream {
495            Some(stream) => stream.write_all_async(buf).await,
496            None => Err(VerticaError::Connection("No active connection".to_string())),
497        }
498    }
499
500    async fn write_string_async(&mut self, buf: &mut BytesMut, s: &str) -> Result<()> {
502        buf.extend_from_slice(s.as_bytes());
503        buf.put_u8(0); Ok(())
505    }
506
507    pub async fn simple_query(&mut self, query: &str) -> Result<QueryResult> {
509        if self.state != ConnectionState::Ready {
510            return Err(VerticaError::Connection("Connection not ready".to_string()));
511        }
512
513        let mut buf = BytesMut::with_capacity(1024);
514        buf.put_u8(b'Q');
515        buf.put_i32(0); self.write_string_async(&mut buf, query).await?;
517
518        let len = buf.len() as i32;
519        buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
520
521        self.write_all_async(&buf).await?;
522        self.process_query_result().await
523    }
524
525    async fn process_query_result(&mut self) -> Result<QueryResult> {
527        let mut columns = Vec::new();
528        let mut rows = Vec::new();
529
530        loop {
531            let mut msg = self.read_message().await?;
532
533            match msg.message_type {
534                b'T' => {
535                    log::debug!("[ASYNC_READ] Processing row description message");
536                    let field_count = msg.read_i16()?;
538                    for _ in 0..field_count {
539                        let name = msg.read_string()?;
540                        let _table_oid = msg.read_u32()?;
541                        let _column_attr_number = msg.read_i16()?;
542                        let data_type = msg.read_u32()?;
543                        let _type_size = msg.read_i16()?;
544                        let _type_modifier = msg.read_i32()?;
545                        let _format_code = msg.read_i16()?;
546
547                        columns.push(ColumnInfo {
548                            name,
549                            data_type,
550                            nullable: true, precision: None,
552                            scale: None,
553                        });
554                    }
555                }
556                b'D' => {
557                    log::debug!("[ASYNC_READ] Processing data row message");
558                    let column_count = msg.read_i16()?;
560                    let mut row = Vec::new();
561
562                    for i in 0..column_count {
563                        let len = msg.read_i32()?;
564                        if len == -1 {
565                            row.push(Value::Null);
566                        } else {
567                            let mut data = vec![0u8; len as usize];
568                            msg.read_exact(&mut data)?;
569
570                            let data_type = if (i as usize) < columns.len() {
572                                columns[i as usize].data_type
573                            } else {
574                                0 };
576
577                            let value = Self::parse_binary_value(data_type, &data)?;
578                            row.push(value);
579                        }
580                    }
581                    rows.push(row);
582                }
583                b'C' => {
584                    log::debug!("[ASYNC_READ] Processing command complete message");
585                    let _tag = msg.read_string()?;
587                    break;
588                }
589                b'Z' => {
590                    log::debug!("[ASYNC_READ] Processing ready for query message");
591                    let _status = msg.read_u8()?;
593                    break;
594                }
595                b'E' => {
596                    log::debug!("[ASYNC_READ] Processing error response message");
597                    let error_response = ErrorResponse::parse_from_message(&msg.data)?;
599                    log::error!("[ASYNC_READ] Server error: {}", error_response);
600                    return Err(error_response.to_error());
601                }
602                b'S' => {
603                    log::debug!("[ASYNC_READ] Processing notice response message");
604                    let fields = msg.read_string()?;
605                    log::info!("Notice: {}", fields);
606                    continue;
607                }
608                b'N' => {
609                    log::debug!("[ASYNC_READ] Processing notice response message");
610                    let fields = msg.read_string()?;
611                    log::info!("Notice: {}", fields);
612                    continue;
613                }
614                b't' => {
615                    let param_count = msg.read_i16()?;
617                    log::debug!("[PREPARE] ParameterDescription: {} parameters", param_count);
618
619                    for i in 0..param_count {
620                        let oid = msg.read_i32()?;
621                        log::debug!("[PREPARE]   Parameter {}: OID {}", i, oid);
622                    }
623                }
624                b'1' => {
625                    log::debug!("[ASYNC_READ] Processing parse complete message");
626                    continue;
628                }
629                b'2' => {
630                    log::debug!("[ASYNC_READ] Processing bind complete message");
631                    continue;
633                }
634                b's' => {
635                    log::debug!("[ASYNC_READ] Processing portal suspended message");
636                    continue;
638                }
639                _ => {
640                    log::warn!(
641                        "[ASYNC_READ] Unknown message type: {}",
642                        msg.message_type as char
643                    );
644                    continue;
645                }
646            }
647        }
648
649        Ok(QueryResult::from_rows(rows, columns))
650    }
651
652    pub fn state(&self) -> ConnectionState {
654        self.state
655    }
656
657    pub fn is_ready(&self) -> bool {
659        self.state == ConnectionState::Ready
660    }
661
662    pub fn parameters(&self) -> &HashMap<String, String> {
664        &self.parameters
665    }
666
667    fn parse_binary_value(data_type: u32, data: &[u8]) -> Result<Value> {
669        use byteorder::{ByteOrder, LittleEndian, BigEndian};
670        log::debug!(
671            "[PARSE_BINARY_VALUE] data_type: {}, data_len: {}, raw_data: {:?}",
672            data_type,
673            data.len(),
674            data
675        );
676
677        if data.is_empty() {
678            log::debug!("[PARSE_BINARY_VALUE] Empty data, returning empty string");
679            return Ok(Value::String(String::new()));
680        }
681
682        match data_type {
683            vertica_types::BOOLEAN => {
684                if data.len() >= 1 {
685                    let val = Value::Boolean(data[0] != 0);
686                    log::debug!("[PARSE_BOOLEAN] {:?}", val);
687                    Ok(val)
688                } else {
689                    log::debug!("[PARSE_BOOLEAN] Empty data, defaulting to false");
690                    Ok(Value::Boolean(false))
691                }
692            }
693            vertica_types::INTEGER => {
694                if data.len() >= 4 {
695                    let val = Value::Int(LittleEndian::read_i32(data));
696                    log::debug!("[PARSE_INTEGER] {:?}", val);
697                    Ok(val)
698                } else if data.len() == 2 {
699                    let val = Value::Int(LittleEndian::read_i16(data) as i32);
700                    log::debug!("[PARSE_INTEGER] 2-byte value: {:?}", val);
701                    Ok(val)
702                } else if data.len() == 1 {
703                    let val = Value::Int(data[0] as i8 as i32);
704                    log::debug!("[PARSE_INTEGER] 1-byte value: {:?}", val);
705                    Ok(val)
706                } else {
707                    if let Ok(s) = String::from_utf8(data.to_vec()) {
709                        if let Ok(num) = s.trim().parse::<i32>() {
710                            log::debug!("[PARSE_INTEGER] String format: {:?}", num);
711                            Ok(Value::Int(num))
712                        } else if let Ok(num) = s.trim().parse::<i64>() {
713                            log::debug!("[PARSE_INTEGER] Large string format: {} -> {}", num, num as i32);
715                            Ok(Value::Int(num as i32))
716                        } else {
717                            log::debug!("[PARSE_INTEGER] Invalid string format, returning string: {}", s);
718                            Ok(Value::String(s))
719                        }
720                    } else {
721                        log::debug!("[PARSE_INTEGER] Unexpected len and invalid UTF-8, returning binary");
722                        Ok(Value::Binary(data.to_vec()))
723                    }
724                }
725            }
726            vertica_types::BIGINT => {
727                if data.len() >= 8 {
728                    let val = Value::BigInt(LittleEndian::read_i64(data));
729                    log::debug!("[PARSE_BIGINT] {:?}", val);
730                    Ok(val)
731                } else if data.len() == 4 {
732                    let val = Value::BigInt(LittleEndian::read_i32(data) as i64);
733                    log::debug!("[PARSE_BIGINT] 4-byte value: {:?}", val);
734                    Ok(val)
735                } else {
736                    if let Ok(s) = String::from_utf8(data.to_vec()) {
738                        if let Ok(num) = s.trim().parse::<i64>() {
739                            log::debug!("[PARSE_BIGINT] String format: {:?}", num);
740                            Ok(Value::BigInt(num))
741                        } else {
742                            log::debug!("[PARSE_BIGINT] Invalid string format, returning string: {}", s);
743                            Ok(Value::String(s))
744                        }
745                    } else {
746                        log::debug!("[PARSE_BIGINT] Unexpected len and invalid UTF-8, returning binary");
747                        Ok(Value::Binary(data.to_vec()))
748                    }
749                }
750            }
751            vertica_types::FLOAT => {
752                log::debug!(
753                    "[PARSE_FLOAT] data_type: {}, data_len: {}, raw_bytes: {:?}",
754                    data_type,
755                    data.len(),
756                    data
757                );
758                match data.len() {
759                    8 => {
760                        let val = BigEndian::read_f64(data);
762                        log::debug!("[PARSE_FLOAT] FLOAT8: {}", val);
763                        Ok(Value::Double(val))
764                    },
765                    4 => {
766                        let val = BigEndian::read_f32(data);
768                        log::debug!("[PARSE_FLOAT] FLOAT4: {}", val);
769                        Ok(Value::Float(val))
770                    },
771                    _ => {
772                        let s = String::from_utf8_lossy(data);
774                        log::debug!("[PARSE_FLOAT] String format: {}", s);
775                        
776                        let s = s.trim();
777                        if let Ok(num) = s.parse::<f64>() {
778                            log::debug!("[PARSE_FLOAT] Parsed as f64: {}", num);
779                            Ok(Value::Double(num))
780                        } else if let Ok(num) = s.parse::<f32>() {
781                            log::debug!("[PARSE_FLOAT] Parsed as f32: {}", num);
782                            Ok(Value::Float(num))
783                        } else {
784                            Ok(Value::String(s.to_string()))
785                        }
786                    }
787                }
788            }
789            vertica_types::CHAR | vertica_types::VARCHAR | vertica_types::LONG_VARCHAR => {
790                let val = String::from_utf8(data.to_vec())
791                    .map(Value::String)
792                    .unwrap_or_else(|_| Value::Binary(data.to_vec()));
793                log::debug!("[PARSE_STRING] {:?}", val);
794                Ok(val)
795            }
796            vertica_types::DATE => {
797                if data.len() == 8 {
798                    let days = LittleEndian::read_i64(data);
800                    log::debug!(
801                        "[PARSE_DATE] Binary format - days since 2000-01-01: {}",
802                        days
803                    );
804                    Value::from_date_days(days as i32)
805                } else {
806                    if let Ok(s) = String::from_utf8(data.to_vec()) {
808                        log::debug!("[PARSE_DATE] String format: {}", s);
809                        let formats = ["%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d", "%d-%m-%Y"];
811
812                        let mut result = Value::String(s.clone());
813                        for fmt in formats.iter() {
814                            if let Ok(date) = chrono::NaiveDate::parse_from_str(&s.trim(), fmt) {
815                                result = Value::Date(date);
816                                break;
817                            }
818                        }
819
820                        Ok(result)
821                    } else {
822                        log::debug!("[PARSE_DATE] Invalid UTF-8 string, returning binary");
823                        Ok(Value::Binary(data.to_vec()))
824                    }
825                }
826            }
827            vertica_types::TIME => {
828                if data.len() == 8 {
829                    let micros = LittleEndian::read_i64(data);
831                    log::debug!(
832                        "[PARSE_TIME] Binary format - micros since midnight: {}",
833                        micros
834                    );
835                    Value::from_time_micros(micros)
836                } else {
837                    if let Ok(s) = String::from_utf8(data.to_vec()) {
839                        log::debug!("[PARSE_TIME] String format: {}", s);
840                        let formats = ["%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%H:%M"];
842
843                        let mut result = Value::String(s.clone());
844                        for fmt in formats.iter() {
845                            if let Ok(time) = chrono::NaiveTime::parse_from_str(&s.trim(), fmt) {
846                                result = Value::Time(time);
847                                break;
848                            }
849                        }
850
851                        Ok(result)
852                    } else {
853                        log::debug!("[PARSE_TIME] Invalid UTF-8 string, returning binary");
854                        Ok(Value::Binary(data.to_vec()))
855                    }
856                }
857            }
858            vertica_types::TIMETZ => {
859                if data.len() == 8 {
860                    let micros = LittleEndian::read_i64(data);
862                    log::debug!(
863                        "[PARSE_TIMETZ] Binary format - micros since midnight: {}",
864                        micros
865                    );
866                    Value::from_time_micros(micros)
867                } else {
868                    if let Ok(s) = String::from_utf8(data.to_vec()) {
870                        log::debug!(
871                            "[PARSE_TIMETZ] String format: {}",
872                            s
873                        );
874                        
875                        let s = s.trim();
876                        
877                        let formats = [
879                            "%H:%M:%S%.f",      "%H:%M:%S",         ];
882
883                        let time_part = if let Some(pos) = s.find(|c: char| c == '+' || c == '-') {
885                            &s[..pos]
886                        } else {
887                            s
888                        };
889
890                        let mut result = Value::String(s.to_string()); for fmt in formats.iter() {
892                            if let Ok(time) = chrono::NaiveTime::parse_from_str(time_part, fmt) {
893                                result = Value::Time(time);
894                                log::debug!(
895                                    "[PARSE_TIMETZ] Successfully parsed as Time: {:?}",
896                                    time
897                                );
898                                break;
899                            }
900                        }
901
902                        Ok(result)
903                    } else {
904                        log::debug!(
905                            "[PARSE_TIMETZ] Invalid UTF-8 string, returning binary"
906                        );
907                        Ok(Value::Binary(data.to_vec()))
908                    }
909                }
910            }
911            vertica_types::TIMESTAMPTZ => {
912                if data.len() == 8 {
913                    let micros = LittleEndian::read_i64(data);
915                    log::debug!(
916                        "[PARSE_TIMESTAMPTZ] Binary format - micros since 2000-01-01: {}",
917                        micros
918                    );
919                    Value::from_timestamp_micros(micros)
920                } else {
921                    if let Ok(s) = String::from_utf8(data.to_vec()) {
923                        log::debug!(
924                            "[PARSE_TIMESTAMPTZ] String format: {}",
925                            s
926                        );
927                        let formats = [
929                            "%Y-%m-%d %H:%M:%S%.f%#z",
930                            "%Y-%m-%d %H:%M:%S%#z", 
931                            "%Y-%m-%d %H:%M:%S%.f",
932                            "%Y-%m-%d %H:%M:%S",
933                            "%Y-%m-%d %H:%M",
934                            "%Y-%m-%d",
935                        ];
936
937                        let mut result = Value::String(s.clone());
938                        for fmt in formats.iter() {
939                            if let Ok(timestamp) =
940                                chrono::NaiveDateTime::parse_from_str(&s.trim(), fmt)
941                            {
942                                result = Value::Timestamp(timestamp);
943                                break;
944                            }
945                        }
946
947                        if let Value::String(_) = result {
949                            if let Ok(date) =
950                                chrono::NaiveDate::parse_from_str(&s.trim(), "%Y-%m-%d")
951                            {
952                                result = Value::Timestamp(date.and_hms_opt(0, 0, 0).unwrap());
953                            }
954                        }
955
956                        Ok(result)
957                    } else {
958                        log::debug!(
959                            "[PARSE_TIMESTAMPTZ] Unexpected len and invalid UTF-8, returning binary"
960                        );
961                        Ok(Value::Binary(data.to_vec()))
962                    }
963                }
964            }
965            vertica_types::TIMESTAMP => {
966                if data.len() == 8 {
967                    let micros = LittleEndian::read_i64(data);
969                    log::debug!(
970                        "[PARSE_TIMESTAMP] Binary format - micros since 2000-01-01: {}",
971                        micros
972                    );
973                    Value::from_timestamp_micros(micros)
974                } else if data.len() == 19 {
975                    if let Ok(s) = String::from_utf8(data.to_vec()) {
977                        log::debug!("[PARSE_TIMESTAMP] String format: {}", s);
978                        Value::parse_timestamp(&s)
979                    } else {
980                        log::debug!("[PARSE_TIMESTAMP] Invalid UTF-8 string");
981                        Ok(Value::Binary(data.to_vec()))
982                    }
983                } else {
984                    if let Ok(s) = String::from_utf8(data.to_vec()) {
986                        log::debug!(
987                            "[PARSE_TIMESTAMP] Variable string format: {}",
988                            s
989                        );
990                        let formats = [
992                            "%Y-%m-%d %H:%M:%S",
993                            "%Y-%m-%d %H:%M:%S%.f",
994                            "%Y-%m-%d %H:%M",
995                            "%Y-%m-%d",
996                        ];
997
998                        let mut result = Value::String(s.clone());
999                        for fmt in formats.iter() {
1000                            if let Ok(timestamp) =
1001                                chrono::NaiveDateTime::parse_from_str(&s.trim(), fmt)
1002                            {
1003                                result = Value::Timestamp(timestamp);
1004                                break;
1005                            }
1006                        }
1007
1008                        if let Value::String(_) = result {
1010                            if let Ok(date) =
1011                                chrono::NaiveDate::parse_from_str(&s.trim(), "%Y-%m-%d")
1012                            {
1013                                result = Value::Timestamp(date.and_hms_opt(0, 0, 0).unwrap());
1014                            }
1015                        }
1016
1017                        Ok(result)
1018                    } else {
1019                        log::debug!(
1020                            "[PARSE_TIMESTAMP] Unexpected len and invalid UTF-8, returning binary"
1021                        );
1022                        Ok(Value::Binary(data.to_vec()))
1023                    }
1024                }
1025            }
1026            vertica_types::NUMERIC => {
1027                log::debug!(
1028                    "[PARSE_NUMERIC] type_oid: {}, data_len: {}, raw_bytes: {:?}",
1029                    data_type,
1030                    data.len(),
1031                    data
1032                );
1033                match data.len() {
1034                    8 => {
1035                        let val = BigEndian::read_f64(data);
1037                        log::debug!("[PARSE_NUMERIC] NUMERIC8: {}", val);
1038                        if let Some(decimal) = Decimal::from_f64(val) {
1039                            Ok(Value::Decimal(decimal))
1040                        } else {
1041                            Ok(Value::Double(val))
1042                        }
1043                    },
1044                    4 => {
1045                        let val = BigEndian::read_f32(data);
1047                        log::debug!("[PARSE_NUMERIC] NUMERIC4: {}", val);
1048                        if let Some(decimal) = Decimal::from_f32(val) {
1049                            Ok(Value::Decimal(decimal))
1050                        } else {
1051                            Ok(Value::Float(val))
1052                        }
1053                    },
1054                    16 => {
1055                        let s = String::from_utf8_lossy(data);
1057                        log::debug!("[PARSE_NUMERIC] NUMERIC16 string: {}", s);
1058                        if let Ok(decimal) = s.trim().parse::<Decimal>() {
1059                            Ok(Value::Decimal(decimal))
1060                        } else if let Ok(num) = s.trim().parse::<f64>() {
1061                            Ok(Value::Double(num))
1062                        } else {
1063                            Ok(Value::String(s.to_string()))
1064                        }
1065                    },
1066                    _ => {
1067                        let s = String::from_utf8_lossy(data);
1069                        log::debug!("[PARSE_NUMERIC] String format: {}", s);
1070                        
1071                        let s = s.trim();
1072                        if let Ok(decimal) = s.parse::<Decimal>() {
1073                            log::debug!("[PARSE_NUMERIC] Parsed as Decimal: {}", decimal);
1074                            Ok(Value::Decimal(decimal))
1075                        } else if let Ok(num) = s.parse::<f64>() {
1076                            log::debug!("[PARSE_NUMERIC] Parsed as f64: {}", num);
1077                            Ok(Value::Double(num))
1078                        } else if let Ok(num) = s.parse::<f32>() {
1079                            log::debug!("[PARSE_NUMERIC] Parsed as f32: {}", num);
1080                            Ok(Value::Float(num))
1081                        } else if let Ok(num) = s.parse::<i64>() {
1082                            log::debug!("[PARSE_NUMERIC] Parsed as i64: {}", num);
1083                            Ok(Value::BigInt(num))
1084                        } else if let Ok(num) = s.parse::<i32>() {
1085                            log::debug!("[PARSE_NUMERIC] Parsed as i32: {}", num);
1086                            Ok(Value::Int(num))
1087                        } else {
1088                            Ok(Value::String(s.to_string()))
1089                        }
1090                    }
1091                }
1092            }
1093            vertica_types::UUID => {
1094                if data.len() >= 16 {
1095                    let val = Value::from_uuid_bytes(data.to_vec());
1096                    log::debug!("[PARSE_UUID] {:?}", val);
1097                    val
1098                } else {
1099                    log::debug!("[PARSE_UUID] Unexpected len, returning binary");
1100                    Ok(Value::Binary(data.to_vec()))
1101                }
1102            }
1103            vertica_types::VARBINARY | vertica_types::LONG_VARBINARY | vertica_types::BINARY => {
1104                let val = Value::Binary(data.to_vec());
1105                log::debug!("[PARSE_BINARY] {} bytes", data.len());
1106                Ok(val)
1107            }
1108            _ => {
1109                if let Ok(s) = String::from_utf8(data.to_vec()) {
1110                    log::debug!("[PARSE_UNKNOWN] UTF-8 string: {}", s);
1111                    Ok(Value::String(s))
1112                } else {
1113                    log::debug!("[PARSE_UNKNOWN] Binary: {} bytes", data.len());
1114                    Ok(Value::Binary(data.to_vec()))
1115                }
1116            }
1117        }
1118    }
1119
1120    pub async fn prepare(&mut self, stmt_id: &str, query: &str) -> Result<()> {
1122        if self.state != ConnectionState::Ready {
1123            return Err(VerticaError::Connection("Connection not ready".to_string()));
1124        }
1125
1126        log::debug!("[PREPARE] Starting prepare for statement: {}", stmt_id);
1127        log::debug!("[PREPARE] SQL: {}", query);
1128
1129        let mut parse_buf = BytesMut::with_capacity(1024);
1131        parse_buf.put_u8(b'P'); parse_buf.put_i32(0); self.write_string_async(&mut parse_buf, stmt_id).await?;
1136        self.write_string_async(&mut parse_buf, query).await?;
1138        parse_buf.put_i16(0);
1140
1141        let len = parse_buf.len() as i32;
1142        parse_buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
1143
1144        log::debug!("[PREPARE] Sending Parse message ({} bytes)", len);
1145        self.write_all_async(&parse_buf).await?;
1146
1147        let mut describe_buf = BytesMut::with_capacity(64);
1149        describe_buf.put_u8(b'D'); describe_buf.put_i32(0); describe_buf.put_u8(b'S'); self.write_string_async(&mut describe_buf, stmt_id).await?;
1153
1154        let describe_len = describe_buf.len() as i32;
1155        describe_buf[1..5].copy_from_slice(&(describe_len - 1).to_be_bytes());
1156
1157        log::debug!(
1158            "[PREPARE] Sending Describe message ({} bytes)",
1159            describe_len
1160        );
1161        self.write_all_async(&describe_buf).await?;
1162
1163        let mut sync_buf = BytesMut::with_capacity(16);
1165        sync_buf.put_u8(b'S'); sync_buf.put_i32(4); log::debug!("[PREPARE] Sending Sync message");
1169        self.write_all_async(&sync_buf).await?;
1170
1171        let mut parse_complete_received = false;
1173        let  ready_for_query_received :bool;
1174
1175
1176        loop {
1177            let mut msg = self.read_message().await?;
1178
1179            match msg.message_type {
1180                b'1' => {
1181                    log::debug!("[PREPARE] ParseComplete received");
1183                    parse_complete_received = true;
1184                }
1185
1186                b't' => {
1187                    let param_count = msg.read_i16()?;
1189                    log::debug!("[PREPARE] ParameterDescription: {} parameters", param_count);
1190
1191                    for i in 0..param_count {
1192                        let oid = msg.read_i32()?;
1193                        log::debug!("[PREPARE]   Parameter {}: OID {}", i, oid);
1194                    }
1195                }
1196
1197                b'T' => {
1198                    let field_count = msg.read_i16()?;
1200                    log::debug!("[PREPARE] RowDescription: {} columns", field_count);
1201
1202                    let mut columns: Vec<ColumnInfo> = Vec::new();
1203                    for i in 0..field_count {
1204                        let name = msg.read_string()?;
1205                       let type_oid = msg.read_u32()?;
1206                        let column = ColumnInfo {
1207                            name: name.clone(),
1208                            data_type: type_oid,
1209                            nullable: true,
1210                            precision: None,
1211                            scale: None,
1212                        };
1213                        columns.push(column);
1214
1215                        log::debug!(
1216                            "[PREPARE]   Column {}: name={}, type_oid={}",
1217                            i,
1218                            name,
1219                            type_oid
1220                        );
1221                    }
1222                }
1223
1224                b'n' => {
1225                    log::debug!("[PREPARE] NoData received (no result set)");
1227                }
1228
1229                b'E' => {
1230                    let error_response = ErrorResponse::parse_from_message(&msg.data)?;
1232                    log::error!("[PREPARE] ErrorResponse: {}", error_response);
1233                    return Err(error_response.to_error());
1234                }
1235
1236                b'S' => {
1237                    let name = msg.read_string()?;
1239                    let value = msg.read_string()?;
1240                    self.parameters.insert(name.clone(), value.clone());
1241                    log::debug!("[PREPARE] ParameterStatus: {} = {}", name, value);
1242                }
1243
1244                b'Z' => {
1245                    let status = msg.read_u8()?;
1247                    log::debug!("[PREPARE] ReadyForQuery: status={}", status as char);
1248                    ready_for_query_received = true;
1249
1250                    break;
1252                }
1253
1254                b'C' => {
1255                    let tag = msg.read_string()?;
1257                    log::debug!("[PREPARE] CommandComplete: {}", tag);
1258                }
1259
1260                b'D' => {
1261                    let col_count = msg.read_i16()?;
1263                    log::warn!(
1264                        "[PREPARE] Unexpected DataRow: {} columns - skipping",
1265                        col_count
1266                    );
1267
1268                    for _ in 0..col_count {
1270                        let len = msg.read_i32()?;
1271                        if len > 0 {
1272                            let _data = msg.read_bytes(len as usize)?;
1273                        }
1274                    }
1275                }
1276
1277                b'N' => {
1278                    let notice = msg.read_string()?;
1280                    log::info!("[PREPARE] Notice: {}", notice);
1281                }
1282
1283                _ => {
1284                    log::warn!(
1286                        "[PREPARE] Unknown message type: {} ({}) - skipping",
1287                        msg.message_type as char,
1288                        msg.message_type
1289                    );
1290                    continue;
1291                }
1292            }
1293        }
1294
1295        if !ready_for_query_received {
1297            log::warn!("[PREPARE] Missing ReadyForQuery, prepare may have failed");
1298            return Err(VerticaError::Connection(
1299                "Incomplete prepare response - missing ReadyForQuery".to_string(),
1300            ));
1301        }
1302
1303        if !parse_complete_received {
1305            log::warn!(
1306                "[PREPARE] No ParseComplete received, but continuing as server indicated ready"
1307            );
1308        }
1309
1310        log::debug!("[PREPARE] Prepare process completed");
1311        Ok(())
1312    }
1313
1314    pub async fn execute_prepared(
1316        &mut self,
1317        statement_id: &str,
1318        params: &[Value],
1319    ) -> Result<QueryResult> {
1320        log::debug!(
1321            "[EXECUTE_PREPARED] Executing prepared statement: {} with {} parameters",
1322            statement_id,
1323            params.len()
1324        );
1325
1326        let mut bind_msg = BytesMut::new();
1328        bind_msg.put_u8(b'B'); bind_msg.put_i32(0); bind_msg.put_slice(b"");
1333        bind_msg.put_u8(0);
1334
1335        bind_msg.put_slice(statement_id.as_bytes());
1337        bind_msg.put_u8(0);
1338
1339        bind_msg.put_i16(0);
1341
1342        bind_msg.put_i16(params.len() as i16);
1344
1345        for param in params {
1347            if let Value::Null = *param {
1348                bind_msg.put_i32(-1); } else {
1350                let bytes = param.as_bytes()?;
1353                bind_msg.put_i32(bytes.len() as i32);
1354                bind_msg.extend_from_slice(&bytes);
1355            }
1356        }
1357
1358        bind_msg.put_i16(0);
1360
1361        let len = bind_msg.len() - 1;
1363        bind_msg[1..5].copy_from_slice(&(len as i32).to_be_bytes());
1364
1365        let mut execute_msg = BytesMut::new();
1367        execute_msg.put_u8(b'E'); execute_msg.put_i32(9); execute_msg.put_u8(0); execute_msg.put_i32(0); let len = execute_msg.len() - 1;
1374        execute_msg[1..5].copy_from_slice(&(len as i32).to_be_bytes());
1375
1376        let mut sync_msg = BytesMut::new();
1378        sync_msg.put_u8(b'S'); sync_msg.put_i32(4); let mut full_msg = BytesMut::new();
1383        full_msg.extend_from_slice(&bind_msg);
1384        full_msg.extend_from_slice(&execute_msg);
1385        full_msg.extend_from_slice(&sync_msg);
1386
1387        if let Some(ref mut stream) = self.stream {
1388            stream.write_all_async(&full_msg).await?;
1389        } else {
1390            return Err(VerticaError::Connection(
1391                "Connection not established".to_string(),
1392            ));
1393        }
1394
1395        log::debug!(
1396            "[EXECUTE_PREPARED] Sent Bind/Execute/Sync messages for statement: {}",
1397            statement_id
1398        );
1399
1400        let mut rows = Vec::new();
1402        let mut columns = Vec::new();
1403        let mut affected_rows = 0;
1404        let mut command_tag = String::new();
1405        let mut command_complete_received = false;
1406
1407        loop {
1408            let  msg = self.read_message().await?;
1409            log::debug!(
1410                "[EXECUTE_PREPARED] Received message type: {} ({:x})",
1411                msg.message_type as char,
1412                msg.message_type
1413            );
1414
1415            let mut msg_data = AsyncMessage {
1416                message_type: msg.message_type,
1417                data: msg.data.clone(),
1418            };
1419
1420            match msg.message_type {
1421                b't' => {
1422                    log::debug!("[EXECUTE_PREPARED] Unexpected ParameterDescription - skipping");
1424                }
1425                b'T' => {
1426                    let field_count = msg_data.read_i16()?;
1428                    log::debug!("[EXECUTE_PREPARED] RowDescription: {} columns", field_count);
1429
1430                    columns.clear();
1431                    for i in 0..field_count {
1432                        let name = msg_data.read_string()?;
1433                       let type_oid = msg_data.read_u32()?;
1434                        let column = ColumnInfo {
1435                            name: name.clone(),
1436                            data_type: type_oid,
1437                            nullable: true,
1438                            precision: None,
1439                            scale: None,
1440                        };
1441                        columns.push(column);
1442
1443                        log::debug!(
1444                            "[EXECUTE_PREPARED]   Column {}: name={}, type_oid={}",
1445                            i,
1446                            &name,
1447                            type_oid
1448                        );
1449                    }
1450                }
1451                b'D' => {
1452                    let col_count = msg_data.read_i16()?;
1454                    let mut values = Vec::with_capacity(col_count as usize);
1455
1456                    for _i in 0..col_count {
1457                        let len = msg_data.read_i32()?;
1458                        if len == -1 {
1459                            values.push(Value::Null);
1461                        } else if len >= 0 {
1462                            let data = msg_data.read_bytes(len as usize)?;
1463                            let value = Value::from_bytes(data);
1464                            values.push(value);
1465                        } else {
1466                            return Err(VerticaError::Protocol(
1467                                "Invalid data length in DataRow".to_string(),
1468                            ));
1469                        }
1470                    }
1471
1472                    let row = Row { data: values };
1473                    rows.push(row);
1474                }
1475                b'C' => {
1476                    command_tag = msg_data.read_string()?;
1478                    log::debug!("[EXECUTE_PREPARED] CommandComplete: {}", command_tag);
1479
1480                    if let Some(pos) = command_tag.rfind(' ') {
1482                        if let Ok(rows) = command_tag[pos + 1..].parse::<u64>() {
1483                            affected_rows = rows;
1484                        }
1485                    }
1486                    command_complete_received = true;
1487                }
1488                b'E' => {
1489                    let error_response = ErrorResponse::parse_from_message(&msg.data)?;
1491                    log::error!("[EXECUTE_PREPARED] ErrorResponse: {}", error_response);
1492                    return Err(error_response.to_error());
1493                }
1494                b'Z' => {
1495                    let status = msg_data.read_u8()?;
1497                    log::debug!(
1498                        "[EXECUTE_PREPARED] ReadyForQuery: status={}",
1499                        status as char
1500                    );
1501                    break;
1502                }
1503                b'S' => {
1504                    let name = msg_data.read_string()?;
1506                    let value = msg_data.read_string()?;
1507                    self.parameters.insert(name.clone(), value.clone());
1508                    log::debug!("[EXECUTE_PREPARED] ParameterStatus: {} = {}", name, value);
1509                }
1510                b'N' => {
1511                    let notice = msg_data.read_string()?;
1513                    log::info!("[EXECUTE_PREPARED] Notice: {}", notice);
1514                }
1515                b'n' => {
1516                    log::debug!("[EXECUTE_PREPARED] NoData received");
1518                }
1519                b'1' => {
1520                    log::debug!("[ASYNC_READ] Processing parse complete message");
1521                    continue;
1523                }
1524                b'2' => {
1525                    log::debug!("[ASYNC_READ] Processing bind complete message");
1526                    continue;
1528                }
1529                b's' => {
1530                    log::debug!("[ASYNC_READ] Processing portal suspended message");
1531                    break;
1533                }
1534                _ => {
1535                    log::warn!(
1536                        "[ASYNC_READ] Unknown message type: {}",
1537                        msg.message_type as char
1538                    );
1539                    continue;
1540                }
1541            }
1542        }
1543
1544        log::debug!(
1545            "[EXECUTE_PREPARED] Execution completed. Rows: {}, Columns: {}, Affected: {}",
1546            rows.len(),
1547            columns.len(),
1548            affected_rows
1549        );
1550
1551        let result = if columns.is_empty() && command_complete_received {
1553            QueryResult::from_command(affected_rows as u64, Some(command_tag), "".to_owned())
1555        } else if !columns.is_empty() {
1556            QueryResult::from_rows(rows.into_iter().map(|r| r.data).collect(), columns)
1558        } else {
1559            QueryResult::empty()
1561        };
1562
1563        Ok(result)
1564    }
1565
1566    pub async fn close(&mut self) -> Result<()> {
1568        if let Some(mut stream) = self.stream.take() {
1569            stream.write_all_async(&[]).await?;
1570        }
1571        self.state = ConnectionState::Disconnected;
1572        Ok(())
1573    }
1574}
1575
1576pub struct AsyncMessage {
1578    pub message_type: u8,
1580    pub data: BytesMut,
1582}
1583impl AsyncMessage {
1585    pub fn read_u8(&mut self) -> Result<u8> {
1593        if self.data.is_empty() {
1594            return Err(VerticaError::Connection(
1595                "read_u8:Unexpected end of message".to_string(),
1596            ));
1597        }
1598        Ok(self.data.get_u8())
1599    }
1600
1601    pub fn read_i16(&mut self) -> Result<i16> {
1609        if self.data.len() < 2 {
1610            return Err(VerticaError::Connection(
1611                "read_i16:Unexpected end of message".to_string(),
1612            ));
1613        }
1614        Ok(self.data.get_i16())
1615    }
1616
1617    pub fn read_i32(&mut self) -> Result<i32> {
1626        if self.data.len() < 4 {
1627            return Err(VerticaError::Connection(
1628                "read_i32:Unexpected end of message".to_string(),
1629            ));
1630        }
1631        Ok(self.data.get_i32())
1632    }
1633
1634    pub fn read_u32(&mut self) -> Result<u32> {
1642        if self.data.len() < 4 {
1643            return Err(VerticaError::Connection(
1644                "read_u32:Unexpected end of message".to_string(),
1645            ));
1646        }
1647        Ok(self.data.get_u32())
1648    }
1649
1650    pub fn read_string(&mut self) -> Result<String> {
1659        if self.data.is_empty() {
1660            return Err(VerticaError::Connection(
1661                "Empty message when reading string".to_string(),
1662            ));
1663        }
1664
1665        let null_pos = self.data.iter().position(|&b| b == 0).ok_or_else(|| {
1666            VerticaError::Connection("Invalid string format: no null terminator found".to_string())
1667        })?;
1668
1669        if null_pos >= self.data.len() {
1670            return Err(VerticaError::Connection(
1671                "Invalid string format: null terminator beyond buffer".to_string(),
1672            ));
1673        }
1674
1675        let string_data = self.data.split_to(null_pos);
1676
1677        if !self.data.is_empty() {
1679            self.data.advance(1);
1680        }
1681
1682        if string_data.is_empty() {
1684            return Ok(String::new());
1685        }
1686
1687        let string = String::from_utf8_lossy(&string_data[..]).to_string();
1689
1690        Ok(string)
1691    }
1692
1693    pub fn read_tagged_string(&mut self) -> Result<(u8, String)> {
1703        if self.data.is_empty() {
1704            return Err(VerticaError::Connection(
1705                "Empty message when reading tagged string".to_string(),
1706            ));
1707        }
1708
1709        let field_type = self.data.get_u8();
1710
1711        if field_type == 0 {
1713            return Ok((0, String::new()));
1714        }
1715
1716        let value = self.read_string()?;
1717        Ok((field_type, value))
1718    }
1719
1720    pub fn read_bytes(&mut self, len: usize) -> Result<Vec<u8>> {
1731        if self.data.len() < len {
1732            return Err(VerticaError::Connection(
1733                "read_bytes:Unexpected end of message".to_string(),
1734            ));
1735        }
1736        Ok(self.data.split_to(len).to_vec())
1737    }
1738
1739    pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
1750        if self.data.len() < buf.len() {
1751            return Err(VerticaError::Connection(
1752                "read_exact:Unexpected end of message".to_string(),
1753            ));
1754        }
1755        buf.copy_from_slice(&self.data.split_to(buf.len()));
1756        Ok(())
1757    }
1758}
1759
1760
1761
1762
1763
1764
1765