use crate::config::gen_id;
use crate::model::message::FixMessage;
use crate::model::position::Position;
use crate::model::request::{NewOrderRequest, OrderSide, OrderType, TimeInForce};
use crate::model::types::MsgType;
use crate::{
config::DeribitFixConfig,
connection::Connection,
error::{DeribitFixError, Result},
message::{MessageBuilder, PositionReport, RequestForPositions},
};
use base64::prelude::*;
use chrono::Utc;
use rand;
use sha2::{Digest, Sha256};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info, trace};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
Disconnected,
LogonSent,
LoggedOn,
LogoutSent,
}
pub struct Session {
config: DeribitFixConfig,
connection: Option<Arc<Mutex<Connection>>>,
state: SessionState,
outgoing_seq_num: u32,
incoming_seq_num: u32,
}
impl Session {
pub fn new(config: &DeribitFixConfig, connection: Arc<Mutex<Connection>>) -> Result<Self> {
info!("Creating new FIX session");
Ok(Self {
config: config.clone(),
state: SessionState::Disconnected,
outgoing_seq_num: 1,
incoming_seq_num: 1,
connection: Some(connection),
})
}
pub fn set_connection(&mut self, connection: Arc<Mutex<Connection>>) {
self.connection = Some(connection);
}
pub fn get_state(&self) -> SessionState {
self.state
}
async fn send_message(&mut self, message: FixMessage) -> Result<()> {
if let Some(connection) = &self.connection {
let mut conn_guard = connection.lock().await;
conn_guard.send_message(&message).await?;
debug!("Sent FIX message: {}", message.to_string());
} else {
return Err(DeribitFixError::Connection(
"No connection available".to_string(),
));
}
Ok(())
}
pub async fn logon(&mut self) -> Result<()> {
info!("Performing FIX logon");
let (raw_data, password_hash) = self.generate_auth_data(&self.config.password)?;
let mut message_builder = MessageBuilder::new()
.msg_type(MsgType::Logon)
.sender_comp_id(self.config.sender_comp_id.clone())
.target_comp_id(self.config.target_comp_id.clone())
.msg_seq_num(self.outgoing_seq_num)
.field(108, self.config.heartbeat_interval.to_string()) .field(96, raw_data.clone()) .field(553, self.config.username.clone()) .field(554, password_hash);
message_builder = message_builder.field(95, raw_data.len().to_string());
if let Some(use_wordsafe_tags) = &self.config.use_wordsafe_tags {
message_builder =
message_builder.field(9002, if *use_wordsafe_tags { "Y" } else { "N" }.to_string()); }
message_builder = message_builder.field(
9001,
if self.config.cancel_on_disconnect {
"Y"
} else {
"N"
}
.to_string(),
);
if let Some(app_id) = &self.config.app_id {
message_builder = message_builder.field(9004, app_id.clone()); }
if let Some(app_secret) = &self.config.app_secret
&& let Some(raw_data_str) = raw_data
.split_once('.')
.map(|(timestamp, nonce)| format!("{}.{}", timestamp, nonce))
&& let Ok(app_sig) = self.calculate_app_signature(&raw_data_str, app_secret)
{
message_builder = message_builder.field(9005, app_sig); }
if let Some(deribit_sequential) = &self.config.deribit_sequential {
message_builder = message_builder.field(
9007,
if *deribit_sequential { "Y" } else { "N" }.to_string(),
); }
if let Some(unsubscribe_exec_reports) = &self.config.unsubscribe_execution_reports {
message_builder = message_builder.field(
9009,
if *unsubscribe_exec_reports { "Y" } else { "N" }.to_string(),
); }
if let Some(connection_only_exec_reports) = &self.config.connection_only_execution_reports {
message_builder = message_builder.field(
9010,
if *connection_only_exec_reports {
"Y"
} else {
"N"
}
.to_string(),
); }
if let Some(report_fills_as_exec_reports) = &self.config.report_fills_as_exec_reports {
message_builder = message_builder.field(
9015,
if *report_fills_as_exec_reports {
"Y"
} else {
"N"
}
.to_string(),
); }
if let Some(display_increment_steps) = &self.config.display_increment_steps {
message_builder = message_builder.field(
9018,
if *display_increment_steps { "Y" } else { "N" }.to_string(),
); }
if let Some(app_id) = &self.config.app_id {
message_builder = message_builder.field(1128, app_id.clone()); }
let logon_message = message_builder.build()?;
self.send_message(logon_message).await?;
self.state = SessionState::LogonSent;
self.outgoing_seq_num += 1;
info!("Logon message sent");
Ok(())
}
pub async fn logout(&mut self) -> Result<()> {
self.logout_with_options(None, None).await
}
pub async fn logout_with_options(
&mut self,
text: Option<String>,
dont_cancel_on_disconnect: Option<bool>,
) -> Result<()> {
info!("Performing FIX logout");
let mut message_builder = MessageBuilder::new()
.msg_type(MsgType::Logout)
.sender_comp_id(self.config.sender_comp_id.clone())
.target_comp_id(self.config.target_comp_id.clone())
.msg_seq_num(self.outgoing_seq_num);
let logout_text = text.unwrap_or_else(|| "Normal logout".to_string());
message_builder = message_builder.field(58, logout_text);
if let Some(dont_cancel) = dont_cancel_on_disconnect {
message_builder =
message_builder.field(9003, if dont_cancel { "Y" } else { "N" }.to_string()); }
let logout_message = message_builder.build()?;
self.send_message(logout_message).await?;
self.state = SessionState::LogoutSent;
self.outgoing_seq_num += 1;
info!("Logout message sent");
Ok(())
}
pub async fn send_heartbeat(&mut self, test_req_id: Option<String>) -> Result<()> {
debug!("Sending heartbeat message");
let mut builder = MessageBuilder::new()
.msg_type(MsgType::Heartbeat)
.sender_comp_id(self.config.sender_comp_id.clone())
.target_comp_id(self.config.target_comp_id.clone())
.msg_seq_num(self.outgoing_seq_num);
if let Some(test_req_id) = test_req_id {
builder = builder.field(112, test_req_id); }
let heartbeat_message = builder.build()?;
self.send_message(heartbeat_message).await?;
self.outgoing_seq_num += 1;
debug!("Heartbeat message sent");
Ok(())
}
pub async fn send_new_order(&mut self, order: NewOrderRequest) -> Result<String> {
info!("Sending new order: {:?}", order);
let order_id = order
.client_order_id
.clone()
.unwrap_or_else(|| format!("ORDER_{}", gen_id()));
let ord_type = match order.order_type {
OrderType::Market => "1", OrderType::Limit => "2", OrderType::StopLimit => "4", OrderType::StopMarket => "3", _ => "2", };
let mut builder = MessageBuilder::new()
.msg_type(MsgType::NewOrderSingle)
.sender_comp_id(self.config.sender_comp_id.clone())
.target_comp_id(self.config.target_comp_id.clone())
.msg_seq_num(self.outgoing_seq_num)
.field(11, order_id.clone()) .field(55, order.instrument_name.clone()) .field(
54,
match order.side {
OrderSide::Buy => "1".to_string(),
OrderSide::Sell => "2".to_string(),
},
) .field(60, Utc::now().format("%Y%m%d-%H:%M:%S%.3f").to_string()) .field(38, order.amount.to_string()) .field(40, ord_type.to_string());
if order.order_type == OrderType::Limit || order.price.is_some() {
builder = builder.field(44, order.price.unwrap_or(0.0).to_string()); }
let tif = match order.time_in_force {
TimeInForce::GoodTilCancelled => "1",
TimeInForce::ImmediateOrCancel => "3",
TimeInForce::FillOrKill => "4",
TimeInForce::GoodTilDay => "6",
};
builder = builder.field(59, tif.to_string());
let mut exec_inst = String::new();
if order.post_only == Some(true) {
exec_inst.push('6'); }
if order.reduce_only == Some(true) {
exec_inst.push('E'); }
if !exec_inst.is_empty() {
builder = builder.field(18, exec_inst); }
if let Some(label) = &order.label {
builder = builder.field(100010, label.clone()); }
let order_message = builder.build()?;
self.send_message(order_message).await?;
self.outgoing_seq_num += 1;
info!("New order message sent with ID: {}", order_id);
Ok(order_id)
}
pub async fn cancel_order(&mut self, order_id: String) -> Result<()> {
self.cancel_order_with_symbol(order_id, None).await
}
pub async fn cancel_order_with_symbol(
&mut self,
order_id: String,
symbol: Option<String>,
) -> Result<()> {
info!("Cancelling order: {} with symbol: {:?}", order_id, symbol);
let cancel_id = format!("CANCEL_{}", gen_id());
let mut builder = MessageBuilder::new()
.msg_type(MsgType::OrderCancelRequest)
.sender_comp_id(self.config.sender_comp_id.clone())
.target_comp_id(self.config.target_comp_id.clone())
.msg_seq_num(self.outgoing_seq_num)
.field(11, cancel_id) .field(41, order_id) .field(60, Utc::now().format("%Y%m%d-%H:%M:%S%.3f").to_string());
if let Some(symbol_value) = symbol {
builder = builder.field(55, symbol_value); }
let cancel_message = builder.build()?;
self.send_message(cancel_message).await?;
self.outgoing_seq_num += 1;
info!("Order cancel message sent");
Ok(())
}
pub async fn subscribe_market_data(&mut self, symbol: String) -> Result<()> {
info!("Subscribing to market data for: {}", symbol);
let request_id = format!("MDR_{}", gen_id());
let market_data_request = MessageBuilder::new()
.msg_type(MsgType::MarketDataRequest)
.sender_comp_id(self.config.sender_comp_id.clone())
.target_comp_id(self.config.target_comp_id.clone())
.msg_seq_num(self.outgoing_seq_num)
.field(262, request_id.clone()) .field(263, "1".to_string()) .field(264, "0".to_string()) .field(267, "2".to_string()) .field(269, "0".to_string()) .field(269, "1".to_string()) .field(146, "1".to_string()) .field(55, symbol.clone()) .build()?;
self.send_message(market_data_request).await?;
self.outgoing_seq_num += 1;
info!(
"Market data subscription request sent for symbol: {} with ID: {}",
symbol, request_id
);
Ok(())
}
pub async fn request_positions(&mut self) -> Result<Vec<Position>> {
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
info!("Requesting positions");
let request_id = format!("POS_{}", gen_id());
let position_request = RequestForPositions::all_positions(request_id.clone())
.with_clearing_date(Utc::now().format("%Y%m%d").to_string());
let fix_message = position_request.to_fix_message(
self.config.sender_comp_id.clone(),
self.config.target_comp_id.clone(),
self.outgoing_seq_num,
)?;
self.send_message(fix_message).await?;
self.outgoing_seq_num += 1;
info!(
"Position request sent, awaiting responses for request ID: {}",
request_id
);
let mut positions = Vec::new();
let timeout = Duration::from_secs(30); let start_time = Instant::now();
loop {
if start_time.elapsed() > timeout {
warn!("Position request timed out after {:?}", timeout);
break;
}
match self.receive_and_process_message().await {
Ok(Some(message)) => {
if let Some(msg_type_str) = message.get_field(35)
&& msg_type_str == "AP"
{
if let Some(pos_req_id) = message.get_field(710) {
if pos_req_id == &request_id {
debug!("Received PositionReport for request: {}", request_id);
if message.get_field(55).is_none() {
info!(
"Received empty position report - indicates no active positions for this request"
);
debug!(
"Empty PositionReport details: PosReqID={}, PosMaintRptID={:?}",
request_id,
message.get_field(721)
);
continue;
}
match PositionReport::try_from_fix_message(&message) {
Ok(position) => {
debug!("Successfully parsed position: {:?}", position);
positions.push(position);
}
Err(e) => {
warn!("Failed to parse PositionReport: {}", e);
debug!("Message fields: {:?}", message);
}
}
} else {
debug!(
"Received PositionReport for different request: {}",
pos_req_id
);
}
}
}
}
Ok(None) => {
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(e) => {
warn!("Error receiving message: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
if !positions.is_empty() && start_time.elapsed() > Duration::from_secs(5) {
debug!(
"Received {} positions, stopping collection",
positions.len()
);
break;
}
}
info!(
"Position request completed, received {} positions",
positions.len()
);
Ok(positions)
}
pub fn generate_auth_data(&self, access_secret: &str) -> Result<(String, String)> {
let timestamp = Utc::now().timestamp_millis();
let mut nonce_bytes = vec![0u8; 32];
for byte in nonce_bytes.iter_mut() {
*byte = rand::random::<u8>();
}
let nonce_b64 = BASE64_STANDARD.encode(&nonce_bytes);
let raw_data = format!("{timestamp}.{nonce_b64}");
let mut auth_data = raw_data.as_bytes().to_vec();
auth_data.extend_from_slice(access_secret.as_bytes());
debug!("Auth Data at Timestamp: {}", timestamp);
trace!("Nonce length: {} bytes", nonce_bytes.len());
trace!("Nonce (base64): {}", nonce_b64);
trace!("RawData: {}", raw_data);
trace!("Access secret: {}", access_secret);
trace!("Auth data length: {} bytes", auth_data.len());
let mut hasher = Sha256::new();
hasher.update(&auth_data);
let hash_result = hasher.finalize();
let password_hash = BASE64_STANDARD.encode(hash_result);
debug!("Password hash: {}", password_hash);
Ok((raw_data, password_hash))
}
#[allow(dead_code)]
fn calculate_app_signature(&self, raw_data: &str, app_secret: &str) -> Result<String> {
let mut hasher = Sha256::new();
hasher.update(format!("{raw_data}{app_secret}").as_bytes());
let result = hasher.finalize();
Ok(BASE64_STANDARD.encode(result))
}
pub fn state(&self) -> SessionState {
self.state
}
pub fn set_state(&mut self, state: SessionState) {
self.state = state;
}
async fn process_message(&mut self, message: &FixMessage) -> Result<()> {
debug!("Processing FIX message: {:?}", message);
let msg_type_str = message.get_field(35).unwrap_or(&String::new()).clone();
if msg_type_str.is_empty() {
debug!("Skipping message with empty message type");
return Ok(());
}
let msg_type = MsgType::from_str(&msg_type_str).map_err(|_| {
DeribitFixError::MessageParsing(format!("Unknown message type: {msg_type_str}"))
})?;
match msg_type {
MsgType::Logon => {
info!("Received logon response");
self.state = SessionState::LoggedOn;
}
MsgType::Logout => {
info!("Received logout message");
self.state = SessionState::Disconnected;
}
MsgType::Heartbeat => {
debug!("Received heartbeat");
}
MsgType::TestRequest => {
debug!("Received test request, sending heartbeat response");
let test_req_id = message.get_field(112);
self.send_heartbeat(test_req_id.cloned()).await?;
}
MsgType::ExecutionReport => {
debug!("Received ExecutionReport: {:?}", message);
}
MsgType::PositionReport => {
debug!("Received PositionReport: {:?}", message);
}
MsgType::Reject => {
error!("Received Reject message: {:?}", message);
}
_ => {
debug!("Received message type: {:?}", msg_type);
}
}
self.incoming_seq_num += 1;
Ok(())
}
pub async fn receive_and_process_message(&mut self) -> Result<Option<FixMessage>> {
let message = if let Some(connection) = &self.connection {
let mut conn_guard = connection.lock().await;
conn_guard.receive_message().await?
} else {
None
};
if let Some(message) = message {
self.process_message(&message).await?;
Ok(Some(message))
} else {
Ok(None)
}
}
}