1use bytes::{Buf, BufMut, BytesMut};
2use md5::{Digest, Md5};
3use std::collections::HashMap;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5use tokio::net::TcpStream;
6use rust_decimal::Decimal;
7use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
8
9use crate::conn::{ConnectionConfig, ConnectionState};
10use crate::error::error_response::ErrorResponse;
11use crate::error::{Result, VerticaError};
12use crate::query::{ColumnInfo, QueryResult};
13use crate::types::vertica_types;
14use crate::types::Value;
15use crate::Row;
16#[derive(Debug)]
18pub enum AsyncConnectionStream {
19 Plain(TcpStream),
21 #[cfg(feature = "tls")]
22 Ssl(tokio_native_tls::TlsStream<TcpStream>),
24}
25
26impl AsyncConnectionStream {
27 pub async fn read_async(&mut self, buf: &mut [u8]) -> Result<usize> {
38 match self {
39 AsyncConnectionStream::Plain(stream) => stream
40 .read(buf)
41 .await
42 .map_err(|e| VerticaError::Connection(format!("Read error: {}", e))),
43 #[cfg(feature = "tls")]
44 AsyncConnectionStream::Ssl(stream) => stream
45 .read(buf)
46 .await
47 .map_err(|e| VerticaError::Connection(format!("SSL read error: {}", e))),
48 }
49 }
50
51 pub async fn write_all_async(&mut self, buf: &[u8]) -> Result<()> {
62 match self {
63 AsyncConnectionStream::Plain(stream) => stream
64 .write_all(buf)
65 .await
66 .map_err(|e| VerticaError::Connection(format!("Write error: {}", e))),
67 #[cfg(feature = "tls")]
68 AsyncConnectionStream::Ssl(stream) => stream
69 .write_all(buf)
70 .await
71 .map_err(|e| VerticaError::Connection(format!("SSL write error: {}", e))),
72 }
73 }
74}
75
76#[derive(Debug)]
78pub struct AsyncConnection {
79 stream: Option<AsyncConnectionStream>,
80 config: ConnectionConfig,
81 state: ConnectionState,
82 parameters: HashMap<String, String>,
83 backend_pid: Option<u32>,
84 secret_key: Option<u32>,
85}
86
87impl AsyncConnection {
88 pub async fn new(config: ConnectionConfig) -> Result<Self> {
90 let mut conn = Self {
91 stream: None,
92 config,
93 state: ConnectionState::Disconnected,
94 parameters: HashMap::new(),
95 backend_pid: None,
96 secret_key: None,
97 };
98 conn.connect().await?;
99 Ok(conn)
100 }
101
102 pub async fn new_from_url(url: &str) -> Result<Self> {
104 let config = ConnectionConfig::from_url(url)?;
105 Self::new(config).await
106 }
107
108 async fn connect(&mut self) -> Result<()> {
110 log::debug!(
111 "[ASYNC_CONNECT] Starting connection process to {}:{} as {}",
112 self.config.host,
113 self.config.port,
114 self.config.username
115 );
116 self.state = ConnectionState::Connecting;
117
118 let mut hosts = vec![format!("{}:{}", self.config.host, self.config.port)];
120 if let Some(ref backup_servers) = self.config.backup_servers {
121 hosts.extend(backup_servers.clone());
122 }
123
124 let mut _last_error = None;
125
126 for host_port in &hosts {
127 log::debug!("[ASYNC_CONNECT] Trying host: {}", host_port);
128
129 match self.connect_to_host(host_port).await {
130 Ok(stream) => {
131 log::debug!("[ASYNC_CONNECT] Successfully connected to {}", host_port);
132 self.stream = Some(stream);
133 self.state = ConnectionState::Handshake;
134
135 return self.handshake().await.map_err(|e| {
136 log::debug!("[ASYNC_CONNECT] Handshake failed: {}", e);
137 e
138 });
139 }
140 Err(e) => {
141 log::debug!("[ASYNC_CONNECT] Failed to connect to {}: {}", host_port, e);
142 _last_error = Some(e);
143 continue;
144 }
145 }
146 }
147
148 let error_msg = format!(
149 "Failed to connect to any host. Last error: {}",
150 _last_error.unwrap_or_else(|| VerticaError::Connection("Unknown error".to_string()))
151 );
152 log::debug!("[ASYNC_CONNECT] {}", error_msg);
153 Err(VerticaError::Connection(error_msg))
154 }
155
156 async fn connect_to_host(&self, host_port: &str) -> Result<AsyncConnectionStream> {
158 let parts: Vec<&str> = host_port.split(':').collect();
159 if parts.len() != 2 {
160 return Err(VerticaError::Connection(format!(
161 "Invalid host:port format: {}",
162 host_port
163 )));
164 }
165
166 let host = parts[0];
167 let port: u16 = parts[1]
168 .parse()
169 .map_err(|e| VerticaError::Connection(format!("Invalid port number: {}", e)))?;
170
171 let addrs = tokio::net::lookup_host(format!("{}:{}", host, port))
173 .await
174 .map_err(|e| {
175 VerticaError::Connection(format!("Failed to resolve host {}: {}", host, e))
176 })?;
177
178 let mut _last_error = None;
179
180 for addr in addrs {
181 log::debug!("[ASYNC_CONNECT] Trying IP address: {}", addr);
182
183 match tokio::time::timeout(self.config.connect_timeout, TcpStream::connect(addr)).await
184 {
185 Ok(Ok(stream)) => {
186 stream.set_nodelay(true).map_err(|e| {
188 VerticaError::Connection(format!("Failed to set TCP_NODELAY: {}", e))
189 })?;
190
191 return Ok(AsyncConnectionStream::Plain(stream));
192 }
193 Ok(Err(e)) => {
194 log::debug!("[ASYNC_CONNECT] Failed to connect to {}: {}", addr, e);
195 _last_error = Some(e);
196 continue;
197 }
198 Err(_) => {
199 log::debug!("[ASYNC_CONNECT] Connection timeout to {}", addr);
200 _last_error = Some(std::io::Error::new(
201 std::io::ErrorKind::TimedOut,
202 "Connection timeout",
203 ));
204 continue;
205 }
206 }
207 }
208
209 Err(VerticaError::Connection(format!(
210 "Failed to connect to {}",
211 host_port
212 )))
213 }
214
215 async fn handshake(&mut self) -> Result<()> {
217 log::debug!("[ASYNC_HANDSHAKE] Starting handshake process");
218 let mut buf = BytesMut::with_capacity(1024);
219
220 let username = self.config.username.clone();
221 let database = self.config.database.clone();
222
223 buf.put_i32(0); buf.put_i32(196608); self.write_string_async(&mut buf, "user").await?;
227 self.write_string_async(&mut buf, &username).await?;
228
229 self.write_string_async(&mut buf, "database").await?;
230 self.write_string_async(&mut buf, &database).await?;
231
232 self.write_string_async(&mut buf, "client_encoding").await?;
233 self.write_string_async(&mut buf, "UTF8").await?;
234
235 self.write_string_async(&mut buf, "").await?; let len = buf.len() as i32;
238 buf[0..4].copy_from_slice(&len.to_be_bytes());
239
240 log::debug!("[ASYNC_HANDSHAKE] Sending startup message ({} bytes)", len);
241 self.write_all_async(&buf).await.map_err(|e| {
242 log::debug!("[ASYNC_HANDSHAKE] Failed to send startup message: {}", e);
243 e
244 })?;
245
246 log::debug!("[ASYNC_HANDSHAKE] Startup message sent, starting authentication");
247 self.authenticate().await?;
248 log::debug!("[ASYNC_HANDSHAKE] Handshake completed successfully");
249 Ok(())
250 }
251
252 async fn authenticate(&mut self) -> Result<()> {
254 log::debug!("[ASYNC_AUTH] Starting authentication process");
255 self.state = ConnectionState::Authenticating;
256
257 let mut message_count = 0;
258 loop {
259 message_count += 1;
260 log::debug!("[ASYNC_AUTH] Waiting for message #{}", message_count);
261
262 let mut msg = self.read_message().await.map_err(|e| {
263 log::debug!(
264 "[ASYNC_AUTH] Failed to read message #{}: {}",
265 message_count,
266 e
267 );
268 e
269 })?;
270
271 log::debug!(
272 "[ASYNC_AUTH] Received message type: {}",
273 msg.message_type as char
274 );
275
276 match msg.message_type {
277 b'R' => {
278 let auth_type = msg.read_i32().map_err(|e| {
280 log::debug!("[ASYNC_AUTH] Failed to read auth type: {}", e);
281 e
282 })?;
283 log::debug!("[ASYNC_AUTH] Authentication request type: {}", auth_type);
284
285 match auth_type {
286 0 => {
287 log::debug!("[ASYNC_AUTH] Authentication successful");
289 }
290 3 => {
291 log::debug!("[ASYNC_AUTH] Clear text password required");
293 self.send_password_response().await?;
294 }
295 5 => {
296 log::debug!("[ASYNC_AUTH] MD5 password required");
298 let salt = msg.read_bytes(4)?;
299 log::debug!("[ASYNC_AUTH] Salt: {:02x?}", salt);
300 self.send_md5_password_response(&salt).await?;
301 }
302 _ => {
303 log::error!(
304 "[ASYNC_AUTH] Unsupported authentication type: {}",
305 auth_type
306 );
307 return Err(VerticaError::Authentication(format!(
308 "Unsupported authentication type: {}",
309 auth_type
310 )));
311 }
312 }
313 }
314 b'K' => {
315 let pid = msg.read_u32()?;
316 let key = msg.read_u32()?;
317 self.backend_pid = Some(pid);
318 self.secret_key = Some(key);
319 log::debug!("[ASYNC_AUTH] Backend PID: {}, Secret key: {}", pid, key);
320 }
321 b'S' => {
322 log::debug!(
323 "[ASYNC_AUTH] Processing parameter message, data length: {}",
324 msg.data.len()
325 );
326
327 if msg.data.is_empty() {
328 log::warn!("[ASYNC_AUTH] Empty parameter message, skipping");
329 continue;
330 }
331
332 let name = match msg.read_string() {
333 Ok(name) => name,
334 Err(e) => {
335 log::warn!("[ASYNC_AUTH] Failed to read parameter name: {}", e);
336 continue;
337 }
338 };
339
340 let value = match msg.read_string() {
341 Ok(value) => value,
342 Err(e) => {
343 log::warn!(
344 "[ASYNC_AUTH] Failed to read parameter value for '{}': {}",
345 name,
346 e
347 );
348 continue;
349 }
350 };
351
352 self.parameters.insert(name.clone(), value.clone());
353 log::debug!("[ASYNC_AUTH] Parameter: {} = {}", name, value);
354 }
355 b'Z' => {
356 self.state = ConnectionState::Ready;
357 break;
358 }
359 _ => {
360 return Err(VerticaError::Connection(format!(
361 "Unexpected message type: {}",
362 msg.message_type as char
363 )));
364 }
365 }
366 }
367
368 Ok(())
369 }
370
371 async fn send_password_response(&mut self) -> Result<()> {
373 log::debug!("[ASYNC_AUTH] Sending clear text password");
374 let password = self
375 .config
376 .password
377 .clone()
378 .ok_or_else(|| VerticaError::Authentication("No password provided".to_string()))?;
379
380 let mut buf = BytesMut::with_capacity(1024);
381 buf.put_u8(b'p');
382 buf.put_i32(0); self.write_string_async(&mut buf, &password).await?;
384
385 let len = buf.len() as i32;
386 buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
387
388 self.write_all_async(&buf).await
389 }
390
391 async fn send_md5_password_response(&mut self, salt: &[u8]) -> Result<()> {
393 log::debug!("[ASYNC_AUTH] Sending MD5 password response");
394
395 let user = self.config.username.clone();
396 let password = self
397 .config
398 .password
399 .clone()
400 .ok_or_else(|| VerticaError::Authentication("No password provided".to_string()))?;
401
402 let mut hasher = Md5::new();
404 hasher.update(password.as_bytes());
405 hasher.update(user.as_bytes());
406 let hash1 = hasher.finalize();
407
408 let mut hasher = Md5::new();
409 hasher.update(&hash1);
410 hasher.update(salt);
411 let hash2 = hasher.finalize();
412
413 let md5_response = format!("md5{:x}", hash2);
414
415 let mut buf = BytesMut::with_capacity(1024);
416 buf.put_u8(b'p');
417 buf.put_i32(0); self.write_string_async(&mut buf, &md5_response).await?;
419
420 let len = buf.len() as i32;
421 buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
422
423 self.write_all_async(&buf).await
424 }
425
426 async fn read_message(&mut self) -> Result<AsyncMessage> {
428 let mut header = [0u8; 5];
429 self.read_exact_async(&mut header).await?;
430
431 let message_type = header[0];
432 let len = i32::from_be_bytes([header[1], header[2], header[3], header[4]]) - 4;
433
434 log::debug!(
435 "[ASYNC_READ] Message type: {} ({}), length: {}",
436 message_type as char,
437 message_type,
438 len
439 );
440
441 if len < 0 || len > 1_000_000 {
442 return Err(VerticaError::Connection(format!(
443 "Invalid message length: {}",
444 len
445 )));
446 }
447
448 let mut data = vec![0u8; len as usize];
449 if len > 0 {
450 self.read_exact_async(&mut data).await?;
451 }
452
453 log::debug!(
454 "[ASYNC_READ] Successfully read message, data length: {}",
455 data.len()
456 );
457
458 Ok(AsyncMessage {
459 message_type,
460 data: BytesMut::from(&data[..]),
461 })
462 }
463
464 async fn read_exact_async(&mut self, buf: &mut [u8]) -> Result<()> {
466 let mut read_index = 0;
467 let total_len = buf.len();
468
469 while read_index < total_len {
470 let bytes_read = match &mut self.stream {
471 Some(stream) => stream.read_async(&mut buf[read_index..]).await?,
472 None => return Err(VerticaError::Connection("No active connection".to_string())),
473 };
474
475 if bytes_read == 0 {
476 return Err(VerticaError::Connection(
477 "Connection closed by server".to_string(),
478 ));
479 }
480
481 read_index += bytes_read;
482 log::debug!(
483 "[ASYNC_READ] Progress: {}/{} bytes ({} bytes this read)",
484 read_index,
485 total_len,
486 bytes_read
487 );
488 }
489 Ok(())
490 }
491
492 async fn write_all_async(&mut self, buf: &[u8]) -> Result<()> {
494 match &mut self.stream {
495 Some(stream) => stream.write_all_async(buf).await,
496 None => Err(VerticaError::Connection("No active connection".to_string())),
497 }
498 }
499
500 async fn write_string_async(&mut self, buf: &mut BytesMut, s: &str) -> Result<()> {
502 buf.extend_from_slice(s.as_bytes());
503 buf.put_u8(0); Ok(())
505 }
506
507 pub async fn simple_query(&mut self, query: &str) -> Result<QueryResult> {
509 if self.state != ConnectionState::Ready {
510 return Err(VerticaError::Connection("Connection not ready".to_string()));
511 }
512
513 let mut buf = BytesMut::with_capacity(1024);
514 buf.put_u8(b'Q');
515 buf.put_i32(0); self.write_string_async(&mut buf, query).await?;
517
518 let len = buf.len() as i32;
519 buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
520
521 self.write_all_async(&buf).await?;
522 self.process_query_result().await
523 }
524
525 async fn process_query_result(&mut self) -> Result<QueryResult> {
527 let mut columns = Vec::new();
528 let mut rows = Vec::new();
529
530 loop {
531 let mut msg = self.read_message().await?;
532
533 match msg.message_type {
534 b'T' => {
535 log::debug!("[ASYNC_READ] Processing row description message");
536 let field_count = msg.read_i16()?;
538 for _ in 0..field_count {
539 let name = msg.read_string()?;
540 let _table_oid = msg.read_u32()?;
541 let _column_attr_number = msg.read_i16()?;
542 let data_type = msg.read_u32()?;
543 let _type_size = msg.read_i16()?;
544 let _type_modifier = msg.read_i32()?;
545 let _format_code = msg.read_i16()?;
546
547 columns.push(ColumnInfo {
548 name,
549 data_type,
550 nullable: true, precision: None,
552 scale: None,
553 });
554 }
555 }
556 b'D' => {
557 log::debug!("[ASYNC_READ] Processing data row message");
558 let column_count = msg.read_i16()?;
560 let mut row = Vec::new();
561
562 for i in 0..column_count {
563 let len = msg.read_i32()?;
564 if len == -1 {
565 row.push(Value::Null);
566 } else {
567 let mut data = vec![0u8; len as usize];
568 msg.read_exact(&mut data)?;
569
570 let data_type = if (i as usize) < columns.len() {
572 columns[i as usize].data_type
573 } else {
574 0 };
576
577 let value = Self::parse_binary_value(data_type, &data)?;
578 row.push(value);
579 }
580 }
581 rows.push(row);
582 }
583 b'C' => {
584 log::debug!("[ASYNC_READ] Processing command complete message");
585 let _tag = msg.read_string()?;
587 break;
588 }
589 b'Z' => {
590 log::debug!("[ASYNC_READ] Processing ready for query message");
591 let _status = msg.read_u8()?;
593 break;
594 }
595 b'E' => {
596 log::debug!("[ASYNC_READ] Processing error response message");
597 let error_response = ErrorResponse::parse_from_message(&msg.data)?;
599 log::error!("[ASYNC_READ] Server error: {}", error_response);
600 return Err(error_response.to_error());
601 }
602 b'S' => {
603 log::debug!("[ASYNC_READ] Processing notice response message");
604 let fields = msg.read_string()?;
605 log::info!("Notice: {}", fields);
606 continue;
607 }
608 b'N' => {
609 log::debug!("[ASYNC_READ] Processing notice response message");
610 let fields = msg.read_string()?;
611 log::info!("Notice: {}", fields);
612 continue;
613 }
614 b't' => {
615 let param_count = msg.read_i16()?;
617 log::debug!("[PREPARE] ParameterDescription: {} parameters", param_count);
618
619 for i in 0..param_count {
620 let oid = msg.read_i32()?;
621 log::debug!("[PREPARE] Parameter {}: OID {}", i, oid);
622 }
623 }
624 b'1' => {
625 log::debug!("[ASYNC_READ] Processing parse complete message");
626 continue;
628 }
629 b'2' => {
630 log::debug!("[ASYNC_READ] Processing bind complete message");
631 continue;
633 }
634 b's' => {
635 log::debug!("[ASYNC_READ] Processing portal suspended message");
636 continue;
638 }
639 _ => {
640 log::warn!(
641 "[ASYNC_READ] Unknown message type: {}",
642 msg.message_type as char
643 );
644 continue;
645 }
646 }
647 }
648
649 Ok(QueryResult::from_rows(rows, columns))
650 }
651
652 pub fn state(&self) -> ConnectionState {
654 self.state
655 }
656
657 pub fn is_ready(&self) -> bool {
659 self.state == ConnectionState::Ready
660 }
661
662 pub fn parameters(&self) -> &HashMap<String, String> {
664 &self.parameters
665 }
666
667 fn parse_binary_value(data_type: u32, data: &[u8]) -> Result<Value> {
669 use byteorder::{ByteOrder, LittleEndian, BigEndian};
670 log::debug!(
671 "[PARSE_BINARY_VALUE] data_type: {}, data_len: {}, raw_data: {:?}",
672 data_type,
673 data.len(),
674 data
675 );
676
677 if data.is_empty() {
678 log::debug!("[PARSE_BINARY_VALUE] Empty data, returning empty string");
679 return Ok(Value::String(String::new()));
680 }
681
682 match data_type {
683 vertica_types::BOOLEAN => {
684 if data.len() >= 1 {
685 let val = Value::Boolean(data[0] != 0);
686 log::debug!("[PARSE_BOOLEAN] {:?}", val);
687 Ok(val)
688 } else {
689 log::debug!("[PARSE_BOOLEAN] Empty data, defaulting to false");
690 Ok(Value::Boolean(false))
691 }
692 }
693 vertica_types::INTEGER => {
694 if data.len() >= 4 {
695 let val = Value::Int(LittleEndian::read_i32(data));
696 log::debug!("[PARSE_INTEGER] {:?}", val);
697 Ok(val)
698 } else if data.len() == 2 {
699 let val = Value::Int(LittleEndian::read_i16(data) as i32);
700 log::debug!("[PARSE_INTEGER] 2-byte value: {:?}", val);
701 Ok(val)
702 } else if data.len() == 1 {
703 let val = Value::Int(data[0] as i8 as i32);
704 log::debug!("[PARSE_INTEGER] 1-byte value: {:?}", val);
705 Ok(val)
706 } else {
707 if let Ok(s) = String::from_utf8(data.to_vec()) {
709 if let Ok(num) = s.trim().parse::<i32>() {
710 log::debug!("[PARSE_INTEGER] String format: {:?}", num);
711 Ok(Value::Int(num))
712 } else if let Ok(num) = s.trim().parse::<i64>() {
713 log::debug!("[PARSE_INTEGER] Large string format: {} -> {}", num, num as i32);
715 Ok(Value::Int(num as i32))
716 } else {
717 log::debug!("[PARSE_INTEGER] Invalid string format, returning string: {}", s);
718 Ok(Value::String(s))
719 }
720 } else {
721 log::debug!("[PARSE_INTEGER] Unexpected len and invalid UTF-8, returning binary");
722 Ok(Value::Binary(data.to_vec()))
723 }
724 }
725 }
726 vertica_types::BIGINT => {
727 if data.len() >= 8 {
728 let val = Value::BigInt(LittleEndian::read_i64(data));
729 log::debug!("[PARSE_BIGINT] {:?}", val);
730 Ok(val)
731 } else if data.len() == 4 {
732 let val = Value::BigInt(LittleEndian::read_i32(data) as i64);
733 log::debug!("[PARSE_BIGINT] 4-byte value: {:?}", val);
734 Ok(val)
735 } else {
736 if let Ok(s) = String::from_utf8(data.to_vec()) {
738 if let Ok(num) = s.trim().parse::<i64>() {
739 log::debug!("[PARSE_BIGINT] String format: {:?}", num);
740 Ok(Value::BigInt(num))
741 } else {
742 log::debug!("[PARSE_BIGINT] Invalid string format, returning string: {}", s);
743 Ok(Value::String(s))
744 }
745 } else {
746 log::debug!("[PARSE_BIGINT] Unexpected len and invalid UTF-8, returning binary");
747 Ok(Value::Binary(data.to_vec()))
748 }
749 }
750 }
751 vertica_types::FLOAT => {
752 log::debug!(
753 "[PARSE_FLOAT] data_type: {}, data_len: {}, raw_bytes: {:?}",
754 data_type,
755 data.len(),
756 data
757 );
758 match data.len() {
759 8 => {
760 let val = BigEndian::read_f64(data);
762 log::debug!("[PARSE_FLOAT] FLOAT8: {}", val);
763 Ok(Value::Double(val))
764 },
765 4 => {
766 let val = BigEndian::read_f32(data);
768 log::debug!("[PARSE_FLOAT] FLOAT4: {}", val);
769 Ok(Value::Float(val))
770 },
771 _ => {
772 let s = String::from_utf8_lossy(data);
774 log::debug!("[PARSE_FLOAT] String format: {}", s);
775
776 let s = s.trim();
777 if let Ok(num) = s.parse::<f64>() {
778 log::debug!("[PARSE_FLOAT] Parsed as f64: {}", num);
779 Ok(Value::Double(num))
780 } else if let Ok(num) = s.parse::<f32>() {
781 log::debug!("[PARSE_FLOAT] Parsed as f32: {}", num);
782 Ok(Value::Float(num))
783 } else {
784 Ok(Value::String(s.to_string()))
785 }
786 }
787 }
788 }
789 vertica_types::CHAR | vertica_types::VARCHAR | vertica_types::LONG_VARCHAR => {
790 let val = String::from_utf8(data.to_vec())
791 .map(Value::String)
792 .unwrap_or_else(|_| Value::Binary(data.to_vec()));
793 log::debug!("[PARSE_STRING] {:?}", val);
794 Ok(val)
795 }
796 vertica_types::DATE => {
797 if data.len() == 8 {
798 let days = LittleEndian::read_i64(data);
800 log::debug!(
801 "[PARSE_DATE] Binary format - days since 2000-01-01: {}",
802 days
803 );
804 Value::from_date_days(days as i32)
805 } else {
806 if let Ok(s) = String::from_utf8(data.to_vec()) {
808 log::debug!("[PARSE_DATE] String format: {}", s);
809 let formats = ["%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d", "%d-%m-%Y"];
811
812 let mut result = Value::String(s.clone());
813 for fmt in formats.iter() {
814 if let Ok(date) = chrono::NaiveDate::parse_from_str(&s.trim(), fmt) {
815 result = Value::Date(date);
816 break;
817 }
818 }
819
820 Ok(result)
821 } else {
822 log::debug!("[PARSE_DATE] Invalid UTF-8 string, returning binary");
823 Ok(Value::Binary(data.to_vec()))
824 }
825 }
826 }
827 vertica_types::TIME => {
828 if data.len() == 8 {
829 let micros = LittleEndian::read_i64(data);
831 log::debug!(
832 "[PARSE_TIME] Binary format - micros since midnight: {}",
833 micros
834 );
835 Value::from_time_micros(micros)
836 } else {
837 if let Ok(s) = String::from_utf8(data.to_vec()) {
839 log::debug!("[PARSE_TIME] String format: {}", s);
840 let formats = ["%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%H:%M"];
842
843 let mut result = Value::String(s.clone());
844 for fmt in formats.iter() {
845 if let Ok(time) = chrono::NaiveTime::parse_from_str(&s.trim(), fmt) {
846 result = Value::Time(time);
847 break;
848 }
849 }
850
851 Ok(result)
852 } else {
853 log::debug!("[PARSE_TIME] Invalid UTF-8 string, returning binary");
854 Ok(Value::Binary(data.to_vec()))
855 }
856 }
857 }
858 vertica_types::TIMETZ => {
859 if data.len() == 8 {
860 let micros = LittleEndian::read_i64(data);
862 log::debug!(
863 "[PARSE_TIMETZ] Binary format - micros since midnight: {}",
864 micros
865 );
866 Value::from_time_micros(micros)
867 } else {
868 if let Ok(s) = String::from_utf8(data.to_vec()) {
870 log::debug!(
871 "[PARSE_TIMETZ] String format: {}",
872 s
873 );
874
875 let s = s.trim();
876
877 let formats = [
879 "%H:%M:%S%.f", "%H:%M:%S", ];
882
883 let time_part = if let Some(pos) = s.find(|c: char| c == '+' || c == '-') {
885 &s[..pos]
886 } else {
887 s
888 };
889
890 let mut result = Value::String(s.to_string()); for fmt in formats.iter() {
892 if let Ok(time) = chrono::NaiveTime::parse_from_str(time_part, fmt) {
893 result = Value::Time(time);
894 log::debug!(
895 "[PARSE_TIMETZ] Successfully parsed as Time: {:?}",
896 time
897 );
898 break;
899 }
900 }
901
902 Ok(result)
903 } else {
904 log::debug!(
905 "[PARSE_TIMETZ] Invalid UTF-8 string, returning binary"
906 );
907 Ok(Value::Binary(data.to_vec()))
908 }
909 }
910 }
911 vertica_types::TIMESTAMPTZ => {
912 if data.len() == 8 {
913 let micros = LittleEndian::read_i64(data);
915 log::debug!(
916 "[PARSE_TIMESTAMPTZ] Binary format - micros since 2000-01-01: {}",
917 micros
918 );
919 Value::from_timestamp_micros(micros)
920 } else {
921 if let Ok(s) = String::from_utf8(data.to_vec()) {
923 log::debug!(
924 "[PARSE_TIMESTAMPTZ] String format: {}",
925 s
926 );
927 let formats = [
929 "%Y-%m-%d %H:%M:%S%.f%#z",
930 "%Y-%m-%d %H:%M:%S%#z",
931 "%Y-%m-%d %H:%M:%S%.f",
932 "%Y-%m-%d %H:%M:%S",
933 "%Y-%m-%d %H:%M",
934 "%Y-%m-%d",
935 ];
936
937 let mut result = Value::String(s.clone());
938 for fmt in formats.iter() {
939 if let Ok(timestamp) =
940 chrono::NaiveDateTime::parse_from_str(&s.trim(), fmt)
941 {
942 result = Value::Timestamp(timestamp);
943 break;
944 }
945 }
946
947 if let Value::String(_) = result {
949 if let Ok(date) =
950 chrono::NaiveDate::parse_from_str(&s.trim(), "%Y-%m-%d")
951 {
952 result = Value::Timestamp(date.and_hms_opt(0, 0, 0).unwrap());
953 }
954 }
955
956 Ok(result)
957 } else {
958 log::debug!(
959 "[PARSE_TIMESTAMPTZ] Unexpected len and invalid UTF-8, returning binary"
960 );
961 Ok(Value::Binary(data.to_vec()))
962 }
963 }
964 }
965 vertica_types::TIMESTAMP => {
966 if data.len() == 8 {
967 let micros = LittleEndian::read_i64(data);
969 log::debug!(
970 "[PARSE_TIMESTAMP] Binary format - micros since 2000-01-01: {}",
971 micros
972 );
973 Value::from_timestamp_micros(micros)
974 } else if data.len() == 19 {
975 if let Ok(s) = String::from_utf8(data.to_vec()) {
977 log::debug!("[PARSE_TIMESTAMP] String format: {}", s);
978 Value::parse_timestamp(&s)
979 } else {
980 log::debug!("[PARSE_TIMESTAMP] Invalid UTF-8 string");
981 Ok(Value::Binary(data.to_vec()))
982 }
983 } else {
984 if let Ok(s) = String::from_utf8(data.to_vec()) {
986 log::debug!(
987 "[PARSE_TIMESTAMP] Variable string format: {}",
988 s
989 );
990 let formats = [
992 "%Y-%m-%d %H:%M:%S",
993 "%Y-%m-%d %H:%M:%S%.f",
994 "%Y-%m-%d %H:%M",
995 "%Y-%m-%d",
996 ];
997
998 let mut result = Value::String(s.clone());
999 for fmt in formats.iter() {
1000 if let Ok(timestamp) =
1001 chrono::NaiveDateTime::parse_from_str(&s.trim(), fmt)
1002 {
1003 result = Value::Timestamp(timestamp);
1004 break;
1005 }
1006 }
1007
1008 if let Value::String(_) = result {
1010 if let Ok(date) =
1011 chrono::NaiveDate::parse_from_str(&s.trim(), "%Y-%m-%d")
1012 {
1013 result = Value::Timestamp(date.and_hms_opt(0, 0, 0).unwrap());
1014 }
1015 }
1016
1017 Ok(result)
1018 } else {
1019 log::debug!(
1020 "[PARSE_TIMESTAMP] Unexpected len and invalid UTF-8, returning binary"
1021 );
1022 Ok(Value::Binary(data.to_vec()))
1023 }
1024 }
1025 }
1026 vertica_types::NUMERIC => {
1027 log::debug!(
1028 "[PARSE_NUMERIC] type_oid: {}, data_len: {}, raw_bytes: {:?}",
1029 data_type,
1030 data.len(),
1031 data
1032 );
1033 match data.len() {
1034 8 => {
1035 let val = BigEndian::read_f64(data);
1037 log::debug!("[PARSE_NUMERIC] NUMERIC8: {}", val);
1038 if let Some(decimal) = Decimal::from_f64(val) {
1039 Ok(Value::Decimal(decimal))
1040 } else {
1041 Ok(Value::Double(val))
1042 }
1043 },
1044 4 => {
1045 let val = BigEndian::read_f32(data);
1047 log::debug!("[PARSE_NUMERIC] NUMERIC4: {}", val);
1048 if let Some(decimal) = Decimal::from_f32(val) {
1049 Ok(Value::Decimal(decimal))
1050 } else {
1051 Ok(Value::Float(val))
1052 }
1053 },
1054 16 => {
1055 let s = String::from_utf8_lossy(data);
1057 log::debug!("[PARSE_NUMERIC] NUMERIC16 string: {}", s);
1058 if let Ok(decimal) = s.trim().parse::<Decimal>() {
1059 Ok(Value::Decimal(decimal))
1060 } else if let Ok(num) = s.trim().parse::<f64>() {
1061 Ok(Value::Double(num))
1062 } else {
1063 Ok(Value::String(s.to_string()))
1064 }
1065 },
1066 _ => {
1067 let s = String::from_utf8_lossy(data);
1069 log::debug!("[PARSE_NUMERIC] String format: {}", s);
1070
1071 let s = s.trim();
1072 if let Ok(decimal) = s.parse::<Decimal>() {
1073 log::debug!("[PARSE_NUMERIC] Parsed as Decimal: {}", decimal);
1074 Ok(Value::Decimal(decimal))
1075 } else if let Ok(num) = s.parse::<f64>() {
1076 log::debug!("[PARSE_NUMERIC] Parsed as f64: {}", num);
1077 Ok(Value::Double(num))
1078 } else if let Ok(num) = s.parse::<f32>() {
1079 log::debug!("[PARSE_NUMERIC] Parsed as f32: {}", num);
1080 Ok(Value::Float(num))
1081 } else if let Ok(num) = s.parse::<i64>() {
1082 log::debug!("[PARSE_NUMERIC] Parsed as i64: {}", num);
1083 Ok(Value::BigInt(num))
1084 } else if let Ok(num) = s.parse::<i32>() {
1085 log::debug!("[PARSE_NUMERIC] Parsed as i32: {}", num);
1086 Ok(Value::Int(num))
1087 } else {
1088 Ok(Value::String(s.to_string()))
1089 }
1090 }
1091 }
1092 }
1093 vertica_types::UUID => {
1094 if data.len() >= 16 {
1095 let val = Value::from_uuid_bytes(data.to_vec());
1096 log::debug!("[PARSE_UUID] {:?}", val);
1097 val
1098 } else {
1099 log::debug!("[PARSE_UUID] Unexpected len, returning binary");
1100 Ok(Value::Binary(data.to_vec()))
1101 }
1102 }
1103 vertica_types::VARBINARY | vertica_types::LONG_VARBINARY | vertica_types::BINARY => {
1104 let val = Value::Binary(data.to_vec());
1105 log::debug!("[PARSE_BINARY] {} bytes", data.len());
1106 Ok(val)
1107 }
1108 _ => {
1109 if let Ok(s) = String::from_utf8(data.to_vec()) {
1110 log::debug!("[PARSE_UNKNOWN] UTF-8 string: {}", s);
1111 Ok(Value::String(s))
1112 } else {
1113 log::debug!("[PARSE_UNKNOWN] Binary: {} bytes", data.len());
1114 Ok(Value::Binary(data.to_vec()))
1115 }
1116 }
1117 }
1118 }
1119
1120 pub async fn prepare(&mut self, stmt_id: &str, query: &str) -> Result<()> {
1122 if self.state != ConnectionState::Ready {
1123 return Err(VerticaError::Connection("Connection not ready".to_string()));
1124 }
1125
1126 log::debug!("[PREPARE] Starting prepare for statement: {}", stmt_id);
1127 log::debug!("[PREPARE] SQL: {}", query);
1128
1129 let mut parse_buf = BytesMut::with_capacity(1024);
1131 parse_buf.put_u8(b'P'); parse_buf.put_i32(0); self.write_string_async(&mut parse_buf, stmt_id).await?;
1136 self.write_string_async(&mut parse_buf, query).await?;
1138 parse_buf.put_i16(0);
1140
1141 let len = parse_buf.len() as i32;
1142 parse_buf[1..5].copy_from_slice(&(len - 1).to_be_bytes());
1143
1144 log::debug!("[PREPARE] Sending Parse message ({} bytes)", len);
1145 self.write_all_async(&parse_buf).await?;
1146
1147 let mut describe_buf = BytesMut::with_capacity(64);
1149 describe_buf.put_u8(b'D'); describe_buf.put_i32(0); describe_buf.put_u8(b'S'); self.write_string_async(&mut describe_buf, stmt_id).await?;
1153
1154 let describe_len = describe_buf.len() as i32;
1155 describe_buf[1..5].copy_from_slice(&(describe_len - 1).to_be_bytes());
1156
1157 log::debug!(
1158 "[PREPARE] Sending Describe message ({} bytes)",
1159 describe_len
1160 );
1161 self.write_all_async(&describe_buf).await?;
1162
1163 let mut sync_buf = BytesMut::with_capacity(16);
1165 sync_buf.put_u8(b'S'); sync_buf.put_i32(4); log::debug!("[PREPARE] Sending Sync message");
1169 self.write_all_async(&sync_buf).await?;
1170
1171 let mut parse_complete_received = false;
1173 let ready_for_query_received :bool;
1174
1175
1176 loop {
1177 let mut msg = self.read_message().await?;
1178
1179 match msg.message_type {
1180 b'1' => {
1181 log::debug!("[PREPARE] ParseComplete received");
1183 parse_complete_received = true;
1184 }
1185
1186 b't' => {
1187 let param_count = msg.read_i16()?;
1189 log::debug!("[PREPARE] ParameterDescription: {} parameters", param_count);
1190
1191 for i in 0..param_count {
1192 let oid = msg.read_i32()?;
1193 log::debug!("[PREPARE] Parameter {}: OID {}", i, oid);
1194 }
1195 }
1196
1197 b'T' => {
1198 let field_count = msg.read_i16()?;
1200 log::debug!("[PREPARE] RowDescription: {} columns", field_count);
1201
1202 let mut columns: Vec<ColumnInfo> = Vec::new();
1203 for i in 0..field_count {
1204 let name = msg.read_string()?;
1205 let type_oid = msg.read_u32()?;
1206 let column = ColumnInfo {
1207 name: name.clone(),
1208 data_type: type_oid,
1209 nullable: true,
1210 precision: None,
1211 scale: None,
1212 };
1213 columns.push(column);
1214
1215 log::debug!(
1216 "[PREPARE] Column {}: name={}, type_oid={}",
1217 i,
1218 name,
1219 type_oid
1220 );
1221 }
1222 }
1223
1224 b'n' => {
1225 log::debug!("[PREPARE] NoData received (no result set)");
1227 }
1228
1229 b'E' => {
1230 let error_response = ErrorResponse::parse_from_message(&msg.data)?;
1232 log::error!("[PREPARE] ErrorResponse: {}", error_response);
1233 return Err(error_response.to_error());
1234 }
1235
1236 b'S' => {
1237 let name = msg.read_string()?;
1239 let value = msg.read_string()?;
1240 self.parameters.insert(name.clone(), value.clone());
1241 log::debug!("[PREPARE] ParameterStatus: {} = {}", name, value);
1242 }
1243
1244 b'Z' => {
1245 let status = msg.read_u8()?;
1247 log::debug!("[PREPARE] ReadyForQuery: status={}", status as char);
1248 ready_for_query_received = true;
1249
1250 break;
1252 }
1253
1254 b'C' => {
1255 let tag = msg.read_string()?;
1257 log::debug!("[PREPARE] CommandComplete: {}", tag);
1258 }
1259
1260 b'D' => {
1261 let col_count = msg.read_i16()?;
1263 log::warn!(
1264 "[PREPARE] Unexpected DataRow: {} columns - skipping",
1265 col_count
1266 );
1267
1268 for _ in 0..col_count {
1270 let len = msg.read_i32()?;
1271 if len > 0 {
1272 let _data = msg.read_bytes(len as usize)?;
1273 }
1274 }
1275 }
1276
1277 b'N' => {
1278 let notice = msg.read_string()?;
1280 log::info!("[PREPARE] Notice: {}", notice);
1281 }
1282
1283 _ => {
1284 log::warn!(
1286 "[PREPARE] Unknown message type: {} ({}) - skipping",
1287 msg.message_type as char,
1288 msg.message_type
1289 );
1290 continue;
1291 }
1292 }
1293 }
1294
1295 if !ready_for_query_received {
1297 log::warn!("[PREPARE] Missing ReadyForQuery, prepare may have failed");
1298 return Err(VerticaError::Connection(
1299 "Incomplete prepare response - missing ReadyForQuery".to_string(),
1300 ));
1301 }
1302
1303 if !parse_complete_received {
1305 log::warn!(
1306 "[PREPARE] No ParseComplete received, but continuing as server indicated ready"
1307 );
1308 }
1309
1310 log::debug!("[PREPARE] Prepare process completed");
1311 Ok(())
1312 }
1313
1314 pub async fn execute_prepared(
1316 &mut self,
1317 statement_id: &str,
1318 params: &[Value],
1319 ) -> Result<QueryResult> {
1320 log::debug!(
1321 "[EXECUTE_PREPARED] Executing prepared statement: {} with {} parameters",
1322 statement_id,
1323 params.len()
1324 );
1325
1326 let mut bind_msg = BytesMut::new();
1328 bind_msg.put_u8(b'B'); bind_msg.put_i32(0); bind_msg.put_slice(b"");
1333 bind_msg.put_u8(0);
1334
1335 bind_msg.put_slice(statement_id.as_bytes());
1337 bind_msg.put_u8(0);
1338
1339 bind_msg.put_i16(0);
1341
1342 bind_msg.put_i16(params.len() as i16);
1344
1345 for param in params {
1347 if let Value::Null = *param {
1348 bind_msg.put_i32(-1); } else {
1350 let bytes = param.as_bytes()?;
1353 bind_msg.put_i32(bytes.len() as i32);
1354 bind_msg.extend_from_slice(&bytes);
1355 }
1356 }
1357
1358 bind_msg.put_i16(0);
1360
1361 let len = bind_msg.len() - 1;
1363 bind_msg[1..5].copy_from_slice(&(len as i32).to_be_bytes());
1364
1365 let mut execute_msg = BytesMut::new();
1367 execute_msg.put_u8(b'E'); execute_msg.put_i32(9); execute_msg.put_u8(0); execute_msg.put_i32(0); let len = execute_msg.len() - 1;
1374 execute_msg[1..5].copy_from_slice(&(len as i32).to_be_bytes());
1375
1376 let mut sync_msg = BytesMut::new();
1378 sync_msg.put_u8(b'S'); sync_msg.put_i32(4); let mut full_msg = BytesMut::new();
1383 full_msg.extend_from_slice(&bind_msg);
1384 full_msg.extend_from_slice(&execute_msg);
1385 full_msg.extend_from_slice(&sync_msg);
1386
1387 if let Some(ref mut stream) = self.stream {
1388 stream.write_all_async(&full_msg).await?;
1389 } else {
1390 return Err(VerticaError::Connection(
1391 "Connection not established".to_string(),
1392 ));
1393 }
1394
1395 log::debug!(
1396 "[EXECUTE_PREPARED] Sent Bind/Execute/Sync messages for statement: {}",
1397 statement_id
1398 );
1399
1400 let mut rows = Vec::new();
1402 let mut columns = Vec::new();
1403 let mut affected_rows = 0;
1404 let mut command_tag = String::new();
1405 let mut command_complete_received = false;
1406
1407 loop {
1408 let msg = self.read_message().await?;
1409 log::debug!(
1410 "[EXECUTE_PREPARED] Received message type: {} ({:x})",
1411 msg.message_type as char,
1412 msg.message_type
1413 );
1414
1415 let mut msg_data = AsyncMessage {
1416 message_type: msg.message_type,
1417 data: msg.data.clone(),
1418 };
1419
1420 match msg.message_type {
1421 b't' => {
1422 log::debug!("[EXECUTE_PREPARED] Unexpected ParameterDescription - skipping");
1424 }
1425 b'T' => {
1426 let field_count = msg_data.read_i16()?;
1428 log::debug!("[EXECUTE_PREPARED] RowDescription: {} columns", field_count);
1429
1430 columns.clear();
1431 for i in 0..field_count {
1432 let name = msg_data.read_string()?;
1433 let type_oid = msg_data.read_u32()?;
1434 let column = ColumnInfo {
1435 name: name.clone(),
1436 data_type: type_oid,
1437 nullable: true,
1438 precision: None,
1439 scale: None,
1440 };
1441 columns.push(column);
1442
1443 log::debug!(
1444 "[EXECUTE_PREPARED] Column {}: name={}, type_oid={}",
1445 i,
1446 &name,
1447 type_oid
1448 );
1449 }
1450 }
1451 b'D' => {
1452 let col_count = msg_data.read_i16()?;
1454 let mut values = Vec::with_capacity(col_count as usize);
1455
1456 for _i in 0..col_count {
1457 let len = msg_data.read_i32()?;
1458 if len == -1 {
1459 values.push(Value::Null);
1461 } else if len >= 0 {
1462 let data = msg_data.read_bytes(len as usize)?;
1463 let value = Value::from_bytes(data);
1464 values.push(value);
1465 } else {
1466 return Err(VerticaError::Protocol(
1467 "Invalid data length in DataRow".to_string(),
1468 ));
1469 }
1470 }
1471
1472 let row = Row { data: values };
1473 rows.push(row);
1474 }
1475 b'C' => {
1476 command_tag = msg_data.read_string()?;
1478 log::debug!("[EXECUTE_PREPARED] CommandComplete: {}", command_tag);
1479
1480 if let Some(pos) = command_tag.rfind(' ') {
1482 if let Ok(rows) = command_tag[pos + 1..].parse::<u64>() {
1483 affected_rows = rows;
1484 }
1485 }
1486 command_complete_received = true;
1487 }
1488 b'E' => {
1489 let error_response = ErrorResponse::parse_from_message(&msg.data)?;
1491 log::error!("[EXECUTE_PREPARED] ErrorResponse: {}", error_response);
1492 return Err(error_response.to_error());
1493 }
1494 b'Z' => {
1495 let status = msg_data.read_u8()?;
1497 log::debug!(
1498 "[EXECUTE_PREPARED] ReadyForQuery: status={}",
1499 status as char
1500 );
1501 break;
1502 }
1503 b'S' => {
1504 let name = msg_data.read_string()?;
1506 let value = msg_data.read_string()?;
1507 self.parameters.insert(name.clone(), value.clone());
1508 log::debug!("[EXECUTE_PREPARED] ParameterStatus: {} = {}", name, value);
1509 }
1510 b'N' => {
1511 let notice = msg_data.read_string()?;
1513 log::info!("[EXECUTE_PREPARED] Notice: {}", notice);
1514 }
1515 b'n' => {
1516 log::debug!("[EXECUTE_PREPARED] NoData received");
1518 }
1519 b'1' => {
1520 log::debug!("[ASYNC_READ] Processing parse complete message");
1521 continue;
1523 }
1524 b'2' => {
1525 log::debug!("[ASYNC_READ] Processing bind complete message");
1526 continue;
1528 }
1529 b's' => {
1530 log::debug!("[ASYNC_READ] Processing portal suspended message");
1531 break;
1533 }
1534 _ => {
1535 log::warn!(
1536 "[ASYNC_READ] Unknown message type: {}",
1537 msg.message_type as char
1538 );
1539 continue;
1540 }
1541 }
1542 }
1543
1544 log::debug!(
1545 "[EXECUTE_PREPARED] Execution completed. Rows: {}, Columns: {}, Affected: {}",
1546 rows.len(),
1547 columns.len(),
1548 affected_rows
1549 );
1550
1551 let result = if columns.is_empty() && command_complete_received {
1553 QueryResult::from_command(affected_rows as u64, Some(command_tag), "".to_owned())
1555 } else if !columns.is_empty() {
1556 QueryResult::from_rows(rows.into_iter().map(|r| r.data).collect(), columns)
1558 } else {
1559 QueryResult::empty()
1561 };
1562
1563 Ok(result)
1564 }
1565
1566 pub async fn close(&mut self) -> Result<()> {
1568 if let Some(mut stream) = self.stream.take() {
1569 stream.write_all_async(&[]).await?;
1570 }
1571 self.state = ConnectionState::Disconnected;
1572 Ok(())
1573 }
1574}
1575
1576pub struct AsyncMessage {
1578 pub message_type: u8,
1580 pub data: BytesMut,
1582}
1583impl AsyncMessage {
1585 pub fn read_u8(&mut self) -> Result<u8> {
1593 if self.data.is_empty() {
1594 return Err(VerticaError::Connection(
1595 "read_u8:Unexpected end of message".to_string(),
1596 ));
1597 }
1598 Ok(self.data.get_u8())
1599 }
1600
1601 pub fn read_i16(&mut self) -> Result<i16> {
1609 if self.data.len() < 2 {
1610 return Err(VerticaError::Connection(
1611 "read_i16:Unexpected end of message".to_string(),
1612 ));
1613 }
1614 Ok(self.data.get_i16())
1615 }
1616
1617 pub fn read_i32(&mut self) -> Result<i32> {
1626 if self.data.len() < 4 {
1627 return Err(VerticaError::Connection(
1628 "read_i32:Unexpected end of message".to_string(),
1629 ));
1630 }
1631 Ok(self.data.get_i32())
1632 }
1633
1634 pub fn read_u32(&mut self) -> Result<u32> {
1642 if self.data.len() < 4 {
1643 return Err(VerticaError::Connection(
1644 "read_u32:Unexpected end of message".to_string(),
1645 ));
1646 }
1647 Ok(self.data.get_u32())
1648 }
1649
1650 pub fn read_string(&mut self) -> Result<String> {
1659 if self.data.is_empty() {
1660 return Err(VerticaError::Connection(
1661 "Empty message when reading string".to_string(),
1662 ));
1663 }
1664
1665 let null_pos = self.data.iter().position(|&b| b == 0).ok_or_else(|| {
1666 VerticaError::Connection("Invalid string format: no null terminator found".to_string())
1667 })?;
1668
1669 if null_pos >= self.data.len() {
1670 return Err(VerticaError::Connection(
1671 "Invalid string format: null terminator beyond buffer".to_string(),
1672 ));
1673 }
1674
1675 let string_data = self.data.split_to(null_pos);
1676
1677 if !self.data.is_empty() {
1679 self.data.advance(1);
1680 }
1681
1682 if string_data.is_empty() {
1684 return Ok(String::new());
1685 }
1686
1687 let string = String::from_utf8_lossy(&string_data[..]).to_string();
1689
1690 Ok(string)
1691 }
1692
1693 pub fn read_tagged_string(&mut self) -> Result<(u8, String)> {
1703 if self.data.is_empty() {
1704 return Err(VerticaError::Connection(
1705 "Empty message when reading tagged string".to_string(),
1706 ));
1707 }
1708
1709 let field_type = self.data.get_u8();
1710
1711 if field_type == 0 {
1713 return Ok((0, String::new()));
1714 }
1715
1716 let value = self.read_string()?;
1717 Ok((field_type, value))
1718 }
1719
1720 pub fn read_bytes(&mut self, len: usize) -> Result<Vec<u8>> {
1731 if self.data.len() < len {
1732 return Err(VerticaError::Connection(
1733 "read_bytes:Unexpected end of message".to_string(),
1734 ));
1735 }
1736 Ok(self.data.split_to(len).to_vec())
1737 }
1738
1739 pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
1750 if self.data.len() < buf.len() {
1751 return Err(VerticaError::Connection(
1752 "read_exact:Unexpected end of message".to_string(),
1753 ));
1754 }
1755 buf.copy_from_slice(&self.data.split_to(buf.len()));
1756 Ok(())
1757 }
1758}
1759
1760
1761
1762
1763
1764
1765