use std::collections::HashMap;
use std::net::SocketAddr;
use log::*;
use tokio::runtime::Runtime;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{self, UnboundedSender};
use crate::proto::*;
pub fn start(config: ServerConfig) {
let mut rt = Runtime::new().expect("failed to create runtime");
let _ = rt.block_on(start_future(config));
}
pub async fn start_future(config: ServerConfig) -> Result<(), ProcessError> {
let mut listener = TcpListener::bind(config.host.clone()).await?;
let (server_tx, mut server_rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
let mut clients = HashMap::new();
loop {
let msg = server_rx.recv().await.expect("ServerMsg receive failed");
match msg {
ServerMsg::AddClient(addr, net_addr, tx) => {
let client = Client {
net_addr,
tx
};
clients.insert(addr, client);
}
ServerMsg::SendUnit(addr, stream_unit) => {
match clients.get_mut(&addr) {
Some(client) => {
match client.tx.send(stream_unit) {
Ok(()) => {}
Err(_) => panic!("ServerMsg::SendArray processing failed - send error")
}
}
None => error!("no client for send stream unit {}", addr)
}
}
ServerMsg::RemoveClient(addr) => {
let _ = clients.remove(&addr);
}
}
}
});
let mut client_states = HashMap::new();
loop {
let (mut stream, client_net_addr) = listener.accept().await?;
info!("new connection from {}", client_net_addr);
let config = config.clone();
let server_tx = server_tx.clone();
match auth_stream(&mut stream, client_net_addr, &config).await {
Ok(addr) => {
info!("stream from {} authorized as {}", client_net_addr, addr);
if !client_states.contains_key(&addr) {
client_states.insert(addr.clone(), ClientState::new());
}
match client_states.get_mut(&addr) {
Some(client_state) => {
if !client_state.has_writer {
client_state.has_writer = true;
tokio::spawn(async move {
let res = process_write_stream(addr.clone(), &mut stream, client_net_addr, server_tx).await;
error!("{} write process ended, {:?}", addr, res);
});
} else {
client_state.has_writer = false;
tokio::spawn(async move {
let res = process_read_stream(addr.clone(), stream, client_net_addr, server_tx).await;
error!("{} read process ended, {:?}", addr, res);
});
}
}
None => error!("failed to get client state for {} stream from {}", addr, client_net_addr)
}
}
Err(e) => error!("failed to authorize stream from {}, {:?}", client_net_addr, e)
}
}
}
/// Per-address bookkeeping used by the accept loop to decide how the next
/// connection from that address is handled.
struct ClientState {
    /// `true` after a connection was handed to the write-stream handler and
    /// until the following connection is handed to the read-stream handler.
    has_writer: bool,
}

impl ClientState {
    /// Creates the state for an address from which no connection has been seen yet.
    pub fn new() -> Self {
        Self { has_writer: false }
    }
}
/// Reads from `stream` until the first message is finished and returns the `tx`
/// field of that message's metadata.
///
/// While reading, payload and attachment bytes are collected per stream id into
/// a `StreamLayout`; aborted messages are discarded. `_client_net_addr` and
/// `_config` are currently unused.
///
/// # Errors
///
/// * Any error produced by `read` is propagated.
/// * `ProcessError::StreamLayoutNotFound` if data or a finish marker arrives for
///   a stream id whose metadata has not been seen.
/// * `ProcessError::AuthStreamLayoutIsEmpty` if the finished message has no
///   stored layout.
async fn auth_stream(stream: &mut TcpStream, _client_net_addr: SocketAddr, _config: &ServerConfig) -> Result<String, ProcessError> {
    let mut state = State::new("Server".to_owned());
    let mut layouts = HashMap::new();

    // The loop ends with the layout of the first finished message.
    let finished = loop {
        match read(&mut state, stream).await? {
            ReadResult::MsgMeta(stream_id, msg_meta, _) => {
                let layout = StreamLayout {
                    id: stream_id,
                    msg_meta,
                    payload: vec![],
                    attachments_data: vec![],
                };
                layouts.insert(stream_id, layout);
            }
            ReadResult::PayloadData(stream_id, n, buf)
            | ReadResult::PayloadFinished(stream_id, n, buf) => {
                let layout = layouts.get_mut(&stream_id).ok_or(ProcessError::StreamLayoutNotFound)?;
                layout.payload.extend_from_slice(&buf[..n]);
            }
            ReadResult::AttachmentData(stream_id, _, n, buf)
            | ReadResult::AttachmentFinished(stream_id, _, n, buf) => {
                let layout = layouts.get_mut(&stream_id).ok_or(ProcessError::StreamLayoutNotFound)?;
                layout.attachments_data.extend_from_slice(&buf[..n]);
            }
            ReadResult::MessageFinished(stream_id, finish_bytes) => {
                let layout = layouts.get_mut(&stream_id).ok_or(ProcessError::StreamLayoutNotFound)?;
                // The finish marker may still carry trailing bytes of either kind.
                match finish_bytes {
                    MessageFinishBytes::Payload(n, buf) => layout.payload.extend_from_slice(&buf[..n]),
                    MessageFinishBytes::Attachment(_, n, buf) => layout.attachments_data.extend_from_slice(&buf[..n]),
                }
                break layouts.remove(&stream_id);
            }
            ReadResult::MessageAborted(stream_id) => {
                if let Some(stream_id) = stream_id {
                    let _ = layouts.remove(&stream_id);
                }
            }
        }
    };

    let finished = finished.ok_or(ProcessError::AuthStreamLayoutIsEmpty)?;
    Ok(finished.msg_meta.tx)
}
/// Serves the connection on which data is written from the server to client `addr`.
///
/// Creates an unbounded channel, registers its sending half with the dispatcher
/// through `ServerMsg::AddClient`, and then passes the receiving half together
/// with the stream to `write_loop` (defined elsewhere), returning its result.
///
/// # Errors
///
/// Returns an error if the registration message cannot be sent, or whatever
/// error `write_loop` returns.
async fn process_read_stream(addr: String, mut stream: TcpStream, client_net_addr: SocketAddr, server_tx: UnboundedSender<ServerMsg>) -> Result<(), ProcessError> {
    let state_label = "write stream from Server to ".to_owned() + &addr;
    let _state = State::new(state_label);

    let (client_tx, client_rx) = mpsc::unbounded_channel();
    let registration = ServerMsg::AddClient(addr.clone(), client_net_addr, client_tx);
    server_tx.send(registration)?;

    write_loop(addr, client_rx, &mut stream).await
}
async fn process_write_stream(addr: String, stream: &mut TcpStream, _client_net_addr: SocketAddr, server_tx: UnboundedSender<ServerMsg>) -> Result<(), ProcessError> {
let mut state = State::new("read stream from Server to ".to_owned() + &addr);
let mut client_addrs = HashMap::new();
loop {
match read(&mut state, stream).await? {
ReadResult::MsgMeta(stream_id, msg_meta, buf) => {
client_addrs.insert(stream_id, msg_meta.rx.clone());
server_tx.send(ServerMsg::SendUnit(msg_meta.rx.clone(), StreamUnit::Vector(stream_id, buf)))?;
}
ReadResult::PayloadData(stream_id, n, buf) => {
let client_addr = client_addrs.get(&stream_id).ok_or(ProcessError::ClientAddrNotFound)?;
server_tx.send(ServerMsg::SendUnit(client_addr.clone(), StreamUnit::Array(stream_id, n, buf)))?;
}
ReadResult::PayloadFinished(stream_id, n, buf) => {
let client_addr = client_addrs.get(&stream_id).ok_or(ProcessError::ClientAddrNotFound)?;
server_tx.send(ServerMsg::SendUnit(client_addr.clone(), StreamUnit::Array(stream_id, n, buf)))?;
}
ReadResult::AttachmentData(stream_id, _, n, buf) => {
let client_addr = client_addrs.get(&stream_id).ok_or(ProcessError::ClientAddrNotFound)?;
server_tx.send(ServerMsg::SendUnit(client_addr.clone(), StreamUnit::Array(stream_id, n, buf)))?;
}
ReadResult::AttachmentFinished(stream_id, _, n, buf) => {
let client_addr = client_addrs.get(&stream_id).ok_or(ProcessError::ClientAddrNotFound)?;
server_tx.send(ServerMsg::SendUnit(client_addr.clone(), StreamUnit::Array(stream_id, n, buf)))?;
}
ReadResult::MessageFinished(stream_id, finish_bytes) => {
let client_addr = client_addrs.remove(&stream_id).ok_or(ProcessError::ClientAddrNotFound)?;
match finish_bytes {
MessageFinishBytes::Payload(n, buf) => {
server_tx.send(ServerMsg::SendUnit(client_addr.clone(), StreamUnit::Array(stream_id, n, buf)))?;
}
MessageFinishBytes::Attachment(_, n, buf) => {
server_tx.send(ServerMsg::SendUnit(client_addr.clone(), StreamUnit::Array(stream_id, n, buf)))?;
}
}
}
ReadResult::MessageAborted(stream_id) => {
match stream_id {
Some(stream_id) => {
let _ = client_addrs.remove(&stream_id).ok_or(ProcessError::ClientAddrNotFound)?;
}
None => {}
}
}
}
}
}