deribit_fix/session/
fix_session.rs

1//! FIX session management
2
3use crate::model::message::FixMessage;
4use crate::model::types::MsgType;
5use crate::{
6    config::DeribitFixConfig,
7    connection::Connection,
8    error::{DeribitFixError, Result},
9    message::MessageBuilder,
10};
11use base64::prelude::*;
12use chrono::Utc;
13use deribit_base::prelude::*;
14use rand;
15use sha2::{Digest, Sha256};
16use std::str::FromStr;
17use std::sync::Arc;
18use tokio::sync::Mutex;
19use tracing::{debug, info};
20
/// Lifecycle of a FIX session as tracked by [`Session`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionState {
    /// No session is established; this is the state a new `Session` starts in
    /// and the state entered when a Logout message is received.
    Disconnected,
    /// A Logon message has been sent and the response is still pending.
    LogonSent,
    /// A Logon message was received from the counterparty; the session is active.
    LoggedOn,
    /// A Logout message has been sent and the confirmation is still pending.
    LogoutSent,
}
33
34/// FIX session manager
35pub struct Session {
36    config: DeribitFixConfig,
37    connection: Option<Arc<Mutex<Connection>>>,
38    state: SessionState,
39    outgoing_seq_num: u32,
40    incoming_seq_num: u32,
41}
42
43impl Session {
44    /// Create a new FIX session
45    pub fn new(config: &DeribitFixConfig, connection: Arc<Mutex<Connection>>) -> Result<Self> {
46        info!("Creating new FIX session");
47        Ok(Self {
48            config: config.clone(),
49            state: SessionState::Disconnected,
50            outgoing_seq_num: 1,
51            incoming_seq_num: 1,
52            connection: Some(connection),
53        })
54    }
55
56    /// Set the connection for this session
57    pub fn set_connection(&mut self, connection: Arc<Mutex<Connection>>) {
58        self.connection = Some(connection);
59    }
60
61    /// Get the current session state
62    pub fn get_state(&self) -> SessionState {
63        self.state
64    }
65
66    /// Send a FIX message through the connection
67    async fn send_message(&mut self, message: FixMessage) -> Result<()> {
68        if let Some(connection) = &self.connection {
69            let mut conn_guard = connection.lock().await;
70            conn_guard.send_message(&message).await?;
71            debug!("Sent FIX message: {}", message.to_string());
72        } else {
73            return Err(DeribitFixError::Connection(
74                "No connection available".to_string(),
75            ));
76        }
77        Ok(())
78    }
79
80    /// Perform FIX logon
81    pub async fn logon(&mut self) -> Result<()> {
82        info!("Performing FIX logon");
83
84        // Generate RawData and password hash according to Deribit FIX spec
85        let (raw_data, password_hash) = self.generate_auth_data(&self.config.password)?;
86
87        let message_builder = MessageBuilder::new()
88            .msg_type(MsgType::Logon)
89            .sender_comp_id(self.config.sender_comp_id.clone())
90            .target_comp_id(self.config.target_comp_id.clone())
91            .msg_seq_num(self.outgoing_seq_num)
92            .field(96, raw_data) // RawData - timestamp.nonce
93            .field(98, "0".to_string()) // EncryptMethod (0 = None)
94            .field(108, self.config.heartbeat_interval.to_string()) // HeartBtInt
95            .field(553, self.config.username.clone()) // Username
96            .field(554, password_hash); // Password - base64(sha256(nonce ++ access_secret))
97
98        // Add AppID if available - temporarily disabled for testing
99        // if let Some(app_id) = &self.config.app_id {
100        //     message_builder = message_builder.field(1128, app_id.clone()); // AppID
101        // }
102
103        let logon_message = message_builder.build()?;
104
105        // Send the logon message
106        self.send_message(logon_message).await?;
107        self.state = SessionState::LogonSent;
108        self.outgoing_seq_num += 1;
109
110        info!("Logon message sent");
111        Ok(())
112    }
113
114    /// Perform FIX logout
115    pub async fn logout(&mut self) -> Result<()> {
116        info!("Performing FIX logout");
117
118        let logout_message = MessageBuilder::new()
119            .msg_type(MsgType::Logout)
120            .sender_comp_id(self.config.sender_comp_id.clone())
121            .target_comp_id(self.config.target_comp_id.clone())
122            .msg_seq_num(self.outgoing_seq_num)
123            .field(58, "Normal logout".to_string()) // Text
124            .build()?;
125
126        // Send the logout message
127        self.send_message(logout_message).await?;
128        self.state = SessionState::LogoutSent;
129        self.outgoing_seq_num += 1;
130
131        info!("Logout message sent");
132        Ok(())
133    }
134
135    /// Send a heartbeat message
136    pub async fn send_heartbeat(&mut self, test_req_id: Option<String>) -> Result<()> {
137        debug!("Sending heartbeat message");
138
139        let mut builder = MessageBuilder::new()
140            .msg_type(MsgType::Heartbeat)
141            .sender_comp_id(self.config.sender_comp_id.clone())
142            .target_comp_id(self.config.target_comp_id.clone())
143            .msg_seq_num(self.outgoing_seq_num);
144
145        if let Some(test_req_id) = test_req_id {
146            builder = builder.field(112, test_req_id); // TestReqID
147        }
148
149        let heartbeat_message = builder.build()?;
150
151        // Send the heartbeat message
152        self.send_message(heartbeat_message).await?;
153        self.outgoing_seq_num += 1;
154
155        debug!("Heartbeat message sent");
156        Ok(())
157    }
158
159    /// Send a new order
160    pub fn send_new_order(&mut self, order: NewOrderRequest) -> Result<String> {
161        info!("Sending new order: {:?}", order);
162
163        let order_id = format!("ORDER_{}", chrono::Utc::now().timestamp_millis());
164
165        let _order_message = MessageBuilder::new()
166            .msg_type(MsgType::NewOrderSingle)
167            .sender_comp_id(self.config.sender_comp_id.clone())
168            .target_comp_id(self.config.target_comp_id.clone())
169            .msg_seq_num(self.outgoing_seq_num)
170            .field(11, order_id.clone()) // ClOrdID
171            .field(55, order.instrument_name.clone()) // Symbol
172            .field(
173                54,
174                match order.side {
175                    deribit_base::model::order::OrderSide::Buy => "1".to_string(),
176                    deribit_base::model::order::OrderSide::Sell => "2".to_string(),
177                },
178            ) // Side
179            .field(60, Utc::now().format("%Y%m%d-%H:%M:%S%.3f").to_string()) // TransactTime
180            .field(38, order.amount.to_string()) // OrderQty
181            .field(40, "2".to_string()) // OrdType (2 = Limit)
182            .field(44, order.price.unwrap_or(0.0).to_string()) // Price
183            .build()?;
184
185        // In a real implementation, you would send this message
186        self.outgoing_seq_num += 1;
187
188        info!("New order message prepared with ID: {}", order_id);
189        Ok(order_id)
190    }
191
192    /// Cancel an order
193    pub fn cancel_order(&mut self, order_id: String) -> Result<()> {
194        info!("Cancelling order: {}", order_id);
195
196        let cancel_id = format!("CANCEL_{}", chrono::Utc::now().timestamp_millis());
197
198        let _cancel_message = MessageBuilder::new()
199            .msg_type(MsgType::OrderCancelRequest)
200            .sender_comp_id(self.config.sender_comp_id.clone())
201            .target_comp_id(self.config.target_comp_id.clone())
202            .msg_seq_num(self.outgoing_seq_num)
203            .field(11, cancel_id) // ClOrdID
204            .field(41, order_id) // OrigClOrdID
205            .field(60, Utc::now().format("%Y%m%d-%H:%M:%S%.3f").to_string()) // TransactTime
206            .build()?;
207
208        // In a real implementation, you would send this message
209        self.outgoing_seq_num += 1;
210
211        info!("Order cancel message prepared");
212        Ok(())
213    }
214
215    /// Subscribe to market data
216    pub fn subscribe_market_data(&mut self, symbol: String) -> Result<()> {
217        info!("Subscribing to market data for: {}", symbol);
218
219        let request_id = format!("MDR_{}", chrono::Utc::now().timestamp_millis());
220
221        let _market_data_request = MessageBuilder::new()
222            .msg_type(MsgType::MarketDataRequest)
223            .sender_comp_id(self.config.sender_comp_id.clone())
224            .target_comp_id(self.config.target_comp_id.clone())
225            .msg_seq_num(self.outgoing_seq_num)
226            .field(262, request_id) // MDReqID
227            .field(263, "1".to_string()) // SubscriptionRequestType (1 = Snapshot + Updates)
228            .field(264, "0".to_string()) // MarketDepth (0 = Full Book)
229            .field(267, "2".to_string()) // NoMDEntryTypes
230            .field(269, "0".to_string()) // MDEntryType (0 = Bid)
231            .field(269, "1".to_string()) // MDEntryType (1 = Offer)
232            .field(146, "1".to_string()) // NoRelatedSym
233            .field(55, symbol) // Symbol
234            .build()?;
235
236        // In a real implementation, you would send this message
237        self.outgoing_seq_num += 1;
238
239        info!("Market data subscription message prepared");
240        Ok(())
241    }
242
243    /// Request positions
244    pub fn request_positions(&mut self) -> Result<Vec<Position>> {
245        info!("Requesting positions");
246
247        let request_id = format!("POS_{}", chrono::Utc::now().timestamp_millis());
248
249        let _position_request = MessageBuilder::new()
250            .msg_type(MsgType::RequestForPositions)
251            .sender_comp_id(self.config.sender_comp_id.clone())
252            .target_comp_id(self.config.target_comp_id.clone())
253            .msg_seq_num(self.outgoing_seq_num)
254            .field(710, request_id) // PosReqID
255            .field(724, "0".to_string()) // PosReqType (0 = Positions)
256            .field(263, "1".to_string()) // SubscriptionRequestType
257            .field(715, Utc::now().format("%Y%m%d").to_string()) // ClearingBusinessDate
258            .build()?;
259
260        // In a real implementation, you would send this message and wait for response
261        self.outgoing_seq_num += 1;
262
263        info!("Position request message prepared");
264
265        // Return empty positions for now
266        Ok(Vec::new())
267    }
268
269    /// Generate authentication data according to Deribit FIX specification
270    /// Returns (raw_data, base64_password_hash)
271    pub fn generate_auth_data(&self, access_secret: &str) -> Result<(String, String)> {
272        // Generate timestamp (strictly increasing integer in milliseconds)
273        let timestamp = chrono::Utc::now().timestamp_millis();
274
275        // Generate random nonce (at least 32 bytes as recommended by Deribit)
276        let mut nonce_bytes = vec![0u8; 32];
277        for byte in nonce_bytes.iter_mut() {
278            *byte = rand::random::<u8>();
279        }
280        let nonce_b64 = BASE64_STANDARD.encode(&nonce_bytes);
281
282        // Create RawData: timestamp.nonce (separated by ASCII period)
283        let raw_data = format!("{timestamp}.{nonce_b64}");
284
285        // Calculate password hash: base64(sha256(RawData ++ access_secret))
286        let mut auth_data = raw_data.as_bytes().to_vec();
287        auth_data.extend_from_slice(access_secret.as_bytes());
288
289        debug!("Timestamp: {}", timestamp);
290        debug!("Nonce length: {} bytes", nonce_bytes.len());
291        debug!("Nonce (base64): {}", nonce_b64);
292        debug!("RawData: {}", raw_data);
293        debug!("Access secret: {}", access_secret);
294        debug!("Auth data length: {} bytes", auth_data.len());
295
296        let mut hasher = Sha256::new();
297        hasher.update(&auth_data);
298        let hash_result = hasher.finalize();
299        let password_hash = BASE64_STANDARD.encode(hash_result);
300
301        debug!("Password hash: {}", password_hash);
302
303        Ok((raw_data, password_hash))
304    }
305
306    /// Calculate application signature for registered apps
307    #[allow(dead_code)]
308    fn calculate_app_signature(&self, raw_data: &str, app_secret: &str) -> Result<String> {
309        let mut hasher = Sha256::new();
310        hasher.update(format!("{raw_data}{app_secret}").as_bytes());
311        let result = hasher.finalize();
312        Ok(BASE64_STANDARD.encode(result))
313    }
314
315    /// Get current session state
316    pub fn state(&self) -> SessionState {
317        self.state
318    }
319
320    /// Set session state (for testing)
321    pub fn set_state(&mut self, state: SessionState) {
322        self.state = state;
323    }
324
325    /// Process incoming FIX message
326    async fn process_message(&mut self, message: &FixMessage) -> Result<()> {
327        debug!("Processing FIX message: {:?}", message);
328
329        // Get message type
330        let msg_type_str = message.get_field(35).unwrap_or(&String::new()).clone();
331        let msg_type = MsgType::from_str(&msg_type_str).map_err(|_| {
332            DeribitFixError::MessageParsing(format!("Unknown message type: {msg_type_str}"))
333        })?;
334
335        match msg_type {
336            MsgType::Logon => {
337                info!("Received logon response");
338                self.state = SessionState::LoggedOn;
339            }
340            MsgType::Logout => {
341                info!("Received logout message");
342                self.state = SessionState::Disconnected;
343            }
344            MsgType::Heartbeat => {
345                debug!("Received heartbeat");
346            }
347            MsgType::TestRequest => {
348                debug!("Received test request, sending heartbeat response");
349                let test_req_id = message.get_field(112);
350                self.send_heartbeat(test_req_id.cloned()).await?;
351            }
352            _ => {
353                debug!("Received message type: {:?}", msg_type);
354            }
355        }
356
357        self.incoming_seq_num += 1;
358        Ok(())
359    }
360
361    /// Receive and process a FIX message from the connection
362    pub async fn receive_and_process_message(&mut self) -> Result<Option<FixMessage>> {
363        let message = if let Some(connection) = &self.connection {
364            let mut conn_guard = connection.lock().await;
365            conn_guard.receive_message().await?
366        } else {
367            None
368        };
369
370        if let Some(message) = message {
371            self.process_message(&message).await?;
372            Ok(Some(message))
373        } else {
374            Ok(None)
375        }
376    }
377}