deribit_fix/client/
fix_client.rs

1//! Deribit FIX client implementation
2
3use crate::{
4    config::DeribitFixConfig,
5    connection::Connection,
6    error::{DeribitFixError, Result},
7    session::Session,
8};
9use deribit_base::prelude::{NewOrderRequest, Position};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12use tracing::info;
13
14/// Main Deribit FIX client
15pub struct DeribitFixClient {
16    /// Client configuration
17    pub config: DeribitFixConfig,
18    connection: Option<Arc<Mutex<Connection>>>,
19    session: Option<Arc<Mutex<Session>>>,
20    heartbeat_task: Option<tokio::task::JoinHandle<()>>,
21}
22
23impl DeribitFixClient {
24    /// Create a new Deribit FIX client
25    pub async fn new(config: DeribitFixConfig) -> Result<Self> {
26        config.validate()?;
27
28        Ok(Self {
29            config,
30            connection: None,
31            session: None,
32            heartbeat_task: None,
33        })
34    }
35
36    /// Connect to the Deribit FIX server
37    pub async fn connect(&mut self) -> Result<()> {
38        info!(
39            "Connecting to Deribit FIX server at {}",
40            self.config.connection_url()
41        );
42
43        // Create connection
44        let connection = Connection::new(&self.config).await?;
45        self.connection = Some(Arc::new(Mutex::new(connection)));
46
47        // Create session
48        let session = Session::new(&self.config, self.connection.as_ref().unwrap().clone())?;
49        self.session = Some(Arc::new(Mutex::new(session)));
50
51        // Perform logon
52        self.logon().await?;
53
54        // Start background heartbeat task to keep the session alive
55        if let Some(session) = &self.session {
56            let session_arc = session.clone();
57            let hb_interval_secs = self.config.heartbeat_interval as u64;
58            self.heartbeat_task = Some(tokio::spawn(async move {
59                use tokio::time::{Duration, sleep};
60                loop {
61                    sleep(Duration::from_secs(hb_interval_secs)).await;
62                    let mut guard = session_arc.lock().await;
63                    // Only send heartbeat when logged on; stop loop when not active
64                    if guard.get_state() == crate::session::SessionState::LoggedOn {
65                        let _ = guard.send_heartbeat(None).await;
66                    } else {
67                        break;
68                    }
69                }
70            }));
71        }
72
73        info!("Successfully connected to Deribit FIX server");
74        Ok(())
75    }
76
77    /// Disconnect from the server
78    pub async fn disconnect(&mut self) -> Result<()> {
79        info!("Disconnecting from Deribit FIX server");
80
81        // Stop heartbeat task if running
82        if let Some(handle) = self.heartbeat_task.take() {
83            handle.abort();
84        }
85
86        if let Some(session) = &self.session {
87            let mut session_guard = session.lock().await;
88            session_guard.logout().await?;
89        }
90
91        if let Some(connection) = &self.connection {
92            let mut connection_guard = connection.lock().await;
93            connection_guard.close().await?;
94        }
95
96        self.connection = None;
97        self.session = None;
98
99        info!("Successfully disconnected from Deribit FIX server");
100        Ok(())
101    }
102
103    /// Check if the client is connected
104    pub fn is_connected(&self) -> bool {
105        self.connection.is_some() && self.session.is_some()
106    }
107
108    /// Get the current session state
109    pub async fn get_session_state(&self) -> Option<crate::session::SessionState> {
110        if let Some(session) = &self.session {
111            // Use async lock to properly wait for session access
112            let session_guard = session.lock().await;
113            Some(session_guard.get_state())
114        } else {
115            None
116        }
117    }
118
119    /// Perform FIX logon
120    async fn logon(&self) -> Result<()> {
121        if let Some(session) = &self.session {
122            let mut session_guard = session.lock().await;
123            session_guard.logon().await?;
124        }
125        Ok(())
126    }
127
128    /// Send a new order
129    pub async fn send_order(&self, order: NewOrderRequest) -> Result<String> {
130        if let Some(session) = &self.session {
131            let mut session_guard = session.lock().await;
132            session_guard.send_new_order(order)
133        } else {
134            Err(DeribitFixError::Session("Not connected".to_string()))
135        }
136    }
137
138    /// Cancel an order
139    pub async fn cancel_order(&self, order_id: String) -> Result<()> {
140        if let Some(session) = &self.session {
141            let mut session_guard = session.lock().await;
142            session_guard.cancel_order(order_id)
143        } else {
144            Err(DeribitFixError::Session("Not connected".to_string()))
145        }
146    }
147
148    /// Subscribe to market data
149    pub async fn subscribe_market_data(&self, symbol: String) -> Result<()> {
150        if let Some(session) = &self.session {
151            let mut session_guard = session.lock().await;
152            session_guard.subscribe_market_data(symbol).await
153        } else {
154            Err(DeribitFixError::Session("Not connected".to_string()))
155        }
156    }
157
158    /// Get account positions
159    pub async fn get_positions(&self) -> Result<Vec<Position>> {
160        if let Some(session) = &self.session {
161            let mut session_guard = session.lock().await;
162            session_guard.request_positions().await
163        } else {
164            Err(DeribitFixError::Session("Not connected".to_string()))
165        }
166    }
167
168    /// Receive and process a message from the server
169    pub async fn receive_message(&self) -> Result<Option<crate::model::message::FixMessage>> {
170        if let Some(session) = &self.session {
171            let mut session_guard = session.lock().await;
172            session_guard.receive_and_process_message().await
173        } else {
174            Err(DeribitFixError::Session("Not connected".to_string()))
175        }
176    }
177}