ts3_clientquery_lib/
connection.rs

1use crate::error::{QueryError, QueryResult};
2use crate::types::QueryStatus;
3use crate::FromQueryString;
4#[cfg(feature = "log")]
5use log::{error, trace};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::net::TcpStream;
8pub const BUFFER_SIZE: usize = 512;
9
10pub(crate) fn decode_status(content: String) -> QueryResult<String> {
11    for line in content.lines() {
12        if line.trim().starts_with("error ") {
13            let status = QueryStatus::try_from(line)?;
14
15            return status.into_result(content);
16        }
17    }
18    Err(QueryError::static_empty_response())
19}
20
21pub(crate) fn decode_status_with_result<T: FromQueryString + Sized>(
22    data: String,
23) -> QueryResult<Option<Vec<T>>> {
24    let content = decode_status(data)?;
25
26    for line in content.lines() {
27        if !line.starts_with("error ") {
28            let mut v = Vec::new();
29            for element in line.split('|') {
30                v.push(T::from_query(element)?);
31            }
32            return Ok(Some(v));
33        }
34    }
35    Ok(None)
36}
37
38pub(crate) async fn read_data(conn: &mut TcpStream) -> Result<Option<String>, tokio::io::Error> {
39    let mut buffer = [0u8; BUFFER_SIZE];
40    let mut ret = String::new();
41    loop {
42        let size = if let Ok(data) =
43            tokio::time::timeout(std::time::Duration::from_secs(2), conn.read(&mut buffer)).await
44        {
45            match data {
46                Ok(size) => size,
47                Err(e) => return Err(e),
48            }
49        } else {
50            return Ok(None);
51        };
52
53        ret.push_str(&String::from_utf8_lossy(&buffer[..size]));
54        if size < BUFFER_SIZE || (ret.contains("error id=") && ret.ends_with("\n\r")) {
55            break;
56        }
57    }
58    #[cfg(feature = "log")]
59    trace!("receive => {:?}", &ret);
60    Ok(Some(ret))
61}
62
63pub(crate) async fn write_data(
64    conn: &mut TcpStream,
65    payload: &str,
66) -> Result<(), tokio::io::Error> {
67    debug_assert!(payload.ends_with("\n\r"));
68    #[cfg(feature = "log")]
69    trace!("send => {:?}", payload);
70    conn.write(payload.as_bytes()).await.map(|size| {
71        #[cfg(feature = "log")]
72        if size != payload.as_bytes().len() {
73            error!(
74                "Error payload size mismatch! expect {} but {} found. payload: {:?}",
75                payload.as_bytes().len(),
76                size,
77                payload
78            )
79        }
80    })?;
81    Ok(())
82}
83
84pub(crate) async fn write_and_read(conn: &mut TcpStream, payload: &str) -> QueryResult<String> {
85    write_data(conn, payload).await?;
86    read_data(conn)
87        .await?
88        .ok_or_else(QueryError::except_data_not_found)
89}
90
91pub(crate) async fn basic_operation(conn: &mut TcpStream, payload: &str) -> QueryResult<()> {
92    let data = write_and_read(conn, payload).await?;
93    decode_status(data).map(|_| ())
94}
95
96pub(crate) async fn query_operation_non_error<T: FromQueryString + Sized>(
97    conn: &mut TcpStream,
98    payload: &str,
99) -> QueryResult<Vec<T>> {
100    let data = write_and_read(conn, payload).await?;
101    let ret = decode_status_with_result(data)?;
102    Ok(ret
103        .ok_or_else(|| panic!("Can't find result line, payload => {}", payload))
104        .unwrap())
105}
106
107pub(crate) async fn query_operation<T: FromQueryString + Sized>(
108    conn: &mut TcpStream,
109    payload: &str,
110) -> QueryResult<Option<Vec<T>>> {
111    let data = write_and_read(conn, payload).await?;
112    decode_status_with_result(data)
113    //let status = status.ok_or_else(|| anyhow!("Can't find status line."))?;
114}
115
116pub(crate) async fn query_one_operation<T: FromQueryString + Sized>(
117    conn: &mut TcpStream,
118    payload: &str,
119) -> QueryResult<Option<T>> {
120    query_operation(conn, payload)
121        .await
122        .map(|r| r.map(|mut v| v.swap_remove(0)))
123}
124
125pub(crate) async fn query_one_non_error<T: FromQueryString + Sized>(
126    conn: &mut TcpStream,
127    payload: &str,
128) -> QueryResult<T> {
129    query_operation_non_error(conn, payload)
130        .await
131        .map(|mut v| v.swap_remove(0))
132}