deribit_fix/client/
fix_client.rs1use crate::{
4 config::DeribitFixConfig,
5 connection::Connection,
6 error::{DeribitFixError, Result},
7 session::Session,
8};
9use deribit_base::prelude::{NewOrderRequest, Position};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12use tracing::info;
13
14pub struct DeribitFixClient {
16 pub config: DeribitFixConfig,
18 connection: Option<Arc<Mutex<Connection>>>,
19 session: Option<Arc<Mutex<Session>>>,
20 heartbeat_task: Option<tokio::task::JoinHandle<()>>,
21}
22
23impl DeribitFixClient {
24 pub async fn new(config: &DeribitFixConfig) -> Result<Self> {
26 config.validate()?;
27 let config = config.clone();
28 Ok(Self {
29 config,
30 connection: None,
31 session: None,
32 heartbeat_task: None,
33 })
34 }
35
36 pub async fn connect(&mut self) -> Result<()> {
38 info!(
39 "Connecting to Deribit FIX server at {}",
40 self.config.connection_url()
41 );
42
43 let connection = Connection::new(&self.config).await?;
45 self.connection = Some(Arc::new(Mutex::new(connection)));
46
47 let session = Session::new(&self.config, self.connection.as_ref().unwrap().clone())?;
49 self.session = Some(Arc::new(Mutex::new(session)));
50
51 self.logon().await?;
53
54 if let Some(session) = &self.session {
56 let session_arc = session.clone();
57 let hb_interval_secs = self.config.heartbeat_interval as u64;
58 self.heartbeat_task = Some(tokio::spawn(async move {
59 use tokio::time::{Duration, sleep};
60 loop {
61 sleep(Duration::from_secs(hb_interval_secs)).await;
62 let mut guard = session_arc.lock().await;
63 if guard.get_state() == crate::session::SessionState::LoggedOn {
65 let _ = guard.send_heartbeat(None).await;
66 } else {
67 break;
68 }
69 }
70 }));
71 }
72
73 info!("Successfully connected to Deribit FIX server");
74 Ok(())
75 }
76
77 pub async fn disconnect(&mut self) -> Result<()> {
79 info!("Disconnecting from Deribit FIX server");
80
81 if let Some(handle) = self.heartbeat_task.take() {
83 handle.abort();
84 }
85
86 if let Some(session) = &self.session {
87 let mut session_guard = session.lock().await;
88 session_guard.logout().await?;
89 }
90
91 if let Some(connection) = &self.connection {
92 let mut connection_guard = connection.lock().await;
93 connection_guard.close().await?;
94 }
95
96 self.connection = None;
97 self.session = None;
98
99 info!("Successfully disconnected from Deribit FIX server");
100 Ok(())
101 }
102
103 pub fn is_connected(&self) -> bool {
105 self.connection.is_some() && self.session.is_some()
106 }
107
108 pub async fn get_session_state(&self) -> Option<crate::session::SessionState> {
110 if let Some(session) = &self.session {
111 let session_guard = session.lock().await;
113 Some(session_guard.get_state())
114 } else {
115 None
116 }
117 }
118
119 async fn logon(&self) -> Result<()> {
121 if let Some(session) = &self.session {
122 let mut session_guard = session.lock().await;
123 session_guard.logon().await?;
124 }
125 Ok(())
126 }
127
128 pub async fn send_order(&self, order: NewOrderRequest) -> Result<String> {
130 if let Some(session) = &self.session {
131 let mut session_guard = session.lock().await;
132 session_guard.send_new_order(order).await
133 } else {
134 Err(DeribitFixError::Session("Not connected".to_string()))
135 }
136 }
137
138 pub async fn cancel_order(&self, order_id: String) -> Result<()> {
140 self.cancel_order_with_symbol(order_id, None).await
141 }
142
143 pub async fn cancel_order_with_symbol(
152 &self,
153 order_id: String,
154 symbol: Option<String>,
155 ) -> Result<()> {
156 if let Some(session) = &self.session {
157 let mut session_guard = session.lock().await;
158 session_guard
159 .cancel_order_with_symbol(order_id, symbol)
160 .await
161 } else {
162 Err(DeribitFixError::Session("Not connected".to_string()))
163 }
164 }
165
166 pub async fn subscribe_market_data(&self, symbol: String) -> Result<()> {
168 if let Some(session) = &self.session {
169 let mut session_guard = session.lock().await;
170 session_guard.subscribe_market_data(symbol).await
171 } else {
172 Err(DeribitFixError::Session("Not connected".to_string()))
173 }
174 }
175
176 pub async fn get_positions(&self) -> Result<Vec<Position>> {
178 if let Some(session) = &self.session {
179 let mut session_guard = session.lock().await;
180 session_guard.request_positions().await
181 } else {
182 Err(DeribitFixError::Session("Not connected".to_string()))
183 }
184 }
185
186 pub async fn receive_message(&self) -> Result<Option<crate::model::message::FixMessage>> {
188 if let Some(session) = &self.session {
189 let mut session_guard = session.lock().await;
190 session_guard.receive_and_process_message().await
191 } else {
192 Err(DeribitFixError::Session("Not connected".to_string()))
193 }
194 }
195}