#![allow(deprecated)]
use mio::{Events,Poll,PollOpt,Ready,Token};
use mio::channel::{channel,Receiver,Sender};
use std::collections::HashMap;
use mio::tcp::TcpListener;
use std::fmt;
use std::io;
use std::mem;
use std::ops::Range;
use std::net::{SocketAddr,ToSocketAddrs};
use std::sync::{Arc,Mutex};
use std::sync::mpsc::TryRecvError;
use std::thread;
use std::time::{Duration,Instant};
use dictionary::messages::Logon;
use fixt::engine_thread::{CONNECTION_COUNT_MAX,BASE_CONNECTION_TOKEN,INTERNAL_ENGINE_EVENT_TOKEN,InternalEngineToThreadEvent,internal_engine_thread};
use fixt::message::{BuildFIXTMessage,FIXTMessage,debug_format_fixt_message};
use fix::ParseError;
use fix_version::FIXVersion;
use message_version::MessageVersion;
use token_generator::TokenGenerator;
const ENGINE_EVENT_TOKEN: Token = Token(0);
#[derive(Clone,Copy,Debug,Eq,Hash,PartialEq)]
pub struct Connection(pub usize);
impl fmt::Display for Connection {
fn fmt(&self,f: &mut fmt::Formatter) -> fmt::Result {
write!(f,"{}",self.0)
}
}
#[derive(Clone,Copy,Debug,Eq,Hash,PartialEq)]
pub struct Listener(pub usize);
impl fmt::Display for Listener {
fn fmt(&self,f: &mut fmt::Formatter) -> fmt::Result {
write!(f,"{}",self.0)
}
}
pub enum ConnectionTerminatedReason {
BeginStrWrongError{ received: FIXVersion, expected: FIXVersion },
InboundMsgSeqNumMaxExceededError,
InboundMsgSeqNumLowerThanExpectedError,
InboundResendRequestLoopError,
LocalRequested,
LogonHeartBtIntNegativeError,
LogonParseError(ParseError),
LogonNeverReceivedError,
LogonNotFirstMessageError,
LogonRejectedError,
LogoutNoHangUpError,
LogoutNoResponseError,
OutboundMsgSeqNumMaxExceededError,
RemoteRequested,
SenderCompIDWrongError,
SocketNotWritableTimeoutError,
SocketReadError(io::Error),
SocketWriteError(io::Error),
TargetCompIDWrongError,
TestRequestNotRespondedError,
}
impl fmt::Debug for ConnectionTerminatedReason {
fn fmt(&self,f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ConnectionTerminatedReason::BeginStrWrongError{ref received,ref expected} => {
let received_str = String::from_utf8_lossy(received.begin_string()).into_owned();
let expected_str = String::from_utf8_lossy(expected.begin_string()).into_owned();
write!(f,"Received message with BeginStr containing '{}' but expected '{}'.",received_str,expected_str)
},
ConnectionTerminatedReason::InboundMsgSeqNumMaxExceededError => write!(f,"Expected inbound MsgSeqNum exceeded maximum allowed."),
ConnectionTerminatedReason::InboundMsgSeqNumLowerThanExpectedError => write!(f,"Received message with lower MsgSeqNum than expected."),
ConnectionTerminatedReason::InboundResendRequestLoopError => write!(f,"Received too many ResendRequests with the same BeginSeqNo."),
ConnectionTerminatedReason::LocalRequested => write!(f,"Local requested logout and it was performed cleanly."),
ConnectionTerminatedReason::LogonHeartBtIntNegativeError => write!(f,"Response to logon included negative HeartBtInt."),
ConnectionTerminatedReason::LogonParseError(_) => write!(f,"Could not parse logon response."), ConnectionTerminatedReason::LogonNeverReceivedError => write!(f,"Never received logon from new connection."),
ConnectionTerminatedReason::LogonNotFirstMessageError => write!(f,"Remote responded to logon with a non-logon message."),
ConnectionTerminatedReason::LogonRejectedError => write!(f,"Remote rejected logon for arbitrary reason."),
ConnectionTerminatedReason::LogoutNoHangUpError => write!(f,"Remote requested logout but did not close socket after response."),
ConnectionTerminatedReason::LogoutNoResponseError => write!(f,"Local requested logout but remote did not respond within a reasonable amount of time."),
ConnectionTerminatedReason::OutboundMsgSeqNumMaxExceededError => write!(f,"Expected outbound MsgSeqNum exceeded maximum allowed."),
ConnectionTerminatedReason::RemoteRequested => write!(f,"Remote requested logout and it was performed cleanly."),
ConnectionTerminatedReason::SenderCompIDWrongError => write!(f,"Received message with SenderCompID not matching the expected value."),
ConnectionTerminatedReason::SocketNotWritableTimeoutError => write!(f,"Socket returned WouldBlock on write for an unreasonable amount of time."),
ConnectionTerminatedReason::SocketReadError(ref error) => write!(f,"Socket could not be read from: {}",error),
ConnectionTerminatedReason::SocketWriteError(ref error) => write!(f,"Socket could not be written to: {}",error),
ConnectionTerminatedReason::TargetCompIDWrongError => write!(f,"Received message with TargetCompID not matching the expected value."),
ConnectionTerminatedReason::TestRequestNotRespondedError => write!(f,"TestRequest message not responded with Heartbeat message within a reasonable amount of time."),
}
}
}
pub enum EngineEvent {
ConnectionFailed(Connection,io::Error), ConnectionSucceeded(Connection), ConnectionTerminated(Connection,ConnectionTerminatedReason), ConnectionDropped(Listener,SocketAddr), ConnectionAccepted(Listener,Connection,SocketAddr), ConnectionLoggingOn(Listener,Connection,Box<Logon>),
SessionEstablished(Connection), ListenerFailed(Listener,io::Error), ListenerAcceptFailed(Listener,io::Error), MessageReceived(Connection,Box<FIXTMessage + Send>), MessageReceivedGarbled(Connection,ParseError), MessageReceivedDuplicate(Connection,Box<FIXTMessage + Send>), MessageRejected(Connection,Box<FIXTMessage + Send>), ResendRequested(Connection,Range<u64>), SequenceResetResetHasNoEffect(Connection),
SequenceResetResetInThePast(Connection),
FatalError(&'static str,io::Error), }
impl fmt::Debug for EngineEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
EngineEvent::ConnectionFailed(connection,ref error) => write!(f,"EngineEvent::ConnectionFailed({:?},{:?})",connection,error),
EngineEvent::ConnectionSucceeded(connection) => write!(f,"EngineEvent::ConnectionSucceeded({:?})",connection),
EngineEvent::ConnectionTerminated(connection,ref reason) => write!(f,"EngineEvent::ConnectionTerminated({:?},{:?})",connection,reason),
EngineEvent::ConnectionDropped(connection,addr) => write!(f,"EngineEvent::ConnectionDropped({:?},{:?})",connection,addr),
EngineEvent::ConnectionAccepted(listener,connection,addr) => write!(f,"EngineEvent::ConnectionAccepted({:?},{:?},{:?})",listener,connection,addr),
EngineEvent::ConnectionLoggingOn(listener,connection,ref message) => write!(f,"EngineEvent::ConnectionLoggingOn({:?},{:?},",listener,connection)
.and_then(|_| debug_format_fixt_message(&**message as &FIXTMessage,f))
.and_then(|_| write!(f,")")),
EngineEvent::SessionEstablished(connection) => write!(f,"EngineEvent::SessionEstablished({:?})",connection),
EngineEvent::ListenerFailed(listener,ref error) => write!(f,"EngineEvent::ListenerFailed({:?},{:?})",listener,error),
EngineEvent::ListenerAcceptFailed(listener,ref error) => write!(f,"EngineEvent::ListenerAcceptFailed({:?},{:?})",listener,error),
EngineEvent::MessageReceived(connection,ref message) => write!(f,"EngineEvent::MessageReceived({:?},{:?})",connection,message),
EngineEvent::MessageReceivedGarbled(connection,ref parse_error) => write!(f,"EngineEvent::MessageReceivedGarbled({:?},{:?})",connection,parse_error),
EngineEvent::MessageReceivedDuplicate(connection,ref message) => write!(f,"EngineEvent::MessageReceivedDuplicate({:?},{:?})",connection,message),
EngineEvent::MessageRejected(connection,ref message) => write!(f,"EngineEvent::MessageRejected({:?},{:?})",connection,message),
EngineEvent::ResendRequested(connection,ref range) => write!(f,"EngineEvent::ResendRequested({:?},{:?})",connection,range),
EngineEvent::SequenceResetResetHasNoEffect(connection) => write!(f,"EngineEvent:SequenceResetResetHasNoEffect({:?})",connection),
EngineEvent::SequenceResetResetInThePast(connection) => write!(f,"EngineEvent:SequenceResetResetInThePast({:?})",connection),
EngineEvent::FatalError(description,ref error) => write!(f,"EngineEvent::FatalError({:?},{:?})",description,error),
}
}
}
pub enum ResendResponse {
Message(Option<MessageVersion>,Box<FIXTMessage + Send>),
Gap(Range<u64>),
}
fn to_socket_addr<A: ToSocketAddrs>(address: A) -> Option<SocketAddr> {
match address.to_socket_addrs() {
Ok(mut address_iter) => address_iter.next(),
Err(_) => None,
}
}
pub struct Engine {
token_generator: Arc<Mutex<TokenGenerator>>,
tx: Sender<InternalEngineToThreadEvent>,
rx: Receiver<EngineEvent>,
poll: Poll,
thread_handle: Option<thread::JoinHandle<()>>,
}
impl Engine {
pub fn new(message_dictionary: HashMap<&'static [u8],Box<BuildFIXTMessage + Send>>,
max_message_size: u64) -> Result<Engine,io::Error> {
let engine_poll = try!(Poll::new());
let (thread_to_engine_tx,thread_to_engine_rx) = channel::<EngineEvent>();
try!(engine_poll.register(&thread_to_engine_rx,ENGINE_EVENT_TOKEN,Ready::readable(),PollOpt::level()));
let poll = try!(Poll::new());
let (engine_to_thread_tx,engine_to_thread_rx) = channel::<InternalEngineToThreadEvent>();
try!(poll.register(&engine_to_thread_rx,INTERNAL_ENGINE_EVENT_TOKEN,Ready::readable(),PollOpt::level()));
let token_generator = Arc::new(Mutex::new(TokenGenerator::new(BASE_CONNECTION_TOKEN.0,Some(CONNECTION_COUNT_MAX - BASE_CONNECTION_TOKEN.0))));
Ok(Engine {
token_generator: token_generator.clone(),
tx: engine_to_thread_tx,
rx: thread_to_engine_rx,
poll: engine_poll,
thread_handle: Some(thread::spawn(move || {
internal_engine_thread(poll,token_generator,thread_to_engine_tx,engine_to_thread_rx,message_dictionary,max_message_size);
})),
})
}
pub fn add_connection<A: ToSocketAddrs>(&mut self,
fix_version: FIXVersion,
mut default_message_version: MessageVersion,
sender_comp_id: &[u8],
target_comp_id: &[u8],
address: A) -> Option<Connection> {
let address = match to_socket_addr(address) {
Some(address) => address,
None => return None,
};
default_message_version = match fix_version {
FIXVersion::FIX_4_0 => MessageVersion::FIX40,
FIXVersion::FIX_4_1 => MessageVersion::FIX41,
FIXVersion::FIX_4_2 => MessageVersion::FIX42,
FIXVersion::FIX_4_3 => MessageVersion::FIX43,
FIXVersion::FIX_4_4 => MessageVersion::FIX44,
FIXVersion::FIXT_1_1 => default_message_version,
};
let token = match self.token_generator.lock().unwrap().create() {
Some(token) => token,
None => return None,
};
self.tx.send(InternalEngineToThreadEvent::NewConnection(token.clone(),fix_version,default_message_version,sender_comp_id.to_vec(),target_comp_id.to_vec(),address)).unwrap();
let connection = Connection(token.0);
Some(connection)
}
pub fn add_listener<A: ToSocketAddrs>(&mut self,sender_comp_id: &[u8],address: A) -> Result<Option<Listener>,io::Error> {
let address = match to_socket_addr(address) {
Some(address) => address,
None => return Ok(None),
};
let listener = try!(TcpListener::bind(&address));
let token = match self.token_generator.lock().unwrap().create() {
Some(token) => token,
None => return Ok(None),
};
self.tx.send(InternalEngineToThreadEvent::NewListener(token.clone(),sender_comp_id.to_vec(),listener)).unwrap();
let listener = Listener(token.0);
Ok(Some(listener))
}
pub fn send_message<T: 'static + FIXTMessage + Send>(&mut self,connection: Connection,message: T) {
let message = Box::new(message);
self.send_message_box(connection,message);
}
pub fn send_message_box(&mut self,connection: Connection,message: Box<FIXTMessage + Send>) {
self.send_message_box_with_message_version(connection,None,message);
}
pub fn send_message_box_with_message_version<MV: Into<Option<MessageVersion>>>(&mut self,connection: Connection,message_version: MV,message: Box<FIXTMessage + Send>) {
self.tx.send(InternalEngineToThreadEvent::SendMessage(Token(connection.0),message_version.into(),message)).unwrap();
}
pub fn send_resend_response(&mut self,connection: Connection,response: Vec<ResendResponse>) {
if response.is_empty() {
return;
}
{
fn resend_response_end(response: &ResendResponse) -> u64 {
match *response {
ResendResponse::Message(_,ref message) => message.msg_seq_num(),
ResendResponse::Gap(ref range) => {
assert!(range.start <= range.end);
range.end
},
}
}
let mut iter = response.iter();
let mut previous = resend_response_end(iter.next().unwrap());
for item in iter {
let next = resend_response_end(item);
assert!(previous < next);
previous = next;
}
}
self.tx.send(InternalEngineToThreadEvent::ResendMessages(Token(connection.0),response)).unwrap();
}
pub fn approve_new_connection<IMSN: Into<Option<u64>>>(&mut self,connection: Connection,message: Box<Logon>,inbound_msg_seq_num: IMSN) {
self.tx.send(InternalEngineToThreadEvent::ApproveNewConnection(connection,message,inbound_msg_seq_num.into().unwrap_or(2))).unwrap();
}
pub fn reject_new_connection(&mut self,connection: Connection,reason: Option<Vec<u8>>) {
self.tx.send(InternalEngineToThreadEvent::RejectNewConnection(connection,reason)).unwrap();
}
pub fn logout(&mut self,connection: Connection) {
self.tx.send(InternalEngineToThreadEvent::Logout(Token(connection.0))).unwrap();
}
pub fn poll<D: Into<Option<Duration>>>(&mut self,duration: D) -> Option<EngineEvent> {
fn update_engine(engine: &mut Engine,event: &EngineEvent) {
match *event {
EngineEvent::ConnectionFailed(connection,_) |
EngineEvent::ConnectionTerminated(connection,_) => {
engine.token_generator.lock().unwrap().remove(Token(connection.0));
},
_ => {},
}
};
if let Ok(event) = self.rx.try_recv() {
update_engine(self,&event);
return Some(event);
}
if let Some(poll_duration) = duration.into() {
let now = Instant::now();
while let Some(poll_duration) = poll_duration.checked_sub(now.elapsed()) {
let mut events = Events::with_capacity(1);
if self.poll.poll(&mut events,Some(poll_duration)).is_err() {
return None;
}
let result = self.rx.try_recv();
match result {
Ok(event) => {
update_engine(self,&event);
return Some(event);
},
Err(e) if e == TryRecvError::Disconnected => return None,
_ => {},
}
}
}
None
}
}
impl Drop for Engine {
fn drop(&mut self) {
self.tx.send(InternalEngineToThreadEvent::Shutdown).unwrap();
let thread_handle = mem::replace(&mut self.thread_handle,None);
let _ = thread_handle.unwrap().join();
}
}