// chair_rb/concurrent/ring_buffer/mod.rs

/*
 * Copyright 2021 Andrew Trumbo
 * This work is a derivative of:
 * https://github.com/real-logic/aeron/
 * Copyright 2014-2021 Real Logic Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

20use std::mem::size_of;
21
22use crate::concurrent::AtomicBuffer;
23use crate::util::bit_util;
24use crate::util::bit_util::CACHE_LINE_LENGTH;
25use crate::util::Index;
26
27pub use self::many_to_one_ring_buffer::ManyToOneRingBuffer;
28pub use self::one_to_one_ring_buffer::OneToOneRingBuffer;
29
30pub mod many_to_one_ring_buffer;
31pub mod one_to_one_ring_buffer;
32
/// Namespace for the ring buffer trailer layout.
///
/// This is a unit struct: it holds no data and only serves as the type on which
/// the trailer constants and the capacity check are declared.
pub struct RingBufferDescriptor;

35impl RingBufferDescriptor {
36    pub const TAIL_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 2;
37    pub const HEAD_CACHE_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 4;
38    pub const HEAD_POSITION_OFFSET: Index = CACHE_LINE_LENGTH * 6;
39    pub const CORRELATION_COUNTER_OFFSET: Index = CACHE_LINE_LENGTH * 8;
40    pub const CONSUMER_HEARTBEAT_OFFSET: Index = CACHE_LINE_LENGTH * 10;
41
42    /* Total length of the trailer in bytes. */
43    pub const TRAILER_LENGTH: Index = CACHE_LINE_LENGTH * 12;
44
45    #[inline]
46    fn check_capacity(capacity: Index) {
47        if !bit_util::is_power_of_two(capacity) {
48            panic!(
49                "Capacity must be a positive power of 2 + TRAILER_LENGTH: capacity={}",
50                capacity
51            )
52        }
53    }
54}
55
/// Layout of a single record: a header followed by the encoded message.
///
/// The header is made up of fields for the record length and the message type;
/// the encoded message follows it.
///
/// Writing of the record length signals that recording of the message is complete.
///
/// ```text
///   0                   1                   2                   3
///   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
///  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
///  |R|                       Record Length                         |
///  +-+-------------------------------------------------------------+
///  |                              Type                             |
///  +---------------------------------------------------------------+
///  |                       Encoded Message                        ...
/// ...                                                              |
///  +---------------------------------------------------------------+
/// ```
pub struct RecordDescriptor;

75impl RecordDescriptor {
76    pub const HEADER_LENGTH: Index = size_of::<Index>() as Index * 2;
77    pub const ALIGNMENT: Index = RecordDescriptor::HEADER_LENGTH;
78    pub const PADDING_MSG_TYPE_ID: Index = -1;
79
80    #[inline]
81    pub fn length_offset(record_offset: Index) -> Index {
82        record_offset
83    }
84
85    #[inline]
86    pub fn type_offset(record_offset: Index) -> Index {
87        record_offset + size_of::<Index>() as Index
88    }
89
90    #[inline]
91    pub fn encoded_msg_offset(record_offset: Index) -> Index {
92        record_offset + RecordDescriptor::HEADER_LENGTH
93    }
94
95    #[inline]
96    pub fn make_header(length: i32, msg_type_id: i32) -> i64 {
97        (((msg_type_id as i64) & 0xFFFFFFFF) << 32) | (length as i64 & 0xFFFFFFFF)
98    }
99
100    #[inline]
101    pub fn record_length(header: i64) -> i32 {
102        header as i32
103    }
104
105    #[inline]
106    pub fn message_type_id(header: i64) -> i32 {
107        (header >> 32) as i32
108    }
109
110    #[inline]
111    pub fn check_msg_type_id(msg_type_id: i32) {
112        if msg_type_id < 1 {
113            panic!(
114                "Message type id must be greater than zero, msgTypeId={}",
115                msg_type_id
116            );
117        }
118    }
119}
120
121pub trait RingBuffer {
122    fn capacity(&self) -> Index;
123
124    fn write(
125        &self,
126        msg_type_id: i32,
127        src_buffer: &AtomicBuffer,
128        src_index: Index,
129        length: Index,
130    ) -> bool;
131
132    fn read<'a, F>(&'a self, handler: F, message_count_limit: u32) -> u32
133    where
134        F: FnMut(i32, &'a AtomicBuffer, Index, Index);
135
136    fn max_msg_length(&self) -> Index;
137
138    fn next_correlation_id(&self) -> i64;
139
140    fn unblock(&self) -> bool;
141}
142
143pub trait MessageHandler {
144    fn on_message(&self, msg_type_id: i32, buffer: &AtomicBuffer, index: Index, length: Index);
145}