Skip to main content

deribit_fix/client/
fix_client.rs

1//! Deribit FIX client implementation
2
3use crate::{
4    config::DeribitFixConfig,
5    connection::Connection,
6    error::{DeribitFixError, Result},
7    model::position::Position,
8    model::request::NewOrderRequest,
9    session::Session,
10};
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::info;
14
15/// Main Deribit FIX client
16pub struct DeribitFixClient {
17    /// Client configuration
18    pub config: DeribitFixConfig,
19    connection: Option<Arc<Mutex<Connection>>>,
20    session: Option<Arc<Mutex<Session>>>,
21    heartbeat_task: Option<tokio::task::JoinHandle<()>>,
22}
23
24impl DeribitFixClient {
25    /// Create a new Deribit FIX client
26    pub async fn new(config: &DeribitFixConfig) -> Result<Self> {
27        config.validate()?;
28        let config = config.clone();
29        Ok(Self {
30            config,
31            connection: None,
32            session: None,
33            heartbeat_task: None,
34        })
35    }
36
37    /// Connect to the Deribit FIX server
38    pub async fn connect(&mut self) -> Result<()> {
39        info!(
40            "Connecting to Deribit FIX server at {}",
41            self.config.connection_url()
42        );
43
44        // Create connection
45        let connection = Connection::new(&self.config).await?;
46        self.connection = Some(Arc::new(Mutex::new(connection)));
47
48        // Create session
49        let session = Session::new(&self.config, self.connection.as_ref().unwrap().clone())?;
50        self.session = Some(Arc::new(Mutex::new(session)));
51
52        // Perform logon
53        self.logon().await?;
54
55        // Start background heartbeat task to keep the session alive
56        if let Some(session) = &self.session {
57            let session_arc = session.clone();
58            let hb_interval_secs = self.config.heartbeat_interval as u64;
59            self.heartbeat_task = Some(tokio::spawn(async move {
60                use tokio::time::{Duration, sleep};
61                loop {
62                    sleep(Duration::from_secs(hb_interval_secs)).await;
63                    let mut guard = session_arc.lock().await;
64                    // Only send heartbeat when logged on; stop loop when not active
65                    if guard.get_state() == crate::session::SessionState::LoggedOn {
66                        let _ = guard.send_heartbeat(None).await;
67                    } else {
68                        break;
69                    }
70                }
71            }));
72        }
73
74        info!("Successfully connected to Deribit FIX server");
75        Ok(())
76    }
77
78    /// Disconnect from the server
79    pub async fn disconnect(&mut self) -> Result<()> {
80        info!("Disconnecting from Deribit FIX server");
81
82        // Stop heartbeat task if running
83        if let Some(handle) = self.heartbeat_task.take() {
84            handle.abort();
85        }
86
87        if let Some(session) = &self.session {
88            let mut session_guard = session.lock().await;
89            session_guard.logout().await?;
90        }
91
92        if let Some(connection) = &self.connection {
93            let mut connection_guard = connection.lock().await;
94            connection_guard.close().await?;
95        }
96
97        self.connection = None;
98        self.session = None;
99
100        info!("Successfully disconnected from Deribit FIX server");
101        Ok(())
102    }
103
104    /// Check if the client is connected
105    pub fn is_connected(&self) -> bool {
106        self.connection.is_some() && self.session.is_some()
107    }
108
109    /// Get the current session state
110    pub async fn get_session_state(&self) -> Option<crate::session::SessionState> {
111        if let Some(session) = &self.session {
112            // Use async lock to properly wait for session access
113            let session_guard = session.lock().await;
114            Some(session_guard.get_state())
115        } else {
116            None
117        }
118    }
119
120    /// Perform FIX logon
121    async fn logon(&self) -> Result<()> {
122        if let Some(session) = &self.session {
123            let mut session_guard = session.lock().await;
124            session_guard.logon().await?;
125        }
126        Ok(())
127    }
128
129    /// Send a new order
130    pub async fn send_order(&self, order: NewOrderRequest) -> Result<String> {
131        if let Some(session) = &self.session {
132            let mut session_guard = session.lock().await;
133            session_guard.send_new_order(order).await
134        } else {
135            Err(DeribitFixError::Session("Not connected".to_string()))
136        }
137    }
138
139    /// Cancel an order
140    pub async fn cancel_order(&self, order_id: String) -> Result<()> {
141        self.cancel_order_with_symbol(order_id, None).await
142    }
143
144    /// Cancel an order with optional symbol specification
145    ///
146    /// # Arguments
147    /// * `order_id` - The order identifier (OrigClOrdID) to cancel
148    /// * `symbol` - Optional instrument symbol (e.g., "BTC-PERPETUAL").
149    ///   Required when canceling by ClOrdID or DeribitLabel,
150    ///   but not required when using OrigClOrdID (fastest approach)
151    ///
152    pub async fn cancel_order_with_symbol(
153        &self,
154        order_id: String,
155        symbol: Option<String>,
156    ) -> Result<()> {
157        if let Some(session) = &self.session {
158            let mut session_guard = session.lock().await;
159            session_guard
160                .cancel_order_with_symbol(order_id, symbol)
161                .await
162        } else {
163            Err(DeribitFixError::Session("Not connected".to_string()))
164        }
165    }
166
167    /// Subscribe to market data
168    pub async fn subscribe_market_data(&self, symbol: String) -> Result<()> {
169        if let Some(session) = &self.session {
170            let mut session_guard = session.lock().await;
171            session_guard.subscribe_market_data(symbol).await
172        } else {
173            Err(DeribitFixError::Session("Not connected".to_string()))
174        }
175    }
176
177    /// Get account positions
178    pub async fn get_positions(&self) -> Result<Vec<Position>> {
179        if let Some(session) = &self.session {
180            let mut session_guard = session.lock().await;
181            session_guard.request_positions().await
182        } else {
183            Err(DeribitFixError::Session("Not connected".to_string()))
184        }
185    }
186
187    /// Receive and process a message from the server
188    pub async fn receive_message(&self) -> Result<Option<crate::model::message::FixMessage>> {
189        if let Some(session) = &self.session {
190            let mut session_guard = session.lock().await;
191            session_guard.receive_and_process_message().await
192        } else {
193            Err(DeribitFixError::Session("Not connected".to_string()))
194        }
195    }
196}