1use super::{IncomingMessages, OutgoingMessages};
8use std::collections::HashMap;
9
/// A single labelled value extracted from a message.
///
/// Derives `PartialEq`/`Eq` so that parser output can be compared directly,
/// for example with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedField {
    /// Label given to the field (for example `"message_type"`).
    pub name: String,
    /// Textual content of the field.
    pub value: String,
}
18
/// Boxed callback that rewrites a raw field value into the text to report.
type FieldTransform = Box<dyn Fn(&str) -> String + Send + Sync>;

/// Describes one field of a message: where it sits, what it is called, and
/// how (if at all) its raw text is rewritten.
pub struct FieldDef {
    /// Position of the field within the message parts.
    index: usize,
    /// Label reported for the field.
    name: &'static str,
    /// Optional rewrite applied to the raw value; `None` keeps the value as-is.
    transform: Option<FieldTransform>,
}

impl FieldDef {
    /// Creates a definition for the field at `index`, labelled `name`, with no
    /// transform attached.
    pub fn new(index: usize, name: &'static str) -> Self {
        let transform = None;
        Self {
            index,
            name,
            transform,
        }
    }

    /// Returns this definition with `f` installed as its transform, replacing
    /// any transform set earlier.
    pub fn with_transform<F>(self, f: F) -> Self
    where
        F: Fn(&str) -> String + Send + Sync + 'static,
    {
        let boxed: FieldTransform = Box::new(f);
        Self {
            transform: Some(boxed),
            ..self
        }
    }
}
48
49pub trait MessageParser: Send + Sync {
51 fn parse(&self, parts: &[&str]) -> Vec<ParsedField>;
53}
54
55pub struct FieldBasedParser {
57 fields: Vec<FieldDef>,
58}
59
60impl FieldBasedParser {
61 pub fn new(fields: Vec<FieldDef>) -> Self {
63 Self { fields }
64 }
65}
66
67impl MessageParser for FieldBasedParser {
68 fn parse(&self, parts: &[&str]) -> Vec<ParsedField> {
69 let mut result = Vec::new();
70
71 for field_def in &self.fields {
72 if let Some(value) = parts.get(field_def.index) {
73 let processed_value = if let Some(transform) = &field_def.transform {
74 transform(value)
75 } else {
76 value.to_string()
77 };
78
79 result.push(ParsedField {
80 name: field_def.name.to_string(),
81 value: processed_value,
82 });
83 }
84 }
85
86 result
87 }
88}
89
90pub struct TimestampParser {
92 base_parser: FieldBasedParser,
93 timestamp_index: usize,
94}
95
96impl TimestampParser {
97 pub fn new(base_parser: FieldBasedParser, timestamp_index: usize) -> Self {
99 Self {
100 base_parser,
101 timestamp_index,
102 }
103 }
104}
105
106impl MessageParser for TimestampParser {
107 fn parse(&self, parts: &[&str]) -> Vec<ParsedField> {
108 let mut fields = self.base_parser.parse(parts);
109
110 if let Some(timestamp) = parts.get(self.timestamp_index) {
112 if let Ok(ts) = timestamp.parse::<i64>() {
113 if let Ok(dt) = time::OffsetDateTime::from_unix_timestamp(ts) {
114 fields.push(ParsedField {
115 name: "timestamp_parsed".to_string(),
116 value: dt.to_string(),
117 });
118 }
119 }
120 }
121
122 fields
123 }
124}
125
126pub struct MessageParserRegistry {
128 request_parsers: HashMap<OutgoingMessages, Box<dyn MessageParser>>,
129 response_parsers: HashMap<IncomingMessages, Box<dyn MessageParser>>,
130}
131
132impl MessageParserRegistry {
133 pub fn new() -> Self {
135 let mut registry = Self {
136 request_parsers: HashMap::new(),
137 response_parsers: HashMap::new(),
138 };
139
140 registry.register_default_parsers();
141 registry
142 }
143
144 fn register_default_parsers(&mut self) {
145 self.request_parsers.insert(
148 OutgoingMessages::RequestCurrentTime,
149 Box::new(FieldBasedParser::new(vec![FieldDef::new(0, "message_type"), FieldDef::new(1, "version")])),
150 );
151
152 self.request_parsers.insert(
154 OutgoingMessages::RequestManagedAccounts,
155 Box::new(FieldBasedParser::new(vec![FieldDef::new(0, "message_type"), FieldDef::new(1, "version")])),
156 );
157
158 self.request_parsers.insert(
160 OutgoingMessages::RequestPositions,
161 Box::new(FieldBasedParser::new(vec![FieldDef::new(0, "message_type"), FieldDef::new(1, "version")])),
162 );
163
164 self.request_parsers.insert(
166 OutgoingMessages::CancelPositions,
167 Box::new(FieldBasedParser::new(vec![FieldDef::new(0, "message_type"), FieldDef::new(1, "version")])),
168 );
169
170 self.request_parsers.insert(
172 OutgoingMessages::RequestAccountSummary,
173 Box::new(FieldBasedParser::new(vec![
174 FieldDef::new(0, "message_type"),
175 FieldDef::new(1, "version"),
176 FieldDef::new(2, "request_id"),
177 FieldDef::new(3, "group"),
178 FieldDef::new(4, "tags"),
179 ])),
180 );
181
182 self.request_parsers.insert(
184 OutgoingMessages::CancelAccountSummary,
185 Box::new(FieldBasedParser::new(vec![
186 FieldDef::new(0, "message_type"),
187 FieldDef::new(1, "version"),
188 FieldDef::new(2, "request_id"),
189 ])),
190 );
191
192 self.request_parsers.insert(
194 OutgoingMessages::RequestPnL,
195 Box::new(FieldBasedParser::new(vec![
196 FieldDef::new(0, "message_type"),
197 FieldDef::new(1, "request_id"),
198 FieldDef::new(2, "account"),
199 FieldDef::new(3, "model_code"),
200 ])),
201 );
202
203 self.request_parsers.insert(
205 OutgoingMessages::CancelPnL,
206 Box::new(FieldBasedParser::new(vec![
207 FieldDef::new(0, "message_type"),
208 FieldDef::new(1, "request_id"),
209 ])),
210 );
211
212 self.request_parsers.insert(
214 OutgoingMessages::RequestPnLSingle,
215 Box::new(FieldBasedParser::new(vec![
216 FieldDef::new(0, "message_type"),
217 FieldDef::new(1, "request_id"),
218 FieldDef::new(2, "account"),
219 FieldDef::new(3, "model_code"),
220 FieldDef::new(4, "contract_id"),
221 ])),
222 );
223
224 self.request_parsers.insert(
226 OutgoingMessages::CancelPnLSingle,
227 Box::new(FieldBasedParser::new(vec![
228 FieldDef::new(0, "message_type"),
229 FieldDef::new(1, "request_id"),
230 ])),
231 );
232
233 self.response_parsers.insert(
236 IncomingMessages::CurrentTime,
237 Box::new(TimestampParser::new(
238 FieldBasedParser::new(vec![
239 FieldDef::new(0, "message_type"),
240 FieldDef::new(1, "version"),
241 FieldDef::new(2, "timestamp"),
242 ]),
243 2, )),
245 );
246
247 self.response_parsers.insert(
249 IncomingMessages::Error,
250 Box::new(FieldBasedParser::new(vec![
251 FieldDef::new(0, "message_type"),
252 FieldDef::new(1, "version"),
253 FieldDef::new(2, "request_id"),
254 FieldDef::new(3, "error_code"),
255 FieldDef::new(4, "error_message"),
256 ])),
257 );
258
259 self.response_parsers.insert(
261 IncomingMessages::ManagedAccounts,
262 Box::new(FieldBasedParser::new(vec![
263 FieldDef::new(0, "message_type"),
264 FieldDef::new(1, "version"),
265 FieldDef::new(2, "accounts"),
266 ])),
267 );
268
269 self.response_parsers.insert(
271 IncomingMessages::Position,
272 Box::new(FieldBasedParser::new(vec![
273 FieldDef::new(0, "message_type"),
274 FieldDef::new(1, "version"),
275 FieldDef::new(2, "account"),
276 FieldDef::new(3, "contract_id"),
277 FieldDef::new(4, "symbol"),
278 FieldDef::new(5, "security_type"),
279 FieldDef::new(6, "last_trade_date_or_contract_month"),
280 FieldDef::new(7, "strike"),
281 FieldDef::new(8, "right"),
282 FieldDef::new(9, "multiplier"),
283 FieldDef::new(10, "exchange"),
284 FieldDef::new(11, "currency"),
285 FieldDef::new(12, "local_symbol"),
286 FieldDef::new(13, "trading_class"),
287 FieldDef::new(14, "position"),
288 FieldDef::new(15, "average_cost"),
289 ])),
290 );
291
292 self.response_parsers.insert(
294 IncomingMessages::PositionEnd,
295 Box::new(FieldBasedParser::new(vec![FieldDef::new(0, "message_type"), FieldDef::new(1, "version")])),
296 );
297
298 self.response_parsers.insert(
300 IncomingMessages::AccountSummary,
301 Box::new(FieldBasedParser::new(vec![
302 FieldDef::new(0, "message_type"),
303 FieldDef::new(1, "version"),
304 FieldDef::new(2, "request_id"),
305 FieldDef::new(3, "account"),
306 FieldDef::new(4, "tag"),
307 FieldDef::new(5, "value"),
308 FieldDef::new(6, "currency"),
309 ])),
310 );
311
312 self.response_parsers.insert(
314 IncomingMessages::AccountSummaryEnd,
315 Box::new(FieldBasedParser::new(vec![
316 FieldDef::new(0, "message_type"),
317 FieldDef::new(1, "version"),
318 FieldDef::new(2, "request_id"),
319 ])),
320 );
321
322 self.response_parsers.insert(
324 IncomingMessages::PnL,
325 Box::new(FieldBasedParser::new(vec![
326 FieldDef::new(0, "message_type"),
327 FieldDef::new(1, "request_id"),
328 FieldDef::new(2, "daily_pnl"),
329 FieldDef::new(3, "unrealized_pnl"),
330 FieldDef::new(4, "realized_pnl"),
331 ])),
332 );
333
334 self.response_parsers.insert(
336 IncomingMessages::PnLSingle,
337 Box::new(FieldBasedParser::new(vec![
338 FieldDef::new(0, "message_type"),
339 FieldDef::new(1, "request_id"),
340 FieldDef::new(2, "position"),
341 FieldDef::new(3, "daily_pnl"),
342 FieldDef::new(4, "unrealized_pnl"),
343 FieldDef::new(5, "realized_pnl"),
344 FieldDef::new(6, "value"),
345 ])),
346 );
347 }
348
349 pub fn parse_request(&self, msg_type: OutgoingMessages, parts: &[&str]) -> Vec<ParsedField> {
351 if let Some(parser) = self.request_parsers.get(&msg_type) {
352 parser.parse(parts)
353 } else {
354 parse_generic_message(parts)
355 }
356 }
357
358 pub fn parse_response(&self, msg_type: IncomingMessages, parts: &[&str]) -> Vec<ParsedField> {
360 if let Some(parser) = self.response_parsers.get(&msg_type) {
361 parser.parse(parts)
362 } else {
363 parse_generic_message(parts)
364 }
365 }
366
367 pub fn register_request_parser(&mut self, msg_type: OutgoingMessages, parser: Box<dyn MessageParser>) {
369 self.request_parsers.insert(msg_type, parser);
370 }
371
372 pub fn register_response_parser(&mut self, msg_type: IncomingMessages, parser: Box<dyn MessageParser>) {
374 self.response_parsers.insert(msg_type, parser);
375 }
376}
377
378impl Default for MessageParserRegistry {
379 fn default() -> Self {
380 Self::new()
381 }
382}
383
384pub fn parse_generic_message(parts: &[&str]) -> Vec<ParsedField> {
386 let mut fields = Vec::new();
387
388 if let Some(msg_type) = parts.first() {
390 fields.push(ParsedField {
391 name: "message_type".to_string(),
392 value: msg_type.to_string(),
393 });
394 }
395
396 for (i, part) in parts.iter().skip(1).enumerate() {
398 if i == parts.len() - 2 && part.is_empty() {
400 continue;
401 }
402 fields.push(ParsedField {
403 name: format!("field_{}", i + 2),
404 value: part.to_string(),
405 });
406 }
407
408 fields
409}