deribit_fix/client/
fix_client.rs

1//! Deribit FIX client implementation
2
3use crate::{
4    config::DeribitFixConfig,
5    connection::Connection,
6    error::{DeribitFixError, Result},
7    session::Session,
8};
9use deribit_base::prelude::{NewOrderRequest, Position};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12use tracing::info;
13
14/// Main Deribit FIX client
15pub struct DeribitFixClient {
16    /// Client configuration
17    pub config: DeribitFixConfig,
18    connection: Option<Arc<Mutex<Connection>>>,
19    session: Option<Arc<Mutex<Session>>>,
20    heartbeat_task: Option<tokio::task::JoinHandle<()>>,
21}
22
23impl DeribitFixClient {
24    /// Create a new Deribit FIX client
25    pub async fn new(config: &DeribitFixConfig) -> Result<Self> {
26        config.validate()?;
27        let config = config.clone();
28        Ok(Self {
29            config,
30            connection: None,
31            session: None,
32            heartbeat_task: None,
33        })
34    }
35
36    /// Connect to the Deribit FIX server
37    pub async fn connect(&mut self) -> Result<()> {
38        info!(
39            "Connecting to Deribit FIX server at {}",
40            self.config.connection_url()
41        );
42
43        // Create connection
44        let connection = Connection::new(&self.config).await?;
45        self.connection = Some(Arc::new(Mutex::new(connection)));
46
47        // Create session
48        let session = Session::new(&self.config, self.connection.as_ref().unwrap().clone())?;
49        self.session = Some(Arc::new(Mutex::new(session)));
50
51        // Perform logon
52        self.logon().await?;
53
54        // Start background heartbeat task to keep the session alive
55        if let Some(session) = &self.session {
56            let session_arc = session.clone();
57            let hb_interval_secs = self.config.heartbeat_interval as u64;
58            self.heartbeat_task = Some(tokio::spawn(async move {
59                use tokio::time::{Duration, sleep};
60                loop {
61                    sleep(Duration::from_secs(hb_interval_secs)).await;
62                    let mut guard = session_arc.lock().await;
63                    // Only send heartbeat when logged on; stop loop when not active
64                    if guard.get_state() == crate::session::SessionState::LoggedOn {
65                        let _ = guard.send_heartbeat(None).await;
66                    } else {
67                        break;
68                    }
69                }
70            }));
71        }
72
73        info!("Successfully connected to Deribit FIX server");
74        Ok(())
75    }
76
77    /// Disconnect from the server
78    pub async fn disconnect(&mut self) -> Result<()> {
79        info!("Disconnecting from Deribit FIX server");
80
81        // Stop heartbeat task if running
82        if let Some(handle) = self.heartbeat_task.take() {
83            handle.abort();
84        }
85
86        if let Some(session) = &self.session {
87            let mut session_guard = session.lock().await;
88            session_guard.logout().await?;
89        }
90
91        if let Some(connection) = &self.connection {
92            let mut connection_guard = connection.lock().await;
93            connection_guard.close().await?;
94        }
95
96        self.connection = None;
97        self.session = None;
98
99        info!("Successfully disconnected from Deribit FIX server");
100        Ok(())
101    }
102
103    /// Check if the client is connected
104    pub fn is_connected(&self) -> bool {
105        self.connection.is_some() && self.session.is_some()
106    }
107
108    /// Get the current session state
109    pub async fn get_session_state(&self) -> Option<crate::session::SessionState> {
110        if let Some(session) = &self.session {
111            // Use async lock to properly wait for session access
112            let session_guard = session.lock().await;
113            Some(session_guard.get_state())
114        } else {
115            None
116        }
117    }
118
119    /// Perform FIX logon
120    async fn logon(&self) -> Result<()> {
121        if let Some(session) = &self.session {
122            let mut session_guard = session.lock().await;
123            session_guard.logon().await?;
124        }
125        Ok(())
126    }
127
128    /// Send a new order
129    pub async fn send_order(&self, order: NewOrderRequest) -> Result<String> {
130        if let Some(session) = &self.session {
131            let mut session_guard = session.lock().await;
132            session_guard.send_new_order(order).await
133        } else {
134            Err(DeribitFixError::Session("Not connected".to_string()))
135        }
136    }
137
138    /// Cancel an order
139    pub async fn cancel_order(&self, order_id: String) -> Result<()> {
140        self.cancel_order_with_symbol(order_id, None).await
141    }
142
143    /// Cancel an order with optional symbol specification
144    ///
145    /// # Arguments
146    /// * `order_id` - The order identifier (OrigClOrdID) to cancel
147    /// * `symbol` - Optional instrument symbol (e.g., "BTC-PERPETUAL").
148    ///   Required when canceling by ClOrdID or DeribitLabel,
149    ///   but not required when using OrigClOrdID (fastest approach)
150    ///
151    pub async fn cancel_order_with_symbol(
152        &self,
153        order_id: String,
154        symbol: Option<String>,
155    ) -> Result<()> {
156        if let Some(session) = &self.session {
157            let mut session_guard = session.lock().await;
158            session_guard
159                .cancel_order_with_symbol(order_id, symbol)
160                .await
161        } else {
162            Err(DeribitFixError::Session("Not connected".to_string()))
163        }
164    }
165
166    /// Subscribe to market data
167    pub async fn subscribe_market_data(&self, symbol: String) -> Result<()> {
168        if let Some(session) = &self.session {
169            let mut session_guard = session.lock().await;
170            session_guard.subscribe_market_data(symbol).await
171        } else {
172            Err(DeribitFixError::Session("Not connected".to_string()))
173        }
174    }
175
176    /// Get account positions
177    pub async fn get_positions(&self) -> Result<Vec<Position>> {
178        if let Some(session) = &self.session {
179            let mut session_guard = session.lock().await;
180            session_guard.request_positions().await
181        } else {
182            Err(DeribitFixError::Session("Not connected".to_string()))
183        }
184    }
185
186    /// Receive and process a message from the server
187    pub async fn receive_message(&self) -> Result<Option<crate::model::message::FixMessage>> {
188        if let Some(session) = &self.session {
189            let mut session_guard = session.lock().await;
190            session_guard.receive_and_process_message().await
191        } else {
192            Err(DeribitFixError::Session("Not connected".to_string()))
193        }
194    }
195}