easyfix_session/
application.rs

1use std::{
2    fmt,
3    marker::PhantomData,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use easyfix_messages::{
9    deserializer,
10    fields::{
11        parse_reject_reason_to_session_reject_reason, FixString, SeqNum, SessionRejectReason,
12        SessionStatus, TagNum,
13    },
14    messages::FixtMessage,
15};
16use futures::Stream;
17use tokio::sync::{mpsc, oneshot};
18use tokio_stream::wrappers::ReceiverStream;
19use tracing::error;
20
21use crate::{session_id::SessionId, DisconnectReason, Sender};
22
23//
24#[derive(Debug)]
25pub enum DeserializeError {
26    // TODO: enum maybe?
27    GarbledMessage(String),
28    Logout,
29    Reject {
30        msg_type: Option<FixString>,
31        seq_num: SeqNum,
32        tag: Option<TagNum>,
33        reason: SessionRejectReason,
34    },
35}
36
37impl fmt::Display for DeserializeError {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            DeserializeError::GarbledMessage(reason) => write!(f, "garbled message: {}", reason),
41            DeserializeError::Logout => write!(f, "MsgSeqNum missing"),
42            DeserializeError::Reject {
43                tag: Some(tag),
44                reason,
45                ..
46            } => write!(f, "{reason:?} (tag={tag})"),
47            DeserializeError::Reject {
48                tag: None, reason, ..
49            } => write!(f, "{reason:?}"),
50        }
51    }
52}
53
54impl std::error::Error for DeserializeError {}
55
56impl From<deserializer::DeserializeError> for DeserializeError {
57    fn from(error: deserializer::DeserializeError) -> Self {
58        use deserializer::DeserializeError as DeError;
59        match error {
60            DeError::GarbledMessage(reason) => DeserializeError::GarbledMessage(reason),
61            DeError::Logout => DeserializeError::Logout,
62            DeError::Reject {
63                msg_type,
64                seq_num,
65                tag,
66                reason,
67            } => DeserializeError::Reject {
68                msg_type,
69                seq_num,
70                tag,
71                reason: parse_reject_reason_to_session_reject_reason(reason),
72            },
73        }
74    }
75}
76
77pub struct DoNotSend {
78    pub gap_fill: bool,
79}
80
81#[derive(Debug)]
82pub(crate) enum InputResponderMsg {
83    // RejectLogon {
84    //     reason: Option<String>,
85    // },
86    Reject {
87        ref_msg_type: FixString,
88        ref_seq_num: SeqNum,
89        reason: SessionRejectReason,
90        text: FixString,
91        ref_tag_id: Option<i64>,
92    },
93    Logout {
94        session_status: Option<SessionStatus>,
95        text: Option<FixString>,
96        disconnect: bool,
97    },
98    Disconnect {
99        reason: Option<String>,
100    },
101}
102
103#[derive(Debug)]
104pub struct InputResponder<'a> {
105    sender: oneshot::Sender<InputResponderMsg>,
106    phantom_ref: PhantomData<&'a ()>,
107}
108
109impl<'a> InputResponder<'a> {
110    pub(crate) fn new(sender: oneshot::Sender<InputResponderMsg>) -> InputResponder<'a> {
111        InputResponder {
112            sender,
113            phantom_ref: PhantomData,
114        }
115    }
116
117    pub fn reject(
118        self,
119        ref_msg_type: FixString,
120        ref_seq_num: SeqNum,
121        reason: SessionRejectReason,
122        text: FixString,
123        ref_tag_id: Option<i64>,
124    ) {
125        self.sender
126            .send(InputResponderMsg::Reject {
127                ref_msg_type,
128                ref_seq_num,
129                reason,
130                text,
131                ref_tag_id,
132            })
133            .unwrap();
134    }
135
136    pub fn logout(
137        self,
138        session_status: Option<SessionStatus>,
139        text: Option<FixString>,
140        disconnect: bool,
141    ) {
142        self.sender
143            .send(InputResponderMsg::Logout {
144                session_status,
145                text,
146                disconnect,
147            })
148            .unwrap();
149    }
150
151    pub fn disconnect(self) {
152        self.sender
153            .send(InputResponderMsg::Disconnect { reason: None })
154            .unwrap();
155    }
156}
157
158#[derive(Debug)]
159pub struct Responder {
160    sender: Option<oneshot::Sender<Box<FixtMessage>>>,
161    change_to_gap_fill: bool,
162}
163
164impl Responder {
165    pub(crate) fn new(sender: oneshot::Sender<Box<FixtMessage>>) -> Responder {
166        Responder {
167            sender: Some(sender),
168            change_to_gap_fill: false,
169        }
170    }
171
172    pub fn do_not_send(&mut self) {
173        // Sender is `Option::None` now so message can't be send back
174        self.sender.take();
175    }
176
177    pub fn change_to_gap_fill(&mut self) {
178        self.change_to_gap_fill = true;
179    }
180}
181
182#[derive(Debug)]
183pub(crate) enum FixEventInternal {
184    Created(SessionId),
185    Logon(SessionId, Option<Sender>),
186    Logout(SessionId, DisconnectReason),
187    AppMsgIn(
188        Option<Box<FixtMessage>>,
189        Option<oneshot::Sender<InputResponderMsg>>,
190    ),
191    AdmMsgIn(
192        Option<Box<FixtMessage>>,
193        Option<oneshot::Sender<InputResponderMsg>>,
194    ),
195    AppMsgOut(Option<Box<FixtMessage>>, Responder),
196    AdmMsgOut(Option<Box<FixtMessage>>, Responder),
197    DeserializeError(SessionId, DeserializeError),
198}
199
200impl Drop for FixEventInternal {
201    fn drop(&mut self) {
202        if let FixEventInternal::AppMsgOut(ref mut msg, ref mut responder)
203        | FixEventInternal::AdmMsgOut(ref mut msg, ref mut responder) = self
204        {
205            if let Some(sender) = responder.sender.take() {
206                if responder.change_to_gap_fill {
207                    // TODO: GapFill HERE!
208                    sender.send(msg.take().unwrap()).unwrap();
209                } else {
210                    sender.send(msg.take().unwrap()).unwrap();
211                }
212            }
213        }
214    }
215}
216
217/// FIX protolol events.
218#[derive(Debug)]
219pub enum FixEvent<'a> {
220    /// Session created.
221    Created(&'a SessionId),
222
223    /// Successfull Logon<A> messages exchange.
224    ///
225    /// Use `Sender` to send messages to connected peer.
226    Logon(&'a SessionId, Sender),
227
228    /// Session disconnected.
229    Logout(&'a SessionId, DisconnectReason),
230
231    /// New application message received.
232    ///
233    /// Use `InputResponder` to reject the message or to force logut or
234    /// disconnection.
235    AppMsgIn(Box<FixtMessage>, InputResponder<'a>),
236
237    /// New administration message received.
238    ///
239    /// Use `InputResponder` to reject the message or to force logut or
240    /// disconnection.
241    AdmMsgIn(Box<FixtMessage>, InputResponder<'a>),
242
243    /// Application message is ready to be send.
244    ///
245    /// Use `Responder` to change the message to GapFill or to discard it.
246    ///
247    /// This event may happen after session disconnection when output queue
248    /// still has messages to send. In such case all messages will be stored
249    /// and will be available thorough ResendRequest<2>.
250    AppMsgOut(&'a mut FixtMessage, &'a mut Responder), // TODO: Try pass by value but bind named
251
252    /// Administration message is ready to be send.
253    ///
254    /// Use `Responder` to change the message to GapFill or to discard it.
255    ///
256    /// This event may happen after session disconnection when output queue
257    /// still has messages to send. In such case all messages will be stored
258    /// and will be available thorough ResendRequest<2>.
259    AdmMsgOut(&'a mut FixtMessage),
260
261    /// Failed to deserialize input message.
262    DeserializeError(&'a SessionId, &'a DeserializeError),
263}
264
265#[derive(Debug)]
266pub struct EventStream {
267    receiver: ReceiverStream<FixEventInternal>,
268}
269
270#[derive(Debug)]
271pub struct Emitter {
272    inner: mpsc::Sender<FixEventInternal>,
273}
274
275impl Clone for Emitter {
276    fn clone(&self) -> Self {
277        Self {
278            inner: self.inner.clone(),
279        }
280    }
281}
282
283impl Emitter {
284    pub(crate) async fn send(&self, event: FixEventInternal) {
285        if let Err(_e) = self.inner.send(event).await {
286            error!("Failed to send msg")
287        }
288    }
289}
290
291pub(crate) fn events_channel() -> (Emitter, EventStream) {
292    let (sender, receiver) = mpsc::channel(16);
293
294    (
295        Emitter { inner: sender },
296        EventStream {
297            receiver: receiver.into(),
298        },
299    )
300}
301
302mod private {
303    pub trait Sealed {}
304
305    impl Sealed for super::FixEventInternal {}
306}
307
308/// This trait is sealed and not meant to be implemented outside of the current crate.
309pub trait AsEvent: private::Sealed {
310    fn as_event(&mut self) -> FixEvent<'_>;
311}
312
313impl AsEvent for FixEventInternal {
314    fn as_event(&mut self) -> FixEvent<'_> {
315        match self {
316            FixEventInternal::Created(id) => FixEvent::Created(id),
317            FixEventInternal::Logon(id, sender) => FixEvent::Logon(id, sender.take().unwrap()),
318            FixEventInternal::Logout(id, reason) => FixEvent::Logout(id, *reason),
319            FixEventInternal::AppMsgIn(msg, sender) => FixEvent::AppMsgIn(
320                msg.take().unwrap(),
321                InputResponder::new(sender.take().unwrap()),
322            ),
323            FixEventInternal::AdmMsgIn(msg, sender) => FixEvent::AdmMsgIn(
324                msg.take().unwrap(),
325                InputResponder::new(sender.take().unwrap()),
326            ),
327            FixEventInternal::AppMsgOut(msg, resp) => {
328                FixEvent::AppMsgOut(msg.as_mut().unwrap(), resp)
329            }
330            FixEventInternal::AdmMsgOut(msg, _) => FixEvent::AdmMsgOut(msg.as_mut().unwrap()),
331            FixEventInternal::DeserializeError(session_id, deserialize_error) => {
332                FixEvent::DeserializeError(session_id, deserialize_error)
333            }
334        }
335    }
336}
337
338impl Stream for EventStream {
339    type Item = impl AsEvent;
340
341    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
342        Pin::new(&mut self.receiver).poll_next(cx)
343    }
344
345    fn size_hint(&self) -> (usize, Option<usize>) {
346        self.receiver.size_hint()
347    }
348}