1use std::{
2 fmt,
3 marker::PhantomData,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use easyfix_messages::{
9 deserializer,
10 fields::{
11 parse_reject_reason_to_session_reject_reason, FixString, SeqNum, SessionRejectReason,
12 SessionStatus, TagNum,
13 },
14 messages::FixtMessage,
15};
16use futures::Stream;
17use tokio::sync::{mpsc, oneshot};
18use tokio_stream::wrappers::ReceiverStream;
19use tracing::error;
20
21use crate::{session_id::SessionId, DisconnectReason, Sender};
22
23#[derive(Debug)]
25pub enum DeserializeError {
26 GarbledMessage(String),
28 Logout,
29 Reject {
30 msg_type: Option<FixString>,
31 seq_num: SeqNum,
32 tag: Option<TagNum>,
33 reason: SessionRejectReason,
34 },
35}
36
37impl fmt::Display for DeserializeError {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 DeserializeError::GarbledMessage(reason) => write!(f, "garbled message: {}", reason),
41 DeserializeError::Logout => write!(f, "MsgSeqNum missing"),
42 DeserializeError::Reject {
43 tag: Some(tag),
44 reason,
45 ..
46 } => write!(f, "{reason:?} (tag={tag})"),
47 DeserializeError::Reject {
48 tag: None, reason, ..
49 } => write!(f, "{reason:?}"),
50 }
51 }
52}
53
54impl std::error::Error for DeserializeError {}
55
56impl From<deserializer::DeserializeError> for DeserializeError {
57 fn from(error: deserializer::DeserializeError) -> Self {
58 use deserializer::DeserializeError as DeError;
59 match error {
60 DeError::GarbledMessage(reason) => DeserializeError::GarbledMessage(reason),
61 DeError::Logout => DeserializeError::Logout,
62 DeError::Reject {
63 msg_type,
64 seq_num,
65 tag,
66 reason,
67 } => DeserializeError::Reject {
68 msg_type,
69 seq_num,
70 tag,
71 reason: parse_reject_reason_to_session_reject_reason(reason),
72 },
73 }
74 }
75}
76
/// Plain flag holder with a single `gap_fill` switch.
///
/// Derives the common value-type traits so it can be logged, copied and
/// compared like the other public types in this file (all of which are
/// `Debug`).
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably it describes suppressing an outgoing message - confirm usage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DoNotSend {
    /// Whether a gap fill is requested.
    // NOTE(review): exact semantics are defined by the consumer - confirm.
    pub gap_fill: bool,
}
80
81#[derive(Debug)]
82pub(crate) enum InputResponderMsg {
83 Reject {
87 ref_msg_type: FixString,
88 ref_seq_num: SeqNum,
89 reason: SessionRejectReason,
90 text: FixString,
91 ref_tag_id: Option<i64>,
92 },
93 Logout {
94 session_status: Option<SessionStatus>,
95 text: Option<FixString>,
96 disconnect: bool,
97 },
98 Disconnect {
99 reason: Option<String>,
100 },
101}
102
103#[derive(Debug)]
104pub struct InputResponder<'a> {
105 sender: oneshot::Sender<InputResponderMsg>,
106 phantom_ref: PhantomData<&'a ()>,
107}
108
109impl<'a> InputResponder<'a> {
110 pub(crate) fn new(sender: oneshot::Sender<InputResponderMsg>) -> InputResponder<'a> {
111 InputResponder {
112 sender,
113 phantom_ref: PhantomData,
114 }
115 }
116
117 pub fn reject(
118 self,
119 ref_msg_type: FixString,
120 ref_seq_num: SeqNum,
121 reason: SessionRejectReason,
122 text: FixString,
123 ref_tag_id: Option<i64>,
124 ) {
125 self.sender
126 .send(InputResponderMsg::Reject {
127 ref_msg_type,
128 ref_seq_num,
129 reason,
130 text,
131 ref_tag_id,
132 })
133 .unwrap();
134 }
135
136 pub fn logout(
137 self,
138 session_status: Option<SessionStatus>,
139 text: Option<FixString>,
140 disconnect: bool,
141 ) {
142 self.sender
143 .send(InputResponderMsg::Logout {
144 session_status,
145 text,
146 disconnect,
147 })
148 .unwrap();
149 }
150
151 pub fn disconnect(self) {
152 self.sender
153 .send(InputResponderMsg::Disconnect { reason: None })
154 .unwrap();
155 }
156}
157
158#[derive(Debug)]
159pub struct Responder {
160 sender: Option<oneshot::Sender<Box<FixtMessage>>>,
161 change_to_gap_fill: bool,
162}
163
164impl Responder {
165 pub(crate) fn new(sender: oneshot::Sender<Box<FixtMessage>>) -> Responder {
166 Responder {
167 sender: Some(sender),
168 change_to_gap_fill: false,
169 }
170 }
171
172 pub fn do_not_send(&mut self) {
173 self.sender.take();
175 }
176
177 pub fn change_to_gap_fill(&mut self) {
178 self.change_to_gap_fill = true;
179 }
180}
181
182#[derive(Debug)]
183pub(crate) enum FixEventInternal {
184 Created(SessionId),
185 Logon(SessionId, Option<Sender>),
186 Logout(SessionId, DisconnectReason),
187 AppMsgIn(
188 Option<Box<FixtMessage>>,
189 Option<oneshot::Sender<InputResponderMsg>>,
190 ),
191 AdmMsgIn(
192 Option<Box<FixtMessage>>,
193 Option<oneshot::Sender<InputResponderMsg>>,
194 ),
195 AppMsgOut(Option<Box<FixtMessage>>, Responder),
196 AdmMsgOut(Option<Box<FixtMessage>>, Responder),
197 DeserializeError(SessionId, DeserializeError),
198}
199
200impl Drop for FixEventInternal {
201 fn drop(&mut self) {
202 if let FixEventInternal::AppMsgOut(ref mut msg, ref mut responder)
203 | FixEventInternal::AdmMsgOut(ref mut msg, ref mut responder) = self
204 {
205 if let Some(sender) = responder.sender.take() {
206 if responder.change_to_gap_fill {
207 sender.send(msg.take().unwrap()).unwrap();
209 } else {
210 sender.send(msg.take().unwrap()).unwrap();
211 }
212 }
213 }
214 }
215}
216
217#[derive(Debug)]
219pub enum FixEvent<'a> {
220 Created(&'a SessionId),
222
223 Logon(&'a SessionId, Sender),
227
228 Logout(&'a SessionId, DisconnectReason),
230
231 AppMsgIn(Box<FixtMessage>, InputResponder<'a>),
236
237 AdmMsgIn(Box<FixtMessage>, InputResponder<'a>),
242
243 AppMsgOut(&'a mut FixtMessage, &'a mut Responder), AdmMsgOut(&'a mut FixtMessage),
260
261 DeserializeError(&'a SessionId, &'a DeserializeError),
263}
264
265#[derive(Debug)]
266pub struct EventStream {
267 receiver: ReceiverStream<FixEventInternal>,
268}
269
270#[derive(Debug)]
271pub struct Emitter {
272 inner: mpsc::Sender<FixEventInternal>,
273}
274
275impl Clone for Emitter {
276 fn clone(&self) -> Self {
277 Self {
278 inner: self.inner.clone(),
279 }
280 }
281}
282
283impl Emitter {
284 pub(crate) async fn send(&self, event: FixEventInternal) {
285 if let Err(_e) = self.inner.send(event).await {
286 error!("Failed to send msg")
287 }
288 }
289}
290
291pub(crate) fn events_channel() -> (Emitter, EventStream) {
292 let (sender, receiver) = mpsc::channel(16);
293
294 (
295 Emitter { inner: sender },
296 EventStream {
297 receiver: receiver.into(),
298 },
299 )
300}
301
302mod private {
303 pub trait Sealed {}
304
305 impl Sealed for super::FixEventInternal {}
306}
307
308pub trait AsEvent: private::Sealed {
310 fn as_event(&mut self) -> FixEvent<'_>;
311}
312
313impl AsEvent for FixEventInternal {
314 fn as_event(&mut self) -> FixEvent<'_> {
315 match self {
316 FixEventInternal::Created(id) => FixEvent::Created(id),
317 FixEventInternal::Logon(id, sender) => FixEvent::Logon(id, sender.take().unwrap()),
318 FixEventInternal::Logout(id, reason) => FixEvent::Logout(id, *reason),
319 FixEventInternal::AppMsgIn(msg, sender) => FixEvent::AppMsgIn(
320 msg.take().unwrap(),
321 InputResponder::new(sender.take().unwrap()),
322 ),
323 FixEventInternal::AdmMsgIn(msg, sender) => FixEvent::AdmMsgIn(
324 msg.take().unwrap(),
325 InputResponder::new(sender.take().unwrap()),
326 ),
327 FixEventInternal::AppMsgOut(msg, resp) => {
328 FixEvent::AppMsgOut(msg.as_mut().unwrap(), resp)
329 }
330 FixEventInternal::AdmMsgOut(msg, _) => FixEvent::AdmMsgOut(msg.as_mut().unwrap()),
331 FixEventInternal::DeserializeError(session_id, deserialize_error) => {
332 FixEvent::DeserializeError(session_id, deserialize_error)
333 }
334 }
335 }
336}
337
338impl Stream for EventStream {
339 type Item = impl AsEvent;
340
341 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
342 Pin::new(&mut self.receiver).poll_next(cx)
343 }
344
345 fn size_hint(&self) -> (usize, Option<usize>) {
346 self.receiver.size_hint()
347 }
348}