vertica_rs/conn/
async_conn.rs

1use bytes::{Buf, BufMut, BytesMut};
2use md5::{Digest, Md5};
3use std::collections::HashMap;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5use tokio::net::TcpStream;
6use rust_decimal::Decimal;
7use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
8
9use crate::conn::{ConnectionConfig, ConnectionState};
10use crate::error::error_response::ErrorResponse;
11use crate::error::{Result, VerticaError};
12use crate::query::{ColumnInfo, QueryResult};
13use crate::types::vertica_types;
14use crate::types::Value;
15use crate::Row;
/// Async version of ConnectionStream
///
/// The transport held by `AsyncConnection`: either a plain TCP stream or,
/// when the `tls` feature is enabled, a TLS stream wrapping one.
#[derive(Debug)]
pub enum AsyncConnectionStream {
    /// Plain (unencrypted) TCP connection stream.
    Plain(TcpStream),
    #[cfg(feature = "tls")]
    /// SSL/TLS-encrypted TCP connection stream.
    Ssl(tokio_native_tls::TlsStream<TcpStream>),
}
25
26impl AsyncConnectionStream {
27    /// 异步读取数据到指定缓冲区。
28    ///
29    /// 此方法会尝试从连接流中异步读取数据到提供的缓冲区 `buf` 中。
30    /// 返回成功读取的字节数。
31    ///
32    /// # 参数
33    /// - `buf`: 用于存储读取数据的缓冲区。
34    ///
35    /// # 返回值
36    /// 返回一个 `Result`,其中包含成功读取的字节数或错误信息。
37    pub async fn read_async(&mut self, buf: &mut [u8]) -> Result<usize> {
38        match self {
39            AsyncConnectionStream::Plain(stream) => stream
40                .read(buf)
41                .await
42                .map_err(|e| VerticaError::Connection(format!("Read error: {}", e))),
43            #[cfg(feature = "tls")]
44            AsyncConnectionStream::Ssl(stream) => stream
45                .read(buf)
46                .await
47                .map_err(|e| VerticaError::Connection(format!("SSL read error: {}", e))),
48        }
49    }
50
51    /// 异步将整个缓冲区的数据写入连接流。
52    ///
53    /// 此方法会尝试将提供的缓冲区 `buf` 中的所有数据异步写入连接流。
54    /// 如果写入成功,返回 `Ok(())`;否则返回包含错误信息的 `Err`。
55    ///
56    /// # 参数
57    /// - `buf`: 要写入连接流的数据缓冲区。
58    ///
59    /// # 返回值
60    /// 返回一个 `Result`,表示写入操作是否成功。
61    pub async fn write_all_async(&mut self, buf: &[u8]) -> Result<()> {
62        match self {
63            AsyncConnectionStream::Plain(stream) => stream
64                .write_all(buf)
65                .await
66                .map_err(|e| VerticaError::Connection(format!("Write error: {}", e))),
67            #[cfg(feature = "tls")]
68            AsyncConnectionStream::Ssl(stream) => stream
69                .write_all(buf)
70                .await
71                .map_err(|e| VerticaError::Connection(format!("SSL write error: {}", e))),
72        }
73    }
74}
75
/// Async version of Connection following vertica-sql-go patterns
#[derive(Debug)]
pub struct AsyncConnection {
    /// Active transport; `None` until a host has been connected to.
    stream: Option<AsyncConnectionStream>,
    /// Settings this connection was created with (host, port, credentials, ...).
    config: ConnectionConfig,
    /// Current lifecycle state of the connection.
    state: ConnectionState,
    /// Server parameters received in `S` messages during authentication.
    parameters: HashMap<String, String>,
    /// Backend process id taken from the server's `K` message, if received.
    backend_pid: Option<u32>,
    /// Secret key taken from the server's `K` message, if received.
    secret_key: Option<u32>,
}
86
87impl AsyncConnection {
88    /// Create a new async connection
89    pub async fn new(config: ConnectionConfig) -> Result<Self> {
90        let mut conn = Self {
91            stream: None,
92            config,
93            state: ConnectionState::Disconnected,
94            parameters: HashMap::new(),
95            backend_pid: None,
96            secret_key: None,
97        };
98        conn.connect().await?;
99        Ok(conn)
100    }
101
102    /// Create a new async connection from URL
103    pub async fn new_from_url(url: &str) -> Result<Self> {
104        let config = ConnectionConfig::from_url(url)?;
105        Self::new(config).await
106    }
107
108    /// Async connect to database - following vertica-sql-go's establishSocketConnection pattern
109    async fn connect(&mut self) -> Result<()> {
110        log::debug!(
111            "[ASYNC_CONNECT] Starting connection process to {}:{} as {}",
112            self.config.host,
113            self.config.port,
114            self.config.username
115        );
116        self.state = ConnectionState::Connecting;
117
118        // Build host list including backup servers
119        let mut hosts = vec![format!("{}:{}", self.config.host, self.config.port)];
120        if let Some(ref backup_servers) = self.config.backup_servers {
121            hosts.extend(backup_servers.clone());
122        }
123
124        let mut _last_error = None;
125
126        for host_port in &hosts {
127            log::debug!("[ASYNC_CONNECT] Trying host: {}", host_port);
128
129            match self.connect_to_host(host_port).await {
130                Ok(stream) => {
131                    log::debug!("[ASYNC_CONNECT] Successfully connected to {}", host_port);
132                    self.stream = Some(stream);
133                    self.state = ConnectionState::Handshake;
134
135                    return self.handshake().await.map_err(|e| {
136                        log::debug!("[ASYNC_CONNECT] Handshake failed: {}", e);
137                        e
138                    });
139                }
140                Err(e) => {
141                    log::debug!("[ASYNC_CONNECT] Failed to connect to {}: {}", host_port, e);
142                    _last_error = Some(e);
143                    continue;
144                }
145            }
146        }
147
148        let error_msg = format!(
149            "Failed to connect to any host. Last error: {}",
150            _last_error.unwrap_or_else(|| VerticaError::Connection("Unknown error".to_string()))
151        );
152        log::debug!("[ASYNC_CONNECT] {}", error_msg);
153        Err(VerticaError::Connection(error_msg))
154    }
155
156    /// Async connect to specific host
157    async fn connect_to_host(&self, host_port: &str) -> Result<AsyncConnectionStream> {
158        let parts: Vec<&str> = host_port.split(':').collect();
159        if parts.len() != 2 {
160            return Err(VerticaError::Connection(format!(
161                "Invalid host:port format: {}",
162                host_port
163            )));
164        }
165
166        let host = parts[0];
167        let port: u16 = parts[1]
168            .parse()
169            .map_err(|e| VerticaError::Connection(format!("Invalid port number: {}", e)))?;
170
171        // Resolve hostname to IP addresses
172        let addrs = tokio::net::lookup_host(format!("{}:{}", host, port))
173            .await
174            .map_err(|e| {
175                VerticaError::Connection(format!("Failed to resolve host {}: {}", host, e))
176            })?;
177
178        let mut _last_error = None;
179
180        for addr in addrs {
181            log::debug!("[ASYNC_CONNECT] Trying IP address: {}", addr);
182
183            match tokio::time::timeout(self.config.connect_timeout, TcpStream::connect(addr)).await
184            {
185                Ok(Ok(stream)) => {
186                    // Set socket options like vertica-sql-go
187                    stream.set_nodelay(true).map_err(|e| {
188                        VerticaError::Connection(format!("Failed to set TCP_NODELAY: {}", e))
189                    })?;
190
191                    return Ok(AsyncConnectionStream::Plain(stream));
192                }
193                Ok(Err(e)) => {
194                    log::debug!("[ASYNC_CONNECT] Failed to connect to {}: {}", addr, e);
195                    _last_error = Some(e);
196                    continue;
197                }
198                Err(_) => {
199                    log::debug!("[ASYNC_CONNECT] Connection timeout to {}", addr);
200                    _last_error = Some(std::io::Error::new(
201                        std::io::ErrorKind::TimedOut,
202                        "Connection timeout",
203                    ));
204                    continue;
205                }
206            }
207        }
208
209        Err(VerticaError::Connection(format!(
210            "Failed to connect to {}",
211            host_port
212        )))
213    }
214
215    /// Async handshake
216    async fn handshake(&mut self) -> Result<()> {
217        log::debug!("[ASYNC_HANDSHAKE] Starting handshake process");
218        let mut buf = BytesMut::with_capacity(1024);
219
220        let username = self.config.username.clone();
221        let database = self.config.database.clone();
222
223        buf.put_i32(0); // Length placeholder
224        buf.put_i32(196608); // Protocol version 3.0
225
226        self.write_string_async(&mut buf, "user").await?;
227        self.write_string_async(&mut buf, &username).await?;
228
229        self.write_string_async(&mut buf, "database").await?;
230        self.write_string_async(&mut buf, &database).await?;
231
232        self.write_string_async(&mut buf, "client_encoding").await?;
233        self.write_string_async(&mut buf, "UTF8").await?;
234
235        self.write_string_async(&mut buf, "").await?; // Terminator
236
237        let len = buf.len() as i32;
238        buf[0..4].copy_from_slice(&len.to_be_bytes());
239
240        log::debug!("[ASYNC_HANDSHAKE] Sending startup message ({} bytes)", len);
241        self.write_all_async(&buf).await.map_err(|e| {
242            log::debug!("[ASYNC_HANDSHAKE] Failed to send startup message: {}", e);
243            e
244        })?;
245
246        log::debug!("[ASYNC_HANDSHAKE] Startup message sent, starting authentication");
247        self.authenticate().await?;
248        log::debug!("[ASYNC_HANDSHAKE] Handshake completed successfully");
249        Ok(())
250    }
251
252    /// Async authentication
253    async fn authenticate(&mut self) -> Result<()> {
254        log::debug!("[ASYNC_AUTH] Starting authentication process");
255        self.state = ConnectionState::Authenticating;
256
257        let mut message_count = 0;
258        loop {
259            message_count += 1;
260            log::debug!("[ASYNC_AUTH] Waiting for message #{}", message_count);
261
262            let mut msg = self.read_message().await.map_err(|e| {
263                log::debug!(
264                    "[ASYNC_AUTH] Failed to read message #{}: {}",
265                    message_count,
266                    e
267                );
268                e
269            })?;
270
271            log::debug!(
272                "[ASYNC_AUTH] Received message type: {}",
273                msg.message_type as char
274            );
275
276            match msg.message_type {
277                b'R' => {
278                    // Authentication request
279                    let auth_type = msg.read_i32().map_err(|e| {
280                        log::debug!("[ASYNC_AUTH] Failed to read auth type: {}", e);
281                        e
282                    })?;
283                    log::debug!("[ASYNC_AUTH] Authentication request type: {}", auth_type);
284
285                    match auth_type {
286                        0 => {
287                            // Authentication successful
288                            log::debug!("[ASYNC_AUTH] Authentication successful");
289                        }
290                        3 => {
291                            // Clear text password
292                            log::debug!("[ASYNC_AUTH] Clear text password required");
293                            self.send_password_response().await?;
294                        }
295                        5 => {
296                            // MD5 password
297                            log::debug!("[ASYNC_AUTH] MD5 password required");
298                            let salt = msg.read_bytes(4)?;
299                            log::debug!("[ASYNC_AUTH] Salt: {:02x?}", salt);
300                            self.send_md5_password_response(&salt).await?;
301                        }
302                        _ => {
303                            log::error!(
304                                "[ASYNC_AUTH] Unsupported authentication type: {}",
305                                auth_type
306                            );
307                            return Err(VerticaError::Authentication(format!(
308                                "Unsupported authentication type: {}",
309                                auth_type
310                            )));
311                        }
312                    }
313                }
314                b'K' => {
315                    let pid = msg.read_u32()?;
316                    let key = msg.read_u32()?;
317                    self.backend_pid = Some(pid);
318                    self.secret_key = Some(key);
319                    log::debug!("[ASYNC_AUTH] Backend PID: {}, Secret key: {}", pid, key);
320                }
321                b'S' => {
322                    log::debug!(
323                        "[ASYNC_AUTH] Processing parameter message, data length: {}",
324                        msg.data.len()
325                    );
326
327                    if msg.data.is_empty() {
328                        log::warn!("[ASYNC_AUTH] Empty parameter message, skipping");
329                        continue;
330                    }
331
332                    let name = match msg.read_string() {
333                        Ok(name) => name,
334                        Err(e) => {
335                            log::warn!("[ASYNC_AUTH] Failed to read parameter name: {}", e);
336                            continue;
337                        }
338                    };
339
340                    let value = match msg.read_string() {
341                        Ok(value) => value,
342                        Err(e) => {
343                            log::warn!(
344                                "[ASYNC_AUTH] Failed to read parameter value for '{}': {}",
345                                name,
346                                e
347                            );
348                            continue;
349                        }
350                    };
351
352                    self.parameters.insert(name.clone(), value.clone());
353                    log::debug!("[ASYNC_AUTH] Parameter: {} = {}", name, value);
354                }
355                b'Z' => {
356                    self.state = ConnectionState::Ready;
357                    break;
358                }
359                _ => {
360                    return Err(VerticaError::Connection(format!(
361                        "Unexpected message type: {}",
362                        msg.message_type as char
363                    )));
364                }
365            }
366        }
367
368        Ok(())
369    }
370
371    /// Async send password response
372    async fn send_password_response(&mut self) -> Result<()> {
373        log::debug!("[ASYNC_AUTH] Sending clear text password");
374        let password = self
375            .config
376            .password
377            .clone()
378            .ok_or_else(|| VerticaError::Authentication("No password provided".to_string()))?;
379
380        let mut buf = BytesMut::with_capacity(1024);
381        buf.put_u8(b'p');
382        buf.put_i32(0); // Length placeholder
383        self.write_string_async(&mut buf, &password).await?;
384
385        let len = buf.len() as i32;
386        buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
387
388        self.write_all_async(&buf).await
389    }
390
391    /// Async send MD5 password response
392    async fn send_md5_password_response(&mut self, salt: &[u8]) -> Result<()> {
393        log::debug!("[ASYNC_AUTH] Sending MD5 password response");
394
395        let user = self.config.username.clone();
396        let password = self
397            .config
398            .password
399            .clone()
400            .ok_or_else(|| VerticaError::Authentication("No password provided".to_string()))?;
401
402        // Calculate MD5 hash: md5(md5(password + user) + salt)
403        let mut hasher = Md5::new();
404        hasher.update(password.as_bytes());
405        hasher.update(user.as_bytes());
406        let hash1 = hasher.finalize();
407
408        let mut hasher = Md5::new();
409        hasher.update(&hash1);
410        hasher.update(salt);
411        let hash2 = hasher.finalize();
412
413        let md5_response = format!("md5{:x}", hash2);
414
415        let mut buf = BytesMut::with_capacity(1024);
416        buf.put_u8(b'p');
417        buf.put_i32(0); // Length placeholder
418        self.write_string_async(&mut buf, &md5_response).await?;
419
420        let len = buf.len() as i32;
421        buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
422
423        self.write_all_async(&buf).await
424    }
425
426    /// Async read message
427    async fn read_message(&mut self) -> Result<AsyncMessage> {
428        let mut header = [0u8; 5];
429        self.read_exact_async(&mut header).await?;
430
431        let message_type = header[0];
432        let len = i32::from_be_bytes([header[1], header[2], header[3], header[4]]) - 4;
433
434        log::debug!(
435            "[ASYNC_READ] Message type: {} ({}), length: {}",
436            message_type as char,
437            message_type,
438            len
439        );
440
441        if len < 0 || len > 1_000_000 {
442            return Err(VerticaError::Connection(format!(
443                "Invalid message length: {}",
444                len
445            )));
446        }
447
448        let mut data = vec![0u8; len as usize];
449        if len > 0 {
450            self.read_exact_async(&mut data).await?;
451        }
452
453        log::debug!(
454            "[ASYNC_READ] Successfully read message, data length: {}",
455            data.len()
456        );
457
458        Ok(AsyncMessage {
459            message_type,
460            data: BytesMut::from(&data[..]),
461        })
462    }
463
464    /// Async read exact bytes
465    async fn read_exact_async(&mut self, buf: &mut [u8]) -> Result<()> {
466        let mut read_index = 0;
467        let total_len = buf.len();
468
469        while read_index < total_len {
470            let bytes_read = match &mut self.stream {
471                Some(stream) => stream.read_async(&mut buf[read_index..]).await?,
472                None => return Err(VerticaError::Connection("No active connection".to_string())),
473            };
474
475            if bytes_read == 0 {
476                return Err(VerticaError::Connection(
477                    "Connection closed by server".to_string(),
478                ));
479            }
480
481            read_index += bytes_read;
482            log::debug!(
483                "[ASYNC_READ] Progress: {}/{} bytes ({} bytes this read)",
484                read_index,
485                total_len,
486                bytes_read
487            );
488        }
489        Ok(())
490    }
491
492    /// Async write all
493    async fn write_all_async(&mut self, buf: &[u8]) -> Result<()> {
494        match &mut self.stream {
495            Some(stream) => stream.write_all_async(buf).await,
496            None => Err(VerticaError::Connection("No active connection".to_string())),
497        }
498    }
499
500    /// Async write string
501    async fn write_string_async(&mut self, buf: &mut BytesMut, s: &str) -> Result<()> {
502        buf.extend_from_slice(s.as_bytes());
503        buf.put_u8(0); // null terminator
504        Ok(())
505    }
506
507    /// Async simple query
508    pub async fn simple_query(&mut self, query: &str) -> Result<QueryResult> {
509        if self.state != ConnectionState::Ready {
510            return Err(VerticaError::Connection("Connection not ready".to_string()));
511        }
512
513        let mut buf = BytesMut::with_capacity(1024);
514        buf.put_u8(b'Q');
515        buf.put_i32(0); // Length placeholder
516        self.write_string_async(&mut buf, query).await?;
517
518        let len = buf.len() as i32;
519        buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
520
521        self.write_all_async(&buf).await?;
522        self.process_query_result().await
523    }
524
525    /// Async process query result
526    async fn process_query_result(&mut self) -> Result<QueryResult> {
527        let mut columns = Vec::new();
528        let mut rows = Vec::new();
529
530        loop {
531            let mut msg = self.read_message().await?;
532
533            match msg.message_type {
534                b'T' => {
535                    log::debug!("[ASYNC_READ] Processing row description message");
536                    // Row description
537                    let field_count = msg.read_i16()?;
538                    for _ in 0..field_count {
539                        let name = msg.read_string()?;
540                        let _table_oid = msg.read_u32()?;
541                        let _column_attr_number = msg.read_i16()?;
542                        let data_type = msg.read_u32()?;
543                        let _type_size = msg.read_i16()?;
544                        let _type_modifier = msg.read_i32()?;
545                        let _format_code = msg.read_i16()?;
546
547                        columns.push(ColumnInfo {
548                            name,
549                            data_type,
550                            nullable: true, // Default value
551                            precision: None,
552                            scale: None,
553                        });
554                    }
555                }
556                b'D' => {
557                    log::debug!("[ASYNC_READ] Processing data row message");
558                    // Data row
559                    let column_count = msg.read_i16()?;
560                    let mut row = Vec::new();
561
562                    for i in 0..column_count {
563                        let len = msg.read_i32()?;
564                        if len == -1 {
565                            row.push(Value::Null);
566                        } else {
567                            let mut data = vec![0u8; len as usize];
568                            msg.read_exact(&mut data)?;
569
570                            // 根据数据类型OID进行类型转换
571                            let data_type = if (i as usize) < columns.len() {
572                                columns[i as usize].data_type
573                            } else {
574                                0 // 默认类型
575                            };
576
577                            let value = Self::parse_binary_value(data_type, &data)?;
578                            row.push(value);
579                        }
580                    }
581                    rows.push(row);
582                }
583                b'C' => {
584                    log::debug!("[ASYNC_READ] Processing command complete message");
585                    // Command complete
586                    let _tag = msg.read_string()?;
587                    break;
588                }
589                b'Z' => {
590                    log::debug!("[ASYNC_READ] Processing ready for query message");
591                    // Ready for query
592                    let _status = msg.read_u8()?;
593                    break;
594                }
595                b'E' => {
596                    log::debug!("[ASYNC_READ] Processing error response message");
597                    // Error response - parse structured error
598                    let error_response = ErrorResponse::parse_from_message(&msg.data)?;
599                    log::error!("[ASYNC_READ] Server error: {}", error_response);
600                    return Err(error_response.to_error());
601                }
602                b'S' => {
603                    log::debug!("[ASYNC_READ] Processing notice response message");
604                    let fields = msg.read_string()?;
605                    log::info!("Notice: {}", fields);
606                    continue;
607                }
608                b'N' => {
609                    log::debug!("[ASYNC_READ] Processing notice response message");
610                    let fields = msg.read_string()?;
611                    log::info!("Notice: {}", fields);
612                    continue;
613                }
614                b't' => {
615                    // ParameterDescription - 参数描述
616                    let param_count = msg.read_i16()?;
617                    log::debug!("[PREPARE] ParameterDescription: {} parameters", param_count);
618
619                    for i in 0..param_count {
620                        let oid = msg.read_i32()?;
621                        log::debug!("[PREPARE]   Parameter {}: OID {}", i, oid);
622                    }
623                }
624                b'1' => {
625                    log::debug!("[ASYNC_READ] Processing parse complete message");
626                    // Parse complete - just continue
627                    continue;
628                }
629                b'2' => {
630                    log::debug!("[ASYNC_READ] Processing bind complete message");
631                    // Bind complete - just continue
632                    continue;
633                }
634                b's' => {
635                    log::debug!("[ASYNC_READ] Processing portal suspended message");
636                    // Portal suspended - end of results
637                    continue;
638                }
639                _ => {
640                    log::warn!(
641                        "[ASYNC_READ] Unknown message type: {}",
642                        msg.message_type as char
643                    );
644                    continue;
645                }
646            }
647        }
648
649        Ok(QueryResult::from_rows(rows, columns))
650    }
651
    /// Get connection state
    ///
    /// Returns a copy of the connection's current `ConnectionState`.
    pub fn state(&self) -> ConnectionState {
        self.state
    }
656
657    /// Check if connection is ready
658    pub fn is_ready(&self) -> bool {
659        self.state == ConnectionState::Ready
660    }
661
    /// Get server parameters
    ///
    /// Returns the name/value pairs collected from the server's `S` messages
    /// during authentication.
    pub fn parameters(&self) -> &HashMap<String, String> {
        &self.parameters
    }
666
667    /// Parse binary data based on Vertica type OID
668    fn parse_binary_value(data_type: u32, data: &[u8]) -> Result<Value> {
669        use byteorder::{ByteOrder, LittleEndian, BigEndian};
670        log::debug!(
671            "[PARSE_BINARY_VALUE] data_type: {}, data_len: {}, raw_data: {:?}",
672            data_type,
673            data.len(),
674            data
675        );
676
677        if data.is_empty() {
678            log::debug!("[PARSE_BINARY_VALUE] Empty data, returning empty string");
679            return Ok(Value::String(String::new()));
680        }
681
682        match data_type {
683            vertica_types::BOOLEAN => {
684                if data.len() >= 1 {
685                    let val = Value::Boolean(data[0] != 0);
686                    log::debug!("[PARSE_BOOLEAN] {:?}", val);
687                    Ok(val)
688                } else {
689                    log::debug!("[PARSE_BOOLEAN] Empty data, defaulting to false");
690                    Ok(Value::Boolean(false))
691                }
692            }
693            vertica_types::INTEGER => {
694                if data.len() >= 4 {
695                    let val = Value::Int(LittleEndian::read_i32(data));
696                    log::debug!("[PARSE_INTEGER] {:?}", val);
697                    Ok(val)
698                } else if data.len() == 2 {
699                    let val = Value::Int(LittleEndian::read_i16(data) as i32);
700                    log::debug!("[PARSE_INTEGER] 2-byte value: {:?}", val);
701                    Ok(val)
702                } else if data.len() == 1 {
703                    let val = Value::Int(data[0] as i8 as i32);
704                    log::debug!("[PARSE_INTEGER] 1-byte value: {:?}", val);
705                    Ok(val)
706                } else {
707                    // 尝试解析字符串格式的整数
708                    if let Ok(s) = String::from_utf8(data.to_vec()) {
709                        if let Ok(num) = s.trim().parse::<i32>() {
710                            log::debug!("[PARSE_INTEGER] String format: {:?}", num);
711                            Ok(Value::Int(num))
712                        } else if let Ok(num) = s.trim().parse::<i64>() {
713                            // 如果数字太大,尝试i64后转换为i32(可能会截断)
714                            log::debug!("[PARSE_INTEGER] Large string format: {} -> {}", num, num as i32);
715                            Ok(Value::Int(num as i32))
716                        } else {
717                            log::debug!("[PARSE_INTEGER] Invalid string format, returning string: {}", s);
718                            Ok(Value::String(s))
719                        }
720                    } else {
721                        log::debug!("[PARSE_INTEGER] Unexpected len and invalid UTF-8, returning binary");
722                        Ok(Value::Binary(data.to_vec()))
723                    }
724                }
725            }
726            vertica_types::BIGINT => {
727                if data.len() >= 8 {
728                    let val = Value::BigInt(LittleEndian::read_i64(data));
729                    log::debug!("[PARSE_BIGINT] {:?}", val);
730                    Ok(val)
731                } else if data.len() == 4 {
732                    let val = Value::BigInt(LittleEndian::read_i32(data) as i64);
733                    log::debug!("[PARSE_BIGINT] 4-byte value: {:?}", val);
734                    Ok(val)
735                } else {
736                    // 尝试解析字符串格式的整数
737                    if let Ok(s) = String::from_utf8(data.to_vec()) {
738                        if let Ok(num) = s.trim().parse::<i64>() {
739                            log::debug!("[PARSE_BIGINT] String format: {:?}", num);
740                            Ok(Value::BigInt(num))
741                        } else {
742                            log::debug!("[PARSE_BIGINT] Invalid string format, returning string: {}", s);
743                            Ok(Value::String(s))
744                        }
745                    } else {
746                        log::debug!("[PARSE_BIGINT] Unexpected len and invalid UTF-8, returning binary");
747                        Ok(Value::Binary(data.to_vec()))
748                    }
749                }
750            }
751            vertica_types::FLOAT => {
752                log::debug!(
753                    "[PARSE_FLOAT] data_type: {}, data_len: {}, raw_bytes: {:?}",
754                    data_type,
755                    data.len(),
756                    data
757                );
758                match data.len() {
759                    8 => {
760                        // FLOAT8 (DOUBLE PRECISION) - 8字节大端序
761                        let val = BigEndian::read_f64(data);
762                        log::debug!("[PARSE_FLOAT] FLOAT8: {}", val);
763                        Ok(Value::Double(val))
764                    },
765                    4 => {
766                        // FLOAT4 (REAL) - 4字节大端序
767                        let val = BigEndian::read_f32(data);
768                        log::debug!("[PARSE_FLOAT] FLOAT4: {}", val);
769                        Ok(Value::Float(val))
770                    },
771                    _ => {
772                        // 字符串格式:尝试解析数值字符串
773                        let s = String::from_utf8_lossy(data);
774                        log::debug!("[PARSE_FLOAT] String format: {}", s);
775                        
776                        let s = s.trim();
777                        if let Ok(num) = s.parse::<f64>() {
778                            log::debug!("[PARSE_FLOAT] Parsed as f64: {}", num);
779                            Ok(Value::Double(num))
780                        } else if let Ok(num) = s.parse::<f32>() {
781                            log::debug!("[PARSE_FLOAT] Parsed as f32: {}", num);
782                            Ok(Value::Float(num))
783                        } else {
784                            Ok(Value::String(s.to_string()))
785                        }
786                    }
787                }
788            }
789            vertica_types::CHAR | vertica_types::VARCHAR | vertica_types::LONG_VARCHAR => {
790                let val = String::from_utf8(data.to_vec())
791                    .map(Value::String)
792                    .unwrap_or_else(|_| Value::Binary(data.to_vec()));
793                log::debug!("[PARSE_STRING] {:?}", val);
794                Ok(val)
795            }
796            vertica_types::DATE => {
797                if data.len() == 8 {
798                    // 二进制格式:8字节的小端序整数
799                    let days = LittleEndian::read_i64(data);
800                    log::debug!(
801                        "[PARSE_DATE] Binary format - days since 2000-01-01: {}",
802                        days
803                    );
804                    Value::from_date_days(days as i32)
805                } else {
806                    // 字符串格式:尝试解析日期字符串
807                    if let Ok(s) = String::from_utf8(data.to_vec()) {
808                        log::debug!("[PARSE_DATE] String format: {}", s);
809                        // 尝试多种日期格式
810                        let formats = ["%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d", "%d-%m-%Y"];
811
812                        let mut result = Value::String(s.clone());
813                        for fmt in formats.iter() {
814                            if let Ok(date) = chrono::NaiveDate::parse_from_str(&s.trim(), fmt) {
815                                result = Value::Date(date);
816                                break;
817                            }
818                        }
819
820                        Ok(result)
821                    } else {
822                        log::debug!("[PARSE_DATE] Invalid UTF-8 string, returning binary");
823                        Ok(Value::Binary(data.to_vec()))
824                    }
825                }
826            }
827            vertica_types::TIME => {
828                if data.len() == 8 {
829                    // 二进制格式:8字节的小端序整数
830                    let micros = LittleEndian::read_i64(data);
831                    log::debug!(
832                        "[PARSE_TIME] Binary format - micros since midnight: {}",
833                        micros
834                    );
835                    Value::from_time_micros(micros)
836                } else {
837                    // 字符串格式:尝试解析时间字符串
838                    if let Ok(s) = String::from_utf8(data.to_vec()) {
839                        log::debug!("[PARSE_TIME] String format: {}", s);
840                        // 尝试多种时间格式
841                        let formats = ["%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%H:%M"];
842
843                        let mut result = Value::String(s.clone());
844                        for fmt in formats.iter() {
845                            if let Ok(time) = chrono::NaiveTime::parse_from_str(&s.trim(), fmt) {
846                                result = Value::Time(time);
847                                break;
848                            }
849                        }
850
851                        Ok(result)
852                    } else {
853                        log::debug!("[PARSE_TIME] Invalid UTF-8 string, returning binary");
854                        Ok(Value::Binary(data.to_vec()))
855                    }
856                }
857            }
858            vertica_types::TIMETZ => {
859                if data.len() == 8 {
860                    // 二进制格式:8字节的小端序整数(微秒数)
861                    let micros = LittleEndian::read_i64(data);
862                    log::debug!(
863                        "[PARSE_TIMETZ] Binary format - micros since midnight: {}",
864                        micros
865                    );
866                    Value::from_time_micros(micros)
867                } else {
868                    // 字符串格式:尝试解析时间字符串(可能包含时区)
869                    if let Ok(s) = String::from_utf8(data.to_vec()) {
870                        log::debug!(
871                            "[PARSE_TIMETZ] String format: {}",
872                            s
873                        );
874                        
875                        let s = s.trim();
876                        
877                        // 尝试解析带时区的时间格式,如"14:28:09.956353+08"
878                        let formats = [
879                            "%H:%M:%S%.f",      // 14:28:09.956353
880                            "%H:%M:%S",         // 14:28:09
881                        ];
882
883                        // 提取时间部分(去掉时区偏移)
884                        let time_part = if let Some(pos) = s.find(|c: char| c == '+' || c == '-') {
885                            &s[..pos]
886                        } else {
887                            s
888                        };
889
890                        let mut result = Value::String(s.to_string()); // 默认返回原始字符串
891                        for fmt in formats.iter() {
892                            if let Ok(time) = chrono::NaiveTime::parse_from_str(time_part, fmt) {
893                                result = Value::Time(time);
894                                log::debug!(
895                                    "[PARSE_TIMETZ] Successfully parsed as Time: {:?}",
896                                    time
897                                );
898                                break;
899                            }
900                        }
901
902                        Ok(result)
903                    } else {
904                        log::debug!(
905                            "[PARSE_TIMETZ] Invalid UTF-8 string, returning binary"
906                        );
907                        Ok(Value::Binary(data.to_vec()))
908                    }
909                }
910            }
911            vertica_types::TIMESTAMPTZ => {
912                if data.len() == 8 {
913                    // 二进制格式:8字节的小端序整数
914                    let micros = LittleEndian::read_i64(data);
915                    log::debug!(
916                        "[PARSE_TIMESTAMPTZ] Binary format - micros since 2000-01-01: {}",
917                        micros
918                    );
919                    Value::from_timestamp_micros(micros)
920                } else {
921                    // 字符串格式:尝试解析时间戳字符串
922                    if let Ok(s) = String::from_utf8(data.to_vec()) {
923                        log::debug!(
924                            "[PARSE_TIMESTAMPTZ] String format: {}",
925                            s
926                        );
927                        // 尝试多种时间格式,包括带时区的格式
928                        let formats = [
929                            "%Y-%m-%d %H:%M:%S%.f%#z",
930                            "%Y-%m-%d %H:%M:%S%#z", 
931                            "%Y-%m-%d %H:%M:%S%.f",
932                            "%Y-%m-%d %H:%M:%S",
933                            "%Y-%m-%d %H:%M",
934                            "%Y-%m-%d",
935                        ];
936
937                        let mut result = Value::String(s.clone());
938                        for fmt in formats.iter() {
939                            if let Ok(timestamp) =
940                                chrono::NaiveDateTime::parse_from_str(&s.trim(), fmt)
941                            {
942                                result = Value::Timestamp(timestamp);
943                                break;
944                            }
945                        }
946
947                        // 如果还没解析成功,尝试解析为日期再转换为时间戳
948                        if let Value::String(_) = result {
949                            if let Ok(date) =
950                                chrono::NaiveDate::parse_from_str(&s.trim(), "%Y-%m-%d")
951                            {
952                                result = Value::Timestamp(date.and_hms_opt(0, 0, 0).unwrap());
953                            }
954                        }
955
956                        Ok(result)
957                    } else {
958                        log::debug!(
959                            "[PARSE_TIMESTAMPTZ] Unexpected len and invalid UTF-8, returning binary"
960                        );
961                        Ok(Value::Binary(data.to_vec()))
962                    }
963                }
964            }
965            vertica_types::TIMESTAMP => {
966                if data.len() == 8 {
967                    // 二进制格式:8字节的小端序整数
968                    let micros = LittleEndian::read_i64(data);
969                    log::debug!(
970                        "[PARSE_TIMESTAMP] Binary format - micros since 2000-01-01: {}",
971                        micros
972                    );
973                    Value::from_timestamp_micros(micros)
974                } else if data.len() == 19 {
975                    // 字符串格式:19字节的日期时间字符串 "YYYY-MM-DD HH:MM:SS"
976                    if let Ok(s) = String::from_utf8(data.to_vec()) {
977                        log::debug!("[PARSE_TIMESTAMP] String format: {}", s);
978                        Value::parse_timestamp(&s)
979                    } else {
980                        log::debug!("[PARSE_TIMESTAMP] Invalid UTF-8 string");
981                        Ok(Value::Binary(data.to_vec()))
982                    }
983                } else {
984                    // 其他长度,尝试作为字符串处理
985                    if let Ok(s) = String::from_utf8(data.to_vec()) {
986                        log::debug!(
987                            "[PARSE_TIMESTAMP] Variable string format: {}",
988                            s
989                        );
990                        // 尝试多种时间格式
991                        let formats = [
992                            "%Y-%m-%d %H:%M:%S",
993                            "%Y-%m-%d %H:%M:%S%.f",
994                            "%Y-%m-%d %H:%M",
995                            "%Y-%m-%d",
996                        ];
997
998                        let mut result = Value::String(s.clone());
999                        for fmt in formats.iter() {
1000                            if let Ok(timestamp) =
1001                                chrono::NaiveDateTime::parse_from_str(&s.trim(), fmt)
1002                            {
1003                                result = Value::Timestamp(timestamp);
1004                                break;
1005                            }
1006                        }
1007
1008                        // 如果还没解析成功,尝试解析为日期再转换为时间戳
1009                        if let Value::String(_) = result {
1010                            if let Ok(date) =
1011                                chrono::NaiveDate::parse_from_str(&s.trim(), "%Y-%m-%d")
1012                            {
1013                                result = Value::Timestamp(date.and_hms_opt(0, 0, 0).unwrap());
1014                            }
1015                        }
1016
1017                        Ok(result)
1018                    } else {
1019                        log::debug!(
1020                            "[PARSE_TIMESTAMP] Unexpected len and invalid UTF-8, returning binary"
1021                        );
1022                        Ok(Value::Binary(data.to_vec()))
1023                    }
1024                }
1025            }
1026            vertica_types::NUMERIC => {
1027                log::debug!(
1028                    "[PARSE_NUMERIC] type_oid: {}, data_len: {}, raw_bytes: {:?}",
1029                    data_type,
1030                    data.len(),
1031                    data
1032                );
1033                match data.len() {
1034                    8 => {
1035                        // NUMERIC as 8-byte float - 尝试解析为Decimal
1036                        let val = BigEndian::read_f64(data);
1037                        log::debug!("[PARSE_NUMERIC] NUMERIC8: {}", val);
1038                        if let Some(decimal) = Decimal::from_f64(val) {
1039                            Ok(Value::Decimal(decimal))
1040                        } else {
1041                            Ok(Value::Double(val))
1042                        }
1043                    },
1044                    4 => {
1045                        // NUMERIC as 4-byte float - 尝试解析为Decimal
1046                        let val = BigEndian::read_f32(data);
1047                        log::debug!("[PARSE_NUMERIC] NUMERIC4: {}", val);
1048                        if let Some(decimal) = Decimal::from_f32(val) {
1049                            Ok(Value::Decimal(decimal))
1050                        } else {
1051                            Ok(Value::Float(val))
1052                        }
1053                    },
1054                    16 => {
1055                        // NUMERIC as decimal128 - 尝试解析为Decimal
1056                        let s = String::from_utf8_lossy(data);
1057                        log::debug!("[PARSE_NUMERIC] NUMERIC16 string: {}", s);
1058                        if let Ok(decimal) = s.trim().parse::<Decimal>() {
1059                            Ok(Value::Decimal(decimal))
1060                        } else if let Ok(num) = s.trim().parse::<f64>() {
1061                            Ok(Value::Double(num))
1062                        } else {
1063                            Ok(Value::String(s.to_string()))
1064                        }
1065                    },
1066                    _ => {
1067                        // 字符串格式:优先尝试解析为Decimal
1068                        let s = String::from_utf8_lossy(data);
1069                        log::debug!("[PARSE_NUMERIC] String format: {}", s);
1070                        
1071                        let s = s.trim();
1072                        if let Ok(decimal) = s.parse::<Decimal>() {
1073                            log::debug!("[PARSE_NUMERIC] Parsed as Decimal: {}", decimal);
1074                            Ok(Value::Decimal(decimal))
1075                        } else if let Ok(num) = s.parse::<f64>() {
1076                            log::debug!("[PARSE_NUMERIC] Parsed as f64: {}", num);
1077                            Ok(Value::Double(num))
1078                        } else if let Ok(num) = s.parse::<f32>() {
1079                            log::debug!("[PARSE_NUMERIC] Parsed as f32: {}", num);
1080                            Ok(Value::Float(num))
1081                        } else if let Ok(num) = s.parse::<i64>() {
1082                            log::debug!("[PARSE_NUMERIC] Parsed as i64: {}", num);
1083                            Ok(Value::BigInt(num))
1084                        } else if let Ok(num) = s.parse::<i32>() {
1085                            log::debug!("[PARSE_NUMERIC] Parsed as i32: {}", num);
1086                            Ok(Value::Int(num))
1087                        } else {
1088                            Ok(Value::String(s.to_string()))
1089                        }
1090                    }
1091                }
1092            }
1093            vertica_types::UUID => {
1094                if data.len() >= 16 {
1095                    let val = Value::from_uuid_bytes(data.to_vec());
1096                    log::debug!("[PARSE_UUID] {:?}", val);
1097                    val
1098                } else {
1099                    log::debug!("[PARSE_UUID] Unexpected len, returning binary");
1100                    Ok(Value::Binary(data.to_vec()))
1101                }
1102            }
1103            vertica_types::VARBINARY | vertica_types::LONG_VARBINARY | vertica_types::BINARY => {
1104                let val = Value::Binary(data.to_vec());
1105                log::debug!("[PARSE_BINARY] {} bytes", data.len());
1106                Ok(val)
1107            }
1108            _ => {
1109                if let Ok(s) = String::from_utf8(data.to_vec()) {
1110                    log::debug!("[PARSE_UNKNOWN] UTF-8 string: {}", s);
1111                    Ok(Value::String(s))
1112                } else {
1113                    log::debug!("[PARSE_UNKNOWN] Binary: {} bytes", data.len());
1114                    Ok(Value::Binary(data.to_vec()))
1115                }
1116            }
1117        }
1118    }
1119
1120    /// Prepare a SQL statement - 修复实现,更健壮地处理非标准响应
1121    pub async fn prepare(&mut self, stmt_id: &str, query: &str) -> Result<()> {
1122        if self.state != ConnectionState::Ready {
1123            return Err(VerticaError::Connection("Connection not ready".to_string()));
1124        }
1125
1126        log::debug!("[PREPARE] Starting prepare for statement: {}", stmt_id);
1127        log::debug!("[PREPARE] SQL: {}", query);
1128
1129        // 1. 构建并发送Parse消息
1130        let mut parse_buf = BytesMut::with_capacity(1024);
1131        parse_buf.put_u8(b'P'); // Parse消息类型
1132        parse_buf.put_i32(0); // 长度占位符
1133
1134        // 写入语句名
1135        self.write_string_async(&mut parse_buf, stmt_id).await?;
1136        // 写入查询字符串
1137        self.write_string_async(&mut parse_buf, query).await?;
1138        // 写入参数格式代码数量 (0个)
1139        parse_buf.put_i16(0);
1140
1141        let len = parse_buf.len() as i32;
1142        parse_buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
1143
1144        log::debug!("[PREPARE] Sending Parse message ({} bytes)", len);
1145        self.write_all_async(&parse_buf).await?;
1146
1147        // 2. 构建并发送Describe消息 - 获取语句元数据
1148        let mut describe_buf = BytesMut::with_capacity(64);
1149        describe_buf.put_u8(b'D'); // Describe消息类型
1150        describe_buf.put_i32(0); // 长度占位符
1151        describe_buf.put_u8(b'S'); // 描述语句 (S = statement)
1152        self.write_string_async(&mut describe_buf, stmt_id).await?;
1153
1154        let describe_len = describe_buf.len() as i32;
1155        describe_buf[1..5].copy_from_slice(&(describe_len - 1).to_be_bytes());
1156
1157        log::debug!(
1158            "[PREPARE] Sending Describe message ({} bytes)",
1159            describe_len
1160        );
1161        self.write_all_async(&describe_buf).await?;
1162
1163        // 3. 构建并发送Sync消息 - 同步点
1164        let mut sync_buf = BytesMut::with_capacity(16);
1165        sync_buf.put_u8(b'S'); // Sync消息类型
1166        sync_buf.put_i32(4); // 固定长度4字节
1167
1168        log::debug!("[PREPARE] Sending Sync message");
1169        self.write_all_async(&sync_buf).await?;
1170
1171        // 4. 处理服务器响应 - 更健壮的实现
1172        let mut parse_complete_received = false;
1173        let  ready_for_query_received :bool;
1174
1175
1176        loop {
1177            let mut msg = self.read_message().await?;
1178
1179            match msg.message_type {
1180                b'1' => {
1181                    // ParseComplete - Parse完成确认
1182                    log::debug!("[PREPARE] ParseComplete received");
1183                    parse_complete_received = true;
1184                }
1185
1186                b't' => {
1187                    // ParameterDescription - 参数描述
1188                    let param_count = msg.read_i16()?;
1189                    log::debug!("[PREPARE] ParameterDescription: {} parameters", param_count);
1190
1191                    for i in 0..param_count {
1192                        let oid = msg.read_i32()?;
1193                        log::debug!("[PREPARE]   Parameter {}: OID {}", i, oid);
1194                    }
1195                }
1196
1197                b'T' => {
1198                    // RowDescription - 结果集描述
1199                    let field_count = msg.read_i16()?;
1200                    log::debug!("[PREPARE] RowDescription: {} columns", field_count);
1201
1202                    let mut columns: Vec<ColumnInfo> = Vec::new();
1203                    for i in 0..field_count {
1204                        let name = msg.read_string()?;
1205                       let type_oid = msg.read_u32()?;
1206                        let column = ColumnInfo {
1207                            name: name.clone(),
1208                            data_type: type_oid,
1209                            nullable: true,
1210                            precision: None,
1211                            scale: None,
1212                        };
1213                        columns.push(column);
1214
1215                        log::debug!(
1216                            "[PREPARE]   Column {}: name={}, type_oid={}",
1217                            i,
1218                            name,
1219                            type_oid
1220                        );
1221                    }
1222                }
1223
1224                b'n' => {
1225                    // NoData - 无结果集 (如INSERT语句)
1226                    log::debug!("[PREPARE] NoData received (no result set)");
1227                }
1228
1229                b'E' => {
1230                    // ErrorResponse - 错误响应
1231                    let error_response = ErrorResponse::parse_from_message(&msg.data)?;
1232                    log::error!("[PREPARE] ErrorResponse: {}", error_response);
1233                    return Err(error_response.to_error());
1234                }
1235
1236                b'S' => {
1237                    // ParameterStatus - 服务器参数更新
1238                    let name = msg.read_string()?;
1239                    let value = msg.read_string()?;
1240                    self.parameters.insert(name.clone(), value.clone());
1241                    log::debug!("[PREPARE] ParameterStatus: {} = {}", name, value);
1242                }
1243
1244                b'Z' => {
1245                    // ReadyForQuery - 事务状态
1246                    let status = msg.read_u8()?;
1247                    log::debug!("[PREPARE] ReadyForQuery: status={}", status as char);
1248                    ready_for_query_received = true;
1249
1250                    // 收到ReadyForQuery表示消息序列结束
1251                    break;
1252                }
1253
1254                b'C' => {
1255                    // CommandComplete - 命令完成
1256                    let tag = msg.read_string()?;
1257                    log::debug!("[PREPARE] CommandComplete: {}", tag);
1258                }
1259
1260                b'D' => {
1261                    // DataRow - 数据行 (在prepare阶段不应该出现,但跳过它)
1262                    let col_count = msg.read_i16()?;
1263                    log::warn!(
1264                        "[PREPARE] Unexpected DataRow: {} columns - skipping",
1265                        col_count
1266                    );
1267
1268                    // 跳过数据内容
1269                    for _ in 0..col_count {
1270                        let len = msg.read_i32()?;
1271                        if len > 0 {
1272                            let _data = msg.read_bytes(len as usize)?;
1273                        }
1274                    }
1275                }
1276
1277                b'N' => {
1278                    // NoticeResponse - 通知消息
1279                    let notice = msg.read_string()?;
1280                    log::info!("[PREPARE] Notice: {}", notice);
1281                }
1282
1283                _ => {
1284                    // 未知消息类型 - 跳过
1285                    log::warn!(
1286                        "[PREPARE] Unknown message type: {} ({}) - skipping",
1287                        msg.message_type as char,
1288                        msg.message_type
1289                    );
1290                    continue;
1291                }
1292            }
1293        }
1294
1295        // 检查是否收到必要消息,但不要求ParseComplete必须出现
1296        if !ready_for_query_received {
1297            log::warn!("[PREPARE] Missing ReadyForQuery, prepare may have failed");
1298            return Err(VerticaError::Connection(
1299                "Incomplete prepare response - missing ReadyForQuery".to_string(),
1300            ));
1301        }
1302
1303        // 如果没有收到ParseComplete但收到了ReadyForQuery,可能是服务器行为差异
1304        if !parse_complete_received {
1305            log::warn!(
1306                "[PREPARE] No ParseComplete received, but continuing as server indicated ready"
1307            );
1308        }
1309
1310        log::debug!("[PREPARE] Prepare process completed");
1311        Ok(())
1312    }
1313
1314    /// Execute prepared statement
1315    pub async fn execute_prepared(
1316        &mut self,
1317        statement_id: &str,
1318        params: &[Value],
1319    ) -> Result<QueryResult> {
1320        log::debug!(
1321            "[EXECUTE_PREPARED] Executing prepared statement: {} with {} parameters",
1322            statement_id,
1323            params.len()
1324        );
1325
1326        // 发送 Bind 消息
1327        let mut bind_msg = BytesMut::new();
1328        bind_msg.put_u8(b'B'); // Bind
1329        bind_msg.put_i32(0); // 占位长度
1330
1331        // Portal name (空字符串表示未命名portal)
1332        bind_msg.put_slice(b"");
1333        bind_msg.put_u8(0);
1334
1335        // Statement name
1336        bind_msg.put_slice(statement_id.as_bytes());
1337        bind_msg.put_u8(0);
1338
1339        // 参数格式代码数量 (0 = 所有参数使用默认格式)
1340        bind_msg.put_i16(0);
1341
1342        // 参数值数量
1343        bind_msg.put_i16(params.len() as i16);
1344
1345        // 参数值
1346        for param in params {
1347            if let Value::Null = *param {
1348                bind_msg.put_i32(-1); // NULL 值
1349            } else {
1350                // 由于 `to_bytes` 方法不存在,推测应该使用 `Value::from_bytes` 的逆操作,假设存在 `to_binary` 方法
1351                // 若实际使用时不存在该方法,需根据 `Value` 类型的具体实现调整
1352                let bytes = param.as_bytes()?;
1353                bind_msg.put_i32(bytes.len() as i32);
1354                bind_msg.extend_from_slice(&bytes);
1355            }
1356        }
1357
1358        // 结果列格式 (0 = 所有列使用默认格式)
1359        bind_msg.put_i16(0);
1360
1361        // 更新长度
1362        let len = bind_msg.len() - 1;
1363        bind_msg[1..5].copy_from_slice(&(len as i32).to_be_bytes());
1364
1365        // 发送 Execute 消息
1366        let mut execute_msg = BytesMut::new();
1367        execute_msg.put_u8(b'E'); // Execute
1368        execute_msg.put_i32(9); // 长度: 4字节(长度字段) + 1字节(空portal名) + 4字节(最大行数)
1369        execute_msg.put_u8(0); // Portal name (空字符串,1字节)
1370        execute_msg.put_i32(0); // 最大行数 (0 = 获取所有行,4字节)
1371
1372        // 更新长度
1373        let len = execute_msg.len() - 1;
1374        execute_msg[1..5].copy_from_slice(&(len as i32).to_be_bytes());
1375
1376        // 发送 Sync 消息
1377        let mut sync_msg = BytesMut::new();
1378        sync_msg.put_u8(b'S'); // Sync
1379        sync_msg.put_i32(4); // Length
1380
1381        // 合并消息并发送
1382        let mut full_msg = BytesMut::new();
1383        full_msg.extend_from_slice(&bind_msg);
1384        full_msg.extend_from_slice(&execute_msg);
1385        full_msg.extend_from_slice(&sync_msg);
1386
1387        if let Some(ref mut stream) = self.stream {
1388            stream.write_all_async(&full_msg).await?;
1389        } else {
1390            return Err(VerticaError::Connection(
1391                "Connection not established".to_string(),
1392            ));
1393        }
1394
1395        log::debug!(
1396            "[EXECUTE_PREPARED] Sent Bind/Execute/Sync messages for statement: {}",
1397            statement_id
1398        );
1399
1400        // 处理响应
1401        let mut rows = Vec::new();
1402        let mut columns = Vec::new();
1403        let mut affected_rows = 0;
1404        let mut command_tag = String::new();
1405        let mut command_complete_received = false;
1406
1407        loop {
1408            let  msg = self.read_message().await?;
1409            log::debug!(
1410                "[EXECUTE_PREPARED] Received message type: {} ({:x})",
1411                msg.message_type as char,
1412                msg.message_type
1413            );
1414
1415            let mut msg_data = AsyncMessage {
1416                message_type: msg.message_type,
1417                data: msg.data.clone(),
1418            };
1419
1420            match msg.message_type {
1421                b't' => {
1422                    // ParameterDescription - 参数描述
1423                    log::debug!("[EXECUTE_PREPARED] Unexpected ParameterDescription - skipping");
1424                }
1425                b'T' => {
1426                    // RowDescription - 结果集描述
1427                    let field_count = msg_data.read_i16()?;
1428                    log::debug!("[EXECUTE_PREPARED] RowDescription: {} columns", field_count);
1429
1430                    columns.clear();
1431                    for i in 0..field_count {
1432                        let name = msg_data.read_string()?;
1433                       let type_oid = msg_data.read_u32()?;
1434                        let column = ColumnInfo {
1435                            name: name.clone(),
1436                            data_type: type_oid,
1437                            nullable: true,
1438                            precision: None,
1439                            scale: None,
1440                        };
1441                        columns.push(column);
1442
1443                        log::debug!(
1444                            "[EXECUTE_PREPARED]   Column {}: name={}, type_oid={}",
1445                            i,
1446                            &name,
1447                            type_oid
1448                        );
1449                    }
1450                }
1451                b'D' => {
1452                    // DataRow - 数据行
1453                    let col_count = msg_data.read_i16()?;
1454                    let mut values = Vec::with_capacity(col_count as usize);
1455
1456                    for _i in 0..col_count {
1457                        let len = msg_data.read_i32()?;
1458                        if len == -1 {
1459                            // NULL 值
1460                            values.push(Value::Null);
1461                        } else if len >= 0 {
1462                            let data = msg_data.read_bytes(len as usize)?;
1463                            let value = Value::from_bytes(data);
1464                            values.push(value);
1465                        } else {
1466                            return Err(VerticaError::Protocol(
1467                                "Invalid data length in DataRow".to_string(),
1468                            ));
1469                        }
1470                    }
1471
1472                    let row = Row { data: values };
1473                    rows.push(row);
1474                }
1475                b'C' => {
1476                    // CommandComplete - 命令完成
1477                    command_tag = msg_data.read_string()?;
1478                    log::debug!("[EXECUTE_PREPARED] CommandComplete: {}", command_tag);
1479
1480                    // 解析影响的行数
1481                    if let Some(pos) = command_tag.rfind(' ') {
1482                        if let Ok(rows) = command_tag[pos + 1..].parse::<u64>() {
1483                            affected_rows = rows;
1484                        }
1485                    }
1486                    command_complete_received = true;
1487                }
1488                b'E' => {
1489                    // ErrorResponse - 错误响应
1490                    let error_response = ErrorResponse::parse_from_message(&msg.data)?;
1491                    log::error!("[EXECUTE_PREPARED] ErrorResponse: {}", error_response);
1492                    return Err(error_response.to_error());
1493                }
1494                b'Z' => {
1495                    // ReadyForQuery - 事务状态
1496                    let status = msg_data.read_u8()?;
1497                    log::debug!(
1498                        "[EXECUTE_PREPARED] ReadyForQuery: status={}",
1499                        status as char
1500                    );
1501                    break;
1502                }
1503                b'S' => {
1504                    // ParameterStatus - 服务器参数更新
1505                    let name = msg_data.read_string()?;
1506                    let value = msg_data.read_string()?;
1507                    self.parameters.insert(name.clone(), value.clone());
1508                    log::debug!("[EXECUTE_PREPARED] ParameterStatus: {} = {}", name, value);
1509                }
1510                b'N' => {
1511                    // NoticeResponse - 通知消息
1512                    let notice = msg_data.read_string()?;
1513                    log::info!("[EXECUTE_PREPARED] Notice: {}", notice);
1514                }
1515                b'n' => {
1516                    // NoData - 无结果集
1517                    log::debug!("[EXECUTE_PREPARED] NoData received");
1518                }
1519                b'1' => {
1520                    log::debug!("[ASYNC_READ] Processing parse complete message");
1521                    // Parse complete - just continue
1522                    continue;
1523                }
1524                b'2' => {
1525                    log::debug!("[ASYNC_READ] Processing bind complete message");
1526                    // Bind complete - just continue
1527                    continue;
1528                }
1529                b's' => {
1530                    log::debug!("[ASYNC_READ] Processing portal suspended message");
1531                    // Portal suspended - end of results
1532                    break;
1533                }
1534                _ => {
1535                    log::warn!(
1536                        "[ASYNC_READ] Unknown message type: {}",
1537                        msg.message_type as char
1538                    );
1539                    continue;
1540                }
1541            }
1542        }
1543
1544        log::debug!(
1545            "[EXECUTE_PREPARED] Execution completed. Rows: {}, Columns: {}, Affected: {}",
1546            rows.len(),
1547            columns.len(),
1548            affected_rows
1549        );
1550
1551        // 构建结果
1552        let result = if columns.is_empty() && command_complete_received {
1553            // 非查询命令 (INSERT, UPDATE, DELETE等)
1554            QueryResult::from_command(affected_rows as u64, Some(command_tag), "".to_owned())
1555        } else if !columns.is_empty() {
1556            // 查询命令
1557            QueryResult::from_rows(rows.into_iter().map(|r| r.data).collect(), columns)
1558        } else {
1559            // 空结果
1560            QueryResult::empty()
1561        };
1562
1563        Ok(result)
1564    }
1565
1566    /// Close connection
1567    pub async fn close(&mut self) -> Result<()> {
1568        if let Some(mut stream) = self.stream.take() {
1569            stream.write_all_async(&[]).await?;
1570        }
1571        self.state = ConnectionState::Disconnected;
1572        Ok(())
1573    }
1574}
1575
1576/// Async message structure
1577pub struct AsyncMessage {
1578    /// 消息类型,使用 u8 表示消息的类型标识
1579    pub message_type: u8,
1580    /// 存储消息数据的缓冲区
1581    pub data: BytesMut,
1582}
1583/// 为 `AsyncMessage` 实现消息读取方法,用于从消息数据中解析各种类型的值。
1584impl AsyncMessage {
1585    /// 从消息数据中读取一个 `u8` 类型的值。
1586    ///
1587    /// 如果消息数据为空,将返回一个包含错误信息的 `Err`。
1588    ///
1589    /// # 返回值
1590    /// - `Ok(u8)`: 成功读取的 `u8` 值。
1591    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1592    pub fn read_u8(&mut self) -> Result<u8> {
1593        if self.data.is_empty() {
1594            return Err(VerticaError::Connection(
1595                "read_u8:Unexpected end of message".to_string(),
1596            ));
1597        }
1598        Ok(self.data.get_u8())
1599    }
1600
1601    /// 从消息数据中读取一个 `i16` 类型的值。
1602    ///
1603    /// 如果消息数据长度不足 2 字节,将返回一个包含错误信息的 `Err`。
1604    ///
1605    /// # 返回值
1606    /// - `Ok(i16)`: 成功读取的 `i16` 值。
1607    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1608    pub fn read_i16(&mut self) -> Result<i16> {
1609        if self.data.len() < 2 {
1610            return Err(VerticaError::Connection(
1611                "read_i16:Unexpected end of message".to_string(),
1612            ));
1613        }
1614        Ok(self.data.get_i16())
1615    }
1616
1617    /// 从消息数据中读取一个 `i32` 类型的值。
1618    ///
1619    /// 原错误信息存在笔误,当前代码中错误信息为 "read_i16",正确应为 "read_i32"。
1620    /// 如果消息数据长度不足 4 字节,将返回一个包含错误信息的 `Err`。
1621    ///
1622    /// # 返回值
1623    /// - `Ok(i32)`: 成功读取的 `i32` 值。
1624    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1625    pub fn read_i32(&mut self) -> Result<i32> {
1626        if self.data.len() < 4 {
1627            return Err(VerticaError::Connection(
1628                "read_i32:Unexpected end of message".to_string(),
1629            ));
1630        }
1631        Ok(self.data.get_i32())
1632    }
1633
1634    /// 从消息数据中读取一个 `u32` 类型的值。
1635    ///
1636    /// 如果消息数据长度不足 4 字节,将返回一个包含错误信息的 `Err`。
1637    ///
1638    /// # 返回值
1639    /// - `Ok(u32)`: 成功读取的 `u32` 值。
1640    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1641    pub fn read_u32(&mut self) -> Result<u32> {
1642        if self.data.len() < 4 {
1643            return Err(VerticaError::Connection(
1644                "read_u32:Unexpected end of message".to_string(),
1645            ));
1646        }
1647        Ok(self.data.get_u32())
1648    }
1649
1650    /// 从消息数据中读取一个以空字符 (`\0`) 结尾的字符串。
1651    ///
1652    /// 该方法会处理空消息、缺少空字符结尾、空字符位置越界等情况,
1653    /// 并使用 `from_utf8_lossy` 方法优雅地处理无效的 UTF-8 编码。
1654    ///
1655    /// # 返回值
1656    /// - `Ok(String)`: 成功读取的字符串。
1657    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1658    pub fn read_string(&mut self) -> Result<String> {
1659        if self.data.is_empty() {
1660            return Err(VerticaError::Connection(
1661                "Empty message when reading string".to_string(),
1662            ));
1663        }
1664
1665        let null_pos = self.data.iter().position(|&b| b == 0).ok_or_else(|| {
1666            VerticaError::Connection("Invalid string format: no null terminator found".to_string())
1667        })?;
1668
1669        if null_pos >= self.data.len() {
1670            return Err(VerticaError::Connection(
1671                "Invalid string format: null terminator beyond buffer".to_string(),
1672            ));
1673        }
1674
1675        let string_data = self.data.split_to(null_pos);
1676
1677        // 跳过空字符终止符
1678        if !self.data.is_empty() {
1679            self.data.advance(1);
1680        }
1681
1682        // 优雅地处理空字符串
1683        if string_data.is_empty() {
1684            return Ok(String::new());
1685        }
1686
1687        // 使用 from_utf8_lossy 优雅地处理无效的 UTF-8 编码,类似于 Go 实现
1688        let string = String::from_utf8_lossy(&string_data[..]).to_string();
1689
1690        Ok(string)
1691    }
1692
1693    /// Read a tagged string (field type + string value) similar to Go's readTaggedString
1694    /// 读取一个带标签的字符串(字段类型 + 字符串值),类似于 Go 的 `readTaggedString` 方法。
1695    ///
1696    /// 首先读取一个 `u8` 类型的字段类型,若字段类型为 0,则表示字符串为空。
1697    /// 否则,继续读取一个以空字符结尾的字符串。
1698    ///
1699    /// # 返回值
1700    /// - `Ok((u8, String))`: 成功读取的字段类型和对应的字符串。
1701    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1702    pub fn read_tagged_string(&mut self) -> Result<(u8, String)> {
1703        if self.data.is_empty() {
1704            return Err(VerticaError::Connection(
1705                "Empty message when reading tagged string".to_string(),
1706            ));
1707        }
1708
1709        let field_type = self.data.get_u8();
1710
1711        // 处理字段类型为 0 的情况,表示空字符串(结束标记)
1712        if field_type == 0 {
1713            return Ok((0, String::new()));
1714        }
1715
1716        let value = self.read_string()?;
1717        Ok((field_type, value))
1718    }
1719
1720    /// 从消息数据中读取指定长度的字节数据。
1721    ///
1722    /// 如果消息数据长度不足指定长度,将返回一个包含错误信息的 `Err`。
1723    ///
1724    /// # 参数
1725    /// - `len`: 需要读取的字节长度。
1726    ///
1727    /// # 返回值
1728    /// - `Ok(Vec<u8>)`: 成功读取的字节数据。
1729    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1730    pub fn read_bytes(&mut self, len: usize) -> Result<Vec<u8>> {
1731        if self.data.len() < len {
1732            return Err(VerticaError::Connection(
1733                "read_bytes:Unexpected end of message".to_string(),
1734            ));
1735        }
1736        Ok(self.data.split_to(len).to_vec())
1737    }
1738
1739    /// 从消息数据中读取指定缓冲区长度的字节数据,并填充到缓冲区中。
1740    ///
1741    /// 如果消息数据长度不足缓冲区长度,将返回一个包含错误信息的 `Err`。
1742    ///
1743    /// # 参数
1744    /// - `buf`: 用于存储读取数据的缓冲区。
1745    ///
1746    /// # 返回值
1747    /// - `Ok(())`: 读取成功。
1748    /// - `Err(VerticaError)`: 读取失败时的错误信息。
1749    pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
1750        if self.data.len() < buf.len() {
1751            return Err(VerticaError::Connection(
1752                "read_exact:Unexpected end of message".to_string(),
1753            ));
1754        }
1755        buf.copy_from_slice(&self.data.split_to(buf.len()));
1756        Ok(())
1757    }
1758}
1759
1760
1761
1762
1763
1764
1765