ironfix_transport/
codec.rs1use bytes::{BufMut, BytesMut};
13use ironfix_tagvalue::checksum::{calculate_checksum, parse_checksum};
14use memchr::memchr;
15use thiserror::Error;
16use tokio_util::codec::{Decoder, Encoder};
17
18#[derive(Debug, Error, Clone, PartialEq, Eq)]
20pub enum CodecError {
21 #[error("incomplete message")]
23 Incomplete,
24
25 #[error("invalid begin string: message must start with 8=")]
27 InvalidBeginString,
28
29 #[error("missing body length field (tag 9)")]
31 MissingBodyLength,
32
33 #[error("invalid body length value")]
35 InvalidBodyLength,
36
37 #[error("checksum mismatch: calculated {calculated}, declared {declared}")]
39 ChecksumMismatch {
40 calculated: u8,
42 declared: u8,
44 },
45
46 #[error("message too large: {size} bytes exceeds maximum {max_size}")]
48 MessageTooLarge {
49 size: usize,
51 max_size: usize,
53 },
54
55 #[error("io error: {0}")]
57 Io(String),
58}
59
60impl From<std::io::Error> for CodecError {
61 fn from(err: std::io::Error) -> Self {
62 Self::Io(err.to_string())
63 }
64}
65
66const SOH: u8 = 0x01;
68
69#[derive(Debug, Clone)]
74pub struct FixCodec {
75 max_message_size: usize,
77 validate_checksum: bool,
79}
80
81impl FixCodec {
82 #[must_use]
84 pub fn new() -> Self {
85 Self {
86 max_message_size: 1024 * 1024, validate_checksum: true,
88 }
89 }
90
91 #[must_use]
93 pub const fn with_max_message_size(mut self, size: usize) -> Self {
94 self.max_message_size = size;
95 self
96 }
97
98 #[must_use]
100 pub const fn with_checksum_validation(mut self, validate: bool) -> Self {
101 self.validate_checksum = validate;
102 self
103 }
104}
105
106impl Default for FixCodec {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl Decoder for FixCodec {
113 type Item = BytesMut;
114 type Error = CodecError;
115
116 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
117 if src.len() < 20 {
119 return Ok(None);
120 }
121
122 if src.len() < 2 || &src[0..2] != b"8=" {
124 return Err(CodecError::InvalidBeginString);
125 }
126
127 let first_soh = match memchr(SOH, src) {
129 Some(pos) => pos,
130 None => return Ok(None),
131 };
132
133 let body_len_start = first_soh + 1;
135 if src.len() < body_len_start + 3 {
136 return Ok(None);
137 }
138
139 if &src[body_len_start..body_len_start + 2] != b"9=" {
140 return Err(CodecError::MissingBodyLength);
141 }
142
143 let body_len_soh = match memchr(SOH, &src[body_len_start..]) {
145 Some(pos) => body_len_start + pos,
146 None => return Ok(None),
147 };
148
149 let body_len_str = std::str::from_utf8(&src[body_len_start + 2..body_len_soh])
151 .map_err(|_| CodecError::InvalidBodyLength)?;
152 let body_length: usize = body_len_str
153 .parse()
154 .map_err(|_| CodecError::InvalidBodyLength)?;
155
156 let total_length = body_len_soh + 1 + body_length + 7; if total_length > self.max_message_size {
163 return Err(CodecError::MessageTooLarge {
164 size: total_length,
165 max_size: self.max_message_size,
166 });
167 }
168
169 if src.len() < total_length {
171 src.reserve(total_length - src.len());
172 return Ok(None);
173 }
174
175 if self.validate_checksum {
177 let checksum_start = total_length - 4;
179 let checksum_bytes = &src[checksum_start..checksum_start + 3];
180
181 let declared = parse_checksum(checksum_bytes).ok_or(CodecError::InvalidBodyLength)?;
182
183 let checksum_field_start = total_length - 7;
185 let calculated = calculate_checksum(&src[..checksum_field_start]);
186
187 if calculated != declared {
188 return Err(CodecError::ChecksumMismatch {
189 calculated,
190 declared,
191 });
192 }
193 }
194
195 let message = src.split_to(total_length);
197 Ok(Some(message))
198 }
199}
200
201impl Encoder<&[u8]> for FixCodec {
202 type Error = CodecError;
203
204 fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> Result<(), Self::Error> {
205 dst.reserve(item.len());
206 dst.put_slice(item);
207 Ok(())
208 }
209}
210
211impl Encoder<BytesMut> for FixCodec {
212 type Error = CodecError;
213
214 fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> {
215 dst.reserve(item.len());
216 dst.put_slice(&item);
217 Ok(())
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224
225 fn make_fix_message(body: &str) -> Vec<u8> {
226 let header = format!("8=FIX.4.4\x019={}\x01", body.len());
227 let without_checksum = format!("{}{}", header, body);
228 let checksum = calculate_checksum(without_checksum.as_bytes());
229 format!("{}10={:03}\x01", without_checksum, checksum).into_bytes()
230 }
231
232 #[test]
233 fn test_codec_decode_complete_message() {
234 let mut codec = FixCodec::new();
235 let msg = make_fix_message("35=0\x01");
236 let mut buf = BytesMut::from(&msg[..]);
237
238 let result = codec.decode(&mut buf).unwrap();
239 assert!(result.is_some());
240 assert!(buf.is_empty());
241 }
242
243 #[test]
244 fn test_codec_decode_incomplete() {
245 let mut codec = FixCodec::new();
246 let msg = make_fix_message("35=0\x01");
247 let mut buf = BytesMut::from(&msg[..msg.len() - 5]);
248
249 let result = codec.decode(&mut buf).unwrap();
250 assert!(result.is_none());
251 }
252
253 #[test]
254 fn test_codec_decode_invalid_begin_string() {
255 let mut codec = FixCodec::new();
256 let mut buf = BytesMut::from(&b"9=FIX.4.4\x019=5\x0135=0\x0110=000\x01"[..]);
258
259 let result = codec.decode(&mut buf);
260 assert!(matches!(result, Err(CodecError::InvalidBeginString)));
261 }
262
263 #[test]
264 fn test_codec_decode_checksum_mismatch() {
265 let mut codec = FixCodec::new();
266 let mut buf = BytesMut::from(&b"8=FIX.4.4\x019=5\x0135=0\x0110=000\x01"[..]);
267
268 let result = codec.decode(&mut buf);
269 assert!(matches!(result, Err(CodecError::ChecksumMismatch { .. })));
270 }
271
272 #[test]
273 fn test_codec_decode_no_checksum_validation() {
274 let mut codec = FixCodec::new().with_checksum_validation(false);
275 let mut buf = BytesMut::from(&b"8=FIX.4.4\x019=5\x0135=0\x0110=000\x01"[..]);
276
277 let result = codec.decode(&mut buf).unwrap();
278 assert!(result.is_some());
279 }
280
281 #[test]
282 fn test_codec_encode() {
283 let mut codec = FixCodec::new();
284 let msg = b"8=FIX.4.4\x019=5\x0135=0\x0110=123\x01";
285 let mut dst = BytesMut::new();
286
287 codec.encode(&msg[..], &mut dst).unwrap();
288 assert_eq!(&dst[..], msg);
289 }
290}