chair_rb/concurrent/ring_buffer/
mod.rs1use std::mem::size_of;
21
22use crate::concurrent::AtomicBuffer;
23use crate::util::bit_util;
24use crate::util::bit_util::CACHE_LINE_LENGTH;
25use crate::util::Index;
26
27pub use self::many_to_one_ring_buffer::ManyToOneRingBuffer;
28pub use self::one_to_one_ring_buffer::OneToOneRingBuffer;
29
30pub mod many_to_one_ring_buffer;
31pub mod one_to_one_ring_buffer;
32
33pub struct RingBufferDescriptor;
34
35impl RingBufferDescriptor {
36 pub const TAIL_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 2;
37 pub const HEAD_CACHE_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 4;
38 pub const HEAD_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 6;
39 pub const CORRELATION_COUNTER_OFFSET: Index = CACHE_LINE_LENGTH * 8;
40 pub const CONSUMER_HEARTBEAT_OFFSET: Index = CACHE_LINE_LENGTH * 10;
41
42 pub const TRAILER_LENGTH: Index = CACHE_LINE_LENGTH * 12;
44
45 #[inline]
46 fn check_capacity(capacity: Index) {
47 if !bit_util::is_power_of_two(capacity) {
48 panic!(
49 "Capacity must be a positive power of 2 + TRAILER_LENGTH: capacity={}",
50 capacity
51 )
52 }
53 }
54}
55
56pub struct RecordDescriptor;
74
75impl RecordDescriptor {
76 pub const HEADER_LENGTH: Index = size_of::<Index>() as Index * 2;
77 pub const ALIGNMENT: Index = RecordDescriptor::HEADER_LENGTH;
78 pub const PADDING_MSG_TYPE_ID: Index = -1;
79
80 #[inline]
81 pub fn length_offset(record_offset: Index) -> Index {
82 record_offset
83 }
84
85 #[inline]
86 pub fn type_offset(record_offset: Index) -> Index {
87 record_offset + size_of::<Index>() as Index
88 }
89
90 #[inline]
91 pub fn encoded_msg_offset(record_offset: Index) -> Index {
92 record_offset + RecordDescriptor::HEADER_LENGTH
93 }
94
95 #[inline]
96 pub fn make_header(length: i32, msg_type_id: i32) -> i64 {
97 (((msg_type_id as i64) & 0xFFFFFFFF) << 32) | (length as i64 & 0xFFFFFFFF)
98 }
99
100 #[inline]
101 pub fn record_length(header: i64) -> i32 {
102 header as i32
103 }
104
105 #[inline]
106 pub fn message_type_id(header: i64) -> i32 {
107 (header >> 32) as i32
108 }
109
110 #[inline]
111 pub fn check_msg_type_id(msg_type_id: i32) {
112 if msg_type_id < 1 {
113 panic!(
114 "Message type id must be greater than zero, msgTypeId={}",
115 msg_type_id
116 );
117 }
118 }
119}
120
121pub trait RingBuffer {
122 fn capacity(&self) -> Index;
123
124 fn write(
125 &self,
126 msg_type_id: i32,
127 src_buffer: &AtomicBuffer,
128 src_index: Index,
129 length: Index,
130 ) -> bool;
131
132 fn read<'a, F>(&'a self, handler: F, message_count_limit: u32) -> u32
133 where
134 F: FnMut(i32, &'a AtomicBuffer, Index, Index);
135
136 fn max_msg_length(&self) -> Index;
137
138 fn next_correlation_id(&self) -> i64;
139
140 fn unblock(&self) -> bool;
141}
142
143pub trait MessageHandler {
144 fn on_message(&self, msg_type_id: i32, buffer: &AtomicBuffer, index: Index, length: Index);
145}