easyfix_session/
lib.rs

1#![feature(impl_trait_in_assoc_type)]
2
3pub mod acceptor;
4pub mod application;
5pub mod initiator;
6pub mod io;
7pub mod messages_storage;
8mod session;
9pub mod session_id;
10mod session_state;
11pub mod settings;
12
13use std::time::Duration;
14
15use easyfix_messages::{
16    fields::{FixString, MsgType, UtcTimestamp},
17    messages::{FixtMessage, Header, Message, Trailer},
18};
19use settings::Settings;
20use tokio::sync::mpsc;
21
// NOTE(review): not referenced in the visible part of this file; presumably
// extra slack added on top of the expected inbound interval before an inbound
// timeout is declared - confirm against the session module.
22const NO_INBOUND_TIMEOUT_PADDING: Duration = Duration::from_millis(250);
// NOTE(review): not referenced in the visible part of this file; presumably a
// multiplier applied to the heartbeat interval to decide when a TestRequest
// should be sent - confirm against the session module.
23const TEST_REQUEST_THRESHOLD: f32 = 1.2;
24
25use tracing::error;
26
27#[derive(Debug, thiserror::Error)]
28pub enum SessionError {
29    #[error("Never received logon from new connection.")]
30    LogonNeverReceived,
31    #[error("Message does not point to any session.")]
32    UnknownSession,
33}
34
35#[derive(Debug, thiserror::Error)]
36pub enum Error {
37    #[error("I/O error: {0}")]
38    Io(#[from] std::io::Error),
39    #[error("Session error: {0}")]
40    SessionError(SessionError),
41}
42
/// Why a session connection was (or is about to be) closed.
///
/// The type is `Copy`, so it can be passed around by value.
#[derive(Debug, Clone, Copy)]
pub enum DisconnectReason {
    /// The local side requested a logout.
    LocalRequestedLogout,
    /// The remote side requested a logout.
    RemoteRequestedLogout,
    /// Application code forced the disconnect.
    ApplicationForcedDisconnect,
    /// A message arrived without MsgSeqNum.
    MsgSeqNumNotFound,
    /// A message arrived with a MsgSeqNum that was too low.
    MsgSeqNumTooLow,
    /// The logon state was invalid.
    InvalidLogonState,
    /// The remote side disconnected.
    Disconnected,
    /// An I/O error occurred.
    IoError,
}
63
64#[derive(Debug)]
65pub(crate) enum SenderMsg {
66    Msg(Box<FixtMessage>),
67    Disconnect(DisconnectReason),
68}
69
70#[derive(Clone, Debug)]
71pub struct Sender {
72    inner: mpsc::UnboundedSender<SenderMsg>,
73}
74
75impl Sender {
76    /// Create new `Sender` instance.
77    pub(crate) fn new(writer: mpsc::UnboundedSender<SenderMsg>) -> Sender {
78        Sender { inner: writer }
79    }
80
81    /// Send FIXT message.
82    ///
83    /// All header and trailer fields can be also adjusted when handing
84    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
85    ///
86    /// Before serialziation following header fields will be filled:
87    /// - begin_string (if not empty)
88    /// - msg_type
89    /// - sender_comp_id (if not empty)
90    /// - target_comp_id (if not empty)
91    /// - sending_time (if eq UtcTimestamp::MIN_UTC)
92    /// - msg_seq_num (if eq 0)
93    ///
94    /// The checksum(10) field value is always ignored - it is computed and set
95    /// after serialziation.
96    pub fn send_raw(&self, msg: Box<FixtMessage>) -> Result<(), Box<FixtMessage>> {
97        if let Err(msg) = self.inner.send(SenderMsg::Msg(msg)) {
98            match msg.0 {
99                SenderMsg::Msg(msg) => {
100                    error!(
101                        "failed to send {:?}<{}> message, receiver closed or dropped",
102                        msg.msg_type(),
103                        msg.msg_type().as_fix_str()
104                    );
105                    Err(msg)
106                }
107                SenderMsg::Disconnect(_) => unreachable!(),
108            }
109        } else {
110            Ok(())
111        }
112    }
113
114    /// Send FIX message.
115    ///
116    /// FIXT message will be constructed internally using default values
117    /// for Header and Trailer.
118    ///
119    /// All header and trailer fields can be also adjusted when handing
120    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
121    pub fn send(&self, msg: Box<Message>) -> Result<(), Box<FixtMessage>> {
122        let msg = Box::new(FixtMessage {
123            header: Box::new(new_header(msg.msg_type())),
124            body: msg,
125            trailer: Box::new(new_trailer()),
126        });
127        self.send_raw(msg)
128    }
129
130    /// Send disconnect message.
131    ///
132    /// Output stream will close output queue so no more message can be send
133    /// after this one.
134    pub(crate) fn disconnect(&self, reason: DisconnectReason) {
135        if self.inner.send(SenderMsg::Disconnect(reason)).is_err() {
136            error!("failed to disconnect, receiver closed or dropped");
137        }
138    }
139}
140
141pub fn new_header(msg_type: MsgType) -> Header {
142    // XXX: all required fields overwritten before serialization (if not set)
143    Header {
144        begin_string: FixString::new(),
145        msg_type,
146        sending_time: UtcTimestamp::MIN_UTC,
147        ..Default::default()
148    }
149}
150
151pub fn new_trailer() -> Trailer {
152    // XXX: all required fields overwritten before serialization
153    Trailer::default()
154}