// rt 0.19.0 - a real-time operating system capable of full preemption.
// Lock-free queue implementation (push / pop / peek).
#include <rt/queue.h>

#include <rt/atomic.h>
#include <rt/trace.h>

#include <limits.h>
#include <string.h>

// States with data present (FULL, PEEK, POP).
#define SLOT_DATA_PRESENT 0x08U

// States that can be skipped (PUSH, SKIPPED).
#define SLOT_SKIPPABLE 0x04U

// States that are readable (FULL, PEEK).
#define SLOT_READABLE 0x02U

// States claimed by a reader (SKIPPED, PEEK, POP).
#define SLOT_READ_CLAIM 0x01U

// An empty slot ready to be pushed.
#define SLOT_EMPTY 0x00U

// An empty slot that has been claimed by a pusher.
#define SLOT_PUSH SLOT_SKIPPABLE

// An empty slot that has been skipped by a popper or peeker.
#define SLOT_SKIPPED (SLOT_SKIPPABLE | SLOT_READ_CLAIM)

// A full slot ready to be popped.
#define SLOT_FULL (SLOT_DATA_PRESENT | SLOT_READABLE)

// A full slot that has been claimed by a popper.
#define SLOT_POP (SLOT_DATA_PRESENT | SLOT_READ_CLAIM)

// A full slot that has been claimed by a peeker.
#define SLOT_PEEK (SLOT_DATA_PRESENT | SLOT_READABLE | SLOT_READ_CLAIM)

#define SLOT_STATE_MASK 0x0FU

#define SLOT_GEN_INCREMENT (1U << RT_QUEUE_STATE_BITS)
#define SLOT_GEN_MASK (UCHAR_MAX & ~SLOT_STATE_MASK)

#define qindex rt_queue_qindex
#define qgen rt_queue_qgen
#define qsgen rt_queue_qsgen

static inline bool slot_empty(unsigned char slot)
{
    return (slot & SLOT_STATE_MASK) == 0;
}

static inline bool slot_data_present(unsigned char slot)
{
    return (slot & SLOT_DATA_PRESENT) != 0;
}

static inline bool slot_skippable(unsigned char slot)
{
    return (slot & SLOT_SKIPPABLE) != 0;
}

static inline bool slot_readable(unsigned char slot)
{
    return (slot & SLOT_READABLE) != 0;
}

static inline unsigned char sgen(unsigned char slot)
{
    return slot & SLOT_GEN_MASK;
}

static inline unsigned char sgen_next(unsigned char slot)
{
    return (unsigned char)(sgen(slot) + SLOT_GEN_INCREMENT);
}

static size_t next(size_t q, size_t num_elems)
{
    q += 1;
    if (qindex(q) == num_elems)
    {
        return qgen(q + RT_QUEUE_QGEN_INCREMENT);
    }
    return q;
}

static void push(struct rt_queue *queue, const void *elem)
{
    for (;;)
    {
        size_t enq = rt_atomic_load(&queue->enq, RT_ATOMIC_RELAXED);
        size_t last_enq = enq;
        rt_atomic_uchar *slot;
        unsigned char s;
        for (;;)
        {
            slot = &queue->slots[qindex(enq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            if (slot_empty(s) && (sgen(s) == qsgen(enq)))
            {
                break;
            }
            const size_t new_enq =
                rt_atomic_load(&queue->enq, RT_ATOMIC_RELAXED);
            if (new_enq != last_enq)
            {
                enq = new_enq;
                last_enq = new_enq;
            }
            else
            {
                enq = next(enq, queue->num_elems);
            }
        }

        const unsigned char push_s = sgen(s) | SLOT_PUSH;
        if (rt_atomic_compare_exchange(slot, &s, push_s, RT_ATOMIC_RELAXED,
                                       RT_ATOMIC_RELAXED))
        {
            rt_atomic_store(&queue->enq, next(enq, queue->num_elems),
                            RT_ATOMIC_RELAXED);

            unsigned char *const p = queue->data;
            memcpy(&p[queue->elem_size * qindex(enq)], elem, queue->elem_size);

            s = push_s;
            if (rt_atomic_compare_exchange(slot, &s, sgen(s) | SLOT_FULL,
                                           RT_ATOMIC_RELEASE,
                                           RT_ATOMIC_RELAXED))
            {
                rt_trace_queue_push(queue, enq);
                break;
            }

            rt_trace_queue_push_skipped(queue, enq);
            /* If this slot has been skipped by a reader, then restore it
             * back to empty and keep looking. */
            while (!rt_atomic_compare_exchange_weak(slot, &s,
                                                    sgen(s) | SLOT_EMPTY,
                                                    RT_ATOMIC_RELEASE,
                                                    RT_ATOMIC_RELAXED))
            {
            }
        }
    }
}

/* If a pop or peek encounters an in-progress push, attempt to skip it. The
 * push may have completed in the mean time, in which case we can attempt to
 * pop/peek from the slot. If not, there will be full slots to pop/peek after
 * this slot, because the semaphore allowing poppers to run is only posted
 * after each push is complete. */
/* clang-tidy thinks the compare_exchange second paramter is a pointer to const
 * for some reason, so it complains that s is not a pointer to const. */
// NOLINTNEXTLINE(readability-non-const-parameter)
static bool skip(rt_atomic_uchar *slot, unsigned char *s)
{
    return slot_skippable(*s) &&
           rt_atomic_compare_exchange(slot, s, sgen_next(*s) | SLOT_SKIPPED,
                                      RT_ATOMIC_RELAXED, RT_ATOMIC_RELAXED);
}

static void pop(struct rt_queue *queue, void *elem)
{
    for (;;)
    {
        size_t deq = rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
        size_t last_deq = deq;
        rt_atomic_uchar *slot;
        unsigned char s;
        for (;;)
        {
            slot = &queue->slots[qindex(deq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            if (sgen(s) == qsgen(deq))
            {
                if (skip(slot, &s))
                {
                    rt_trace_queue_pop_skip(queue, deq);
                }
                if (slot_data_present(s))
                {
                    break;
                }
            }
            const size_t new_deq =
                rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
            if (new_deq != last_deq)
            {
                deq = new_deq;
                last_deq = new_deq;
            }
            else
            {
                deq = next(deq, queue->num_elems);
            }
        }

        unsigned char pop_s = sgen(s) | SLOT_POP;
        if (rt_atomic_compare_exchange(slot, &s, pop_s, RT_ATOMIC_ACQUIRE,
                                       RT_ATOMIC_RELAXED))
        {
            const unsigned char *const p = queue->data;
            memcpy(elem, &p[queue->elem_size * qindex(deq)], queue->elem_size);

            if (rt_atomic_compare_exchange(slot, &pop_s,
                                           sgen_next(s) | SLOT_EMPTY,
                                           RT_ATOMIC_RELAXED,
                                           RT_ATOMIC_RELAXED))
            {
                /* If the slot successfully goes from POP back to EMPTY, then
                 * this task owns the received element; otherwise, another
                 * popper got to it first. */
                rt_atomic_store(&queue->deq, next(deq, queue->num_elems),
                                RT_ATOMIC_RELAXED);
                rt_trace_queue_pop(queue, deq);
                break;
            }
        }
    }
}

static void peek(struct rt_queue *queue, void *elem)
{
    /* Similar to pop, but don't update the dequeue index and set the slot back
     * to SLOT_FULL after reading it so another popper/peeker can read it. */
    for (;;)
    {
        size_t deq = rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
        size_t last_deq = deq;
        rt_atomic_uchar *slot;
        unsigned char s;
        for (;;)
        {
            slot = &queue->slots[qindex(deq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            if (sgen(s) == qsgen(deq))
            {
                if (skip(slot, &s))
                {
                    rt_trace_queue_peek_skip(queue, deq);
                }
                if (slot_readable(s))
                {
                    break;
                }
            }
            const size_t new_deq =
                rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
            if (new_deq != last_deq)
            {
                deq = new_deq;
                last_deq = new_deq;
            }
            else
            {
                deq = next(deq, queue->num_elems);
            }
        }

        unsigned char peek_s = sgen(s) | SLOT_PEEK;
        if (rt_atomic_compare_exchange(slot, &s, peek_s, RT_ATOMIC_ACQUIRE,
                                       RT_ATOMIC_RELAXED))
        {
            const unsigned char *const p = queue->data;
            memcpy(elem, &p[queue->elem_size * qindex(deq)], queue->elem_size);

            /* Restore the slot back to full if it hasn't been claimed by
             * a popper or already restored by another peeker. */
            if (rt_atomic_compare_exchange(slot, &peek_s, sgen(s) | SLOT_FULL,
                                           RT_ATOMIC_RELAXED,
                                           RT_ATOMIC_RELAXED))
            {
                rt_trace_queue_peek(queue, deq);
                break;
            }
        }
    }
}

void rt_queue_push(struct rt_queue *queue, const void *elem)
{
    rt_sem_wait(&queue->push_sem);
    push(queue, elem);
    rt_sem_post_from_task(&queue->pop_sem);
}

void rt_queue_pop(struct rt_queue *queue, void *elem)
{
    rt_sem_wait(&queue->pop_sem);
    pop(queue, elem);
    rt_sem_post_from_task(&queue->push_sem);
}

void rt_queue_peek(struct rt_queue *queue, void *elem)
{
    rt_sem_wait(&queue->pop_sem);
    peek(queue, elem);
    rt_sem_post_from_task(&queue->pop_sem);
}

bool rt_queue_trypush(struct rt_queue *queue, const void *elem)
{
    if (!rt_sem_trywait(&queue->push_sem))
    {
        return false;
    }
    push(queue, elem);
    rt_sem_post(&queue->pop_sem);
    return true;
}

bool rt_queue_trypop(struct rt_queue *queue, void *elem)
{
    if (!rt_sem_trywait(&queue->pop_sem))
    {
        return false;
    }
    pop(queue, elem);
    rt_sem_post(&queue->push_sem);
    return true;
}

bool rt_queue_trypeek(struct rt_queue *queue, void *elem)
{
    if (!rt_sem_trywait(&queue->pop_sem))
    {
        return false;
    }
    peek(queue, elem);
    rt_sem_post(&queue->pop_sem);
    return true;
}

bool rt_queue_timedpush(struct rt_queue *queue, const void *elem,
                        unsigned long ticks)
{
    if (!rt_sem_timedwait(&queue->push_sem, ticks))
    {
        return false;
    }
    push(queue, elem);
    rt_sem_post_from_task(&queue->pop_sem);
    return true;
}

bool rt_queue_timedpop(struct rt_queue *queue, void *elem, unsigned long ticks)
{
    if (!rt_sem_timedwait(&queue->pop_sem, ticks))
    {
        return false;
    }
    pop(queue, elem);
    rt_sem_post_from_task(&queue->push_sem);
    return true;
}

bool rt_queue_timedpeek(struct rt_queue *queue, void *elem, unsigned long ticks)
{
    if (!rt_sem_timedwait(&queue->pop_sem, ticks))
    {
        return false;
    }
    peek(queue, elem);
    rt_sem_post_from_task(&queue->pop_sem);
    return true;
}