Skip to main content

ironfix_transport/
codec.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 27/1/26
5******************************************************************************/
6
7//! Tokio codec for FIX message framing.
8//!
9//! This module provides a codec that handles FIX message framing over TCP,
10//! including BeginString, BodyLength, and Checksum validation.
11
12use bytes::{BufMut, BytesMut};
13use ironfix_tagvalue::checksum::{calculate_checksum, parse_checksum};
14use memchr::memchr;
15use thiserror::Error;
16use tokio_util::codec::{Decoder, Encoder};
17
18/// Errors that can occur during codec operations.
19#[derive(Debug, Error, Clone, PartialEq, Eq)]
20pub enum CodecError {
21    /// Message is incomplete, need more data.
22    #[error("incomplete message")]
23    Incomplete,
24
25    /// Invalid BeginString field.
26    #[error("invalid begin string: message must start with 8=")]
27    InvalidBeginString,
28
29    /// Missing BodyLength field.
30    #[error("missing body length field (tag 9)")]
31    MissingBodyLength,
32
33    /// Invalid BodyLength value.
34    #[error("invalid body length value")]
35    InvalidBodyLength,
36
37    /// Checksum mismatch.
38    #[error("checksum mismatch: calculated {calculated}, declared {declared}")]
39    ChecksumMismatch {
40        /// Calculated checksum.
41        calculated: u8,
42        /// Declared checksum in message.
43        declared: u8,
44    },
45
46    /// Message exceeds maximum size.
47    #[error("message too large: {size} bytes exceeds maximum {max_size}")]
48    MessageTooLarge {
49        /// Actual message size.
50        size: usize,
51        /// Maximum allowed size.
52        max_size: usize,
53    },
54
55    /// I/O error.
56    #[error("io error: {0}")]
57    Io(String),
58}
59
60impl From<std::io::Error> for CodecError {
61    fn from(err: std::io::Error) -> Self {
62        Self::Io(err.to_string())
63    }
64}
65
66/// SOH delimiter.
67const SOH: u8 = 0x01;
68
69/// Tokio codec for FIX message framing.
70///
71/// Handles parsing of FIX messages from a byte stream, validating
72/// BeginString, BodyLength, and optionally Checksum.
73#[derive(Debug, Clone)]
74pub struct FixCodec {
75    /// Maximum message size in bytes.
76    max_message_size: usize,
77    /// Whether to validate checksums.
78    validate_checksum: bool,
79}
80
81impl FixCodec {
82    /// Creates a new codec with default settings.
83    #[must_use]
84    pub fn new() -> Self {
85        Self {
86            max_message_size: 1024 * 1024, // 1MB
87            validate_checksum: true,
88        }
89    }
90
91    /// Sets the maximum message size.
92    #[must_use]
93    pub const fn with_max_message_size(mut self, size: usize) -> Self {
94        self.max_message_size = size;
95        self
96    }
97
98    /// Sets whether to validate checksums.
99    #[must_use]
100    pub const fn with_checksum_validation(mut self, validate: bool) -> Self {
101        self.validate_checksum = validate;
102        self
103    }
104}
105
106impl Default for FixCodec {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl Decoder for FixCodec {
113    type Item = BytesMut;
114    type Error = CodecError;
115
116    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
117        // Minimum FIX message size: 8=FIX.4.2|9=X|35=0|10=XXX| (minimum ~25 bytes)
118        if src.len() < 20 {
119            return Ok(None);
120        }
121
122        // Validate BeginString starts with "8="
123        if src.len() < 2 || &src[0..2] != b"8=" {
124            return Err(CodecError::InvalidBeginString);
125        }
126
127        // Find first SOH to get BeginString value
128        let first_soh = match memchr(SOH, src) {
129            Some(pos) => pos,
130            None => return Ok(None),
131        };
132
133        // Find BodyLength field (9=XXX|)
134        let body_len_start = first_soh + 1;
135        if src.len() < body_len_start + 3 {
136            return Ok(None);
137        }
138
139        if &src[body_len_start..body_len_start + 2] != b"9=" {
140            return Err(CodecError::MissingBodyLength);
141        }
142
143        // Find SOH after BodyLength
144        let body_len_soh = match memchr(SOH, &src[body_len_start..]) {
145            Some(pos) => body_len_start + pos,
146            None => return Ok(None),
147        };
148
149        // Parse BodyLength value
150        let body_len_str = std::str::from_utf8(&src[body_len_start + 2..body_len_soh])
151            .map_err(|_| CodecError::InvalidBodyLength)?;
152        let body_length: usize = body_len_str
153            .parse()
154            .map_err(|_| CodecError::InvalidBodyLength)?;
155
156        // Calculate total message length
157        // BodyLength counts from after 9=XXX| to before 10=
158        // Total = header + body + trailer (10=XXX|)
159        let total_length = body_len_soh + 1 + body_length + 7; // +7 for |10=XXX|
160
161        // Check maximum size
162        if total_length > self.max_message_size {
163            return Err(CodecError::MessageTooLarge {
164                size: total_length,
165                max_size: self.max_message_size,
166            });
167        }
168
169        // Check if we have the complete message
170        if src.len() < total_length {
171            src.reserve(total_length - src.len());
172            return Ok(None);
173        }
174
175        // Validate checksum if enabled
176        if self.validate_checksum {
177            // Checksum is at total_length - 4 to total_length - 1 (3 digits)
178            let checksum_start = total_length - 4;
179            let checksum_bytes = &src[checksum_start..checksum_start + 3];
180
181            let declared = parse_checksum(checksum_bytes).ok_or(CodecError::InvalidBodyLength)?;
182
183            // Calculate checksum of everything before 10=
184            let checksum_field_start = total_length - 7;
185            let calculated = calculate_checksum(&src[..checksum_field_start]);
186
187            if calculated != declared {
188                return Err(CodecError::ChecksumMismatch {
189                    calculated,
190                    declared,
191                });
192            }
193        }
194
195        // Extract the complete message
196        let message = src.split_to(total_length);
197        Ok(Some(message))
198    }
199}
200
201impl Encoder<&[u8]> for FixCodec {
202    type Error = CodecError;
203
204    fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> Result<(), Self::Error> {
205        dst.reserve(item.len());
206        dst.put_slice(item);
207        Ok(())
208    }
209}
210
211impl Encoder<BytesMut> for FixCodec {
212    type Error = CodecError;
213
214    fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> {
215        dst.reserve(item.len());
216        dst.put_slice(&item);
217        Ok(())
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224
225    fn make_fix_message(body: &str) -> Vec<u8> {
226        let header = format!("8=FIX.4.4\x019={}\x01", body.len());
227        let without_checksum = format!("{}{}", header, body);
228        let checksum = calculate_checksum(without_checksum.as_bytes());
229        format!("{}10={:03}\x01", without_checksum, checksum).into_bytes()
230    }
231
232    #[test]
233    fn test_codec_decode_complete_message() {
234        let mut codec = FixCodec::new();
235        let msg = make_fix_message("35=0\x01");
236        let mut buf = BytesMut::from(&msg[..]);
237
238        let result = codec.decode(&mut buf).unwrap();
239        assert!(result.is_some());
240        assert!(buf.is_empty());
241    }
242
243    #[test]
244    fn test_codec_decode_incomplete() {
245        let mut codec = FixCodec::new();
246        let msg = make_fix_message("35=0\x01");
247        let mut buf = BytesMut::from(&msg[..msg.len() - 5]);
248
249        let result = codec.decode(&mut buf).unwrap();
250        assert!(result.is_none());
251    }
252
253    #[test]
254    fn test_codec_decode_invalid_begin_string() {
255        let mut codec = FixCodec::new();
256        // Message without proper 8= prefix (needs at least 20 bytes)
257        let mut buf = BytesMut::from(&b"9=FIX.4.4\x019=5\x0135=0\x0110=000\x01"[..]);
258
259        let result = codec.decode(&mut buf);
260        assert!(matches!(result, Err(CodecError::InvalidBeginString)));
261    }
262
263    #[test]
264    fn test_codec_decode_checksum_mismatch() {
265        let mut codec = FixCodec::new();
266        let mut buf = BytesMut::from(&b"8=FIX.4.4\x019=5\x0135=0\x0110=000\x01"[..]);
267
268        let result = codec.decode(&mut buf);
269        assert!(matches!(result, Err(CodecError::ChecksumMismatch { .. })));
270    }
271
272    #[test]
273    fn test_codec_decode_no_checksum_validation() {
274        let mut codec = FixCodec::new().with_checksum_validation(false);
275        let mut buf = BytesMut::from(&b"8=FIX.4.4\x019=5\x0135=0\x0110=000\x01"[..]);
276
277        let result = codec.decode(&mut buf).unwrap();
278        assert!(result.is_some());
279    }
280
281    #[test]
282    fn test_codec_encode() {
283        let mut codec = FixCodec::new();
284        let msg = b"8=FIX.4.4\x019=5\x0135=0\x0110=123\x01";
285        let mut dst = BytesMut::new();
286
287        codec.encode(&msg[..], &mut dst).unwrap();
288        assert_eq!(&dst[..], msg);
289    }
290}