#ifndef CUBEB_RING_BUFFER_H
#define CUBEB_RING_BUFFER_H
#include "cubeb_utils.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>
/**
 * Single-producer single-consumer lock-free ring buffer (FIFO queue) holding
 * elements of type `T`.
 *
 * Thread model: exactly one producer thread calls `enqueue`/`available_write`
 * and exactly one consumer thread calls `dequeue`/`available_read`. In debug
 * builds, `assert_correct_thread` latches the first thread id seen for each
 * role and asserts on any later mismatch.
 *
 * Synchronization: no locks. The producer owns `write_index_` and the
 * consumer owns `read_index_`; each side publishes its own index with a
 * release store and observes the other side's index with an acquire load, so
 * element data written before an index update is visible to the other thread
 * once it loads that index.
 *
 * Capacity: one storage slot is kept unused so that "full"
 * (`(write + 1) % storage == read`) is distinguishable from "empty"
 * (`write == read`); the backing array therefore holds `capacity + 1`
 * elements.
 */
template <typename T> class ring_buffer_base {
public:
  /**
   * Construct a ring buffer that can hold `capacity` elements.
   *
   * The backing storage is allocated here, once, so that `enqueue` and
   * `dequeue` never allocate.
   *
   * @param capacity Maximum number of elements the queue can hold.
   */
  ring_buffer_base(int capacity)
      : capacity_(capacity + 1)
  {
    // Indices are plain `int`s; keep the storage size far below the point
    // where index arithmetic (e.g. `write_index + storage_capacity()`)
    // could overflow.
    assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
           "buffer too large for the type of index used.");
    assert(capacity_ > 0);
    data_.reset(new T[storage_capacity()]);
    write_index_ = 0;
    read_index_ = 0;
  }
  /**
   * Enqueue `count` default-constructed elements. Producer thread only.
   *
   * @param count Number of elements to enqueue.
   * @return The number of elements actually enqueued.
   */
  int enqueue_default(int count) { return enqueue(nullptr, count); }
  /**
   * Enqueue a single element (copied into the buffer). Producer thread only.
   *
   * @param element The element to enqueue.
   * @return 1 if the element was enqueued, 0 if the queue was full.
   */
  int enqueue(T & element) { return enqueue(&element, 1); }
  /**
   * Enqueue up to `count` elements. Producer thread only.
   *
   * @param elements Source buffer of at least `count` elements, or nullptr
   *                 to enqueue default-constructed elements instead.
   * @param count Number of elements to enqueue.
   * @return The number of elements actually enqueued (0 when full).
   */
  int enqueue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    // Relaxed: only this (producer) thread ever stores `write_index_`.
    int wr_idx = write_index_.load(std::memory_order_relaxed);
    // Acquire: pairs with the consumer's release store in dequeue(), so the
    // slots the consumer has finished reading are safe to overwrite.
    int rd_idx = read_index_.load(std::memory_order_acquire);
    if (full_internal(rd_idx, wr_idx)) {
      return 0;
    }
    int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);
    // The free region may wrap around the end of the storage: fill the tail
    // of the array first, then the remainder at the front.
    int first_part = std::min(storage_capacity() - wr_idx, to_write);
    int second_part = to_write - first_part;
    // `Copy` / `ConstructDefault` are bulk helpers (from cubeb_utils.h,
    // presumably — verify).
    if (elements) {
      Copy(data_.get() + wr_idx, elements, first_part);
      Copy(data_.get(), elements + first_part, second_part);
    } else {
      ConstructDefault(data_.get() + wr_idx, first_part);
      ConstructDefault(data_.get(), second_part);
    }
    // Release: publish the element writes above before the new write index
    // becomes visible to the consumer.
    write_index_.store(increment_index(wr_idx, to_write),
                       std::memory_order_release);
    return to_write;
  }
  /**
   * Dequeue up to `count` elements. Consumer thread only.
   *
   * @param elements Destination buffer of at least `count` elements, or
   *                 nullptr to discard the dequeued elements.
   * @param count Number of elements to dequeue.
   * @return The number of elements actually dequeued (0 when empty).
   */
  int dequeue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    // Relaxed: only this (consumer) thread ever stores `read_index_`.
    int rd_idx = read_index_.load(std::memory_order_relaxed);
    // Acquire: pairs with the producer's release store in enqueue(), making
    // the enqueued element data visible before we copy it out.
    int wr_idx = write_index_.load(std::memory_order_acquire);
    if (empty_internal(rd_idx, wr_idx)) {
      return 0;
    }
    int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);
    // The readable region may wrap: copy the tail of the array first, then
    // the part at the front.
    int first_part = std::min(storage_capacity() - rd_idx, to_read);
    int second_part = to_read - first_part;
    if (elements) {
      Copy(elements, data_.get() + rd_idx, first_part);
      Copy(elements + first_part, data_.get(), second_part);
    }
    // Release: hand the consumed slots back to the producer only after the
    // copies above are done.
    read_index_.store(increment_index(rd_idx, to_read),
                      std::memory_order_release);
    return to_read;
  }
  /**
   * Number of elements available for reading. Consumer thread only; the
   * producer may enqueue concurrently, so the value can only grow by the
   * time it is used.
   *
   * @return The number of elements that can be dequeued right now.
   */
  int available_read() const
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    return available_read_internal(
        read_index_.load(std::memory_order_relaxed),
        write_index_.load(std::memory_order_acquire));
  }
  /**
   * Number of free slots available for writing. Producer thread only; the
   * consumer may dequeue concurrently, so the value can only grow by the
   * time it is used.
   *
   * @return The number of elements that can be enqueued right now.
   */
  int available_write() const
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    return available_write_internal(
        read_index_.load(std::memory_order_acquire),
        write_index_.load(std::memory_order_relaxed));
  }
  /**
   * Total capacity of the queue in elements — one less than the storage
   * size (see the class comment). Safe from any thread: `capacity_` is
   * const.
   */
  int capacity() const { return storage_capacity() - 1; }
  /**
   * Forget the latched producer/consumer thread ids (debug builds only), so
   * the queue can be handed over to different threads.
   */
  void reset_thread_ids()
  {
#ifndef NDEBUG
    consumer_id = producer_id = std::thread::id();
#endif
  }

private:
  /** True when the queue holds no elements, for a snapshot of the indices. */
  bool empty_internal(int read_index, int write_index) const
  {
    return write_index == read_index;
  }
  /** True when the queue cannot accept another element: the write index is
   * immediately behind the read index, modulo the storage size. */
  bool full_internal(int read_index, int write_index) const
  {
    return (write_index + 1) % storage_capacity() == read_index;
  }
  /** Size of the backing array (queue capacity plus the reserved slot). */
  int storage_capacity() const { return capacity_; }
  /** Number of readable elements for a snapshot of the indices, accounting
   * for wrap-around. */
  int available_read_internal(int read_index, int write_index) const
  {
    if (write_index >= read_index) {
      return write_index - read_index;
    } else {
      return write_index + storage_capacity() - read_index;
    }
  }
  /** Number of writable slots for a snapshot of the indices; always leaves
   * one slot free (the `- 1`), accounting for wrap-around. */
  int available_write_internal(int read_index, int write_index) const
  {
    int rv = read_index - write_index - 1;
    if (write_index >= read_index) {
      rv += storage_capacity();
    }
    return rv;
  }
  /** Advance `index` by `increment` positions, wrapping around the end of
   * the storage. */
  int increment_index(int index, int increment) const
  {
    assert(increment >= 0);
    return (index + increment) % storage_capacity();
  }
#ifndef NDEBUG
  /**
   * Debug helper enforcing the single-producer / single-consumer contract:
   * the first thread calling with a given `id` slot claims that role, and
   * any other thread calling later trips the assert. `reset_thread_ids`
   * clears the claim.
   */
  static void assert_correct_thread(std::thread::id & id)
  {
    if (id == std::thread::id()) {
      id = std::this_thread::get_id();
      return;
    }
    assert(id == std::this_thread::get_id());
  }
#endif
  // Index of the next element to dequeue (consumer-owned).
  std::atomic<int> read_index_;
  // Index of the next slot to enqueue into (producer-owned).
  std::atomic<int> write_index_;
  // Size of the backing array: requested capacity plus one reserved slot.
  const int capacity_;
  // Heap-allocated backing storage.
  std::unique_ptr<T[]> data_;
#ifndef NDEBUG
  // Latched thread ids for the debug-only role checks; mutable so the const
  // accessors above can latch them on first use.
  mutable std::thread::id consumer_id;
  mutable std::thread::id producer_id;
#endif
};
template <typename T> class audio_ring_buffer_base {
public:
audio_ring_buffer_base(int channel_count, int capacity_in_frames)
: channel_count(channel_count),
ring_buffer(frames_to_samples(capacity_in_frames))
{
assert(channel_count > 0);
}
int enqueue_default(int frame_count)
{
return samples_to_frames(
ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
}
int enqueue(T * frames, int frame_count)
{
return samples_to_frames(
ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
}
int dequeue(T * frames, int frame_count)
{
return samples_to_frames(
ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
}
int available_read() const
{
return samples_to_frames(ring_buffer.available_read());
}
int available_write() const
{
return samples_to_frames(ring_buffer.available_write());
}
int capacity() const { return samples_to_frames(ring_buffer.capacity()); }
private:
int frames_to_samples(int frames) const { return frames * channel_count; }
int samples_to_frames(int samples) const { return samples / channel_count; }
int channel_count;
ring_buffer_base<T> ring_buffer;
};
/** Convenience alias: the ring buffer used as a general-purpose
 * single-producer single-consumer queue of `T`. */
template <typename T> using lock_free_queue = ring_buffer_base<T>;
/** Convenience alias: the frame-oriented audio variant of the queue. */
template <typename T>
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
#endif