use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
use std::io::{self, BufRead, BufReader, Write};
use std::net::{Shutdown, TcpStream, ToSocketAddrs};
use std::string::FromUtf8Error;
use std::time::Duration;
pub mod raw;
use raw::*;
#[derive(Snafu, Debug)]
pub enum Ts3Error {
#[snafu(display("Input was invalid UTF-8: {}", source))]
Utf8Error { source: FromUtf8Error },
#[snafu(display("IO Error: {}{}, kind: {:?}", context, source,source.kind()))]
Io {
context: &'static str,
source: io::Error,
},
#[snafu(display("Received invalid response: {}", data))]
InvalidResponse { context: &'static str, data: String },
#[snafu(display("Server error: {}", response))]
ServerError { response: ErrorResponse },
#[snafu(display("Invalid response, too many lines, DDOS limit reached: {:?}", response))]
ResponseLimit { response: Vec<String> },
}
/// Lets `?` turn a bare `io::Error` into `Ts3Error::Io` with an empty context.
impl From<io::Error> for Ts3Error {
    fn from(error: io::Error) -> Self {
        // No step description is available at this point.
        let context = "";
        Ts3Error::Io {
            context,
            source: error,
        }
    }
}
/// Status carried by a server `error id=... msg=...` line.
#[derive(Debug)]
pub struct ErrorResponse {
    /// Numeric error id reported by the server.
    pub id: usize,
    /// Message text reported by the server.
    pub msg: String,
}

impl std::fmt::Display for ErrorResponse {
    /// Writes `Error code <id>, msg:<msg>`.
    ///
    /// No trailing newline is emitted, so the value can be embedded in a
    /// larger message (such as the `ServerError` display) without breaking
    /// the line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Error code {}, msg:{}", self.id, self.msg)
    }
}
/// Client for a line-based query connection over TCP.
///
/// Commands are written to `tx`; responses are read through `rx`.
pub struct QueryClient {
// Buffered reader wrapping a clone of the connection (created in `new_inner`).
rx: BufReader<TcpStream>,
// The connection itself; used to send commands and shut down on drop.
tx: TcpStream,
}
/// Maximum number of lines `read_response` reads for a single response before
/// giving up with `Ts3Error::ResponseLimit`.
const MAX_TRIES: usize = 100;
/// Result type used throughout this module, with `Ts3Error` as the error type.
type Result<T> = ::std::result::Result<T, Ts3Error>;
/// Says goodbye to the server and closes the socket when the client goes away.
impl Drop for QueryClient {
    fn drop(&mut self) {
        // Both steps are best effort: `drop` has no way to report a failure.
        self.quit();
        if let Err(_ignored) = self.tx.shutdown(Shutdown::Both) {
            // Deliberately ignored.
        }
    }
}
impl QueryClient {
/// Connects to `addr` and returns a ready-to-use client.
///
/// # Errors
/// Propagates any error from `new_inner` (connecting, configuring the socket
/// or reading the initial lines).
pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Self> {
    Self::new_inner(addr).map(|(rx, tx)| Self { rx, tx })
}
/// Sends the `quit` command; a failed write is ignored.
fn quit(&mut self) {
    let _ = self.tx.write_all(b"quit\n");
}
fn new_inner<A: ToSocketAddrs>(addr: A) -> Result<(BufReader<TcpStream>, TcpStream)> {
let stream = TcpStream::connect(addr).context(Io {
context: "while connecting: ",
})?;
stream
.set_write_timeout(Some(Duration::new(5, 0)))
.context(Io {
context: "setting write timeout: ",
})?;
stream
.set_read_timeout(Some(Duration::new(5, 0)))
.context(Io {
context: "setting read timeout: ",
})?;
stream.set_nodelay(true).context(Io {
context: "setting nodelay: ",
})?;
let mut reader = BufReader::new(stream.try_clone().context(Io {
context: "splitting connection: ",
})?);
let mut buffer = Vec::new();
reader.read_until(b'\r', &mut buffer).context(Io {
context: "reading response: ",
})?;
println!("{:?}", buffer);
stream
.set_read_timeout(Some(Duration::new(0, 500)))
.context(Io {
context: "setting read timeout: ",
})?;
buffer.clear();
if let Err(e) = reader.read_until(b'\r', &mut buffer) {
use std::io::ErrorKind::*;
match e.kind() {
TimedOut | WouldBlock => (),
_ => return Err(e.into()),
}
}
stream
.set_read_timeout(Some(Duration::new(20, 0)))
.context(Io {
context: "setting read timeout: ",
})?;
Ok((reader, stream))
}
/// Sends `command` verbatim, followed by a newline, and returns the response
/// lines that precede the terminating `error` line.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn raw_command(&mut self, command: &str) -> Result<Vec<String>> {
    writeln!(self.tx, "{}", command)?;
    self.read_response()
}
/// Sends `whoami` and returns the response as processed by `parse_hashmap`.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn whoami(&mut self) -> Result<HashMap<String, String>> {
    writeln!(self.tx, "whoami")?;
    self.read_response()
        .map(|lines| parse_hashmap(lines, false))
}
/// Sends `logout` and waits for the server's status line.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn logout(&mut self) -> Result<()> {
    writeln!(self.tx, "logout")?;
    self.read_response().map(|_| ())
}
/// Sends `login <user> <password>`, passing both values through `escape_arg`
/// first, and waits for the server's status line.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn login(&mut self, user: &str, password: &str) -> Result<()> {
    let escaped_user = escape_arg(user);
    let escaped_password = escape_arg(password);
    writeln!(self.tx, "login {} {}", escaped_user, escaped_password)?;
    self.read_response().map(|_| ())
}
/// Sends `use port=<port>` and waits for the server's status line.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn select_server_by_port(&mut self, port: u16) -> Result<()> {
    writeln!(self.tx, "use port={}", port)?;
    self.read_response().map(|_| ())
}
/// Sends `ftcreatedir` for channel `cid` with an empty `cpw` and `path`
/// (escaped via `escape_arg`) as `dirname`, then waits for the status line.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn create_dir(&mut self, cid: usize, path: &str) -> Result<()> {
    let dirname = escape_arg(path);
    writeln!(self.tx, "ftcreatedir cid={} cpw= dirname={}", cid, dirname)?;
    self.read_response().map(|_| ())
}
/// Sends `ftdeletefile` for channel `cid` with an empty `cpw` and `path`
/// (escaped via `escape_arg`) as `name`, then waits for the status line.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn delete_file(&mut self, cid: usize, path: &str) -> Result<()> {
    let name = escape_arg(path);
    writeln!(self.tx, "ftdeletefile cid={} cpw= name={}", cid, name)?;
    self.read_response().map(|_| ())
}
/// Sends `whoami` and discards the answer, checking only that the server
/// still responds without an error.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn ping(&mut self) -> Result<()> {
    writeln!(self.tx, "whoami")?;
    self.read_response().map(|_| ())
}
/// Sends `use sid=<sid>` and waits for the server's status line.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn select_server_by_id(&mut self, sid: usize) -> Result<()> {
    writeln!(self.tx, "use sid={}", sid)?;
    self.read_response().map(|_| ())
}
/// Sends `servergroupdelclient` for group `group` with the ids in `cldbid`
/// formatted by `format_cldbids`. An empty `cldbid` sends nothing.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn server_group_del_clients(&mut self, group: usize, cldbid: &[usize]) -> Result<()> {
    if !cldbid.is_empty() {
        let ids = Self::format_cldbids(cldbid);
        writeln!(self.tx, "servergroupdelclient sgid={} {}", group, ids)?;
        self.read_response()?;
    }
    Ok(())
}
/// Sends `servergroupaddclient` for group `group` with the ids in `cldbid`
/// formatted by `format_cldbids`. An empty `cldbid` sends nothing.
///
/// # Errors
/// Fails if the write fails or if `read_response` reports an error.
pub fn server_group_add_clients(&mut self, group: usize, cldbid: &[usize]) -> Result<()> {
    match cldbid {
        [] => Ok(()),
        ids => {
            let formatted = Self::format_cldbids(ids);
            writeln!(
                self.tx,
                "servergroupaddclient sgid={} {}",
                group, formatted
            )?;
            self.read_response().map(|_| ())
        }
    }
}
/// Formats client database ids as `cldbid=<a>|cldbid=<b>|...`.
///
/// The result contains no line breaks, so it can be embedded in a single
/// command line. An empty slice yields an empty string.
fn format_cldbids(it: &[usize]) -> String {
    it.iter()
        .map(|id| format!("cldbid={}", id))
        .collect::<Vec<_>>()
        .join("|")
}
fn read_response(&mut self) -> Result<Vec<String>> {
let mut result: Vec<String> = Vec::new();
for _ in 0..MAX_TRIES {
let mut buffer = Vec::with_capacity(20);
while {
self.rx.read_until(b'\r', &mut buffer).context(Io {
context: "reading response: ",
})?;
buffer.get(buffer.len() - 2).map_or(true, |v| *v != b'\n')
} {}
buffer.pop();
buffer.pop();
let buffer = String::from_utf8(buffer).context(Utf8Error)?;
if buffer.starts_with("error ") {
Self::check_ok(&buffer)?;
return Ok(result);
}
result.push(buffer);
}
Err(Ts3Error::ResponseLimit { response: result })
}
/// Sends `servergroupclientlist sgid=<server_group>` and parses the first
/// response line.
///
/// The line is split on `|`; for every entry the text after the first `=`
/// (up to the next `=`, if any) is parsed as `usize`. If the response has no
/// data line, an empty vector is returned.
///
/// # Errors
/// Fails if the write fails, if `read_response` reports an error, or with
/// `Ts3Error::InvalidResponse` if an entry has no `=` or its value is not a
/// valid `usize`.
pub fn get_servergroup_client_list(&mut self, server_group: usize) -> Result<Vec<usize>> {
    writeln!(self.tx, "servergroupclientlist sgid={}", server_group)?;
    let resp = self.read_response()?;
    let line = match resp.get(0) {
        Some(first) => first,
        None => return Ok(Vec::new()),
    };
    line.split('|')
        .map(|entry| match entry.split('=').nth(1) {
            Some(raw) => raw.parse::<usize>().map_err(|_| Ts3Error::InvalidResponse {
                context: "expected usize, got ",
                data: line.to_string(),
            }),
            None => Err(Ts3Error::InvalidResponse {
                context: "expected data of cldbid=1, got ",
                data: line.to_string(),
            }),
        })
        .collect()
}
/// Interprets an `error id=<n> msg=<text> ...` line.
///
/// The line is split on spaces; the second and third fields are expected to
/// be `key=value` pairs holding the id and the message, in that order.
///
/// # Errors
/// * `Ts3Error::ServerError` if the id is non-zero; the message is passed
///   through `unescape_val`.
/// * `Ts3Error::InvalidResponse` if the id is not a valid `usize`, or if the
///   id/msg fields are missing or lack an `=`.
fn check_ok(msg: &str) -> Result<()> {
    let fields: Vec<&str> = msg.split(' ').collect();
    // Debug-build sanity check: callers only pass lines starting with "error ".
    debug_assert_eq!(
        fields.first(),
        Some(&"error"),
        "check_ok invoked on non-error line"
    );
    if let (Some(id_field), Some(msg_field)) = (fields.get(1), fields.get(2)) {
        // Split on the first `=` only, so a value containing `=` stays intact.
        let id_value = id_field.splitn(2, '=').nth(1);
        let msg_value = msg_field.splitn(2, '=').nth(1);
        if let (Some(id_value), Some(msg_value)) = (id_value, msg_value) {
            let id = id_value
                .parse::<usize>()
                .map_err(|_| Ts3Error::InvalidResponse {
                    context: "expected usize, got ",
                    // Report the text that failed to parse, not the message.
                    data: id_value.to_string(),
                })?;
            return if id == 0 {
                Ok(())
            } else {
                Err(Ts3Error::ServerError {
                    response: ErrorResponse {
                        id,
                        msg: unescape_val(msg_value),
                    },
                })
            };
        }
    }
    Err(Ts3Error::InvalidResponse {
        context: "expected id and msg, got ",
        data: msg.to_string(),
    })
}
}