// Crate-wide lint relaxations. Most of this crate's code is spliced in below
// via `include!` from the build-script output directory (`OUT_DIR`), so these
// are presumably here to keep that generated code (C-style identifiers,
// redundant `unsafe`, unused bindings) from flooding the build with warnings.
// NOTE(review): the allows apply to hand-written code in this crate too.
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
// Silences every lint in clippy's default `clippy::all` group for the whole crate.
#![allow(clippy::all)]
#![allow(unused_unsafe)]
#![allow(unused_variables)]
// Use the repository README as the crate-level rustdoc.
#![doc = include_str!("../README.md")]
/// Contents of the `bindings.rs` file found in the build-script output
/// directory (`OUT_DIR`), textually included at compile time.
///
/// The file is not part of the checked-in sources; it must exist in `OUT_DIR`
/// by the time this crate is compiled (presumably written by the build script).
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
// Glob-import the bindings so the code included below can refer to them
// without a `bindings::` prefix.
use bindings::*;
// `aeron.rs` and `rb_custom.rs` are read from `OUT_DIR` and spliced directly
// into the crate root, so every item they define lives at the top level of
// this crate (and is what the tests below reach through `super::*`).
include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
include!(concat!(env!("OUT_DIR"), "/rb_custom.rs"));
22
#[cfg(test)]
mod tests {
    use super::*;
    use std::error;

    /// Return type shared by all tests so fallible calls can be propagated with `?`.
    type TestResult = Result<(), Box<dyn error::Error>>;

    /// Writes 100 messages through an `AeronSpscRb` and reads them back through a clone.
    ///
    /// Message `n` (type id `n`, for `n` in `1..=100`) carries `n - 1` in its first
    /// byte; the callback checks that relationship for every delivered message.
    /// Ten `read_msgs(&handler, 10)` calls are each expected to report 10, after
    /// which a further `read` must report 0.
    #[test]
    pub fn spsc_normal() -> TestResult {
        /// Asserts that the first payload byte equals `msg_type_id - 1`.
        struct FirstByteChecker;
        impl AeronRingBufferHandlerCallback for FirstByteChecker {
            fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) {
                assert_eq!(buffer[0], (msg_type_id - 1) as u8)
            }
        }

        let writer = AeronSpscRb::new_with_capacity(1024 * 1024, 1024)?;
        let reader = writer.clone();

        for msg_type_id in 1..=100 {
            let claimed = writer.try_claim(msg_type_id, 4);
            assert!(claimed >= 0);
            writer.buffer_at_mut(claimed as usize, 4)[0] = (msg_type_id - 1) as u8;
            writer.commit(claimed)?;
        }

        let handler = AeronRingBufferHandlerWrapper::new(FirstByteChecker);
        for _ in 0..10 {
            assert_eq!(10, reader.read_msgs(&handler, 10));
        }

        // Everything written above has been consumed by now.
        assert_eq!(0, reader.read(Some(&handler), 10));

        Ok(())
    }

    /// Same write/read round trip as `spsc_normal`, but over a caller-owned byte
    /// buffer and using the controlled-read callback.
    ///
    /// The ring buffer is created a second time from the same backing `Vec`
    /// before reading, so the messages are consumed through a separately
    /// constructed `AeronSpscRb` over the same memory.
    #[test]
    pub fn spsc_control() -> TestResult {
        /// Asserts the first payload byte and answers every message with `AERON_RB_COMMIT`.
        struct CommittingChecker;
        impl AeronRingBufferControlledHandlerCallback for CommittingChecker {
            fn handle_aeron_controlled_rb_handler(
                &mut self,
                msg_type_id: i32,
                buffer: &[u8],
            ) -> aeron_rb_read_action_t {
                assert_eq!(buffer[0], (msg_type_id - 1) as u8);
                aeron_rb_read_action_stct::AERON_RB_COMMIT
            }
        }

        let mut backing = vec![0u8; 1024 * 1024];
        let writer = AeronSpscRb::from_slice(&mut backing, 1024)?;

        for msg_type_id in 1..=100 {
            let claimed = writer.try_claim(msg_type_id, 4);
            assert!(claimed >= 0);
            writer.buffer_at_mut(claimed as usize, 4)[0] = (msg_type_id - 1) as u8;
            writer.commit(claimed)?;
        }

        let reader = AeronSpscRb::from_slice(&mut backing, 1024)?;
        let handler = AeronRingBufferControlledHandlerWrapper::new(CommittingChecker);
        for _ in 0..10 {
            assert_eq!(10, reader.controlled_read_msgs(&handler, 10));
        }

        assert_eq!(0, reader.controlled_read_msgs(&handler, 10));

        Ok(())
    }

    /// Writes 100 messages into an `AeronMpscRb` and reads them back from the
    /// same instance with the plain (non-controlled) callback.
    #[test]
    pub fn mpsc_normal() -> TestResult {
        /// Asserts that the first payload byte equals `msg_type_id - 1`.
        struct FirstByteChecker;
        impl AeronRingBufferHandlerCallback for FirstByteChecker {
            fn handle_aeron_rb_handler(&mut self, msg_type_id: i32, buffer: &[u8]) {
                assert_eq!(buffer[0], (msg_type_id - 1) as u8)
            }
        }

        let ring = AeronMpscRb::new_with_capacity(1024 * 1024, 1024)?;

        for msg_type_id in 1..=100 {
            let claimed = ring.try_claim(msg_type_id, 4);
            assert!(claimed >= 0);
            ring.buffer_at_mut(claimed as usize, 4)[0] = (msg_type_id - 1) as u8;
            ring.commit(claimed)?;
        }

        let handler = AeronRingBufferHandlerWrapper::new(FirstByteChecker);
        for _ in 0..10 {
            assert_eq!(10, ring.read_msgs(&handler, 10));
        }

        assert_eq!(0, ring.read(Some(&handler), 10));

        Ok(())
    }

    /// Writes 100 messages into an `AeronMpscRb`, then clones it and drains the
    /// clone with the controlled-read callback.
    #[test]
    pub fn mpsc_control() -> TestResult {
        /// Asserts the first payload byte and answers every message with `AERON_RB_COMMIT`.
        struct CommittingChecker;
        impl AeronRingBufferControlledHandlerCallback for CommittingChecker {
            fn handle_aeron_controlled_rb_handler(
                &mut self,
                msg_type_id: i32,
                buffer: &[u8],
            ) -> aeron_rb_read_action_t {
                assert_eq!(buffer[0], (msg_type_id - 1) as u8);
                aeron_rb_read_action_stct::AERON_RB_COMMIT
            }
        }

        let writer = AeronMpscRb::new_with_capacity(1024 * 1024, 1024)?;

        for msg_type_id in 1..=100 {
            let claimed = writer.try_claim(msg_type_id, 4);
            assert!(claimed >= 0);
            writer.buffer_at_mut(claimed as usize, 4)[0] = (msg_type_id - 1) as u8;
            writer.commit(claimed)?;
        }

        // The clone is taken only after all messages have been committed.
        let reader = writer.clone();

        let handler = AeronRingBufferControlledHandlerWrapper::new(CommittingChecker);
        for _ in 0..10 {
            assert_eq!(10, reader.controlled_read_msgs(&handler, 10));
        }

        assert_eq!(0, reader.controlled_read_msgs(&handler, 10));

        Ok(())
    }

    /// Transmits 100 four-byte messages through an `AeronBroadcastTransmitter`
    /// and receives them with three `AeronBroadcastReceiver`s built over the
    /// same backing buffer.
    ///
    /// Each receiver is polled 100 times and every poll must report a positive
    /// count; a final poll on the first receiver must yield 0 (an `Err` from
    /// that last call is also treated as 0 via `unwrap_or_default`).
    #[test]
    pub fn broadcast() -> TestResult {
        /// Asserts that the first payload byte equals `msg_type_id - 1`.
        struct FirstByteChecker;
        impl AeronBroadcastReceiverHandlerCallback for FirstByteChecker {
            fn handle_aeron_broadcast_receiver_handler(
                &mut self,
                msg_type_id: i32,
                buffer: &mut [u8],
            ) {
                assert_eq!(buffer[0], (msg_type_id - 1) as u8);
            }
        }

        // Data region plus the trailer length the broadcast buffer requires.
        let mut storage = vec![0u8; 1024 * 1024 + AERON_BROADCAST_BUFFER_TRAILER_LENGTH];
        let transmitter = AeronBroadcastTransmitter::from_slice(storage.as_mut_slice(), 1024)?;
        let first = AeronBroadcastReceiver::from_slice(storage.as_mut_slice())?;
        let second = AeronBroadcastReceiver::from_slice(storage.as_mut_slice())?;
        let third = AeronBroadcastReceiver::from_slice(storage.as_mut_slice())?;

        for msg_type_id in 1..=100 {
            let payload = [(msg_type_id - 1) as u8, 0, 0, 0];
            let sent = transmitter.transmit_msg(msg_type_id, &payload).unwrap();
            assert!(sent >= 0);
        }

        let handler = Handler::leak(FirstByteChecker);
        for receiver in [&first, &second, &third] {
            for _ in 0..100 {
                let received = receiver.receive(Some(&handler)).unwrap();
                assert!(received > 0);
            }
        }

        assert_eq!(0, first.receive(Some(&handler)).unwrap_or_default());

        Ok(())
    }
}