1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
// Module wrapping the contents of `$OUT_DIR/bindings.rs`, spliced in at compile time.
// NOTE(review): presumably the raw FFI declarations emitted by the build script — confirm.
// The `allow` attributes apply to everything the included file declares.
9#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25
// Splice `$OUT_DIR/aeron.rs` and `$OUT_DIR/aeron_custom.rs` directly into the crate root, so
// their items live alongside the glob-imported `bindings` names.
// NOTE(review): presumably the generated and hand-written safe wrappers — confirm in build.rs.
26include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
27include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
28
29#[cfg(test)]
30mod tests {
31 use super::*;
32 use crate::test_alloc::current_allocs;
33 use log::{error, info};
34 use rusteron_media_driver::AeronDriverContext;
35 use serial_test::serial;
36 use std::error;
37 use std::error::Error;
38 use std::io::Write;
39 use std::os::raw::c_int;
40 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41 use std::sync::Arc;
42 use std::thread::{sleep, JoinHandle};
43 use std::time::{Duration, Instant};
44
/// Error-handler state for the tests: a running tally of the errors reported through
/// `AeronErrorHandlerCallback`.
#[derive(Debug, Default)]
struct ErrorCount {
    /// How many times the error callback has fired.
    error_count: usize,
}
49
50 impl AeronErrorHandlerCallback for ErrorCount {
51 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
52 error!("Aeron error {}: {}", error_code, msg);
53 self.error_count += 1;
54 }
55 }
56
/// Checks that the linked Aeron library reports the expected version, and that creating a
/// context with a leaked error handler and then releasing both does not grow the allocation
/// count.
#[test]
#[serial]
fn version_check() -> Result<(), Box<dyn error::Error>> {
    // Call into the library once before sampling the baseline.
    // NOTE(review): presumably this absorbs one-off initialisation allocations — confirm.
    unsafe {
        aeron_randomised_int32();
    }
    let alloc_count = current_allocs();

    {
        let major = unsafe { crate::aeron_version_major() };
        let minor = unsafe { crate::aeron_version_minor() };
        let patch = unsafe { crate::aeron_version_patch() };

        // Version the bindings are expected to be built against.
        let cargo_version = "1.48.6";
        let aeron_version = format!("{}.{}.{}", major, minor, patch);
        assert_eq!(aeron_version, cargo_version);

        let ctx = AeronContext::new()?;
        let mut handler = Handler::leak(ErrorCount::default());
        ctx.set_error_handler(Some(&handler))?;

        assert!(Aeron::epoch_clock() > 0);
        drop(ctx);
        // A leaked handler stays flagged for dropping until it is released explicitly.
        assert!(handler.should_drop);
        handler.release();
        assert!(!handler.should_drop);
        drop(handler);
    }

    // Sample once so the condition and the failure message report the same value.
    let allocs_after = current_allocs();
    assert!(
        allocs_after <= alloc_count,
        "allocations {} > {alloc_count}",
        allocs_after
    );

    Ok(())
}
95
/// Asynchronously adds a publication and a subscription on `203.0.113.1:54321`, polls both
/// for up to two seconds, and — only if both resolve — tries to exchange one message.
/// Nothing is asserted about the outcome; the test passes as long as polling and dropping
/// complete and the driver shuts down.
#[test]
#[serial]
fn async_publication_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
    const STREAM_ID: i32 = 4321;

    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    // Client attached to that driver directory.
    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    client_ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    let aeron = Aeron::new(&client_ctx)?;
    aeron.start()?;

    let channel = String::from("aeron:udp?endpoint=203.0.113.1:54321");

    let pub_poller = aeron.async_add_publication(&channel.clone().into_c_string(), STREAM_ID)?;
    let sub_poller = aeron.async_add_subscription(
        &channel.into_c_string(),
        STREAM_ID,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
    )?;

    // Poll each pending registration until it resolves or the two-second budget runs out;
    // poll errors are deliberately ignored.
    let mut maybe_pub: Option<AeronPublication> = None;
    let mut maybe_sub: Option<AeronSubscription> = None;
    let poll_budget = Duration::from_secs(2);
    let poll_started = Instant::now();
    while poll_started.elapsed() < poll_budget {
        if maybe_pub.is_none() {
            if let Ok(Some(ready)) = pub_poller.poll() {
                maybe_pub = Some(ready);
            }
        }
        if maybe_sub.is_none() {
            if let Ok(Some(ready)) = sub_poller.poll() {
                maybe_sub = Some(ready);
            }
        }
        if maybe_pub.is_some() && maybe_sub.is_some() {
            break;
        }
        // Only debug builds back off between polls.
        #[cfg(debug_assertions)]
        sleep(Duration::from_millis(10));
    }

    info!("publication: {:?}", maybe_pub);
    info!("subscription: {:?}", maybe_sub);

    if let (Some(publisher), Some(subscriber)) = (maybe_pub, maybe_sub) {
        let payload = b"hello-aeron";

        // Offer until accepted or 500 ms have passed.
        let send_started = Instant::now();
        let mut sent = false;
        while !sent && send_started.elapsed() < Duration::from_millis(500) {
            let position =
                publisher.offer(payload, Handlers::no_reserved_value_supplier_handler());
            if position >= payload.len() as i64 {
                sent = true;
                info!("sent {:?}", payload);
            } else {
                sleep(Duration::from_millis(10));
            }
        }

        if sent {
            // Poll until the payload comes back or 500 ms have passed.
            let mut got = false;
            let read_started = Instant::now();
            while !got && read_started.elapsed() < Duration::from_millis(500) {
                let _ = subscriber.poll_once(
                    |msg, _hdr| {
                        if msg == payload {
                            got = true;
                            info!("received {:?}", payload);
                        }
                    },
                    1024,
                );
                if !got {
                    sleep(Duration::from_millis(10));
                }
            }
        }
    }

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
198
/// Creates sixty async publication/subscription pairs on distinct ports of `203.0.113.1`,
/// polls each pair for 50 ms, then drops the two pollers, alternating the drop order
/// between iterations.
#[test]
#[serial]
fn async_pub_sub_invalid_endpoint_create_drop_stress() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    client_ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    let aeron = Aeron::new(&client_ctx)?;
    aeron.start()?;

    for round in 0..60u16 {
        // One port and one stream id per round.
        let port = 55000u16 + round;
        let stream_id = 4500 + round as i32;
        let channel = format!("aeron:udp?endpoint=203.0.113.1:{}", port);

        let pub_poller =
            aeron.async_add_publication(&channel.clone().into_c_string(), stream_id)?;
        let sub_poller = aeron.async_add_subscription(
            &channel.into_c_string(),
            stream_id,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
        )?;

        // Spin on both pollers for 50 ms; results and errors are ignored.
        let spin_started = Instant::now();
        while spin_started.elapsed() < Duration::from_millis(50) {
            let _ = pub_poller.poll();
            let _ = sub_poller.poll();
        }

        // Even rounds release the subscription side first, odd rounds the publication side.
        match round % 2 {
            0 => {
                drop(sub_poller);
                drop(pub_poller);
            }
            _ => {
                drop(pub_poller);
                drop(sub_poller);
            }
        }
    }

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
254
/// Starts an async subscription on `203.0.113.1:54323`, polls it for 250 ms while ignoring
/// the results, then drops the poller and shuts the driver down.
#[test]
#[serial]
fn async_subscription_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    client_ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    let aeron = Aeron::new(&client_ctx)?;
    aeron.start()?;

    let channel = String::from("aeron:udp?endpoint=203.0.113.1:54323");

    let sub_poller = aeron.async_add_subscription(
        &channel.into_c_string(),
        4323,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
    )?;

    let poll_window = Duration::from_millis(250);
    let poll_started = Instant::now();
    while poll_started.elapsed() < poll_window {
        let _ = sub_poller.poll();
        // Only debug builds back off between polls.
        #[cfg(debug_assertions)]
        sleep(Duration::from_millis(10));
    }

    // Dropping a poller that may still be unresolved is the point of the test.
    drop(sub_poller);

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
302
/// Expects the blocking `add_subscription` call on `203.0.113.1:54324` to return an error
/// when given a 300 ms timeout.
#[test]
#[serial]
fn blocking_add_subscription_invalid_interface_timeout() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    client_ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    let aeron = Aeron::new(&client_ctx)?;
    aeron.start()?;

    let channel = String::from("aeron:udp?endpoint=203.0.113.1:54324");
    let add_timeout = Duration::from_millis(300);

    let outcome = aeron.add_subscription(
        &channel.into_c_string(),
        4324,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        add_timeout,
    );

    assert!(outcome.is_err(), "expected error for invalid interface");

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
342
/// Starts an async publication whose channel binds to `203.0.113.1:60000`, polls it for
/// 250 ms while ignoring the results, then drops the poller and shuts the driver down.
#[test]
#[serial]
fn async_publication_invalid_bind_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    client_ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    let aeron = Aeron::new(&client_ctx)?;
    aeron.start()?;

    let channel = String::from("aeron:udp?endpoint=127.0.0.1:54330|bind=203.0.113.1:60000");

    let pub_poller = aeron.async_add_publication(&channel.into_c_string(), 4330)?;
    let poll_window = Duration::from_millis(250);
    let poll_started = Instant::now();
    while poll_started.elapsed() < poll_window {
        let _ = pub_poller.poll();
        // Only debug builds back off between polls.
        #[cfg(debug_assertions)]
        sleep(Duration::from_millis(10));
    }
    drop(pub_poller);

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
382
/// Sends messages of `ipc_mtu_length * 100` bytes over the IPC channel from a background
/// thread and reassembles them on the subscriber side until more than 100 have been
/// received. Also exercises the CnC metadata/heartbeat readers and, after shutdown, the
/// counter, error-log and loss-report readers.
///
/// # Errors
/// Returns a `TimedOut` I/O error if the messages have not arrived within 30 seconds, and
/// propagates any error from the Aeron wrapper calls.
#[test]
#[serial]
pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());

    // Each callback is registered exactly once; every registration leaks its handler.
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
    ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
    ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
    ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
    ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
    ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;

    info!("creating client [simple_large_send test]");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started");
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    info!("created publisher");

    // Exercise the CnC readers while the driver is live.
    assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
    let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
    AeronCncMetadata::read_from_file(&cstr, |cnc| {
        assert!(cnc.pid > 0);
    })?;
    assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
    for _ in 0..50 {
        AeronCnc::read_on_partial_stack(&cstr, |cnc| {
            assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
        })?;
    }

    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    info!("created subscription");

    subscription
        .poll_once(|_msg, _header| println!("foo"), 1024)
        .unwrap();

    // A payload spanning many MTUs, so it has to be fragmented and reassembled.
    let string_len = media_driver_ctx.ipc_mtu_length * 100;
    info!("string length: {}", string_len);

    let publisher_handler = {
        let stop = stop.clone();
        std::thread::spawn(move || {
            let binding = "1".repeat(string_len);
            let large_msg = binding.as_bytes();
            loop {
                if stop.load(Ordering::Acquire) || publisher.is_closed() {
                    break;
                }
                let result =
                    publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());

                assert_eq!(123, publisher.get_constants().unwrap().stream_id);

                if result < large_msg.len() as i64 {
                    let error = AeronCError::from_code(result as i32);
                    match error.kind() {
                        // Back pressure and admin actions are not logged.
                        AeronErrorType::PublicationBackPressured
                        | AeronErrorType::PublicationAdminAction => {}
                        _ => {
                            error!(
                                "ERROR: failed to send message {:?}",
                                AeronCError::from_code(result as i32)
                            );
                        }
                    }
                    // Back off after any failed offer before retrying.
                    sleep(Duration::from_millis(500));
                }
            }
            info!("stopping publisher thread");
        })
    };

    let mut assembler = AeronFragmentClosureAssembler::new()?;

    /// State handed to `process_msg` for every reassembled message.
    struct Context {
        count: Arc<AtomicUsize>,
        stop: Arc<AtomicBool>,
        string_len: usize,
    }

    /// Counts the message and checks its length and content; on a length mismatch it
    /// raises the stop flag and logs before the assertions fire.
    fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
        ctx.count.fetch_add(1, Ordering::SeqCst);

        let values = header.get_values().unwrap();
        assert_ne!(values.frame.session_id, 0);

        if buffer.len() != ctx.string_len {
            ctx.stop.store(true, Ordering::SeqCst);
            error!(
                "ERROR: message was {} but was expecting {} [header={:?}]",
                buffer.len(),
                ctx.string_len,
                header
            );
            sleep(Duration::from_secs(1));
        }

        assert_eq!(buffer.len(), ctx.string_len);
        assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
    }

    let count = Arc::new(AtomicUsize::new(0usize));
    let mut context = Context {
        count: count.clone(),
        stop: stop.clone(),
        string_len,
    };

    let start_time = Instant::now();

    loop {
        if start_time.elapsed() > Duration::from_secs(30) {
            info!("Failed: exceeded 30-second timeout");
            // Tell the publisher thread and the driver to wind down instead of leaving
            // them running after the test has failed.
            stop.store(true, Ordering::SeqCst);
            return Err(Box::new(std::io::Error::new(
                std::io::ErrorKind::TimedOut,
                "Timeout exceeded",
            )));
        }
        let c = count.load(Ordering::SeqCst);
        if c > 100 {
            break;
        }

        subscription.poll(assembler.process(&mut context, process_msg), 128)?;
        assert_eq!(123, subscription.get_constants().unwrap().stream_id);
    }

    subscription.close(Handlers::no_notification_handler())?;

    info!("stopping client");
    stop.store(true, Ordering::SeqCst);

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();

    // Dump counters, the error log and the loss report from the CnC file.
    let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
    cnc.counters_reader().foreach_counter_once(
        |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
            println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
                AeronSystemCounterType::try_from(type_id));
        },
    );
    cnc.error_log_read_once(
        |observation_count: i32,
         first_observation_timestamp: i64,
         last_observation_timestamp: i64,
         error: &str| {
            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
        },
        0,
    );
    cnc.loss_reporter_read_once(
        |observation_count: i64,
         total_bytes_lost: i64,
         first_observation_timestamp: i64,
         last_observation_timestamp: i64,
         session_id: i32,
         stream_id: i32,
         channel: &str,
         source: &str| {
            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
        },
    )?;

    Ok(())
}
576
/// Publishes 156-byte messages over IPC with `try_claim` + `commit` from a background
/// thread and checks, through a fragment-assembling handler, that more than 100 of them
/// arrive intact within 30 seconds.
#[test]
#[serial]
pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    assert_eq!(driver_ctx.get_dir(), ctx.get_dir());
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;

    info!("creating client [try_claim test]");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started");
    const STREAM_ID: i32 = 123;
    let publisher =
        aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
    info!("created publisher");

    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        STREAM_ID,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    info!("created subscription");

    let string_len = 156;
    info!("string length: {}", string_len);

    // Background publisher: claim, fill and commit until told to stop.
    let publisher_handler = {
        let stop = stop.clone();
        std::thread::spawn(move || {
            let text = "1".repeat(string_len);
            let msg = text.as_bytes();
            let claim = AeronBufferClaim::default();
            while !stop.load(Ordering::Acquire) && !publisher.is_closed() {
                let result = publisher.try_claim(string_len, &claim);

                if result >= msg.len() as i64 {
                    claim.data().write_all(&msg).unwrap();
                    claim.commit().unwrap();
                } else {
                    error!(
                        "ERROR: failed to send message {:?}",
                        AeronCError::from_code(result as i32)
                    );
                }
            }
            info!("stopping publisher thread");
        })
    };

    let count = Arc::new(AtomicUsize::new(0usize));

    /// Fragment callback state: received-message tally, shared stop flag and the expected
    /// payload length.
    struct FragmentHandler {
        received: Arc<AtomicUsize>,
        stop: Arc<AtomicBool>,
        expected_len: usize,
    }

    impl AeronFragmentHandlerCallback for FragmentHandler {
        /// Validates the stream id, counts the message and checks its length and content;
        /// on a length mismatch it raises the stop flag and logs before the assertions fire.
        fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
            // The stream id is checked through both the field and the accessor path.
            assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
            let values = header.get_values().unwrap();
            let frame = values.frame();
            let stream_id = frame.stream_id();
            assert_eq!(STREAM_ID, stream_id);

            self.received.fetch_add(1, Ordering::SeqCst);

            let expected = self.expected_len;
            if buffer.len() != expected {
                self.stop.store(true, Ordering::SeqCst);
                error!(
                    "ERROR: message was {} but was expecting {} [header={:?}]",
                    buffer.len(),
                    expected,
                    values
                );
                sleep(Duration::from_secs(1));
            }

            assert_eq!(buffer.len(), expected);
            assert_eq!(buffer, "1".repeat(expected).as_bytes());
        }
    }

    let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
        received: Arc::clone(&count),
        stop: stop.clone(),
        expected_len: string_len,
    })?;
    let started = Instant::now();

    loop {
        // The timeout is checked before the success condition on every pass.
        if started.elapsed() > Duration::from_secs(30) {
            info!("Failed: exceeded 30-second timeout");
            return Err(
                std::io::Error::new(std::io::ErrorKind::TimedOut, "Timeout exceeded").into(),
            );
        }
        if count.load(Ordering::SeqCst) > 100 {
            break;
        }
        subscription.poll(Some(&closure), 128)?;
    }

    info!("stopping client");

    stop.store(true, Ordering::SeqCst);

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();
    Ok(())
}
715
/// Adds a counter with key `"key"` and label `"label_buffer"`, increments it from a
/// background thread, waits for it to reach 100, and checks that the counters reader
/// reports the same key and label. On non-Windows targets it also checks that the
/// available-counter callback saw the counter.
#[test]
#[serial]
pub fn counters() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    assert_eq!(driver_ctx.get_dir(), ctx.get_dir());
    ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
    ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;

    /// Records whether a counter labelled `"label_buffer"` was announced.
    struct AvailableCounterHandler {
        found_counter: bool,
    }

    impl AeronAvailableCounterCallback for AvailableCounterHandler {
        /// Logs the announced counter, checks its registration id, and flags the test
        /// counter when its label matches.
        fn handle_aeron_on_available_counter(
            &mut self,
            counters_reader: AeronCountersReader,
            registration_id: i64,
            counter_id: i32,
        ) {
            info!(
                "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
                String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
                counters_reader.get_counter_label(counter_id, 1000),
                counters_reader.addr(counter_id)
            );

            assert_eq!(
                counters_reader.counter_registration_id(counter_id).unwrap(),
                registration_id
            );

            let is_test_counter = matches!(
                counters_reader.get_counter_label(counter_id, 1000),
                Ok(label) if label == "label_buffer"
            );
            if is_test_counter {
                self.found_counter = true;
                assert_eq!(
                    &counters_reader.get_counter_key(counter_id).unwrap(),
                    "key".as_bytes()
                );
            }
        }
    }

    let handler = Handler::leak(AvailableCounterHandler {
        found_counter: false,
    });
    ctx.set_on_available_counter(Some(&handler))?;

    info!("creating client");
    let aeron = Aeron::new(&ctx)?;
    info!("starting client");

    aeron.start()?;
    info!("client started [counters test]");

    let counter = aeron.add_counter(
        123,
        "key".as_bytes(),
        "label_buffer",
        Duration::from_secs(5),
    )?;
    let counter_id = counter.get_constants()?.counter_id;

    // Background thread bumps the counter up to 150 times unless stopped earlier.
    let publisher_handler = {
        let stop = stop.clone();
        let counter = counter.clone();
        std::thread::spawn(move || {
            for _ in 0..150 {
                if stop.load(Ordering::Acquire) || counter.is_closed() {
                    break;
                }
                counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
            }
            info!("stopping publisher thread");
        })
    };

    // Wait until the counter reaches 100, giving up after ten seconds.
    let wait_limit = Duration::from_secs(10);
    let now = Instant::now();
    loop {
        let reached = counter.addr_atomic().load(Ordering::SeqCst) >= 100;
        if reached || now.elapsed() >= wait_limit {
            break;
        }
        sleep(Duration::from_micros(10));
    }

    assert!(now.elapsed() < wait_limit);

    info!(
        "counter is {}",
        counter.addr_atomic().load(Ordering::SeqCst)
    );

    info!("stopping client");

    #[cfg(not(target_os = "windows"))]
    assert!(handler.found_counter);

    stop.store(true, Ordering::SeqCst);

    let reader = aeron.counters_reader();
    assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
    assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
    let buffers = AeronCountersReaderBuffers::default();
    reader.get_buffers(&buffers)?;

    let _ = publisher_handler.join().unwrap();
    let _ = driver_handle.join().unwrap();
    Ok(())
}
839
/// Error-handler state for the tests: a running tally of reported errors. Unlike
/// `ErrorCount`, it logs the final tally when dropped.
#[derive(Debug, Default)]
struct TestErrorCount {
    /// How many times the error callback has fired.
    pub error_count: usize,
}
845
846 impl Drop for TestErrorCount {
847 fn drop(&mut self) {
848 info!("TestErrorCount dropped with {} errors", self.error_count);
849 }
850 }
851
852 impl AeronErrorHandlerCallback for TestErrorCount {
853 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
854 error!("Aeron error {}: {}", error_code, msg);
855 self.error_count += 1;
856 }
857 }
858
/// Publishes `b"test"` in a tight loop from a background thread, pausing 50 ms whenever
/// the offer is back-pressured, and expects the subscriber to receive at least 50
/// messages within ten seconds.
#[test]
#[serial]
pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    client_ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&client_ctx)?;
    aeron.start()?;

    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let received = Arc::new(AtomicUsize::new(0));
    let started = Instant::now();

    let publisher_thread = {
        let stop = stop.clone();
        std::thread::spawn(move || loop {
            if stop.load(Ordering::Acquire) {
                break;
            }
            let msg = b"test";
            let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
            // Only back pressure triggers a pause; every other result retries at once.
            if result == AeronErrorType::PublicationBackPressured.code() as i64 {
                sleep(Duration::from_millis(50));
            }
        })
    };

    // Drain until 50 messages have been seen or ten seconds have passed.
    loop {
        let done = received.load(Ordering::SeqCst) >= 50;
        if done || started.elapsed() >= Duration::from_secs(10) {
            break;
        }
        let _ = subscription.poll_once(
            |_msg, _header| {
                received.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
    }

    stop.store(true, Ordering::SeqCst);
    publisher_thread.join().unwrap();
    let _ = driver_handle.join().unwrap();

    assert!(
        received.load(Ordering::SeqCst) >= 50,
        "Expected at least 50 messages received"
    );
    Ok(())
}
931
/// Publishes a single message on an IPC stream that has two subscriptions and expects
/// both of them to receive it within five seconds.
#[test]
#[serial]
pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let driver_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock());
    driver_ctx.set_dir(&driver_dir.into_c_string())?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    let client_ctx = AeronContext::new()?;
    client_ctx.set_dir(&driver_ctx.get_dir().into_c_string())?;
    client_ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let aeron = Aeron::new(&client_ctx)?;
    aeron.start()?;
    let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;

    // Two independent subscriptions on the same channel and stream.
    let first_sub = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;
    let second_sub = aeron.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let first_seen = Arc::new(AtomicUsize::new(0));
    let second_seen = Arc::new(AtomicUsize::new(0));

    let msg = b"hello multi-subscription";
    let position = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
    assert!(
        position >= msg.len() as i64,
        "Message should be sent successfully"
    );

    // Poll both subscriptions until each has seen a message or five seconds have passed.
    let started = Instant::now();
    loop {
        let both_seen =
            first_seen.load(Ordering::SeqCst) >= 1 && second_seen.load(Ordering::SeqCst) >= 1;
        if both_seen || started.elapsed() >= Duration::from_secs(5) {
            break;
        }
        let _ = first_sub.poll_once(
            |_msg, _header| {
                first_seen.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
        let _ = second_sub.poll_once(
            |_msg, _header| {
                second_seen.fetch_add(1, Ordering::SeqCst);
            },
            128,
        )?;
    }

    assert!(
        first_seen.load(Ordering::SeqCst) >= 1,
        "Subscription 1 did not receive the message"
    );
    assert!(
        second_seen.load(Ordering::SeqCst) >= 1,
        "Subscription 2 did not receive the message"
    );

    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
1016
/// Checks that a publication can be dropped after it has been closed explicitly, once for
/// each of the three close variants: `close_with_no_args`, `close` with a no-op
/// notification handler, and `close_once` with a closure.
///
/// The embedded driver is stopped and joined at the end, like every other test in this
/// module, so it does not keep running after the test returns.
#[test]
#[serial]
pub fn should_be_able_to_drop_after_close_manually_being_closed(
) -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded media driver running out of its own timestamp-suffixed directory.
    let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    media_driver_ctx.set_dir_delete_on_shutdown(true)?;
    media_driver_ctx.set_dir_delete_on_start(true)?;
    media_driver_ctx.set_dir(
        &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
    )?;
    let (stop, driver_handle) =
        rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);

    let ctx = AeronContext::new()?;
    ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
    ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;

    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    // Variant 1: close without a notification callback.
    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close_with_no_args()?;
        drop(publisher);
    }

    // Variant 2: close with the no-op notification handler.
    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        publisher.close(Handlers::no_notification_handler())?;
        drop(publisher);
    }

    // Variant 3: close with a one-shot closure.
    // NOTE(review): unlike the two variants above, the session id is read after the close
    // call here — confirm this ordering is intentional.
    {
        let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
        publisher.close_once(|| println!("on close"))?;
        info!("created publication [sessionId={}]", publisher.session_id());
        drop(publisher);
    }

    // Shut the embedded driver down instead of leaving its thread detached.
    stop.store(true, Ordering::SeqCst);
    let _ = driver_handle.join().unwrap();
    Ok(())
}
1065
/// Offering on a publication that has already been closed must yield a negative result.
///
/// Starts an embedded media driver on a per-run directory, connects a client, creates an
/// IPC publication on stream 123, closes it, and then attempts a single offer. The driver
/// is stopped and joined before the test returns.
#[test]
#[serial]
pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // --- embedded media driver -------------------------------------------------------
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let unique_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string();
    driver_ctx.set_dir(&unique_dir)?;
    let (driver_stop, driver_thread) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    // --- client ----------------------------------------------------------------------
    let client_ctx = AeronContext::new()?;
    let client_dir = driver_ctx.get_dir().into_c_string();
    client_ctx.set_dir(&client_dir)?;
    client_ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let client = Aeron::new(&client_ctx)?;
    client.start()?;

    // --- close first, offer afterwards -----------------------------------------------
    let publication = client.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    publication.close(Handlers::no_notification_handler())?;

    let payload: &[u8] = b"should fail";
    let offer_outcome = publication.offer(payload, Handlers::no_reserved_value_supplier_handler());
    assert!(
        offer_outcome < 0,
        "Offering on a closed publication should return a negative error code"
    );

    // --- shutdown --------------------------------------------------------------------
    driver_stop.store(true, Ordering::SeqCst);
    let _ = driver_thread.join().unwrap();
    Ok(())
}
1108
/// A zero-length message offered on an IPC publication must reach a subscriber.
///
/// The test offers `b""` on stream 123, then polls the matching subscription until a
/// fragment with an empty payload is seen or five seconds (measured from just before the
/// offer) have elapsed. The embedded driver is stopped and joined at the end.
#[test]
#[serial]
pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    // Embedded driver on a directory made unique with the epoch clock.
    let driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
    driver_ctx.set_dir_delete_on_shutdown(true)?;
    driver_ctx.set_dir_delete_on_start(true)?;
    let unique_dir = format!("{}{}", driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string();
    driver_ctx.set_dir(&unique_dir)?;
    let (driver_stop, driver_thread) =
        rusteron_media_driver::AeronDriver::launch_embedded(driver_ctx.clone(), false);

    // Client connected to that driver.
    let client_ctx = AeronContext::new()?;
    let client_dir = driver_ctx.get_dir().into_c_string();
    client_ctx.set_dir(&client_dir)?;
    client_ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;

    let client = Aeron::new(&client_ctx)?;
    client.start()?;

    let publication = client.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
    let subscription = client.add_subscription(
        AERON_IPC_STREAM,
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(5),
    )?;

    let saw_empty = Arc::new(AtomicBool::new(false));
    let started = Instant::now();
    let time_limit = Duration::from_secs(5);

    let offer_outcome = publication.offer(b"", Handlers::no_reserved_value_supplier_handler());
    assert!(offer_outcome > 0);

    // Keep polling until the empty fragment shows up or the time limit is reached.
    loop {
        if saw_empty.load(Ordering::SeqCst) || started.elapsed() >= time_limit {
            break;
        }
        let _ = subscription.poll_once(
            |msg, _header| {
                if msg.is_empty() {
                    saw_empty.store(true, Ordering::SeqCst);
                }
            },
            128,
        )?;
    }

    assert!(
        saw_empty.load(Ordering::SeqCst),
        "Empty message was not received"
    );

    driver_stop.store(true, Ordering::SeqCst);
    let _ = driver_thread.join().unwrap();
    Ok(())
}
1169
/// Exploratory test for channels that share a tag (`tags=100`).
///
/// Two subscriptions are added on `aeron:udp?tags=100` and one publication on
/// `aeron:udp?endpoint=localhost:4040|alias=test|tags=100`, all on stream 123. A single
/// message is then offered and each subscription is polled once, printing whatever it
/// receives. The test makes no assertion about delivery and is marked `#[ignore]`, so it
/// only runs when requested explicitly.
///
/// # Errors
///
/// Propagates any error from driver/client setup, from adding the subscriptions or the
/// publication, or from polling. Also fails if the message cannot be offered within five
/// seconds.
#[test]
#[serial]
#[ignore]
pub fn tags() -> Result<(), Box<dyn error::Error>> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Debug)
        .try_init();

    let (md_ctx, stop, md) = start_media_driver(1)?;

    let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;

    info!("creating subscriber 1");
    let sub = aeron_sub
        .add_subscription(
            &"aeron:udp?tags=100".into_c_string(),
            123,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(50),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    // NOTE(review): this second client is started but never used below (subscriber 2 is
    // added through `aeron_sub`). It is kept because it is unclear whether starting it is
    // part of the scenario - confirm and either use it or remove it.
    let ctx = AeronContext::new()?;
    ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
    let aeron = Aeron::new(&ctx)?;
    aeron.start()?;

    info!("creating subscriber 2");
    let sub2 = aeron_sub.add_subscription(
        &"aeron:udp?tags=100".into_c_string(),
        123,
        Handlers::no_available_image_handler(),
        Handlers::no_unavailable_image_handler(),
        Duration::from_secs(50),
    )?;

    let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
    info!("creating publisher");
    assert!(!aeron_pub.is_closed());
    let publisher = aeron_pub
        .add_publication(
            &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
            123,
            Duration::from_secs(5),
        )
        .map_err(|e| {
            error!("aeron error={}", Aeron::errmsg());
            e
        })?;

    info!("publishing msg");

    // Retry the offer until it is accepted, but give up after a deadline and back off
    // briefly between attempts so a publication that never connects cannot hang the test
    // or flood the log.
    let publish_deadline = Instant::now() + Duration::from_secs(5);
    let published = loop {
        let result = publisher.offer(
            "213".as_bytes(),
            Handlers::no_reserved_value_supplier_handler(),
        );
        if result >= 0 {
            break true;
        }
        error!(
            "failed to publish {:?}",
            AeronCError::from_code(result as i32)
        );
        if Instant::now() >= publish_deadline {
            break false;
        }
        sleep(Duration::from_millis(10));
    };

    if published {
        sub.poll_once(
            |msg, _header| {
                println!("Received message: {:?}", msg);
            },
            128,
        )?;
        sub2.poll_once(
            |msg, _header| {
                println!("Received message: {:?}", msg);
            },
            128,
        )?;
    }

    // Stop the driver and wait for its thread, matching the other tests in this module.
    stop.store(true, Ordering::SeqCst);
    let _ = md.join().unwrap();

    if !published {
        return Err("timed out waiting for the publication to accept the message".into());
    }
    Ok(())
}
1259
1260 fn create_client(
1261 media_driver_ctx: &AeronDriverContext,
1262 ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1263 let dir = media_driver_ctx.get_dir();
1264 info!("creating aeron client [dir={}]", dir);
1265 let ctx = AeronContext::new()?;
1266 ctx.set_dir(&dir.into_c_string())?;
1267 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
1268 let aeron = Aeron::new(&ctx)?;
1269 aeron.start()?;
1270 Ok((ctx, aeron))
1271 }
1272
1273 fn start_media_driver(
1274 instance: u64,
1275 ) -> Result<
1276 (
1277 AeronDriverContext,
1278 Arc<AtomicBool>,
1279 JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
1280 ),
1281 Box<dyn Error>,
1282 > {
1283 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1284 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1285 media_driver_ctx.set_dir_delete_on_start(true)?;
1286 media_driver_ctx.set_dir(
1287 &format!(
1288 "{}{}-{}",
1289 media_driver_ctx.get_dir(),
1290 Aeron::epoch_clock(),
1291 instance
1292 )
1293 .into_c_string(),
1294 )?;
1295 let (stop, driver_handle) =
1296 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1297 Ok((media_driver_ctx, stop, driver_handle))
1298 }
1299
// Empty module whose documentation is the contents of `../../README.md`.
// Presumably this exists so the README's code examples get compiled as doctests.
// NOTE(review): it sits inside the `#[cfg(test)]` `tests` module - confirm that doctest
// collection actually sees it from here.
#[doc = include_str!("../../README.md")]
mod readme_tests {}
1302}