1use crate::{
4 client::{NewOrderRequest, OrderSide, OrderType, Position, TimeInForce},
5 config::Config,
6 connection::Connection,
7 error::{DeribitFixError, Result},
8 message::{FixMessage, MessageBuilder},
9 types::MsgType,
10 utils::{generate_nonce, generate_timestamp},
11};
12use base64::prelude::*;
13use chrono::Utc;
14use sha2::{Digest, Sha256};
15use std::sync::Arc;
16use tokio::sync::Mutex;
17use tracing::{debug, error, info, warn};
18
/// Lifecycle state of a FIX session.
///
/// The type is `Copy`, so it can be returned by value from accessors and
/// compared directly with `==` / `!=`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// No session is established; this is the state a new session starts in.
    Disconnected,
    /// A Logon message has been sent.
    LogonSent,
    /// The session is logged on; application messages may be sent.
    LoggedOn,
    /// A Logout message is being, or has been, sent.
    LogoutSent,
}
27
28pub struct Session {
30 config: Config,
31 connection: Option<Arc<Mutex<Connection>>>,
32 state: SessionState,
33 outgoing_seq_num: u32,
34 incoming_seq_num: u32,
35}
36
37impl Session {
38 pub fn new(config: &Config) -> Result<Self> {
40 Ok(Self {
41 config: config.clone(),
42 connection: None,
43 state: SessionState::Disconnected,
44 outgoing_seq_num: 1,
45 incoming_seq_num: 1,
46 })
47 }
48
49 pub fn set_connection(&mut self, connection: Arc<Mutex<Connection>>) {
51 self.connection = Some(connection);
52 }
53
54 pub async fn logon(&mut self) -> Result<()> {
56 let connection = self.connection.as_ref()
57 .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
58
59 info!("Initiating FIX logon");
60 self.state = SessionState::LogonSent;
61
62 let timestamp = generate_timestamp();
64 let nonce = generate_nonce(32);
65 let raw_data = format!("{}.{}", timestamp, nonce);
66
67 let password_hash = self.calculate_password_hash(&raw_data)?;
69
70 let mut builder = MessageBuilder::new()
72 .msg_type(MsgType::Logon)
73 .sender_comp_id(self.config.sender_comp_id.clone())
74 .target_comp_id(self.config.target_comp_id.clone())
75 .msg_seq_num(self.outgoing_seq_num)
76 .sending_time(Utc::now())
77 .field(108, self.config.heartbeat_interval.to_string()) .field(553, self.config.username.clone()) .field(554, password_hash) .field(95, raw_data.len().to_string()) .field(96, raw_data.clone()); if self.config.cancel_on_disconnect {
85 builder = builder.field(9001, "Y".to_string()); } else {
87 builder = builder.field(9001, "N".to_string());
88 }
89
90 if let (Some(app_id), Some(app_secret)) = (&self.config.app_id, &self.config.app_secret) {
92 let app_sig = self.calculate_app_signature(&raw_data, app_secret)?;
93 builder = builder
94 .field(9004, app_id.clone()) .field(9005, app_sig); }
97
98 let logon_message = builder.build()?;
99
100 let mut conn_guard = connection.lock().await;
101 conn_guard.send_message(&logon_message).await?;
102
103 self.outgoing_seq_num += 1;
104
105 info!("Logon message sent");
106 Ok(())
107 }
108
109 pub async fn logout(&mut self) -> Result<()> {
111 let connection = self.connection.as_ref()
112 .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
113
114 info!("Initiating FIX logout");
115 self.state = SessionState::LogoutSent;
116
117 let logout_message = MessageBuilder::new()
118 .msg_type(MsgType::Logout)
119 .sender_comp_id(self.config.sender_comp_id.clone())
120 .target_comp_id(self.config.target_comp_id.clone())
121 .msg_seq_num(self.outgoing_seq_num)
122 .sending_time(Utc::now())
123 .build()?;
124
125 let mut conn_guard = connection.lock().await;
126 conn_guard.send_message(&logout_message).await?;
127
128 self.outgoing_seq_num += 1;
129 self.state = SessionState::Disconnected;
130
131 info!("Logout message sent");
132 Ok(())
133 }
134
135 pub async fn send_heartbeat(&mut self, test_req_id: Option<String>) -> Result<()> {
137 let connection = self.connection.as_ref()
138 .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
139
140 let mut builder = MessageBuilder::new()
141 .msg_type(MsgType::Heartbeat)
142 .sender_comp_id(self.config.sender_comp_id.clone())
143 .target_comp_id(self.config.target_comp_id.clone())
144 .msg_seq_num(self.outgoing_seq_num)
145 .sending_time(Utc::now());
146
147 if let Some(test_req_id) = test_req_id {
148 builder = builder.field(112, test_req_id); }
150
151 let heartbeat_message = builder.build()?;
152
153 let mut conn_guard = connection.lock().await;
154 conn_guard.send_message(&heartbeat_message).await?;
155
156 self.outgoing_seq_num += 1;
157
158 debug!("Heartbeat sent");
159 Ok(())
160 }
161
162 pub async fn send_new_order(&mut self, order: NewOrderRequest) -> Result<String> {
164 let connection = self.connection.as_ref()
165 .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
166
167 if self.state != SessionState::LoggedOn {
168 return Err(DeribitFixError::Session("Not logged on".to_string()));
169 }
170
171 let client_order_id = order.client_order_id
172 .unwrap_or_else(|| format!("ORDER_{}", generate_timestamp()));
173
174 let mut builder = MessageBuilder::new()
175 .msg_type(MsgType::NewOrderSingle)
176 .sender_comp_id(self.config.sender_comp_id.clone())
177 .target_comp_id(self.config.target_comp_id.clone())
178 .msg_seq_num(self.outgoing_seq_num)
179 .sending_time(Utc::now())
180 .field(11, client_order_id.clone()) .field(55, order.symbol) .field(54, match order.side { OrderSide::Buy => "1",
184 OrderSide::Sell => "2",
185 }.to_string())
186 .field(38, order.quantity.to_string()) .field(40, match order.order_type { OrderType::Market => "1",
189 OrderType::Limit => "2",
190 OrderType::Stop => "3",
191 OrderType::StopLimit => "4",
192 }.to_string())
193 .field(59, match order.time_in_force { TimeInForce::Day => "0",
195 TimeInForce::GoodTillCancel => "1",
196 TimeInForce::ImmediateOrCancel => "3",
197 TimeInForce::FillOrKill => "4",
198 }.to_string());
199
200 if let Some(price) = order.price {
202 builder = builder.field(44, price.to_string()); }
204
205 let order_message = builder.build()?;
206
207 let mut conn_guard = connection.lock().await;
208 conn_guard.send_message(&order_message).await?;
209
210 self.outgoing_seq_num += 1;
211
212 info!("New order sent: {}", client_order_id);
213 Ok(client_order_id)
214 }
215
216 pub async fn cancel_order(&mut self, order_id: String) -> Result<()> {
218 let connection = self.connection.as_ref()
219 .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
220
221 if self.state != SessionState::LoggedOn {
222 return Err(DeribitFixError::Session("Not logged on".to_string()));
223 }
224
225 let cancel_message = MessageBuilder::new()
226 .msg_type(MsgType::OrderCancelRequest)
227 .sender_comp_id(self.config.sender_comp_id.clone())
228 .target_comp_id(self.config.target_comp_id.clone())
229 .msg_seq_num(self.outgoing_seq_num)
230 .sending_time(Utc::now())
231 .field(11, format!("CANCEL_{}", generate_timestamp())) .field(41, order_id) .build()?;
234
235 let mut conn_guard = connection.lock().await;
236 conn_guard.send_message(&cancel_message).await?;
237
238 self.outgoing_seq_num += 1;
239
240 info!("Order cancel request sent");
241 Ok(())
242 }
243
244 pub async fn subscribe_market_data(&mut self, symbol: String) -> Result<()> {
246 let connection = self.connection.as_ref()
247 .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
248
249 if self.state != SessionState::LoggedOn {
250 return Err(DeribitFixError::Session("Not logged on".to_string()));
251 }
252
253 let md_request = MessageBuilder::new()
254 .msg_type(MsgType::MarketDataRequest)
255 .sender_comp_id(self.config.sender_comp_id.clone())
256 .target_comp_id(self.config.target_comp_id.clone())
257 .msg_seq_num(self.outgoing_seq_num)
258 .sending_time(Utc::now())
259 .field(262, format!("MD_{}", generate_timestamp())) .field(263, "1".to_string()) .field(264, "0".to_string()) .field(146, "1".to_string()) .field(55, symbol) .build()?;
265
266 let mut conn_guard = connection.lock().await;
267 conn_guard.send_message(&md_request).await?;
268
269 self.outgoing_seq_num += 1;
270
271 info!("Market data subscription sent");
272 Ok(())
273 }
274
275 pub async fn request_positions(&mut self) -> Result<Vec<Position>> {
277 let connection = self.connection.as_ref()
278 .ok_or_else(|| DeribitFixError::Session("No connection available".to_string()))?;
279
280 if self.state != SessionState::LoggedOn {
281 return Err(DeribitFixError::Session("Not logged on".to_string()));
282 }
283
284 let pos_request = MessageBuilder::new()
285 .msg_type(MsgType::RequestForPositions)
286 .sender_comp_id(self.config.sender_comp_id.clone())
287 .target_comp_id(self.config.target_comp_id.clone())
288 .msg_seq_num(self.outgoing_seq_num)
289 .sending_time(Utc::now())
290 .field(710, format!("POS_{}", generate_timestamp())) .field(724, "0".to_string()) .build()?;
293
294 let mut conn_guard = connection.lock().await;
295 conn_guard.send_message(&pos_request).await?;
296
297 self.outgoing_seq_num += 1;
298
299 info!("Position request sent");
300
301 Ok(vec![])
303 }
304
305 fn calculate_password_hash(&self, raw_data: &str) -> Result<String> {
307 let mut hasher = Sha256::new();
308 hasher.update(raw_data.as_bytes());
309 hasher.update(self.config.password.as_bytes());
310 let hash = hasher.finalize();
311 Ok(BASE64_STANDARD.encode(hash))
312 }
313
314 fn calculate_app_signature(&self, raw_data: &str, app_secret: &str) -> Result<String> {
316 let mut hasher = Sha256::new();
317 hasher.update(raw_data.as_bytes());
318 hasher.update(app_secret.as_bytes());
319 let hash = hasher.finalize();
320 Ok(BASE64_STANDARD.encode(hash))
321 }
322
323 pub fn state(&self) -> SessionState {
325 self.state
326 }
327
328 pub fn set_state(&mut self, state: SessionState) {
330 self.state = state;
331 }
332}