1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#![feature(impl_trait_in_assoc_type)]
#![feature(type_alias_impl_trait)]

pub mod acceptor;
pub mod application;
mod connection;
pub mod initiator;
pub mod messages_storage;
mod session;
pub mod session_id;
mod session_state;
pub mod settings;

use std::{io, time::Duration};

use easyfix_messages::{
    fields::{FixString, MsgType, UtcTimestamp},
    messages::{FixtMessage, Header, Message, Trailer},
};
use settings::Settings;
use tokio::sync::mpsc;

const NO_INBOUND_TIMEOUT_PADDING: Duration = Duration::from_millis(250);

pub use connection::{send, send_raw, sender};
use tracing::error;

#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    #[error("Never received logon from new connection.")]
    LogonNeverReceived,
    #[error("Message does not point to any session.")]
    UnknownSession,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    #[error("Session error: {0}")]
    SessionError(SessionError),
}

/// Disconnection reasons.
#[derive(Clone, Copy, Debug)]
pub enum DisconnectReason {
    /// Logout requested locally
    LocalRequestedLogout,
    /// Logout requested remotely
    RemoteRequestedLogout,
    /// Disconnect forced by User code
    UserForcedDisconnect,
    /// Received message without MsgSeqNum
    MsgSeqNumNotFound,
    /// Received message with MsgSeqNum too low
    MsgSeqNumTooLow,
    /// Invalid logon state
    InvalidLogonState,
    /// Remote side disconnected
    Disconnected,
    /// I/O Error
    IoError,
}

#[derive(Debug)]
pub(crate) enum SenderMsg {
    Msg(Box<FixtMessage>),
    Disconnect(DisconnectReason),
}

#[derive(Clone, Debug)]
pub struct Sender {
    inner: mpsc::UnboundedSender<SenderMsg>,
}

impl Sender {
    /// Create new `Sender` instance.
    pub(crate) fn new(writer: mpsc::UnboundedSender<SenderMsg>) -> Sender {
        Sender { inner: writer }
    }

    /// Send FIXT message.
    ///
    /// All header and trailer fields can be also adjusted when handing
    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
    ///
    /// Before serialziation following header fields will be filled:
    /// - begin_string (if not empty)
    /// - msg_type
    /// - sender_comp_id (if not empty)
    /// - target_comp_id (if not empty)
    /// - sending_time (if eq UtcTimestamp::MIN_UTC)
    /// - msg_seq_num (if eq 0)
    ///
    /// The checksum(10) field value is always ignored - it is computed and set
    /// after serialziation.
    pub fn send_raw(&self, msg: Box<FixtMessage>) -> Result<(), Box<FixtMessage>> {
        if let Err(msg) = self.inner.send(SenderMsg::Msg(msg)) {
            match msg.0 {
                SenderMsg::Msg(msg) => {
                    error!(
                        "failed to send {:?}<{}> message, receiver closed or dropped",
                        msg.msg_type(),
                        msg.msg_type().as_fix_str()
                    );
                    Err(msg)
                }
                SenderMsg::Disconnect(_) => unreachable!(),
            }
        } else {
            Ok(())
        }
    }

    /// Send FIX message.
    ///
    /// FIXT message will be constructed internally using default values
    /// for Header and Trailer.
    ///
    /// All header and trailer fields can be also adjusted when handing
    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
    pub fn send(&self, msg: Box<Message>) -> Result<(), Box<FixtMessage>> {
        let msg = Box::new(FixtMessage {
            header: Box::new(new_header(msg.msg_type())),
            body: msg,
            trailer: Box::new(new_trailer()),
        });
        self.send_raw(msg)
    }

    /// Send disconnect message.
    ///
    /// Output stream will close output queue so no more message can be send
    /// after this one.
    pub(crate) fn disconnect(&self, reason: DisconnectReason) {
        if self.inner.send(SenderMsg::Disconnect(reason)).is_err() {
            error!("failed to disconnect, receiver closed or dropped");
        }
    }
}

pub fn new_header(msg_type: MsgType) -> Header {
    // XXX: all required fields overwritten before serialization (if not set)
    Header {
        begin_string: FixString::new(),
        msg_type,
        sending_time: UtcTimestamp::MIN_UTC,
        ..Default::default()
    }
}

pub fn new_trailer() -> Trailer {
    // XXX: all required fields overwritten before serialization
    Trailer::default()
}