#ifdef LIBUS_USE_IO_URING
#include "libusockets.h"
#include "internal/internal.h"
#include "internal.h"
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Backing storage for the provided-buffer pool: BUFFERS_COUNT buffers of
 * MAX_MESSAGE_LEN bytes each. The pool is handed to the ring with
 * io_uring_prep_provide_buffers and individual buffers are addressed by
 * their buffer id (the first index). */
char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
/* Buffer group id used both when providing buffers from bufs and when arming
 * receives that select a buffer from that group. */
int group_id = 1337;
/*
 * Queues a request that gives buffer `bid` of the global `bufs` pool back to
 * buffer group `gid` on `ring`.
 *
 * The request is tagged with user data 6 and flagged IOSQE_CQE_SKIP_SUCCESS,
 * so only a failure is expected to show up as a completion. The request is
 * only queued here; it is sent on the next submit.
 *
 * io_uring_get_sqe returns NULL when the submission queue is full. In that
 * case the pending entries are submitted once and the call is retried; if
 * that still fails the process exits, because silently dropping the request
 * would permanently shrink the buffer pool.
 */
void add_provide_buf(struct io_uring *ring, __u16 bid, unsigned gid) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* Submission queue full: flush what is pending and try again. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
    }
    if (!sqe) {
        fprintf(stderr, "io_uring submission queue is full, cannot re-provide buffer %u\n", (unsigned) bid);
        exit(1);
    }

    io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, gid, bid);
    io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
    io_uring_sqe_set_data64(sqe, 6);
}
void us_loop_run(struct us_loop_t *loop) {
while (1) {
io_uring_submit_and_wait(&loop->ring, 1);
struct io_uring_cqe *cqe;
unsigned head;
unsigned count = 0;
io_uring_for_each_cqe(&loop->ring, head, cqe) {
++count;
int pointer_tag = (int)((uintptr_t)io_uring_cqe_get_data(cqe) & (uintptr_t)0x7ull);
void *object = (void *)((uintptr_t)io_uring_cqe_get_data(cqe) & (uintptr_t)0xFFFFFFFFFFFFFFF8ull);
if (pointer_tag == 6) {
printf("uuuuuuuuh: %d\n", cqe->res);
exit(1);
}
int type = pointer_tag; if (cqe->res == -ENOBUFS) {
fprintf(stdout, "bufs in automatic buffer selection empty, this should not happen...\n");
fflush(stdout);
exit(1);
}else if (type == LISTEN_SOCKET_ACCEPT) {
struct us_listen_socket_t *listen_s = object;
struct us_socket_t *s = malloc(sizeof(struct us_socket_t) + listen_s->socket_ext_size);
s->context = listen_s->context;
s->dd = cqe->res;
int sock_conn_fd = cqe->res;
if (sock_conn_fd >= 0) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&loop->ring);
io_uring_prep_recv_multishot(sqe, s->dd, NULL, 0, 0);
io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT | IOSQE_FIXED_FILE);
sqe->buf_group = group_id;
io_uring_sqe_set_data(sqe, (char *)s + SOCKET_READ);
}
s->context->on_open(s, 1, 0, 0);
} else if (type == SOCKET_READ) {
int bytes_read = cqe->res;
int bid = cqe->flags >> 16;
if (cqe->res <= 0) {
add_provide_buf(&loop->ring, bid, group_id);
struct io_uring_sqe *sqe = io_uring_get_sqe(&loop->ring);
struct us_socket_t *s = object;
s->context->on_close(s, 0, 0);
io_uring_prep_close_direct(sqe, s->dd);
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
io_uring_sqe_set_data64(sqe, 5);
} else {
struct us_socket_t *s = object;
s->context->on_data(s, bufs[bid], bytes_read);
struct io_uring_sqe *sqe = io_uring_get_sqe(&loop->ring);
io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, group_id, bid);
io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS);
io_uring_sqe_set_data64(sqe, 6);
}
} else if (type == SOCKET_WRITE) {
} else if (type == SOCKET_CONNECT) {
printf("we are connectred: %d\n", cqe->res);
struct us_socket_t *s = object;
struct io_uring_sqe *sqe = io_uring_get_sqe(&loop->ring);
io_uring_prep_recv_multishot(sqe, s->dd, NULL, 0, 0);
io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT | IOSQE_FIXED_FILE);
sqe->buf_group = group_id;
io_uring_sqe_set_data(sqe, (char *)s + SOCKET_READ);
s->context->on_open(s, 1, 0, 0);
}
}
io_uring_cq_advance(&loop->ring, count);
}
}
/*
 * Allocates a timer bound to `loop`, followed by `ext_size` bytes of
 * user extension storage (see us_timer_ext).
 *
 * `fallthrough` is accepted for interface compatibility but not used here.
 *
 * Returns the new timer, or NULL if the allocation fails. Only the `loop`
 * field is initialized.
 */
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
    (void) fallthrough;

    struct us_timer_t *timer = malloc(sizeof *timer + ext_size);
    if (!timer) {
        return NULL;
    }

    timer->loop = loop;
    return timer;
}
/*
 * Placeholder: does nothing in this backend. The callback and both
 * intervals are ignored, so no timer is ever armed by this call.
 */
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
    (void) t;
    (void) cb;
    (void) ms;
    (void) repeat_ms;
}
/* Returns the address immediately following the timer struct, which is where
 * us_create_timer reserves the extension bytes. */
void *us_timer_ext(struct us_timer_t *timer) {
    return (void *) &timer[1];
}
/*
 * Placeholder: does nothing in this backend.
 * NOTE(review): the memory obtained in us_create_timer is not released
 * here - confirm who is expected to free it.
 */
void us_timer_close(struct us_timer_t *timer) {
    (void) timer;
}
/* Returns the loop pointer stored in the timer's `loop` field. */
struct us_loop_t *us_timer_loop(struct us_timer_t *t) {
    struct us_loop_t *owner = t->loop;
    return owner;
}
void us_loop_free(struct us_loop_t *loop) {
}
/*
 * Creates a loop backed by an io_uring instance and registers the global
 * `bufs` pool as provided buffers under `group_id`.
 *
 * `ext_size` bytes of user extension storage are allocated directly after
 * the loop struct (see us_loop_ext). `hint`, `wakeup_cb`, `pre_cb` and
 * `post_cb` are accepted for interface compatibility but are not used here.
 *
 * Every failure during setup terminates the process; on return the loop is
 * ready for us_loop_run.
 */
struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) {
    (void) hint;
    (void) wakeup_cb;
    (void) pre_cb;
    (void) post_cb;

    struct us_loop_t *loop = malloc(sizeof *loop + ext_size);
    if (!loop) {
        fprintf(stderr, "failed to allocate loop\n");
        exit(1);
    }

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    params.flags = IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER;

    /* liburing reports failures as a negative errno return value rather than
     * through errno, so the message is built from the return value. */
    int ret = io_uring_queue_init_params(2048, &loop->ring, &params);
    if (ret < 0) {
        fprintf(stderr, "io_uring_init_failed: %s\n", strerror(-ret));
        exit(1);
    }

    /* Reserve 1024 empty fixed-file slots on the ring. */
    ret = io_uring_register_files_sparse(&loop->ring, 1024);
    if (ret) {
        fprintf(stderr, "io_uring_register_files_sparse failed: %d\n", ret);
        exit(1);
    }

    if (!(params.features & IORING_FEAT_FAST_POLL)) {
        printf("IORING_FEAT_FAST_POLL not available in the kernel, quiting...\n");
        exit(0);
    }

    struct io_uring_probe *probe = io_uring_get_probe_ring(&loop->ring);
    if (!probe || !io_uring_opcode_supported(probe, IORING_OP_PROVIDE_BUFFERS)) {
        printf("Buffer select not supported, skipping...\n");
        exit(0);
    }
    io_uring_free_probe(probe);

    /* Hand the whole bufs pool to the kernel as group_id, ids starting at 0,
     * and wait for that single request to complete before returning. */
    struct io_uring_sqe *sqe = io_uring_get_sqe(&loop->ring);
    io_uring_prep_provide_buffers(sqe, bufs, MAX_MESSAGE_LEN, BUFFERS_COUNT, group_id, 0);
    io_uring_submit(&loop->ring);

    struct io_uring_cqe *cqe = NULL;
    ret = io_uring_wait_cqe(&loop->ring, &cqe);
    if (ret < 0) {
        /* cqe is not valid when the wait itself fails. */
        fprintf(stderr, "io_uring_wait_cqe failed: %s\n", strerror(-ret));
        exit(1);
    }
    if (cqe->res < 0) {
        printf("cqe->res = %d\n", cqe->res);
        exit(1);
    }
    io_uring_cqe_seen(&loop->ring, cqe);

    return loop;
}
/* Placeholder: does nothing in this backend. */
void us_internal_loop_data_free(struct us_loop_t *loop) {
    (void) loop;
}
/* Placeholder: does nothing in this backend, so calling it does not
 * interrupt a loop that is waiting for completions. */
void us_wakeup_loop(struct us_loop_t *loop) {
    (void) loop;
}
/* Placeholder: does nothing in this backend; neither argument is recorded. */
void us_internal_loop_link(struct us_loop_t *loop, struct us_socket_context_t *context) {
    (void) loop;
    (void) context;
}
/* Placeholder: does nothing in this backend; neither argument is touched. */
void us_internal_loop_unlink(struct us_loop_t *loop, struct us_socket_context_t *context) {
    (void) loop;
    (void) context;
}
/* Placeholder: does nothing in this backend. */
void us_internal_timer_sweep(struct us_loop_t *loop) {
    (void) loop;
}
/* Named for a per-iteration cap on low-priority sockets; nothing in the
 * visible part of this file reads it (the handler below is an empty stub). */
static const int MAX_LOW_PRIO_SOCKETS_PER_LOOP_ITERATION = 5;
/* Placeholder: does nothing in this backend. */
void us_internal_handle_low_priority_sockets(struct us_loop_t *loop) {
    (void) loop;
}
/* Placeholder: does nothing in this backend; no socket memory is released here. */
void us_internal_free_closed_sockets(struct us_loop_t *loop) {
    (void) loop;
}
/* Always reports 0: this backend does not keep an iteration counter. */
long long us_loop_iteration_number(struct us_loop_t *loop) {
    (void) loop;
    return 0LL;
}
/* Placeholder: does nothing in this backend. */
void us_internal_loop_pre(struct us_loop_t *loop) {
    (void) loop;
}
/* Placeholder: does nothing in this backend. */
void us_internal_loop_post(struct us_loop_t *loop) {
    (void) loop;
}
/* Placeholder: does nothing in this backend. */
void us_loop_integrate(struct us_loop_t *loop) {
    (void) loop;
}
/* Returns the address immediately following the loop struct, which is where
 * us_create_loop reserves the extension bytes. */
void *us_loop_ext(struct us_loop_t *loop) {
    return (void *) &loop[1];
}
#endif