use std::{collections::HashMap, fmt::Debug, sync::Arc};
use nautilus_core::consts::NAUTILUS_USER_AGENT;
use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
use nautilus_network::{
RECONNECTED,
http::USER_AGENT,
websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
};
use tokio_tungstenite::tungstenite::Message;
use crate::rpc::{
error::BlockchainRpcClientError,
types::{BlockchainMessage, RpcEventType},
utils::{
extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
},
};
/// WebSocket JSON-RPC client that manages `eth_subscribe` subscriptions for a
/// single blockchain node endpoint and decodes the messages it receives.
pub struct CoreBlockchainRpcClient {
/// Chain this client is connected to; its `name` is used in log messages.
chain: Chain,
/// WebSocket URL of the RPC node, passed to the WebSocket configuration on connect.
wss_rpc_url: String,
/// JSON-RPC `id` used for the next subscribe request; starts at 1 and is
/// incremented after each subscribe request is built.
request_id: u64,
/// Subscribe requests awaiting confirmation, keyed by the JSON-RPC request id.
pending_subscription_request: HashMap<u64, RpcEventType>,
/// Event type for each subscription id taken from the `result` field of a
/// subscription confirmation response.
subscription_event_types: HashMap<String, RpcEventType>,
/// Underlying WebSocket client; `None` until `connect` succeeds.
wss_client: Option<Arc<WebSocketClient>>,
/// Receiving end of the channel fed by the WebSocket message handler;
/// `None` until `connect` succeeds.
wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
/// Requested subscriptions (event type -> `eth_subscribe` parameter such as
/// `"newHeads"`), replayed when a reconnection is detected.
subscriptions: Arc<tokio::sync::RwLock<HashMap<RpcEventType, String>>>,
}
impl Debug for CoreBlockchainRpcClient {
    /// Formats the client for diagnostics.
    ///
    /// The WebSocket client, the message receiver and the lock-protected
    /// subscription map are rendered as placeholders rather than by value;
    /// every entry is labelled with the name of the struct field it describes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(CoreBlockchainRpcClient))
            .field("chain", &self.chain)
            .field("wss_rpc_url", &self.wss_rpc_url)
            .field("request_id", &self.request_id)
            .field(
                "pending_subscription_request",
                &self.pending_subscription_request,
            )
            .field("subscription_event_types", &self.subscription_event_types)
            .field(
                "wss_client",
                &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
            )
            .field(
                "wss_consumer_rx",
                &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
            )
            // Labelled after the actual field (`subscriptions`); the map sits
            // behind an async lock, so only a placeholder is printed.
            .field("subscriptions", &"<RwLock<HashMap>>")
            .finish()
    }
}
impl CoreBlockchainRpcClient {
/// Creates a disconnected client for `chain` that will talk to the node at
/// `wss_rpc_url`.
///
/// No network activity happens here: the WebSocket client and the message
/// receiver stay unset until `connect` is called, all bookkeeping maps start
/// empty, and request ids start at 1.
#[must_use]
pub fn new(chain: Chain, wss_rpc_url: String) -> Self {
    let subscriptions = Arc::new(tokio::sync::RwLock::new(HashMap::default()));
    Self {
        chain,
        wss_rpc_url,
        request_id: 1,
        pending_subscription_request: HashMap::default(),
        subscription_event_types: HashMap::default(),
        wss_client: None,
        wss_consumer_rx: None,
        subscriptions,
    }
}
/// Opens the WebSocket connection to the configured RPC URL.
///
/// On success the connected client and the receiving half of the message
/// channel are stored on `self`; on failure `self` is left untouched.
///
/// # Errors
///
/// Returns an error if the underlying WebSocket client fails to connect.
pub async fn connect(&mut self) -> anyhow::Result<()> {
    let config = WebSocketConfig {
        url: self.wss_rpc_url.clone(),
        headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
        // NOTE(review): heartbeat interval, presumably in seconds — confirm
        // against `WebSocketConfig`.
        heartbeat: Some(30),
        heartbeat_msg: None,
        reconnect_timeout_ms: Some(10_000),
        reconnect_delay_initial_ms: Some(1_000),
        reconnect_delay_max_ms: Some(30_000),
        reconnect_backoff_factor: Some(2.0),
        reconnect_jitter_ms: Some(1_000),
        reconnect_max_attempts: None,
        idle_timeout_ms: None,
    };

    // Incoming frames are forwarded by `handler` into `rx`.
    let (handler, rx) = channel_message_handler();
    let connected =
        WebSocketClient::connect(config, Some(handler), None, None, vec![], None).await?;

    self.wss_client = Some(Arc::new(connected));
    self.wss_consumer_rx = Some(rx);
    Ok(())
}
/// Sends an `eth_subscribe` request for `subscription_id` (the subscription
/// parameter, e.g. `"newHeads"`) and records it under `event_type`.
///
/// The request id is registered as pending before the message is sent, so the
/// confirmation can be matched when it arrives. If sending fails, the error is
/// logged and the pending entry is removed again, because no confirmation can
/// arrive for a request that was never sent. The subscription is still
/// recorded in `subscriptions`, so it is replayed on the next reconnection.
///
/// # Errors
///
/// Returns `BlockchainRpcClientError::ClientError` if the client is not connected.
async fn subscribe_events(
    &mut self,
    event_type: RpcEventType,
    subscription_id: String,
) -> Result<(), BlockchainRpcClientError> {
    let client = match &self.wss_client {
        Some(client) => client,
        None => {
            return Err(BlockchainRpcClientError::ClientError(String::from(
                "Client not connected",
            )));
        }
    };

    log::info!(
        "Subscribing to '{}' on chain '{}'",
        subscription_id,
        self.chain.name
    );

    // Reserve the request id up front so it can be used for clean-up below.
    let request_id = self.request_id;
    self.request_id += 1;

    let msg = serde_json::json!({
        "method": "eth_subscribe",
        "id": request_id,
        "jsonrpc": "2.0",
        "params": [subscription_id]
    });
    self.pending_subscription_request
        .insert(request_id, event_type.clone());

    if let Err(e) = client.send_text(msg.to_string(), None).await {
        log::error!("Error sending subscribe message: {e:?}");
        // The request never left the client: drop the stale pending entry.
        self.pending_subscription_request.remove(&request_id);
    }

    self.subscriptions
        .write()
        .await
        .insert(event_type, subscription_id);
    Ok(())
}
/// Replays an `eth_subscribe` request for every recorded subscription.
///
/// Called when a reconnection is detected. The recorded subscriptions are
/// copied out under the read lock, which is released before any message is
/// sent. Each request gets a fresh request id that is registered as pending.
/// Send failures are logged and do not abort the remaining requests.
///
/// # Errors
///
/// Currently always returns `Ok(())`.
async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
    let guard = self.subscriptions.read().await;
    let count = guard.len();
    if count == 0 {
        log::debug!(
            "No subscriptions to re-establish for chain '{}'",
            self.chain.name
        );
        return Ok(());
    }
    log::info!(
        "Re-establishing {} subscription(s) for chain '{}'",
        count,
        self.chain.name
    );

    let mut to_restore: Vec<(RpcEventType, String)> = Vec::with_capacity(count);
    for (event_type, topic) in guard.iter() {
        to_restore.push((event_type.clone(), topic.clone()));
    }
    // Release the lock before awaiting on the socket.
    drop(guard);

    // Without a client there is nothing to send.
    let client = match &self.wss_client {
        Some(client) => client,
        None => return Ok(()),
    };

    for (event_type, topic) in to_restore {
        log::debug!(
            "Re-subscribing to '{}' on chain '{}'",
            topic,
            self.chain.name
        );
        let request_id = self.request_id;
        let payload = serde_json::json!({
            "method": "eth_subscribe",
            "id": request_id,
            "jsonrpc": "2.0",
            "params": [topic]
        })
        .to_string();
        self.pending_subscription_request
            .insert(request_id, event_type);
        self.request_id += 1;
        if let Err(e) = client.send_text(payload, None).await {
            log::error!("Error re-subscribing after reconnection: {e:?}");
        }
    }
    Ok(())
}
/// Sends an `eth_unsubscribe` request carrying `subscription_id` as its only
/// parameter.
///
/// A failure to send is logged and otherwise ignored.
///
/// # Errors
///
/// Returns `BlockchainRpcClientError::ClientError` if the client is not connected.
async fn unsubscribe_events(
    &self,
    subscription_id: String,
) -> Result<(), BlockchainRpcClientError> {
    let client = self.wss_client.as_ref().ok_or_else(|| {
        BlockchainRpcClientError::ClientError(String::from("Client not connected"))
    })?;

    log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);

    // NOTE(review): the request id is fixed at 1 and can coincide with the id
    // of the first subscribe request — confirm this is intended.
    let payload = serde_json::json!({
        "method": "eth_unsubscribe",
        "id": 1,
        "jsonrpc": "2.0",
        "params": [subscription_id]
    })
    .to_string();

    if let Err(e) = client.send_text(payload, None).await {
        log::error!("Error sending unsubscribe message: {e:?}");
    }
    Ok(())
}
/// Waits for the next raw WebSocket message from the consumer channel.
///
/// Returns `None` when no receiver is set (the client was never connected)
/// or when the receiver yields `None`.
pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
    let receiver = self.wss_consumer_rx.as_mut()?;
    receiver.recv().await
}
/// Waits for and decodes the next meaningful message from the RPC node.
///
/// Handled internally (the loop keeps waiting afterwards):
/// - the `RECONNECTED` marker, which triggers re-subscription of all recorded
///   subscriptions;
/// - subscription confirmation responses, which bind the node-assigned
///   subscription id to the event type of the pending request;
/// - `Pong` frames, which are ignored.
///
/// A subscription event for a known subscription id is decoded and returned.
///
/// # Errors
///
/// - `MessageParsingError` if a text frame is not valid JSON, a subscription
///   confirmation lacks a numeric `id` or a string `result`, or a block
///   payload cannot be decoded.
/// - `InternalRpcClientError` if a confirmation refers to an unknown request
///   id, a subscription event has no extractable subscription id, or the
///   subscription id is not associated with any event type.
/// - `UnsupportedRpcResponseType` for any other JSON payload or frame type.
/// - `NoMessageReceived` if the channel yields no further messages.
pub async fn next_rpc_message(
    &mut self,
) -> Result<BlockchainMessage, BlockchainRpcClientError> {
    while let Some(msg) = self.wait_on_rpc_channel().await {
        match msg {
            Message::Text(text) => {
                if text == RECONNECTED {
                    log::info!("Detected reconnection for chain '{}'", self.chain.name);
                    if let Err(e) = self.resubscribe_all().await {
                        log::error!("Failed to re-establish subscriptions: {e:?}");
                    }
                    continue;
                }

                let json = match serde_json::from_str::<serde_json::Value>(&text) {
                    Ok(json) => json,
                    Err(e) => {
                        return Err(BlockchainRpcClientError::MessageParsingError(
                            e.to_string(),
                        ));
                    }
                };

                if is_subscription_confirmation_response(&json) {
                    // Payload comes from the remote node: never panic on it.
                    self.register_subscription_confirmation(&json)?;
                    continue;
                }

                if !is_subscription_event(&json) {
                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
                        json.to_string(),
                    ));
                }

                let subscription_id = match extract_rpc_subscription_id(&json) {
                    Some(id) => id,
                    None => {
                        return Err(BlockchainRpcClientError::InternalRpcClientError(
                            "Error parsing subscription id from valid rpc response"
                                .to_string(),
                        ));
                    }
                };

                let event_type = match self.subscription_event_types.get(subscription_id) {
                    Some(event_type) => event_type,
                    None => {
                        return Err(BlockchainRpcClientError::InternalRpcClientError(
                            format!(
                                "Event type not found for defined subscription id {subscription_id}"
                            ),
                        ));
                    }
                };

                return match event_type {
                    RpcEventType::NewBlock => {
                        serde_json::from_value::<RpcNodeWssResponse<Block>>(json)
                            .map(|response| BlockchainMessage::Block(response.params.result))
                            .map_err(|e| {
                                BlockchainRpcClientError::MessageParsingError(format!(
                                    "Error parsing rpc response to block with error {e}"
                                ))
                            })
                    }
                };
            }
            Message::Pong(_) => {}
            _ => {
                return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
                    msg.to_string(),
                ));
            }
        }
    }
    Err(BlockchainRpcClientError::NoMessageReceived)
}

/// Binds the subscription id carried in a confirmation response to the event
/// type of the pending request it answers, consuming the pending entry.
fn register_subscription_confirmation(
    &mut self,
    json: &serde_json::Value,
) -> Result<(), BlockchainRpcClientError> {
    let request_id = json
        .get("id")
        .and_then(serde_json::Value::as_u64)
        .ok_or_else(|| {
            BlockchainRpcClientError::MessageParsingError(format!(
                "Subscription confirmation without a numeric request id: {json}"
            ))
        })?;
    let subscription_id = json
        .get("result")
        .and_then(serde_json::Value::as_str)
        .ok_or_else(|| {
            BlockchainRpcClientError::MessageParsingError(format!(
                "Subscription confirmation without a string subscription id: {json}"
            ))
        })?;
    let event_type = self
        .pending_subscription_request
        .remove(&request_id)
        .ok_or_else(|| {
            BlockchainRpcClientError::InternalRpcClientError(format!(
                "No pending subscription request found for request id {request_id}"
            ))
        })?;
    self.subscription_event_types
        .insert(subscription_id.to_string(), event_type);
    Ok(())
}
/// Subscribes to new block headers by requesting the `newHeads` subscription
/// for the `NewBlock` event type.
///
/// # Errors
///
/// Propagates any error returned by `subscribe_events`.
pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
    let topic = "newHeads".to_string();
    self.subscribe_events(RpcEventType::NewBlock, topic).await
}
pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
self.unsubscribe_events(String::from("newHeads")).await?;
let subscription_ids_to_remove: Vec<String> = self
.subscription_event_types
.iter()
.filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
.map(|(id, _)| id.clone())
.collect();
for id in subscription_ids_to_remove {
self.subscription_event_types.remove(&id);
}
let mut confirmed = self.subscriptions.write().await;
confirmed.remove(&RpcEventType::NewBlock);
Ok(())
}
}