#define MS_CLASS "PortManager"
#include "RTC/PortManager.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <cstring>
#include <tuple>
// Close callback passed to uv_close() for UDP handles: releases the uv_udp_t
// that PortManager::Bind() allocated with new.
static inline void onCloseUdp(uv_handle_t* handle)
{
	auto* udpHandle = reinterpret_cast<uv_udp_t*>(handle);

	delete udpHandle;
}
// Close callback passed to uv_close() for TCP handles: releases the uv_tcp_t
// that PortManager::Bind() allocated with new.
static inline void onCloseTcp(uv_handle_t* handle)
{
	auto* tcpHandle = reinterpret_cast<uv_tcp_t*>(handle);

	delete tcpHandle;
}
// Connection callback passed to uv_listen() by PortManager::Bind(). It does
// nothing: both arguments are ignored and the body is empty.
inline static void onFakeConnection(uv_stream_t* , int )
{
}
namespace RTC
{
// Definition of the thread_local map of port ranges, keyed by PortRangeKey.
// Entries are created by GetOrCreatePortRange() and erased by Unbind() once the
// range's numUsedPorts drops to 0.
thread_local ankerl::unordered_dense::
map<PortManager::PortRangeKey, PortManager::PortRange, PortManager::PortRangeKeyHash>
PortManager::mapPortRanges;
// Binds a new libuv handle (UDP or TCP) to the given IP and a fixed port.
//
// @param protocol  Protocol::UDP or Protocol::TCP.
// @param ip        IP to bind to; it is normalized in place.
// @param port      Port to bind to.
// @param flags     Socket flags, translated by ConvertSocketFlags().
//
// @return The handle allocated with new (uv_udp_t or uv_tcp_t). For TCP the
//         handle has also been passed to uv_listen().
//
// Throws through MS_THROW_ERROR if the IP cannot be parsed, its family is
// neither AF_INET nor AF_INET6, or any of the libuv init/bind/listen calls
// returns an error.
uv_handle_t* PortManager::Bind(
  Protocol protocol, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags)
{
	MS_TRACE();

	Utils::IP::NormalizeIp(ip);

	const int family       = Utils::IP::GetFamily(ip);
	const uint8_t bitFlags = ConvertSocketFlags(flags, protocol, family);

	// Protocol name, only used in log and error messages.
	std::string protocolStr;

	if (protocol == Protocol::UDP)
	{
		protocolStr = "udp";
	}
	else if (protocol == Protocol::TCP)
	{
		protocolStr = "tcp";
	}

	// Parse the IP into the bind address and then store the port (network order).
	struct sockaddr_storage bindAddr{};
	int err{ 0 };

	if (family == AF_INET)
	{
		auto* addr4 = reinterpret_cast<struct sockaddr_in*>(std::addressof(bindAddr));

		err = uv_ip4_addr(ip.c_str(), 0, addr4);

		if (err != 0)
		{
			MS_THROW_ERROR("uv_ip4_addr() failed: %s", uv_strerror(err));
		}

		addr4->sin_port = htons(port);
	}
	else if (family == AF_INET6)
	{
		auto* addr6 = reinterpret_cast<struct sockaddr_in6*>(std::addressof(bindAddr));

		err = uv_ip6_addr(ip.c_str(), 0, addr6);

		if (err != 0)
		{
			MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err));
		}

		addr6->sin6_port = htons(port);
	}
	else
	{
		MS_THROW_ERROR("unknown IP family");
	}

	const auto* bindSockAddr = reinterpret_cast<const struct sockaddr*>(std::addressof(bindAddr));
	uv_handle_t* uvHandle{ nullptr };

	if (protocol == Protocol::UDP)
	{
		auto* udpHandle = new uv_udp_t();

		uvHandle = reinterpret_cast<uv_handle_t*>(udpHandle);
		err      = uv_udp_init_ex(DepLibUV::GetLoop(), udpHandle, UV_UDP_RECVMMSG);

		// If init failed the handle is freed directly; once init succeeded it is
		// released through uv_close() and its close callback instead.
		if (err != 0)
		{
			delete udpHandle;

			MS_THROW_ERROR("uv_udp_init_ex() failed: %s", uv_strerror(err));
		}

		err = uv_udp_bind(udpHandle, bindSockAddr, bitFlags);

		if (err != 0)
		{
			uv_close(uvHandle, static_cast<uv_close_cb>(onCloseUdp));

			MS_THROW_ERROR(
			  "uv_udp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 "]: %s",
			  protocolStr.c_str(),
			  ip.c_str(),
			  port,
			  uv_strerror(err));
		}
	}
	else if (protocol == Protocol::TCP)
	{
		auto* tcpHandle = new uv_tcp_t();

		uvHandle = reinterpret_cast<uv_handle_t*>(tcpHandle);
		err      = uv_tcp_init(DepLibUV::GetLoop(), tcpHandle);

		if (err != 0)
		{
			delete tcpHandle;

			MS_THROW_ERROR("uv_tcp_init() failed: %s", uv_strerror(err));
		}

		err = uv_tcp_bind(tcpHandle, bindSockAddr, bitFlags);

		if (err != 0)
		{
			uv_close(uvHandle, static_cast<uv_close_cb>(onCloseTcp));

			MS_THROW_ERROR(
			  "uv_tcp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 "]: %s",
			  protocolStr.c_str(),
			  ip.c_str(),
			  port,
			  uv_strerror(err));
		}

		// A successful uv_tcp_bind() is followed by uv_listen(), whose result is
		// checked as well before the port is considered bound.
		err = uv_listen(
		  reinterpret_cast<uv_stream_t*>(uvHandle),
		  256,
		  static_cast<uv_connection_cb>(onFakeConnection));

		if (err != 0)
		{
			uv_close(uvHandle, static_cast<uv_close_cb>(onCloseTcp));

			MS_THROW_ERROR(
			  "uv_listen() failed [protocol:%s, ip:'%s', port:%" PRIu16 "]: %s",
			  protocolStr.c_str(),
			  ip.c_str(),
			  port,
			  uv_strerror(err));
		}
	}

	MS_DEBUG_DEV(
	  "bind succeeded [protocol:%s, ip:'%s', port:%" PRIu16 "]", protocolStr.c_str(), ip.c_str(), port);

	return uvHandle;
}
// Binds a new libuv handle (UDP or TCP) to the given IP on a free port taken
// from the range [minPort, maxPort].
//
// The search starts at an index obtained from Utils::Crypto::GetRandomUInt()
// and walks the range circularly, visiting each port at most once. Ports marked
// as used in the PortRange are skipped. If binding (or, for TCP, listening) on a
// port fails, the handle is closed and the next port is tried, except for
// UV_EMFILE and UV_EADDRNOTAVAIL which abort the search by throwing.
//
// @param protocol  Protocol::UDP or Protocol::TCP.
// @param ip        IP to bind to; it is normalized in place.
// @param minPort   First port of the range.
// @param maxPort   Last port of the range (must not be less than minPort).
// @param flags     Socket flags, translated by ConvertSocketFlags().
// @param key       Output: set to the PortRangeKey of the range being used.
//
// @return The handle allocated with new (uv_udp_t or uv_tcp_t), already bound.
//         The chosen port is marked as used and numUsedPorts is incremented.
//
// Throws through MS_THROW_TYPE_ERROR if maxPort < minPort, and through
// MS_THROW_ERROR if the IP cannot be parsed, its family is unknown, handle
// initialization fails, no port of the range could be bound, or binding failed
// with UV_EMFILE / UV_EADDRNOTAVAIL.
//
// NOTE(review): a PortRange created by this call remains in mapPortRanges when
// the function throws, since only Unbind() erases entries.
uv_handle_t* PortManager::Bind(
  Protocol protocol,
  std::string& ip,
  uint16_t minPort,
  uint16_t maxPort,
  RTC::Transport::SocketFlags& flags,
  PortRangeKey& key)
{
	MS_TRACE();

	if (maxPort < minPort)
	{
		MS_THROW_TYPE_ERROR("maxPort cannot be less than minPort");
	}

	Utils::IP::NormalizeIp(ip);

	int err;
	const int family = Utils::IP::GetFamily(ip);
	struct sockaddr_storage bindAddr{};
	std::string protocolStr;

	// Protocol name, only used in log and error messages.
	switch (protocol)
	{
		case Protocol::UDP:
		{
			protocolStr.assign("udp");

			break;
		}

		case Protocol::TCP:
		{
			protocolStr.assign("tcp");

			break;
		}
	}

	// Parse the IP with port 0; the real port is written on every attempt below.
	switch (family)
	{
		case AF_INET:
		{
			err = uv_ip4_addr(
			  ip.c_str(), 0, reinterpret_cast<struct sockaddr_in*>(std::addressof(bindAddr)));

			if (err != 0)
			{
				MS_THROW_ERROR("uv_ip4_addr() failed: %s", uv_strerror(err));
			}

			break;
		}

		case AF_INET6:
		{
			err = uv_ip6_addr(
			  ip.c_str(), 0, reinterpret_cast<struct sockaddr_in6*>(std::addressof(bindAddr)));

			if (err != 0)
			{
				MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err));
			}

			break;
		}

		default:
		{
			MS_THROW_ERROR("unknown IP family");
		}
	}

	// The key is built while the address still has port 0.
	key = PortRangeKey(protocol, bindAddr, minPort, maxPort);

	auto& portRange = PortManager::GetOrCreatePortRange(key, minPort, maxPort);

	const size_t numPorts    = portRange.ports.size();
	const size_t numAttempts = numPorts;
	size_t attempt{ 0u };
	uint16_t port{ 0u };
	uv_handle_t* uvHandle{ nullptr };
	const uint8_t bitFlags = ConvertSocketFlags(flags, protocol, family);

	// Starting index for the circular walk over the range.
	size_t portIdx = Utils::Crypto::GetRandomUInt<size_t>(
	  static_cast<uint32_t>(0), static_cast<uint32_t>(numPorts - 1));

	while (true)
	{
		++attempt;

		// Every port of the range has been tried.
		if (attempt > numAttempts)
		{
			MS_THROW_ERROR(
			  "no more available ports [protocol:%s, ip:'%s', numAttempt:%zu]",
			  protocolStr.c_str(),
			  ip.c_str(),
			  numAttempts);
		}

		// Advance to the next index (wrapping around) and derive its port.
		portIdx = (portIdx + 1) % numPorts;
		port    = static_cast<uint16_t>(portIdx + minPort);

		MS_DEBUG_DEV(
		  "testing port [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]",
		  protocolStr.c_str(),
		  ip.c_str(),
		  port,
		  attempt,
		  numAttempts);

		// Skip ports already marked as used in this range.
		if (portRange.ports[portIdx])
		{
			MS_DEBUG_DEV(
			  "port in use, trying again [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]",
			  protocolStr.c_str(),
			  ip.c_str(),
			  port,
			  attempt,
			  numAttempts);

			continue;
		}

		// Write the candidate port (network order) into the bind address.
		switch (family)
		{
			case AF_INET:
			{
				(reinterpret_cast<struct sockaddr_in*>(std::addressof(bindAddr)))->sin_port = htons(port);

				break;
			}

			case AF_INET6:
			{
				(reinterpret_cast<struct sockaddr_in6*>(std::addressof(bindAddr)))->sin6_port = htons(port);

				break;
			}

			default:
			{
				MS_THROW_ERROR("unknown IP family");
			}
		}

		// A fresh handle is allocated for every attempt.
		switch (protocol)
		{
			case Protocol::UDP:
			{
				uvHandle = reinterpret_cast<uv_handle_t*>(new uv_udp_t());
				err      = uv_udp_init_ex(
          DepLibUV::GetLoop(), reinterpret_cast<uv_udp_t*>(uvHandle), UV_UDP_RECVMMSG);

				break;
			}

			case Protocol::TCP:
			{
				uvHandle = reinterpret_cast<uv_handle_t*>(new uv_tcp_t());
				err      = uv_tcp_init(DepLibUV::GetLoop(), reinterpret_cast<uv_tcp_t*>(uvHandle));

				break;
			}
		}

		// If init failed the handle is freed directly (no uv_close()) and the
		// whole operation is aborted.
		if (err != 0)
		{
			switch (protocol)
			{
				case Protocol::UDP:
				{
					delete reinterpret_cast<uv_udp_t*>(uvHandle);

					MS_THROW_ERROR("uv_udp_init_ex() failed: %s", uv_strerror(err));

					break;
				}

				case Protocol::TCP:
				{
					delete reinterpret_cast<uv_tcp_t*>(uvHandle);

					MS_THROW_ERROR("uv_tcp_init() failed: %s", uv_strerror(err));

					break;
				}
			}
		}

		switch (protocol)
		{
			case Protocol::UDP:
			{
				err = uv_udp_bind(
				  reinterpret_cast<uv_udp_t*>(uvHandle),
				  reinterpret_cast<const struct sockaddr*>(std::addressof(bindAddr)),
				  bitFlags);

				if (err != 0)
				{
					MS_WARN_DEV(
					  "uv_udp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]: %s",
					  protocolStr.c_str(),
					  ip.c_str(),
					  port,
					  attempt,
					  numAttempts,
					  uv_strerror(err));
				}

				break;
			}

			case Protocol::TCP:
			{
				err = uv_tcp_bind(
				  reinterpret_cast<uv_tcp_t*>(uvHandle),
				  reinterpret_cast<const struct sockaddr*>(std::addressof(bindAddr)),
				  bitFlags);

				if (err != 0)
				{
					MS_WARN_DEV(
					  "uv_tcp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]: %s",
					  protocolStr.c_str(),
					  ip.c_str(),
					  port,
					  attempt,
					  numAttempts,
					  uv_strerror(err));
				}

				// After a successful uv_tcp_bind() the result of uv_listen() is
				// checked too before the port is considered bound.
				if (err == 0)
				{
					err = uv_listen(
					  reinterpret_cast<uv_stream_t*>(uvHandle),
					  256,
					  static_cast<uv_connection_cb>(onFakeConnection));

					// Only report a failure when uv_listen() actually failed.
					if (err != 0)
					{
						MS_WARN_DEV(
						  "uv_listen() failed [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]: %s",
						  protocolStr.c_str(),
						  ip.c_str(),
						  port,
						  attempt,
						  numAttempts,
						  uv_strerror(err));
					}
				}

				break;
			}
		}

		// Success: leave the loop with uvHandle bound to `port`.
		if (err == 0)
		{
			break;
		}

		// Failure: close the handle; the close callback frees it.
		switch (protocol)
		{
			case Protocol::UDP:
			{
				uv_close(uvHandle, static_cast<uv_close_cb>(onCloseUdp));

				break;
			}

			case Protocol::TCP:
			{
				uv_close(uvHandle, static_cast<uv_close_cb>(onCloseTcp));

				break;
			}
		}

		// These two errors abort the search; any other error moves on to the
		// next port.
		switch (err)
		{
			case UV_EMFILE:
			{
				MS_THROW_ERROR(
				  "port bind failed due to too many open files [protocol:%s, ip:'%s', port:%" PRIu16
				  ", attempt:%zu/%zu]",
				  protocolStr.c_str(),
				  ip.c_str(),
				  port,
				  attempt,
				  numAttempts);

				break;
			}

			case UV_EADDRNOTAVAIL:
			{
				MS_THROW_ERROR(
				  "port bind failed due to address not available [protocol:%s, ip:'%s', port:%" PRIu16
				  ", attempt:%zu/%zu]",
				  protocolStr.c_str(),
				  ip.c_str(),
				  port,
				  attempt,
				  numAttempts);

				break;
			}

			default:
			{
			}
		}
	}

	// Register the port as used in its range.
	portRange.ports[portIdx] = true;
	portRange.numUsedPorts++;

	MS_DEBUG_DEV(
	  "bind succeeded [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]",
	  protocolStr.c_str(),
	  ip.c_str(),
	  port,
	  attempt,
	  numAttempts);

	return uvHandle;
}
// Marks `port` as free again in the port range identified by `key`.
//
// If no range exists for `key`, an error is logged and nothing else happens.
// Otherwise the port must currently be marked as used and the range must have
// at least one used port (both enforced with MS_ASSERT). When the last used
// port of a range is released, the range is erased from mapPortRanges.
void PortManager::Unbind(const PortRangeKey& key, uint16_t port)
{
	MS_TRACE();

	auto rangeIt = PortManager::mapPortRanges.find(key);

	if (rangeIt != PortManager::mapPortRanges.end())
	{
		auto& portRange = rangeIt->second;

		// Position of the port within the range's `ports` container.
		const auto portIdx = static_cast<size_t>(port - portRange.minPort);

		MS_ASSERT(portRange.ports.at(portIdx) == true, "port %" PRIu16 " is not used", port);
		MS_ASSERT(portRange.numUsedPorts > 0u, "number of used ports is 0");

		portRange.ports[portIdx] = false;

		// Drop the whole range once none of its ports remains in use.
		if (--portRange.numUsedPorts == 0u)
		{
			PortManager::mapPortRanges.erase(rangeIt);
		}

		return;
	}

	MS_ERROR(
	  "port range key [minPort:%" PRIu16 ", maxPort:%" PRIu16 "] doesn't exist in the map",
	  key.minPort,
	  key.maxPort);
}
// Dumps every port range stored in mapPortRanges through MS_DUMP_CLEAN:
// protocol, address family, first and last port, and number of used ports.
//
// @param indentation  Indentation level of the outer <PortManager> element;
//                     the per-range lines use indentation + 1.
void PortManager::Dump(int indentation) const
{
	MS_DUMP_CLEAN(indentation, "<PortManager>");

	for (const auto& [key, portRange] : PortManager::mapPortRanges)
	{
		const bool isUdp = (key.protocol == Protocol::UDP);
		// Last port of the range, derived from the first port and the range size.
		const auto maxPort = portRange.minPort + portRange.ports.size() - 1;

		MS_DUMP_CLEAN(indentation + 1, "<PortRange>");
		MS_DUMP_CLEAN(indentation + 1, " protocol: %s", isUdp ? "udp" : "tcp");
		MS_DUMP_CLEAN(indentation + 1, " family: %d", key.bindAddr.ss_family);
		MS_DUMP_CLEAN(indentation + 1, " minPort: %" PRIu16, portRange.minPort);
		MS_DUMP_CLEAN(indentation + 1, " maxPort: %zu", maxPort);
		MS_DUMP_CLEAN(indentation + 1, " numUsedPorts: %" PRIu16, portRange.numUsedPorts);
		MS_DUMP_CLEAN(indentation + 1, "</PortRange>");
	}

	MS_DUMP_CLEAN(indentation, "</PortManager>");
}
// Returns the PortRange stored in mapPortRanges for `key`. If there is none, a
// new PortRange is constructed from (maxPort - minPort + 1, minPort), inserted
// under `key` and returned.
//
// @param key      Key identifying the range.
// @param minPort  First port of the range (only used when creating it).
// @param maxPort  Last port of the range (only used when creating it).
//
// @return Reference to the PortRange stored in the map.
PortManager::PortRange& PortManager::GetOrCreatePortRange(
  const PortRangeKey& key, uint16_t minPort, uint16_t maxPort)
{
	MS_TRACE();

	auto& ranges = PortManager::mapPortRanges;

	if (const auto existing = ranges.find(key); existing != ranges.end())
	{
		return existing->second;
	}

	// Number of ports in the inclusive range [minPort, maxPort].
	const auto numPorts = static_cast<uint16_t>(maxPort - minPort + 1);

	return ranges
	  .emplace(
	    std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple(numPorts, minPort))
	  .first->second;
}
// Translates transport socket flags into the libuv bind flags bitmask.
//
// - flags.ipv6Only adds UV_UDP_IPV6ONLY (UDP) or UV_TCP_IPV6ONLY (TCP), but
//   only when `family` is AF_INET6.
// - flags.udpReusePort adds UV_UDP_REUSEADDR, but only for UDP.
//
// @return The resulting bitmask (0 when no flag applies).
uint8_t PortManager::ConvertSocketFlags(RTC::Transport::SocketFlags& flags, Protocol protocol, int family)
{
	MS_TRACE();

	uint8_t bitFlags{ 0b00000000 };

	const bool isUdp        = (protocol == Protocol::UDP);
	const bool isTcp        = (protocol == Protocol::TCP);
	const bool wantIpv6Only = (flags.ipv6Only && family == AF_INET6);

	if (wantIpv6Only && isUdp)
	{
		bitFlags |= UV_UDP_IPV6ONLY;
	}
	else if (wantIpv6Only && isTcp)
	{
		bitFlags |= UV_TCP_IPV6ONLY;
	}

	if (isUdp && flags.udpReusePort)
	{
		bitFlags |= UV_UDP_REUSEADDR;
	}

	return bitFlags;
}
// Builds a port range key from the protocol, the bind address and the
// [minPort, maxPort] bounds. Each argument is copied into the member of the
// same name.
PortManager::PortRangeKey::PortRangeKey(
Protocol protocol, const sockaddr_storage& bindAddr, uint16_t minPort, uint16_t maxPort)
: protocol(protocol), bindAddr(bindAddr), minPort(minPort), maxPort(maxPort)
{
MS_TRACE();
}
// Two keys are equal when protocol, minPort, maxPort and address family match
// and the IP address stored in bindAddr is the same. Only the address bytes
// are compared (sin_addr / sin6_addr), not the port stored in bindAddr.
// Keys whose family is neither AF_INET nor AF_INET6 never compare equal.
bool PortManager::PortRangeKey::operator==(const PortRangeKey& other) const noexcept
{
	MS_TRACE();

	const bool sameScalars = this->protocol == other.protocol && this->minPort == other.minPort &&
	                         this->maxPort == other.maxPort &&
	                         this->bindAddr.ss_family == other.bindAddr.ss_family;

	if (!sameScalars)
	{
		return false;
	}

	if (this->bindAddr.ss_family == AF_INET)
	{
		const auto* lhs = reinterpret_cast<const sockaddr_in*>(std::addressof(this->bindAddr));
		const auto* rhs = reinterpret_cast<const sockaddr_in*>(std::addressof(other.bindAddr));

		return lhs->sin_addr.s_addr == rhs->sin_addr.s_addr;
	}

	if (this->bindAddr.ss_family == AF_INET6)
	{
		const auto* lhs = reinterpret_cast<const sockaddr_in6*>(std::addressof(this->bindAddr));
		const auto* rhs = reinterpret_cast<const sockaddr_in6*>(std::addressof(other.bindAddr));

		const int diff =
		  std::memcmp(std::addressof(lhs->sin6_addr), std::addressof(rhs->sin6_addr), sizeof(in6_addr));

		return diff == 0;
	}

	return false;
}
// Hashes a PortRangeKey by combining, in this order: protocol, address family,
// the IP address bytes (one 32-bit value for AF_INET, two 64-bit halves for
// AF_INET6, nothing for any other family), minPort and maxPort.
size_t PortManager::PortRangeKeyHash::operator()(const PortRangeKey& key) const noexcept
{
	MS_TRACE();

	size_t seed = 0;

	// Folds one already-hashed value into the running seed.
	const auto mix = [&seed](size_t value)
	{
		seed ^= value + 0x9e3779b9 + (seed << 6) + (seed >> 2);
	};

	mix(ankerl::unordered_dense::hash<uint8_t>{}(static_cast<uint8_t>(key.protocol)));
	mix(ankerl::unordered_dense::hash<uint16_t>{}(static_cast<uint16_t>(key.bindAddr.ss_family)));

	if (key.bindAddr.ss_family == AF_INET)
	{
		const auto* addr4 = reinterpret_cast<const sockaddr_in*>(std::addressof(key.bindAddr));

		mix(ankerl::unordered_dense::hash<uint32_t>{}(addr4->sin_addr.s_addr));
	}
	else if (key.bindAddr.ss_family == AF_INET6)
	{
		const auto* addr6 = reinterpret_cast<const sockaddr_in6*>(std::addressof(key.bindAddr));
		const auto* bytes = addr6->sin6_addr.s6_addr;

		// Split the 16 address bytes into two 64-bit words.
		uint64_t firstHalf;
		uint64_t secondHalf;

		std::memcpy(std::addressof(firstHalf), bytes, sizeof(uint64_t));
		std::memcpy(std::addressof(secondHalf), bytes + sizeof(uint64_t), sizeof(uint64_t));

		mix(ankerl::unordered_dense::hash<uint64_t>{}(firstHalf));
		mix(ankerl::unordered_dense::hash<uint64_t>{}(secondHalf));
	}

	mix(ankerl::unordered_dense::hash<uint16_t>{}(key.minPort));
	mix(ankerl::unordered_dense::hash<uint16_t>{}(key.maxPort));

	return seed;
}
}