#ifdef _MSC_VER
#include "stdafx.h"
#else
#include "config.h"
#endif
#include "HttpServer.h"
#include "Initiator.h"
#include "Session.h"
#include "SessionFactory.h"
#include "Utility.h"
#include "scope_guard.hpp"
#include <algorithm>
#include <fstream>
namespace FIX {
Initiator::Initiator(
Application &application,
MessageStoreFactory &messageStoreFactory,
const SessionSettings &settings) EXCEPT(ConfigError)
: m_threadid(0),
m_application(application),
m_messageStoreFactory(messageStoreFactory),
m_settings(settings),
m_pLogFactory(0),
m_pLog(0),
m_processing(false),
m_firstPoll(true),
m_stop(true) {
initialize();
}
Initiator::Initiator(
Application &application,
MessageStoreFactory &messageStoreFactory,
const SessionSettings &settings,
LogFactory &logFactory) EXCEPT(ConfigError)
: m_threadid(0),
m_application(application),
m_messageStoreFactory(messageStoreFactory),
m_settings(settings),
m_pLogFactory(&logFactory),
m_pLog(logFactory.create()),
m_processing(false),
m_firstPoll(true),
m_stop(true) {
initialize();
}
void Initiator::initialize() EXCEPT(ConfigError) {
std::set<SessionID> sessions = m_settings.getSessions();
std::set<SessionID>::iterator i;
if (!sessions.size()) {
throw ConfigError("No sessions defined");
}
SessionFactory factory(m_application, m_messageStoreFactory, m_pLogFactory);
for (i = sessions.begin(); i != sessions.end(); ++i) {
if (m_settings.get(*i).getString("ConnectionType") == "initiator") {
m_sessionIDs.insert(*i);
m_sessions[*i] = factory.create(*i, m_settings.get(*i));
setDisconnected(*i);
}
}
if (!m_sessions.size()) {
throw ConfigError("No sessions defined for initiator");
}
}
Initiator::~Initiator() {
Sessions::iterator i;
for (i = m_sessions.begin(); i != m_sessions.end(); ++i) {
delete i->second;
}
if (m_pLogFactory && m_pLog) {
m_pLogFactory->destroy(m_pLog);
}
}
Session *Initiator::getSession(const SessionID &sessionID, Responder &responder) {
Sessions::iterator i = m_sessions.find(sessionID);
if (i != m_sessions.end()) {
i->second->setResponder(&responder);
return i->second;
}
return 0;
}
Session *Initiator::getSession(const SessionID &sessionID) const {
Sessions::const_iterator i = m_sessions.find(sessionID);
if (i != m_sessions.end()) {
return i->second;
} else {
return 0;
}
}
const Dictionary *const Initiator::getSessionSettings(const SessionID &sessionID) const {
try {
return &m_settings.get(sessionID);
} catch (ConfigError &) {
return 0;
}
}
void Initiator::connect() {
Locker l(m_mutex);
SessionIDs disconnected = m_disconnected;
SessionIDs::iterator i = disconnected.begin();
for (; i != disconnected.end(); ++i) {
Session *pSession = Session::lookupSession(*i);
if (pSession->isEnabled() && pSession->isSessionTime(UtcTimeStamp::now())) {
doConnect(*i, m_settings.get(*i));
}
}
}
void Initiator::setPending(const SessionID &sessionID) {
Locker l(m_mutex);
m_pending.insert(sessionID);
m_connected.erase(sessionID);
m_disconnected.erase(sessionID);
}
void Initiator::setConnected(const SessionID &sessionID) {
Locker l(m_mutex);
m_pending.erase(sessionID);
m_connected.insert(sessionID);
m_disconnected.erase(sessionID);
}
void Initiator::setDisconnected(const SessionID &sessionID) {
Locker l(m_mutex);
m_pending.erase(sessionID);
m_connected.erase(sessionID);
m_disconnected.insert(sessionID);
}
bool Initiator::isPending(const SessionID &sessionID) const {
Locker l(m_mutex);
return m_pending.find(sessionID) != m_pending.end();
}
bool Initiator::isConnected(const SessionID &sessionID) const {
Locker l(m_mutex);
return m_connected.find(sessionID) != m_connected.end();
}
bool Initiator::isDisconnected(const SessionID &sessionID) const {
Locker l(m_mutex);
return m_disconnected.find(sessionID) != m_disconnected.end();
}
void Initiator::start() EXCEPT(ConfigError, RuntimeError) {
if (m_processing) {
throw RuntimeError("Initiator::start called when already processing messages");
}
m_processing = true;
m_stop = false;
try {
onConfigure(m_settings);
onInitialize(m_settings);
HttpServer::startGlobal(m_settings);
} catch (...) {
m_processing = false;
throw;
}
if (!thread_spawn(&startThread, this, m_threadid)) {
m_processing = false;
throw RuntimeError("Unable to spawn thread");
}
}
void Initiator::block() EXCEPT(ConfigError, RuntimeError) {
if (m_processing) {
throw RuntimeError("Initiator::block called when already processing messages");
}
auto guard = sg::make_scope_guard([this]() { m_processing = false; });
m_processing = true;
m_stop = false;
onConfigure(m_settings);
onInitialize(m_settings);
startThread(this);
}
bool Initiator::poll() EXCEPT(ConfigError, RuntimeError) {
if (m_processing) {
throw RuntimeError("Initiator::poll called when already processing messages");
}
auto guard = sg::make_scope_guard([this]() { m_processing = false; });
m_processing = true;
if (m_firstPoll) {
m_stop = false;
onConfigure(m_settings);
onInitialize(m_settings);
connect();
m_firstPoll = false;
}
return onPoll();
}
void Initiator::stop(bool force) {
if (isStopped()) {
return;
}
HttpServer::stopGlobal();
std::vector<Session *> enabledSessions;
SessionIDs connected;
{
Locker l(m_mutex);
connected = m_connected;
}
SessionIDs::iterator i = connected.begin();
for (; i != connected.end(); ++i) {
Session *pSession = Session::lookupSession(*i);
if (pSession && pSession->isEnabled()) {
enabledSessions.push_back(pSession);
pSession->logout();
}
}
if (!force) {
for (int second = 1; second <= 10 && isLoggedOn(); ++second) {
process_sleep(1);
}
}
{
Locker l(m_mutex);
for (i = connected.begin(); i != connected.end(); ++i) {
setDisconnected(Session::lookupSession(*i)->getSessionID());
}
}
m_stop = true;
onStop();
if (m_threadid) {
thread_join(m_threadid);
}
m_threadid = 0;
for (Session *session : enabledSessions) {
session->logon();
}
}
bool Initiator::isLoggedOn() const {
Locker l(m_mutex);
SessionIDs connected = m_connected;
SessionIDs::iterator i = connected.begin();
for (; i != connected.end(); ++i) {
if (Session::lookupSession(*i)->isLoggedOn()) {
return true;
}
}
return false;
}
THREAD_PROC Initiator::startThread(void *p) {
Initiator *pInitiator = static_cast<Initiator *>(p);
auto guard = sg::make_scope_guard([pInitiator]() { pInitiator->m_processing = false; });
pInitiator->onStart();
return 0;
}
}