ts3_clientquery_lib/
connection.rs1use crate::error::{QueryError, QueryResult};
2use crate::types::QueryStatus;
3use crate::FromQueryString;
4#[cfg(feature = "log")]
5use log::{error, trace};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::net::TcpStream;
8pub const BUFFER_SIZE: usize = 512;
9
10pub(crate) fn decode_status(content: String) -> QueryResult<String> {
11 for line in content.lines() {
12 if line.trim().starts_with("error ") {
13 let status = QueryStatus::try_from(line)?;
14
15 return status.into_result(content);
16 }
17 }
18 Err(QueryError::static_empty_response())
19}
20
21pub(crate) fn decode_status_with_result<T: FromQueryString + Sized>(
22 data: String,
23) -> QueryResult<Option<Vec<T>>> {
24 let content = decode_status(data)?;
25
26 for line in content.lines() {
27 if !line.starts_with("error ") {
28 let mut v = Vec::new();
29 for element in line.split('|') {
30 v.push(T::from_query(element)?);
31 }
32 return Ok(Some(v));
33 }
34 }
35 Ok(None)
36}
37
38pub(crate) async fn read_data(conn: &mut TcpStream) -> Result<Option<String>, tokio::io::Error> {
39 let mut buffer = [0u8; BUFFER_SIZE];
40 let mut ret = String::new();
41 loop {
42 let size = if let Ok(data) =
43 tokio::time::timeout(std::time::Duration::from_secs(2), conn.read(&mut buffer)).await
44 {
45 match data {
46 Ok(size) => size,
47 Err(e) => return Err(e),
48 }
49 } else {
50 return Ok(None);
51 };
52
53 ret.push_str(&String::from_utf8_lossy(&buffer[..size]));
54 if size < BUFFER_SIZE || (ret.contains("error id=") && ret.ends_with("\n\r")) {
55 break;
56 }
57 }
58 #[cfg(feature = "log")]
59 trace!("receive => {:?}", &ret);
60 Ok(Some(ret))
61}
62
63pub(crate) async fn write_data(
64 conn: &mut TcpStream,
65 payload: &str,
66) -> Result<(), tokio::io::Error> {
67 debug_assert!(payload.ends_with("\n\r"));
68 #[cfg(feature = "log")]
69 trace!("send => {:?}", payload);
70 conn.write(payload.as_bytes()).await.map(|size| {
71 #[cfg(feature = "log")]
72 if size != payload.as_bytes().len() {
73 error!(
74 "Error payload size mismatch! expect {} but {} found. payload: {:?}",
75 payload.as_bytes().len(),
76 size,
77 payload
78 )
79 }
80 })?;
81 Ok(())
82}
83
84pub(crate) async fn write_and_read(conn: &mut TcpStream, payload: &str) -> QueryResult<String> {
85 write_data(conn, payload).await?;
86 read_data(conn)
87 .await?
88 .ok_or_else(QueryError::except_data_not_found)
89}
90
91pub(crate) async fn basic_operation(conn: &mut TcpStream, payload: &str) -> QueryResult<()> {
92 let data = write_and_read(conn, payload).await?;
93 decode_status(data).map(|_| ())
94}
95
96pub(crate) async fn query_operation_non_error<T: FromQueryString + Sized>(
97 conn: &mut TcpStream,
98 payload: &str,
99) -> QueryResult<Vec<T>> {
100 let data = write_and_read(conn, payload).await?;
101 let ret = decode_status_with_result(data)?;
102 Ok(ret
103 .ok_or_else(|| panic!("Can't find result line, payload => {}", payload))
104 .unwrap())
105}
106
107pub(crate) async fn query_operation<T: FromQueryString + Sized>(
108 conn: &mut TcpStream,
109 payload: &str,
110) -> QueryResult<Option<Vec<T>>> {
111 let data = write_and_read(conn, payload).await?;
112 decode_status_with_result(data)
113 }
115
116pub(crate) async fn query_one_operation<T: FromQueryString + Sized>(
117 conn: &mut TcpStream,
118 payload: &str,
119) -> QueryResult<Option<T>> {
120 query_operation(conn, payload)
121 .await
122 .map(|r| r.map(|mut v| v.swap_remove(0)))
123}
124
125pub(crate) async fn query_one_non_error<T: FromQueryString + Sized>(
126 conn: &mut TcpStream,
127 payload: &str,
128) -> QueryResult<T> {
129 query_operation_non_error(conn, payload)
130 .await
131 .map(|mut v| v.swap_remove(0))
132}