#ifndef RTC_TRANSPORT_H
#define RTC_TRANSPORT_H
#include "include.hpp"
#include "message.hpp"
#include <atomic>
#include <functional>
#include <memory>
namespace rtc {
class Transport {
public:
enum class State { Disconnected, Connecting, Connected, Completed, Failed };
using state_callback = std::function<void(State state)>;
Transport(std::shared_ptr<Transport> lower = nullptr, state_callback callback = nullptr)
: mLower(std::move(lower)), mStateChangeCallback(std::move(callback)) {}
virtual ~Transport() { stop(); }
virtual void start() { mStopped = false; }
virtual bool stop() {
if (mStopped.exchange(true))
return false;
if (mLower) {
PLOG_VERBOSE << "Unregistering incoming callback";
mLower->onRecv(nullptr);
}
return true;
}
void registerIncoming() {
if (mLower) {
PLOG_VERBOSE << "Registering incoming callback";
mLower->onRecv(std::bind(&Transport::incoming, this, std::placeholders::_1));
}
}
void onRecv(message_callback callback) { mRecvCallback = std::move(callback); }
State state() const { return mState; }
virtual bool send(message_ptr message) { return outgoing(message); }
protected:
void recv(message_ptr message) {
try {
mRecvCallback(message);
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
void changeState(State state) {
try {
if (mState.exchange(state) != state)
mStateChangeCallback(state);
} catch (const std::exception &e) {
PLOG_WARNING << e.what();
}
}
virtual void incoming(message_ptr message) { recv(message); }
virtual bool outgoing(message_ptr message) {
if (mLower)
return mLower->send(message);
else
return false;
}
private:
std::shared_ptr<Transport> mLower;
synchronized_callback<State> mStateChangeCallback;
synchronized_callback<message_ptr> mRecvCallback;
std::atomic<State> mState = State::Disconnected;
std::atomic<bool> mStopped = true;
};
}
#endif