use crate::rpc::{RpcClientError, RpcResult};
use log::{debug, error, info, warn};
use std::sync::mpsc::{SendError, Sender as ThreadOut};
use ws::{CloseCode, Handler, Handshake, Message, Result as WsResult, Sender};
/// Status of a submitted extrinsic, as classified by `parse_status` from the
/// node's JSON-RPC messages.
///
/// The enum is a plain tag without payload, so it is cheap to copy and can be
/// compared for full equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum XtStatus {
    /// The update's result object carries a `finalized` entry.
    Finalized,
    /// The update's result object carries an `inBlock` entry.
    InBlock,
    /// The update's result object carries a `broadcast` entry.
    Broadcast,
    /// The update's result is the string `"ready"`.
    Ready,
    /// The update's result is the string `"future"`.
    Future,
    /// Error status.
    ///
    /// NOTE(review): `parse_status` reports errors through `Err(..)` and never
    /// yields this variant in the code visible here; confirm where it is used.
    Error,
    /// Any message that matches none of the recognised shapes.
    Unknown,
}
/// Signature of the callback `RpcClient` invokes for every incoming message.
///
/// The callback receives the message itself, the WebSocket sender `out` (the
/// handlers in this file use it to close the connection) and the channel
/// sender `result` through which the handlers forward their outcome as a
/// `String`.
pub type OnMessageFn = fn(msg: Message, out: Sender, result: ThreadOut<String>) -> WsResult<()>;
/// WebSocket handler that sends one request when the connection opens and
/// forwards every incoming message to a configurable callback.
pub struct RpcClient {
/// WebSocket sender: used to transmit `request` and handed to the callback.
pub out: Sender,
/// Request text that is sent as soon as the connection is open.
pub request: String,
/// Channel sender handed to the callback so it can deliver its result.
pub result: ThreadOut<String>,
/// Callback invoked for each message received on the socket.
pub on_message_fn: OnMessageFn,
}
impl Handler for RpcClient {
    /// Logs the stored request and sends it once the connection is open.
    fn on_open(&mut self, _: Handshake) -> WsResult<()> {
        info!("sending request: {}", self.request);
        let request = self.request.clone();
        self.out.send(request)
    }

    /// Passes the incoming message to the configured callback together with
    /// clones of the WebSocket sender and the result channel.
    fn on_message(&mut self, msg: Message) -> WsResult<()> {
        let handler = self.on_message_fn;
        let out = self.out.clone();
        let result = self.result.clone();
        handler(msg, out, result)
    }
}
/// Handles the reply to a one-shot request.
///
/// The socket is closed first (a failed close is only logged), then the
/// message is parsed as JSON and the textual rendering of its `result` field
/// is sent through `result`.
///
/// # Errors
///
/// Returns an error when the message cannot be read as text, is not valid
/// JSON, or when the receiving end of `result` is gone.
pub fn on_get_request_msg(msg: Message, out: Sender, result: ThreadOut<String>) -> WsResult<()> {
    if out.close(CloseCode::Normal).is_err() {
        warn!("Could not close Websocket normally");
    }
    info!("Got get_request_msg {}", msg);

    let payload: serde_json::Value =
        serde_json::from_str(msg.as_text()?).map_err(|e| Box::new(RpcClientError::Serde(e)))?;
    let result_str = payload["result"].to_string();

    match result.send(result_str) {
        Ok(()) => Ok(()),
        Err(e) => Err(Box::new(RpcClientError::Send(e)).into()),
    }
}
/// Handles messages arriving on a subscription.
///
/// Messages carrying a string `id` are ignored. All other messages are
/// dispatched on their `method` field:
///
/// * `state_storage`: forwards the string at `params.result.changes[0][1]`
///   through `result`, if there is one.
/// * `chain_finalizedHead`: forwards the JSON rendering of `params.result`.
/// * anything else is logged as unsupported.
///
/// When forwarding fails because the receiver is gone, the WebSocket is closed.
///
/// # Errors
///
/// Returns an error when the message cannot be read as text, is not valid
/// JSON, the finalized head cannot be serialized, or closing the socket fails.
pub fn on_subscription_msg(msg: Message, out: Sender, result: ThreadOut<String>) -> WsResult<()> {
    info!("got on_subscription_msg {}", msg);
    let value: serde_json::Value =
        serde_json::from_str(msg.as_text()?).map_err(|e| Box::new(RpcClientError::Serde(e)))?;

    // A string `id` marks a direct response rather than a subscription update.
    if value["id"].as_str().is_some() {
        return Ok(());
    }

    debug!("no id field found in response. must be subscription");
    let method = value["method"].as_str();
    debug!("method: {:?}", method);

    match method {
        Some("state_storage") => {
            let changes = &value["params"]["result"]["changes"];
            match changes[0][1].as_str() {
                Some(change_set) => {
                    let sent: Result<(), SendError<String>> = result.send(change_set.to_owned());
                    // Log the error itself, not the payload that failed to be sent.
                    if let Err(e) = sent {
                        debug!("SendError: {}. will close ws", e);
                        out.close(CloseCode::Normal)?;
                    }
                }
                // Go through the logger like the rest of this module instead of stdout.
                None => debug!("No events happened"),
            }
        }
        Some("chain_finalizedHead") => {
            let head = serde_json::to_string(&value["params"]["result"])
                .map_err(|e| Box::new(RpcClientError::Serde(e)))?;
            if let Err(e) = result.send(head) {
                debug!("SendError: {}. will close ws", e);
                out.close(CloseCode::Normal)?;
            }
        }
        _ => error!("unsupported method"),
    }
    Ok(())
}
/// Follows an extrinsic's status updates until it is finalized.
///
/// * `Finalized`: closes the socket and sends the reported value through `result`.
/// * `Future`: logs a warning, closes the socket and sends an empty string.
/// * Any other status is ignored so that later updates can be processed.
///
/// # Errors
///
/// Returns an error when the message cannot be read as text, when
/// `parse_status` fails (invalid JSON or an error reported by the node), or
/// when the receiving end of `result` is gone.
pub fn on_extrinsic_msg_until_finalized(
    msg: Message,
    out: Sender,
    result: ThreadOut<String>,
) -> WsResult<()> {
    // Propagate an unreadable payload as an error instead of panicking in the handler.
    let retstr = msg.as_text()?;
    debug!("got msg {}", retstr);
    match parse_status(retstr) {
        Ok((XtStatus::Finalized, val)) => end_process(out, result, val),
        Ok((XtStatus::Future, _)) => {
            warn!("extrinsic has 'future' status. aborting");
            end_process(out, result, None)
        }
        Ok(_) => Ok(()),
        Err(e) => Err(Box::new(e).into()),
    }
}
/// Follows an extrinsic's status updates until it is included in a block.
///
/// * `Finalized` / `InBlock`: closes the socket and sends the reported value
///   through `result`.
/// * `Future`: closes the socket and sends an empty string.
/// * Any other status is ignored so that later updates can be processed.
///
/// # Errors
///
/// Returns an error when the message cannot be read as text, when
/// `parse_status` fails (invalid JSON or an error reported by the node), or
/// when the receiving end of `result` is gone.
pub fn on_extrinsic_msg_until_in_block(
    msg: Message,
    out: Sender,
    result: ThreadOut<String>,
) -> WsResult<()> {
    // Propagate an unreadable payload as an error instead of panicking in the handler.
    let retstr = msg.as_text()?;
    debug!("got msg {}", retstr);
    match parse_status(retstr) {
        Ok((XtStatus::Finalized, val)) | Ok((XtStatus::InBlock, val)) => {
            end_process(out, result, val)
        }
        Ok((XtStatus::Future, _)) => end_process(out, result, None),
        Ok(_) => Ok(()),
        Err(e) => Err(Box::new(e).into()),
    }
}
/// Follows an extrinsic's status updates until it has been broadcast.
///
/// * `Finalized`: closes the socket and sends the reported value through `result`.
/// * `Broadcast` / `Future`: closes the socket and sends an empty string.
/// * Any other status is ignored so that later updates can be processed.
///
/// # Errors
///
/// Returns an error when the message cannot be read as text, when
/// `parse_status` fails (invalid JSON or an error reported by the node), or
/// when the receiving end of `result` is gone.
pub fn on_extrinsic_msg_until_broadcast(
    msg: Message,
    out: Sender,
    result: ThreadOut<String>,
) -> WsResult<()> {
    // Propagate an unreadable payload as an error instead of panicking in the handler.
    let retstr = msg.as_text()?;
    debug!("got msg {}", retstr);
    match parse_status(retstr) {
        Ok((XtStatus::Finalized, val)) => end_process(out, result, val),
        Ok((XtStatus::Broadcast, _)) | Ok((XtStatus::Future, _)) => {
            end_process(out, result, None)
        }
        Ok(_) => Ok(()),
        Err(e) => Err(Box::new(e).into()),
    }
}
/// Follows an extrinsic's status updates until it is reported as ready.
///
/// * `Finalized`: closes the socket and sends the reported value through `result`.
/// * `Ready` / `Future`: closes the socket and sends an empty string.
/// * Any other status is ignored so that later updates can be processed.
///
/// # Errors
///
/// Returns an error when the message cannot be read as text, when
/// `parse_status` fails (invalid JSON or an error reported by the node), or
/// when the receiving end of `result` is gone.
pub fn on_extrinsic_msg_until_ready(
    msg: Message,
    out: Sender,
    result: ThreadOut<String>,
) -> WsResult<()> {
    // Propagate an unreadable payload as an error instead of panicking in the handler.
    let retstr = msg.as_text()?;
    debug!("got msg {}", retstr);
    match parse_status(retstr) {
        Ok((XtStatus::Finalized, val)) => end_process(out, result, val),
        Ok((XtStatus::Ready, _)) | Ok((XtStatus::Future, _)) => end_process(out, result, None),
        Ok(_) => Ok(()),
        Err(e) => Err(Box::new(e).into()),
    }
}
/// Finishes a request: closes the WebSocket and delivers the final value.
///
/// A missing `value` is delivered as an empty string. A failed close is only
/// logged; a failed send is returned as an error.
fn end_process(out: Sender, result: ThreadOut<String>, value: Option<String>) -> WsResult<()> {
    debug!("Thread end result :{:?} value:{:?}", result, value);
    let payload = value.unwrap_or_default();

    if out.close(CloseCode::Normal).is_err() {
        warn!("Could not close WebSocket normally");
    }

    match result.send(payload) {
        Ok(()) => Ok(()),
        Err(e) => Err(Box::new(RpcClientError::Send(e)).into()),
    }
}
/// Classifies a JSON-RPC message received while tracking an extrinsic.
///
/// Returns the detected status together with the JSON rendering of the value
/// attached to it (`finalized`, `inBlock` and `broadcast` carry one; the other
/// statuses do not). Messages that match no known shape yield
/// `XtStatus::Unknown`.
///
/// # Errors
///
/// * `msg` is not valid JSON.
/// * `msg` contains an `error` object: reported as `RpcClientError::Extrinsic`
///   with the text `extrinsic error code <code>: <message>: <data>`. A missing
///   or non-integer `code` is shown as `-1`; missing or `null` `message` /
///   `data` are shown as empty; non-string values are shown as their JSON
///   rendering instead of causing a panic.
fn parse_status(msg: &str) -> RpcResult<(XtStatus, Option<String>)> {
    let value: serde_json::Value = serde_json::from_str(msg)?;

    if let Some(err) = value["error"].as_object() {
        // The error object comes from the server, so every field is read
        // leniently; `data` in particular may be structured rather than a string.
        let text_of = |field: &str| -> String {
            match err.get(field) {
                Some(serde_json::Value::String(text)) => text.clone(),
                Some(serde_json::Value::Null) | None => String::new(),
                Some(other) => other.to_string(),
            }
        };
        let code = err
            .get("code")
            .and_then(serde_json::Value::as_i64)
            .unwrap_or(-1);
        return Err(RpcClientError::Extrinsic(format!(
            "extrinsic error code {}: {}: {}",
            code,
            text_of("message"),
            text_of("data")
        )));
    }

    let status = match &value["params"]["result"] {
        serde_json::Value::Object(obj) => {
            if let Some(hash) = obj.get("finalized") {
                info!("finalized: {:?}", hash);
                (XtStatus::Finalized, Some(hash.to_string()))
            } else if let Some(hash) = obj.get("inBlock") {
                info!("inBlock: {:?}", hash);
                (XtStatus::InBlock, Some(hash.to_string()))
            } else if let Some(array) = obj.get("broadcast") {
                info!("broadcast: {:?}", array);
                (XtStatus::Broadcast, Some(array.to_string()))
            } else {
                (XtStatus::Unknown, None)
            }
        }
        serde_json::Value::String(state) => match state.as_str() {
            "ready" => (XtStatus::Ready, None),
            "future" => (XtStatus::Future, None),
            _ => (XtStatus::Unknown, None),
        },
        _ => (XtStatus::Unknown, None),
    };
    Ok(status)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the message of an `Extrinsic` error; any other variant fails the test.
    fn extract_extrinsic_error_msg(err: RpcClientError) -> String {
        if let RpcClientError::Extrinsic(msg) = err {
            msg
        } else {
            panic!("Expected extrinsic error")
        }
    }

    /// Parses `msg`, which must be rejected, and returns the extrinsic error text.
    fn extrinsic_error_of(msg: &str) -> String {
        parse_status(msg)
            .map_err(extract_extrinsic_error_msg)
            .unwrap_err()
    }

    #[test]
    fn extrinsic_status_parsed_correctly() {
        // Messages that must be classified successfully, with the expected outcome.
        let accepted: Vec<(&str, (XtStatus, Option<String>))> = vec![
            (
                "{\"jsonrpc\":\"2.0\",\"result\":7185,\"id\":\"3\"}",
                (XtStatus::Unknown, None),
            ),
            (
                "{\"jsonrpc\":\"2.0\",\"method\":\"author_extrinsicUpdate\",\"params\":{\"result\":\"ready\",\"subscription\":7185}}",
                (XtStatus::Ready, None),
            ),
            (
                "{\"jsonrpc\":\"2.0\",\"method\":\"author_extrinsicUpdate\",\"params\":{\"result\":{\"broadcast\":[\"QmfSF4VYWNqNf5KYHpDEdY8Rt1nPUgSkMweDkYzhSWirGY\",\"Qmchhx9SRFeNvqjUK4ZVQ9jH4zhARFkutf9KhbbAmZWBLx\",\"QmQJAqr98EF1X3YfjVKNwQUG9RryqX4Hv33RqGChbz3Ncg\"]},\"subscription\":232}}",
                (
                    XtStatus::Broadcast,
                    Some(
                        "[\"QmfSF4VYWNqNf5KYHpDEdY8Rt1nPUgSkMweDkYzhSWirGY\",\"Qmchhx9SRFeNvqjUK4ZVQ9jH4zhARFkutf9KhbbAmZWBLx\",\"QmQJAqr98EF1X3YfjVKNwQUG9RryqX4Hv33RqGChbz3Ncg\"]"
                            .to_string(),
                    ),
                ),
            ),
            (
                "{\"jsonrpc\":\"2.0\",\"method\":\"author_extrinsicUpdate\",\"params\":{\"result\":{\"inBlock\":\"0x3104d362365ff5ddb61845e1de441b56c6722e94c1aee362f8aa8ba75bd7a3aa\"},\"subscription\":232}}",
                (
                    XtStatus::InBlock,
                    Some(
                        "\"0x3104d362365ff5ddb61845e1de441b56c6722e94c1aee362f8aa8ba75bd7a3aa\""
                            .to_string(),
                    ),
                ),
            ),
            (
                "{\"jsonrpc\":\"2.0\",\"method\":\"author_extrinsicUpdate\",\"params\":{\"result\":{\"finalized\":\"0x934385b11c483498e2b5bca64c2e8ef76ad6c74d3372a05595d3a50caf758d52\"},\"subscription\":7185}}",
                (
                    XtStatus::Finalized,
                    Some(
                        "\"0x934385b11c483498e2b5bca64c2e8ef76ad6c74d3372a05595d3a50caf758d52\""
                            .to_string(),
                    ),
                ),
            ),
            (
                "{\"jsonrpc\":\"2.0\",\"method\":\"author_extrinsicUpdate\",\"params\":{\"result\":\"future\",\"subscription\":2}}",
                (XtStatus::Future, None),
            ),
        ];
        for (msg, expected) in accepted {
            assert_eq!(parse_status(msg).unwrap(), expected);
        }

        // Messages carrying an `error` object, with the expected error text.
        let rejected: Vec<(&str, &str)> = vec![
            (
                "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32700,\"message\":\"Parse error\"},\"id\":null}",
                "extrinsic error code -32700: Parse error: ",
            ),
            (
                "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":1010,\"message\":\"Invalid Transaction\",\"data\":\"Bad Signature\"},\"id\":\"4\"}",
                "extrinsic error code 1010: Invalid Transaction: Bad Signature",
            ),
            (
                "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":1001,\"message\":\"Extrinsic has invalid format.\"},\"id\":\"0\"}",
                "extrinsic error code 1001: Extrinsic has invalid format.: ",
            ),
            (
                r#"{"jsonrpc":"2.0","error":{"code":1002,"message":"Verification Error: Execution(Wasmi(Trap(Trap { kind: Unreachable })))","data":"RuntimeApi(\"Execution(Wasmi(Trap(Trap { kind: Unreachable })))\")"},"id":"3"}"#,
                "extrinsic error code 1002: Verification Error: Execution(Wasmi(Trap(Trap { kind: Unreachable }))): RuntimeApi(\"Execution(Wasmi(Trap(Trap { kind: Unreachable })))\")",
            ),
        ];
        for (msg, expected) in rejected {
            assert_eq!(extrinsic_error_of(msg), expected);
        }
    }
}