use std::sync::Arc;
use tokio::sync::Mutex;
use crate::model::SubscriptionChannel;
use crate::{
callback::MessageHandler,
config::WebSocketConfig,
connection::Dispatcher,
error::{WebSocketError, envelope::build_raw_error_response},
message::request::RequestBuilder,
model::{
quote::*,
subscription::SubscriptionManager,
ws_types::{JsonRpcRequest, JsonRpcResponse, JsonRpcResult},
},
session::WebSocketSession,
};
/// WebSocket client bundling configuration, the active connection dispatcher,
/// a session, a JSON-RPC request builder, subscription tracking and an
/// optional message handler.
#[derive(Debug)]
pub struct DeribitWebSocketClient {
/// Shared configuration; `connect` reads the URL, timeouts and channel
/// capacities from it.
pub config: Arc<WebSocketConfig>,
/// Slot for the active dispatcher: filled by `connect`, emptied by
/// `disconnect`, and `None` until the first successful `connect`.
dispatcher: Arc<Mutex<Option<Arc<Dispatcher>>>>,
/// Session created in `new` from the shared configuration and the
/// subscription manager.
pub session: Arc<WebSocketSession>,
/// Builder used (under a lock) to construct every outgoing JSON-RPC request.
request_builder: Arc<Mutex<RequestBuilder>>,
/// Locally tracked subscriptions, updated from server-confirmed channels.
subscription_manager: Arc<Mutex<SubscriptionManager>>,
/// Callback handler applied by `receive_and_process_message`, if one is set.
message_handler: Option<MessageHandler>,
}
impl DeribitWebSocketClient {
/// Builds a client from `config`.
///
/// The configuration is cloned into an `Arc`; the session receives handles to
/// both it and the subscription manager. The dispatcher slot starts empty and
/// no message handler is installed.
///
/// # Errors
/// The current implementation always returns `Ok`.
pub fn new(config: &WebSocketConfig) -> Result<Self, WebSocketError> {
    let shared_config = Arc::new(config.clone());
    let tracker = Arc::new(Mutex::new(SubscriptionManager::new()));
    let ws_session = WebSocketSession::new(Arc::clone(&shared_config), Arc::clone(&tracker));

    Ok(Self {
        config: shared_config,
        dispatcher: Arc::new(Mutex::new(None)),
        session: Arc::new(ws_session),
        request_builder: Arc::new(Mutex::new(RequestBuilder::new())),
        subscription_manager: tracker,
        message_handler: None,
    })
}
/// Returns another handle to the shared subscription manager.
#[must_use]
pub fn subscription_manager(&self) -> Arc<Mutex<SubscriptionManager>> {
    let shared = &self.subscription_manager;
    Arc::clone(shared)
}
/// Builds a client whose configuration is derived from `ws_url`.
///
/// # Errors
/// Returns `WebSocketError::ConnectionFailed` when `WebSocketConfig::with_url`
/// rejects the URL; otherwise forwards the result of `new`.
pub fn new_with_url(ws_url: String) -> Result<Self, WebSocketError> {
    match WebSocketConfig::with_url(&ws_url) {
        Ok(config) => Self::new(&config),
        Err(e) => Err(WebSocketError::ConnectionFailed(format!(
            "Invalid URL: {}",
            e
        ))),
    }
}
/// Builds a client pointed at the `test.deribit.com` WebSocket endpoint.
///
/// # Errors
/// Forwards any error from `new_with_url`.
pub fn new_testnet() -> Result<Self, WebSocketError> {
    let endpoint = String::from("wss://test.deribit.com/ws/api/v2");
    Self::new_with_url(endpoint)
}
/// Builds a client pointed at the `www.deribit.com` WebSocket endpoint.
///
/// # Errors
/// Forwards any error from `new_with_url`.
pub fn new_production() -> Result<Self, WebSocketError> {
    let endpoint = String::from("wss://www.deribit.com/ws/api/v2");
    Self::new_with_url(endpoint)
}
/// Opens a new connection and stores its dispatcher, replacing any previous one.
///
/// The dispatcher slot stays locked for the whole call. A previously stored
/// dispatcher is shut down first and its shutdown result is ignored.
///
/// # Errors
/// Propagates the error from `Dispatcher::connect`; in that case the slot is
/// left empty because the previous dispatcher has already been taken out.
pub async fn connect(&self) -> Result<(), WebSocketError> {
    let mut slot = self.dispatcher.lock().await;

    if let Some(stale) = slot.take() {
        // Best effort: a failing shutdown of the old dispatcher must not
        // prevent the new connection attempt.
        let _ = stale.shutdown().await;
    }

    let cfg = &self.config;
    let fresh = Dispatcher::connect(
        cfg.ws_url.clone(),
        cfg.connection_timeout,
        cfg.request_timeout,
        cfg.notification_channel_capacity,
        cfg.dispatcher_command_capacity,
    )
    .await?;

    *slot = Some(Arc::new(fresh));
    Ok(())
}
/// Removes the stored dispatcher, if any, and shuts it down.
///
/// Calling this without an active dispatcher is a no-op that returns `Ok`.
///
/// # Errors
/// Propagates the error returned by the dispatcher's `shutdown`.
pub async fn disconnect(&self) -> Result<(), WebSocketError> {
    // The guard is a temporary of this statement, so the slot is unlocked
    // again before the shutdown is awaited.
    let taken = self.dispatcher.lock().await.take();

    if let Some(active) = taken {
        active.shutdown().await?;
    }
    Ok(())
}
/// Reports whether a dispatcher is currently stored.
///
/// This only inspects the slot; it does not probe the underlying connection.
pub async fn is_connected(&self) -> bool {
    let slot = self.dispatcher.lock().await;
    slot.is_some()
}
pub async fn authenticate(
&self,
client_id: &str,
client_secret: &str,
) -> Result<crate::model::AuthResponse, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_auth_request(client_id, client_secret)
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse authentication response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
/// Requests subscription to `channels` and records the channels the server confirms.
///
/// The raw response is returned even when it carries an API error; in that
/// case local subscription tracking is left untouched.
///
/// # Errors
/// Propagates `send_request` failures and the `InvalidMessage` produced when a
/// success result is not an array of channel strings.
pub async fn subscribe(
    &self,
    channels: Vec<String>,
) -> Result<JsonRpcResponse, WebSocketError> {
    let rpc_request = self
        .request_builder
        .lock()
        .await
        .build_subscribe_request(channels);
    let reply = self.send_request(&rpc_request).await?;

    let accepted = confirmed_channels(&reply, "public/subscribe")?;
    if let Some(names) = accepted {
        // The tracker is locked only for the duration of this statement.
        add_confirmed_channels(&mut *self.subscription_manager.lock().await, names);
    }
    Ok(reply)
}
/// Requests unsubscription from `channels` and forgets the channels the server confirms.
///
/// The raw response is returned even when it carries an API error; in that
/// case local subscription tracking is left untouched.
///
/// # Errors
/// Propagates `send_request` failures and the `InvalidMessage` produced when a
/// success result is not an array of channel strings.
pub async fn unsubscribe(
    &self,
    channels: Vec<String>,
) -> Result<JsonRpcResponse, WebSocketError> {
    let rpc_request = self
        .request_builder
        .lock()
        .await
        .build_unsubscribe_request(channels);
    let reply = self.send_request(&rpc_request).await?;

    let released = confirmed_channels(&reply, "public/unsubscribe")?;
    if let Some(names) = released {
        // The tracker is locked only for the duration of this statement.
        remove_confirmed_channels(&mut *self.subscription_manager.lock().await, names);
    }
    Ok(reply)
}
pub async fn public_unsubscribe_all(&self) -> Result<String, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_public_unsubscribe_all_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => {
self.subscription_manager.lock().await.clear();
result.as_str().map(String::from).ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected string result from unsubscribe_all".to_string(),
)
})
}
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn private_unsubscribe_all(&self) -> Result<String, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_private_unsubscribe_all_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => {
self.subscription_manager.lock().await.clear();
result.as_str().map(String::from).ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected string result from unsubscribe_all".to_string(),
)
})
}
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn send_request(
&self,
request: &JsonRpcRequest,
) -> Result<JsonRpcResponse, WebSocketError> {
let dispatcher = {
let guard = self.dispatcher.lock().await;
guard
.as_ref()
.map(Arc::clone)
.ok_or(WebSocketError::ConnectionClosed)?
};
dispatcher.send_request(request).await
}
pub async fn receive_message(&self) -> Result<String, WebSocketError> {
let dispatcher = {
let guard = self.dispatcher.lock().await;
guard
.as_ref()
.map(Arc::clone)
.ok_or(WebSocketError::ConnectionClosed)?
};
dispatcher
.next_notification()
.await
.ok_or(WebSocketError::ConnectionClosed)
}
/// Returns the channel names currently held by the local subscription manager.
pub async fn get_subscriptions(&self) -> Vec<String> {
    self.subscription_manager.lock().await.get_all_channels()
}
pub async fn test_connection(&self) -> Result<crate::model::TestResponse, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_test_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!("Failed to parse test response: {}", e))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_time(&self) -> Result<u64, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_time_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => result.as_u64().ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected u64 timestamp in get_time response".to_string(),
)
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn set_heartbeat(&self, interval: u64) -> Result<String, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_set_heartbeat_request(interval)
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => {
result.as_str().map(String::from).ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected string result from set_heartbeat".to_string(),
)
})
}
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn disable_heartbeat(&self) -> Result<String, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_disable_heartbeat_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => {
result.as_str().map(String::from).ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected string result from disable_heartbeat".to_string(),
)
})
}
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn hello(
&self,
client_name: &str,
client_version: &str,
) -> Result<crate::model::HelloResponse, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_hello_request(client_name, client_version)
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!("Failed to parse hello response: {}", e))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn enable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_enable_cancel_on_disconnect_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => {
result.as_str().map(String::from).ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected string result from enable_cancel_on_disconnect".to_string(),
)
})
}
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn disable_cancel_on_disconnect(&self) -> Result<String, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_disable_cancel_on_disconnect_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => {
result.as_str().map(String::from).ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected string result from disable_cancel_on_disconnect".to_string(),
)
})
}
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_cancel_on_disconnect(&self) -> Result<bool, WebSocketError> {
let request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_cancel_on_disconnect_request()
};
let request_ctx: &JsonRpcRequest = &request;
let response = self.send_request(&request).await?;
match response.result {
JsonRpcResult::Success { result } => {
result
.get("enabled")
.and_then(|v| v.as_bool())
.ok_or_else(|| {
WebSocketError::InvalidMessage(
"Expected 'enabled' boolean in get_cancel_on_disconnect response"
.to_string(),
)
})
}
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn mass_quote(
&self,
request: MassQuoteRequest,
) -> Result<MassQuoteResult, WebSocketError> {
request.validate().map_err(WebSocketError::InvalidMessage)?;
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_mass_quote_request(request)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse mass quote response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn cancel_quotes(
&self,
request: CancelQuotesRequest,
) -> Result<CancelQuotesResponse, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_cancel_quotes_request(request)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse cancel quotes response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn set_mmp_config(&self, config: MmpGroupConfig) -> Result<(), WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_set_mmp_config_request(config)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { .. } => Ok(()),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_mmp_config(
&self,
mmp_group: Option<String>,
) -> Result<Vec<MmpGroupConfig>, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_mmp_config_request(mmp_group)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse MMP config response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn reset_mmp(&self, mmp_group: Option<String>) -> Result<(), WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_reset_mmp_request(mmp_group)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { .. } => Ok(()),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_open_orders(
&self,
currency: Option<String>,
kind: Option<String>,
type_filter: Option<String>,
) -> Result<Vec<QuoteInfo>, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_open_orders_request(currency, kind, type_filter)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse open orders response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn buy(
&self,
request: crate::model::trading::OrderRequest,
) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_buy_request(&request)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!("Failed to parse buy response: {}", e))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn sell(
&self,
request: crate::model::trading::OrderRequest,
) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_sell_request(&request)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!("Failed to parse sell response: {}", e))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn cancel(
&self,
order_id: &str,
) -> Result<crate::model::trading::OrderInfo, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_cancel_request(order_id)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!("Failed to parse cancel response: {}", e))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn cancel_all(&self) -> Result<u32, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_cancel_all_request()
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse cancel_all response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn cancel_all_by_currency(&self, currency: &str) -> Result<u32, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_cancel_all_by_currency_request(currency)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse cancel_all_by_currency response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn cancel_all_by_instrument(
&self,
instrument_name: &str,
) -> Result<u32, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_cancel_all_by_instrument_request(instrument_name)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse cancel_all_by_instrument response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn edit(
&self,
request: crate::model::trading::EditOrderRequest,
) -> Result<crate::model::trading::OrderResponse, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_edit_request(&request)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!("Failed to parse edit response: {}", e))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_positions(
&self,
currency: Option<&str>,
kind: Option<&str>,
) -> Result<Vec<crate::model::Position>, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_positions_request(currency, kind)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!("Failed to parse positions response: {}", e))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_account_summary(
&self,
currency: &str,
extended: Option<bool>,
) -> Result<crate::model::AccountSummary, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_account_summary_request(currency, extended)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse account summary response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_order_state(
&self,
order_id: &str,
) -> Result<crate::model::OrderInfo, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_order_state_request(order_id)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse order state response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn get_order_history_by_currency(
&self,
currency: &str,
kind: Option<&str>,
count: Option<u32>,
) -> Result<Vec<crate::model::OrderInfo>, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_get_order_history_by_currency_request(currency, kind, count)
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse order history response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn close_position(
&self,
instrument_name: &str,
order_type: &str,
price: Option<f64>,
) -> Result<crate::model::ClosePositionResponse, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_close_position_request(instrument_name, order_type, price)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse close position response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
pub async fn move_positions(
&self,
currency: &str,
source_uid: u64,
target_uid: u64,
trades: &[crate::model::MovePositionTrade],
) -> Result<Vec<crate::model::MovePositionResult>, WebSocketError> {
let json_request = {
let mut builder = self.request_builder.lock().await;
builder.build_move_positions_request(currency, source_uid, target_uid, trades)?
};
let request_ctx: &JsonRpcRequest = &json_request;
let response = self.send_request(&json_request).await?;
match response.result {
JsonRpcResult::Success { result } => serde_json::from_value(result).map_err(|e| {
WebSocketError::InvalidMessage(format!(
"Failed to parse move positions response: {}",
e
))
}),
JsonRpcResult::Error { error } => {
let raw = build_raw_error_response(&response.jsonrpc, &response.id, &error);
Err(WebSocketError::api_error_from_parts(
request_ctx,
error,
Some(raw),
))
}
}
}
/// Installs a handler built by `MessageHandler::new` from the two callbacks,
/// replacing any handler set earlier.
pub fn set_message_handler<F, E>(&mut self, message_callback: F, error_callback: E)
where
    F: Fn(&str) -> Result<(), WebSocketError> + Send + Sync + 'static,
    E: Fn(&str, &WebSocketError) + Send + Sync + 'static,
{
    let handler = MessageHandler::new(message_callback, error_callback);
    self.message_handler = Some(handler);
}
/// Installs an already constructed `handler`, replacing any handler set earlier.
pub fn set_message_handler_builder(&mut self, handler: MessageHandler) {
    let slot = &mut self.message_handler;
    *slot = Some(handler);
}
/// Removes the installed message handler, if any.
pub fn clear_message_handler(&mut self) {
    drop(self.message_handler.take());
}
/// Reports whether a message handler is currently installed.
pub fn has_message_handler(&self) -> bool {
    let handler = &self.message_handler;
    handler.is_some()
}
/// Receives one message and passes it to the installed handler.
///
/// Without a handler the message is received and discarded.
///
/// # Errors
/// Propagates the error from `receive_message`.
pub async fn receive_and_process_message(&self) -> Result<(), WebSocketError> {
    let text = self.receive_message().await?;
    if let Some(handler) = self.message_handler.as_ref() {
        handler.handle_message(&text);
    }
    Ok(())
}
/// Receives and processes messages until the connection closes.
///
/// # Errors
/// Returns `InvalidMessage` when no handler is installed. A `ConnectionClosed`
/// error ends the loop with `Ok(())`; any other error is returned as is.
pub async fn start_message_processing_loop(&self) -> Result<(), WebSocketError> {
    if self.message_handler.is_none() {
        return Err(WebSocketError::InvalidMessage(
            "No message handler set. Use set_message_handler() first.".to_string(),
        ));
    }

    loop {
        if let Err(failure) = self.receive_and_process_message().await {
            return match failure {
                // A closed connection is the normal way for the loop to end.
                WebSocketError::ConnectionClosed => Ok(()),
                other => Err(other),
            };
        }
    }
}
}
impl Default for DeribitWebSocketClient {
    /// Builds a client from `WebSocketConfig::default()`.
    ///
    /// # Panics
    /// Panics if `new` returns an error for the default configuration.
    // `Default::default` cannot return a `Result`, so the unwrap is deliberate.
    #[allow(clippy::unwrap_used)]
    fn default() -> Self {
        Self::new(&WebSocketConfig::default()).unwrap()
    }
}
/// Registers every channel in `channels` with `manager`, deriving the channel
/// type via `SubscriptionChannel::from_string` and the instrument via
/// `instrument_from_channel`.
fn add_confirmed_channels(manager: &mut SubscriptionManager, channels: Vec<String>) {
    channels.into_iter().for_each(|name| {
        let kind = SubscriptionChannel::from_string(&name);
        let instrument = instrument_from_channel(&name);
        manager.add_subscription(name, kind, instrument);
    });
}
/// Removes every channel in `channels` from `manager`.
fn remove_confirmed_channels(manager: &mut SubscriptionManager, channels: Vec<String>) {
    channels.iter().for_each(|name| {
        manager.remove_subscription(name);
    });
}
fn confirmed_channels(
response: &JsonRpcResponse,
method: &'static str,
) -> Result<Option<Vec<String>>, WebSocketError> {
match &response.result {
JsonRpcResult::Success { result } => serde_json::from_value::<Vec<String>>(result.clone())
.map(Some)
.map_err(|e| {
WebSocketError::InvalidMessage(format!(
"expected array of confirmed channel strings in {} response: {}",
method, e
))
}),
JsonRpcResult::Error { .. } => Ok(None),
}
}
/// Extracts the instrument, index or currency segment from a dot-separated
/// channel name.
///
/// Returns `None` when the channel does not match any of the recognised
/// layouts listed below.
fn instrument_from_channel(channel: &str) -> Option<String> {
    let segments: Vec<&str> = channel.split('.').collect();

    // Every recognised layout binds exactly one segment; unknown layouts bail out.
    let subject: &str = match segments.as_slice() {
        // Instrument-scoped channels.
        ["ticker", name]
        | ["ticker", name, _]
        | ["book", name, ..]
        | ["trades", name, ..]
        | ["chart", "trades", name, _]
        | ["user", "changes", name, _]
        | ["estimated_expiration_price", name]
        | ["markprice", "options", name]
        | ["perpetual", name, _]
        | ["quote", name]
        | ["incremental_ticker", name]
        // Index-scoped channels.
        | ["deribit_price_index", name]
        | ["deribit_price_ranking", name]
        | ["deribit_price_statistics", name]
        | ["deribit_volatility_index", name]
        | ["user", "mmp_trigger", name]
        // Currency-scoped channels.
        | ["instrument", "state", _, name]
        | ["block_rfq", "trades", name]
        | ["block_trade_confirmations", name] => *name,
        _ => return None,
    };

    Some(subject.to_string())
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::model::ws_types::JsonRpcError;
use serde_json::json;
/// Builds a success response with id `1` carrying `result`.
fn success(result: serde_json::Value) -> JsonRpcResponse {
    let id = json!(1);
    JsonRpcResponse::success(id, result)
}
fn api_error(code: i32, message: &str) -> JsonRpcResponse {
JsonRpcResponse::error(
json!(1),
JsonRpcError {
code,
message: message.to_string(),
data: None,
},
)
}
/// Applies the subscribe reconciliation steps to `manager`: parse the
/// confirmed channels from `response` and add them.
fn reconcile_subscribe(
    manager: &mut SubscriptionManager,
    response: &JsonRpcResponse,
) -> Result<(), WebSocketError> {
    let accepted = confirmed_channels(response, "public/subscribe")?;
    if let Some(names) = accepted {
        add_confirmed_channels(manager, names);
    }
    Ok(())
}
/// Applies the unsubscribe reconciliation steps to `manager`: parse the
/// confirmed channels from `response` and remove them.
fn reconcile_unsubscribe(
    manager: &mut SubscriptionManager,
    response: &JsonRpcResponse,
) -> Result<(), WebSocketError> {
    let released = confirmed_channels(response, "public/unsubscribe")?;
    if let Some(names) = released {
        remove_confirmed_channels(manager, names);
    }
    Ok(())
}
/// Only channels present in the server's reply end up in the manager.
#[test]
fn test_reconcile_subscribe_adds_only_server_confirmed_channels() {
    let mut tracker = SubscriptionManager::new();
    let server_reply = success(json!(["ticker.BTC-PERPETUAL"]));

    reconcile_subscribe(&mut tracker, &server_reply)
        .expect("well-formed success response reconciles");

    let expected = vec!["ticker.BTC-PERPETUAL".to_string()];
    assert_eq!(tracker.get_all_channels(), expected);

    let leaked = tracker.get_subscription("ticker.INVALID");
    assert!(
        leaked.is_none(),
        "rejected input channel must not leak into local state"
    );
}
/// Every confirmed channel is tracked, and the ticker's instrument is derived
/// from its channel name.
#[test]
fn test_reconcile_subscribe_happy_path_input_equals_response() {
    let mut tracker = SubscriptionManager::new();
    let server_reply = success(json!(["ticker.BTC-PERPETUAL", "book.ETH-PERPETUAL.raw"]));

    reconcile_subscribe(&mut tracker, &server_reply).expect("happy-path response reconciles");

    // Sort so the comparison does not depend on the manager's iteration order.
    let mut tracked = tracker.get_all_channels();
    tracked.sort();
    let expected: Vec<String> = ["book.ETH-PERPETUAL.raw", "ticker.BTC-PERPETUAL"]
        .iter()
        .map(|name| String::from(*name))
        .collect();
    assert_eq!(tracked, expected);

    let ticker = tracker
        .get_subscription("ticker.BTC-PERPETUAL")
        .expect("ticker subscription tracked");
    assert_eq!(ticker.instrument.as_deref(), Some("BTC-PERPETUAL"));
}
/// An empty confirmation list is valid and leaves the manager empty.
#[test]
fn test_reconcile_subscribe_empty_result_is_noop() {
    let mut tracker = SubscriptionManager::new();
    let nothing_confirmed = success(json!([] as [&str; 0]));

    reconcile_subscribe(&mut tracker, &nothing_confirmed).expect("empty confirmation is valid");

    let tracked = tracker.get_all_channels();
    assert!(tracked.is_empty());
}
/// An API-error response must parse to `Ok(None)`, must not make
/// `reconcile_subscribe` fail, and must leave existing subscriptions untouched.
#[test]
fn test_reconcile_subscribe_api_error_is_noop() {
    let instrument = "BTC-PERPETUAL";
    let mut subs = SubscriptionManager::new();
    subs.add_subscription(
        format!("ticker.{}", instrument),
        SubscriptionChannel::Ticker(instrument.to_string()),
        Some(instrument.to_string()),
    );
    // Snapshot of the tracked channels taken before reconciliation runs.
    let snapshot = subs.get_all_channels();

    let rejection = api_error(-32000, "subscription rejected");
    let parsed = confirmed_channels(&rejection, "public/subscribe");
    assert!(
        matches!(parsed, Ok(None)),
        "api-error response must yield Ok(None)"
    );

    reconcile_subscribe(&mut subs, &rejection).expect("api-error response must not return Err");

    let after = subs.get_all_channels();
    assert_eq!(
        after, snapshot,
        "api-error response must not mutate the manager"
    );
}
/// A success response whose result is a JSON object (not an array) must be
/// rejected with `WebSocketError::InvalidMessage` and must not add anything.
#[test]
fn test_reconcile_subscribe_non_array_result_returns_invalid_message() {
    let malformed = success(json!({ "channels": ["ticker.BTC-PERPETUAL"] }));
    let mut subs = SubscriptionManager::new();

    let outcome = reconcile_subscribe(&mut subs, &malformed);
    let err = outcome.expect_err("object result must not parse as Vec<String>");

    let is_invalid_message = matches!(err, WebSocketError::InvalidMessage(_));
    assert!(is_invalid_message, "expected InvalidMessage, got {:?}", err);

    let tracked = subs.get_all_channels();
    assert!(
        tracked.is_empty(),
        "failed reconciliation must not partially mutate the manager"
    );
}
/// With two ticker subscriptions tracked, an unsubscribe response naming only
/// one of them must remove that one and keep the other.
#[test]
fn test_reconcile_unsubscribe_removes_only_server_confirmed_channels() {
    let mut subs = SubscriptionManager::new();
    // Seed both ticker subscriptions, BTC first and ETH second.
    for instrument in ["BTC-PERPETUAL", "ETH-PERPETUAL"].iter() {
        subs.add_subscription(
            format!("ticker.{}", instrument),
            SubscriptionChannel::Ticker(instrument.to_string()),
            Some(instrument.to_string()),
        );
    }

    let server_reply = success(json!(["ticker.BTC-PERPETUAL"]));
    reconcile_unsubscribe(&mut subs, &server_reply)
        .expect("well-formed unsubscribe response reconciles");

    let remaining = subs.get_all_channels();
    assert_eq!(remaining, vec![String::from("ticker.ETH-PERPETUAL")]);
}
/// When the unsubscribe response names every tracked channel, the manager ends
/// up with no channels at all.
#[test]
fn test_reconcile_unsubscribe_happy_path() {
    let btc = "BTC-PERPETUAL";
    let eth = "ETH-PERPETUAL";

    let mut subs = SubscriptionManager::new();
    subs.add_subscription(
        String::from("ticker.BTC-PERPETUAL"),
        SubscriptionChannel::Ticker(btc.to_owned()),
        Some(btc.to_owned()),
    );
    subs.add_subscription(
        String::from("book.ETH-PERPETUAL.raw"),
        SubscriptionChannel::OrderBook(eth.to_owned()),
        Some(eth.to_owned()),
    );

    let server_reply = success(json!(["ticker.BTC-PERPETUAL", "book.ETH-PERPETUAL.raw"]));
    reconcile_unsubscribe(&mut subs, &server_reply).expect("happy-path unsubscribe reconciles");

    let remaining = subs.get_all_channels();
    assert!(remaining.is_empty());
}
/// An API-error response to an unsubscribe must not make
/// `reconcile_unsubscribe` fail and must leave existing subscriptions untouched.
#[test]
fn test_reconcile_unsubscribe_api_error_is_noop() {
    let instrument = "BTC-PERPETUAL";
    let mut subs = SubscriptionManager::new();
    subs.add_subscription(
        format!("ticker.{}", instrument),
        SubscriptionChannel::Ticker(instrument.to_string()),
        Some(instrument.to_string()),
    );
    // Snapshot of the tracked channels taken before reconciliation runs.
    let snapshot = subs.get_all_channels();

    let rejection = api_error(-32000, "unsubscribe rejected");
    reconcile_unsubscribe(&mut subs, &rejection).expect("api-error response must not return Err");

    let after = subs.get_all_channels();
    assert_eq!(
        after, snapshot,
        "api-error response must not mutate the manager"
    );
}
/// A success response whose result is a JSON string (not an array) must be
/// rejected with `WebSocketError::InvalidMessage` and must not remove anything.
#[test]
fn test_reconcile_unsubscribe_non_array_result_returns_invalid_message() {
    let instrument = "BTC-PERPETUAL";
    let mut subs = SubscriptionManager::new();
    subs.add_subscription(
        format!("ticker.{}", instrument),
        SubscriptionChannel::Ticker(instrument.to_string()),
        Some(instrument.to_string()),
    );

    let malformed = success(json!("not an array"));
    let outcome = reconcile_unsubscribe(&mut subs, &malformed);
    let err = outcome.expect_err("string result must not parse as Vec<String>");

    let is_invalid_message = matches!(err, WebSocketError::InvalidMessage(_));
    assert!(is_invalid_message, "expected InvalidMessage, got {:?}", err);

    let remaining = subs.get_all_channels();
    assert_eq!(
        remaining,
        vec![String::from("ticker.BTC-PERPETUAL")],
        "failed reconciliation must not partially mutate the manager"
    );
}
/// Starts a single-connection mock WebSocket server for tests.
///
/// Binds a TCP listener to an ephemeral port on `127.0.0.1`, then spawns a task
/// that accepts one connection, performs the WebSocket handshake, splits the
/// stream and passes the sink and stream halves to `scenario`.
///
/// Returns the bound address together with the spawned task's join handle.
/// If accepting the connection or the handshake fails, the task ends without
/// running `scenario`.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
async fn spawn_mock_server<F, Fut>(
    scenario: F,
) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>)
where
    F: FnOnce(
            futures_util::stream::SplitSink<
                tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
                tokio_tungstenite::tungstenite::Message,
            >,
            futures_util::stream::SplitStream<
                tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
            >,
        ) -> Fut
        + Send
        + 'static,
    Fut: std::future::Future<Output = ()> + Send,
{
    use futures_util::StreamExt;
    use tokio::net::TcpListener;
    use tokio_tungstenite::accept_async;

    // Port 0 asks the OS for a free port; the actual address is read back below.
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("bind localhost ephemeral port");
    let bound_addr = listener
        .local_addr()
        .expect("read local addr of bound listener");

    let server_task = tokio::spawn(async move {
        // Accept and handshake failures end the task quietly instead of panicking.
        let Ok((tcp, _peer)) = listener.accept().await else {
            return;
        };
        let Ok(websocket) = accept_async(tcp).await else {
            return;
        };
        let (sink, stream) = websocket.split();
        scenario(sink, stream).await;
    });

    (bound_addr, server_task)
}
/// End-to-end check against a mock server: the client asks for two channels,
/// the server confirms only one, and the client's subscription manager must
/// end up tracking only the confirmed channel.
#[tokio::test]
async fn test_subscribe_reconciles_local_state_with_server_subset() {
    use futures_util::{SinkExt, StreamExt};
    use tokio_tungstenite::tungstenite::Message;

    let (addr, server) = spawn_mock_server(|mut sink, mut stream| async move {
        if let Some(Ok(Message::Text(raw))) = stream.next().await {
            let request: serde_json::Value =
                serde_json::from_str(&raw).expect("server parses request");
            // Echo the request id back; indexing yields `Null` when "id" is absent.
            let id = request["id"].clone();
            let reply = json!({
                "jsonrpc": "2.0",
                "id": id,
                "result": ["ticker.BTC-PERPETUAL"],
            });
            // A failed send is deliberately ignored here.
            let _ = sink.send(Message::Text(reply.to_string().into())).await;
        }
        // Linger before returning; `sink` and `stream` are dropped when this
        // scenario ends.
        // NOTE(review): presumably this gives the client time to read the reply
        // before the connection goes away — confirm.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    })
    .await;

    let url = format!("ws://{}/", addr);
    let config = WebSocketConfig::with_url(&url).expect("valid ws url");
    let client = DeribitWebSocketClient::new(&config).expect("client construction");
    client.connect().await.expect("client connects to mock");

    let requested = vec![
        String::from("ticker.INVALID"),
        String::from("ticker.BTC-PERPETUAL"),
    ];
    let response = client
        .subscribe(requested)
        .await
        .expect("subscribe returns the server-confirmed response");

    let confirmed = match response.result {
        JsonRpcResult::Success { result } => result,
        other => panic!("expected Success result, got {:?}", other),
    };
    assert_eq!(confirmed, json!(["ticker.BTC-PERPETUAL"]));

    let shared_manager = client.subscription_manager();
    let tracked = shared_manager.lock().await.get_all_channels();
    assert_eq!(
        tracked,
        vec![String::from("ticker.BTC-PERPETUAL")],
        "local manager must drop rejected channels from the input"
    );

    client.disconnect().await.expect("client disconnects");
    server.await.expect("server task did not panic");
}
}