use {
super::{chain_notification, connection::Websocket},
crate::{
dcrjson::{commands, result_types::JsonResponse},
rpcclient::{connection, constants, infrastructure},
},
futures_util::{
stream::{SplitSink, SplitStream, StreamExt},
SinkExt,
},
log::{debug, info, trace, warn},
std::{
collections::{HashMap, VecDeque},
sync::Arc,
},
tokio::{
sync::{mpsc, Mutex, RwLock},
time,
},
tokio_tungstenite::{tungstenite, tungstenite::Error as WSError, tungstenite::Message},
};
/// A single RPC request submitted by a client, together with the channel on which the
/// matching response should be delivered.
pub struct Command {
    /// Request ID; used as the key under which `user_channel` is registered so the
    /// response carrying the same ID can be routed back.
    pub id: u64,
    /// Channel on which the response whose ID equals `id` is delivered.
    pub user_channel: mpsc::Sender<JsonResponse>,
    /// Serialized request bytes; queued and later written to the websocket as a binary frame.
    pub rpc_message: Vec<u8>,
}
/// Reads frames from the websocket and forwards them to the received-message handler.
///
/// Every successfully read frame is pushed onto `send_rcvd_websocket_msg`. When the stream
/// ends or fails with an error other than `ConnectionClosed`/`AlreadyClosed`, a reconnection
/// is requested through `signal_ws_reconnect` and the task waits on `websocket_read_new` for
/// the replacement read half before resuming.
///
/// The task exits when:
/// * the message handler channel is closed,
/// * the websocket reports `ConnectionClosed` or `AlreadyClosed`,
/// * the reconnect signal cannot be sent, or
/// * `websocket_read_new` is closed before a new reader arrives.
///
/// Every exit path logs the final "exited" message.
pub(super) async fn handle_websocket_in(
    send_rcvd_websocket_msg: mpsc::UnboundedSender<Message>,
    mut websocket_read: SplitStream<Websocket>,
    mut websocket_read_new: mpsc::Receiver<SplitStream<Websocket>>,
    signal_ws_reconnect: mpsc::Sender<()>,
) {
    'outer_loop: loop {
        while let Some(message) = websocket_read.next().await {
            match message {
                Ok(message) => {
                    if let Err(e) = send_rcvd_websocket_msg.send(message) {
                        // The trailing `\` keeps the message on one log line instead of
                        // embedding a raw newline and source indentation.
                        warn!(
                            "error sending received websocket message to message handler, error: {}. \
                             Closing websocket connection",
                            e
                        );
                        break 'outer_loop;
                    }
                }
                Err(WSError::ConnectionClosed) | Err(WSError::AlreadyClosed) => {
                    info!("websocket already closed.");
                    break 'outer_loop;
                }
                Err(e) => {
                    warn!(
                        "websocket disconnected unexpectedly with error: {}, calling for reconnection",
                        e
                    );
                    // Leave only the read loop; reconnection is attempted below.
                    break;
                }
            }
        }

        info!("reconnecting websocket");
        if let Err(e) = signal_ws_reconnect.send(()).await {
            warn!(
                "websocket reconnection failed, error: {}. Closing websocket connection.",
                e
            );
            break 'outer_loop;
        }

        websocket_read = match websocket_read_new.recv().await {
            Some(ws) => {
                info!("Changing websocket_read channel.");
                ws
            }
            None => {
                warn!("failed to retrieve new websocket reader. Closing websocket connection.");
                break 'outer_loop;
            }
        };
    }
    info!("handle_websocket_in exited")
}
/// Decodes frames received from the websocket and routes them to their consumers.
///
/// Text and binary frames are deserialized into a [`JsonResponse`]; frames that fail to
/// deserialize are logged and skipped. A response whose `id` is null is treated as a
/// notification and forwarded to `notification_handler`. Otherwise the `id` must be
/// representable as a `u64`, and the response is forwarded to the channel registered under
/// that ID in `receiver_channel_id_mapper`. Ping and pong frames are only logged.
///
/// The task returns when a close frame arrives (after signalling
/// `ws_disconnected_acknowledgement`) or when `rcvd_msg_consumer` is closed.
pub(super) async fn handle_received_message(
    mut rcvd_msg_consumer: mpsc::UnboundedReceiver<Message>,
    notification_handler: mpsc::Sender<JsonResponse>,
    ws_disconnected_acknowledgement: mpsc::Sender<()>,
    receiver_channel_id_mapper: Arc<Mutex<HashMap<u64, mpsc::Sender<JsonResponse>>>>,
) {
    while let Some(message) = rcvd_msg_consumer.recv().await {
        let json_content: JsonResponse = match message {
            Message::Binary(m) => match serde_json::from_slice(&m) {
                Ok(m) => m,
                Err(e) => {
                    warn!(
                        "Error unmarshalling binary result, error: {}. \n Message: {:?}",
                        e,
                        std::str::from_utf8(&m)
                    );
                    continue;
                }
            },
            Message::Text(m) => match serde_json::from_str(&m) {
                Ok(m) => m,
                Err(e) => {
                    warn!(
                        "Error unmarshalling string result, error: {}. \n Message: {}",
                        e, m
                    );
                    continue;
                }
            },
            Message::Close(_) => {
                info!("Received close message from server, closing now.");
                match ws_disconnected_acknowledgement.send(()).await {
                    Ok(_) => {
                        info!("websocket connection closed successfully",);
                    }
                    Err(e) => {
                        warn!(
                            "error sending websocket disconnect acknowledgement to client, error: {}",
                            e
                        );
                    }
                };
                return;
            }
            Message::Pong(_) => {
                info!("Received pong message from server");
                continue;
            }
            Message::Ping(_) => {
                info!("Received ping message from server");
                continue;
            }
        };

        // A null ID marks a server-initiated notification rather than a reply.
        if json_content.id.is_null() {
            debug!("Received a notification");
            match notification_handler.send(json_content).await {
                Ok(_) => trace!("Sent received notification to handler."),
                Err(e) => warn!(
                    "Error sending notification message to receiver, error: {}",
                    e
                ),
            }
            continue;
        }

        let id = match json_content.id.as_u64() {
            Some(id) => id,
            None => {
                // Log the raw ID value; `as_str()` would print `None` for every
                // non-string ID and hide what the server actually sent.
                warn!(
                    "Unsupported ID value type sent by RPC server, ID consist: {:?}",
                    json_content.id
                );
                continue;
            }
        };

        // Clone the sender and release the map lock before awaiting the send, so a slow
        // or full user channel cannot block tasks that need to register new commands.
        let user_channel = receiver_channel_id_mapper.lock().await.get(&id).cloned();
        match user_channel {
            Some(user_channel) => {
                if let Err(e) = user_channel.send(json_content).await {
                    warn!(
                        "Client RPC result receiver channel closed abruptly, error: {}. ID is {}",
                        e, id,
                    );
                }
            }
            None => warn!("Could not retrieve senders request channel from map"),
        }
    }
    info!("handle_received_message exited");
}
/// Sits between client commands and the websocket writer, owning the pending-request queue.
///
/// * For each [`Command`] received on `user_command`, the reply channel is registered in
///   `receiver_channel_id_mapper` under the command ID, the serialized request is appended
///   to `requests_queue_container`, and `request_queue_updated` is signalled.
/// * For each `Ok(())` received on `message_sent_acknowledgement`, the request at the front
///   of the queue (if any) is popped and handed to `send_queue_command`.
/// * For each `Err(bytes)` received on `message_sent_acknowledgement`, the bytes are pushed
///   back to the front of the queue and `request_queue_updated` is signalled again.
///
/// The loop ends when a command arrives with an ID that is already registered, or when any
/// of the channels involved is closed.
pub(super) async fn ws_write_middleman(
    mut user_command: mpsc::Receiver<Command>,
    request_queue_updated: mpsc::Sender<()>,
    mut message_sent_acknowledgement: mpsc::Receiver<Result<(), Vec<u8>>>,
    send_queue_command: mpsc::Sender<Vec<u8>>,
    requests_queue_container: Arc<Mutex<VecDeque<Vec<u8>>>>,
    receiver_channel_id_mapper: Arc<Mutex<HashMap<u64, mpsc::Sender<JsonResponse>>>>,
) {
    loop {
        tokio::select! {
            command = user_command.recv() => {
                let Command { id, user_channel, rpc_message } = match command {
                    Some(command) => command,
                    None => {
                        warn!("client command receiving channel closed. Closing websocket connection.");
                        break;
                    }
                };

                // The map guard is a statement temporary and is released before the
                // queue lock is taken.
                let already_registered = receiver_channel_id_mapper
                    .lock()
                    .await
                    .insert(id, user_channel)
                    .is_some();
                if already_registered {
                    warn!("channel ID already present in map, ID: {}.", id);
                    break;
                }

                requests_queue_container.lock().await.push_back(rpc_message);

                if let Err(e) = request_queue_updated.send(()).await {
                    warn!("request_queue_updated sending channel closed, error: {}. Closing websocket connection.", e);
                    break;
                }
            }
            ack = message_sent_acknowledgement.recv() => {
                match ack {
                    Some(Ok(())) => {
                        // Pop in its own statement so the queue lock is not held while
                        // awaiting the (bounded) channel send below.
                        let next = requests_queue_container.lock().await.pop_front();
                        match next {
                            Some(message) => {
                                if send_queue_command.send(message).await.is_err() {
                                    warn!("Error sending message queue to websocket writer");
                                    break;
                                }
                            }
                            None => info!("queue empty. All commands have been sent to server."),
                        }
                    }
                    Some(Err(message)) => {
                        // The write failed: put the request back at the head of the queue
                        // so it is the next one retried.
                        requests_queue_container.lock().await.push_front(message);
                        if request_queue_updated.send(()).await.is_err() {
                            warn!("Request queue updated sending channel closed abruptly");
                            break;
                        }
                    }
                    None => {
                        warn!("message sent acknowledgement channel receiver closed");
                        break;
                    }
                }
            }
        }
    }
    info!("ws_write_middleman exited")
}
pub(super) async fn handle_websocket_out(
mut ws_sender: mpsc::Sender<Message>,
mut ws_sender_new: mpsc::Receiver<mpsc::Sender<Message>>,
mut queue_command: mpsc::Receiver<Vec<u8>>,
message_sent_acknowledgement: mpsc::Sender<Result<(), Vec<u8>>>,
mut request_queue_updated: mpsc::Receiver<()>,
mut disconnect_cmd_rcv: mpsc::Receiver<()>,
) {
let send_ack = |msg_ack: mpsc::Sender<Result<(), Vec<u8>>>| async move {
match msg_ack.send(Ok(())).await {
Ok(_) => {}
Err(e) => warn!("error sending websocket open acknowledgement, error: {}", e),
};
};
send_ack(message_sent_acknowledgement.clone()).await;
let mut ping_sender = ws_sender.clone();
loop {
tokio::select! {
disconnect = disconnect_cmd_rcv.recv() => {
match disconnect {
Some(_) => {
let close_message = Message::Close(Some(tungstenite::protocol::CloseFrame {
code: tungstenite::protocol::frame::coding::CloseCode::Normal,
reason: "".into(),
}));
match ws_sender.send(close_message).await{
Ok(_) => {
info!("Websocket close message sent successfully to server");
},
Err(e) => {
warn!(
"Error sending close message to websocket, error: {}",
e
);
}
};
}
None => {
warn!("Websocket disconnect channel receiver closed abruptly");
}
}
break;
}
_ = time::sleep(tokio::time::Duration::from_secs(constants::KEEP_ALIVE)) => {
debug!("Sending keep alive ping to websocket server");
match ping_sender.send(Message::Ping(Vec::new())).await {
Ok(_) => {
continue;
},
Err(e) => warn!("Error sending ping message, error: {}", e),
};
}
e = request_queue_updated.recv() => {
match e {
Some(_) => send_ack(message_sent_acknowledgement.clone()).await,
None => {
warn!("request_queue_update receiver channel closed abruptly");
break;
}
}
}
new_ws = ws_sender_new.recv() => {
match new_ws {
Some(new_ws)=>{
ping_sender = new_ws.clone();
ws_sender = new_ws;
info!("Websocket reconnected");
continue;
}
None => {
warn!("New websocket sender channel closed abruptly. Closing connection.");
break;
}
}
}
msg = queue_command.recv() => {
match msg {
Some(msg) => if let Err(e) = ws_sender.send(Message::Binary(msg)).await {
match message_sent_acknowledgement.send(Err(e.0.into_data())).await {
Ok(_) => continue,
Err(e) => {
warn!(
"Error sending message sent acknowledgement error to websocket, error: {}. Closing websocket connection.",
e
);
break;
}
}
},
None => {
warn!("command queue receiver closed abruptly, closing websocket connection.");
break;
}
}
}
}
}
}
/// Spawns a background task that drains `sink` into the websocket write half.
///
/// Each message received on `sink` is written to `ws_sender`. On the first write failure
/// the error is logged, the payload bytes of the failed message (`Message::into_data`) are
/// reported as `Err(bytes)` on `ack`, and the task stops. The task also stops once `sink`
/// is closed. This function returns immediately after spawning the task.
pub(super) async fn get_ws_sink(
    mut sink: mpsc::Receiver<Message>,
    mut ws_sender: SplitSink<Websocket, Message>,
    ack: mpsc::Sender<Result<(), Vec<u8>>>,
) {
    tokio::spawn(async move {
        loop {
            let outgoing = match sink.recv().await {
                Some(message) => message,
                None => break,
            };
            // Keep a copy so the payload can be reported if the write fails.
            let retained = outgoing.clone();
            if let Err(e) = ws_sender.send(outgoing).await {
                warn!("websocket sender dropped: {}", e);
                // Best effort: the acknowledgement receiver may already be gone.
                let _ = ack.send(Err(retained.into_data())).await;
                break;
            }
        }
    });
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn ws_reconnect_handler<F>(
mut conn: impl connection::RPCConn,
is_ws_disconnected: Arc<RwLock<bool>>,
mut ws_reconnect_signal: mpsc::Receiver<()>,
websocket_read_new: mpsc::Sender<SplitStream<Websocket>>,
ws_writer_new: mpsc::Sender<mpsc::Sender<Message>>,
notification_state: Arc<RwLock<HashMap<String, u64>>>,
message_sent_acknowledgement: mpsc::Sender<Result<(), Vec<u8>>>,
on_reconnect: F,
) where
F: Fn(),
{
while ws_reconnect_signal.recv().await.is_some() {
info!("reconnecting websocket connection.");
let is_ws_disconnected_clone = is_ws_disconnected.read().await;
if *is_ws_disconnected_clone {
info!("Websocket disconnected by client.");
break;
}
drop(is_ws_disconnected_clone);
let mut backoff = std::time::Duration::new(0, 0);
if conn.disable_auto_reconnect() {
info!("Websocket reconnect disabled. Dropping all websocket handler.");
let mut is_ws_disconnected_clone = is_ws_disconnected.write().await;
*is_ws_disconnected_clone = true;
break;
}
loop {
backoff += crate::rpcclient::constants::CONNECTION_RETRY_INTERVAL_SECS;
let (ws_rcv, ws_writer) = match conn.ws_split_stream().await {
Ok(ws) => ws,
Err(e) => {
warn!("unable to reconnect websocket, error: {}. Reconnecting.", e);
std::thread::sleep(backoff);
continue;
}
};
let (writer, rcvr) = mpsc::channel(1);
let message_sent_acknowledgement = message_sent_acknowledgement.clone();
infrastructure::get_ws_sink(rcvr, ws_writer, message_sent_acknowledgement).await;
let notification_state_clone = notification_state.read().await;
for iter in notification_state_clone.clone().into_iter() {
debug!("Registering {} notification on reconnection.", iter.0);
let data = format!(
"{{ \"jsonrpc\": \"1.0\", \"method\":\"{}\", \"params\":[], \"id\":{} }}",
iter.0, iter.1
);
trace!(
"Registering notification on reconnection, notification: {}",
iter.0
);
if let Err(e) = writer.send(Message::Text(data)).await {
warn!(
"Error registering notification on reconnection, error: {}",
e
);
}
}
trace!("Reconnection websocket message reader");
if let Err(e) = websocket_read_new.send(ws_rcv).await {
warn!(
"websocket reconnect handler closed on sending new websocket_read channel, error: {}",
e
);
break;
}
trace!("Reconnection websocket message writer");
if let Err(e) = ws_writer_new.send(writer).await {
warn!(
"websocket reconnect handler closed on sending new ws_writer send, error: {}",
e
);
break;
}
break;
}
on_reconnect();
}
info!("_ws_reconnect_handler exited")
}
/// Dispatches server notifications to the callbacks registered in `notif`.
///
/// Each message received on `channel_recv` is matched on its `method` string and passed,
/// together with its `params`, to the corresponding `chain_notification` helper and
/// registered callback. Methods that match none of the known constants go to
/// `on_unknown_notification`. A notification is dropped with a warning when its `params`
/// are empty, when `method` is not a string, or when no callback is registered for it.
///
/// The task ends when `channel_recv` is closed.
pub(super) async fn handle_notification(
    mut channel_recv: mpsc::Receiver<JsonResponse>,
    notif: Arc<super::notify::NotificationHandlers>,
) {
    while let Some(msg) = channel_recv.recv().await {
        info!("Received notification");

        if msg.params.is_empty() {
            warn!("server sent an invalid notification result: {:?}", msg);
            continue;
        }

        let method = match msg.method.as_str() {
            Some(name) => name,
            None => {
                warn!("Received a nil or unsupported method type on notify blocks.");
                continue;
            }
        };

        match method {
            commands::NOTIFICATION_METHOD_BLOCK_CONNECTED => {
                if let Some(callback) = notif.on_block_connected {
                    chain_notification::on_block_connected(&msg.params, callback);
                } else {
                    warn!("On block connected notification callback not registered.");
                }
            }
            commands::NOTIFICATION_METHOD_BLOCK_DISCONNECTED => {
                if let Some(callback) = notif.on_block_disconnected {
                    chain_notification::on_block_disconnected(&msg.params, callback);
                } else {
                    warn!("On block disconnected notification callback not registered.");
                }
            }
            commands::NOTIFICATION_METHOD_WORK => {
                if let Some(callback) = notif.on_work {
                    chain_notification::on_work(&msg.params, callback);
                } else {
                    warn!("On work notification callback not registered.");
                }
            }
            commands::NOTIFICATION_METHOD_NEW_TICKETS => {
                if let Some(callback) = notif.on_new_tickets {
                    chain_notification::on_new_tickets(&msg.params, callback);
                } else {
                    warn!("On new tickets notification callback not registered.");
                }
            }
            commands::NOTIFICATION_METHOD_TX_ACCEPTED => {
                if let Some(callback) = notif.on_tx_accepted {
                    chain_notification::on_tx_accepted(&msg.params, callback);
                } else {
                    warn!("On transaction accepted notification callback not registered.");
                }
            }
            commands::NOTIFICATION_METHOD_TX_ACCEPTED_VERBOSE => {
                if let Some(callback) = notif.on_tx_accepted_verbose {
                    chain_notification::on_tx_accepted_verbose(&msg.params, callback);
                } else {
                    warn!("On transaction accepted verbose notification callback not registered.");
                }
            }
            commands::NOTIFICATION_METHOD_REORGANIZATION => {
                if let Some(callback) = notif.on_reorganization {
                    chain_notification::on_reorganization(&msg.params, callback);
                } else {
                    warn!("On block reorganization callback not registered.");
                }
            }
            _ => {
                if let Some(callback) = notif.on_unknown_notification {
                    // The unknown-notification callback receives the whole message.
                    callback(method.to_string(), msg);
                } else {
                    warn!(
                        "On unknown notification callback not registered. Method: {}",
                        method
                    );
                }
            }
        }
    }
    trace!("Closing notification handler.");
}