use std::fmt;
use crate::{
command::control_protocol_events::AeronCommand,
concurrent::atomic_buffer::AtomicBuffer,
utils::{
bit_utils::{align, is_power_of_two},
misc::CACHE_LINE_LENGTH,
types::Index,
},
};
// Trailer layout: byte offsets of the bookkeeping fields that follow the
// message area. `ManyToOneRingBuffer::new` adds each offset to the message-area
// capacity to obtain the field's absolute index in the backing buffer.
/// Offset of the tail position counter, which producers advance in `claim`.
pub const TAIL_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 2;
/// Offset of the cached head position that `claim` consults before re-reading the real head.
pub const HEAD_CACHE_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 4;
/// Offset of the head position counter, which `read` advances.
pub const HEAD_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 6;
/// Offset of the counter incremented by `next_correlation_id`.
pub const CORRELATION_COUNTER_OFFSET: Index = CACHE_LINE_LENGTH * 8;
/// Offset of the value stored by `set_consumer_heartbeat_time`.
pub const CONSUMER_HEARTBEAT_OFFSET: Index = CACHE_LINE_LENGTH * 10;
/// Total trailer size; `new` subtracts it from the backing buffer's capacity.
pub const TRAILER_LENGTH: Index = CACHE_LINE_LENGTH * 12;
/// Validates the message-area capacity.
///
/// # Errors
/// Returns `RingBufferError::CapacityIsNotTwoPower` carrying the rejected
/// value when `is_power_of_two` reports `false` for `capacity`.
fn check_capacity(capacity: Index) -> Result<(), RingBufferError> {
    if !is_power_of_two(capacity) {
        return Err(RingBufferError::CapacityIsNotTwoPower { capacity });
    }
    Ok(())
}
/// Helpers describing the header that precedes every record in the ring:
/// an `i32` length field followed by an `i32` message type id, which can also
/// be handled together as a single `i64`.
pub mod record_descriptor {
    use super::RingBufferError;
    use crate::command::control_protocol_events::AeronCommand;
    use crate::utils::types::{Index, I32_SIZE};

    /// Size of the record header: the length field plus the type field.
    pub const HEADER_LENGTH: Index = I32_SIZE * 2;
    /// Records are aligned to a multiple of the header size.
    pub const ALIGNMENT: Index = HEADER_LENGTH;

    /// Index of the length field of the record starting at `record_offset`.
    #[inline]
    pub fn length_offset(record_offset: Index) -> Index {
        record_offset
    }

    /// Index of the type field of the record starting at `record_offset`.
    #[inline]
    pub fn type_offset(record_offset: Index) -> Index {
        I32_SIZE + record_offset
    }

    /// Index of the first payload byte of the record starting at `record_offset`.
    #[inline]
    pub fn encoded_msg_offset(record_offset: Index) -> Index {
        HEADER_LENGTH + record_offset
    }

    /// Packs `len` into the low 32 bits and the command id into the high 32 bits.
    #[inline]
    pub fn make_header(len: Index, command: AeronCommand) -> i64 {
        let length_bits = (len as i64) & 0xFFFF_FFFF;
        let type_bits = ((command as i64) & 0xFFFF_FFFF) << 32;
        type_bits | length_bits
    }

    /// Extracts the (possibly negative) record length from the low 32 bits of `header`.
    #[inline]
    pub fn record_length(header: i64) -> Index {
        let low_word = header & 0xFFFF_FFFF;
        low_word as Index
    }

    /// Extracts the raw message type id from the high 32 bits of `header`.
    #[inline]
    pub fn message_type_id(header: i64) -> i32 {
        let high_word = header >> 32;
        high_word as i32
    }

    /// Decodes the message type of `header` via `AeronCommand::from_command_id`.
    #[inline]
    pub fn message_type(header: i64) -> AeronCommand {
        AeronCommand::from_command_id(message_type_id(header))
    }

    /// Rejects message type ids below 1.
    ///
    /// # Errors
    /// Returns `RingBufferError::NonPositiveMessageTypeId` with the offending id.
    #[inline]
    pub fn check_msg_type_id(msg_type_id: i32) -> Result<(), RingBufferError> {
        if msg_type_id >= 1 {
            Ok(())
        } else {
            Err(RingBufferError::NonPositiveMessageTypeId(msg_type_id))
        }
    }
}
/// Errors reported by `ManyToOneRingBuffer` and the `record_descriptor` helpers.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum RingBufferError {
/// `claim` could not reserve the requested space.
InsufficientCapacity,
/// The payload length `msg` exceeds the ring's maximum message length `max`.
MessageTooLong { msg: Index, max: Index },
/// The message type id (carried in the variant) is below 1.
NonPositiveMessageTypeId(i32),
/// The message-area capacity (backing buffer minus trailer) failed the power-of-two check.
CapacityIsNotTwoPower { capacity: Index },
}
impl fmt::Display for RingBufferError {
    /// Writes the human-readable description of the error directly into the
    /// formatter, without building an intermediate `String`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RingBufferError::InsufficientCapacity => f.write_str("Insufficient capacity"),
            RingBufferError::MessageTooLong { msg, max } => write!(
                f,
                "Encoded message exceeds maxMsgLength of {}: length={}",
                max, msg
            ),
            RingBufferError::NonPositiveMessageTypeId(type_id) => write!(
                f,
                "Message type id must be greater than zero, msgTypeId={}",
                type_id
            ),
            RingBufferError::CapacityIsNotTwoPower { capacity } => write!(
                f,
                "Capacity must be a positive power of 2 + TRAILER_LENGTH: capacity={}",
                capacity
            ),
        }
    }
}
/// Ring buffer over an `AtomicBuffer`: a message area of `capacity` bytes
/// followed by a trailer of bookkeeping fields. Writers go through `write`
/// (which reserves space with a compare-and-set on the tail position) and the
/// reader goes through `read`.
#[derive(Debug)]
pub struct ManyToOneRingBuffer {
// Backing storage: message area followed by the trailer.
buffer: AtomicBuffer,
// Size of the message area (backing capacity minus `TRAILER_LENGTH`).
capacity: Index,
// Largest payload `write` accepts; set to `capacity / 8` in `new`.
max_msg_len: Index,
// The remaining fields are absolute indices into `buffer` of the trailer
// fields, each computed in `new` as `capacity + <OFFSET constant>`.
tail_position: Index,
head_cache_position: Index,
head_position: Index,
correlation_id_counter: Index,
consumer_heartbeat: Index,
}
impl ManyToOneRingBuffer {
/// Wraps `buffer` as a ring buffer whose message area is the buffer's
/// capacity minus `TRAILER_LENGTH`.
///
/// # Errors
/// Returns `RingBufferError::CapacityIsNotTwoPower` when the resulting
/// message-area capacity fails `check_capacity`.
pub fn new(buffer: AtomicBuffer) -> Result<Self, RingBufferError> {
    let capacity = buffer.capacity() - TRAILER_LENGTH;
    check_capacity(capacity)?;

    // Every trailer field sits at a fixed offset past the message area.
    Ok(Self {
        buffer,
        capacity,
        max_msg_len: capacity / 8,
        tail_position: capacity + TAIL_POSITION_OFFSET,
        head_cache_position: capacity + HEAD_CACHE_POSITION_OFFSET,
        head_position: capacity + HEAD_POSITION_OFFSET,
        correlation_id_counter: capacity + CORRELATION_COUNTER_OFFSET,
        consumer_heartbeat: capacity + CONSUMER_HEARTBEAT_OFFSET,
    })
}
/// Appends one record of type `cmd` whose payload is the `length` bytes of
/// `src_buffer` starting at `src_index`.
///
/// # Errors
/// - `NonPositiveMessageTypeId` when `cmd as i32` is below 1.
/// - `MessageTooLong` when `length` exceeds `max_msg_len`.
/// - `InsufficientCapacity` when `claim` cannot reserve the space.
pub fn write(
&self,
cmd: AeronCommand,
src_buffer: AtomicBuffer,
src_index: Index,
length: Index,
) -> Result<(), RingBufferError> {
record_descriptor::check_msg_type_id(cmd as i32)?;
self.check_msg_length(length)?;
// Header plus payload, rounded up to the record alignment, is what gets reserved.
let record_len = length + record_descriptor::HEADER_LENGTH;
let required_capacity = align(record_len, record_descriptor::ALIGNMENT);
let record_index = self.claim(required_capacity)?;
// Publish the header with a negated length first: `read` stops at any record
// whose length is <= 0, so the record is skipped while the payload is copied.
self.buffer
.put_ordered::<i64>(record_index, record_descriptor::make_header(-record_len, cmd));
self.buffer.copy_from(
record_descriptor::encoded_msg_offset(record_index),
&src_buffer,
src_index,
length,
);
// Storing the positive length last is what makes the record readable.
self.buffer
.put_ordered::<i32>(record_descriptor::length_offset(record_index), record_len);
Ok(())
}
/// Passes up to `msg_count_limit` records to `handler`, starting at the head
/// position, and returns the number of records delivered.
///
/// Only the contiguous region from the head index to the end of the message
/// area is scanned per call. Padding records are consumed but neither
/// delivered nor counted. Consumed bytes are zeroed and the head position is
/// advanced once the loop ends.
///
/// NOTE(review): if `handler` panics, the zeroing and head advance below are
/// skipped, so already-delivered records would be seen again on the next call.
pub fn read<F: FnMut(AeronCommand, AtomicBuffer)>(&self, mut handler: F, msg_count_limit: i32) -> i32 {
let head = self.buffer.get_volatile::<i64>(self.head_position);
// `capacity` passed the power-of-two check in `new`, so `capacity - 1` acts as an index mask.
let head_index = (head & (self.capacity - 1) as i64) as i32;
let contiguous_block_len = self.capacity - head_index;
let mut messages_read = 0;
let mut bytes_read = 0;
while bytes_read < contiguous_block_len && messages_read < msg_count_limit {
let record_index = head_index + bytes_read;
let header = self.buffer.get_volatile::<i64>(record_index);
let record_len = record_descriptor::record_length(header);
// Zero means nothing written yet; negative means a write is still in progress.
if record_len <= 0 {
break;
}
bytes_read += align(record_len, record_descriptor::ALIGNMENT);
let msg_type = record_descriptor::message_type(header);
if let AeronCommand::Padding = msg_type {
continue;
}
messages_read += 1;
// The handler sees only the payload, not the record header.
let view = self.buffer.view(
record_descriptor::encoded_msg_offset(record_index),
record_len - record_descriptor::HEADER_LENGTH,
);
handler(msg_type, view)
}
if bytes_read != 0 {
// Zero the consumed region before publishing the new head position.
self.buffer.set_memory(head_index, bytes_read, 0);
self.buffer.put_ordered(self.head_position, head + bytes_read as i64);
}
messages_read
}
/// Calls `read` with the message limit set to `i32::MAX` and returns its result.
#[inline]
pub fn read_all<F: FnMut(AeronCommand, AtomicBuffer)>(&self, handler: F) -> i32 {
self.read(handler, std::i32::MAX)
}
/// Size of the message area, i.e. the backing buffer's capacity minus `TRAILER_LENGTH`.
#[inline]
pub fn capacity(&self) -> Index {
self.capacity
}
/// Largest payload length `write` accepts (`capacity / 8`, fixed in `new`).
#[inline]
pub fn max_msg_len(&self) -> Index {
self.max_msg_len
}
/// Adds 1 to the correlation counter in the trailer and returns the result of
/// `get_and_add_i64` — by its name, presumably the value before the increment
/// (TODO confirm against `AtomicBuffer`).
#[inline]
pub fn next_correlation_id(&self) -> i64 {
self.buffer.get_and_add_i64(self.correlation_id_counter, 1)
}
/// Stores `time` in the consumer heartbeat slot of the trailer using an ordered put.
#[inline]
pub fn set_consumer_heartbeat_time(&self, time: i64) {
self.buffer.put_ordered(self.consumer_heartbeat, time)
}
/// Reads the consumer heartbeat slot of the trailer with a volatile load.
#[inline]
pub fn consumer_heartbeat_time(&self) -> i64 {
self.buffer.get_volatile::<i64>(self.consumer_heartbeat)
}
/// Returns `tail - head`, the number of bytes currently between the head and
/// tail positions.
///
/// The tail is sampled between two reads of the head; the sample is accepted
/// only when the head did not move in between, otherwise it is retried.
#[inline]
pub fn size(&self) -> Index {
    let mut previous_head = self.buffer.get_volatile::<i64>(self.head_position);
    loop {
        let tail = self.buffer.get_volatile::<i64>(self.tail_position);
        let current_head = self.buffer.get_volatile::<i64>(self.head_position);
        if current_head == previous_head {
            break (tail - current_head) as Index;
        }
        previous_head = current_head;
    }
}
/// Tries to release a reader that is stuck at the head position because a
/// writer reserved space but never finished publishing its record.
///
/// Returns `true` when a padding record was written at the head index so the
/// next `read` can step over the gap, `false` when nothing was changed.
///
/// Two situations are handled:
/// - the record at the head has a negative length (header written, payload
///   never completed): it is converted into a padding record of that length;
/// - the slot at the head is still zero: the area ahead is scanned for the
///   first non-zero length word, and the zeroed gap is turned into padding.
// Kept as an if / else-if chain on purpose: the `length > 0` case needs no action.
#[allow(clippy::comparison_chain)]
pub fn unblock(&self) -> bool {
    let head_position = self.buffer.get_volatile::<i64>(self.head_position);
    let tail_position = self.buffer.get_volatile::<i64>(self.tail_position);
    if tail_position == head_position {
        // Nothing outstanding between reader and writers.
        return false;
    }

    let mask = (self.capacity - 1) as i64;
    let consumer_index = (head_position & mask) as Index;
    let producer_index = (tail_position & mask) as Index;

    let mut unblocked = false;
    let length: Index = self.buffer.get_volatile(consumer_index);
    if length < 0 {
        // A negative length marks a record whose write never completed.
        self.buffer.put_ordered(
            consumer_index,
            record_descriptor::make_header(-length, AeronCommand::Padding),
        );
        unblocked = true;
    } else if length == 0 {
        // The scan must stay inside the message area. When the tail has wrapped
        // behind the head, the limit is the end of the message area
        // (`self.capacity`), not the end of the backing buffer: the latter
        // includes the trailer, whose counters would be misread as record lengths.
        let limit = if producer_index > consumer_index {
            producer_index
        } else {
            self.capacity
        };
        let mut i = consumer_index + record_descriptor::ALIGNMENT;
        loop {
            let length: i32 = self.buffer.get_volatile(i);
            if length != 0 {
                // Re-check the gap before publishing padding over it, in case a
                // writer filled part of it while the forward scan was running.
                if self.scan_back_to_confirm_still_zeroed(i, consumer_index) {
                    self.buffer.put_ordered(
                        consumer_index,
                        record_descriptor::make_header(i - consumer_index, AeronCommand::Padding),
                    );
                    unblocked = true;
                }
                break;
            }
            i += record_descriptor::ALIGNMENT;
            if i >= limit {
                break;
            }
        }
    }
    unblocked
}
/// Reserves `required_capacity` bytes in the message area by advancing the
/// tail position with a compare-and-set, retrying when another writer wins.
///
/// Returns the index at which the caller may write its record. When the
/// record would not fit before the end of the message area, the remainder is
/// filled with a padding record and index 0 is returned.
///
/// # Errors
/// Returns `RingBufferError::InsufficientCapacity` when the space is not
/// available even after re-reading the head position.
fn claim(&self, required_capacity: Index) -> Result<Index, RingBufferError> {
let mask = (self.capacity - 1) as i64;
// Start from the cached head; the real head is read only when space looks short.
let mut head = self.buffer.get_volatile::<i64>(self.head_cache_position);
let (padding, tail_index) = loop {
let tail = self.buffer.get_volatile::<i64>(self.tail_position);
let available_capacity = self.capacity - (tail - head) as Index;
if required_capacity > available_capacity {
// The cached head may be stale: refresh it from the real head and re-check.
head = self.buffer.get_volatile::<i64>(self.head_position);
if required_capacity > (self.capacity - (tail - head) as Index) {
return Err(RingBufferError::InsufficientCapacity);
}
self.buffer.put_ordered::<i64>(self.head_cache_position, head);
}
let mut padding = 0;
let tail_index = (tail & mask) as Index;
let len_to_buffer_end = self.capacity - tail_index;
if required_capacity > len_to_buffer_end {
// The record must wrap to index 0, so it also has to fit before the head index.
let mut head_index = (head & mask) as Index;
if required_capacity > head_index {
head = self.buffer.get_volatile::<i64>(self.head_position);
head_index = (head & mask) as Index;
if required_capacity > head_index {
return Err(RingBufferError::InsufficientCapacity);
}
self.buffer.put_ordered::<i64>(self.head_cache_position, head);
}
padding = len_to_buffer_end;
}
// Reserve record plus padding in one step; a failed CAS means another writer moved the tail.
let t2 = tail + required_capacity as i64 + padding as i64;
if self.buffer.compare_and_set_i64(self.tail_position, tail, t2) {
break (padding, tail_index);
}
};
if padding != 0 {
// Mark the unused space up to the end of the message area as padding.
self.buffer
.put_ordered::<i64>(tail_index, record_descriptor::make_header(padding, AeronCommand::Padding));
Ok(0)
} else {
Ok(tail_index)
}
}
/// Checks a payload length against this ring's maximum message length.
///
/// # Errors
/// Returns `RingBufferError::MessageTooLong` with both values when `length`
/// is greater than `max_msg_len`.
fn check_msg_length(&self, length: Index) -> Result<(), RingBufferError> {
    let max = self.max_msg_len;
    if length <= max {
        Ok(())
    } else {
        Err(RingBufferError::MessageTooLong { msg: length, max })
    }
}
/// Walks backwards from the slot just before `from` down to `limit`
/// (inclusive), one alignment step at a time, and reports whether every
/// 32-bit word read along the way is zero.
fn scan_back_to_confirm_still_zeroed(&self, from: Index, limit: Index) -> bool {
    let step = record_descriptor::ALIGNMENT;
    let mut offset = from - step;
    loop {
        if offset < limit {
            return true;
        }
        if self.buffer.get_volatile::<i32>(offset) != 0 {
            return false;
        }
        offset -= step;
    }
}
}
// SAFETY / NOTE(review): Send and Sync are asserted manually here. Presumably
// `AtomicBuffer` is not automatically Send/Sync (e.g. it wraps a raw pointer) —
// confirm. All methods take `&self` and coordinate through the buffer's
// volatile, ordered and compare-and-set accessors, but nothing in this file
// ensures the backing memory outlives the ring buffer; callers must guarantee it.
unsafe impl Send for ManyToOneRingBuffer {}
unsafe impl Sync for ManyToOneRingBuffer {}
#[cfg(test)]
mod tests {
use lazy_static::lazy_static;
use super::*;
use crate::command::control_protocol_events::AeronCommand;
use crate::concurrent::ring_buffer::record_descriptor::{make_header, message_type, message_type_id, record_length};
use crate::{
concurrent::atomic_buffer::{AlignedBuffer, AtomicBuffer},
utils::{bit_utils::align, types::Index},
};
use std::sync::atomic::{AtomicI32, AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
// Message-area size used by the tests; must be a power of two.
const CAPACITY: Index = 1024;
// Backing buffer size: message area plus trailer.
const BUFFER_SZ: Index = CAPACITY + TRAILER_LENGTH;
// One byte short, so the derived message-area capacity is not a power of two.
const ODD_BUFFER_SZ: Index = BUFFER_SZ - 1;
// Absolute indices of the head/tail counters, derived from CAPACITY so they
// cannot drift from it.
const HEAD_COUNTER_INDEX: Index = CAPACITY + HEAD_POSITION_OFFSET;
const TAIL_COUNTER_INDEX: Index = CAPACITY + TAIL_POSITION_OFFSET;
// Value the tests expect in the type field for `AeronCommand::UnitTestMessageTypeID`.
const MSG_TYPE_ID: Index = 101;
// Per-test fixture: a ring buffer plus direct views of its backing memory and
// of a separate source buffer for `write` calls.
struct Test {
// View over the ring's backing memory, used to poke and inspect raw state.
ab: AtomicBuffer,
// View over the source buffer passed to `write`.
src_ab: AtomicBuffer,
ring_buffer: ManyToOneRingBuffer,
// Owning allocations behind the two views; held so they live as long as the fixture.
_buffer: AlignedBuffer,
_src_buffer: AlignedBuffer,
}
impl Test {
    /// Allocates the ring and source buffers (`BUFFER_SZ` each), wraps them in
    /// `AtomicBuffer` views and builds the ring buffer over the first one.
    pub fn new() -> Self {
        let ring_storage = AlignedBuffer::with_capacity(BUFFER_SZ);
        let ab = AtomicBuffer::from_aligned(&ring_storage);

        let source_storage = AlignedBuffer::with_capacity(BUFFER_SZ);
        let src_ab = AtomicBuffer::from_aligned(&source_storage);

        let ring_buffer = ManyToOneRingBuffer::new(ab).unwrap();

        Self {
            ab,
            src_ab,
            ring_buffer,
            _buffer: ring_storage,
            _src_buffer: source_storage,
        }
    }
}
// The ring's capacity is the backing buffer's capacity minus the trailer.
#[test]
fn ring_buffer_calculate_capacity_for_buffer() {
    let fixture = Test::new();
    let backing_capacity = fixture.ab.capacity();
    assert_eq!(backing_capacity, BUFFER_SZ);
    let message_area = fixture.ring_buffer.capacity();
    assert_eq!(message_area, BUFFER_SZ - TRAILER_LENGTH)
}
// A 1024-byte backing buffer is accepted and yields capacity 1024 - TRAILER_LENGTH.
#[test]
fn ring_buffer_that_capacity_ok() {
    let storage = AlignedBuffer::with_capacity(1024);
    let view = AtomicBuffer::from_aligned(&storage);
    let total = view.capacity();
    assert_eq!(total, 1024);
    let ring = ManyToOneRingBuffer::new(view).unwrap();
    assert_eq!(ring.capacity, total - TRAILER_LENGTH)
}
// Construction fails, reporting the derived capacity, when it is not a power of two.
#[test]
fn ring_buffer_capacity_not_power_of_two() {
    let storage = AlignedBuffer::with_capacity(ODD_BUFFER_SZ);
    let view = AtomicBuffer::from_aligned(&storage);
    let expected = RingBufferError::CapacityIsNotTwoPower {
        capacity: ODD_BUFFER_SZ - TRAILER_LENGTH,
    };
    let actual = ManyToOneRingBuffer::new(view).unwrap_err();
    assert_eq!(actual, expected);
}
// A payload one byte over the maximum is rejected with MessageTooLong.
#[test]
fn ring_buffer_max_message_size_exceeded() {
    let fixture = Test::new();
    let oversized = fixture.ring_buffer.max_msg_len() + 1;
    let error = fixture
        .ring_buffer
        .write(AeronCommand::UnitTestMessageTypeID, fixture.src_ab, 0, oversized)
        .unwrap_err();
    assert_eq!(error, RingBufferError::MessageTooLong { msg: 129, max: 128 });
}
// Writing into an empty ring stores the record length and type at index 0 and
// advances the tail by the aligned record length.
#[test]
fn ring_buffer_that_writes_to_empty() {
    let test = Test::new();
    let tail: Index = 0;
    let tail_index: Index = 0;
    let length: Index = 8;
    let expected_record_length: Index = length + record_descriptor::HEADER_LENGTH;
    let aligned_record_length: Index = align(expected_record_length, record_descriptor::ALIGNMENT);

    test.ring_buffer
        .write(AeronCommand::UnitTestMessageTypeID, test.src_ab, 0, length)
        .unwrap();

    // Use a distinct name for the stored value: shadowing the expected length
    // would turn this assertion into a comparison of a value with itself.
    let stored_record_length = test.ab.get::<i32>(record_descriptor::length_offset(tail_index));
    assert_eq!(stored_record_length, expected_record_length);

    let msg_type = test.ab.get::<i32>(record_descriptor::type_offset(tail_index));
    assert_eq!(msg_type, MSG_TYPE_ID);

    let tail_counter = test.ab.get::<i64>(TAIL_COUNTER_INDEX);
    assert_eq!(tail_counter, (tail + aligned_record_length) as i64);
}
// With less free space than the record needs, write fails and the tail is untouched.
#[test]
fn ring_buffer_should_reject_write_when_insufficient_space() {
    let fixture = Test::new();
    let payload_len: Index = 100;
    let head_pos: Index = 0;
    let free_space = align(payload_len - record_descriptor::ALIGNMENT, record_descriptor::ALIGNMENT);
    let tail_pos: Index = head_pos + (CAPACITY as Index - free_space);
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);

    let outcome = fixture
        .ring_buffer
        .write(AeronCommand::UnitTestMessageTypeID, fixture.src_ab, 0, payload_len);

    assert_eq!(outcome.unwrap_err(), RingBufferError::InsufficientCapacity);
    assert_eq!(fixture.ab.get::<i64>(TAIL_COUNTER_INDEX), tail_pos as i64);
}
// With tail a full capacity ahead of head, write fails and the tail is untouched.
#[test]
fn ring_buffer_should_reject_write_when_buffer_full() {
    let fixture = Test::new();
    let payload_len: Index = 8;
    let head_pos: Index = 0;
    let tail_pos: Index = head_pos + CAPACITY;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);

    let outcome = fixture
        .ring_buffer
        .write(AeronCommand::UnitTestMessageTypeID, fixture.src_ab, 0, payload_len);

    assert_eq!(outcome.unwrap_err(), RingBufferError::InsufficientCapacity);
    assert_eq!(fixture.ab.get::<i64>(TAIL_COUNTER_INDEX), tail_pos as i64);
}
// A record that does not fit before the end of the message area is preceded by
// a padding record at the old tail and written at index 0.
#[test]
fn ring_buffer_should_insert_padding_record_plus_message_on_buffer_wrap() {
    let fixture = Test::new();
    let payload_len: Index = 100;
    let record_len: Index = payload_len + record_descriptor::HEADER_LENGTH;
    let aligned_len: Index = align(record_len, record_descriptor::ALIGNMENT);
    let tail_pos: Index = CAPACITY - record_descriptor::ALIGNMENT;
    let head_pos: Index = tail_pos - (record_descriptor::ALIGNMENT * 4);
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);

    fixture
        .ring_buffer
        .write(AeronCommand::UnitTestMessageTypeID, fixture.src_ab, 0, payload_len)
        .unwrap();

    // Padding record at the old tail.
    let padding_type = fixture.ab.get::<i32>(record_descriptor::type_offset(tail_pos));
    assert_eq!(padding_type, AeronCommand::Padding as i32);
    let padding_len = fixture.ab.get::<i32>(record_descriptor::length_offset(tail_pos));
    assert_eq!(padding_len, record_descriptor::ALIGNMENT);

    // The actual message at the start of the message area.
    assert_eq!(fixture.ab.get::<i32>(record_descriptor::length_offset(0)), record_len);
    assert_eq!(fixture.ab.get::<i32>(record_descriptor::type_offset(0)), MSG_TYPE_ID);

    let new_tail = fixture.ab.get::<i64>(TAIL_COUNTER_INDEX);
    assert_eq!(
        new_tail,
        (tail_pos + aligned_len + record_descriptor::ALIGNMENT) as i64
    );
}
// Same wrap scenario as above, but starting with head equal to tail.
#[test]
fn ring_buffer_should_insert_padding_record_plus_message_on_buffer_wrap_with_head_equal_to_tail() {
    let fixture = Test::new();
    let payload_len: Index = 100;
    let record_len: Index = payload_len + record_descriptor::HEADER_LENGTH;
    let aligned_len: Index = align(record_len, record_descriptor::ALIGNMENT);
    let tail_pos: Index = CAPACITY - record_descriptor::ALIGNMENT;
    let head_pos: Index = tail_pos;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);

    fixture
        .ring_buffer
        .write(AeronCommand::UnitTestMessageTypeID, fixture.src_ab, 0, payload_len)
        .unwrap();

    // Padding record at the old tail.
    let padding_type = fixture.ab.get::<i32>(record_descriptor::type_offset(tail_pos));
    assert_eq!(padding_type, AeronCommand::Padding as i32);
    let padding_len = fixture.ab.get::<i32>(record_descriptor::length_offset(tail_pos));
    assert_eq!(padding_len, record_descriptor::ALIGNMENT);

    // The actual message at the start of the message area.
    assert_eq!(fixture.ab.get::<i32>(record_descriptor::length_offset(0)), record_len);
    assert_eq!(fixture.ab.get::<i32>(record_descriptor::type_offset(0)), MSG_TYPE_ID);

    let new_tail = fixture.ab.get::<i64>(TAIL_COUNTER_INDEX);
    assert_eq!(
        new_tail,
        (tail_pos + aligned_len + record_descriptor::ALIGNMENT) as i64
    );
}
// Reading with head == tail == 0 delivers nothing and never calls the handler.
#[test]
fn ring_buffer_should_read_nothing_from_empty_buffer() {
    let fixture = Test::new();
    let head_pos: Index = 0;
    let tail_pos: Index = 0;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);

    let mut invocations = 0;
    let delivered = fixture.ring_buffer.read(|_command, _buffer| invocations += 1, 1);

    assert_eq!(delivered, 0);
    assert_eq!(invocations, 0);
}
// One complete record is delivered once, the head advances past it and the
// record header is zeroed.
#[test]
fn ring_buffer_should_read_single_message() {
    let fixture = Test::new();
    let payload_len: Index = 8;
    let head_pos: Index = 0;
    let record_len: Index = payload_len + record_descriptor::HEADER_LENGTH;
    let aligned_len: Index = align(record_len, record_descriptor::ALIGNMENT);
    let tail_pos: Index = aligned_len;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    fixture.ab.put::<i32>(record_descriptor::type_offset(0), MSG_TYPE_ID);
    fixture.ab.put::<i32>(record_descriptor::length_offset(0), record_len);

    let mut invocations = 0;
    let delivered = fixture.ring_buffer.read(|_command, _buffer| invocations += 1, 1);

    assert_eq!(delivered, 1);
    assert_eq!(invocations, 1);
    assert_eq!(
        fixture.ab.get::<i64>(HEAD_COUNTER_INDEX),
        (head_pos + aligned_len) as i64
    );
    for offset in (0..record_descriptor::ALIGNMENT).step_by(4) {
        assert_eq!(fixture.ab.get::<i32>(offset), 0);
    }
}
// A record whose length is still negative is not delivered and the head stays put.
#[test]
fn ring_buffer_should_not_read_single_message_part_way_through_writing() {
    let fixture = Test::new();
    let payload_len: Index = 8;
    let head_pos: Index = 0;
    let record_len: Index = payload_len + record_descriptor::HEADER_LENGTH;
    let aligned_len: Index = align(record_len, record_descriptor::ALIGNMENT);
    let final_tail: Index = aligned_len;
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, final_tail as i64);
    fixture.ab.put::<i32>(record_descriptor::type_offset(0), MSG_TYPE_ID);
    fixture.ab.put::<i32>(record_descriptor::length_offset(0), -record_len);

    let mut invocations = 0;
    let delivered = fixture.ring_buffer.read(|_command, _buffer| invocations += 1, 1);

    assert_eq!(delivered, 0);
    assert_eq!(invocations, 0);
    assert_eq!(fixture.ab.get::<i64>(HEAD_COUNTER_INDEX), head_pos as i64);
}
// Two back-to-back records are both delivered and the head advances past both.
#[test]
fn ring_buffer_should_read_two_messages() {
    let fixture = Test::new();
    let payload_len: Index = 8;
    let head_pos: Index = 0;
    let record_len: Index = payload_len + record_descriptor::HEADER_LENGTH;
    let aligned_len: Index = align(record_len, record_descriptor::ALIGNMENT);
    let tail_pos: Index = aligned_len * 2;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    // First record at index 0.
    fixture.ab.put::<i32>(record_descriptor::type_offset(0), MSG_TYPE_ID);
    fixture.ab.put::<i32>(record_descriptor::length_offset(0), record_len);
    // Second record immediately after it.
    fixture
        .ab
        .put::<i32>(record_descriptor::type_offset(aligned_len), MSG_TYPE_ID);
    fixture
        .ab
        .put::<i32>(record_descriptor::length_offset(aligned_len), record_len);

    let mut invocations = 0;
    let delivered = fixture.ring_buffer.read(|_command, _buffer| invocations += 1, 2);

    assert_eq!(delivered, 2);
    assert_eq!(invocations, 2);
    assert_eq!(
        fixture.ab.get::<i64>(HEAD_COUNTER_INDEX),
        (head_pos + aligned_len * 2) as i64
    );
    for offset in (0..record_descriptor::ALIGNMENT).step_by(4) {
        assert_eq!(fixture.ab.get::<i32>(offset), 0);
    }
}
// With a limit of 1 only the first of two records is consumed; the second stays intact.
#[test]
fn ring_buffer_should_limit_read_of_messages() {
    let fixture = Test::new();
    let payload_len: Index = 8;
    let head_pos: Index = 0;
    let record_len: Index = payload_len + record_descriptor::HEADER_LENGTH;
    let aligned_len: Index = align(record_len, record_descriptor::ALIGNMENT);
    let tail_pos: Index = aligned_len * 2;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    // First record at index 0.
    fixture.ab.put::<i32>(record_descriptor::type_offset(0), MSG_TYPE_ID);
    fixture.ab.put::<i32>(record_descriptor::length_offset(0), record_len);
    // Second record immediately after it.
    fixture
        .ab
        .put::<i32>(record_descriptor::type_offset(aligned_len), MSG_TYPE_ID);
    fixture
        .ab
        .put::<i32>(record_descriptor::length_offset(aligned_len), record_len);

    let mut invocations = 0;
    let delivered = fixture.ring_buffer.read(|_command, _buffer| invocations += 1, 1);

    assert_eq!(delivered, 1);
    assert_eq!(invocations, 1);
    assert_eq!(
        fixture.ab.get::<i64>(HEAD_COUNTER_INDEX),
        (head_pos + aligned_len) as i64
    );
    for offset in (0..record_descriptor::ALIGNMENT).step_by(4) {
        assert_eq!(fixture.ab.get::<i32>(offset), 0);
    }
    let untouched_len = fixture.ab.get::<i32>(record_descriptor::length_offset(aligned_len));
    assert_eq!(untouched_len, record_len);
}
// The handler flags its second invocation (it does not actually panic); both
// records are still consumed, the head advances and the headers are zeroed.
#[test]
fn ring_buffer_should_cope_with_exception_from_handler() {
    let fixture = Test::new();
    let payload_len: Index = 8;
    let head_pos: Index = 0;
    let record_len: Index = payload_len + record_descriptor::HEADER_LENGTH;
    let aligned_len: Index = align(record_len, record_descriptor::ALIGNMENT);
    let tail_pos: Index = aligned_len * 2;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    // First record at index 0.
    fixture.ab.put::<i32>(record_descriptor::type_offset(0), MSG_TYPE_ID);
    fixture.ab.put::<i32>(record_descriptor::length_offset(0), record_len);
    // Second record immediately after it.
    fixture
        .ab
        .put::<i32>(record_descriptor::type_offset(aligned_len), MSG_TYPE_ID);
    fixture
        .ab
        .put::<i32>(record_descriptor::length_offset(aligned_len), record_len);

    let mut invocations = 0;
    let mut second_call_seen: bool = false;
    fixture.ring_buffer.read(
        |_command, _buffer| {
            invocations += 1;
            if invocations == 2 {
                second_call_seen = true;
            }
        },
        2,
    );

    assert_eq!(invocations, 2);
    assert!(second_call_seen);
    assert_eq!(
        fixture.ab.get::<i64>(HEAD_COUNTER_INDEX),
        (head_pos + aligned_len * 2) as i64
    );
    for offset in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
        assert_eq!(fixture.ab.get::<i32>(offset), 0);
    }
}
// With head equal to tail there is nothing to unblock.
#[test]
fn ring_buffer_should_not_unblock_when_empty() {
    let fixture = Test::new();
    let tail_pos: Index = record_descriptor::ALIGNMENT * 4;
    let head_pos: Index = tail_pos;
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);

    let unblocked = fixture.ring_buffer.unblock();
    assert!(!unblocked);
}
// A record at the head with a negative length is converted into padding of
// that length; the head and tail counters are left unchanged.
#[test]
fn ring_buffer_should_unblock_message_with_header() {
    let fixture = Test::new();
    let msg_len: Index = record_descriptor::ALIGNMENT * 4;
    let head_pos: Index = msg_len;
    let tail_pos: Index = msg_len * 2;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    fixture.ab.put::<i32>(record_descriptor::type_offset(head_pos), MSG_TYPE_ID);
    fixture.ab.put::<i32>(record_descriptor::length_offset(head_pos), -msg_len);

    assert!(fixture.ring_buffer.unblock());

    let type_at_head = fixture.ab.get::<i32>(record_descriptor::type_offset(head_pos));
    assert_eq!(type_at_head, AeronCommand::Padding as i32);
    let len_at_head = fixture.ab.get::<i32>(record_descriptor::length_offset(head_pos));
    assert_eq!(len_at_head, msg_len);
    assert_eq!(fixture.ab.get::<i64>(HEAD_COUNTER_INDEX), msg_len as i64);
    assert_eq!(fixture.ab.get::<i64>(TAIL_COUNTER_INDEX), (msg_len * 2) as i64);
}
// A zeroed gap at the head followed by a published record is covered with a
// padding record spanning the gap; the counters are left unchanged.
#[test]
fn ring_buffer_should_unblock_gap_with_zeros() {
    let fixture = Test::new();
    let msg_len: Index = record_descriptor::ALIGNMENT * 4;
    let head_pos: Index = msg_len;
    let tail_pos: Index = msg_len * 3;
    fixture.ab.put::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    // Only the record after the gap has been published.
    fixture
        .ab
        .put::<i32>(record_descriptor::length_offset(msg_len * 2), msg_len);

    assert!(fixture.ring_buffer.unblock());

    let type_at_head = fixture.ab.get::<i32>(record_descriptor::type_offset(head_pos));
    assert_eq!(type_at_head, AeronCommand::Padding as i32);
    let len_at_head = fixture.ab.get::<i32>(record_descriptor::length_offset(head_pos));
    assert_eq!(len_at_head, msg_len);
    assert_eq!(fixture.ab.get::<i64>(HEAD_COUNTER_INDEX), msg_len as i64);
    assert_eq!(fixture.ab.get::<i64>(TAIL_COUNTER_INDEX), (msg_len * 3) as i64);
}
// When the tail has advanced but no length word ahead of the head is set yet,
// unblock must not write any padding.
#[test]
fn ring_buffer_should_not_unblock_gap_with_message_race_on_second_message_increasing_tail_then_interrupting() {
    let fixture = Test::new();
    let msg_len: Index = record_descriptor::ALIGNMENT * 4;
    let head_pos = msg_len;
    let tail_pos = msg_len * 3;
    fixture.ab.put_ordered::<i64>(HEAD_COUNTER_INDEX, head_pos as i64);
    fixture.ab.put_ordered::<i64>(TAIL_COUNTER_INDEX, tail_pos as i64);
    fixture.ab.put_ordered::<i32>(msg_len * 2, 0);

    let unblocked = fixture.ring_buffer.unblock();
    assert!(!unblocked);
}
// Number of correlation ids each publisher thread requests.
const NUM_IDS_PER_THREAD: i64 = 10 * 1000 * 1000;
// Number of publisher threads spawned by the multi-threaded tests.
const NUM_PUBLISHERS: i64 = 2;
lazy_static! {
// Start gate for the correlation-id test: each thread decrements it and spins until it reaches zero.
pub static ref PUB_COUNT_DOWN1: AtomicI64 = AtomicI64::new(NUM_PUBLISHERS);
}
// Several threads request correlation ids concurrently; afterwards the next id
// must equal the total number of ids handed out, i.e. no increment was lost.
#[test]
fn ring_buffer_should_provide_correlation_ids() {
let mpsc_buffer = AlignedBuffer::with_capacity(BUFFER_SZ);
let mpsc_ab = AtomicBuffer::from_aligned(&mpsc_buffer);
// Start from an all-zero buffer so the correlation counter begins at 0.
mpsc_ab.set_memory(0, mpsc_ab.capacity(), 0);
let ring_buffer = Arc::new(ManyToOneRingBuffer::new(mpsc_ab).unwrap());
let mut threads: Vec<std::thread::JoinHandle<()>> = Vec::new();
for _i in 0..NUM_PUBLISHERS {
let ring_buffer = ring_buffer.clone();
threads.push(std::thread::spawn(move || {
// Rendezvous: wait until every publisher thread has checked in.
PUB_COUNT_DOWN1.fetch_sub(1, Ordering::SeqCst);
while PUB_COUNT_DOWN1.load(Ordering::SeqCst) > 0 {
std::thread::yield_now();
}
for _i in 0..NUM_IDS_PER_THREAD {
ring_buffer.next_correlation_id();
}
}));
}
threads.into_iter().for_each(|thread| thread.join().unwrap());
assert_eq!(ring_buffer.next_correlation_id(), NUM_IDS_PER_THREAD * NUM_PUBLISHERS);
}
lazy_static! {
// Start gate for the message-exchange test, used like PUB_COUNT_DOWN1.
pub static ref PUB_COUNT_DOWN2: AtomicI64 = AtomicI64::new(NUM_PUBLISHERS);
// Hands out a distinct id (0, 1, ...) to each publisher thread.
pub static ref PUBLISHER_ID: AtomicI32 = AtomicI32::new(0);
// Per-publisher count of messages seen so far by the consumer.
pub static ref COUNTS: Arc<Mutex<[i32; NUM_PUBLISHERS as usize]>> = Arc::new(Mutex::new([0; NUM_PUBLISHERS as usize]));
}
// Number of messages each publisher thread writes.
const NUM_MESSAGES_PER_PUBLISHER: i64 = 10000;
// End-to-end test: several publisher threads write numbered messages while the
// test thread reads them, checking that each publisher's messages arrive in
// order and that every message is received.
#[test]
fn ring_buffer_should_exchange_messages() {
let mpsc_buffer = AlignedBuffer::with_capacity(BUFFER_SZ);
let mpsc_ab = AtomicBuffer::from_aligned(&mpsc_buffer);
mpsc_ab.set_memory(0, mpsc_ab.capacity(), 0);
let ring_buffer = Arc::new(ManyToOneRingBuffer::new(mpsc_ab).unwrap());
let mut threads: Vec<std::thread::JoinHandle<()>> = Vec::new();
for _i in 0..NUM_PUBLISHERS {
let ring_buffer = ring_buffer.clone();
threads.push(std::thread::spawn(move || {
// Each publisher owns its source buffer.
let src_buf = AlignedBuffer::with_capacity(BUFFER_SZ);
let src_ab = AtomicBuffer::from_aligned(&src_buf);
src_ab.set_memory(0, src_ab.capacity(), 0);
let id = PUBLISHER_ID.fetch_add(1, Ordering::SeqCst);
// Rendezvous: wait until every publisher thread has checked in.
PUB_COUNT_DOWN2.fetch_sub(1, Ordering::SeqCst);
while PUB_COUNT_DOWN2.load(Ordering::SeqCst) > 0 {
std::thread::yield_now();
}
// Message layout: publisher id (i32) followed by message number (i32).
let message_length = 4 + 4;
let message_num_offset = 4;
src_ab.put::<i32>(0, id);
for i in 0..NUM_MESSAGES_PER_PUBLISHER {
src_ab.put::<i32>(message_num_offset, i as i32);
// Retry until the write succeeds, e.g. while the ring is full.
while ring_buffer
.write(AeronCommand::UnitTestMessageTypeID, src_ab, 0, message_length)
.is_err()
{
std::thread::yield_now();
}
}
}));
}
let mut msg_count = 0;
let handler = |command: AeronCommand, buffer: AtomicBuffer| {
let id: usize = buffer.get::<i32>(0) as usize;
let message_number = buffer.get::<i32>(4);
assert_eq!(buffer.capacity(), 4 + 4);
assert_eq!(command, AeronCommand::UnitTestMessageTypeID);
// Messages from one publisher must arrive in the order they were written.
let mut counts = COUNTS.lock().unwrap();
assert_eq!(counts[id], message_number);
counts[id] += 1;
};
while msg_count < NUM_MESSAGES_PER_PUBLISHER * NUM_PUBLISHERS {
let read_count = ring_buffer.read(handler, (NUM_MESSAGES_PER_PUBLISHER * NUM_PUBLISHERS) as i32);
if 0 == read_count {
// Nothing available yet; back off briefly before polling again.
std::thread::sleep(Duration::from_millis(1));
}
msg_count += read_count as i64;
}
threads.into_iter().for_each(|thread| thread.join().unwrap());
}
// A header built by `make_header` yields the same length and message type back.
#[test]
fn ring_buffer_header_construction() {
    let small = make_header(111, AeronCommand::RemoveCounter);
    assert_eq!(record_length(small), 111);
    assert_eq!(message_type_id(small), AeronCommand::RemoveCounter as i32);
    assert_eq!(message_type(small), AeronCommand::RemoveCounter);

    let large = make_header(222_111, AeronCommand::AddCounter);
    assert_eq!(record_length(large), 222_111);
    assert_eq!(message_type_id(large), AeronCommand::AddCounter as i32);
    assert_eq!(message_type(large), AeronCommand::AddCounter);
}
}