deribit_fix/connection/
mod.rs

1//! Connection management for Deribit FIX client
2
3use crate::{
4    config::Config,
5    error::{DeribitFixError, Result},
6    message::FixMessage,
7};
8use std::time::Duration;
9use tokio::{
10    io::{AsyncReadExt, AsyncWriteExt},
11    net::TcpStream,
12    time::timeout,
13};
14use tokio_native_tls::{TlsConnector, TlsStream};
15use tracing::{debug, error, info, warn};
16
17/// Connection wrapper for both TCP and TLS streams
18pub enum Stream {
19    Tcp(TcpStream),
20    Tls(TlsStream<TcpStream>),
21}
22
23impl Stream {
24    async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
25        match self {
26            Stream::Tcp(stream) => stream.read(buf).await,
27            Stream::Tls(stream) => stream.read(buf).await,
28        }
29    }
30
31    async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
32        match self {
33            Stream::Tcp(stream) => stream.write_all(buf).await,
34            Stream::Tls(stream) => stream.write_all(buf).await,
35        }
36    }
37
38    async fn flush(&mut self) -> std::io::Result<()> {
39        match self {
40            Stream::Tcp(stream) => stream.flush().await,
41            Stream::Tls(stream) => stream.flush().await,
42        }
43    }
44}
45
46/// TCP/TLS connection to Deribit FIX server
47pub struct Connection {
48    stream: Stream,
49    config: Config,
50    buffer: Vec<u8>,
51    connected: bool,
52}
53
54impl Connection {
55    /// Create a new connection to the Deribit FIX server
56    pub async fn new(config: &Config) -> Result<Self> {
57        let stream = if config.use_ssl {
58            Self::connect_tls(config).await?
59        } else {
60            Self::connect_tcp(config).await?
61        };
62
63        Ok(Self {
64            stream,
65            config: config.clone(),
66            buffer: Vec::with_capacity(8192),
67            connected: true,
68        })
69    }
70
71    /// Connect using raw TCP
72    async fn connect_tcp(config: &Config) -> Result<Stream> {
73        info!("Connecting to {}:{} via TCP", config.host, config.port);
74        
75        let addr = format!("{}:{}", config.host, config.port);
76        let stream = timeout(config.connection_timeout, TcpStream::connect(&addr))
77            .await
78            .map_err(|_| DeribitFixError::Timeout(format!("Connection timeout to {}", addr)))?
79            .map_err(|e| DeribitFixError::Connection(format!("Failed to connect to {}: {}", addr, e)))?;
80
81        info!("Successfully connected via TCP");
82        Ok(Stream::Tcp(stream))
83    }
84
85    /// Connect using TLS
86    async fn connect_tls(config: &Config) -> Result<Stream> {
87        info!("Connecting to {}:{} via TLS", config.host, config.port);
88        
89        let addr = format!("{}:{}", config.host, config.port);
90        let tcp_stream = timeout(config.connection_timeout, TcpStream::connect(&addr))
91            .await
92            .map_err(|_| DeribitFixError::Timeout(format!("Connection timeout to {}", addr)))?
93            .map_err(|e| DeribitFixError::Connection(format!("Failed to connect to {}: {}", addr, e)))?;
94
95        let connector = TlsConnector::from(
96            native_tls::TlsConnector::builder()
97                .build()
98                .map_err(|e| DeribitFixError::Connection(format!("TLS connector creation failed: {}", e)))?
99        );
100
101        let tls_stream = connector
102            .connect(&config.host, tcp_stream)
103            .await
104            .map_err(|e| DeribitFixError::Connection(format!("TLS handshake failed: {}", e)))?;
105
106        info!("Successfully connected via TLS");
107        Ok(Stream::Tls(tls_stream))
108    }
109
110    /// Send a FIX message
111    pub async fn send_message(&mut self, message: &FixMessage) -> Result<()> {
112        if !self.connected {
113            return Err(DeribitFixError::Connection("Connection is closed".to_string()));
114        }
115
116        let raw_message = message.to_string();
117        debug!("Sending FIX message: {}", raw_message);
118
119        self.stream
120            .write_all(raw_message.as_bytes())
121            .await
122            .map_err(|e| DeribitFixError::Io(e))?;
123
124        self.stream
125            .flush()
126            .await
127            .map_err(|e| DeribitFixError::Io(e))?;
128
129        Ok(())
130    }
131
132    /// Receive a FIX message
133    pub async fn receive_message(&mut self) -> Result<Option<FixMessage>> {
134        if !self.connected {
135            return Err(DeribitFixError::Connection("Connection is closed".to_string()));
136        }
137
138        // Read data into buffer
139        let mut temp_buffer = [0u8; 4096];
140        let bytes_read = self.stream
141            .read(&mut temp_buffer)
142            .await
143            .map_err(|e| DeribitFixError::Io(e))?;
144
145        if bytes_read == 0 {
146            // Connection closed by peer
147            self.connected = false;
148            return Ok(None);
149        }
150
151        self.buffer.extend_from_slice(&temp_buffer[..bytes_read]);
152
153        // Try to parse a complete FIX message
154        if let Some(message) = self.try_parse_message()? {
155            debug!("Received FIX message: {}", message);
156            return Ok(Some(message));
157        }
158
159        Ok(None)
160    }
161
162    /// Try to parse a complete FIX message from the buffer
163    fn try_parse_message(&mut self) -> Result<Option<FixMessage>> {
164        // Look for SOH (Start of Header) character which separates FIX fields
165        const SOH: u8 = 0x01;
166        
167        // Find the end of a complete message
168        let mut msg_end = None;
169        
170        for (i, window) in self.buffer.windows(3).enumerate() {
171            if window == b"10=" {
172                // Look for SOH after checksum (3 digits)
173                if i + 6 < self.buffer.len() && self.buffer[i + 6] == SOH {
174                    msg_end = Some(i + 7);
175                    break;
176                }
177            }
178        }
179
180        if let Some(end_pos) = msg_end {
181            let message_bytes = self.buffer.drain(..end_pos).collect::<Vec<u8>>();
182            let message_str = String::from_utf8(message_bytes)
183                .map_err(|e| DeribitFixError::MessageParsing(format!("Invalid UTF-8: {}", e)))?;
184            
185            let message = FixMessage::parse(&message_str)?;
186            return Ok(Some(message));
187        }
188
189        Ok(None)
190    }
191
192    /// Check if the connection is active
193    pub fn is_connected(&self) -> bool {
194        self.connected
195    }
196
197    /// Close the connection
198    pub async fn close(&mut self) -> Result<()> {
199        self.connected = false;
200        info!("Connection closed");
201        Ok(())
202    }
203
204    /// Reconnect to the server
205    pub async fn reconnect(&mut self) -> Result<()> {
206        info!("Attempting to reconnect...");
207        
208        let stream = if self.config.use_ssl {
209            Self::connect_tls(&self.config).await?
210        } else {
211            Self::connect_tcp(&self.config).await?
212        };
213
214        self.stream = stream;
215        self.connected = true;
216        self.buffer.clear();
217
218        info!("Successfully reconnected");
219        Ok(())
220    }
221}