use deribit_websocket::prelude::*;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
deribit_websocket::install_default_crypto_provider()?;
setup_logger();
let mut client = DeribitWebSocketClient::default();
tracing::info!("๐ Starting Advanced Mass Quote Example");
let quote_count = Arc::new(Mutex::new(0u32));
let mmp_triggers = Arc::new(Mutex::new(Vec::<String>::new()));
let quote_count_clone = Arc::clone("e_count);
let mmp_triggers_clone = Arc::clone(&mmp_triggers);
client.set_message_handler(
move |message: &str| -> Result<(), WebSocketError> {
if let Ok(json_msg) = serde_json::from_str::<Value>(message)
&& let Some(method) = json_msg.get("method")
&& method.as_str() == Some("subscription")
&& let Some(params) = json_msg.get("params")
&& let Some(channel) = params.get("channel").and_then(|c| c.as_str())
{
if channel.starts_with("mmp.triggers") {
let mut count = quote_count_clone.lock().unwrap();
*count += 1;
tracing::warn!("๐จ MMP Trigger #{}: Channel: {}", *count, channel);
if let Some(data) = params.get("data") {
if let Some(currency) = data.get("currency") {
tracing::warn!(" ๐ฑ Currency: {}", currency);
}
if let Some(mmp_group) = data.get("mmp_group") {
tracing::warn!(" ๐ท๏ธ MMP Group: {}", mmp_group);
}
if let Some(reason) = data.get("reason") {
tracing::warn!(" โก Reason: {}", reason);
}
if let Some(frozen_time) = data.get("frozen_time") {
tracing::warn!(" โ๏ธ Frozen Time: {}ms", frozen_time);
}
let mut triggers = mmp_triggers_clone.lock().unwrap();
triggers.push(format!(
"{}:{}",
channel,
data.get("reason")
.unwrap_or(&Value::String("unknown".to_string()))
));
}
}
else if channel == "user.trades"
&& let Some(data) = params.get("data")
&& let Some(trades) = data.as_array()
{
for trade in trades {
if let Some(quote_id) = trade.get("quote_id") {
tracing::info!("๐ฐ Quote Executed: ID {}", quote_id);
if let Some(instrument) = trade.get("instrument_name") {
tracing::info!(" ๐ฏ Instrument: {}", instrument);
}
if let Some(side) = trade.get("direction") {
tracing::info!(" ๐ Side: {}", side);
}
if let Some(amount) = trade.get("amount") {
tracing::info!(" ๐ Amount: {}", amount);
}
if let Some(price) = trade.get("price") {
tracing::info!(" ๐ต Price: {}", price);
}
}
}
}
}
Ok(())
},
|message, error| {
tracing::error!("โ Error processing message '{}': {}", message, error);
},
);
let client = client;
client.connect().await?;
tracing::info!("โ
Connected to Deribit WebSocket");
let (client_id, client_secret) = client.config.get_credentials().unwrap();
client.authenticate(client_id, client_secret).await?;
tracing::info!("๐ Authenticated successfully");
client
.subscribe(vec![
"mmp.triggers.any".to_string(),
"user.trades.any.any".to_string(),
])
.await?;
tracing::info!("๐ก Subscribed to MMP triggers and user trades");
tracing::info!("๐ Setting up multiple MMP groups...");
let mmp_groups = vec![
("btc_tight_spread", 5.0, 2.5, 500, 2000), ("btc_wide_spread", 20.0, 10.0, 1000, 5000), ("btc_scalping", 2.0, 1.0, 200, 1000), ];
for (group_name, qty_limit, delta_limit, interval, frozen_time) in &mmp_groups {
let config = MmpGroupConfig::new(
group_name.to_string(),
*qty_limit,
*delta_limit,
*interval,
*frozen_time,
)?;
match client.set_mmp_config(config).await {
Ok(()) => tracing::info!("โ
MMP group '{}' configured", group_name),
Err(WebSocketError::ApiError {
code: 11050,
message,
..
}) => tracing::warn!(
"โ ๏ธ MMP group '{}' skipped: {} (code 11050 โ MMP not activated on this account)",
group_name,
message
),
Err(e) => return Err(e.into()),
}
}
tracing::info!("๐ฐ Creating layered mass quotes...");
let mut all_quotes = HashMap::new();
let tight_quotes = vec![
Quote::buy("BTC-PERPETUAL".to_string(), 0.05, 49500.0)
.with_quote_set_id("tight_layer_1".to_string()),
Quote::sell("BTC-PERPETUAL".to_string(), 0.05, 50500.0)
.with_quote_set_id("tight_layer_1".to_string()),
];
let wide_quotes = vec![
Quote::buy("BTC-PERPETUAL".to_string(), 0.2, 48000.0)
.with_quote_set_id("wide_layer_1".to_string()),
Quote::sell("BTC-PERPETUAL".to_string(), 0.2, 52000.0)
.with_quote_set_id("wide_layer_1".to_string()),
];
let scalp_quotes = vec![
Quote::buy("BTC-PERPETUAL".to_string(), 0.01, 49900.0)
.with_quote_set_id("scalp_layer_1".to_string()),
Quote::sell("BTC-PERPETUAL".to_string(), 0.01, 50100.0)
.with_quote_set_id("scalp_layer_1".to_string()),
];
all_quotes.insert("btc_tight_spread", tight_quotes);
all_quotes.insert("btc_wide_spread", wide_quotes);
all_quotes.insert("btc_scalping", scalp_quotes);
for (group_name, quotes) in all_quotes {
let request = MassQuoteRequest::new(group_name.to_string(), quotes)
.with_quote_id(format!("{}_batch_1", group_name))
.with_detailed_errors();
match client.mass_quote(request).await {
Ok(response) => {
tracing::info!(
"โ
{} quotes: {} placed, {} errors",
group_name,
response.success_count,
response.error_count
);
}
Err(e) => {
tracing::error!("โ Failed to place {} quotes: {}", group_name, e);
}
}
}
tracing::info!("๐ Monitoring quotes for 30 seconds...");
let start_time = std::time::Instant::now();
let monitor_duration = std::time::Duration::from_secs(30);
while start_time.elapsed() < monitor_duration {
if start_time.elapsed().as_secs().is_multiple_of(5) {
match client.get_mmp_config(None).await {
Ok(configs) => {
tracing::info!("๐ MMP Status Check:");
for config in configs {
tracing::info!(
" ๐ท๏ธ Group: {} - Enabled: {}, Qty Limit: {}",
config.mmp_group,
config.enabled,
config.quantity_limit
);
}
}
Err(e) => {
tracing::warn!("โ ๏ธ Failed to get MMP config: {}", e);
}
}
}
tokio::select! {
result = client.receive_and_process_message() => {
if let Err(e) = result {
tracing::warn!("โ ๏ธ Message processing error: {}", e);
}
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
}
}
}
tracing::info!("๐ง Demonstrating quote management...");
let cancel_scalp = CancelQuotesRequest::by_quote_set_id("scalp_layer_1".to_string());
match client.cancel_quotes(cancel_scalp).await {
Ok(response) => {
tracing::info!("โ
Cancelled {} scalping quotes", response.cancelled_count);
}
Err(e) => {
tracing::warn!("โ ๏ธ Failed to cancel scalping quotes: {}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
tracing::info!("๐ Updating wide spread quotes...");
let updated_wide_quotes = vec![
Quote::buy("BTC-PERPETUAL".to_string(), 0.15, 47500.0)
.with_quote_set_id("wide_layer_2".to_string()),
Quote::sell("BTC-PERPETUAL".to_string(), 0.15, 52500.0)
.with_quote_set_id("wide_layer_2".to_string()),
];
let update_request = MassQuoteRequest::new("btc_wide_spread".to_string(), updated_wide_quotes)
.with_quote_id("wide_update_1".to_string());
match client.mass_quote(update_request).await {
Ok(response) => {
tracing::info!("โ
Updated wide spread: {} placed", response.success_count);
}
Err(e) => {
tracing::error!("โ Failed to update wide spread: {}", e);
}
}
tracing::info!("๐งน Final cleanup...");
let cancel_all = CancelQuotesRequest::by_currency("BTC".to_string());
match client.cancel_quotes(cancel_all).await {
Ok(response) => {
tracing::info!("โ
Cancelled {} remaining quotes", response.cancelled_count);
}
Err(e) => {
tracing::warn!("โ ๏ธ Failed to cancel all quotes: {}", e);
}
}
for (group_name, _, _, _, _) in &mmp_groups {
let disable_config = MmpGroupConfig::new(
group_name.to_string(),
1.0,
0.5,
0, 1000,
)?
.disable();
match client.set_mmp_config(disable_config).await {
Ok(()) => {
tracing::info!("โ
Disabled MMP group '{}'", group_name);
}
Err(e) => {
tracing::warn!("โ ๏ธ Failed to disable MMP group '{}': {}", group_name, e);
}
}
}
let trigger_count = mmp_triggers.lock().unwrap().len();
tracing::info!("๐ Advanced Mass Quote Example Summary:");
tracing::info!(" ๐ท๏ธ MMP Groups Created: {}", mmp_groups.len());
tracing::info!(" ๐จ MMP Triggers Received: {}", trigger_count);
tracing::info!(" โ
Example completed successfully!");
Ok(())
}