1use crate::model::message::FixMessage;
4use crate::model::types::MsgType;
5use crate::{
6 config::DeribitFixConfig,
7 connection::Connection,
8 error::{DeribitFixError, Result},
9 message::MessageBuilder,
10};
11use base64::prelude::*;
12use chrono::Utc;
13use deribit_base::prelude::*;
14use rand;
15use sha2::{Digest, Sha256};
16use std::str::FromStr;
17use std::sync::Arc;
18use tokio::sync::Mutex;
19use tracing::{debug, info};
20
/// Lifecycle phases of a FIX session as tracked by [`Session`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// No session is established. This is the state of a freshly created
    /// session and the state entered when a Logout message is received.
    Disconnected,
    /// A Logon message has been sent; no Logon has been received back yet.
    LogonSent,
    /// A Logon message has been received from the counterparty.
    LoggedOn,
    /// A Logout message has been sent by this side.
    LogoutSent,
}
33
34pub struct Session {
36 config: DeribitFixConfig,
37 connection: Option<Arc<Mutex<Connection>>>,
38 state: SessionState,
39 outgoing_seq_num: u32,
40 incoming_seq_num: u32,
41}
42
43impl Session {
44 pub fn new(config: &DeribitFixConfig, connection: Arc<Mutex<Connection>>) -> Result<Self> {
46 info!("Creating new FIX session");
47 Ok(Self {
48 config: config.clone(),
49 state: SessionState::Disconnected,
50 outgoing_seq_num: 1,
51 incoming_seq_num: 1,
52 connection: Some(connection),
53 })
54 }
55
56 pub fn set_connection(&mut self, connection: Arc<Mutex<Connection>>) {
58 self.connection = Some(connection);
59 }
60
61 pub fn get_state(&self) -> SessionState {
63 self.state
64 }
65
66 async fn send_message(&mut self, message: FixMessage) -> Result<()> {
68 if let Some(connection) = &self.connection {
69 let mut conn_guard = connection.lock().await;
70 conn_guard.send_message(&message).await?;
71 debug!("Sent FIX message: {}", message.to_string());
72 } else {
73 return Err(DeribitFixError::Connection(
74 "No connection available".to_string(),
75 ));
76 }
77 Ok(())
78 }
79
80 pub async fn logon(&mut self) -> Result<()> {
82 info!("Performing FIX logon");
83
84 let (raw_data, password_hash) = self.generate_auth_data(&self.config.password)?;
86
87 let message_builder = MessageBuilder::new()
88 .msg_type(MsgType::Logon)
89 .sender_comp_id(self.config.sender_comp_id.clone())
90 .target_comp_id(self.config.target_comp_id.clone())
91 .msg_seq_num(self.outgoing_seq_num)
92 .field(96, raw_data) .field(98, "0".to_string()) .field(108, self.config.heartbeat_interval.to_string()) .field(553, self.config.username.clone()) .field(554, password_hash); let logon_message = message_builder.build()?;
104
105 self.send_message(logon_message).await?;
107 self.state = SessionState::LogonSent;
108 self.outgoing_seq_num += 1;
109
110 info!("Logon message sent");
111 Ok(())
112 }
113
114 pub async fn logout(&mut self) -> Result<()> {
116 info!("Performing FIX logout");
117
118 let logout_message = MessageBuilder::new()
119 .msg_type(MsgType::Logout)
120 .sender_comp_id(self.config.sender_comp_id.clone())
121 .target_comp_id(self.config.target_comp_id.clone())
122 .msg_seq_num(self.outgoing_seq_num)
123 .field(58, "Normal logout".to_string()) .build()?;
125
126 self.send_message(logout_message).await?;
128 self.state = SessionState::LogoutSent;
129 self.outgoing_seq_num += 1;
130
131 info!("Logout message sent");
132 Ok(())
133 }
134
135 pub async fn send_heartbeat(&mut self, test_req_id: Option<String>) -> Result<()> {
137 debug!("Sending heartbeat message");
138
139 let mut builder = MessageBuilder::new()
140 .msg_type(MsgType::Heartbeat)
141 .sender_comp_id(self.config.sender_comp_id.clone())
142 .target_comp_id(self.config.target_comp_id.clone())
143 .msg_seq_num(self.outgoing_seq_num);
144
145 if let Some(test_req_id) = test_req_id {
146 builder = builder.field(112, test_req_id); }
148
149 let heartbeat_message = builder.build()?;
150
151 self.send_message(heartbeat_message).await?;
153 self.outgoing_seq_num += 1;
154
155 debug!("Heartbeat message sent");
156 Ok(())
157 }
158
159 pub fn send_new_order(&mut self, order: NewOrderRequest) -> Result<String> {
161 info!("Sending new order: {:?}", order);
162
163 let order_id = format!("ORDER_{}", chrono::Utc::now().timestamp_millis());
164
165 let _order_message = MessageBuilder::new()
166 .msg_type(MsgType::NewOrderSingle)
167 .sender_comp_id(self.config.sender_comp_id.clone())
168 .target_comp_id(self.config.target_comp_id.clone())
169 .msg_seq_num(self.outgoing_seq_num)
170 .field(11, order_id.clone()) .field(55, order.instrument_name.clone()) .field(
173 54,
174 match order.side {
175 deribit_base::model::order::OrderSide::Buy => "1".to_string(),
176 deribit_base::model::order::OrderSide::Sell => "2".to_string(),
177 },
178 ) .field(60, Utc::now().format("%Y%m%d-%H:%M:%S%.3f").to_string()) .field(38, order.amount.to_string()) .field(40, "2".to_string()) .field(44, order.price.unwrap_or(0.0).to_string()) .build()?;
184
185 self.outgoing_seq_num += 1;
187
188 info!("New order message prepared with ID: {}", order_id);
189 Ok(order_id)
190 }
191
192 pub fn cancel_order(&mut self, order_id: String) -> Result<()> {
194 info!("Cancelling order: {}", order_id);
195
196 let cancel_id = format!("CANCEL_{}", chrono::Utc::now().timestamp_millis());
197
198 let _cancel_message = MessageBuilder::new()
199 .msg_type(MsgType::OrderCancelRequest)
200 .sender_comp_id(self.config.sender_comp_id.clone())
201 .target_comp_id(self.config.target_comp_id.clone())
202 .msg_seq_num(self.outgoing_seq_num)
203 .field(11, cancel_id) .field(41, order_id) .field(60, Utc::now().format("%Y%m%d-%H:%M:%S%.3f").to_string()) .build()?;
207
208 self.outgoing_seq_num += 1;
210
211 info!("Order cancel message prepared");
212 Ok(())
213 }
214
215 pub fn subscribe_market_data(&mut self, symbol: String) -> Result<()> {
217 info!("Subscribing to market data for: {}", symbol);
218
219 let request_id = format!("MDR_{}", chrono::Utc::now().timestamp_millis());
220
221 let _market_data_request = MessageBuilder::new()
222 .msg_type(MsgType::MarketDataRequest)
223 .sender_comp_id(self.config.sender_comp_id.clone())
224 .target_comp_id(self.config.target_comp_id.clone())
225 .msg_seq_num(self.outgoing_seq_num)
226 .field(262, request_id) .field(263, "1".to_string()) .field(264, "0".to_string()) .field(267, "2".to_string()) .field(269, "0".to_string()) .field(269, "1".to_string()) .field(146, "1".to_string()) .field(55, symbol) .build()?;
235
236 self.outgoing_seq_num += 1;
238
239 info!("Market data subscription message prepared");
240 Ok(())
241 }
242
243 pub fn request_positions(&mut self) -> Result<Vec<Position>> {
245 info!("Requesting positions");
246
247 let request_id = format!("POS_{}", chrono::Utc::now().timestamp_millis());
248
249 let _position_request = MessageBuilder::new()
250 .msg_type(MsgType::RequestForPositions)
251 .sender_comp_id(self.config.sender_comp_id.clone())
252 .target_comp_id(self.config.target_comp_id.clone())
253 .msg_seq_num(self.outgoing_seq_num)
254 .field(710, request_id) .field(724, "0".to_string()) .field(263, "1".to_string()) .field(715, Utc::now().format("%Y%m%d").to_string()) .build()?;
259
260 self.outgoing_seq_num += 1;
262
263 info!("Position request message prepared");
264
265 Ok(Vec::new())
267 }
268
269 pub fn generate_auth_data(&self, access_secret: &str) -> Result<(String, String)> {
272 let timestamp = chrono::Utc::now().timestamp_millis();
274
275 let mut nonce_bytes = vec![0u8; 32];
277 for byte in nonce_bytes.iter_mut() {
278 *byte = rand::random::<u8>();
279 }
280 let nonce_b64 = BASE64_STANDARD.encode(&nonce_bytes);
281
282 let raw_data = format!("{timestamp}.{nonce_b64}");
284
285 let mut auth_data = raw_data.as_bytes().to_vec();
287 auth_data.extend_from_slice(access_secret.as_bytes());
288
289 debug!("Timestamp: {}", timestamp);
290 debug!("Nonce length: {} bytes", nonce_bytes.len());
291 debug!("Nonce (base64): {}", nonce_b64);
292 debug!("RawData: {}", raw_data);
293 debug!("Access secret: {}", access_secret);
294 debug!("Auth data length: {} bytes", auth_data.len());
295
296 let mut hasher = Sha256::new();
297 hasher.update(&auth_data);
298 let hash_result = hasher.finalize();
299 let password_hash = BASE64_STANDARD.encode(hash_result);
300
301 debug!("Password hash: {}", password_hash);
302
303 Ok((raw_data, password_hash))
304 }
305
306 #[allow(dead_code)]
308 fn calculate_app_signature(&self, raw_data: &str, app_secret: &str) -> Result<String> {
309 let mut hasher = Sha256::new();
310 hasher.update(format!("{raw_data}{app_secret}").as_bytes());
311 let result = hasher.finalize();
312 Ok(BASE64_STANDARD.encode(result))
313 }
314
315 pub fn state(&self) -> SessionState {
317 self.state
318 }
319
320 pub fn set_state(&mut self, state: SessionState) {
322 self.state = state;
323 }
324
325 async fn process_message(&mut self, message: &FixMessage) -> Result<()> {
327 debug!("Processing FIX message: {:?}", message);
328
329 let msg_type_str = message.get_field(35).unwrap_or(&String::new()).clone();
331 let msg_type = MsgType::from_str(&msg_type_str).map_err(|_| {
332 DeribitFixError::MessageParsing(format!("Unknown message type: {msg_type_str}"))
333 })?;
334
335 match msg_type {
336 MsgType::Logon => {
337 info!("Received logon response");
338 self.state = SessionState::LoggedOn;
339 }
340 MsgType::Logout => {
341 info!("Received logout message");
342 self.state = SessionState::Disconnected;
343 }
344 MsgType::Heartbeat => {
345 debug!("Received heartbeat");
346 }
347 MsgType::TestRequest => {
348 debug!("Received test request, sending heartbeat response");
349 let test_req_id = message.get_field(112);
350 self.send_heartbeat(test_req_id.cloned()).await?;
351 }
352 _ => {
353 debug!("Received message type: {:?}", msg_type);
354 }
355 }
356
357 self.incoming_seq_num += 1;
358 Ok(())
359 }
360
361 pub async fn receive_and_process_message(&mut self) -> Result<Option<FixMessage>> {
363 let message = if let Some(connection) = &self.connection {
364 let mut conn_guard = connection.lock().await;
365 conn_guard.receive_message().await?
366 } else {
367 None
368 };
369
370 if let Some(message) = message {
371 self.process_message(&message).await?;
372 Ok(Some(message))
373 } else {
374 Ok(None)
375 }
376 }
377}