#ifdef _MSC_VER
#include "stdafx.h"
#else
#include "config.h"
#endif
#include "Exceptions.h"
#include "Session.h"
#include "Settings.h"
#include "SocketAcceptor.h"
#include "Utility.h"
namespace FIX {
SocketAcceptor::SocketAcceptor(Application &application, MessageStoreFactory &factory, const SessionSettings &settings)
EXCEPT(ConfigError)
: Acceptor(application, factory, settings),
m_pServer(0) {}
SocketAcceptor::SocketAcceptor(
Application &application,
MessageStoreFactory &factory,
const SessionSettings &settings,
LogFactory &logFactory) EXCEPT(ConfigError)
: Acceptor(application, factory, settings, logFactory),
m_pServer(0) {}
SocketAcceptor::~SocketAcceptor() {
SocketConnections::iterator iter;
for (iter = m_connections.begin(); iter != m_connections.end(); ++iter) {
delete iter->second;
}
}
void SocketAcceptor::onConfigure(const SessionSettings &sessionSettings) EXCEPT(ConfigError) {
for (const SessionID &sessionID : sessionSettings.getSessions()) {
const Dictionary &settings = sessionSettings.get(sessionID);
settings.getInt(SOCKET_ACCEPT_PORT);
if (settings.has(SOCKET_REUSE_ADDRESS)) {
settings.getBool(SOCKET_REUSE_ADDRESS);
}
if (settings.has(SOCKET_NODELAY)) {
settings.getBool(SOCKET_NODELAY);
}
}
}
void SocketAcceptor::onInitialize(const SessionSettings &sessionSettings) EXCEPT(RuntimeError) {
uint16_t port = 0;
try {
m_pServer = new SocketServer(1);
for (const SessionID &sessionID : sessionSettings.getSessions()) {
const Dictionary &settings = sessionSettings.get(sessionID);
port = (short)settings.getInt(SOCKET_ACCEPT_PORT);
const bool reuseAddress = settings.has(SOCKET_REUSE_ADDRESS) ? settings.getBool(SOCKET_REUSE_ADDRESS) : true;
const bool noDelay = settings.has(SOCKET_NODELAY) ? settings.getBool(SOCKET_NODELAY) : false;
const int sendBufSize = settings.has(SOCKET_SEND_BUFFER_SIZE) ? settings.getInt(SOCKET_SEND_BUFFER_SIZE) : 0;
const int rcvBufSize = settings.has(SOCKET_RECEIVE_BUFFER_SIZE) ? settings.getInt(SOCKET_RECEIVE_BUFFER_SIZE) : 0;
socket_handle acceptSocket = m_pServer->add(port, reuseAddress, noDelay, sendBufSize, rcvBufSize);
m_portToSessions[socket_hostport(acceptSocket)].insert(sessionID);
m_sessionToPort[sessionID] = socket_hostport(acceptSocket);
}
} catch (SocketException &e) {
delete m_pServer;
m_pServer = 0;
throw RuntimeError(
"Unable to create, bind, or listen to port " + IntConvertor::convert((unsigned short)port) + " (" + e.what()
+ ")");
}
}
void SocketAcceptor::onStart() {
while (!isStopped() && m_pServer && m_pServer->block(*this)) {}
if (!m_pServer) {
return;
}
time_t start = 0;
time_t now = 0;
::time(&start);
while (isLoggedOn()) {
m_pServer->block(*this);
if (::time(&now) - 5 >= start) {
break;
}
}
m_pServer->close();
delete m_pServer;
m_pServer = 0;
}
bool SocketAcceptor::onPoll() {
if (!m_pServer) {
return false;
}
time_t start = 0;
time_t now = 0;
if (isStopped()) {
if (start == 0) {
::time(&start);
}
if (!isLoggedOn()) {
start = 0;
return false;
}
if (::time(&now) - 5 >= start) {
start = 0;
return false;
}
}
m_pServer->block(*this, true);
return true;
}
void SocketAcceptor::onStop() {
if (m_pServer) {
m_pServer->close();
}
}
void SocketAcceptor::onConnect(SocketServer &server, socket_handle a, socket_handle s) {
if (!socket_isValid(s)) {
return;
}
SocketConnections::iterator i = m_connections.find(s);
if (i != m_connections.end()) {
return;
}
uint16_t port = server.socketToPort(a);
Sessions sessions = m_portToSessions[port];
m_connections[s] = new SocketConnection(s, sessions, &server.getMonitor());
std::stringstream stream;
stream << "Accepted connection from " << socket_peername(s) << " on port " << port;
if (getLog()) {
getLog()->onEvent(stream.str());
}
}
void SocketAcceptor::onWrite(SocketServer &server, socket_handle s) {
SocketConnections::iterator i = m_connections.find(s);
if (i == m_connections.end()) {
return;
}
SocketConnection *pSocketConnection = i->second;
if (pSocketConnection->processQueue()) {
pSocketConnection->unsignal();
}
}
bool SocketAcceptor::onData(SocketServer &server, socket_handle s) {
SocketConnections::iterator i = m_connections.find(s);
if (i == m_connections.end()) {
return false;
}
SocketConnection *pSocketConnection = i->second;
return pSocketConnection->read(*this, server);
}
void SocketAcceptor::onDisconnect(SocketServer &, socket_handle s) {
SocketConnections::iterator i = m_connections.find(s);
if (i == m_connections.end()) {
return;
}
SocketConnection *pSocketConnection = i->second;
Session *pSession = pSocketConnection->getSession();
if (pSession) {
pSession->disconnect();
}
delete pSocketConnection;
m_connections.erase(s);
}
void SocketAcceptor::onError(SocketServer &) {}
void SocketAcceptor::onTimeout(SocketServer &) {
SocketConnections::iterator i;
for (i = m_connections.begin(); i != m_connections.end(); ++i) {
i->second->onTimeout();
}
}
}