rusteron_rb/
lib.rs

1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8//! # Features
9//!
10//! - **`static`**: When enabled, this feature statically links the Aeron C code.
11//!   By default, the library uses dynamic linking to the Aeron C libraries.
12//! - **`backtrace`** - When enabled will log a backtrace for each AeronCError
//! - **`extra-logging`** - When enabled will log when a resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
//! - **`precompile`** - When enabled will use precompiled C code instead of requiring cmake and Java to be installed
15
/// Items textually included from the `bindings.rs` file located in the build's `OUT_DIR`.
// NOTE(review): presumably the generated FFI declarations for the Aeron C API — confirm against build.rs.
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
// Bring every item of `bindings` into the crate-root scope.
use bindings::*;
// Splice the `aeron.rs` and `rb_custom.rs` files from `OUT_DIR` directly into the crate root.
include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
include!(concat!(env!("OUT_DIR"), "/rb_custom.rs"));
22
23#[cfg(test)]
24mod tests {
25    use super::*;
26    use std::error;
27
28    #[test]
29    pub fn spsc_normal() -> Result<(), Box<dyn error::Error>> {
30        let rb = AeronSpscRb::new_with_capacity(1024 * 1024, 1024)?;
31        let rb2 = rb.clone();
32
33        for i in 0..100 {
34            // msg_type_id must >0
35            let idx = rb.try_claim(i + 1, 4);
36            assert!(idx >= 0);
37            let slot = rb.buffer_at_mut(idx as usize, 4);
38            slot[0] = i as u8;
39            rb.commit(idx)?;
40        }
41
42        struct Reader {}
43        impl AeronRingBufferHandlerCallback for Reader {
44            fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) -> () {
45                assert_eq!(buffer[0], (msg_type_id - 1) as u8)
46            }
47        }
48        let handler = AeronRingBufferHandlerWrapper::new(Reader {});
49        for i in 0..10 {
50            let read = rb2.read_msgs(&handler, 10);
51            assert_eq!(10, read);
52        }
53
54        assert_eq!(0, rb2.read(Some(&handler), 10));
55
56        Ok(())
57    }
58
59    #[test]
60    pub fn spsc_control() -> Result<(), Box<dyn error::Error>> {
61        let mut buff = vec![0u8; 1024 * 1024];
62        let rb = AeronSpscRb::from_slice(&mut buff, 1024)?;
63
64        for i in 0..100 {
65            // msg_type_id must >0
66            let idx = rb.try_claim(i + 1, 4);
67            assert!(idx >= 0);
68            let slot = rb.buffer_at_mut(idx as usize, 4);
69            slot[0] = i as u8;
70            rb.commit(idx)?;
71        }
72
73        struct Reader {}
74        impl AeronRingBufferControlledHandlerCallback for Reader {
75            fn handle_aeron_controlled_rb_handler(
76                &mut self,
77                msg_type_id: i32,
78                buffer: &[u8],
79            ) -> aeron_rb_read_action_t {
80                assert_eq!(buffer[0], (msg_type_id - 1) as u8);
81                aeron_rb_read_action_stct::AERON_RB_COMMIT
82            }
83        }
84        let rb = AeronSpscRb::from_slice(&mut buff, 1024)?;
85        let handler = AeronRingBufferControlledHandlerWrapper::new(Reader {});
86        for i in 0..10 {
87            let read = rb.controlled_read_msgs(&handler, 10);
88            assert_eq!(10, read);
89        }
90
91        assert_eq!(0, rb.controlled_read_msgs(&handler, 10));
92
93        Ok(())
94    }
95
96    #[test]
97    pub fn mpsc_normal() -> Result<(), Box<dyn error::Error>> {
98        let rb = AeronMpscRb::new_with_capacity(1024 * 1024, 1024)?;
99
100        for i in 0..100 {
101            // msg_type_id must >0
102            let idx = rb.try_claim(i + 1, 4);
103            assert!(idx >= 0);
104            let slot = rb.buffer_at_mut(idx as usize, 4);
105            slot[0] = i as u8;
106            rb.commit(idx)?;
107        }
108
109        struct Reader {}
110        impl AeronRingBufferHandlerCallback for Reader {
111            fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) -> () {
112                assert_eq!(buffer[0], (msg_type_id - 1) as u8)
113            }
114        }
115        let handler = AeronRingBufferHandlerWrapper::new(Reader {});
116        for i in 0..10 {
117            let read = rb.read_msgs(&handler, 10);
118            assert_eq!(10, read);
119        }
120
121        assert_eq!(0, rb.read(Some(&handler), 10));
122
123        Ok(())
124    }
125
126    #[test]
127    pub fn mpsc_control() -> Result<(), Box<dyn error::Error>> {
128        let rb = AeronMpscRb::new_with_capacity(1024 * 1024, 1024)?;
129
130        for i in 0..100 {
131            // msg_type_id must >0
132            let idx = rb.try_claim(i + 1, 4);
133            assert!(idx >= 0);
134            let slot = rb.buffer_at_mut(idx as usize, 4);
135            slot[0] = i as u8;
136            rb.commit(idx)?;
137        }
138
139        let rb2 = rb.clone();
140
141        struct Reader {}
142        impl AeronRingBufferControlledHandlerCallback for Reader {
143            fn handle_aeron_controlled_rb_handler(
144                &mut self,
145                msg_type_id: i32,
146                buffer: &[u8],
147            ) -> aeron_rb_read_action_t {
148                assert_eq!(buffer[0], (msg_type_id - 1) as u8);
149                aeron_rb_read_action_stct::AERON_RB_COMMIT
150            }
151        }
152        let handler = AeronRingBufferControlledHandlerWrapper::new(Reader {});
153        for i in 0..10 {
154            let read = rb2.controlled_read_msgs(&handler, 10);
155            assert_eq!(10, read);
156        }
157
158        assert_eq!(0, rb2.controlled_read_msgs(&handler, 10));
159
160        Ok(())
161    }
162
163    #[test]
164    pub fn broadcast() -> Result<(), Box<dyn error::Error>> {
165        let mut vec = vec![0u8; 1024 * 1024 + AERON_BROADCAST_BUFFER_TRAILER_LENGTH];
166        let transmitter = AeronBroadcastTransmitter::from_slice(vec.as_mut_slice(), 1024)?;
167        let receiver = AeronBroadcastReceiver::from_slice(vec.as_mut_slice())?;
168        let receiver2 = AeronBroadcastReceiver::from_slice(vec.as_mut_slice())?;
169        let receiver3 = AeronBroadcastReceiver::from_slice(vec.as_mut_slice())?;
170
171        for i in 0..100 {
172            // msg_type_id must >0
173            let mut msg = [0u8; 4];
174            msg[0] = i as u8;
175            let idx = transmitter.transmit_msg(i + 1, &msg).unwrap();
176            assert!(idx >= 0);
177        }
178
179        struct Reader {}
180        impl AeronBroadcastReceiverHandlerCallback for Reader {
181            fn handle_aeron_broadcast_receiver_handler(
182                &mut self,
183                msg_type_id: i32,
184                buffer: &mut [u8],
185            ) -> () {
186                assert_eq!(buffer[0], (msg_type_id - 1) as u8);
187            }
188        }
189        let handler = Handler::leak(Reader {});
190        for rec in [&receiver, &receiver2, &receiver3] {
191            for i in 0..100 {
192                let read = rec.receive(Some(&handler)).unwrap();
193                assert!(read > 0);
194            }
195        }
196
197        assert_eq!(0, receiver.receive(Some(&handler)).unwrap_or_default());
198
199        Ok(())
200    }
201}