deribit_fix/connection/
mod.rs1use crate::{
4 config::Config,
5 error::{DeribitFixError, Result},
6 message::FixMessage,
7};
8use std::time::Duration;
9use tokio::{
10 io::{AsyncReadExt, AsyncWriteExt},
11 net::TcpStream,
12 time::timeout,
13};
14use tokio_native_tls::{TlsConnector, TlsStream};
15use tracing::{debug, error, info, warn};
16
17pub enum Stream {
19 Tcp(TcpStream),
20 Tls(TlsStream<TcpStream>),
21}
22
23impl Stream {
24 async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
25 match self {
26 Stream::Tcp(stream) => stream.read(buf).await,
27 Stream::Tls(stream) => stream.read(buf).await,
28 }
29 }
30
31 async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
32 match self {
33 Stream::Tcp(stream) => stream.write_all(buf).await,
34 Stream::Tls(stream) => stream.write_all(buf).await,
35 }
36 }
37
38 async fn flush(&mut self) -> std::io::Result<()> {
39 match self {
40 Stream::Tcp(stream) => stream.flush().await,
41 Stream::Tls(stream) => stream.flush().await,
42 }
43 }
44}
45
46pub struct Connection {
48 stream: Stream,
49 config: Config,
50 buffer: Vec<u8>,
51 connected: bool,
52}
53
54impl Connection {
55 pub async fn new(config: &Config) -> Result<Self> {
57 let stream = if config.use_ssl {
58 Self::connect_tls(config).await?
59 } else {
60 Self::connect_tcp(config).await?
61 };
62
63 Ok(Self {
64 stream,
65 config: config.clone(),
66 buffer: Vec::with_capacity(8192),
67 connected: true,
68 })
69 }
70
71 async fn connect_tcp(config: &Config) -> Result<Stream> {
73 info!("Connecting to {}:{} via TCP", config.host, config.port);
74
75 let addr = format!("{}:{}", config.host, config.port);
76 let stream = timeout(config.connection_timeout, TcpStream::connect(&addr))
77 .await
78 .map_err(|_| DeribitFixError::Timeout(format!("Connection timeout to {}", addr)))?
79 .map_err(|e| DeribitFixError::Connection(format!("Failed to connect to {}: {}", addr, e)))?;
80
81 info!("Successfully connected via TCP");
82 Ok(Stream::Tcp(stream))
83 }
84
85 async fn connect_tls(config: &Config) -> Result<Stream> {
87 info!("Connecting to {}:{} via TLS", config.host, config.port);
88
89 let addr = format!("{}:{}", config.host, config.port);
90 let tcp_stream = timeout(config.connection_timeout, TcpStream::connect(&addr))
91 .await
92 .map_err(|_| DeribitFixError::Timeout(format!("Connection timeout to {}", addr)))?
93 .map_err(|e| DeribitFixError::Connection(format!("Failed to connect to {}: {}", addr, e)))?;
94
95 let connector = TlsConnector::from(
96 native_tls::TlsConnector::builder()
97 .build()
98 .map_err(|e| DeribitFixError::Connection(format!("TLS connector creation failed: {}", e)))?
99 );
100
101 let tls_stream = connector
102 .connect(&config.host, tcp_stream)
103 .await
104 .map_err(|e| DeribitFixError::Connection(format!("TLS handshake failed: {}", e)))?;
105
106 info!("Successfully connected via TLS");
107 Ok(Stream::Tls(tls_stream))
108 }
109
110 pub async fn send_message(&mut self, message: &FixMessage) -> Result<()> {
112 if !self.connected {
113 return Err(DeribitFixError::Connection("Connection is closed".to_string()));
114 }
115
116 let raw_message = message.to_string();
117 debug!("Sending FIX message: {}", raw_message);
118
119 self.stream
120 .write_all(raw_message.as_bytes())
121 .await
122 .map_err(|e| DeribitFixError::Io(e))?;
123
124 self.stream
125 .flush()
126 .await
127 .map_err(|e| DeribitFixError::Io(e))?;
128
129 Ok(())
130 }
131
132 pub async fn receive_message(&mut self) -> Result<Option<FixMessage>> {
134 if !self.connected {
135 return Err(DeribitFixError::Connection("Connection is closed".to_string()));
136 }
137
138 let mut temp_buffer = [0u8; 4096];
140 let bytes_read = self.stream
141 .read(&mut temp_buffer)
142 .await
143 .map_err(|e| DeribitFixError::Io(e))?;
144
145 if bytes_read == 0 {
146 self.connected = false;
148 return Ok(None);
149 }
150
151 self.buffer.extend_from_slice(&temp_buffer[..bytes_read]);
152
153 if let Some(message) = self.try_parse_message()? {
155 debug!("Received FIX message: {}", message);
156 return Ok(Some(message));
157 }
158
159 Ok(None)
160 }
161
162 fn try_parse_message(&mut self) -> Result<Option<FixMessage>> {
164 const SOH: u8 = 0x01;
166
167 let mut msg_end = None;
169
170 for (i, window) in self.buffer.windows(3).enumerate() {
171 if window == b"10=" {
172 if i + 6 < self.buffer.len() && self.buffer[i + 6] == SOH {
174 msg_end = Some(i + 7);
175 break;
176 }
177 }
178 }
179
180 if let Some(end_pos) = msg_end {
181 let message_bytes = self.buffer.drain(..end_pos).collect::<Vec<u8>>();
182 let message_str = String::from_utf8(message_bytes)
183 .map_err(|e| DeribitFixError::MessageParsing(format!("Invalid UTF-8: {}", e)))?;
184
185 let message = FixMessage::parse(&message_str)?;
186 return Ok(Some(message));
187 }
188
189 Ok(None)
190 }
191
192 pub fn is_connected(&self) -> bool {
194 self.connected
195 }
196
197 pub async fn close(&mut self) -> Result<()> {
199 self.connected = false;
200 info!("Connection closed");
201 Ok(())
202 }
203
204 pub async fn reconnect(&mut self) -> Result<()> {
206 info!("Attempting to reconnect...");
207
208 let stream = if self.config.use_ssl {
209 Self::connect_tls(&self.config).await?
210 } else {
211 Self::connect_tcp(&self.config).await?
212 };
213
214 self.stream = stream;
215 self.connected = true;
216 self.buffer.clear();
217
218 info!("Successfully reconnected");
219 Ok(())
220 }
221}