#include <rt/queue.h>
#include <rt/atomic.h>
#include <rt/trace.h>
#include <limits.h>
#include <string.h>
/*
 * Slot byte layout: the low four bits (SLOT_STATE_MASK) hold the state flags
 * below and the remaining high bits (SLOT_GEN_MASK) hold the slot generation.
 */
/* Flag tested by pop() to decide whether a slot can be claimed. */
#define SLOT_DATA_PRESENT 0x08U
/* Flag tested by skip(); present in SLOT_PUSH and SLOT_SKIPPED. */
#define SLOT_SKIPPABLE 0x04U
/* Flag tested by peek() to decide whether a slot can be claimed. */
#define SLOT_READABLE 0x02U
/* Flag present in every state written by skip(), pop() and peek(). */
#define SLOT_READ_CLAIM 0x01U
/* No flags set: the state push() searches for. */
#define SLOT_EMPTY 0x00U
/* Written by push() when it claims an empty slot, before copying data in. */
#define SLOT_PUSH SLOT_SKIPPABLE
/* Written by skip(), together with an advanced generation. */
#define SLOT_SKIPPED (SLOT_SKIPPABLE | SLOT_READ_CLAIM)
/* Written by push() once the element has been copied in, and by peek() when
 * it finishes reading. */
#define SLOT_FULL (SLOT_DATA_PRESENT | SLOT_READABLE)
/* Written by pop() while it copies the element out. */
#define SLOT_POP (SLOT_DATA_PRESENT | SLOT_READ_CLAIM)
/* Written by peek() while it copies the element out. */
#define SLOT_PEEK (SLOT_DATA_PRESENT | SLOT_READABLE | SLOT_READ_CLAIM)
#define SLOT_STATE_MASK 0x0FU
/* Amount added to a slot byte to advance its generation.
 * NOTE(review): RT_QUEUE_STATE_BITS is defined outside this file; presumably
 * 4 so that this is the lowest bit of SLOT_GEN_MASK - confirm. */
#define SLOT_GEN_INCREMENT (1U << RT_QUEUE_STATE_BITS)
#define SLOT_GEN_MASK (UCHAR_MAX & ~SLOT_STATE_MASK)
/* Short local aliases for the rt_queue_* helpers, which are defined outside
 * this file (presumably in rt/queue.h). */
#define qindex rt_queue_qindex
#define qgen rt_queue_qgen
#define qsgen rt_queue_qsgen
static inline bool slot_empty(unsigned char slot)
{
return (slot & SLOT_STATE_MASK) == 0;
}
/* Report whether the SLOT_DATA_PRESENT flag is set in a slot byte. */
static inline bool slot_data_present(unsigned char slot)
{
    const unsigned flag = slot & SLOT_DATA_PRESENT;
    return flag == SLOT_DATA_PRESENT;
}
/* Report whether the SLOT_SKIPPABLE flag is set in a slot byte. */
static inline bool slot_skippable(unsigned char slot)
{
    if ((slot & SLOT_SKIPPABLE) == 0)
    {
        return false;
    }
    return true;
}
/* Report whether the SLOT_READABLE flag is set in a slot byte. */
static inline bool slot_readable(unsigned char slot)
{
    const bool readable = (slot & SLOT_READABLE) != 0;
    return readable;
}
/* Extract the generation bits of a slot byte, leaving the state bits zero. */
static inline unsigned char sgen(unsigned char slot)
{
    const unsigned generation = slot & SLOT_GEN_MASK;
    return (unsigned char)generation;
}
/* Return the generation bits of a slot byte advanced by one generation step.
 * The result is truncated to unsigned char, so the generation wraps around. */
static inline unsigned char sgen_next(unsigned char slot)
{
    const unsigned bumped = sgen(slot) + SLOT_GEN_INCREMENT;
    return (unsigned char)bumped;
}
/*
 * Compute the queue position that follows q.
 *
 * The position is incremented by one. If the index part of the result equals
 * num_elems, the value returned is instead qgen() of that position advanced by
 * RT_QUEUE_QGEN_INCREMENT.
 * NOTE(review): qindex/qgen are defined elsewhere; presumably this resets the
 * index to zero and moves to the next generation - confirm.
 */
static size_t next(size_t q, size_t num_elems)
{
    const size_t advanced = q + 1;
    if (qindex(advanced) != num_elems)
    {
        return advanced;
    }
    return qgen(advanced + RT_QUEUE_QGEN_INCREMENT);
}
/*
 * Copy one element of queue->elem_size bytes from elem into the queue.
 *
 * Loops until the element has been stored in a slot and that slot has been
 * marked SLOT_FULL. The public wrappers in this file acquire push_sem before
 * calling this.
 * NOTE(review): presumably push_sem guarantees an empty slot exists, since
 * otherwise the search below would not terminate - confirm.
 * The comments below assume rt_atomic_compare_exchange has C11-style
 * semantics (a failed exchange stores the observed value in `expected`).
 */
static void push(struct rt_queue *queue, const void *elem)
{
    for (;;)
    {
        size_t enq = rt_atomic_load(&queue->enq, RT_ATOMIC_RELAXED);
        size_t last_enq = enq;
        rt_atomic_uchar *slot;
        unsigned char s;
        /* Search for a slot that is empty and whose generation matches the
         * generation expected for position enq. */
        for (;;)
        {
            slot = &queue->slots[qindex(enq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            if (slot_empty(s) && (sgen(s) == qsgen(enq)))
            {
                break;
            }
            /* Not usable. If the shared enqueue position has changed since it
             * was last read, restart from it; otherwise probe the following
             * position. */
            const size_t new_enq =
                rt_atomic_load(&queue->enq, RT_ATOMIC_RELAXED);
            if (new_enq != last_enq)
            {
                enq = new_enq;
                last_enq = new_enq;
            }
            else
            {
                enq = next(enq, queue->num_elems);
            }
        }
        /* Try to claim the slot by moving it from empty to SLOT_PUSH. If the
         * slot changed in the meantime, start over. */
        const unsigned char push_s = sgen(s) | SLOT_PUSH;
        if (rt_atomic_compare_exchange(slot, &s, push_s, RT_ATOMIC_RELAXED,
                                       RT_ATOMIC_RELAXED))
        {
            /* Publish the position after the claimed slot before copying. */
            rt_atomic_store(&queue->enq, next(enq, queue->num_elems),
                            RT_ATOMIC_RELAXED);
            unsigned char *const p = queue->data;
            memcpy(&p[queue->elem_size * qindex(enq)], elem, queue->elem_size);
            s = push_s;
            /* Mark the slot full; release ordering on success orders the copy
             * above before the state change. */
            if (rt_atomic_compare_exchange(slot, &s, sgen(s) | SLOT_FULL,
                                           RT_ATOMIC_RELEASE,
                                           RT_ATOMIC_RELAXED))
            {
                rt_trace_queue_push(queue, enq);
                break;
            }
            /* The slot is no longer in SLOT_PUSH. In this file only skip()
             * changes a slot in that state. Reset the slot to empty, keeping
             * whatever generation it now has (s is refreshed by each failed
             * exchange), then retry the whole push. */
            rt_trace_queue_push_skipped(queue, enq);
            while (!rt_atomic_compare_exchange_weak(slot, &s,
                                                    sgen(s) | SLOT_EMPTY,
                                                    RT_ATOMIC_RELEASE,
                                                    RT_ATOMIC_RELAXED))
            {
            }
        }
    }
}
/*
 * Try to skip over a slot whose last observed value is *s.
 *
 * Nothing is attempted unless *s has the SLOT_SKIPPABLE flag. Otherwise a
 * relaxed compare-exchange tries to replace *s with SLOT_SKIPPED at the next
 * generation. Returns true only if that exchange succeeded; s is handed to the
 * exchange as the expected-value pointer.
 */
static bool skip(rt_atomic_uchar *slot, unsigned char *s)
{
    if (!slot_skippable(*s))
    {
        return false;
    }
    const unsigned char skipped = sgen_next(*s) | SLOT_SKIPPED;
    return rt_atomic_compare_exchange(slot, s, skipped, RT_ATOMIC_RELAXED,
                                      RT_ATOMIC_RELAXED);
}
/*
 * Remove one element from the queue, copying queue->elem_size bytes to elem.
 *
 * Loops until an element has been copied out and its slot has been returned to
 * SLOT_EMPTY at the next generation. The public wrappers in this file acquire
 * pop_sem before calling this.
 * The comments below assume rt_atomic_compare_exchange has C11-style
 * semantics (a failed exchange stores the observed value in `expected`).
 */
static void pop(struct rt_queue *queue, void *elem)
{
    for (;;)
    {
        size_t deq = rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
        size_t last_deq = deq;
        rt_atomic_uchar *slot;
        unsigned char s;
        /* Search for a slot of the expected generation that holds data. */
        for (;;)
        {
            slot = &queue->slots[qindex(deq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            if (sgen(s) == qsgen(deq))
            {
                /* A slot in a skippable state (for example claimed by a push
                 * that has not finished) is moved to SLOT_SKIPPED. If skip()
                 * attempts the exchange and fails, s holds the slot's current
                 * value for the check below. */
                if (skip(slot, &s))
                {
                    rt_trace_queue_pop_skip(queue, deq);
                }
                if (slot_data_present(s))
                {
                    break;
                }
            }
            /* Not usable. If the shared dequeue position has changed since it
             * was last read, restart from it; otherwise probe the following
             * position. */
            const size_t new_deq =
                rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
            if (new_deq != last_deq)
            {
                deq = new_deq;
                last_deq = new_deq;
            }
            else
            {
                deq = next(deq, queue->num_elems);
            }
        }
        /* Claim the slot for popping; acquire ordering on success orders the
         * copy below after the claim. */
        unsigned char pop_s = sgen(s) | SLOT_POP;
        if (rt_atomic_compare_exchange(slot, &s, pop_s, RT_ATOMIC_ACQUIRE,
                                       RT_ATOMIC_RELAXED))
        {
            const unsigned char *const p = queue->data;
            memcpy(elem, &p[queue->elem_size * qindex(deq)], queue->elem_size);
            /* Release the slot as empty at the next generation, but only if it
             * is still in the SLOT_POP state written above; otherwise the copy
             * is discarded and the pop is retried. s still holds the pre-claim
             * value, which has the same generation as pop_s.
             * NOTE(review): this exchange is relaxed although it follows the
             * reads of the slot data - confirm that is intended. */
            if (rt_atomic_compare_exchange(slot, &pop_s,
                                           sgen_next(s) | SLOT_EMPTY,
                                           RT_ATOMIC_RELAXED,
                                           RT_ATOMIC_RELAXED))
            {
                rt_atomic_store(&queue->deq, next(deq, queue->num_elems),
                                RT_ATOMIC_RELAXED);
                rt_trace_queue_pop(queue, deq);
                break;
            }
        }
    }
}
/*
 * Copy one element of queue->elem_size bytes out of the queue into elem
 * without removing it.
 *
 * Loops until an element has been copied and its slot has been restored to
 * SLOT_FULL with an unchanged generation. queue->deq is never written here.
 * The public wrappers in this file acquire pop_sem before calling this.
 * The comments below assume rt_atomic_compare_exchange has C11-style
 * semantics (a failed exchange stores the observed value in `expected`).
 */
static void peek(struct rt_queue *queue, void *elem)
{
    for (;;)
    {
        size_t deq = rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
        size_t last_deq = deq;
        rt_atomic_uchar *slot;
        unsigned char s;
        /* Search for a slot of the expected generation that is readable. */
        for (;;)
        {
            slot = &queue->slots[qindex(deq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            if (sgen(s) == qsgen(deq))
            {
                /* A slot in a skippable state is moved to SLOT_SKIPPED. If
                 * skip() attempts the exchange and fails, s holds the slot's
                 * current value for the check below. */
                if (skip(slot, &s))
                {
                    rt_trace_queue_peek_skip(queue, deq);
                }
                /* SLOT_READABLE is part of SLOT_FULL and SLOT_PEEK but not of
                 * SLOT_POP, so a slot claimed by pop() is passed over. */
                if (slot_readable(s))
                {
                    break;
                }
            }
            /* Not usable. If the shared dequeue position has changed since it
             * was last read, restart from it; otherwise probe the following
             * position. */
            const size_t new_deq =
                rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
            if (new_deq != last_deq)
            {
                deq = new_deq;
                last_deq = new_deq;
            }
            else
            {
                deq = next(deq, queue->num_elems);
            }
        }
        /* Claim the slot for peeking; acquire ordering on success orders the
         * copy below after the claim. */
        unsigned char peek_s = sgen(s) | SLOT_PEEK;
        if (rt_atomic_compare_exchange(slot, &s, peek_s, RT_ATOMIC_ACQUIRE,
                                       RT_ATOMIC_RELAXED))
        {
            const unsigned char *const p = queue->data;
            memcpy(elem, &p[queue->elem_size * qindex(deq)], queue->elem_size);
            /* Put the slot back to SLOT_FULL, but only if it is still in the
             * SLOT_PEEK state written above; otherwise the copy is discarded
             * and the peek is retried.
             * NOTE(review): this exchange is relaxed although it follows the
             * reads of the slot data - confirm that is intended. */
            if (rt_atomic_compare_exchange(slot, &peek_s, sgen(s) | SLOT_FULL,
                                           RT_ATOMIC_RELAXED,
                                           RT_ATOMIC_RELAXED))
            {
                rt_trace_queue_peek(queue, deq);
                break;
            }
        }
    }
}
/*
 * Push elem onto the queue: wait on push_sem, copy the element in, then post
 * pop_sem.
 * NOTE(review): presumably push_sem counts free slots and pop_sem counts
 * stored elements - confirm against the queue initialisation.
 */
void rt_queue_push(struct rt_queue *queue, const void *elem)
{
    rt_sem_wait(&queue->push_sem);
    push(queue, elem);
    rt_sem_post_from_task(&queue->pop_sem);
}
/*
 * Pop an element from the queue into elem: wait on pop_sem, copy the element
 * out and free its slot, then post push_sem.
 * NOTE(review): presumably pop_sem counts stored elements and push_sem counts
 * free slots - confirm against the queue initialisation.
 */
void rt_queue_pop(struct rt_queue *queue, void *elem)
{
    rt_sem_wait(&queue->pop_sem);
    pop(queue, elem);
    rt_sem_post_from_task(&queue->push_sem);
}
/*
 * Copy an element from the queue into elem without removing it: wait on
 * pop_sem, copy the element out, then post pop_sem again. The same semaphore
 * is waited on and posted, so its count is unchanged after the call.
 */
void rt_queue_peek(struct rt_queue *queue, void *elem)
{
    rt_sem_wait(&queue->pop_sem);
    peek(queue, elem);
    rt_sem_post_from_task(&queue->pop_sem);
}
/*
 * Attempt to push elem onto the queue.
 *
 * Returns false, leaving the queue untouched, when rt_sem_trywait on push_sem
 * fails. Otherwise the element is copied in, pop_sem is posted with
 * rt_sem_post, and true is returned.
 */
bool rt_queue_trypush(struct rt_queue *queue, const void *elem)
{
    const bool reserved = rt_sem_trywait(&queue->push_sem);
    if (reserved)
    {
        push(queue, elem);
        rt_sem_post(&queue->pop_sem);
    }
    return reserved;
}
/*
 * Attempt to pop an element from the queue into elem.
 *
 * Returns false, leaving elem and the queue untouched, when rt_sem_trywait on
 * pop_sem fails. Otherwise the element is copied out, push_sem is posted with
 * rt_sem_post, and true is returned.
 */
bool rt_queue_trypop(struct rt_queue *queue, void *elem)
{
    const bool available = rt_sem_trywait(&queue->pop_sem);
    if (available)
    {
        pop(queue, elem);
        rt_sem_post(&queue->push_sem);
    }
    return available;
}
/*
 * Attempt to copy an element from the queue into elem without removing it.
 *
 * Returns false, leaving elem untouched, when rt_sem_trywait on pop_sem fails.
 * Otherwise the element is copied out, pop_sem is posted again with
 * rt_sem_post, and true is returned.
 */
bool rt_queue_trypeek(struct rt_queue *queue, void *elem)
{
    const bool available = rt_sem_trywait(&queue->pop_sem);
    if (available)
    {
        peek(queue, elem);
        rt_sem_post(&queue->pop_sem);
    }
    return available;
}
/*
 * Push elem onto the queue, passing ticks to rt_sem_timedwait on push_sem.
 *
 * Returns false, leaving the queue untouched, when the timed wait fails.
 * Otherwise the element is copied in, pop_sem is posted with
 * rt_sem_post_from_task, and true is returned.
 */
bool rt_queue_timedpush(struct rt_queue *queue, const void *elem,
                        unsigned long ticks)
{
    const bool reserved = rt_sem_timedwait(&queue->push_sem, ticks);
    if (reserved)
    {
        push(queue, elem);
        rt_sem_post_from_task(&queue->pop_sem);
    }
    return reserved;
}
/*
 * Pop an element from the queue into elem, passing ticks to rt_sem_timedwait
 * on pop_sem.
 *
 * Returns false, leaving elem and the queue untouched, when the timed wait
 * fails. Otherwise the element is copied out, push_sem is posted with
 * rt_sem_post_from_task, and true is returned.
 */
bool rt_queue_timedpop(struct rt_queue *queue, void *elem, unsigned long ticks)
{
    const bool available = rt_sem_timedwait(&queue->pop_sem, ticks);
    if (available)
    {
        pop(queue, elem);
        rt_sem_post_from_task(&queue->push_sem);
    }
    return available;
}
/*
 * Copy an element from the queue into elem without removing it, passing ticks
 * to rt_sem_timedwait on pop_sem.
 *
 * Returns false, leaving elem untouched, when the timed wait fails. Otherwise
 * the element is copied out, pop_sem is posted again with
 * rt_sem_post_from_task, and true is returned.
 */
bool rt_queue_timedpeek(struct rt_queue *queue, void *elem, unsigned long ticks)
{
    const bool available = rt_sem_timedwait(&queue->pop_sem, ticks);
    if (available)
    {
        peek(queue, elem);
        rt_sem_post_from_task(&queue->pop_sem);
    }
    return available;
}