#define MS_CLASS "DepLibUring"
#include "DepLibUring.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Settings.hpp"
#include "Utils.hpp"
#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <sys/eventfd.h>
#include <sys/resource.h>
#include <sys/utsname.h>
// Per-thread flag; set to true by ClassInit() once a LibUring instance was created.
thread_local bool DepLibUring::enabled = false;
// Per-thread LibUring instance; allocated in ClassInit() and deleted in ClassDestroy().
thread_local DepLibUring::LibUring* DepLibUring::liburing = nullptr;
// Per-thread scratch array that onFdEvent() fills with the completion queue
// entries it peeks from the ring.
thread_local struct io_uring_cqe* Cqes[DepLibUring::QueueDepth];
/**
 * Close callback handed to uv_close() by LibUring::StopPollingCQEs().
 *
 * Frees the uv_poll_t that LibUring::StartPollingCQEs() allocated with new.
 *
 * @param handle The poll handle being closed, seen through its base handle type.
 */
inline static void onCloseFd(uv_handle_t* handle)
{
	auto* pollHandle = reinterpret_cast<uv_poll_t*>(handle);

	delete pollHandle;
}
inline static void onFdEvent(uv_poll_t* handle, int , int )
{
auto* liburing = static_cast<DepLibUring::LibUring*>(handle->data);
auto count = io_uring_peek_batch_cqe(liburing->GetRing(), Cqes, DepLibUring::QueueDepth);
eventfd_t v;
const int err = eventfd_read(liburing->GetEventFd(), std::addressof(v));
if (err < 0)
{
const int error = -err;
MS_ABORT("eventfd_read() failed: %s", std::strerror(error));
};
for (unsigned int i{ 0 }; i < count; ++i)
{
struct io_uring_cqe* cqe = Cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));
if (liburing->IsZeroCopyEnabled())
{
if (cqe->flags & IORING_CQE_F_NOTIF)
{
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}
liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);
continue;
}
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}
io_uring_cqe_seen(liburing->GetRing(), cqe);
continue;
}
}
if (cqe->res >= 0)
{
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}
}
else
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}
liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);
}
}
void DepLibUring::ClassInit()
{
const auto mayor = io_uring_major_version();
const auto minor = io_uring_minor_version();
MS_DEBUG_TAG(info, "io_uring version: \"%i.%i\"", mayor, minor);
if (Settings::configuration.liburingDisabled)
{
MS_DEBUG_TAG(info, "io_uring disabled by user settings");
return;
}
if (DepLibUring::CheckRuntimeSupport())
{
try
{
DepLibUring::liburing = new LibUring();
MS_DEBUG_TAG(info, "io_uring enabled");
DepLibUring::enabled = true;
}
catch (const MediaSoupError& error)
{
MS_DEBUG_TAG(info, "io_uring initialization failed, io_uring not enabled");
}
}
else
{
MS_DEBUG_TAG(info, "io_uring not enabled");
}
}
/**
 * Tears down io_uring support for the calling thread.
 *
 * Deletes the LibUring instance (if any) and resets the per-thread state so
 * that IsEnabled() reports false afterwards and a repeated call is harmless
 * instead of deleting the same instance twice.
 */
void DepLibUring::ClassDestroy()
{
	MS_TRACE();

	delete DepLibUring::liburing;

	// Do not leave a dangling pointer behind a still-true `enabled` flag.
	DepLibUring::liburing = nullptr;
	DepLibUring::enabled = false;
}
bool DepLibUring::CheckRuntimeSupport()
{
struct utsname buffer{};
const auto err = uname(std::addressof(buffer));
if (err != 0)
{
MS_THROW_ERROR("uname() failed: %s", std::strerror(err));
}
MS_DEBUG_TAG(info, "kernel version: %s", buffer.version);
auto* kernelMayorCstr = buffer.release;
auto kernelMayorLong = strtol(kernelMayorCstr, &kernelMayorCstr, 10);
if (kernelMayorLong < 6)
{
MS_DEBUG_TAG(info, "kernel doesn't support io_uring");
return false;
}
return true;
}
/**
 * @return The current value of the per-thread `enabled` flag.
 */
bool DepLibUring::IsEnabled()
{
	const bool isEnabled = DepLibUring::enabled;

	return isEnabled;
}
/**
 * Forwards to LibUring::FillBuffer() on the per-thread instance.
 * Asserts that io_uring is enabled.
 *
 * @param builder FlatBuffers builder the dump is written into.
 * @return Offset of the created dump.
 */
flatbuffers::Offset<FBS::LibUring::Dump> DepLibUring::FillBuffer(flatbuffers::FlatBufferBuilder& builder)
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;

	return instance->FillBuffer(builder);
}
/**
 * Forwards to LibUring::StartPollingCQEs() on the per-thread instance.
 * Asserts that io_uring is enabled.
 */
void DepLibUring::StartPollingCQEs()
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;

	instance->StartPollingCQEs();
}
/**
 * Forwards to LibUring::StopPollingCQEs() on the per-thread instance.
 * Asserts that io_uring is enabled.
 */
void DepLibUring::StopPollingCQEs()
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;

	instance->StopPollingCQEs();
}
/**
 * Forwards to LibUring::GetSendBuffer() on the per-thread instance.
 * Asserts that io_uring is enabled.
 *
 * @return Whatever LibUring::GetSendBuffer() returns (a send buffer or nullptr).
 */
uint8_t* DepLibUring::GetSendBuffer()
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;

	return instance->GetSendBuffer();
}
/**
 * Forwards to LibUring::PrepareSend() on the per-thread instance.
 * Asserts that io_uring is enabled.
 *
 * @return The result of LibUring::PrepareSend().
 */
bool DepLibUring::PrepareSend(
  int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;
	const bool prepared = instance->PrepareSend(sockfd, data, len, addr, cb);

	return prepared;
}
/**
 * Forwards to LibUring::PrepareWrite() on the per-thread instance.
 * Asserts that io_uring is enabled.
 *
 * @return The result of LibUring::PrepareWrite().
 */
bool DepLibUring::PrepareWrite(
  int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb)
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;
	const bool prepared = instance->PrepareWrite(sockfd, data1, len1, data2, len2, cb);

	return prepared;
}
/**
 * Forwards to LibUring::Submit() on the per-thread instance.
 * Asserts that io_uring is enabled.
 */
void DepLibUring::Submit()
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;

	instance->Submit();
}
/**
 * Forwards to LibUring::SetActive() on the per-thread instance.
 * Asserts that io_uring is enabled.
 */
void DepLibUring::SetActive()
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;

	instance->SetActive();
}
/**
 * Forwards to LibUring::IsActive() on the per-thread instance.
 * Asserts that io_uring is enabled.
 *
 * @return The result of LibUring::IsActive().
 */
bool DepLibUring::IsActive()
{
	MS_TRACE();

	MS_ASSERT(DepLibUring::enabled, "io_uring not enabled");

	auto* instance = DepLibUring::liburing;
	const bool active = instance->IsActive();

	return active;
}
/**
 * Creates the ring, an eventfd registered with it for completion
 * notifications, and wires every user data entry to its own send buffer.
 *
 * The send buffers are also registered with the ring as fixed buffers. If that
 * registration fails with ENOMEM a warning is logged and zero copy is
 * disabled; any other failure is fatal.
 *
 * A throwing constructor never runs the destructor, so every resource acquired
 * so far is released by hand before each throw.
 *
 * @throws MediaSoupError if the ring or the eventfd cannot be created, if the
 *         eventfd cannot be registered, or if buffer registration fails for a
 *         reason other than ENOMEM.
 */
DepLibUring::LibUring::LibUring()
{
	MS_TRACE();

	const unsigned int flags = IORING_SETUP_SINGLE_ISSUER;

	auto err = io_uring_queue_init(DepLibUring::QueueDepth, std::addressof(this->ring), flags);

	if (err < 0)
	{
		// liburing returns -errno.
		const int error = -err;

		MS_THROW_ERROR("io_uring_queue_init() failed: %s", std::strerror(error));
	}

	this->efd = eventfd(0, 0);

	if (this->efd < 0)
	{
		// eventfd() returns -1 and reports the reason through errno. Capture
		// it before the cleanup below has a chance to overwrite it.
		const int error = errno;

		io_uring_queue_exit(std::addressof(this->ring));

		MS_THROW_ERROR("eventfd() failed: %s", std::strerror(error));
	}

	err = io_uring_register_eventfd(std::addressof(this->ring), this->efd);

	if (err < 0)
	{
		const int error = -err;

		close(this->efd);
		io_uring_queue_exit(std::addressof(this->ring));

		MS_THROW_ERROR("io_uring_register_eventfd() failed: %s", std::strerror(error));
	}

	// Entry i, iovec i and send buffer i always go together.
	for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i)
	{
		this->userDatas[i].store = this->sendBuffers[i];
		this->availableUserDataEntries.push(i);

		this->iovecs[i].iov_base = this->sendBuffers[i];
		this->iovecs[i].iov_len  = DepLibUring::SendBufferSize;
	}

	err = io_uring_register_buffers(std::addressof(this->ring), this->iovecs, DepLibUring::QueueDepth);

	if (err >= 0)
	{
		return;
	}

	const int error = -err;

	if (error != ENOMEM)
	{
		close(this->efd);
		io_uring_queue_exit(std::addressof(this->ring));

		MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(error));
	}

	// Not enough lockable memory for the fixed buffers: keep going without
	// zero copy.
	this->zeroCopyEnabled = false;

	struct rlimit l = {};

	if (getrlimit(RLIMIT_MEMLOCK, std::addressof(l)) == -1)
	{
		MS_WARN_TAG(info, "getrlimit() failed: %s", std::strerror(errno));
		MS_WARN_TAG(
		  info,
		  "io_uring_register_buffers() failed due to low RLIMIT_MEMLOCK, disabling zero copy: %s",
		  std::strerror(error));
	}
	else
	{
		MS_WARN_TAG(
		  info,
		  "io_uring_register_buffers() failed due to low RLIMIT_MEMLOCK (soft:%llu, hard:%llu), disabling zero copy: %s",
		  static_cast<unsigned long long>(l.rlim_cur),
		  static_cast<unsigned long long>(l.rlim_max),
		  std::strerror(error));
	}
}
/**
 * Closes the eventfd and tears down the ring.
 *
 * A failing close() is reported through MS_ABORT.
 */
DepLibUring::LibUring::~LibUring()
{
	MS_TRACE();

	if (close(this->efd) != 0)
	{
		// close() returns -1 and reports the reason through errno.
		const int error = errno;

		// Guard kept from the original code so that nothing can propagate out
		// of the destructor.
		try
		{
			MS_ABORT("close() failed: %s", std::strerror(error));
		}
		catch (const std::exception&)
		{
		}
	}

	io_uring_queue_exit(std::addressof(this->ring));
}
/**
 * Serializes the instance counters into a FBS::LibUring::Dump.
 *
 * @param builder FlatBuffers builder the dump is written into.
 * @return Offset of the created dump.
 */
flatbuffers::Offset<FBS::LibUring::Dump> DepLibUring::LibUring::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
	MS_TRACE();

	const auto& processedSqes   = this->sqeProcessCount;
	const auto& missedSqes      = this->sqeMissCount;
	const auto& missedUserDatas = this->userDataMissCount;

	return FBS::LibUring::CreateDump(builder, processedSqes, missedSqes, missedUserDatas);
}
void DepLibUring::LibUring::StartPollingCQEs()
{
MS_TRACE();
this->uvHandle = new uv_poll_t;
auto err = uv_poll_init(DepLibUV::GetLoop(), this->uvHandle, this->efd);
if (err != 0)
{
delete this->uvHandle;
MS_THROW_ERROR("uv_poll_init() failed: %s", uv_strerror(err));
}
this->uvHandle->data = this;
err = uv_poll_start(this->uvHandle, UV_READABLE, static_cast<uv_poll_cb>(onFdEvent));
if (err != 0)
{
MS_THROW_ERROR("uv_poll_start() failed: %s", uv_strerror(err));
}
}
/**
 * Stops watching the eventfd and closes the poll handle, which is freed in
 * onCloseFd().
 *
 * `uvHandle` is cleared before the handle is closed, so the member never
 * refers to memory that onCloseFd() is going to free, and calling this method
 * again (or without a handle) does nothing.
 * NOTE(review): the no-handle case relies on `uvHandle` being initialized to
 * nullptr in the class declaration - confirm in the header.
 */
void DepLibUring::LibUring::StopPollingCQEs()
{
	MS_TRACE();

	if (!this->uvHandle)
	{
		return;
	}

	auto* pollHandle = this->uvHandle;

	this->uvHandle   = nullptr;
	pollHandle->data = nullptr;

	const auto err = uv_poll_stop(pollHandle);

	if (err != 0)
	{
		MS_ABORT("uv_poll_stop() failed: %s", uv_strerror(err));
	}

	uv_close(reinterpret_cast<uv_handle_t*>(pollHandle), static_cast<uv_close_cb>(onCloseFd));
}
/**
 * Returns the send buffer belonging to the user data entry at the front of
 * the available entries queue. The entry is only peeked, not taken.
 *
 * @return Pointer to the buffer, or nullptr if no entry is available.
 */
uint8_t* DepLibUring::LibUring::GetSendBuffer()
{
	MS_TRACE();

	auto& freeEntries = this->availableUserDataEntries;

	if (!freeEntries.empty())
	{
		const auto nextIdx = freeEntries.front();

		return this->userDatas[nextIdx].store;
	}

	MS_DEBUG_DEV("no user data entry available");

	return nullptr;
}
/**
 * Prepares (without submitting) a submission queue entry that sends `len`
 * bytes to `addr` through `sockfd`.
 *
 * If `data` does not already live in one of the send buffers it is copied into
 * the buffer of the user data entry taken for this request. With zero copy
 * enabled the send uses the registered fixed buffer.
 *
 * @param sockfd Socket to send on.
 * @param data   Bytes to send.
 * @param len    Number of bytes; must not exceed DepLibUring::SendBufferSize.
 * @param addr   Destination address.
 * @param cb     Optional callback stored in the user data entry; onFdEvent()
 *               invokes and deletes it when the request completes.
 * @return true if the entry was prepared; false if the data does not fit in a
 *         send buffer or no user data entry / submission queue entry is
 *         available. On false nothing is retained.
 */
bool DepLibUring::LibUring::PrepareSend(
  int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb)
{
	MS_TRACE();

	// The data ends up in a fixed size send buffer; refuse anything larger
	// before acquiring any resource rather than writing past its end.
	if (len > DepLibUring::SendBufferSize)
	{
		MS_DEBUG_DEV("data does not fit in a send buffer");

		return false;
	}

	auto* userData = this->GetUserData();

	if (!userData)
	{
		MS_DEBUG_DEV("no user data entry available");

		this->userDataMissCount++;

		return false;
	}

	auto* sqe = io_uring_get_sqe(std::addressof(this->ring));

	if (!sqe)
	{
		MS_DEBUG_DEV("no sqe available");

		this->sqeMissCount++;

		// Give the entry back, otherwise it would never become available again.
		this->ReleaseUserDataEntry(userData->idx);

		return false;
	}

	if (this->IsDataInSendBuffers(data))
	{
		// The caller wrote straight into the buffer of the entry just taken.
		MS_ASSERT(data == userData->store, "send buffer does not match userData store");
	}
	else
	{
		std::memcpy(userData->store, data, len);
	}

	userData->cb = cb;

	io_uring_sqe_set_data(sqe, userData);

	const socklen_t addrlen = Utils::IP::GetAddressLen(addr);

	if (this->zeroCopyEnabled)
	{
		// Work on a copy so the registered iovec keeps its full length.
		auto iovec = this->iovecs[userData->idx];

		iovec.iov_len = len;

		io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
		io_uring_prep_send_set_addr(sqe, addr, addrlen);

		// Tell the kernel to use the registered buffer with this index.
		sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
		sqe->buf_index = userData->idx;
	}
	else
	{
		io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);
	}

	this->sqeProcessCount++;

	return true;
}
/**
 * Prepares (without submitting) a submission queue entry that writes two
 * chunks, `data1` followed by `data2`, to `sockfd` with a single writev.
 *
 * If `data2` already lives in the send buffers it is sent in place and `data1`
 * is copied into the entry's `frameLen` field; otherwise both chunks are
 * copied back to back into the entry's send buffer.
 *
 * @param sockfd Socket to write to.
 * @param data1  First chunk.
 * @param len1   Length of the first chunk.
 * @param data2  Second chunk.
 * @param len2   Length of the second chunk.
 * @param cb     Optional callback stored in the user data entry; onFdEvent()
 *               invokes and deletes it when the request completes.
 * @return true if the entry was prepared; false if the data does not fit in a
 *         send buffer or no user data entry / submission queue entry is
 *         available. On false nothing is retained.
 */
bool DepLibUring::LibUring::PrepareWrite(
  int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb)
{
	MS_TRACE();

	auto* userData = this->GetUserData();

	if (!userData)
	{
		MS_DEBUG_DEV("no user data entry available");

		this->userDataMissCount++;

		return false;
	}

	const bool payloadInPlace = this->IsDataInSendBuffers(data2);
	const size_t capacity     = DepLibUring::SendBufferSize;

	// Whatever is placed in the send buffer must fit in it. Checked before a
	// submission queue entry is taken, since a taken entry cannot be handed back.
	const bool fits =
	  payloadInPlace ? (len2 <= capacity) : (len1 <= capacity && len2 <= capacity - len1);

	if (!fits)
	{
		MS_DEBUG_DEV("data does not fit in a send buffer");

		this->ReleaseUserDataEntry(userData->idx);

		return false;
	}

	auto* sqe = io_uring_get_sqe(std::addressof(this->ring));

	if (!sqe)
	{
		MS_DEBUG_DEV("no sqe available");

		this->sqeMissCount++;

		// Give the entry back, otherwise it would never become available again.
		this->ReleaseUserDataEntry(userData->idx);

		return false;
	}

	if (payloadInPlace)
	{
		// The caller wrote straight into the buffer of the entry just taken.
		MS_ASSERT(data2 == userData->store, "send buffer does not match userData store");

		// NOTE(review): assumes len1 fits in `frameLen` - confirm against the
		// UserData declaration.
		std::memcpy(userData->frameLen, data1, len1);

		userData->iov[0].iov_base = userData->frameLen;
		userData->iov[0].iov_len  = len1;
		userData->iov[1].iov_base = userData->store;
		userData->iov[1].iov_len  = len2;
	}
	else
	{
		std::memcpy(userData->store, data1, len1);
		std::memcpy(userData->store + len1, data2, len2);

		userData->iov[0].iov_base = userData->store;
		userData->iov[0].iov_len  = len1;
		userData->iov[1].iov_base = userData->store + len1;
		userData->iov[1].iov_len  = len2;
	}

	userData->cb = cb;

	io_uring_sqe_set_data(sqe, userData);
	io_uring_prep_writev(sqe, sockfd, userData->iov, 2, 0);

	this->sqeProcessCount++;

	return true;
}
/**
 * Marks the instance as inactive and submits the prepared submission queue
 * entries to the kernel. A failing submit is logged, not thrown.
 */
void DepLibUring::LibUring::Submit()
{
	MS_TRACE();

	this->SetInactive();

	const auto submitted = io_uring_submit(std::addressof(this->ring));

	if (submitted < 0)
	{
		// liburing returns -errno.
		const int error = -submitted;

		MS_ERROR("io_uring_submit() failed: %s", std::strerror(error));

		return;
	}

	MS_DEBUG_DEV("%i submission queue entries submitted", submitted);
}
/**
 * Takes the user data entry at the front of the available entries queue.
 *
 * The entry is removed from the queue and its `idx` field is set to its
 * position in `userDatas`.
 *
 * @return Pointer to the entry, or nullptr if none is available.
 */
DepLibUring::UserData* DepLibUring::LibUring::GetUserData()
{
	MS_TRACE();

	auto& freeEntries = this->availableUserDataEntries;

	if (freeEntries.empty())
	{
		return nullptr;
	}

	const auto entryIdx = freeEntries.front();

	freeEntries.pop();

	auto& entry = this->userDatas[entryIdx];

	entry.idx = entryIdx;

	return std::addressof(entry);
}