#include "peerconnection.hpp"
#include "certificate.hpp"
#include "common.hpp"
#include "dtlstransport.hpp"
#include "icetransport.hpp"
#include "internals.hpp"
#include "logcounter.hpp"
#include "peerconnection.hpp"
#include "processor.hpp"
#include "rtp.hpp"
#include "sctptransport.hpp"
#if RTC_ENABLE_MEDIA
#include "dtlssrtptransport.hpp"
#endif
#include <algorithm>
#include <array>
#include <iomanip>
#include <set>
#include <sstream>
#include <thread>
using namespace std::placeholders;
namespace rtc::impl {
static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
"Number of truncated RTP packets over past second");
static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
"Number of SRTP decryption errors over past second");
static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
"Number of SRTP encryption errors over past second");
static LogCounter
COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
"Number of unknown RTCP packet types over past second");
PeerConnection::PeerConnection(Configuration config_)
: config(std::move(config_)), mCertificate(make_certificate(config.certificateType)) {
PLOG_VERBOSE << "Creating PeerConnection";
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
throw std::invalid_argument("Invalid port range");
if (config.mtu) {
if (*config.mtu < 576) throw std::invalid_argument("Invalid MTU value");
if (*config.mtu > 1500) { PLOG_WARNING << "MTU set to " << *config.mtu;
} else {
PLOG_VERBOSE << "MTU set to " << *config.mtu;
}
}
}
PeerConnection::~PeerConnection() {
PLOG_VERBOSE << "Destroying PeerConnection";
mProcessor.join();
}
void PeerConnection::close() {
PLOG_VERBOSE << "Closing PeerConnection";
negotiationNeeded = false;
mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
closeTransports();
}
optional<Description> PeerConnection::localDescription() const {
std::lock_guard lock(mLocalDescriptionMutex);
return mLocalDescription;
}
optional<Description> PeerConnection::remoteDescription() const {
std::lock_guard lock(mRemoteDescriptionMutex);
return mRemoteDescription;
}
size_t PeerConnection::remoteMaxMessageSize() const {
const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
std::lock_guard lock(mRemoteDescriptionMutex);
if (mRemoteDescription)
if (auto *application = mRemoteDescription->application())
if (auto max = application->maxMessageSize()) {
remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
}
return std::min(remoteMax, localMax);
}
template <typename T>
shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
std::atomic_store(member, transport);
try {
transport->start();
} catch (...) {
std::atomic_store(member, decltype(transport)(nullptr));
transport->stop();
throw;
}
if (pc->state.load() == PeerConnection::State::Closed) {
std::atomic_store(member, decltype(transport)(nullptr));
transport->stop();
return nullptr;
}
return transport;
}
shared_ptr<IceTransport> PeerConnection::initIceTransport() {
try {
if (auto transport = std::atomic_load(&mIceTransport))
return transport;
PLOG_VERBOSE << "Starting ICE transport";
auto transport = std::make_shared<IceTransport>(
config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
[this, weak_this = weak_from_this()](IceTransport::State transportState) {
auto shared_this = weak_this.lock();
if (!shared_this)
return;
switch (transportState) {
case IceTransport::State::Connecting:
changeState(State::Connecting);
break;
case IceTransport::State::Failed:
changeState(State::Failed);
break;
case IceTransport::State::Connected:
initDtlsTransport();
break;
case IceTransport::State::Disconnected:
changeState(State::Disconnected);
break;
default:
break;
}
},
[this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
auto shared_this = weak_this.lock();
if (!shared_this)
return;
switch (gatheringState) {
case IceTransport::GatheringState::InProgress:
changeGatheringState(GatheringState::InProgress);
break;
case IceTransport::GatheringState::Complete:
endLocalCandidates();
changeGatheringState(GatheringState::Complete);
break;
default:
break;
}
});
return emplaceTransport(this, &mIceTransport, std::move(transport));
} catch (const std::exception &e) {
PLOG_ERROR << e.what();
changeState(State::Failed);
throw std::runtime_error("ICE transport initialization failed");
}
}
shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
try {
if (auto transport = std::atomic_load(&mDtlsTransport))
return transport;
PLOG_VERBOSE << "Starting DTLS transport";
auto lower = std::atomic_load(&mIceTransport);
if (!lower)
throw std::logic_error("No underlying ICE transport for DTLS transport");
auto certificate = mCertificate.get();
auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
auto dtlsStateChangeCallback =
[this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
auto shared_this = weak_this.lock();
if (!shared_this)
return;
switch (transportState) {
case DtlsTransport::State::Connected:
if (auto remote = remoteDescription(); remote && remote->hasApplication())
initSctpTransport();
else
changeState(State::Connected);
mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
break;
case DtlsTransport::State::Failed:
changeState(State::Failed);
mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
break;
case DtlsTransport::State::Disconnected:
changeState(State::Disconnected);
mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
break;
default:
break;
}
};
shared_ptr<DtlsTransport> transport;
if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
#if RTC_ENABLE_MEDIA
PLOG_INFO << "This connection requires media support";
transport = std::make_shared<DtlsSrtpTransport>(
lower, certificate, config.mtu, verifierCallback,
weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
#else
PLOG_WARNING << "Ignoring media support (not compiled with media support)";
#endif
}
if (!transport) {
transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
verifierCallback, dtlsStateChangeCallback);
}
return emplaceTransport(this, &mDtlsTransport, std::move(transport));
} catch (const std::exception &e) {
PLOG_ERROR << e.what();
changeState(State::Failed);
throw std::runtime_error("DTLS transport initialization failed");
}
}
shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
try {
if (auto transport = std::atomic_load(&mSctpTransport))
return transport;
PLOG_VERBOSE << "Starting SCTP transport";
auto lower = std::atomic_load(&mDtlsTransport);
if (!lower)
throw std::logic_error("No underlying DTLS transport for SCTP transport");
auto local = localDescription();
if (!local || !local->application())
throw std::logic_error("Starting SCTP transport without local application description");
auto remote = remoteDescription();
if (!remote || !remote->application())
throw std::logic_error(
"Starting SCTP transport without remote application description");
SctpTransport::Ports ports = {};
ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
shiftDataChannels();
auto transport = std::make_shared<SctpTransport>(
lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
[this, weak_this = weak_from_this()](SctpTransport::State transportState) {
auto shared_this = weak_this.lock();
if (!shared_this)
return;
switch (transportState) {
case SctpTransport::State::Connected:
changeState(State::Connected);
mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
break;
case SctpTransport::State::Failed:
LOG_WARNING << "SCTP transport failed";
changeState(State::Failed);
mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels,
shared_from_this());
break;
case SctpTransport::State::Disconnected:
changeState(State::Disconnected);
mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels,
shared_from_this());
break;
default:
break;
}
});
return emplaceTransport(this, &mSctpTransport, std::move(transport));
} catch (const std::exception &e) {
PLOG_ERROR << e.what();
changeState(State::Failed);
throw std::runtime_error("SCTP transport initialization failed");
}
}
shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
return std::atomic_load(&mIceTransport);
}
shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
return std::atomic_load(&mDtlsTransport);
}
shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
return std::atomic_load(&mSctpTransport);
}
void PeerConnection::closeTransports() {
PLOG_VERBOSE << "Closing transports";
if (!changeState(State::Closed))
return;
resetCallbacks();
auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
if (sctp) {
sctp->onRecv(nullptr);
sctp->onBufferedAmount(nullptr);
}
using array = std::array<shared_ptr<Transport>, 3>;
array transports{std::move(sctp), std::move(dtls), std::move(ice)};
for (const auto &t : transports)
if (t)
t->onStateChange(nullptr);
mProcessor.enqueue([self = shared_from_this(), transports = std::move(transports)]() {
TearDownProcessor::Instance().enqueue(
[transports = std::move(transports), token = Init::Instance().token()]() mutable {
for (const auto &t : transports)
if (t)
t->stop();
for (auto &t : transports)
t.reset();
});
});
}
void PeerConnection::endLocalCandidates() {
std::lock_guard lock(mLocalDescriptionMutex);
if (mLocalDescription)
mLocalDescription->endCandidates();
}
void PeerConnection::rollbackLocalDescription() {
PLOG_DEBUG << "Rolling back pending local description";
std::unique_lock lock(mLocalDescriptionMutex);
if (mCurrentLocalDescription) {
std::vector<Candidate> existingCandidates;
if (mLocalDescription)
existingCandidates = mLocalDescription->extractCandidates();
mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
mLocalDescription->addCandidates(std::move(existingCandidates));
mCurrentLocalDescription.reset();
}
}
bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
std::lock_guard lock(mRemoteDescriptionMutex);
auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
if (expectedFingerprint && *expectedFingerprint == fingerprint) {
PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
return true;
}
PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
<< expectedFingerprint.value_or("[none]") << "\"";
return false;
}
void PeerConnection::forwardMessage(message_ptr message) {
if (!message) {
remoteCloseDataChannels();
return;
}
const uint16_t stream = uint16_t(message->stream);
auto channel = findDataChannel(stream);
if (DataChannel::IsOpenMessage(message)) {
auto iceTransport = getIceTransport();
auto sctpTransport = getSctpTransport();
if (!iceTransport || !sctpTransport)
return;
const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
if (stream % 2 != remoteParity) {
PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
sctpTransport->closeStream(message->stream);
return;
}
if (channel && channel->isOpen()) {
PLOG_WARNING << "Got open message on stream " << stream
<< " for an already open DataChannel, closing it first";
channel->close();
}
channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport, stream);
channel->openCallback =
weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
std::unique_lock lock(mDataChannelsMutex); mDataChannels.emplace(stream, channel);
}
if (!channel) {
if (message->type == Message::Control) return;
PLOG_WARNING << "Got unexpected message on stream " << stream;
if (auto sctpTransport = getSctpTransport())
sctpTransport->closeStream(message->stream);
return;
}
channel->incoming(message);
}
void PeerConnection::forwardMedia(message_ptr message) {
if (!message)
return;
if (message->type == Message::Control) {
std::set<uint32_t> ssrcs;
size_t offset = 0;
while ((sizeof(RtcpHeader) + offset) <= message->size()) {
auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
if (header->lengthInBytes() > message->size() - offset) {
COUNTER_MEDIA_TRUNCATED++;
break;
}
offset += header->lengthInBytes();
if (header->payloadType() == 205 || header->payloadType() == 206) {
auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
ssrcs.insert(rtcpfb->packetSenderSSRC());
ssrcs.insert(rtcpfb->mediaSourceSSRC());
} else if (header->payloadType() == 200 || header->payloadType() == 201) {
auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
ssrcs.insert(rtcpsr->senderSSRC());
for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
} else if (header->payloadType() == 202) {
auto sdes = reinterpret_cast<RtcpSdes *>(header);
if (!sdes->isValid()) {
PLOG_WARNING << "RTCP SDES packet is invalid";
continue;
}
for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
auto chunk = sdes->getChunk(i);
ssrcs.insert(chunk->ssrc());
}
} else {
if (header->payloadType() != 207) {
COUNTER_UNKNOWN_PACKET_TYPE++;
}
}
}
if (!ssrcs.empty()) {
for (uint32_t ssrc : ssrcs) {
if (auto mid = getMidFromSsrc(ssrc)) {
std::shared_lock lock(mTracksMutex); if (auto it = mTracks.find(*mid); it != mTracks.end())
if (auto track = it->second.lock())
track->incoming(message);
}
}
return;
}
}
uint32_t ssrc = uint32_t(message->stream);
if (auto mid = getMidFromSsrc(ssrc)) {
std::shared_lock lock(mTracksMutex); if (auto it = mTracks.find(*mid); it != mTracks.end())
if (auto track = it->second.lock())
track->incoming(message);
} else {
return;
}
}
optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
return it->second;
{
std::lock_guard lock(mRemoteDescriptionMutex);
if (!mRemoteDescription)
return nullopt;
for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
if (auto found =
std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
return std::nullopt;
},
[&](Description::Media *media) -> optional<string> {
return media->hasSSRC(ssrc)
? std::make_optional(media->mid())
: nullopt;
}},
mRemoteDescription->media(i))) {
mMidFromSsrc.emplace(ssrc, *found);
return *found;
}
}
}
{
std::lock_guard lock(mLocalDescriptionMutex);
if (!mLocalDescription)
return nullopt;
for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
if (auto found =
std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
return std::nullopt;
},
[&](Description::Media *media) -> optional<string> {
return media->hasSSRC(ssrc)
? std::make_optional(media->mid())
: nullopt;
}},
mLocalDescription->media(i))) {
mMidFromSsrc.emplace(ssrc, *found);
return *found;
}
}
}
return nullopt;
}
void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
if (auto channel = findDataChannel(stream))
channel->triggerBufferedAmount(amount);
}
shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
cleanupDataChannels();
std::unique_lock lock(mDataChannelsMutex); const uint16_t maxStream = maxDataChannelStream();
uint16_t stream;
if (init.id) {
stream = *init.id;
if (stream > maxStream)
throw std::invalid_argument("DataChannel stream id is too high");
} else {
auto iceTransport = getIceTransport();
auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
stream = (role == Description::Role::Active) ? 0 : 1;
while (true) {
if (stream > maxStream)
throw std::runtime_error("Too many DataChannels");
auto it = mDataChannels.find(stream);
if (it == mDataChannels.end() || !it->second.lock())
break;
stream += 2;
}
}
auto channel =
init.negotiated
? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
std::move(init.protocol), std::move(init.reliability))
: std::make_shared<OutgoingDataChannel>(weak_from_this(), stream, std::move(label),
std::move(init.protocol),
std::move(init.reliability));
mDataChannels.emplace(std::make_pair(stream, channel));
return channel;
}
shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
std::shared_lock lock(mDataChannelsMutex); if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
if (auto channel = it->second.lock())
return channel;
return nullptr;
}
uint16_t PeerConnection::maxDataChannelStream() const {
auto sctpTransport = std::atomic_load(&mSctpTransport);
return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
}
void PeerConnection::shiftDataChannels() {
auto iceTransport = std::atomic_load(&mIceTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
std::unique_lock lock(mDataChannelsMutex); decltype(mDataChannels) newDataChannels;
auto it = mDataChannels.begin();
while (it != mDataChannels.end()) {
auto channel = it->second.lock();
channel->shiftStream();
newDataChannels.emplace(channel->stream(), channel);
++it;
}
std::swap(mDataChannels, newDataChannels);
}
}
void PeerConnection::iterateDataChannels(
std::function<void(shared_ptr<DataChannel> channel)> func) {
std::vector<shared_ptr<DataChannel>> locked;
{
std::shared_lock lock(mDataChannelsMutex); locked.reserve(mDataChannels.size());
auto it = mDataChannels.begin();
while (it != mDataChannels.end()) {
auto channel = it->second.lock();
if (channel && !channel->isClosed())
locked.push_back(std::move(channel));
++it;
}
}
for (auto &channel : locked)
func(std::move(channel));
}
void PeerConnection::cleanupDataChannels() {
std::unique_lock lock(mDataChannelsMutex); auto it = mDataChannels.begin();
while (it != mDataChannels.end()) {
if (!it->second.lock()) {
it = mDataChannels.erase(it);
continue;
}
++it;
}
}
void PeerConnection::openDataChannels() {
if (auto transport = std::atomic_load(&mSctpTransport))
iterateDataChannels([&](shared_ptr<DataChannel> channel) {
if (channel->stream() <= transport->maxStream()) {
channel->open(transport);
} else {
channel->triggerError("DataChannel stream id is too high");
channel->remoteClose();
}
});
}
void PeerConnection::closeDataChannels() {
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
}
void PeerConnection::remoteCloseDataChannels() {
iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
}
shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
#if !RTC_ENABLE_MEDIA
PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
description.markRemoved();
#endif
shared_ptr<Track> track;
if (auto it = mTracks.find(description.mid()); it != mTracks.end())
if (track = it->second.lock(); track)
track->setDescription(std::move(description));
if (!track) {
track = std::make_shared<Track>(weak_from_this(), std::move(description));
mTracks.emplace(std::make_pair(track->mid(), track));
mTrackLines.emplace_back(track);
}
if (description.isRemoved())
track->close();
return track;
}
void PeerConnection::incomingTrack(Description::Media description) {
std::unique_lock lock(mTracksMutex); shared_ptr<Track> track;
if (auto it = mTracks.find(description.mid()); it != mTracks.end()) {
if (track = it->second.lock(); track)
track->setDescription(std::move(description));
} else {
track = std::make_shared<Track>(weak_from_this(), std::move(description));
mTracks.emplace(std::make_pair(track->mid(), track));
mTrackLines.emplace_back(track);
triggerTrack(track);
}
if (track && description.isRemoved())
track->close();
}
void PeerConnection::openTracks() {
#if RTC_ENABLE_MEDIA
if (auto transport = std::atomic_load(&mDtlsTransport)) {
auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
std::shared_lock lock(mTracksMutex); for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it)
if (auto track = it->lock())
if (!track->isOpen())
track->open(srtpTransport);
}
#endif
}
void PeerConnection::closeTracks() {
std::shared_lock lock(mTracksMutex); for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it)
if (auto track = it->lock())
track->close();
}
void PeerConnection::validateRemoteDescription(const Description &description) {
if (!description.iceUfrag())
throw std::invalid_argument("Remote description has no ICE user fragment");
if (!description.icePwd())
throw std::invalid_argument("Remote description has no ICE password");
if (!description.fingerprint())
throw std::invalid_argument("Remote description has no valid fingerprint");
if (description.mediaCount() == 0)
throw std::invalid_argument("Remote description has no media line");
int activeMediaCount = 0;
for (unsigned int i = 0; i < description.mediaCount(); ++i)
std::visit(rtc::overloaded{[&](const Description::Application *application) {
if (!application->isRemoved())
++activeMediaCount;
},
[&](const Description::Media *media) {
if (!media->isRemoved() ||
media->direction() != Description::Direction::Inactive)
++activeMediaCount;
}},
description.media(i));
if (activeMediaCount == 0)
throw std::invalid_argument("Remote description has no active media");
if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
if (*description.iceUfrag() == *local->iceUfrag() &&
*description.icePwd() == *local->icePwd())
throw std::logic_error("Got the local description as remote description");
PLOG_VERBOSE << "Remote description looks valid";
}
void PeerConnection::processLocalDescription(Description description) {
const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
const size_t localMaxMessageSize =
config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
description.clearMedia();
if (auto remote = remoteDescription()) {
for (unsigned int i = 0; i < remote->mediaCount(); ++i)
std::visit( rtc::overloaded{
[&](Description::Application *remoteApp) {
std::shared_lock lock(mDataChannelsMutex);
if (!mDataChannels.empty()) {
Description::Application app(remoteApp->mid());
app.setSctpPort(localSctpPort);
app.setMaxMessageSize(localMaxMessageSize);
PLOG_DEBUG << "Adding application to local description, mid=\""
<< app.mid() << "\"";
description.addMedia(std::move(app));
return;
}
auto reciprocated = remoteApp->reciprocate();
reciprocated.hintSctpPort(localSctpPort);
reciprocated.setMaxMessageSize(localMaxMessageSize);
PLOG_DEBUG << "Reciprocating application in local description, mid=\""
<< reciprocated.mid() << "\"";
description.addMedia(std::move(reciprocated));
},
[&](Description::Media *remoteMedia) {
std::shared_lock lock(mTracksMutex);
if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
if (auto track = it->second.lock()) {
auto media = track->description();
PLOG_DEBUG << "Adding media to local description, mid=\""
<< media.mid() << "\", removed=" << std::boolalpha
<< media.isRemoved();
description.addMedia(std::move(media));
} else {
auto reciprocated = remoteMedia->reciprocate();
reciprocated.markRemoved();
PLOG_DEBUG << "Adding media to local description, mid=\""
<< reciprocated.mid()
<< "\", removed=true (track is destroyed)";
description.addMedia(std::move(reciprocated));
}
return;
}
lock.unlock();
auto reciprocated = remoteMedia->reciprocate();
#if !RTC_ENABLE_MEDIA
if (!reciprocated.isRemoved()) {
PLOG_WARNING << "Rejecting track (not compiled with media support)";
reciprocated.markRemoved();
}
#endif
incomingTrack(reciprocated);
PLOG_DEBUG << "Reciprocating media in local description, mid=\""
<< reciprocated.mid() << "\", removed=" << std::boolalpha
<< reciprocated.isRemoved();
description.addMedia(std::move(reciprocated));
},
},
remote->media(i));
}
if (description.type() == Description::Type::Offer) {
std::shared_lock lock(mTracksMutex);
for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
if (auto track = it->lock()) {
if (description.hasMid(track->mid()))
continue;
auto media = track->description();
PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
<< "\", removed=" << std::boolalpha << media.isRemoved();
description.addMedia(std::move(media));
}
}
if (!description.hasApplication()) {
std::shared_lock lock(mDataChannelsMutex);
if (!mDataChannels.empty()) {
unsigned int m = 0;
while (description.hasMid(std::to_string(m)))
++m;
Description::Application app(std::to_string(m));
app.setSctpPort(localSctpPort);
app.setMaxMessageSize(localMaxMessageSize);
PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
<< "\"";
description.addMedia(std::move(app));
}
}
if (description.mediaCount() == 0)
throw std::runtime_error("No DataChannel or Track to negotiate");
}
description.setFingerprint(mCertificate.get()->fingerprint());
PLOG_VERBOSE << "Issuing local description: " << description;
if (description.mediaCount() == 0)
throw std::logic_error("Local description has no media line");
{
std::lock_guard lock(mLocalDescriptionMutex);
std::vector<Candidate> existingCandidates;
if (mLocalDescription) {
existingCandidates = mLocalDescription->extractCandidates();
mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
}
mLocalDescription.emplace(description);
mLocalDescription->addCandidates(std::move(existingCandidates));
}
mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
&localDescriptionCallback, std::move(description));
if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
}
void PeerConnection::processLocalCandidate(Candidate candidate) {
std::lock_guard lock(mLocalDescriptionMutex);
if (!mLocalDescription)
throw std::logic_error("Got a local candidate without local description");
if (config.iceTransportPolicy == TransportPolicy::Relay &&
candidate.type() != Candidate::Type::Relayed) {
PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
return;
}
PLOG_VERBOSE << "Issuing local candidate: " << candidate;
candidate.resolve(Candidate::ResolveMode::Simple);
mLocalDescription->addCandidate(candidate);
mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
&localCandidateCallback, std::move(candidate));
}
void PeerConnection::processRemoteDescription(Description description) {
{
std::lock_guard lock(mRemoteDescriptionMutex);
std::vector<Candidate> existingCandidates;
if (mRemoteDescription)
existingCandidates = mRemoteDescription->extractCandidates();
mRemoteDescription.emplace(description);
mRemoteDescription->addCandidates(std::move(existingCandidates));
}
auto iceTransport = initIceTransport();
if (!iceTransport)
return;
iceTransport->setRemoteDescription(std::move(description));
shiftDataChannels();
if (description.hasApplication()) {
auto dtlsTransport = std::atomic_load(&mDtlsTransport);
auto sctpTransport = std::atomic_load(&mSctpTransport);
if (!sctpTransport && dtlsTransport &&
dtlsTransport->state() == Transport::State::Connected)
initSctpTransport();
} else {
mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
}
}
void PeerConnection::processRemoteCandidate(Candidate candidate) {
auto iceTransport = std::atomic_load(&mIceTransport);
{
std::lock_guard lock(mRemoteDescriptionMutex);
if (!mRemoteDescription)
throw std::logic_error("Got a remote candidate without remote description");
if (!iceTransport)
throw std::logic_error("Got a remote candidate without ICE transport");
candidate.hintMid(mRemoteDescription->bundleMid());
if (mRemoteDescription->hasCandidate(candidate))
return;
candidate.resolve(Candidate::ResolveMode::Simple);
mRemoteDescription->addCandidate(candidate);
}
if (candidate.isResolved()) {
iceTransport->addRemoteCandidate(std::move(candidate));
} else {
if ((iceTransport = std::atomic_load(&mIceTransport))) {
weak_ptr<IceTransport> weakIceTransport{iceTransport};
std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
if (candidate.resolve(Candidate::ResolveMode::Lookup))
if (auto iceTransport = weakIceTransport.lock())
iceTransport->addRemoteCandidate(std::move(candidate));
});
t.detach();
}
}
}
string PeerConnection::localBundleMid() const {
std::lock_guard lock(mLocalDescriptionMutex);
return mLocalDescription ? mLocalDescription->bundleMid() : "0";
}
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
auto dataChannel = weakDataChannel.lock();
if (dataChannel) {
dataChannel->resetOpenCallback(); mPendingDataChannels.push(std::move(dataChannel));
}
triggerPendingDataChannels();
}
void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
auto track = weakTrack.lock();
if (track) {
track->resetOpenCallback(); mPendingTracks.push(std::move(track));
}
triggerPendingTracks();
}
void PeerConnection::triggerPendingDataChannels() {
while (dataChannelCallback) {
auto next = mPendingDataChannels.tryPop();
if (!next)
break;
auto impl = std::move(*next);
dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
impl->triggerOpen();
}
}
void PeerConnection::triggerPendingTracks() {
while (trackCallback) {
auto next = mPendingTracks.tryPop();
if (!next)
break;
auto impl = std::move(*next);
trackCallback(std::make_shared<rtc::Track>(impl));
}
}
void PeerConnection::flushPendingDataChannels() {
mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
}
void PeerConnection::flushPendingTracks() {
mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
}
bool PeerConnection::changeState(State newState) {
State current;
do {
current = state.load();
if (current == State::Closed)
return false;
if (current == newState)
return false;
} while (!state.compare_exchange_weak(current, newState));
std::ostringstream s;
s << newState;
PLOG_INFO << "Changed state to " << s.str();
if (newState == State::Closed) {
auto callback = std::move(stateChangeCallback); callback(State::Closed); } else {
mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
&stateChangeCallback, newState);
}
return true;
}
bool PeerConnection::changeGatheringState(GatheringState newState) {
if (gatheringState.exchange(newState) == newState)
return false;
std::ostringstream s;
s << newState;
PLOG_INFO << "Changed gathering state to " << s.str();
mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
&gatheringStateChangeCallback, newState);
return true;
}
bool PeerConnection::changeSignalingState(SignalingState newState) {
if (signalingState.exchange(newState) == newState)
return false;
std::ostringstream s;
s << newState;
PLOG_INFO << "Changed signaling state to " << s.str();
mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
&signalingStateChangeCallback, newState);
return true;
}
void PeerConnection::resetCallbacks() {
dataChannelCallback = nullptr;
localDescriptionCallback = nullptr;
localCandidateCallback = nullptr;
stateChangeCallback = nullptr;
gatheringStateChangeCallback = nullptr;
signalingStateChangeCallback = nullptr;
trackCallback = nullptr;
}
}