use std::{
ffi::CString,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use crate::{
client_conductor::ClientConductor,
concurrent::{
atomic_buffer::AtomicBuffer,
logbuffer::{
buffer_claim::BufferClaim,
data_frame_header,
exclusive_term_appender::ExclusiveTermAppender,
frame_descriptor,
header::HeaderWriter,
log_buffer_descriptor,
term_appender::{default_reserved_value_supplier, OnReservedValueSupplier},
},
position::{ReadablePosition, UnsafeBufferPosition},
status::status_indicator_reader,
},
publication::{ADMIN_ACTION, BACK_PRESSURED, MAX_POSITION_EXCEEDED, NOT_CONNECTED, PUBLICATION_CLOSED},
utils::{bit_utils::number_of_trailing_zeroes, errors::AeronError, log_buffers::LogBuffers, types::Index},
};
/// A publication that keeps its own append state (`term_id`, `term_offset`, active partition
/// and term begin position) and mutates it through `&mut self` in the offer/claim methods.
///
/// Dropping the value marks it closed and asks the conductor to release it.
#[allow(dead_code)]
pub struct ExclusivePublication {
/// Shared client conductor; used for destination changes, channel status lookups and release on drop.
conductor: Arc<Mutex<ClientConductor>>,
/// View over the log meta data section of `log_buffers`.
log_meta_data_buffer: AtomicBuffer,
/// Channel supplied at construction.
channel: CString,
/// Registration id supplied at construction; passed to the conductor on every conductor call.
registration_id: i64,
/// Upper bound for positions: the term buffer capacity shifted left by 31 bits.
max_possible_position: i64,
/// Stream id supplied at construction.
stream_id: i32,
/// Session id supplied at construction.
session_id: i32,
/// Initial term id read from the log meta data at construction.
initial_term_id: i32,
/// MTU length from the log meta data minus the data frame header length.
max_payload_length: Index,
/// Result of `frame_descriptor::compute_max_message_length` for the term buffer capacity.
max_message_length: Index,
/// Number of trailing zero bits of the term buffer capacity.
position_bits_to_shift: i32,
/// Offset within the active term; updated after each successful append or claim.
term_offset: i32,
/// Id of the active term; incremented on rotation.
term_id: i32,
/// Index into `appenders` of the partition currently being written.
active_partition_index: i32,
/// Position at which the active term starts; advanced by one term length on rotation.
term_begin_position: i64,
/// Position limit, read with `get_volatile` before every offer or claim.
publication_limit: UnsafeBufferPosition,
/// Counter id handed to the conductor when the channel status is queried.
channel_status_id: i32,
/// Set by `close` and by `drop`; read with acquire ordering.
is_closed: AtomicBool,
/// Log buffers backing the term partitions and the meta data section.
log_buffers: Arc<LogBuffers>,
/// One appender per term partition.
appenders: [ExclusiveTermAppender; log_buffer_descriptor::PARTITION_COUNT as usize],
/// Header writer built from the default frame header stored in the log meta data.
header_writer: HeaderWriter,
}
impl ExclusivePublication {
/// Builds a publication on top of the supplied log buffers.
///
/// The initial term id, MTU and default frame header are read from the log meta data
/// section; the starting `term_id` and `term_offset` are decoded from the raw tail of
/// partition 0.
#[allow(clippy::too_many_arguments)]
pub fn new(
    conductor: Arc<Mutex<ClientConductor>>,
    channel: CString,
    registration_id: i64,
    stream_id: i32,
    session_id: i32,
    publication_limit: UnsafeBufferPosition,
    channel_status_id: i32,
    log_buffers: Arc<LogBuffers>,
) -> Self {
    let meta_data = log_buffers.atomic_buffer(log_buffer_descriptor::LOG_META_DATA_SECTION_INDEX);
    // All sizing below is derived from the capacity of the first term buffer.
    let term_length = log_buffers.atomic_buffer(0).capacity();

    let appenders: [ExclusiveTermAppender; 3] = [
        ExclusiveTermAppender::new(
            log_buffers.atomic_buffer(0),
            log_buffers.atomic_buffer(log_buffer_descriptor::LOG_META_DATA_SECTION_INDEX),
            0,
        ),
        ExclusiveTermAppender::new(
            log_buffers.atomic_buffer(1),
            log_buffers.atomic_buffer(log_buffer_descriptor::LOG_META_DATA_SECTION_INDEX),
            1,
        ),
        ExclusiveTermAppender::new(
            log_buffers.atomic_buffer(2),
            log_buffers.atomic_buffer(log_buffer_descriptor::LOG_META_DATA_SECTION_INDEX),
            2,
        ),
    ];

    // NOTE(review): the active partition is fixed to 0 and the term begin position to 0,
    // regardless of the active term count stored in the meta data - confirm this is
    // intended for logs that have already rotated.
    let raw_tail = appenders[0].raw_tail();
    let initial_term_id = log_buffer_descriptor::initial_term_id(&meta_data);
    let max_payload_length = log_buffer_descriptor::mtu_length(&meta_data) as Index - data_frame_header::LENGTH;
    let max_message_length = frame_descriptor::compute_max_message_length(term_length);
    let position_bits_to_shift = number_of_trailing_zeroes(term_length);
    let term_offset = log_buffer_descriptor::term_offset(raw_tail, term_length as i64);
    let term_id = log_buffer_descriptor::term_id(raw_tail);
    let header_writer = HeaderWriter::new(log_buffer_descriptor::default_frame_header(&meta_data));

    Self {
        conductor,
        log_meta_data_buffer: meta_data,
        channel,
        registration_id,
        max_possible_position: (term_length as i64) << 31,
        stream_id,
        session_id,
        initial_term_id,
        max_payload_length,
        max_message_length,
        position_bits_to_shift,
        term_offset,
        term_id,
        active_partition_index: 0,
        term_begin_position: 0,
        publication_limit,
        channel_status_id,
        is_closed: AtomicBool::from(false),
        log_buffers,
        header_writer,
        appenders,
    }
}
/// Returns an owned copy (clone) of the channel this publication was created with.
#[inline]
pub fn channel(&self) -> CString {
self.channel.clone()
}
/// Returns the stream id supplied at construction.
#[inline]
pub fn stream_id(&self) -> i32 {
self.stream_id
}
/// Returns the session id supplied at construction.
#[inline]
pub fn session_id(&self) -> i32 {
self.session_id
}
/// Returns the initial term id that was read from the log meta data at construction.
#[inline]
pub fn initial_term_id(&self) -> i32 {
self.initial_term_id
}
/// Returns the id of the term currently being written.
#[inline]
pub fn term_id(&self) -> i32 {
self.term_id
}
/// Returns the current offset within the active term.
#[inline]
pub fn term_offset(&self) -> i32 {
self.term_offset
}
/// Returns the registration id; for this type it is the same value as `registration_id()`.
#[inline]
pub fn original_registration_id(&self) -> i64 {
self.registration_id
}
/// Returns the registration id supplied at construction.
#[inline]
pub fn registration_id(&self) -> i64 {
self.registration_id
}
/// Always returns `true` for this publication type.
pub const fn is_original(&self) -> bool {
true
}
/// Returns the largest message length accepted by the offer methods; longer messages are
/// rejected with `IllegalArgumentException`.
#[inline]
pub fn max_message_length(&self) -> Index {
self.max_message_length
}
/// Returns the largest payload that is appended unfragmented and the largest length
/// accepted by `try_claim`.
#[inline]
pub fn max_payload_length(&self) -> Index {
self.max_payload_length
}
/// Returns the capacity of the term buffer held by the first appender.
#[inline]
pub fn term_buffer_length(&self) -> i32 {
self.appenders[0].term_buffer().capacity()
}
/// Returns the shift amount computed at construction as the number of trailing zero bits
/// of the term buffer capacity.
#[inline]
pub fn position_bits_to_shift(&self) -> i32 {
self.position_bits_to_shift
}
/// Reports whether the publication is open and the log meta data flags it as connected.
///
/// A closed publication is never reported as connected; the meta data is not consulted
/// in that case.
#[inline]
pub fn is_connected(&self) -> bool {
    if self.is_closed() {
        return false;
    }
    log_buffer_descriptor::is_connected(&self.log_meta_data_buffer)
}
/// Returns `true` once `close` has been called; the flag is loaded with acquire ordering.
#[inline]
pub fn is_closed(&self) -> bool {
self.is_closed.load(Ordering::Acquire)
}
/// Returns the current position (term begin position plus term offset), or
/// `PUBLICATION_CLOSED` when the publication has been closed.
#[inline]
pub fn position(&self) -> i64 {
    if self.is_closed() {
        return PUBLICATION_CLOSED;
    }
    self.term_begin_position + self.term_offset as i64
}
/// Returns the current value of the publication limit counter (volatile read), or
/// `PUBLICATION_CLOSED` when the publication has been closed.
#[inline]
pub fn publication_limit(&self) -> i64 {
    if !self.is_closed() {
        return self.publication_limit.get_volatile();
    }
    PUBLICATION_CLOSED
}
/// Returns the id reported by the publication limit position.
#[inline]
pub fn publication_limit_id(&self) -> i32 {
self.publication_limit.id()
}
/// Returns the distance between the publication limit and the current position, or
/// `PUBLICATION_CLOSED` when the publication has been closed.
#[inline]
pub fn available_window(&self) -> i64 {
    if self.is_closed() {
        return PUBLICATION_CLOSED;
    }
    let limit = self.publication_limit.get_volatile();
    limit - self.position()
}
/// Returns the channel status counter id supplied at construction.
#[inline]
pub fn channel_status_id(&self) -> i32 {
self.channel_status_id
}
/// Offers `length` bytes of `buffer`, starting at `offset`, to the active term.
///
/// Messages up to `max_payload_length` are appended unfragmented; longer ones are
/// appended fragmented.
///
/// # Returns
/// * `Ok(new position)` after a successful append,
/// * `Ok(PUBLICATION_CLOSED)` when the publication is closed,
/// * `Ok(status)` from `back_pressure_status` when the position has reached the limit,
/// * `Ok(status)` from `new_position` when the append did not produce a positive offset.
///
/// # Errors
/// `IllegalArgumentException` when the publication is open, below its limit, and `length`
/// exceeds `max_message_length`.
pub fn offer_opt(
    &mut self,
    buffer: AtomicBuffer,
    offset: Index,
    length: Index,
    reserved_value_supplier: OnReservedValueSupplier,
) -> Result<i64, AeronError> {
    if self.is_closed() {
        return Ok(PUBLICATION_CLOSED);
    }

    let limit = self.publication_limit.get_volatile();
    let position = self.term_begin_position + self.term_offset as i64;
    if position >= limit {
        return Ok(self.back_pressure_status(position, length));
    }

    let term_appender = &mut self.appenders[self.active_partition_index as usize];
    let resulting_offset = if length > self.max_payload_length {
        if length > self.max_message_length {
            return Err(AeronError::IllegalArgumentException(format!(
                "encoded message exceeds max_message_length of {}, length={}",
                self.max_message_length, length
            )));
        }
        term_appender.append_fragmented_message(
            self.term_id,
            self.term_offset,
            &self.header_writer,
            buffer,
            offset,
            length,
            self.max_payload_length,
            reserved_value_supplier,
        )
    } else {
        term_appender.append_unfragmented_message(
            self.term_id,
            self.term_offset,
            &self.header_writer,
            buffer,
            offset,
            length,
            reserved_value_supplier,
        )
    };

    Ok(self.new_position(resulting_offset))
}
/// Offers `length` bytes of `buffer` starting at `offset`, using
/// `default_reserved_value_supplier`. Results and errors are those of `offer_opt`.
pub fn offer_part(&mut self, buffer: AtomicBuffer, offset: Index, length: Index) -> Result<i64, AeronError> {
self.offer_opt(buffer, offset, length, default_reserved_value_supplier)
}
/// Offers the whole of `buffer` (offset 0, length equal to its capacity).
/// Results and errors are those of `offer_part`.
pub fn offer(&mut self, buffer: AtomicBuffer) -> Result<i64, AeronError> {
    let whole_length = buffer.capacity();
    self.offer_part(buffer, 0, whole_length)
}
/// Offers a single message whose payload is the concatenation of `buffers`.
///
/// The message length is the sum of the buffers' capacities. Messages up to
/// `max_payload_length` are appended unfragmented; longer ones are appended fragmented.
///
/// # Returns
/// * `Ok(new position)` after a successful append,
/// * `Ok(PUBLICATION_CLOSED)` when the publication is closed,
/// * `Ok(status)` from `back_pressure_status` when the position has reached the limit,
/// * `Ok(status)` from `new_position` when the append did not produce a positive offset.
///
/// # Errors
/// * `IllegalStateException` when the combined length does not fit in `Index`,
/// * `IllegalArgumentException` when the publication is open, below its limit, and the
///   combined length exceeds `max_message_length`.
pub fn offer_bulk(
    &mut self,
    buffers: Vec<AtomicBuffer>,
    reserved_value_supplier: OnReservedValueSupplier,
) -> Result<i64, AeronError> {
    // Accumulate with overflow detection: a plain `sum()` panics in debug builds and
    // silently wraps in release builds, so it can never be checked after the fact.
    let length: Index = buffers
        .iter()
        .try_fold(0, |total: Index, buffer| total.checked_add(buffer.capacity()))
        .ok_or_else(|| {
            AeronError::IllegalStateException(String::from(
                "length overflow: combined buffer length exceeds i32::MAX",
            ))
        })?;

    if self.is_closed() {
        return Ok(PUBLICATION_CLOSED);
    }

    let limit = self.publication_limit.get_volatile();
    let position = self.term_begin_position + self.term_offset as i64;
    if position >= limit {
        return Ok(self.back_pressure_status(position, length));
    }

    let term_appender = &mut self.appenders[self.active_partition_index as usize];
    let resulting_offset = if length <= self.max_payload_length {
        term_appender.append_unfragmented_message_bulk(
            self.term_id,
            self.term_offset,
            &self.header_writer,
            buffers,
            length,
            reserved_value_supplier,
        )
    } else {
        if length > self.max_message_length {
            return Err(AeronError::IllegalArgumentException(format!(
                "encoded message exceeds max_message_length of {}, length={}",
                self.max_message_length, length
            )));
        }
        // Payloads larger than one frame must be split, mirroring `offer_opt`.
        term_appender.append_fragmented_message_bulk(
            self.term_id,
            self.term_offset,
            &self.header_writer,
            buffers,
            length,
            self.max_payload_length,
            reserved_value_supplier,
        )
    };

    Ok(self.new_position(resulting_offset))
}
/// Tries to claim `length` bytes in the active term.
///
/// NOTE(review): `buffer_claim` is received by value, so the claim is populated on this
/// function's own copy - confirm that callers can observe the claimed range.
///
/// # Returns
/// * `Ok(new position)` after a successful claim,
/// * `Ok(PUBLICATION_CLOSED)` when the publication is closed,
/// * `Ok(status)` from `back_pressure_status` when the position has reached the limit,
/// * `Ok(status)` from `new_position` when the claim did not produce a positive offset.
///
/// # Errors
/// `IllegalArgumentException` when `length` exceeds `max_payload_length`; this is checked
/// before the closed state.
pub fn try_claim(&mut self, length: Index, mut buffer_claim: BufferClaim) -> Result<i64, AeronError> {
    self.check_payload_length(length)?;

    if self.is_closed() {
        return Ok(PUBLICATION_CLOSED);
    }

    let limit = self.publication_limit.get_volatile();
    let position = self.term_begin_position + self.term_offset as i64;
    if position >= limit {
        return Ok(self.back_pressure_status(position, length));
    }

    let term_appender = &mut self.appenders[self.active_partition_index as usize];
    let resulting_offset = term_appender.claim(
        self.term_id,
        self.term_offset,
        &self.header_writer,
        length,
        &mut buffer_claim,
    );
    Ok(self.new_position(resulting_offset))
}
/// Asks the conductor to add `endpoint_channel` as a destination for this registration
/// and returns the conductor's result.
///
/// # Errors
/// `IllegalStateException` when the publication is closed; otherwise whatever the
/// conductor returns.
///
/// # Panics
/// Panics if the conductor mutex is poisoned.
pub fn add_destination(&mut self, endpoint_channel: CString) -> Result<i64, AeronError> {
    if self.is_closed() {
        Err(AeronError::IllegalStateException("Publication is closed".to_string()))
    } else {
        self.conductor
            .lock()
            .expect("Mutex poisoned")
            .add_destination(self.registration_id, endpoint_channel)
    }
}
/// Asks the conductor to remove `endpoint_channel` from this registration's destinations
/// and returns the conductor's result.
///
/// # Errors
/// `IllegalStateException` when the publication is closed; otherwise whatever the
/// conductor returns.
///
/// # Panics
/// Panics if the conductor mutex is poisoned.
pub fn remove_destination(&mut self, endpoint_channel: CString) -> Result<i64, AeronError> {
    if self.is_closed() {
        Err(AeronError::IllegalStateException("Publication is closed".to_string()))
    } else {
        self.conductor
            .lock()
            .expect("Mutex poisoned")
            .remove_destination(self.registration_id, endpoint_channel)
    }
}
/// Returns the channel status reported by the conductor for `channel_status_id`, or
/// `NO_ID_ALLOCATED` (as `i64`) when the publication is closed.
///
/// # Panics
/// Panics if the conductor mutex is poisoned.
pub fn channel_status(&self) -> i64 {
    if self.is_closed() {
        status_indicator_reader::NO_ID_ALLOCATED as i64
    } else {
        self.conductor
            .lock()
            .expect("Mutex poisoned")
            .channel_status(self.channel_status_id)
    }
}
/// Marks the publication as closed (release store). The conductor is not contacted here;
/// release happens when the value is dropped.
pub fn close(&self) {
self.is_closed.store(true, Ordering::Release);
}
/// Translates an appender result into a position or status code.
///
/// A positive `resulting_offset` becomes the new term offset and the matching position is
/// returned. Otherwise the publication rotates to the next partition and returns
/// `ADMIN_ACTION`, unless the end of the current term is at or beyond
/// `max_possible_position`, in which case `MAX_POSITION_EXCEEDED` is returned and nothing
/// is changed.
fn new_position(&mut self, resulting_offset: Index) -> i64 {
    if resulting_offset > 0 {
        self.term_offset = resulting_offset;
        return self.term_begin_position + resulting_offset as i64;
    }

    let term_length = self.term_buffer_length() as i64;
    if self.term_begin_position + term_length >= self.max_possible_position {
        return MAX_POSITION_EXCEEDED;
    }

    // Rotate local state to the next partition and term.
    let rotated_index = log_buffer_descriptor::next_partition_index(self.active_partition_index);
    let rotated_term_id = self.term_id + 1;
    self.active_partition_index = rotated_index;
    self.term_offset = 0;
    self.term_id = rotated_term_id;
    self.term_begin_position += term_length;

    // Publish the rotation: the new tail is initialised before the active term count is stored.
    let term_count = rotated_term_id - self.initial_term_id;
    log_buffer_descriptor::initialize_tail_with_term_id(&self.log_meta_data_buffer, rotated_index, rotated_term_id);
    log_buffer_descriptor::set_active_term_count_ordered(&self.log_meta_data_buffer, term_count);

    ADMIN_ACTION
}
/// Chooses the status code to report when the position has reached the publication limit.
///
/// Returns `MAX_POSITION_EXCEEDED` when the message would end at or beyond
/// `max_possible_position`, `BACK_PRESSURED` when the log meta data reports a connection,
/// and `NOT_CONNECTED` otherwise.
fn back_pressure_status(&self, current_position: i64, message_length: i32) -> i64 {
    let position_after_message = current_position + message_length as i64;
    if position_after_message >= self.max_possible_position {
        MAX_POSITION_EXCEEDED
    } else if log_buffer_descriptor::is_connected(&self.log_meta_data_buffer) {
        BACK_PRESSURED
    } else {
        NOT_CONNECTED
    }
}
/// Validates `length` against `max_message_length`.
///
/// # Errors
/// `IllegalArgumentException` when `length` is greater than `max_message_length`.
#[allow(dead_code)]
fn check_max_message_length(&self, length: Index) -> Result<(), AeronError> {
    if length <= self.max_message_length {
        return Ok(());
    }
    Err(AeronError::IllegalArgumentException(format!(
        "encoded message exceeds max_message_length of {}, length={}",
        self.max_message_length, length
    )))
}
/// Validates `length` against `max_payload_length`.
///
/// # Errors
/// `IllegalArgumentException` when `length` is greater than `max_payload_length`.
fn check_payload_length(&self, length: Index) -> Result<(), AeronError> {
    if length <= self.max_payload_length {
        return Ok(());
    }
    Err(AeronError::IllegalArgumentException(format!(
        "encoded message exceeds max_payload_length of {}, length={}",
        self.max_payload_length, length
    )))
}
}
impl Drop for ExclusivePublication {
    /// Marks the publication closed and asks the conductor to release it.
    ///
    /// The result of the release call is deliberately ignored: there is nobody to report
    /// it to from a destructor.
    fn drop(&mut self) {
        self.is_closed.store(true, Ordering::Release);
        // Never panic here: a panic inside `drop` while another panic is unwinding aborts
        // the process. A poisoned mutex still yields its guard, so release best-effort.
        let _unused = self
            .conductor
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .release_exclusive_publication(self.registration_id);
    }
}
#[cfg(test)]
mod tests {
use std::ffi::CString;
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
use crate::{
client_conductor::ClientConductor,
concurrent::{
atomic_buffer::{AlignedBuffer, AtomicBuffer},
broadcast::{
broadcast_buffer_descriptor, broadcast_receiver::BroadcastReceiver,
copy_broadcast_receiver::CopyBroadcastReceiver,
},
counters::CountersReader,
logbuffer::{
buffer_claim::BufferClaim,
data_frame_header::LENGTH,
frame_descriptor,
log_buffer_descriptor::{self, AERON_PAGE_MIN_SIZE, TERM_MIN_LENGTH},
},
position::{ReadablePosition, UnsafeBufferPosition},
ring_buffer::{self, ManyToOneRingBuffer},
status::status_indicator_reader::{StatusIndicatorReader, NO_ID_ALLOCATED},
},
driver_proxy::DriverProxy,
exclusive_publication::ExclusivePublication,
publication::{ADMIN_ACTION, NOT_CONNECTED, PUBLICATION_CLOSED},
utils::{
errors::AeronError,
log_buffers::LogBuffers,
misc::unix_time_ms,
types::{Index, Moment, I64_SIZE},
},
};
// Channel passed to every publication built by the fixture.
lazy_static! {
pub static ref CHANNEL: CString = CString::new("aeron:udp?endpoint=localhost:40123").unwrap();
}
// Identity values handed to `ExclusivePublication::new` by the fixture.
const STREAM_ID: i32 = 10;
const SESSION_ID: i32 = 200;
// Counter id used for the publication limit position.
const PUBLICATION_LIMIT_COUNTER_ID: i32 = 0;
// Used as the publication's registration id.
const CORRELATION_ID: i64 = 100;
// Initial term id written into the log meta data.
const TERM_ID_1: i32 = 1;
// Timeouts passed to `ClientConductor::new`.
const DRIVER_TIMEOUT_MS: Moment = 10 * 1000;
const RESOURCE_LINGER_TIMEOUT_MS: Moment = 5 * 1000;
const INTER_SERVICE_TIMEOUT_NS: Moment = 5 * 1000 * 1000 * 1000;
const INTER_SERVICE_TIMEOUT_MS: Moment = INTER_SERVICE_TIMEOUT_NS / 1_000_000;
const PRE_TOUCH_MAPPED_MEMORY: bool = false;
// Sizes of the buffers allocated by the fixture.
const CAPACITY: i32 = 1024;
const MANY_TO_ONE_RING_BUFFER_LENGTH: i32 = CAPACITY + ring_buffer::TRAILER_LENGTH;
const BROADCAST_BUFFER_LENGTH: i32 = CAPACITY + broadcast_buffer_descriptor::TRAILER_LENGTH;
const COUNTER_METADATA_BUFFER_LENGTH: i32 = 4 * 1024 * 1024;
/// Packs a term id into the upper 32 bits of a raw tail value and ORs `position` into it.
#[inline]
fn raw_tail_value(term_id: i32, position: i64) -> i64 {
    let term_bits = (term_id as i64) << 32;
    term_bits | position
}
/// Returns the meta data offset of the tail counter for partition `index`:
/// `TERM_TAIL_COUNTER_OFFSET` plus `index` 64-bit slots.
#[inline]
fn term_tail_counter_offset(index: i32) -> Index {
    let first_counter = *log_buffer_descriptor::TERM_TAIL_COUNTER_OFFSET;
    first_counter + index * I64_SIZE
}
/// No-op new-publication callback handed to `ClientConductor::new`.
fn on_new_publication_handler(_channel: CString, _stream_id: i32, _session_id: i32, _correlation_id: i64) {}
/// No-op new-exclusive-publication callback handed to `ClientConductor::new`.
fn on_new_exclusive_publication_handler(_channel: CString, _stream_id: i32, _session_id: i32, _correlation_id: i64) {}
/// No-op new-subscription callback handed to `ClientConductor::new`.
fn on_new_subscription_handler(_channel: CString, _stream_id: i32, _correlation_id: i64) {}
/// Error callback handed to `ClientConductor::new`; prints the error to stdout.
fn error_handler(err: AeronError) {
println!("Got error: {:?}", err);
}
/// No-op available-counter callback handed to `ClientConductor::new`.
fn on_available_counter_handler(_counters_reader: &CountersReader, _registration_id: i64, _counter_id: i32) {}
/// No-op unavailable-counter callback handed to `ClientConductor::new`.
fn on_unavailable_counter_handler(_counters_reader: &CountersReader, _registration_id: i64, _counter_id: i32) {}
/// No-op close-client callback handed to `ClientConductor::new`.
fn on_close_client_handler() {}
/// Test fixture: owns the backing memory, the conductor and the publication under test.
///
/// The `AlignedBuffer` fields own the allocations; the `AtomicBuffer` fields are views
/// created from them or from `log_buffers`.
#[allow(dead_code)]
struct ExclusivePublicationTest {
// Backing memory for `src_buffer`.
src: AlignedBuffer,
// Backing memory for the three term buffers plus the log meta data section.
log: AlignedBuffer,
conductor: Arc<Mutex<ClientConductor>>,
// Backing memory for the to-driver ring buffer and the to-clients broadcast buffer.
to_driver: AlignedBuffer,
to_clients: AlignedBuffer,
// Backing memory for the counter buffers given to the conductor.
counter_metadata: AlignedBuffer,
counter_values: AlignedBuffer,
to_driver_buffer: AtomicBuffer,
to_clients_buffer: AtomicBuffer,
many_to_one_ring_buffer: Arc<ManyToOneRingBuffer>,
// Views over partitions 0..=2 of `log_buffers`.
term_buffers: [AtomicBuffer; 3],
// View over the log meta data section; tests write tails and flags through it.
log_meta_data_buffer: AtomicBuffer,
// Message payload offered by the tests.
src_buffer: AtomicBuffer,
log_buffers: Arc<LogBuffers>,
// Clone of the limit position given to the publication; tests set the limit through it.
publication_limit: UnsafeBufferPosition,
channel_status_indicator: StatusIndicatorReader,
// The publication under test.
publication: ExclusivePublication,
}
impl ExclusivePublicationTest {
/// Allocates all buffers, builds a `ClientConductor` over them, seeds the log meta data
/// and constructs the publication under test.
pub fn new() -> Self {
// One allocation holding three minimum-length terms followed by the meta data section.
let log = AlignedBuffer::with_capacity(TERM_MIN_LENGTH * 3 + log_buffer_descriptor::LOG_META_DATA_LENGTH);
let src = AlignedBuffer::with_capacity(1024);
let src_buffer = AtomicBuffer::from_aligned(&src);
// The raw pointer and length handed over here come from `log`, which is stored in the
// returned fixture alongside `log_buffers`.
let log_buffers =
Arc::new(unsafe { LogBuffers::new(log.ptr, log.len as isize, log_buffer_descriptor::TERM_MIN_LENGTH) });
let to_driver = AlignedBuffer::with_capacity(MANY_TO_ONE_RING_BUFFER_LENGTH);
let to_clients = AlignedBuffer::with_capacity(BROADCAST_BUFFER_LENGTH);
// NOTE(review): the metadata buffer is sized with BROADCAST_BUFFER_LENGTH and the values
// buffer with COUNTER_METADATA_BUFFER_LENGTH - confirm these sizes are the intended ones.
let counter_metadata = AlignedBuffer::with_capacity(BROADCAST_BUFFER_LENGTH);
let counter_values = AlignedBuffer::with_capacity(COUNTER_METADATA_BUFFER_LENGTH);
let to_driver_buffer = AtomicBuffer::from_aligned(&to_driver);
let to_clients_buffer = AtomicBuffer::from_aligned(&to_clients);
let counters_metadata_buffer = AtomicBuffer::from_aligned(&counter_metadata);
let counters_values_buffer = AtomicBuffer::from_aligned(&counter_values);
let local_to_driver_ring_buffer =
Arc::new(ManyToOneRingBuffer::new(to_driver_buffer).expect("Failed to create RingBuffer"));
let local_to_clients_broadcast_receiver = Arc::new(Mutex::new(
BroadcastReceiver::new(to_clients_buffer).expect("Failed to create BroadcastReceiver"),
));
let local_driver_proxy = Arc::new(DriverProxy::new(local_to_driver_ring_buffer.clone()));
let local_copy_broadcast_receiver =
Arc::new(Mutex::new(CopyBroadcastReceiver::new(local_to_clients_broadcast_receiver)));
let conductor = ClientConductor::new(
unix_time_ms,
local_driver_proxy,
local_copy_broadcast_receiver,
counters_metadata_buffer,
counters_values_buffer,
on_new_publication_handler,
on_new_exclusive_publication_handler,
on_new_subscription_handler,
error_handler,
on_available_counter_handler,
on_unavailable_counter_handler,
on_close_client_handler,
DRIVER_TIMEOUT_MS,
RESOURCE_LINGER_TIMEOUT_MS,
INTER_SERVICE_TIMEOUT_MS,
PRE_TOUCH_MAPPED_MEMORY,
);
// Hold the conductor lock only long enough to create the counter-backed readers.
let conductor_guard = conductor.lock().expect("Conductor mutex is poisoned");
let publication_limit =
UnsafeBufferPosition::new(conductor_guard.counter_values_buffer(), PUBLICATION_LIMIT_COUNTER_ID);
let channel_status_indicator = StatusIndicatorReader::new(conductor_guard.counter_values_buffer(), NO_ID_ALLOCATED);
drop(conductor_guard);
// Seed the log meta data: MTU of three source buffers, minimum term length and page size,
// and TERM_ID_1 as the initial term id.
let log_meta_data_buffer = log_buffers.atomic_buffer(log_buffer_descriptor::LOG_META_DATA_SECTION_INDEX);
log_meta_data_buffer.put(*log_buffer_descriptor::LOG_MTU_LENGTH_OFFSET, 3 * src_buffer.capacity());
log_meta_data_buffer.put(*log_buffer_descriptor::LOG_TERM_LENGTH_OFFSET, TERM_MIN_LENGTH);
log_meta_data_buffer.put(*log_buffer_descriptor::LOG_PAGE_SIZE_OFFSET, AERON_PAGE_MIN_SIZE);
log_meta_data_buffer.put(*log_buffer_descriptor::LOG_INITIAL_TERM_ID_OFFSET, TERM_ID_1);
// Active term count 0, and a raw tail carrying TERM_ID_1 in its upper 32 bits with offset 0.
let index = log_buffer_descriptor::index_by_term(TERM_ID_1, TERM_ID_1);
log_meta_data_buffer.put(*log_buffer_descriptor::LOG_ACTIVE_TERM_COUNT_OFFSET, 0);
log_meta_data_buffer.put(term_tail_counter_offset(index), (TERM_ID_1 as i64) << 32);
Self {
src,
log,
conductor: conductor.clone(),
to_driver,
to_clients,
counter_metadata,
counter_values,
to_driver_buffer,
to_clients_buffer,
many_to_one_ring_buffer: local_to_driver_ring_buffer,
term_buffers: [
log_buffers.atomic_buffer(0),
log_buffers.atomic_buffer(1),
log_buffers.atomic_buffer(2),
],
log_meta_data_buffer,
src_buffer,
log_buffers: log_buffers.clone(),
publication_limit: publication_limit.clone(),
channel_status_indicator,
// The publication is built last, after the meta data has been seeded.
publication: ExclusivePublication::new(
conductor,
(*CHANNEL).clone(),
CORRELATION_ID,
STREAM_ID,
SESSION_ID,
publication_limit,
NO_ID_ALLOCATED,
log_buffers,
),
}
}
/// Rebuilds the publication from the fixture's current meta data and limit, replacing
/// (and thereby dropping) the previous one. Tests call this after editing the raw tail.
fn create_pub(&mut self) {
    let replacement = ExclusivePublication::new(
        self.conductor.clone(),
        (*CHANNEL).clone(),
        CORRELATION_ID,
        STREAM_ID,
        SESSION_ID,
        self.publication_limit.clone(),
        NO_ID_ALLOCATED,
        self.log_buffers.clone(),
    );
    self.publication = replacement;
}
}
/// A freshly built publication reports position 0.
#[test]
fn should_report_initial_position() {
    let fixture = ExclusivePublicationTest::new();
    let reported = fixture.publication.position();
    assert_eq!(reported, 0);
}
/// The maximum message length matches the value computed for the minimum term length.
#[test]
fn should_report_max_message_length() {
    let fixture = ExclusivePublicationTest::new();
    let expected = frame_descriptor::compute_max_message_length(TERM_MIN_LENGTH);
    assert_eq!(fixture.publication.max_message_length(), expected);
}
/// The term buffer length equals the minimum term length the fixture allocates.
#[test]
fn should_report_correct_term_buffer_length() {
    let fixture = ExclusivePublicationTest::new();
    let reported = fixture.publication.term_buffer_length();
    assert_eq!(reported, TERM_MIN_LENGTH);
}
/// With the connected flag cleared in the meta data, the publication is not connected.
#[test]
fn should_report_that_publication_has_not_been_connected_yet() {
    let fixture = ExclusivePublicationTest::new();
    log_buffer_descriptor::set_is_connected(&fixture.log_meta_data_buffer, false);
    let connected = fixture.publication.is_connected();
    assert!(!connected);
}
/// With the connected flag set in the meta data, the publication is connected.
#[test]
fn should_report_that_publication_has_been_connected_yet() {
    let fixture = ExclusivePublicationTest::new();
    log_buffer_descriptor::set_is_connected(&fixture.log_meta_data_buffer, true);
    let connected = fixture.publication.is_connected();
    assert!(connected);
}
/// After `close`, `position` reports `PUBLICATION_CLOSED`.
#[test]
fn should_ensure_the_publication_is_open_before_reading_position() {
    let fixture = ExclusivePublicationTest::new();
    fixture.publication.close();
    let reported = fixture.publication.position();
    assert_eq!(reported, PUBLICATION_CLOSED);
}
/// After `close`, `offer` reports `PUBLICATION_CLOSED`.
#[test]
fn should_ensure_the_publication_is_open_before_offer() {
    let mut fixture = ExclusivePublicationTest::new();
    fixture.publication.close();
    assert!(fixture.publication.is_closed());

    let payload = fixture.src_buffer;
    let outcome = fixture.publication.offer(payload).unwrap();
    assert_eq!(outcome, PUBLICATION_CLOSED);
}
/// After `close`, `try_claim` reports `PUBLICATION_CLOSED`.
#[test]
fn should_ensure_the_publication_is_open_before_claim() {
    let mut fixture = ExclusivePublicationTest::new();
    fixture.publication.close();
    assert!(fixture.publication.is_closed());

    let claim = BufferClaim::default();
    let outcome = fixture.publication.try_claim(1024, claim).unwrap();
    assert_eq!(outcome, PUBLICATION_CLOSED);
}
/// With the limit raised to two source-buffer lengths, the first offer lands at
/// payload length plus frame header length, and `position` agrees.
#[test]
fn should_offer_a_message_upon_construction() {
    let mut fixture = ExclusivePublicationTest::new();
    let payload = fixture.src_buffer;
    let payload_length = payload.capacity();
    let expected_position = (payload_length + LENGTH) as i64;

    fixture.publication_limit.set(2 * payload_length as i64);

    assert_eq!(fixture.publication.offer(payload).unwrap(), expected_position);
    assert_eq!(fixture.publication.position(), expected_position);
}
/// With a limit of 0, a rebuilt publication answers an offer with `NOT_CONNECTED`.
#[test]
fn should_fail_to_offer_a_message_when_limited() {
    let mut fixture = ExclusivePublicationTest::new();
    fixture.publication_limit.set(0);
    fixture.create_pub();

    let payload = fixture.src_buffer;
    let outcome = fixture.publication.offer(payload).unwrap();
    assert_eq!(outcome, NOT_CONNECTED);
}
/// With the tail already at the end of the term, the offer returns `ADMIN_ACTION`.
#[test]
fn should_fail_to_offer_when_append_fails() {
    let mut fixture = ExclusivePublicationTest::new();
    let start_position = TERM_MIN_LENGTH as i64;

    // Place the raw tail of the active partition at the very end of the term.
    let active_index = log_buffer_descriptor::index_by_term(TERM_ID_1, TERM_ID_1);
    let tail = raw_tail_value(TERM_ID_1, start_position);
    fixture.log_meta_data_buffer.put(term_tail_counter_offset(active_index), tail);
    fixture.publication_limit.set(std::i32::MAX as i64);
    fixture.create_pub();

    assert_eq!(fixture.publication.position(), start_position);

    let payload = fixture.src_buffer;
    assert_eq!(fixture.publication.offer(payload).unwrap(), ADMIN_ACTION);
}
/// An offer that does not fit in the remaining term space rotates the log: the active term
/// count becomes 1, the next partition's tail carries the next term id, and a further offer
/// advances past the end of the first term.
#[test]
fn should_rotate_when_append_trips() {
    let mut fixture = ExclusivePublicationTest::new();
    let payload = fixture.src_buffer;
    let start_position = TERM_MIN_LENGTH - LENGTH;
    let position_floor = (start_position + LENGTH + payload.capacity()) as i64;

    // Leave room for exactly one frame header in the active term.
    let active_index = log_buffer_descriptor::index_by_term(TERM_ID_1, TERM_ID_1);
    let tail = raw_tail_value(TERM_ID_1, start_position as i64);
    fixture.log_meta_data_buffer.put(term_tail_counter_offset(active_index), tail);
    fixture.publication_limit.set(std::i32::MAX as i64);
    fixture.create_pub();

    assert_eq!(fixture.publication.position(), start_position as i64);
    assert_eq!(fixture.publication.offer(payload).unwrap(), ADMIN_ACTION);

    let rotated_index = log_buffer_descriptor::index_by_term(TERM_ID_1, TERM_ID_1 + 1);
    let active_term_count = fixture
        .log_meta_data_buffer
        .get::<i32>(*log_buffer_descriptor::LOG_ACTIVE_TERM_COUNT_OFFSET);
    assert_eq!(active_term_count, 1);

    let rotated_tail = fixture
        .log_meta_data_buffer
        .get::<i64>(term_tail_counter_offset(rotated_index));
    assert_eq!(rotated_tail, ((TERM_ID_1 + 1) as i64) << 32);

    assert!(fixture.publication.offer(payload).unwrap() > position_floor);
    assert!(fixture.publication.position() > position_floor);
}
/// A claim that does not fit in the remaining term space rotates the log: the active term
/// count becomes 1, the next partition's tail carries the next term id, and a further claim
/// advances past the end of the first term.
#[test]
fn should_rotate_when_claim_trips() {
    let mut fixture = ExclusivePublicationTest::new();
    let start_position = TERM_MIN_LENGTH - LENGTH;
    let position_floor = (start_position + LENGTH + fixture.src_buffer.capacity()) as i64;

    // Leave room for exactly one frame header in the active term.
    let active_index = log_buffer_descriptor::index_by_term(TERM_ID_1, TERM_ID_1);
    let tail = raw_tail_value(TERM_ID_1, start_position as i64);
    fixture.log_meta_data_buffer.put(term_tail_counter_offset(active_index), tail);
    fixture.publication_limit.set(std::i32::MAX as i64);
    fixture.create_pub();

    let claim = BufferClaim::default();
    assert_eq!(fixture.publication.position(), start_position as i64);
    assert_eq!(fixture.publication.try_claim(1024, claim).unwrap(), ADMIN_ACTION);

    let rotated_index = log_buffer_descriptor::index_by_term(TERM_ID_1, TERM_ID_1 + 1);
    let active_term_count = fixture
        .log_meta_data_buffer
        .get::<i32>(*log_buffer_descriptor::LOG_ACTIVE_TERM_COUNT_OFFSET);
    assert_eq!(active_term_count, 1);

    let rotated_tail = fixture
        .log_meta_data_buffer
        .get::<i64>(term_tail_counter_offset(rotated_index));
    assert_eq!(rotated_tail, ((TERM_ID_1 + 1) as i64) << 32);

    assert!(fixture.publication.try_claim(1024, claim).unwrap() > position_floor);
    assert!(fixture.publication.position() > position_floor);
}
}