easyfix_session/
application.rs

1use std::{
2    fmt,
3    marker::PhantomData,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use easyfix_messages::{
9    deserializer,
10    fields::{
11        FixString, SeqNum, SessionRejectReason, SessionStatus, TagNum,
12        parse_reject_reason_to_session_reject_reason,
13    },
14    messages::FixtMessage,
15};
16use futures::Stream;
17use tokio::sync::{mpsc, oneshot};
18use tokio_stream::wrappers::ReceiverStream;
19use tracing::error;
20
21use crate::{DisconnectReason, Sender, session_id::SessionId};
22
23//
24#[derive(Debug)]
25pub enum DeserializeError {
26    // TODO: enum maybe?
27    GarbledMessage(String),
28    Logout,
29    Reject {
30        msg_type: Option<FixString>,
31        seq_num: SeqNum,
32        tag: Option<TagNum>,
33        reason: SessionRejectReason,
34    },
35}
36
37impl fmt::Display for DeserializeError {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            DeserializeError::GarbledMessage(reason) => write!(f, "garbled message: {}", reason),
41            DeserializeError::Logout => write!(f, "MsgSeqNum missing"),
42            DeserializeError::Reject {
43                tag: Some(tag),
44                reason,
45                ..
46            } => write!(f, "{reason:?} (tag={tag})"),
47            DeserializeError::Reject {
48                tag: None, reason, ..
49            } => write!(f, "{reason:?}"),
50        }
51    }
52}
53
54impl std::error::Error for DeserializeError {}
55
56impl From<deserializer::DeserializeError> for DeserializeError {
57    fn from(error: deserializer::DeserializeError) -> Self {
58        use deserializer::DeserializeError as DeError;
59        match error {
60            DeError::GarbledMessage(reason) => DeserializeError::GarbledMessage(reason),
61            DeError::Logout => DeserializeError::Logout,
62            DeError::Reject {
63                msg_type,
64                seq_num,
65                tag,
66                reason,
67            } => DeserializeError::Reject {
68                msg_type,
69                seq_num,
70                tag,
71                reason: parse_reject_reason_to_session_reject_reason(reason),
72            },
73        }
74    }
75}
76
77pub struct DoNotSend {
78    pub gap_fill: bool,
79}
80
81#[derive(Debug)]
82pub(crate) enum InputResponderMsg {
83    Ignore,
84    Reject {
85        ref_msg_type: FixString,
86        ref_seq_num: SeqNum,
87        reason: SessionRejectReason,
88        text: FixString,
89        ref_tag_id: Option<i64>,
90    },
91    Logout {
92        session_status: Option<SessionStatus>,
93        text: Option<FixString>,
94        disconnect: bool,
95    },
96    Disconnect {
97        reason: Option<String>,
98    },
99}
100
101#[derive(Debug)]
102pub struct InputResponder<'a> {
103    sender: oneshot::Sender<InputResponderMsg>,
104    phantom_ref: PhantomData<&'a ()>,
105}
106
107impl<'a> InputResponder<'a> {
108    pub(crate) fn new(sender: oneshot::Sender<InputResponderMsg>) -> InputResponder<'a> {
109        InputResponder {
110            sender,
111            phantom_ref: PhantomData,
112        }
113    }
114
115    pub fn ignore(self) {
116        self.sender.send(InputResponderMsg::Ignore).unwrap();
117    }
118
119    pub fn reject(
120        self,
121        ref_msg_type: FixString,
122        ref_seq_num: SeqNum,
123        reason: SessionRejectReason,
124        text: FixString,
125        ref_tag_id: Option<i64>,
126    ) {
127        self.sender
128            .send(InputResponderMsg::Reject {
129                ref_msg_type,
130                ref_seq_num,
131                reason,
132                text,
133                ref_tag_id,
134            })
135            .unwrap();
136    }
137
138    pub fn logout(
139        self,
140        session_status: Option<SessionStatus>,
141        text: Option<FixString>,
142        disconnect: bool,
143    ) {
144        self.sender
145            .send(InputResponderMsg::Logout {
146                session_status,
147                text,
148                disconnect,
149            })
150            .unwrap();
151    }
152
153    pub fn disconnect(self) {
154        self.sender
155            .send(InputResponderMsg::Disconnect { reason: None })
156            .unwrap();
157    }
158}
159
160#[derive(Debug)]
161pub struct Responder {
162    sender: Option<oneshot::Sender<Box<FixtMessage>>>,
163    change_to_gap_fill: bool,
164}
165
166impl Responder {
167    pub(crate) fn new(sender: oneshot::Sender<Box<FixtMessage>>) -> Responder {
168        Responder {
169            sender: Some(sender),
170            change_to_gap_fill: false,
171        }
172    }
173
174    pub fn do_not_send(&mut self) {
175        // Sender is `Option::None` now so message can't be send back
176        self.sender.take();
177    }
178
179    pub fn change_to_gap_fill(&mut self) {
180        self.change_to_gap_fill = true;
181    }
182}
183
184#[derive(Debug)]
185pub(crate) enum FixEventInternal {
186    Created(SessionId),
187    Logon(SessionId, Option<Sender>),
188    Logout(SessionId, DisconnectReason),
189    AppMsgIn(
190        Option<Box<FixtMessage>>,
191        Option<oneshot::Sender<InputResponderMsg>>,
192    ),
193    AdmMsgIn(
194        Option<Box<FixtMessage>>,
195        Option<oneshot::Sender<InputResponderMsg>>,
196    ),
197    AppMsgOut(Option<Box<FixtMessage>>, Responder),
198    AdmMsgOut(Option<Box<FixtMessage>>, Responder),
199    DeserializeError(SessionId, DeserializeError),
200}
201
202impl Drop for FixEventInternal {
203    fn drop(&mut self) {
204        if let &mut FixEventInternal::AppMsgOut(ref mut msg, ref mut responder)
205        | &mut FixEventInternal::AdmMsgOut(ref mut msg, ref mut responder) = self
206            && let Some(sender) = responder.sender.take()
207        {
208            if responder.change_to_gap_fill {
209                // TODO: GapFill HERE!
210                sender.send(msg.take().unwrap()).unwrap();
211            } else {
212                sender.send(msg.take().unwrap()).unwrap();
213            }
214        }
215    }
216}
217
218/// FIX protolol events.
219#[derive(Debug)]
220pub enum FixEvent<'a> {
221    /// Session created.
222    Created(&'a SessionId),
223
224    /// Successfull Logon<A> messages exchange.
225    ///
226    /// Use `Sender` to send messages to connected peer.
227    Logon(&'a SessionId, Sender),
228
229    /// Session disconnected.
230    Logout(&'a SessionId, DisconnectReason),
231
232    /// New application message received.
233    ///
234    /// Use `InputResponder` to reject the message or to force logut or
235    /// disconnection.
236    AppMsgIn(Box<FixtMessage>, InputResponder<'a>),
237
238    /// New administration message received.
239    ///
240    /// Use `InputResponder` to reject the message or to force logut or
241    /// disconnection.
242    AdmMsgIn(Box<FixtMessage>, InputResponder<'a>),
243
244    /// Application message is ready to be send.
245    ///
246    /// Use `Responder` to change the message to GapFill or to discard it.
247    ///
248    /// This event may happen after session disconnection when output queue
249    /// still has messages to send. In such case all messages will be stored
250    /// and will be available thorough ResendRequest<2>.
251    AppMsgOut(&'a mut FixtMessage, &'a mut Responder), // TODO: Try pass by value but bind named
252
253    /// Administration message is ready to be send.
254    ///
255    /// Use `Responder` to change the message to GapFill or to discard it.
256    ///
257    /// This event may happen after session disconnection when output queue
258    /// still has messages to send. In such case all messages will be stored
259    /// and will be available thorough ResendRequest<2>.
260    AdmMsgOut(&'a mut FixtMessage),
261
262    /// Failed to deserialize input message.
263    DeserializeError(&'a SessionId, &'a DeserializeError),
264}
265
266#[derive(Debug)]
267pub struct EventStream {
268    receiver: ReceiverStream<FixEventInternal>,
269}
270
271#[derive(Debug)]
272pub struct Emitter {
273    inner: mpsc::Sender<FixEventInternal>,
274}
275
276impl Clone for Emitter {
277    fn clone(&self) -> Self {
278        Self {
279            inner: self.inner.clone(),
280        }
281    }
282}
283
284impl Emitter {
285    pub(crate) async fn send(&self, event: FixEventInternal) {
286        if let Err(_e) = self.inner.send(event).await {
287            error!("Failed to send msg")
288        }
289    }
290}
291
292pub(crate) fn events_channel() -> (Emitter, EventStream) {
293    let (sender, receiver) = mpsc::channel(16);
294
295    (
296        Emitter { inner: sender },
297        EventStream {
298            receiver: receiver.into(),
299        },
300    )
301}
302
303mod private {
304    pub trait Sealed {}
305
306    impl Sealed for super::FixEventInternal {}
307}
308
309/// This trait is sealed and not meant to be implemented outside of the current crate.
310pub trait AsEvent: private::Sealed {
311    fn as_event(&mut self) -> FixEvent<'_>;
312}
313
314impl AsEvent for FixEventInternal {
315    fn as_event(&mut self) -> FixEvent<'_> {
316        match self {
317            FixEventInternal::Created(id) => FixEvent::Created(id),
318            FixEventInternal::Logon(id, sender) => FixEvent::Logon(id, sender.take().unwrap()),
319            FixEventInternal::Logout(id, reason) => FixEvent::Logout(id, *reason),
320            FixEventInternal::AppMsgIn(msg, sender) => FixEvent::AppMsgIn(
321                msg.take().unwrap(),
322                InputResponder::new(sender.take().unwrap()),
323            ),
324            FixEventInternal::AdmMsgIn(msg, sender) => FixEvent::AdmMsgIn(
325                msg.take().unwrap(),
326                InputResponder::new(sender.take().unwrap()),
327            ),
328            FixEventInternal::AppMsgOut(msg, resp) => {
329                FixEvent::AppMsgOut(msg.as_mut().unwrap(), resp)
330            }
331            FixEventInternal::AdmMsgOut(msg, _) => FixEvent::AdmMsgOut(msg.as_mut().unwrap()),
332            FixEventInternal::DeserializeError(session_id, deserialize_error) => {
333                FixEvent::DeserializeError(session_id, deserialize_error)
334            }
335        }
336    }
337}
338
339impl Stream for EventStream {
340    type Item = impl AsEvent;
341
342    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
343        Pin::new(&mut self.receiver).poll_next(cx)
344    }
345
346    fn size_hint(&self) -> (usize, Option<usize>) {
347        self.receiver.size_hint()
348    }
349}