#define MS_CLASS "PortManager"
#include "RTC/PortManager.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Settings.hpp"
#include "Utils.hpp"
#include <tuple>
#include <utility>
static inline void onCloseUdp(uv_handle_t* handle)
{
delete reinterpret_cast<uv_udp_t*>(handle);
}
static inline void onCloseTcp(uv_handle_t* handle)
{
delete reinterpret_cast<uv_tcp_t*>(handle);
}
inline static void onFakeConnection(uv_stream_t* , int )
{
}
namespace RTC
{
thread_local absl::flat_hash_map<uint64_t, PortManager::PortRange> PortManager::mapPortRanges;
uv_handle_t* PortManager::Bind(
Protocol protocol, std::string& ip, uint16_t port, RTC::Transport::SocketFlags& flags)
{
MS_TRACE();
Utils::IP::NormalizeIp(ip);
int err;
const int family = Utils::IP::GetFamily(ip);
struct sockaddr_storage bindAddr
{
};
uv_handle_t* uvHandle{ nullptr };
std::string protocolStr;
const uint8_t bitFlags = ConvertSocketFlags(flags, protocol, family);
switch (protocol)
{
case Protocol::UDP:
{
protocolStr.assign("udp");
break;
}
case Protocol::TCP:
{
protocolStr.assign("tcp");
break;
}
}
switch (family)
{
case AF_INET:
{
err = uv_ip4_addr(ip.c_str(), 0, reinterpret_cast<struct sockaddr_in*>(&bindAddr));
if (err != 0)
{
MS_THROW_ERROR("uv_ip4_addr() failed: %s", uv_strerror(err));
}
break;
}
case AF_INET6:
{
err = uv_ip6_addr(ip.c_str(), 0, reinterpret_cast<struct sockaddr_in6*>(&bindAddr));
if (err != 0)
{
MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err));
}
break;
}
default:
{
MS_THROW_ERROR("unknown IP family");
}
}
switch (family)
{
case AF_INET:
{
(reinterpret_cast<struct sockaddr_in*>(&bindAddr))->sin_port = htons(port);
break;
}
case AF_INET6:
{
(reinterpret_cast<struct sockaddr_in6*>(&bindAddr))->sin6_port = htons(port);
break;
}
}
switch (protocol)
{
case Protocol::UDP:
{
uvHandle = reinterpret_cast<uv_handle_t*>(new uv_udp_t());
err = uv_udp_init_ex(
DepLibUV::GetLoop(), reinterpret_cast<uv_udp_t*>(uvHandle), UV_UDP_RECVMMSG);
break;
}
case Protocol::TCP:
{
uvHandle = reinterpret_cast<uv_handle_t*>(new uv_tcp_t());
err = uv_tcp_init(DepLibUV::GetLoop(), reinterpret_cast<uv_tcp_t*>(uvHandle));
break;
}
}
if (err != 0)
{
switch (protocol)
{
case Protocol::UDP:
{
delete reinterpret_cast<uv_udp_t*>(uvHandle);
MS_THROW_ERROR("uv_udp_init_ex() failed: %s", uv_strerror(err));
break;
}
case Protocol::TCP:
{
delete reinterpret_cast<uv_tcp_t*>(uvHandle);
MS_THROW_ERROR("uv_tcp_init() failed: %s", uv_strerror(err));
break;
}
}
}
switch (protocol)
{
case Protocol::UDP:
{
err = uv_udp_bind(
reinterpret_cast<uv_udp_t*>(uvHandle),
reinterpret_cast<const struct sockaddr*>(&bindAddr),
bitFlags);
if (err != 0)
{
uv_close(reinterpret_cast<uv_handle_t*>(uvHandle), static_cast<uv_close_cb>(onCloseUdp));
MS_THROW_ERROR(
"uv_udp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 "]: %s",
protocolStr.c_str(),
ip.c_str(),
port,
uv_strerror(err));
}
break;
}
case Protocol::TCP:
{
err = uv_tcp_bind(
reinterpret_cast<uv_tcp_t*>(uvHandle),
reinterpret_cast<const struct sockaddr*>(&bindAddr),
bitFlags);
if (err != 0)
{
uv_close(reinterpret_cast<uv_handle_t*>(uvHandle), static_cast<uv_close_cb>(onCloseTcp));
MS_THROW_ERROR(
"uv_tcp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 "]: %s",
protocolStr.c_str(),
ip.c_str(),
port,
uv_strerror(err));
}
err = uv_listen(
reinterpret_cast<uv_stream_t*>(uvHandle),
256,
static_cast<uv_connection_cb>(onFakeConnection));
if (err != 0)
{
uv_close(reinterpret_cast<uv_handle_t*>(uvHandle), static_cast<uv_close_cb>(onCloseTcp));
MS_THROW_ERROR(
"uv_listen() failed [protocol:%s, ip:'%s', port:%" PRIu16 "]: %s",
protocolStr.c_str(),
ip.c_str(),
port,
uv_strerror(err));
}
break;
}
}
MS_DEBUG_DEV(
"bind succeeded [protocol:%s, ip:'%s', port:%" PRIu16 "]", protocolStr.c_str(), ip.c_str(), port);
return static_cast<uv_handle_t*>(uvHandle);
}
uv_handle_t* PortManager::Bind(
Protocol protocol,
std::string& ip,
uint16_t minPort,
uint16_t maxPort,
RTC::Transport::SocketFlags& flags,
uint64_t& hash)
{
MS_TRACE();
if (maxPort < minPort)
{
MS_THROW_TYPE_ERROR("maxPort cannot be less than minPort");
}
Utils::IP::NormalizeIp(ip);
int err;
const int family = Utils::IP::GetFamily(ip);
struct sockaddr_storage bindAddr
{
};
std::string protocolStr;
switch (protocol)
{
case Protocol::UDP:
{
protocolStr.assign("udp");
break;
}
case Protocol::TCP:
{
protocolStr.assign("tcp");
break;
}
}
switch (family)
{
case AF_INET:
{
err = uv_ip4_addr(ip.c_str(), 0, reinterpret_cast<struct sockaddr_in*>(&bindAddr));
if (err != 0)
{
MS_THROW_ERROR("uv_ip4_addr() failed: %s", uv_strerror(err));
}
break;
}
case AF_INET6:
{
err = uv_ip6_addr(ip.c_str(), 0, reinterpret_cast<struct sockaddr_in6*>(&bindAddr));
if (err != 0)
{
MS_THROW_ERROR("uv_ip6_addr() failed: %s", uv_strerror(err));
}
break;
}
default:
{
MS_THROW_ERROR("unknown IP family");
}
}
hash = GeneratePortRangeHash(protocol, std::addressof(bindAddr), minPort, maxPort);
auto& portRange = PortManager::GetOrCreatePortRange(hash, minPort, maxPort);
const size_t numPorts = portRange.ports.size();
const size_t numAttempts = numPorts;
size_t attempt{ 0u };
size_t portIdx;
uint16_t port;
uv_handle_t* uvHandle{ nullptr };
const uint8_t bitFlags = ConvertSocketFlags(flags, protocol, family);
portIdx = static_cast<size_t>(
Utils::Crypto::GetRandomUInt(static_cast<uint32_t>(0), static_cast<uint32_t>(numPorts - 1)));
while (true)
{
++attempt;
if (attempt > numAttempts)
{
MS_THROW_ERROR(
"no more available ports [protocol:%s, ip:'%s', numAttempt:%zu]",
protocolStr.c_str(),
ip.c_str(),
numAttempts);
}
portIdx = (portIdx + 1) % numPorts;
port = static_cast<uint16_t>(portIdx + minPort);
MS_DEBUG_DEV(
"testing port [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts);
if (portRange.ports[portIdx])
{
MS_DEBUG_DEV(
"port in use, trying again [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts);
continue;
}
switch (family)
{
case AF_INET:
{
(reinterpret_cast<struct sockaddr_in*>(&bindAddr))->sin_port = htons(port);
break;
}
case AF_INET6:
{
(reinterpret_cast<struct sockaddr_in6*>(&bindAddr))->sin6_port = htons(port);
break;
}
}
switch (protocol)
{
case Protocol::UDP:
{
uvHandle = reinterpret_cast<uv_handle_t*>(new uv_udp_t());
err = uv_udp_init_ex(
DepLibUV::GetLoop(), reinterpret_cast<uv_udp_t*>(uvHandle), UV_UDP_RECVMMSG);
break;
}
case Protocol::TCP:
{
uvHandle = reinterpret_cast<uv_handle_t*>(new uv_tcp_t());
err = uv_tcp_init(DepLibUV::GetLoop(), reinterpret_cast<uv_tcp_t*>(uvHandle));
break;
}
}
if (err != 0)
{
switch (protocol)
{
case Protocol::UDP:
{
delete reinterpret_cast<uv_udp_t*>(uvHandle);
MS_THROW_ERROR("uv_udp_init_ex() failed: %s", uv_strerror(err));
break;
}
case Protocol::TCP:
{
delete reinterpret_cast<uv_tcp_t*>(uvHandle);
MS_THROW_ERROR("uv_tcp_init() failed: %s", uv_strerror(err));
break;
}
}
}
switch (protocol)
{
case Protocol::UDP:
{
err = uv_udp_bind(
reinterpret_cast<uv_udp_t*>(uvHandle),
reinterpret_cast<const struct sockaddr*>(&bindAddr),
bitFlags);
if (err != 0)
{
MS_WARN_DEV(
"uv_udp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]: %s",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts,
uv_strerror(err));
}
break;
}
case Protocol::TCP:
{
err = uv_tcp_bind(
reinterpret_cast<uv_tcp_t*>(uvHandle),
reinterpret_cast<const struct sockaddr*>(&bindAddr),
bitFlags);
if (err != 0)
{
MS_WARN_DEV(
"uv_tcp_bind() failed [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]: %s",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts,
uv_strerror(err));
}
if (err == 0)
{
err = uv_listen(
reinterpret_cast<uv_stream_t*>(uvHandle),
256,
static_cast<uv_connection_cb>(onFakeConnection));
MS_WARN_DEV(
"uv_listen() failed [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]: %s",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts,
uv_strerror(err));
}
break;
}
}
if (err == 0)
{
break;
}
switch (protocol)
{
case Protocol::UDP:
{
uv_close(reinterpret_cast<uv_handle_t*>(uvHandle), static_cast<uv_close_cb>(onCloseUdp));
break;
};
case Protocol::TCP:
{
uv_close(reinterpret_cast<uv_handle_t*>(uvHandle), static_cast<uv_close_cb>(onCloseTcp));
break;
}
}
switch (err)
{
case UV_EMFILE:
{
MS_THROW_ERROR(
"port bind failed due to too many open files [protocol:%s, ip:'%s', port:%" PRIu16
", attempt:%zu/%zu]",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts);
break;
}
case UV_EADDRNOTAVAIL:
{
MS_THROW_ERROR(
"port bind failed due to address not available [protocol:%s, ip:'%s', port:%" PRIu16
", attempt:%zu/%zu]",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts);
break;
}
default:
{
}
}
}
portRange.ports[portIdx] = true;
portRange.numUsedPorts++;
MS_DEBUG_DEV(
"bind succeeded [protocol:%s, ip:'%s', port:%" PRIu16 ", attempt:%zu/%zu]",
protocolStr.c_str(),
ip.c_str(),
port,
attempt,
numAttempts);
return static_cast<uv_handle_t*>(uvHandle);
}
void PortManager::Unbind(uint64_t hash, uint16_t port)
{
MS_TRACE();
auto it = PortManager::mapPortRanges.find(hash);
if (it == PortManager::mapPortRanges.end())
{
MS_ERROR("hash %" PRIu64 " doesn't exist in the map", hash);
return;
}
auto& portRange = it->second;
const auto portIdx = static_cast<size_t>(port - portRange.minPort);
MS_ASSERT(portRange.ports.at(portIdx) == true, "port %" PRIu16 " is not used", port);
MS_ASSERT(portRange.numUsedPorts > 0u, "number of used ports is 0");
portRange.ports[portIdx] = false;
portRange.numUsedPorts--;
if (portRange.numUsedPorts == 0u)
{
PortManager::mapPortRanges.erase(it);
}
}
void PortManager::Dump()
{
MS_DUMP("<PortManager>");
for (auto& kv : PortManager::mapPortRanges)
{
auto hash = kv.first;
auto portRange = kv.second;
MS_DUMP(" <PortRange>");
MS_DUMP(" hash: %" PRIu64, hash);
MS_DUMP(" minPort: %" PRIu16, portRange.minPort);
MS_DUMP(" maxPort: %zu", portRange.minPort + portRange.ports.size() - 1);
MS_DUMP(" numUsedPorts: %" PRIu16, portRange.numUsedPorts);
MS_DUMP(" </PortRange>");
}
MS_DUMP("</PortManager>");
}
uint64_t PortManager::GeneratePortRangeHash(
Protocol protocol, sockaddr_storage* bindAddr, uint16_t minPort, uint16_t maxPort)
{
MS_TRACE();
uint64_t hash{ 0u };
switch (bindAddr->ss_family)
{
case AF_INET:
{
auto* bindAddrIn = reinterpret_cast<struct sockaddr_in*>(bindAddr);
const uint64_t address = bindAddrIn->sin_addr.s_addr;
hash |= static_cast<uint64_t>(minPort) << 48;
hash |= static_cast<uint64_t>(maxPort) << 32;
hash |= (address >> 2) << 2;
hash |= 0x0000;
break;
}
case AF_INET6:
{
auto* bindAddrIn6 = reinterpret_cast<struct sockaddr_in6*>(bindAddr);
auto* a = reinterpret_cast<uint32_t*>(std::addressof(bindAddrIn6->sin6_addr));
const auto address = a[0] ^ a[1] ^ a[2] ^ a[3];
hash |= static_cast<uint64_t>(minPort) << 48;
hash |= static_cast<uint64_t>(maxPort) << 32;
hash |= static_cast<uint64_t>(address) << 16;
hash |= (static_cast<uint64_t>(address) >> 2) << 2;
hash |= 0x0002;
break;
}
}
if (protocol == Protocol::UDP)
{
hash |= 0x0000;
}
else
{
hash |= 0x0001;
}
return hash;
}
PortManager::PortRange& PortManager::GetOrCreatePortRange(
uint64_t hash, uint16_t minPort, uint16_t maxPort)
{
MS_TRACE();
auto it = PortManager::mapPortRanges.find(hash);
if (it != PortManager::mapPortRanges.end())
{
auto& portRange = it->second;
return portRange;
}
const uint16_t numPorts = maxPort - minPort + 1;
auto pair = PortManager::mapPortRanges.emplace(
std::piecewise_construct, std::make_tuple(hash), std::make_tuple(numPorts, minPort));
auto& portRange = pair.first->second;
return portRange;
}
uint8_t PortManager::ConvertSocketFlags(RTC::Transport::SocketFlags& flags, Protocol protocol, int family)
{
MS_TRACE();
uint8_t bitFlags{ 0b00000000 };
if (flags.ipv6Only && family == AF_INET6)
{
switch (protocol)
{
case Protocol::UDP:
{
bitFlags |= UV_UDP_IPV6ONLY;
break;
}
case Protocol::TCP:
{
bitFlags |= UV_TCP_IPV6ONLY;
break;
}
}
}
if (flags.udpReusePort && protocol == Protocol::UDP)
{
bitFlags |= UV_UDP_REUSEADDR;
}
return bitFlags;
}
}