#include "channel.hpp"
namespace rtc::impl {
void Channel::triggerOpen() {
mOpenTriggered = true;
openCallback();
flushPendingMessages();
}
void Channel::triggerClosed() { closedCallback(); }
void Channel::triggerError(string error) { errorCallback(std::move(error)); }
void Channel::triggerAvailable(size_t count) {
if (count == 1)
availableCallback();
flushPendingMessages();
}
void Channel::triggerBufferedAmount(size_t amount) {
size_t previous = bufferedAmount.exchange(amount);
size_t threshold = bufferedAmountLowThreshold.load();
if (previous > threshold && amount <= threshold)
bufferedAmountLowCallback();
}
void Channel::flushPendingMessages() {
if (!mOpenTriggered)
return;
while (messageCallback) {
auto next = receive();
if (!next)
break;
messageCallback(*next);
}
}
void Channel::resetOpenCallback() {
mOpenTriggered = false;
openCallback = nullptr;
}
void Channel::resetCallbacks() {
mOpenTriggered = false;
openCallback = nullptr;
closedCallback = nullptr;
errorCallback = nullptr;
availableCallback = nullptr;
bufferedAmountLowCallback = nullptr;
messageCallback = nullptr;
}
}