#ifndef MS_DEP_LIBURING_HPP
#define MS_DEP_LIBURING_HPP
#include "FBS/liburing.h"
#include <uv.h>
#include <functional>
#include <liburing.h>
#include <queue>
/**
 * Static entry points in front of a per-thread DepLibUring::LibUring instance.
 *
 * Most static functions declared here have a same-named member on the nested
 * LibUring class. Their definitions are not part of this header, so the exact
 * forwarding behaviour must be checked in the implementation file.
 */
class DepLibUring
{
public:
	/**
	 * Callback type receiving a single `sent` flag.
	 *
	 * NOTE: the alias itself is const-qualified, so `onSendCallback*` is a
	 * pointer to a const std::function.
	 */
	using onSendCallback = const std::function<void(bool sent)>;

	/**
	 * Bookkeeping record; LibUring keeps QueueDepth of these in `userDatas`.
	 */
	struct UserData
	{
		// Byte buffer associated with this entry.
		// NOTE(review): presumably points at one of LibUring::sendBuffers - confirm
		// in the implementation.
		uint8_t* store{ nullptr };
		// Two scratch bytes.
		// NOTE(review): presumably a 16-bit length prefix for PrepareWrite - confirm.
		uint8_t frameLen[2] = { 0 };
		// Scatter/gather vectors; zero-initialized so a default-constructed
		// UserData never exposes indeterminate pointers or lengths.
		struct iovec iov[2]{};
		// Optional callback; nullptr when none is attached.
		onSendCallback* cb{ nullptr };
		// NOTE(review): presumably this entry's index in LibUring::userDatas (the
		// value handed to ReleaseUserDataEntry) - confirm.
		size_t idx{ 0 };
	};

	// Number of entries in each of LibUring's userDatas, sendBuffers and iovecs arrays.
	static constexpr size_t QueueDepth{ 1024 * 4 };
	// Size in bytes of a single SendBuffer.
	static constexpr size_t SendBufferSize{ 1500 };
	using SendBuffer = uint8_t[SendBufferSize];

	/* Static API. Defined out of line; see the implementation file for behaviour. */
	static void ClassInit();
	static void ClassDestroy();
	static bool CheckRuntimeSupport();
	static bool IsEnabled();
	static flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder);
	static void StartPollingCQEs();
	static void StopPollingCQEs();
	static uint8_t* GetSendBuffer();
	static bool PrepareSend(
	  int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
	static bool PrepareWrite(
	  int sockfd, const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2, onSendCallback* cb);
	static void Submit();
	static void SetActive();
	static bool IsActive();

	class LibUring;

	// Per-thread state (both members are thread_local).
	static thread_local bool enabled;
	static thread_local LibUring* liburing;

public:
	/**
	 * Wrapper around one io_uring instance plus fixed-size pools of UserData
	 * entries and send buffers.
	 */
	class LibUring
	{
	public:
		LibUring();
		~LibUring();

		// The object holds an io_uring instance, an event fd and a uv_poll_t
		// handle and declares a destructor. A memberwise copy or move would
		// duplicate those handles (and the multi-megabyte buffer pool), so both
		// are disabled.
		LibUring(const LibUring&)            = delete;
		LibUring& operator=(const LibUring&) = delete;
		LibUring(LibUring&&)                 = delete;
		LibUring& operator=(LibUring&&)      = delete;

		/* Defined out of line; see the implementation file for behaviour. */
		flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder) const;
		void StartPollingCQEs();
		void StopPollingCQEs();
		uint8_t* GetSendBuffer();
		bool PrepareSend(
		  int sockfd, const uint8_t* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
		bool PrepareWrite(
		  int sockfd,
		  const uint8_t* data1,
		  size_t len1,
		  const uint8_t* data2,
		  size_t len2,
		  onSendCallback* cb);
		void Submit();

		/** Raises the `active` flag. */
		void SetActive()
		{
			this->active = true;
		}

		/** @return Current value of the `active` flag. */
		bool IsActive() const
		{
			return this->active;
		}

		/** @return Current value of the `zeroCopyEnabled` flag (defaults to true). */
		bool IsZeroCopyEnabled() const
		{
			return this->zeroCopyEnabled;
		}

		/** @return Address of the embedded io_uring structure. */
		io_uring* GetRing()
		{
			return std::addressof(this->ring);
		}

		/** @return The stored event file descriptor (-1 until one is assigned). */
		int GetEventFd() const
		{
			return this->efd;
		}

		/**
		 * Appends `idx` to the queue of available UserData entries.
		 *
		 * @param idx Entry index to make available again. Not validated here.
		 */
		void ReleaseUserDataEntry(size_t idx)
		{
			this->availableUserDataEntries.push(idx);
		}

	private:
		/** Clears the `active` flag. */
		void SetInactive()
		{
			this->active = false;
		}

		UserData* GetUserData();

		/**
		 * Tells whether `data` lies between the first byte of the first send
		 * buffer and the first byte of the last send buffer (both inclusive).
		 *
		 * `data` may be any pointer, including one unrelated to `sendBuffers`.
		 * The built-in relational operators give an unspecified result for
		 * pointers into different objects, so the comparison goes through the
		 * std function objects, which guarantee a total order over pointers.
		 *
		 * NOTE(review): the upper bound is the start of the last buffer, so an
		 * address past the first byte of that last buffer yields false.
		 * Presumably callers only pass buffer start addresses - confirm.
		 */
		bool IsDataInSendBuffers(const uint8_t* data) const
		{
			const uint8_t* const firstBuffer = this->sendBuffers[0];
			const uint8_t* const lastBuffer  = this->sendBuffers[DepLibUring::QueueDepth - 1];

			return std::greater_equal<const uint8_t*>{}(data, firstBuffer) &&
			       std::less_equal<const uint8_t*>{}(data, lastBuffer);
		}

	private:
		// io_uring instance exposed through GetRing().
		io_uring ring;
		// Event file descriptor exposed through GetEventFd(); -1 means "not set".
		int efd{ -1 };
		// libuv poll handle; nullptr when none is attached.
		uv_poll_t* uvHandle{ nullptr };
		// Flag toggled by SetActive() / SetInactive().
		bool active{ false };
		// Flag reported by IsZeroCopyEnabled().
		bool zeroCopyEnabled{ true };
		// Pool of UserData entries, value-initialized.
		UserData userDatas[QueueDepth]{};
		// Indexes handed back through ReleaseUserDataEntry().
		std::queue<size_t> availableUserDataEntries;
		// Pool of fixed-size byte buffers (intentionally left uninitialized).
		SendBuffer sendBuffers[QueueDepth];
		// One iovec per pool slot.
		// NOTE(review): presumably describes sendBuffers for buffer registration - confirm.
		struct iovec iovecs[QueueDepth];
		// Counters; their exact meaning is defined by the implementation file.
		uint64_t sqeProcessCount{ 0u };
		uint64_t sqeMissCount{ 0u };
		uint64_t userDataMissCount{ 0u };
	};
};
#endif