easyfix_session/
lib.rs

1#![feature(impl_trait_in_assoc_type)]
2
3pub mod acceptor;
4pub mod application;
5pub mod initiator;
6pub mod io;
7pub mod messages_storage;
8mod session;
9pub mod session_id;
10mod session_state;
11pub mod settings;
12
13use std::time::Duration;
14
15use easyfix_messages::{
16    fields::{FixString, MsgType, UtcTimestamp},
17    messages::{FixtMessage, Header, Message, Trailer},
18};
19use settings::Settings;
20use tokio::sync::mpsc;
21
22const NO_INBOUND_TIMEOUT_PADDING: Duration = Duration::from_millis(250);
23const TEST_REQUEST_THRESHOLD: f32 = 1.2;
24
25use tracing::error;
26
27#[derive(Debug, thiserror::Error)]
28pub enum SessionError {
29    #[error("Never received logon from new connection.")]
30    LogonNeverReceived,
31    #[error("Message does not point to any session.")]
32    UnknownSession,
33}
34
35#[derive(Debug, thiserror::Error)]
36pub enum Error {
37    #[error("I/O error: {0}")]
38    Io(#[from] std::io::Error),
39    #[error("Session error: {0}")]
40    SessionError(SessionError),
41}
42
43/// Disconnection reasons.
44#[derive(Clone, Copy, Debug)]
45pub enum DisconnectReason {
46    /// Logout requested locally
47    LocalRequestedLogout,
48    /// Logout requested remotely
49    RemoteRequestedLogout,
50    /// Disconnect forced by Application code
51    ApplicationForcedDisconnect,
52    /// Received message without MsgSeqNum
53    MsgSeqNumNotFound,
54    /// Received message with MsgSeqNum too low
55    MsgSeqNumTooLow,
56    /// Invalid logon state
57    InvalidLogonState,
58    /// Invalid COMP ID
59    InvalidCompId,
60    /// Invalid OrigSendingTime
61    InvalidOrigSendingTime,
62    /// Remote side disconnected
63    Disconnected,
64    /// I/O Error
65    IoError,
66    /// Logout timeout
67    LogoutTimeout,
68}
69
70#[derive(Debug)]
71pub(crate) enum SenderMsg {
72    Msg(Box<FixtMessage>),
73    Disconnect(DisconnectReason),
74}
75
76#[derive(Clone, Debug)]
77pub struct Sender {
78    inner: mpsc::UnboundedSender<SenderMsg>,
79}
80
81impl Sender {
82    /// Create new `Sender` instance.
83    pub(crate) fn new(writer: mpsc::UnboundedSender<SenderMsg>) -> Sender {
84        Sender { inner: writer }
85    }
86
87    /// Send FIXT message.
88    ///
89    /// All header and trailer fields can be also adjusted when handing
90    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
91    ///
92    /// Before serialziation following header fields will be filled:
93    /// - begin_string (if not empty)
94    /// - msg_type
95    /// - sender_comp_id (if not empty)
96    /// - target_comp_id (if not empty)
97    /// - sending_time (if eq UtcTimestamp::MIN_UTC)
98    /// - msg_seq_num (if eq 0)
99    ///
100    /// The checksum(10) field value is always ignored - it is computed and set
101    /// after serialziation.
102    pub fn send_raw(&self, msg: Box<FixtMessage>) -> Result<(), Box<FixtMessage>> {
103        if let Err(msg) = self.inner.send(SenderMsg::Msg(msg)) {
104            match msg.0 {
105                SenderMsg::Msg(msg) => {
106                    error!(
107                        "failed to send {:?}<{}> message, receiver closed or dropped",
108                        msg.msg_type(),
109                        msg.msg_type().as_fix_str()
110                    );
111                    Err(msg)
112                }
113                SenderMsg::Disconnect(_) => unreachable!(),
114            }
115        } else {
116            Ok(())
117        }
118    }
119
120    /// Send FIX message.
121    ///
122    /// FIXT message will be constructed internally using default values
123    /// for Header and Trailer.
124    ///
125    /// All header and trailer fields can be also adjusted when handing
126    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
127    pub fn send(&self, msg: Box<Message>) -> Result<(), Box<FixtMessage>> {
128        let msg = Box::new(FixtMessage {
129            header: Box::new(new_header(msg.msg_type())),
130            body: msg,
131            trailer: Box::new(new_trailer()),
132        });
133        self.send_raw(msg)
134    }
135
136    /// Send disconnect message.
137    ///
138    /// Output stream will close output queue so no more message can be send
139    /// after this one.
140    pub(crate) fn disconnect(&self, reason: DisconnectReason) {
141        if self.inner.send(SenderMsg::Disconnect(reason)).is_err() {
142            error!("failed to disconnect, receiver closed or dropped");
143        }
144    }
145}
146
147pub fn new_header(msg_type: MsgType) -> Header {
148    // XXX: all required fields overwritten before serialization (if not set)
149    Header {
150        begin_string: FixString::new(),
151        msg_type,
152        sending_time: UtcTimestamp::MIN_UTC,
153        ..Default::default()
154    }
155}
156
157pub fn new_trailer() -> Trailer {
158    // XXX: all required fields overwritten before serialization
159    Trailer::default()
160}