use crate::error::Error;
use futures::try_ready;
use resol_vbus::BlobBuffer;
use tokio::net::TcpStream;
use tokio::prelude::*;
/// Client side of a line-based command/reply handshake spoken over a `TcpStream`.
///
/// Every handshake step consumes the value and returns a future that yields it back
/// once the server has answered with a reply line starting with `+`.
#[derive(Debug)]
pub struct TcpClientHandshake {
/// Connection to the server; handed back to the caller by `into_inner`.
socket: TcpStream,
/// Bytes received from the server that have not been consumed as a reply line yet.
buf: BlobBuffer,
}
impl TcpClientHandshake {
pub fn start(socket: TcpStream) -> impl Future<Item = Self, Error = Error> {
let hs = TcpClientHandshake {
socket,
buf: BlobBuffer::new(),
};
hs.read_reply()
}
pub fn into_inner(self) -> TcpStream {
self.socket
}
fn read_reply(self) -> impl Future<Item = Self, Error = Error> {
let mut hs = Some(self);
future::poll_fn(move || {
let first_byte = loop {
let hs = hs.as_mut().unwrap();
if let Some(idx) = hs.buf.iter().position(|b| *b == 10) {
let first_byte = hs.buf[0];
hs.buf.consume(idx + 1);
break first_byte;
}
let mut buf = [0u8; 256];
let len = try_ready!(hs.socket.poll_read(&mut buf));
if len == 0 {
return Err(Error::new("Reached EOF"));
}
hs.buf.extend_from_slice(&buf[0..len]);
};
if first_byte == b'+' {
Ok(Async::Ready(hs.take().unwrap()))
} else if first_byte == b'-' {
Err(Error::new("Negative reply"))
} else {
Err(Error::new("Unexpected reply"))
}
})
}
fn send_command(
self,
cmd: &str,
args: Option<&str>,
) -> impl Future<Item = Self, Error = Error> {
let mut hs = Some(self);
let cmd = match args {
Some(ref args) => format!("{} {}\r\n", cmd, args),
None => format!("{}\r\n", cmd),
};
let mut idx = 0;
future::poll_fn(move || {
let bytes = cmd.as_bytes();
loop {
let hs = hs.as_mut().unwrap();
if idx < bytes.len() {
let len = try_ready!(hs.socket.poll_write(&bytes[idx..]));
if len == 0 {
return Err(Error::new("Reached EOF"));
}
idx += len;
} else {
break;
}
}
Ok(Async::Ready(hs.take().unwrap()))
})
.and_then(|hs| hs.read_reply())
}
pub fn send_connect_command(self, via_tag: &str) -> impl Future<Item = Self, Error = Error> {
self.send_command("CONNECT", Some(via_tag))
}
pub fn send_pass_command(self, password: &str) -> impl Future<Item = Self, Error = Error> {
self.send_command("PASS", Some(password))
}
pub fn send_channel_command(self, channel: u8) -> impl Future<Item = Self, Error = Error> {
self.send_command("CHANNEL", Some(&format!("{}", channel)))
}
pub fn send_data_command(self) -> impl Future<Item = TcpStream, Error = Error> {
self.send_command("DATA", None).map(|hs| hs.into_inner())
}
pub fn send_quit_command(self) -> impl Future<Item = (), Error = Error> {
self.send_command("QUIT", None).map(|_| ())
}
}
#[cfg(test)]
mod tests {
    use std::net::Shutdown;
    use crate::{error::Result, tcp_server_handshake::TcpServerHandshake};
    use tokio::net::TcpListener;
    use super::*;

    /// Resolves once a read on `socket` returns zero bytes; fails with an error if any
    /// data is read instead.
    fn wait_for_close(mut socket: TcpStream) -> impl Future<Item = (), Error = Error> {
        future::poll_fn(move || {
            let mut scratch = [0; 256];
            match try_ready!(socket.poll_read(&mut scratch)) {
                0 => Ok(Async::Ready(())),
                len => Err(Error::new(format!("Read {} bytes...", len))),
            }
        })
    }

    /// Shuts down the write half of `socket`, panicking with `failure` if that fails,
    /// and then waits for the read half to report a zero-byte read.
    fn half_close_and_wait(
        socket: TcpStream,
        failure: &'static str,
    ) -> impl Future<Item = (), Error = Error> {
        socket.shutdown(Shutdown::Write).expect(failure);
        wait_for_close(socket)
    }

    /// Runs a complete client handshake against `TcpServerHandshake` over a loopback
    /// connection and checks the values the server receives.
    #[test]
    fn test() -> Result<()> {
        // Bind to port 0, then ask the listener which address it actually got.
        let bind_addr = "127.0.0.1:0".parse()?;
        let mut listener = TcpListener::bind(&bind_addr)?;
        let server_addr = listener.local_addr()?;

        let handler = future::lazy(move || {
            // Server: accept a single connection and walk through the handshake.
            let server = future::poll_fn(move || {
                let (accepted, _peer) = try_ready!(listener.poll_accept());
                Ok(Async::Ready(accepted))
            })
            .map_err(|err: std::io::Error| {
                panic!("{}", err);
            })
            .and_then(|accepted| TcpServerHandshake::start(accepted))
            .and_then(|handshake| handshake.receive_connect_command())
            .and_then(|(handshake, via_tag)| {
                assert_eq!("via_tag", via_tag);
                handshake.receive_pass_command()
            })
            .and_then(|(handshake, password)| {
                assert_eq!("password", password);
                handshake.receive_channel_command()
            })
            .and_then(|(handshake, channel)| {
                assert_eq!(123, channel);
                handshake.receive_data_command()
            })
            .and_then(|stream| half_close_and_wait(stream, "Unable to shutdown server"))
            .map_err(|err| panic!("Server error: {}", err));

            // Client: connect and issue the matching commands.
            let client = TcpStream::connect(&server_addr)
                .map_err(|err| Error::new(err))
                .and_then(TcpClientHandshake::start)
                .and_then(|handshake| handshake.send_connect_command("via_tag"))
                .and_then(|handshake| handshake.send_pass_command("password"))
                .and_then(|handshake| handshake.send_channel_command(123))
                .and_then(TcpClientHandshake::send_data_command)
                .and_then(|stream| half_close_and_wait(stream, "Unable to shutdown client"))
                .map_err(|err| {
                    panic!("Client error: {}", err);
                });

            tokio::spawn(server);
            tokio::spawn(client);
            Ok(())
        });

        println!("Starting runtime...");
        tokio::run(handler);
        println!("Runtime ended");
        Ok(())
    }
}