1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
#![feature(impl_trait_in_assoc_type)]
#![feature(type_alias_impl_trait)]
pub mod acceptor;
pub mod application;
mod connection;
pub mod initiator;
pub mod messages_storage;
mod session;
pub mod session_id;
mod session_state;
pub mod settings;
use std::{io, time::Duration};
use easyfix_messages::{
fields::{FixString, MsgType, UtcTimestamp},
messages::{FixtMessage, Header, Message, Trailer},
};
use settings::Settings;
use tokio::sync::mpsc;
/// Fixed 250 ms padding.
///
/// NOTE(review): not used in this file; judging by the name it is presumably
/// added on top of the timeout for missing inbound traffic — confirm at the
/// usage sites in the session/connection modules.
const NO_INBOUND_TIMEOUT_PADDING: Duration = Duration::from_millis(250);
pub use connection::{send, send_raw, sender};
use tracing::error;
/// Session-level failures.
///
/// The `Display` text of each variant comes from its `#[error]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// A new connection never delivered a logon.
    #[error("Never received logon from new connection.")]
    LogonNeverReceived,

    /// A message could not be matched to any session.
    #[error("Message does not point to any session.")]
    UnknownSession,
}
/// Top-level error type of the crate.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Underlying I/O failure.
    ///
    /// `#[from]` lets `?` convert an `io::Error` into this variant.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),

    /// Session-level failure.
    ///
    /// There is no `#[from]` on this variant, so a [`SessionError`] has to be
    /// wrapped explicitly.
    #[error("Session error: {0}")]
    SessionError(SessionError),
}
/// Why a session was disconnected.
#[derive(Clone, Copy, Debug)]
pub enum DisconnectReason {
    /// The local side requested the logout.
    LocalRequestedLogout,

    /// The remote side requested the logout.
    RemoteRequestedLogout,

    /// User code forced the disconnect.
    UserForcedDisconnect,

    /// A message arrived without MsgSeqNum.
    MsgSeqNumNotFound,

    /// A message arrived with a MsgSeqNum that was too low.
    MsgSeqNumTooLow,

    /// The logon state was invalid.
    InvalidLogonState,

    /// The remote side disconnected.
    Disconnected,

    /// An I/O error occurred.
    IoError,
}
/// Items carried over the channel wrapped by [`Sender`].
#[derive(Debug)]
pub(crate) enum SenderMsg {
    /// A FIXT message to send; built by `Sender::send_raw`.
    Msg(Box<FixtMessage>),

    /// A disconnect request with its reason; built by `Sender::disconnect`.
    Disconnect(DisconnectReason),
}
/// Cloneable handle used to push outgoing messages and disconnect requests
/// into an unbounded channel.
#[derive(Clone, Debug)]
pub struct Sender {
    /// Sending half of the unbounded channel carrying [`SenderMsg`] items.
    inner: mpsc::UnboundedSender<SenderMsg>,
}
impl Sender {
    /// Wrap the sending half of the output channel in a `Sender`.
    pub(crate) fn new(writer: mpsc::UnboundedSender<SenderMsg>) -> Sender {
        Self { inner: writer }
    }

    /// Send a complete FIXT message (header, body and trailer).
    ///
    /// Header and trailer fields can still be adjusted while handling
    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
    ///
    /// Before serialization the following header fields will be filled:
    /// - begin_string (if not empty)
    /// - msg_type
    /// - sender_comp_id (if not empty)
    /// - target_comp_id (if not empty)
    /// - sending_time (if eq UtcTimestamp::MIN_UTC)
    /// - msg_seq_num (if eq 0)
    ///
    /// The checksum(10) field value is always ignored - it is computed and
    /// set after serialization.
    ///
    /// # Errors
    ///
    /// When the channel refuses the message (receiver closed or dropped),
    /// the failure is logged and the undelivered message is handed back.
    pub fn send_raw(&self, msg: Box<FixtMessage>) -> Result<(), Box<FixtMessage>> {
        let Err(send_error) = self.inner.send(SenderMsg::Msg(msg)) else {
            return Ok(());
        };

        // The send error carries the value that could not be delivered.
        match send_error.0 {
            SenderMsg::Msg(undelivered) => {
                error!(
                    "failed to send {:?}<{}> message, receiver closed or dropped",
                    undelivered.msg_type(),
                    undelivered.msg_type().as_fix_str()
                );
                Err(undelivered)
            }
            // Only `SenderMsg::Msg` was passed to `send` above, so nothing
            // else can come back.
            SenderMsg::Disconnect(_) => unreachable!(),
        }
    }

    /// Send a FIX message body.
    ///
    /// The FIXT message is assembled here around `msg`, using
    /// [`new_header`] and [`new_trailer`] for the header and trailer, and is
    /// then passed to [`Sender::send_raw`].
    ///
    /// Header and trailer fields can still be adjusted while handling
    /// `FixEvent::AppMsgOut` and `FixEvent::AdmMsgOut`.
    ///
    /// # Errors
    ///
    /// Same as [`Sender::send_raw`]: the assembled FIXT message is returned
    /// when it could not be handed to the channel.
    pub fn send(&self, msg: Box<Message>) -> Result<(), Box<FixtMessage>> {
        let header = Box::new(new_header(msg.msg_type()));
        let trailer = Box::new(new_trailer());
        let fixt_message = FixtMessage {
            header,
            body: msg,
            trailer,
        };
        self.send_raw(Box::new(fixt_message))
    }

    /// Send a disconnect request.
    ///
    /// The output stream will close the output queue, so no more messages
    /// can be sent after this one. A failure to hand the request to the
    /// channel is only logged.
    pub(crate) fn disconnect(&self, reason: DisconnectReason) {
        let delivered = self.inner.send(SenderMsg::Disconnect(reason)).is_ok();
        if !delivered {
            error!("failed to disconnect, receiver closed or dropped");
        }
    }
}
pub fn new_header(msg_type: MsgType) -> Header {
// XXX: all required fields overwritten before serialization (if not set)
Header {
begin_string: FixString::new(),
msg_type,
sending_time: UtcTimestamp::MIN_UTC,
..Default::default()
}
}
pub fn new_trailer() -> Trailer {
// XXX: all required fields overwritten before serialization
Trailer::default()
}