use crate::connection::Connection;
use crate::router::Router;
use crate::server::{UpstreamConfig, VarDiffConfig};
use crate::BanManager;
pub use crate::MinerList;
use crate::{Error, Result};
use async_std::net::TcpStream;
use async_std::sync::Arc;
use async_tungstenite::tungstenite::protocol::Message;
use async_tungstenite::WebSocketStream;
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::stream::{SplitSink, SplitStream};
use futures::SinkExt;
use futures::StreamExt;
use log::{debug, info, warn};
use serde_json::{Map, Value};
use std::net::SocketAddr;
/// Drives one inbound WebSocket client from accept to teardown.
///
/// Steps, in order:
/// 1. Reject the peer immediately if `ban_manager` reports `addr` as banned.
/// 2. Perform the WebSocket handshake on `stream`; a failed handshake is
///    logged and the function returns (it no longer panics the task).
/// 3. Build a [`Connection`], spawn [`send_loop`] to drain its outbound queue
///    into the socket, and register the connection in `connection_list`.
/// 4. Read messages with [`next_message`] and dispatch each one through
///    `router` until the connection reports itself disconnected or reading
///    fails.
/// 5. Unregister the connection and ban `addr` if the connection asks for it.
///
/// `upstream_router`, `upstream_config`, `_proxy` and `_expected_port` are
/// accepted but not used by this function.
///
/// # Panics
///
/// Panics if registering or unregistering the connection in
/// `connection_list` fails (the results are unwrapped).
// The signature is shared with the other connection handlers, hence the
// argument count.
#[allow(clippy::too_many_arguments)]
pub async fn handle_connection<
    State: Clone + Send + Sync + 'static,
    CState: Clone + Send + Sync + 'static,
>(
    ban_manager: Arc<BanManager>,
    addr: SocketAddr,
    connection_list: Arc<MinerList<CState>>,
    router: Arc<Router<State, CState>>,
    upstream_router: Arc<Router<State, CState>>,
    upstream_config: UpstreamConfig,
    state: State,
    stream: TcpStream,
    var_diff_config: VarDiffConfig,
    initial_difficulty: f64,
    connection_state: CState,
    _proxy: bool,
    _expected_port: u16,
) {
    // Check the ban list before spending a handshake on the peer.
    if ban_manager.check_banned(&addr).await {
        warn!(
            "Banned connection attempting to connect: {}. Connected closed",
            addr
        );
        return;
    }

    // The handshake is driven by untrusted input, so a failure is an expected
    // runtime condition rather than a bug: log it and drop the socket.
    let stream = match async_tungstenite::accept_async(stream).await {
        Ok(stream) => stream,
        Err(e) => {
            warn!(
                "Error during the websocket handshake with {}: {}",
                addr, e
            );
            return;
        }
    };
    let (wh, mut rh) = stream.split();

    let (tx, rx) = unbounded();
    // `_urx` and `_urtx` are the unused halves of the upstream channels. They
    // are bound to names (not `_`) so they stay alive until this function
    // returns and the halves held by `Connection` do not observe a closed
    // channel.
    let (utx, _urx) = unbounded();
    let (_urtx, urrx) = unbounded();

    let connection = Arc::new(Connection::new(
        addr,
        tx,
        utx,
        urrx,
        initial_difficulty,
        var_diff_config,
        connection_state,
    ));

    // Writer task: forwards everything queued on `tx` to the socket.
    async_std::task::spawn(async move {
        match send_loop(rx, wh).await {
            Ok(_) => info!("Send Loop is closing for connection"),
            Err(e) => warn!(
                "Send loop is closed for connection: {}, Reason: {}",
                addr, e
            ),
        }
    });

    // NOTE(review): a registry failure still panics here and below; the
    // return type of `add_miner`/`remove_miner` is not visible from this file.
    connection_list
        .add_miner(addr, connection.clone())
        .await
        .unwrap();

    info!("Accepting stream from: {}", addr);

    loop {
        if connection.is_disconnected().await {
            break;
        }

        // Any read or parse failure ends the session.
        let (method, values) = match next_message(&mut rh).await {
            Ok(parsed) => parsed,
            Err(_) => break,
        };

        router
            .call(&method, values, state.clone(), connection.clone())
            .await;
    }

    info!("Closing stream from: {}", addr);

    connection_list.remove_miner(addr).await.unwrap();

    if connection.needs_ban().await {
        ban_manager.add_ban(&addr).await;
    }
}
pub async fn next_message(
rh: &mut SplitStream<WebSocketStream<TcpStream>>,
) -> Result<(String, serde_json::map::Map<String, serde_json::Value>)> {
let msg = match rh.next().await {
Some(msg) => msg.unwrap(),
None => {
return Err(Error::StreamClosed);
}
};
let raw = msg.into_text().unwrap();
debug!("Received Message: {}", &raw);
let msg: Map<String, Value> = serde_json::from_str(&raw)?;
let method = if msg.contains_key("method") {
match msg.get("method") {
Some(method) => method.as_str(),
None => return Err(Error::MethodDoesntExist),
}
} else if msg.contains_key("message") {
match msg.get("message") {
Some(method) => method.as_str(),
None => return Err(Error::MethodDoesntExist),
}
} else {
Some("")
};
if let Some(method_string) = method {
Ok((method_string.to_owned(), msg))
} else {
Err(Error::MethodDoesntExist)
}
}
/// Pumps queued outbound strings into the WebSocket sink as text messages.
///
/// Runs until `rx` yields `None`, then sends a close message with no close
/// frame payload and returns `Ok(())`.
///
/// # Errors
///
/// Returns the converted sink error from the first send that fails; in that
/// case no close message is sent.
pub async fn send_loop(
    mut rx: UnboundedReceiver<String>,
    mut wh: SplitSink<WebSocketStream<TcpStream>, Message>,
) -> Result<()> {
    loop {
        let payload = match rx.next().await {
            Some(payload) => payload,
            // Queue exhausted: fall through to the closing handshake.
            None => break,
        };
        wh.send(Message::Text(payload)).await?;
    }

    wh.send(Message::Close(None)).await?;
    Ok(())
}