1use std::{
2 fmt,
3 marker::PhantomData,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use easyfix_messages::{
9 deserializer,
10 fields::{
11 FixString, SeqNum, SessionRejectReason, SessionStatus, TagNum,
12 parse_reject_reason_to_session_reject_reason,
13 },
14 messages::FixtMessage,
15};
16use futures::Stream;
17use tokio::sync::{mpsc, oneshot};
18use tokio_stream::wrappers::ReceiverStream;
19use tracing::error;
20
21use crate::{DisconnectReason, Sender, session_id::SessionId};
22
/// Error describing why a message could not be deserialized.
///
/// Values are produced from `deserializer::DeserializeError` by the `From`
/// impl in this file and are handed out by reference through
/// `FixEvent::DeserializeError`.
#[derive(Debug)]
pub enum DeserializeError {
    /// The message was garbled; the payload is the textual reason that
    /// `Display` prints after `garbled message: `.
    GarbledMessage(String),
    /// Rendered by `Display` as `MsgSeqNum missing`.
    // NOTE(review): the variant name suggests the session answers this case
    // with a Logout - confirm against the session code.
    Logout,
    /// The message was rejected.
    Reject {
        /// Message type associated with the rejection, when one is present.
        // NOTE(review): presumably the type of the offending message - confirm.
        msg_type: Option<FixString>,
        /// Sequence number associated with the rejection.
        // NOTE(review): presumably the MsgSeqNum of the offending message - confirm.
        seq_num: SeqNum,
        /// Tag the rejection refers to, if any; `Display` appends it as
        /// `(tag=...)`.
        tag: Option<TagNum>,
        /// Session-level reject reason; `Display` prints it with `{:?}`.
        reason: SessionRejectReason,
    },
}
36
37impl fmt::Display for DeserializeError {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 DeserializeError::GarbledMessage(reason) => write!(f, "garbled message: {}", reason),
41 DeserializeError::Logout => write!(f, "MsgSeqNum missing"),
42 DeserializeError::Reject {
43 tag: Some(tag),
44 reason,
45 ..
46 } => write!(f, "{reason:?} (tag={tag})"),
47 DeserializeError::Reject {
48 tag: None, reason, ..
49 } => write!(f, "{reason:?}"),
50 }
51 }
52}
53
// Marker impl: `DeserializeError` keeps the default `Error` methods. The
// `Debug` (derived) and `Display` impls required by the trait are both in
// this file.
impl std::error::Error for DeserializeError {}
55
56impl From<deserializer::DeserializeError> for DeserializeError {
57 fn from(error: deserializer::DeserializeError) -> Self {
58 use deserializer::DeserializeError as DeError;
59 match error {
60 DeError::GarbledMessage(reason) => DeserializeError::GarbledMessage(reason),
61 DeError::Logout => DeserializeError::Logout,
62 DeError::Reject {
63 msg_type,
64 seq_num,
65 tag,
66 reason,
67 } => DeserializeError::Reject {
68 msg_type,
69 seq_num,
70 tag,
71 reason: parse_reject_reason_to_session_reject_reason(reason),
72 },
73 }
74 }
75}
76
/// Marker value carrying a single `gap_fill` flag.
///
/// `Debug` is derived so that this public type can be logged and embedded in
/// other `Debug` types, like every other public type in this file.
// NOTE(review): the type is not referenced anywhere else in this file; the
// name suggests it signals that an outgoing message must not be sent -
// confirm against its users.
#[derive(Debug)]
pub struct DoNotSend {
    /// Gap-fill flag.
    // NOTE(review): presumably requests that the suppressed message be
    // replaced by a gap fill - confirm against the users of this type.
    pub gap_fill: bool,
}
80
/// Decision sent through an `InputResponder`'s one-shot channel.
///
/// Each variant is produced by the `InputResponder` method of the same
/// (lower-case) name.
#[derive(Debug)]
pub(crate) enum InputResponderMsg {
    /// Sent by `InputResponder::ignore`.
    Ignore,
    /// Sent by `InputResponder::reject`; the fields are the arguments of that
    /// call, passed through unchanged.
    Reject {
        ref_msg_type: FixString,
        ref_seq_num: SeqNum,
        reason: SessionRejectReason,
        text: FixString,
        ref_tag_id: Option<i64>,
    },
    /// Sent by `InputResponder::logout`; the fields are the arguments of that
    /// call, passed through unchanged.
    Logout {
        session_status: Option<SessionStatus>,
        text: Option<FixString>,
        disconnect: bool,
    },
    /// Sent by `InputResponder::disconnect`.
    Disconnect {
        /// Optional textual reason; `InputResponder::disconnect` always sends
        /// `None`.
        reason: Option<String>,
    },
}
100
/// One-shot handle delivered with `FixEvent::AppMsgIn` / `FixEvent::AdmMsgIn`.
///
/// Every responding method takes `self` by value, so at most one
/// `InputResponderMsg` can be sent per responder.
#[derive(Debug)]
pub struct InputResponder<'a> {
    /// Channel on which the chosen `InputResponderMsg` is sent.
    sender: oneshot::Sender<InputResponderMsg>,
    /// Gives the type its `'a` lifetime parameter without storing a reference.
    phantom_ref: PhantomData<&'a ()>,
}
106
107impl<'a> InputResponder<'a> {
108 pub(crate) fn new(sender: oneshot::Sender<InputResponderMsg>) -> InputResponder<'a> {
109 InputResponder {
110 sender,
111 phantom_ref: PhantomData,
112 }
113 }
114
115 pub fn ignore(self) {
116 self.sender.send(InputResponderMsg::Ignore).unwrap();
117 }
118
119 pub fn reject(
120 self,
121 ref_msg_type: FixString,
122 ref_seq_num: SeqNum,
123 reason: SessionRejectReason,
124 text: FixString,
125 ref_tag_id: Option<i64>,
126 ) {
127 self.sender
128 .send(InputResponderMsg::Reject {
129 ref_msg_type,
130 ref_seq_num,
131 reason,
132 text,
133 ref_tag_id,
134 })
135 .unwrap();
136 }
137
138 pub fn logout(
139 self,
140 session_status: Option<SessionStatus>,
141 text: Option<FixString>,
142 disconnect: bool,
143 ) {
144 self.sender
145 .send(InputResponderMsg::Logout {
146 session_status,
147 text,
148 disconnect,
149 })
150 .unwrap();
151 }
152
153 pub fn disconnect(self) {
154 self.sender
155 .send(InputResponderMsg::Disconnect { reason: None })
156 .unwrap();
157 }
158}
159
/// Handle delivered with `FixEvent::AppMsgOut` that controls what happens to
/// the outgoing message once its event is dropped.
#[derive(Debug)]
pub struct Responder {
    /// Channel used by `Drop for FixEventInternal` to hand the message back;
    /// `None` after `do_not_send` was called or after the message was sent.
    sender: Option<oneshot::Sender<Box<FixtMessage>>>,
    /// Set by `change_to_gap_fill`; starts as `false`.
    change_to_gap_fill: bool,
}
165
166impl Responder {
167 pub(crate) fn new(sender: oneshot::Sender<Box<FixtMessage>>) -> Responder {
168 Responder {
169 sender: Some(sender),
170 change_to_gap_fill: false,
171 }
172 }
173
174 pub fn do_not_send(&mut self) {
175 self.sender.take();
177 }
178
179 pub fn change_to_gap_fill(&mut self) {
180 self.change_to_gap_fill = true;
181 }
182}
183
/// Owned event as it travels through the channel created by `events_channel`.
///
/// `AsEvent::as_event` turns it into the borrowed, public `FixEvent`. Several
/// payloads are wrapped in `Option` so that `as_event`, which only has
/// `&mut self`, can move them out with `Option::take`.
#[derive(Debug)]
pub(crate) enum FixEventInternal {
    /// Exposed as `FixEvent::Created`.
    Created(SessionId),
    /// Exposed as `FixEvent::Logon`; the `Sender` is moved out by `as_event`.
    Logon(SessionId, Option<Sender>),
    /// Exposed as `FixEvent::Logout`.
    Logout(SessionId, DisconnectReason),
    /// Exposed as `FixEvent::AppMsgIn`; both the message and the response
    /// channel are moved out by `as_event`.
    AppMsgIn(
        Option<Box<FixtMessage>>,
        Option<oneshot::Sender<InputResponderMsg>>,
    ),
    /// Exposed as `FixEvent::AdmMsgIn`; both the message and the response
    /// channel are moved out by `as_event`.
    AdmMsgIn(
        Option<Box<FixtMessage>>,
        Option<oneshot::Sender<InputResponderMsg>>,
    ),
    /// Exposed as `FixEvent::AppMsgOut`; the message stays here and is handed
    /// back through the `Responder` when the event is dropped.
    AppMsgOut(Option<Box<FixtMessage>>, Responder),
    /// Exposed as `FixEvent::AdmMsgOut` (message only, the `Responder` is not
    /// exposed); the message is handed back when the event is dropped.
    AdmMsgOut(Option<Box<FixtMessage>>, Responder),
    /// Exposed as `FixEvent::DeserializeError`.
    DeserializeError(SessionId, DeserializeError),
}
201
202impl Drop for FixEventInternal {
203 fn drop(&mut self) {
204 if let &mut FixEventInternal::AppMsgOut(ref mut msg, ref mut responder)
205 | &mut FixEventInternal::AdmMsgOut(ref mut msg, ref mut responder) = self
206 && let Some(sender) = responder.sender.take()
207 {
208 if responder.change_to_gap_fill {
209 sender.send(msg.take().unwrap()).unwrap();
211 } else {
212 sender.send(msg.take().unwrap()).unwrap();
213 }
214 }
215 }
216}
217
/// Public, borrowed view of an event, obtained through `AsEvent::as_event`.
///
/// References borrow from the underlying event value for `'a`; owned payloads
/// have been moved out of it.
#[derive(Debug)]
pub enum FixEvent<'a> {
    /// Carries the id of the session the event refers to.
    // NOTE(review): presumably emitted when the session is created - confirm.
    Created(&'a SessionId),

    /// Carries the session id and an owned `Sender`.
    // NOTE(review): presumably emitted on successful logon, with `Sender`
    // being the handle for sending on that session - confirm.
    Logon(&'a SessionId, Sender),

    /// Carries the session id and the `DisconnectReason`.
    Logout(&'a SessionId, DisconnectReason),

    /// Incoming message (application-level, judging by the name) together
    /// with the `InputResponder` used to answer it.
    AppMsgIn(Box<FixtMessage>, InputResponder<'a>),

    /// Incoming message (administrative, judging by the name) together with
    /// the `InputResponder` used to answer it.
    AdmMsgIn(Box<FixtMessage>, InputResponder<'a>),

    /// Outgoing message, mutable in place, plus the `Responder` that can
    /// suppress it (`do_not_send`) or flag it (`change_to_gap_fill`).
    AppMsgOut(&'a mut FixtMessage, &'a mut Responder),

    /// Outgoing message, mutable in place; no `Responder` is exposed for this
    /// variant.
    AdmMsgOut(&'a mut FixtMessage),

    /// Deserialization failure for the given session.
    DeserializeError(&'a SessionId, &'a DeserializeError),
}
265
/// Receiving side of the channel created by `events_channel`.
///
/// Implements `Stream`; each item implements `AsEvent`.
#[derive(Debug)]
pub struct EventStream {
    /// Receiver of the bounded event channel, wrapped as a stream.
    receiver: ReceiverStream<FixEventInternal>,
}
270
271#[derive(Debug)]
272pub struct Emitter {
273 inner: mpsc::Sender<FixEventInternal>,
274}
275
276impl Clone for Emitter {
277 fn clone(&self) -> Self {
278 Self {
279 inner: self.inner.clone(),
280 }
281 }
282}
283
284impl Emitter {
285 pub(crate) async fn send(&self, event: FixEventInternal) {
286 if let Err(_e) = self.inner.send(event).await {
287 error!("Failed to send msg")
288 }
289 }
290}
291
292pub(crate) fn events_channel() -> (Emitter, EventStream) {
293 let (sender, receiver) = mpsc::channel(16);
294
295 (
296 Emitter { inner: sender },
297 EventStream {
298 receiver: receiver.into(),
299 },
300 )
301}
302
/// Private home of the `Sealed` supertrait of `AsEvent`.
///
/// The module is not `pub`, so `Sealed` cannot be named - and therefore
/// `AsEvent` cannot be implemented - outside the scope in which this module
/// is visible.
mod private {
    /// Supertrait required by `AsEvent`.
    pub trait Sealed {}

    // The only `Sealed` implementor in this file.
    impl Sealed for super::FixEventInternal {}
}
308
/// Conversion from a stream item into the public `FixEvent` view.
///
/// The trait is sealed through `private::Sealed`.
pub trait AsEvent: private::Sealed {
    /// Returns the `FixEvent` view of `self`; the result borrows from `self`.
    // NOTE(review): the implementation for `FixEventInternal` in this file
    // moves owned payloads out of `self` and panics if called a second time
    // on the variants that do so - treat this as a call-once method.
    fn as_event(&mut self) -> FixEvent<'_>;
}
313
314impl AsEvent for FixEventInternal {
315 fn as_event(&mut self) -> FixEvent<'_> {
316 match self {
317 FixEventInternal::Created(id) => FixEvent::Created(id),
318 FixEventInternal::Logon(id, sender) => FixEvent::Logon(id, sender.take().unwrap()),
319 FixEventInternal::Logout(id, reason) => FixEvent::Logout(id, *reason),
320 FixEventInternal::AppMsgIn(msg, sender) => FixEvent::AppMsgIn(
321 msg.take().unwrap(),
322 InputResponder::new(sender.take().unwrap()),
323 ),
324 FixEventInternal::AdmMsgIn(msg, sender) => FixEvent::AdmMsgIn(
325 msg.take().unwrap(),
326 InputResponder::new(sender.take().unwrap()),
327 ),
328 FixEventInternal::AppMsgOut(msg, resp) => {
329 FixEvent::AppMsgOut(msg.as_mut().unwrap(), resp)
330 }
331 FixEventInternal::AdmMsgOut(msg, _) => FixEvent::AdmMsgOut(msg.as_mut().unwrap()),
332 FixEventInternal::DeserializeError(session_id, deserialize_error) => {
333 FixEvent::DeserializeError(session_id, deserialize_error)
334 }
335 }
336 }
337}
338
339impl Stream for EventStream {
340 type Item = impl AsEvent;
341
342 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
343 Pin::new(&mut self.receiver).poll_next(cx)
344 }
345
346 fn size_hint(&self) -> (usize, Option<usize>) {
347 self.receiver.size_hint()
348 }
349}