deribit_fix/session/
mod.rs

1//! FIX session management
2
3use crate::{
4    client::{NewOrderRequest, OrderSide, OrderType, Position, TimeInForce},
5    config::Config,
6    connection::Connection,
7    error::{DeribitFixError, Result},
8    message::{FixMessage, MessageBuilder},
9    types::MsgType,
10    utils::{generate_nonce, generate_timestamp},
11};
12use base64::prelude::*;
13use chrono::Utc;
14use sha2::{Digest, Sha256};
15use std::sync::Arc;
16use tokio::sync::Mutex;
17use tracing::{debug, error, info, warn};
18
/// Lifecycle state of a FIX session.
///
/// A newly constructed [`Session`] starts in [`SessionState::Disconnected`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionState {
    /// No logon is in effect; the state every new session starts in.
    Disconnected,
    /// A logon has been initiated by the session.
    LogonSent,
    /// The session is logged on. Order, market-data and position requests
    /// are rejected unless the session is in this state.
    LoggedOn,
    /// A logout has been initiated by the session.
    LogoutSent,
}
27
28/// FIX session manager
29pub struct Session {
30    config: Config,
31    connection: Option<Arc<Mutex<Connection>>>,
32    state: SessionState,
33    outgoing_seq_num: u32,
34    incoming_seq_num: u32,
35}
36
37impl Session {
38    /// Create a new FIX session
39    pub fn new(config: &Config) -> Result<Self> {
40        Ok(Self {
41            config: config.clone(),
42            connection: None,
43            state: SessionState::Disconnected,
44            outgoing_seq_num: 1,
45            incoming_seq_num: 1,
46        })
47    }
48
49    /// Set the connection for this session
50    pub fn set_connection(&mut self, connection: Arc<Mutex<Connection>>) {
51        self.connection = Some(connection);
52    }
53
54    /// Perform FIX logon
55    pub async fn logon(&mut self) -> Result<()> {
56        let connection = self.connection.as_ref()
57            .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
58
59        info!("Initiating FIX logon");
60        self.state = SessionState::LogonSent;
61
62        // Generate authentication data
63        let timestamp = generate_timestamp();
64        let nonce = generate_nonce(32);
65        let raw_data = format!("{}.{}", timestamp, nonce);
66        
67        // Calculate password hash
68        let password_hash = self.calculate_password_hash(&raw_data)?;
69        
70        // Build logon message
71        let mut builder = MessageBuilder::new()
72            .msg_type(MsgType::Logon)
73            .sender_comp_id(self.config.sender_comp_id.clone())
74            .target_comp_id(self.config.target_comp_id.clone())
75            .msg_seq_num(self.outgoing_seq_num)
76            .sending_time(Utc::now())
77            .field(108, self.config.heartbeat_interval.to_string()) // HeartBtInt
78            .field(553, self.config.username.clone()) // Username
79            .field(554, password_hash) // Password
80            .field(95, raw_data.len().to_string()) // RawDataLength
81            .field(96, raw_data.clone()); // RawData
82
83        // Add cancel on disconnect if configured
84        if self.config.cancel_on_disconnect {
85            builder = builder.field(9001, "Y".to_string()); // CancelOnDisconnect
86        } else {
87            builder = builder.field(9001, "N".to_string());
88        }
89
90        // Add application signature if configured
91        if let (Some(app_id), Some(app_secret)) = (&self.config.app_id, &self.config.app_secret) {
92            let app_sig = self.calculate_app_signature(&raw_data, app_secret)?;
93            builder = builder
94                .field(9004, app_id.clone()) // DeribitAppId
95                .field(9005, app_sig); // DeribitAppSig
96        }
97
98        let logon_message = builder.build()?;
99        
100        let mut conn_guard = connection.lock().await;
101        conn_guard.send_message(&logon_message).await?;
102        
103        self.outgoing_seq_num += 1;
104        
105        info!("Logon message sent");
106        Ok(())
107    }
108
109    /// Perform FIX logout
110    pub async fn logout(&mut self) -> Result<()> {
111        let connection = self.connection.as_ref()
112            .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
113
114        info!("Initiating FIX logout");
115        self.state = SessionState::LogoutSent;
116
117        let logout_message = MessageBuilder::new()
118            .msg_type(MsgType::Logout)
119            .sender_comp_id(self.config.sender_comp_id.clone())
120            .target_comp_id(self.config.target_comp_id.clone())
121            .msg_seq_num(self.outgoing_seq_num)
122            .sending_time(Utc::now())
123            .build()?;
124
125        let mut conn_guard = connection.lock().await;
126        conn_guard.send_message(&logout_message).await?;
127        
128        self.outgoing_seq_num += 1;
129        self.state = SessionState::Disconnected;
130        
131        info!("Logout message sent");
132        Ok(())
133    }
134
135    /// Send a heartbeat message
136    pub async fn send_heartbeat(&mut self, test_req_id: Option<String>) -> Result<()> {
137        let connection = self.connection.as_ref()
138            .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
139
140        let mut builder = MessageBuilder::new()
141            .msg_type(MsgType::Heartbeat)
142            .sender_comp_id(self.config.sender_comp_id.clone())
143            .target_comp_id(self.config.target_comp_id.clone())
144            .msg_seq_num(self.outgoing_seq_num)
145            .sending_time(Utc::now());
146
147        if let Some(test_req_id) = test_req_id {
148            builder = builder.field(112, test_req_id); // TestReqID
149        }
150
151        let heartbeat_message = builder.build()?;
152        
153        let mut conn_guard = connection.lock().await;
154        conn_guard.send_message(&heartbeat_message).await?;
155        
156        self.outgoing_seq_num += 1;
157        
158        debug!("Heartbeat sent");
159        Ok(())
160    }
161
162    /// Send a new order
163    pub async fn send_new_order(&mut self, order: NewOrderRequest) -> Result<String> {
164        let connection = self.connection.as_ref()
165            .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
166
167        if self.state != SessionState::LoggedOn {
168            return Err(DeribitFixError::Session("Not logged on".to_string()));
169        }
170
171        let client_order_id = order.client_order_id
172            .unwrap_or_else(|| format!("ORDER_{}", generate_timestamp()));
173
174        let mut builder = MessageBuilder::new()
175            .msg_type(MsgType::NewOrderSingle)
176            .sender_comp_id(self.config.sender_comp_id.clone())
177            .target_comp_id(self.config.target_comp_id.clone())
178            .msg_seq_num(self.outgoing_seq_num)
179            .sending_time(Utc::now())
180            .field(11, client_order_id.clone()) // ClOrdID
181            .field(55, order.symbol) // Symbol
182            .field(54, match order.side { // Side
183                OrderSide::Buy => "1",
184                OrderSide::Sell => "2",
185            }.to_string())
186            .field(38, order.quantity.to_string()) // OrderQty
187            .field(40, match order.order_type { // OrdType
188                OrderType::Market => "1",
189                OrderType::Limit => "2",
190                OrderType::Stop => "3",
191                OrderType::StopLimit => "4",
192            }.to_string())
193            .field(59, match order.time_in_force { // TimeInForce
194                TimeInForce::Day => "0",
195                TimeInForce::GoodTillCancel => "1",
196                TimeInForce::ImmediateOrCancel => "3",
197                TimeInForce::FillOrKill => "4",
198            }.to_string());
199
200        // Add price for limit orders
201        if let Some(price) = order.price {
202            builder = builder.field(44, price.to_string()); // Price
203        }
204
205        let order_message = builder.build()?;
206        
207        let mut conn_guard = connection.lock().await;
208        conn_guard.send_message(&order_message).await?;
209        
210        self.outgoing_seq_num += 1;
211        
212        info!("New order sent: {}", client_order_id);
213        Ok(client_order_id)
214    }
215
216    /// Cancel an order
217    pub async fn cancel_order(&mut self, order_id: String) -> Result<()> {
218        let connection = self.connection.as_ref()
219            .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
220
221        if self.state != SessionState::LoggedOn {
222            return Err(DeribitFixError::Session("Not logged on".to_string()));
223        }
224
225        let cancel_message = MessageBuilder::new()
226            .msg_type(MsgType::OrderCancelRequest)
227            .sender_comp_id(self.config.sender_comp_id.clone())
228            .target_comp_id(self.config.target_comp_id.clone())
229            .msg_seq_num(self.outgoing_seq_num)
230            .sending_time(Utc::now())
231            .field(11, format!("CANCEL_{}", generate_timestamp())) // ClOrdID
232            .field(41, order_id) // OrigClOrdID
233            .build()?;
234
235        let mut conn_guard = connection.lock().await;
236        conn_guard.send_message(&cancel_message).await?;
237        
238        self.outgoing_seq_num += 1;
239        
240        info!("Order cancel request sent");
241        Ok(())
242    }
243
244    /// Subscribe to market data
245    pub async fn subscribe_market_data(&mut self, symbol: String) -> Result<()> {
246        let connection = self.connection.as_ref()
247            .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
248
249        if self.state != SessionState::LoggedOn {
250            return Err(DeribitFixError::Session("Not logged on".to_string()));
251        }
252
253        let md_request = MessageBuilder::new()
254            .msg_type(MsgType::MarketDataRequest)
255            .sender_comp_id(self.config.sender_comp_id.clone())
256            .target_comp_id(self.config.target_comp_id.clone())
257            .msg_seq_num(self.outgoing_seq_num)
258            .sending_time(Utc::now())
259            .field(262, format!("MD_{}", generate_timestamp())) // MDReqID
260            .field(263, "1".to_string()) // SubscriptionRequestType (Snapshot + Updates)
261            .field(264, "0".to_string()) // MarketDepth (Full Book)
262            .field(146, "1".to_string()) // NoRelatedSym
263            .field(55, symbol) // Symbol
264            .build()?;
265
266        let mut conn_guard = connection.lock().await;
267        conn_guard.send_message(&md_request).await?;
268        
269        self.outgoing_seq_num += 1;
270        
271        info!("Market data subscription sent");
272        Ok(())
273    }
274
275    /// Request positions
276    pub async fn request_positions(&mut self) -> Result<Vec<Position>> {
277        let connection = self.connection.as_ref()
278            .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
279
280        if self.state != SessionState::LoggedOn {
281            return Err(DeribitFixError::Session("Not logged on".to_string()));
282        }
283
284        let pos_request = MessageBuilder::new()
285            .msg_type(MsgType::RequestForPositions)
286            .sender_comp_id(self.config.sender_comp_id.clone())
287            .target_comp_id(self.config.target_comp_id.clone())
288            .msg_seq_num(self.outgoing_seq_num)
289            .sending_time(Utc::now())
290            .field(710, format!("POS_{}", generate_timestamp())) // PosReqID
291            .field(724, "0".to_string()) // PosReqType (Positions)
292            .build()?;
293
294        let mut conn_guard = connection.lock().await;
295        conn_guard.send_message(&pos_request).await?;
296        
297        self.outgoing_seq_num += 1;
298        
299        info!("Position request sent");
300        
301        // TODO: Implement position response parsing
302        Ok(vec![])
303    }
304
305    /// Calculate password hash for authentication
306    fn calculate_password_hash(&self, raw_data: &str) -> Result<String> {
307        let mut hasher = Sha256::new();
308        hasher.update(raw_data.as_bytes());
309        hasher.update(self.config.password.as_bytes());
310        let hash = hasher.finalize();
311        Ok(BASE64_STANDARD.encode(hash))
312    }
313
314    /// Calculate application signature for registered apps
315    fn calculate_app_signature(&self, raw_data: &str, app_secret: &str) -> Result<String> {
316        let mut hasher = Sha256::new();
317        hasher.update(raw_data.as_bytes());
318        hasher.update(app_secret.as_bytes());
319        let hash = hasher.finalize();
320        Ok(BASE64_STANDARD.encode(hash))
321    }
322
323    /// Get current session state
324    pub fn state(&self) -> SessionState {
325        self.state
326    }
327
328    /// Set session state (for testing)
329    pub fn set_state(&mut self, state: SessionState) {
330        self.state = state;
331    }
332}