deribit_fix/client/
fix_client.rs1use crate::{
4 config::DeribitFixConfig,
5 connection::Connection,
6 error::{DeribitFixError, Result},
7 model::position::Position,
8 model::request::NewOrderRequest,
9 session::Session,
10};
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::info;
14
15pub struct DeribitFixClient {
17 pub config: DeribitFixConfig,
19 connection: Option<Arc<Mutex<Connection>>>,
20 session: Option<Arc<Mutex<Session>>>,
21 heartbeat_task: Option<tokio::task::JoinHandle<()>>,
22}
23
24impl DeribitFixClient {
25 pub async fn new(config: &DeribitFixConfig) -> Result<Self> {
27 config.validate()?;
28 let config = config.clone();
29 Ok(Self {
30 config,
31 connection: None,
32 session: None,
33 heartbeat_task: None,
34 })
35 }
36
37 pub async fn connect(&mut self) -> Result<()> {
39 info!(
40 "Connecting to Deribit FIX server at {}",
41 self.config.connection_url()
42 );
43
44 let connection = Connection::new(&self.config).await?;
46 self.connection = Some(Arc::new(Mutex::new(connection)));
47
48 let session = Session::new(&self.config, self.connection.as_ref().unwrap().clone())?;
50 self.session = Some(Arc::new(Mutex::new(session)));
51
52 self.logon().await?;
54
55 if let Some(session) = &self.session {
57 let session_arc = session.clone();
58 let hb_interval_secs = self.config.heartbeat_interval as u64;
59 self.heartbeat_task = Some(tokio::spawn(async move {
60 use tokio::time::{Duration, sleep};
61 loop {
62 sleep(Duration::from_secs(hb_interval_secs)).await;
63 let mut guard = session_arc.lock().await;
64 if guard.get_state() == crate::session::SessionState::LoggedOn {
66 let _ = guard.send_heartbeat(None).await;
67 } else {
68 break;
69 }
70 }
71 }));
72 }
73
74 info!("Successfully connected to Deribit FIX server");
75 Ok(())
76 }
77
78 pub async fn disconnect(&mut self) -> Result<()> {
80 info!("Disconnecting from Deribit FIX server");
81
82 if let Some(handle) = self.heartbeat_task.take() {
84 handle.abort();
85 }
86
87 if let Some(session) = &self.session {
88 let mut session_guard = session.lock().await;
89 session_guard.logout().await?;
90 }
91
92 if let Some(connection) = &self.connection {
93 let mut connection_guard = connection.lock().await;
94 connection_guard.close().await?;
95 }
96
97 self.connection = None;
98 self.session = None;
99
100 info!("Successfully disconnected from Deribit FIX server");
101 Ok(())
102 }
103
104 pub fn is_connected(&self) -> bool {
106 self.connection.is_some() && self.session.is_some()
107 }
108
109 pub async fn get_session_state(&self) -> Option<crate::session::SessionState> {
111 if let Some(session) = &self.session {
112 let session_guard = session.lock().await;
114 Some(session_guard.get_state())
115 } else {
116 None
117 }
118 }
119
120 async fn logon(&self) -> Result<()> {
122 if let Some(session) = &self.session {
123 let mut session_guard = session.lock().await;
124 session_guard.logon().await?;
125 }
126 Ok(())
127 }
128
129 pub async fn send_order(&self, order: NewOrderRequest) -> Result<String> {
131 if let Some(session) = &self.session {
132 let mut session_guard = session.lock().await;
133 session_guard.send_new_order(order).await
134 } else {
135 Err(DeribitFixError::Session("Not connected".to_string()))
136 }
137 }
138
139 pub async fn cancel_order(&self, order_id: String) -> Result<()> {
141 self.cancel_order_with_symbol(order_id, None).await
142 }
143
144 pub async fn cancel_order_with_symbol(
153 &self,
154 order_id: String,
155 symbol: Option<String>,
156 ) -> Result<()> {
157 if let Some(session) = &self.session {
158 let mut session_guard = session.lock().await;
159 session_guard
160 .cancel_order_with_symbol(order_id, symbol)
161 .await
162 } else {
163 Err(DeribitFixError::Session("Not connected".to_string()))
164 }
165 }
166
167 pub async fn subscribe_market_data(&self, symbol: String) -> Result<()> {
169 if let Some(session) = &self.session {
170 let mut session_guard = session.lock().await;
171 session_guard.subscribe_market_data(symbol).await
172 } else {
173 Err(DeribitFixError::Session("Not connected".to_string()))
174 }
175 }
176
177 pub async fn get_positions(&self) -> Result<Vec<Position>> {
179 if let Some(session) = &self.session {
180 let mut session_guard = session.lock().await;
181 session_guard.request_positions().await
182 } else {
183 Err(DeribitFixError::Session("Not connected".to_string()))
184 }
185 }
186
187 pub async fn receive_message(&self) -> Result<Option<crate::model::message::FixMessage>> {
189 if let Some(session) = &self.session {
190 let mut session_guard = session.lock().await;
191 session_guard.receive_and_process_message().await
192 } else {
193 Err(DeribitFixError::Session("Not connected".to_string()))
194 }
195 }
196}