pub use crate::ConnectionList;
use crate::{
config::{UpstreamConfig, VarDiffConfig},
connection::{Connection, SendInformation},
id_manager::IDManager,
router::Router,
types::{GlobalVars, MessageValue},
BanManager, Error, Result,
};
use async_std::{net::TcpStream, prelude::FutureExt, sync::Arc};
use async_tungstenite::{tungstenite::protocol::Message, WebSocketStream};
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver},
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{info, trace, warn};
use serde_json::{Map, Value};
use std::net::SocketAddr;
use stop_token::future::FutureExt as stopFutureExt;
/// Serves one websocket client from handshake to teardown.
///
/// The steps are:
/// 1. Refuse the peer if `ban_manager` reports `addr` as banned.
/// 2. Perform the websocket handshake on `stream` and split it into a read
///    and a write half.
/// 3. Allocate a session id, build the [`Connection`] and spawn [`send_loop`]
///    to drain the connection's outgoing channel into the write half.
/// 4. Register the connection in `connection_list`, then read messages with
///    [`next_message`] and hand each one to `router` until the connection
///    reports itself disconnected, its stop token fires, the read times out,
///    or reading/decoding fails.
/// 5. Release the session id, unregister the connection, ban the peer if the
///    connection asks for it, and shut the connection down.
///
/// # Errors
///
/// Returns an error when the websocket handshake fails or when the connection
/// cannot be registered in `connection_list`. A banned peer, an exhausted
/// session-id pool and read errors inside the message loop are logged and
/// reported as `Ok(())`.
// Every value the connection needs is passed in individually, hence the long
// parameter list. Underscore-prefixed parameters are not used in this body.
#[allow(clippy::too_many_arguments)]
pub async fn handle_connection<
    State: Clone + Send + Sync + 'static,
    CState: Clone + Send + Sync + 'static,
>(
    id_manager: Arc<IDManager>,
    ban_manager: Arc<BanManager>,
    addr: SocketAddr,
    connection_list: Arc<ConnectionList<CState>>,
    router: Arc<Router<State, CState>>,
    _upstream_router: Arc<Router<State, CState>>,
    upstream_config: UpstreamConfig,
    state: State,
    stream: TcpStream,
    var_diff_config: VarDiffConfig,
    initial_difficulty: u64,
    connection_state: CState,
    _proxy: bool,
    _expected_port: u16,
    global_vars: GlobalVars,
) -> Result<()> {
    // Refuse banned peers before spending any work on the websocket handshake.
    if ban_manager.check_banned(&addr).await {
        warn!(
            "Banned connection attempting to connect: {}. Connection closed",
            addr
        );
        return Ok(());
    }

    // A failed handshake is caused by the remote peer, so it is reported to
    // the caller instead of panicking.
    let stream = match async_tungstenite::accept_async(stream).await {
        Ok(stream) => stream,
        Err(e) => {
            warn!("Websocket handshake with {} failed: {}", addr, e);
            return Err(e.into());
        }
    };
    let (wh, mut buffer_stream) = stream.split();

    let (tx, rx) = unbounded();
    // The unused halves are bound to names (not `_`) so they stay alive until
    // this function returns and the upstream channels are not closed early.
    let (utx, _urx) = unbounded();
    let (_urtx, urrx) = unbounded();

    info!(
        "Upstream enabled status: {}\nUpstream URL: {}",
        upstream_config.enabled, upstream_config.url
    );

    let connection_id = match id_manager.allocate_session_id().await {
        Some(id) => id,
        None => {
            warn!("Sessions full");
            return Ok(());
        }
    };

    let connection = Arc::new(Connection::new(
        connection_id,
        tx,
        utx,
        urrx,
        initial_difficulty,
        var_diff_config,
        connection_state,
    ));

    let stop_token = connection.get_stop_token();
    let id = connection.id();

    // Writer task: forwards everything queued on the connection to the socket.
    async_std::task::spawn(async move {
        match send_loop(rx, wh).await {
            Ok(_) => trace!("Send Loop is closing for connection: {}", id),
            Err(e) => warn!("Send loop is closed for connection: {}, Reason: {}", id, e),
        }
    });

    // If registration fails, give the session id back and shut the connection
    // down before propagating the error, rather than panicking.
    let registered = connection_list.add_miner(addr, connection.clone()).await;
    if let Err(e) = &registered {
        warn!(
            "Connection: {} could not be registered: {:?}",
            connection.id(),
            e
        );
        id_manager.remove_session_id(connection_id).await;
        connection.shutdown().await;
    }
    registered?;

    loop {
        if connection.is_disconnected().await {
            trace!(
                "Connection: {} disconnected. Breaking out of next_message loop",
                connection.id()
            );
            break;
        }

        let timeout = connection.timeout().await;

        // Result layers, outermost first: stop token, read timeout, decoding.
        let next_message = next_message(&mut buffer_stream)
            .timeout(timeout)
            .timeout_at(stop_token.clone())
            .await;

        let (method, values) = match next_message {
            Err(e) => {
                log::error!(
                    "Connection: {} error in 'next_message' (stop_token) Error: {}",
                    connection.id(),
                    e
                );
                // The stop token has fired, so every further read would end
                // the same way; leave the loop instead of retrying.
                break;
            }
            Ok(Err(e)) => {
                log::error!(
                    "Connection: {} error in 'next_message' (timeout fn) Error: {}",
                    connection.id(),
                    e
                );
                break;
            }
            Ok(Ok(Err(e))) => {
                log::error!(
                    "Connection: {} error in 'next_message' (decoding/reading) Error: {}",
                    connection.id(),
                    e
                );
                break;
            }
            Ok(Ok(Ok(parsed))) => parsed,
        };

        router
            .call(
                &method,
                values,
                state.clone(),
                connection.clone(),
                global_vars.clone(),
            )
            .await;
    }

    trace!("Closing stream from: {}", connection.id());

    id_manager.remove_session_id(connection_id).await;
    connection_list.remove_miner(addr).await;

    if connection.needs_ban().await {
        ban_manager.add_ban(&addr).await;
    }

    connection.shutdown().await;

    Ok(())
}
pub async fn next_message(
stream: &mut SplitStream<WebSocketStream<TcpStream>>,
) -> Result<(String, MessageValue)> {
let msg = match stream.next().await {
Some(msg) => msg.unwrap(),
None => {
return Err(Error::StreamClosed(String::from("Websocket closed")));
}
};
let raw = msg.into_text().unwrap();
trace!("Received Message: {}", &raw);
let msg: Map<String, Value> = match serde_json::from_str(&raw) {
Ok(msg) => msg,
Err(_) => return Err(Error::MethodDoesntExist),
};
let method = if msg.contains_key("method") {
match msg.get("method") {
Some(method) => method.as_str(),
None => return Err(Error::MethodDoesntExist),
}
} else if msg.contains_key("messsage") {
match msg.get("message") {
Some(method) => method.as_str(),
None => return Err(Error::MethodDoesntExist),
}
} else if msg.contains_key("result") {
Some("result")
} else {
Some("")
};
if let Some(method_string) = method {
Ok((method_string.to_owned(), MessageValue::StratumV1(msg)))
} else {
Err(Error::MethodDoesntExist)
}
}
/// Drains `rx` and writes every queued item to the websocket sink `rh`.
///
/// `Json` and `Text` items are sent as text frames and `Raw` items as binary
/// frames. Once `rx` yields no more items a close frame is sent and the
/// function returns `Ok(())`.
///
/// # Errors
///
/// Returns the first error reported by the sink; nothing further is sent
/// after that.
pub async fn send_loop(
    mut rx: UnboundedReceiver<SendInformation>,
    mut rh: SplitSink<WebSocketStream<TcpStream>, Message>,
) -> Result<()> {
    while let Some(outgoing) = rx.next().await {
        // Build the frame first so there is a single send call site.
        let frame = match outgoing {
            SendInformation::Json(json) => Message::Text(json.as_str().to_owned()),
            SendInformation::Text(text) => Message::Text(text),
            SendInformation::Raw(buffer) => Message::Binary(buffer.to_vec()),
        };
        rh.send(frame).await?;
    }

    // The receiver is exhausted: tell the peer the socket is closing.
    rh.send(Message::Close(None)).await?;

    Ok(())
}