rusteron_client/
lib.rs

1#![allow(improper_ctypes_definitions)]
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
//! - **`backtrace`**: When enabled, logs a backtrace for each `AeronCError`.
//! - **`extra-logging`**: When enabled, logs whenever a resource is created or destroyed. Useful if you're seeing a segfault caused by a resource being closed.
//! - **`log-c-bindings`**: When enabled, logs every C binding call with its arguments and return value. Useful for debugging FFI interactions.
//! - **`precompile`**: When enabled, uses precompiled C code instead of requiring cmake and Java to be installed.
17
18#[allow(improper_ctypes_definitions)]
19#[allow(unpredictable_function_pointer_comparisons)]
20pub mod bindings {
21    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
22}
23
24use bindings::*;
25
26include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
27include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
28
29#[cfg(test)]
30mod tests {
31    use super::*;
32    use crate::test_alloc::current_allocs;
33    use log::{error, info};
34    use rusteron_media_driver::AeronDriverContext;
35    use serial_test::serial;
36    use std::error;
37    use std::error::Error;
38    use std::io::Write;
39    use std::os::raw::c_int;
40    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41    use std::sync::Arc;
42    use std::thread::{sleep, JoinHandle};
43    use std::time::{Duration, Instant};
44
45    #[derive(Default, Debug)]
46    struct ErrorCount {
47        error_count: usize,
48    }
49
50    impl AeronErrorHandlerCallback for ErrorCount {
51        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
52            error!("Aeron error {}: {}", error_code, msg);
53            self.error_count += 1;
54        }
55    }
56
57    #[test]
58    #[serial]
59    fn version_check() -> Result<(), Box<dyn error::Error>> {
60        unsafe {
61            aeron_randomised_int32();
62        }
63        let alloc_count = current_allocs();
64
65        {
66            let major = unsafe { crate::aeron_version_major() };
67            let minor = unsafe { crate::aeron_version_minor() };
68            let patch = unsafe { crate::aeron_version_patch() };
69
70            let cargo_version = "1.48.6";
71            let aeron_version = format!("{}.{}.{}", major, minor, patch);
72            assert_eq!(aeron_version, cargo_version);
73
74            let ctx = AeronContext::new()?;
75            let error_count = 1;
76            let mut handler = Handler::leak(ErrorCount::default());
77            ctx.set_error_handler(Some(&handler))?;
78
79            assert!(Aeron::epoch_clock() > 0);
80            drop(ctx);
81            assert!(handler.should_drop);
82            handler.release();
83            assert!(!handler.should_drop);
84            drop(handler);
85        }
86
87        assert!(
88            current_allocs() <= alloc_count,
89            "allocations {} > {alloc_count}",
90            current_allocs()
91        );
92
93        Ok(())
94    }
95
96    #[test]
97    #[serial]
98    fn async_publication_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
99        let _ = env_logger::Builder::new()
100            .is_test(true)
101            .filter_level(log::LevelFilter::Info)
102            .try_init();
103
104        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
105        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
106        media_driver_ctx.set_dir_delete_on_start(true)?;
107        media_driver_ctx.set_dir(
108            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
109        )?;
110        let (stop, driver_handle) =
111            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
112
113        let ctx = AeronContext::new()?;
114        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
115        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
116        let aeron = Aeron::new(&ctx)?;
117        aeron.start()?;
118
119        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54321");
120
121        // Create async publication and subscription pollers on the same invalid channel and
122        // attempt to resolve them. If both are created, try a small send/receive cycle and then exit.
123        let pub_poller = aeron.async_add_publication(&channel.clone().into_c_string(), 4321)?;
124        let sub_poller = aeron.async_add_subscription(
125            &channel.into_c_string(),
126            4321,
127            Handlers::no_available_image_handler(),
128            Handlers::no_unavailable_image_handler(),
129        )?;
130
131        let mut publication: Option<AeronPublication> = None;
132        let mut subscription: Option<AeronSubscription> = None;
133        let start = Instant::now();
134        while start.elapsed() < Duration::from_secs(2) {
135            if publication.is_none() {
136                match pub_poller.poll() {
137                    Ok(Some(p)) => publication = Some(p),
138                    Ok(None) | Err(_) => {}
139                }
140            }
141            if subscription.is_none() {
142                match sub_poller.poll() {
143                    Ok(Some(s)) => subscription = Some(s),
144                    Ok(None) | Err(_) => {}
145                }
146            }
147            if publication.is_some() && subscription.is_some() {
148                break;
149            }
150            #[cfg(debug_assertions)]
151            std::thread::sleep(Duration::from_millis(10));
152        }
153
154        info!("publication: {:?}", publication);
155        info!("subscription: {:?}", subscription);
156
157        if let (Some(publisher), Some(subscription)) = (publication, subscription) {
158            let payload = b"hello-aeron";
159            let send_start = Instant::now();
160            let mut sent = false;
161            while send_start.elapsed() < Duration::from_millis(500) {
162                let res = publisher.offer(payload, Handlers::no_reserved_value_supplier_handler());
163                if res >= payload.len() as i64 {
164                    sent = true;
165                    info!("sent {:?}", payload);
166                    break;
167                }
168                std::thread::sleep(Duration::from_millis(10));
169            }
170
171            if sent {
172                let mut got = false;
173                let read_start = Instant::now();
174                while read_start.elapsed() < Duration::from_millis(500) {
175                    let _ = subscription.poll_once(
176                        |msg, _hdr| {
177                            if msg == payload {
178                                got = true;
179                                info!("received {:?}", payload);
180                            }
181                        },
182                        1024,
183                    );
184                    if got {
185                        break;
186                    }
187                    std::thread::sleep(Duration::from_millis(10));
188                }
189                // We don't assert on got, just exercise the path.
190            }
191        }
192
193        // Shutdown
194        stop.store(true, Ordering::SeqCst);
195        let _ = driver_handle.join().unwrap();
196        Ok(())
197    }
198
199    #[test]
200    #[serial]
201    fn async_pub_sub_invalid_endpoint_create_drop_stress() -> Result<(), Box<dyn error::Error>> {
202        let _ = env_logger::Builder::new()
203            .is_test(true)
204            .filter_level(log::LevelFilter::Info)
205            .try_init();
206
207        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
208        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
209        media_driver_ctx.set_dir_delete_on_start(true)?;
210        media_driver_ctx.set_dir(
211            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
212        )?;
213        let (stop, driver_handle) =
214            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
215
216        let ctx = AeronContext::new()?;
217        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
218        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
219        let aeron = Aeron::new(&ctx)?;
220        aeron.start()?;
221
222        // Stress: repeatedly create and drop pub/sub async pollers on an invalid endpoint.
223        for i in 0..60u16 {
224            let port = 55000u16 + i;
225            let channel = format!("aeron:udp?endpoint=203.0.113.1:{}", port);
226            let pub_poller =
227                aeron.async_add_publication(&channel.clone().into_c_string(), 4500 + i as i32)?;
228            let sub_poller = aeron.async_add_subscription(
229                &channel.into_c_string(),
230                4500 + i as i32,
231                Handlers::no_available_image_handler(),
232                Handlers::no_unavailable_image_handler(),
233            )?;
234
235            let start = Instant::now();
236            while start.elapsed() < Duration::from_millis(50) {
237                let _ = pub_poller.poll();
238                let _ = sub_poller.poll();
239            }
240
241            if i % 2 == 0 {
242                drop(sub_poller);
243                drop(pub_poller);
244            } else {
245                drop(pub_poller);
246                drop(sub_poller);
247            }
248        }
249
250        stop.store(true, Ordering::SeqCst);
251        let _ = driver_handle.join().unwrap();
252        Ok(())
253    }
254
255    #[test]
256    #[serial]
257    // // #[ignore] // TODO FIXME broken test
258    fn async_subscription_invalid_interface_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
259        let _ = env_logger::Builder::new()
260            .is_test(true)
261            .filter_level(log::LevelFilter::Info)
262            .try_init();
263
264        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
265        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
266        media_driver_ctx.set_dir_delete_on_start(true)?;
267        media_driver_ctx.set_dir(
268            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
269        )?;
270        let (stop, driver_handle) =
271            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
272
273        let ctx = AeronContext::new()?;
274        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
275        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
276        let aeron = Aeron::new(&ctx)?;
277        aeron.start()?;
278
279        // Invalid remote endpoint only (no interface)
280        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54323");
281
282        let poller = aeron.async_add_subscription(
283            &channel.into_c_string(),
284            4323,
285            Handlers::no_available_image_handler(),
286            Handlers::no_unavailable_image_handler(),
287        )?;
288
289        let start = Instant::now();
290        while start.elapsed() < Duration::from_millis(250) {
291            let _ = poller.poll();
292            #[cfg(debug_assertions)]
293            std::thread::sleep(Duration::from_millis(10));
294        }
295
296        drop(poller);
297
298        stop.store(true, Ordering::SeqCst);
299        let _ = driver_handle.join().unwrap();
300        Ok(())
301    }
302
303    #[test]
304    #[serial]
305    fn blocking_add_subscription_invalid_interface_timeout() -> Result<(), Box<dyn error::Error>> {
306        let _ = env_logger::Builder::new()
307            .is_test(true)
308            .filter_level(log::LevelFilter::Info)
309            .try_init();
310
311        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
312        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
313        media_driver_ctx.set_dir_delete_on_start(true)?;
314        media_driver_ctx.set_dir(
315            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
316        )?;
317        let (stop, driver_handle) =
318            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
319
320        let ctx = AeronContext::new()?;
321        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
322        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
323        let aeron = Aeron::new(&ctx)?;
324        aeron.start()?;
325
326        let channel = String::from("aeron:udp?endpoint=203.0.113.1:54324");
327
328        let result = aeron.add_subscription(
329            &channel.into_c_string(),
330            4324,
331            Handlers::no_available_image_handler(),
332            Handlers::no_unavailable_image_handler(),
333            Duration::from_millis(300),
334        );
335
336        assert!(result.is_err(), "expected error for invalid interface");
337
338        stop.store(true, Ordering::SeqCst);
339        let _ = driver_handle.join().unwrap();
340        Ok(())
341    }
342
343    #[test]
344    #[serial]
345    fn async_publication_invalid_bind_poll_then_drop() -> Result<(), Box<dyn error::Error>> {
346        let _ = env_logger::Builder::new()
347            .is_test(true)
348            .filter_level(log::LevelFilter::Info)
349            .try_init();
350
351        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
352        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
353        media_driver_ctx.set_dir_delete_on_start(true)?;
354        media_driver_ctx.set_dir(
355            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
356        )?;
357        let (stop, driver_handle) =
358            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
359
360        let ctx = AeronContext::new()?;
361        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
362        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
363        let aeron = Aeron::new(&ctx)?;
364        aeron.start()?;
365
366        // Use an invalid bind on publication (bind is not valid for publication, and the IP is unowned).
367        let channel = format!("aeron:udp?endpoint=127.0.0.1:54330|bind=203.0.113.1:60000");
368
369        let poller = aeron.async_add_publication(&channel.into_c_string(), 4330)?;
370        let start = Instant::now();
371        while start.elapsed() < Duration::from_millis(250) {
372            let _ = poller.poll();
373            #[cfg(debug_assertions)]
374            std::thread::sleep(Duration::from_millis(10));
375        }
376        drop(poller);
377
378        stop.store(true, Ordering::SeqCst);
379        let _ = driver_handle.join().unwrap();
380        Ok(())
381    }
382
383    #[test]
384    #[serial]
385    pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
386        let _ = env_logger::Builder::new()
387            .is_test(true)
388            .filter_level(log::LevelFilter::Info)
389            .try_init();
390        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
391        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
392        media_driver_ctx.set_dir_delete_on_start(true)?;
393        media_driver_ctx.set_dir(
394            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
395        )?;
396        let (stop, driver_handle) =
397            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
398
399        let ctx = AeronContext::new()?;
400        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
401        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
402        let error_count = 1;
403        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
404        ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
405        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
406        ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
407        ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
408        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
409        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
410        ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
411
412        info!("creating client [simple_large_send test]");
413        let aeron = Aeron::new(&ctx)?;
414        info!("starting client");
415
416        aeron.start()?;
417        info!("client started");
418        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
419        info!("created publisher");
420
421        assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
422        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
423        AeronCncMetadata::read_from_file(&cstr, |cnc| {
424            assert!(cnc.pid > 0);
425        })?;
426        assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
427        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
428        for _ in 0..50 {
429            AeronCnc::read_on_partial_stack(&cstr, |cnc| {
430                assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
431            })?;
432        }
433
434        let subscription = aeron.add_subscription(
435            AERON_IPC_STREAM,
436            123,
437            Handlers::no_available_image_handler(),
438            Handlers::no_unavailable_image_handler(),
439            Duration::from_secs(5),
440        )?;
441        info!("created subscription");
442
443        subscription
444            .poll_once(|msg, header| println!("foo"), 1024)
445            .unwrap();
446
447        // pick a large enough size to confirm fragement assembler is working
448        let string_len = media_driver_ctx.ipc_mtu_length * 100;
449        info!("string length: {}", string_len);
450
451        let publisher_handler = {
452            let stop = stop.clone();
453            std::thread::spawn(move || {
454                let binding = "1".repeat(string_len);
455                let large_msg = binding.as_bytes();
456                loop {
457                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
458                        break;
459                    }
460                    let result =
461                        publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
462
463                    assert_eq!(123, publisher.get_constants().unwrap().stream_id);
464
465                    if result < large_msg.len() as i64 {
466                        let error = AeronCError::from_code(result as i32);
467                        match error.kind() {
468                            AeronErrorType::PublicationBackPressured
469                            | AeronErrorType::PublicationAdminAction => {
470                                // ignore
471                            }
472                            _ => {
473                                error!(
474                                    "ERROR: failed to send message {:?}",
475                                    AeronCError::from_code(result as i32)
476                                );
477                            }
478                        }
479                        sleep(Duration::from_millis(500));
480                    }
481                }
482                info!("stopping publisher thread");
483            })
484        };
485
486        let mut assembler = AeronFragmentClosureAssembler::new()?;
487
488        struct Context {
489            count: Arc<AtomicUsize>,
490            stop: Arc<AtomicBool>,
491            string_len: usize,
492        }
493
494        let count = Arc::new(AtomicUsize::new(0usize));
495        let mut context = Context {
496            count: count.clone(),
497            stop: stop.clone(),
498            string_len,
499        };
500
501        // Start the timer
502        let start_time = Instant::now();
503
504        loop {
505            if start_time.elapsed() > Duration::from_secs(30) {
506                info!("Failed: exceeded 30-second timeout");
507                return Err(Box::new(std::io::Error::new(
508                    std::io::ErrorKind::TimedOut,
509                    "Timeout exceeded",
510                )));
511            }
512            let c = count.load(Ordering::SeqCst);
513            if c > 100 {
514                break;
515            }
516
517            fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
518                ctx.count.fetch_add(1, Ordering::SeqCst);
519
520                let values = header.get_values().unwrap();
521                assert_ne!(values.frame.session_id, 0);
522
523                if buffer.len() != ctx.string_len {
524                    ctx.stop.store(true, Ordering::SeqCst);
525                    error!(
526                        "ERROR: message was {} but was expecting {} [header={:?}]",
527                        buffer.len(),
528                        ctx.string_len,
529                        header
530                    );
531                    sleep(Duration::from_secs(1));
532                }
533
534                assert_eq!(buffer.len(), ctx.string_len);
535                assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
536            }
537
538            subscription.poll(assembler.process(&mut context, process_msg), 128)?;
539            assert_eq!(123, subscription.get_constants().unwrap().stream_id);
540        }
541
542        subscription.close(Handlers::no_notification_handler())?;
543
544        info!("stopping client");
545        stop.store(true, Ordering::SeqCst);
546
547        let _ = publisher_handler.join().unwrap();
548        let _ = driver_handle.join().unwrap();
549
550        let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
551        cnc.counters_reader().foreach_counter_once(
552            |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
553                println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
554                AeronSystemCounterType::try_from(type_id));
555            },
556        );
557        cnc.error_log_read_once(| observation_count: i32,
558                                     first_observation_timestamp: i64,
559                                     last_observation_timestamp: i64,
560                                     error: &str| {
561            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
562        }, 0);
563        cnc.loss_reporter_read_once(|    observation_count: i64,
564                                    total_bytes_lost: i64,
565                                    first_observation_timestamp: i64,
566                                    last_observation_timestamp: i64,
567                                    session_id: i32,
568                                    stream_id: i32,
569                                    channel: &str,
570                                    source: &str,| {
571            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
572        })?;
573
574        Ok(())
575    }
576
577    #[test]
578    #[serial]
579    pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
580        let _ = env_logger::Builder::new()
581            .is_test(true)
582            .filter_level(log::LevelFilter::Info)
583            .try_init();
584        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
585        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
586        media_driver_ctx.set_dir_delete_on_start(true)?;
587        media_driver_ctx.set_dir(
588            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
589        )?;
590        let (stop, driver_handle) =
591            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
592
593        let ctx = AeronContext::new()?;
594        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
595        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
596        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
597
598        info!("creating client [try_claim test]");
599        let aeron = Aeron::new(&ctx)?;
600        info!("starting client");
601
602        aeron.start()?;
603        info!("client started");
604        const STREAM_ID: i32 = 123;
605        let publisher =
606            aeron.add_publication(AERON_IPC_STREAM, STREAM_ID, Duration::from_secs(5))?;
607        info!("created publisher");
608
609        let subscription = aeron.add_subscription(
610            AERON_IPC_STREAM,
611            STREAM_ID,
612            Handlers::no_available_image_handler(),
613            Handlers::no_unavailable_image_handler(),
614            Duration::from_secs(5),
615        )?;
616        info!("created subscription");
617
618        // pick a large enough size to confirm fragement assembler is working
619        let string_len = 156;
620        info!("string length: {}", string_len);
621
622        let publisher_handler = {
623            let stop = stop.clone();
624            std::thread::spawn(move || {
625                let binding = "1".repeat(string_len);
626                let msg = binding.as_bytes();
627                let buffer = AeronBufferClaim::default();
628                loop {
629                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
630                        break;
631                    }
632
633                    let result = publisher.try_claim(string_len, &buffer);
634
635                    if result < msg.len() as i64 {
636                        error!(
637                            "ERROR: failed to send message {:?}",
638                            AeronCError::from_code(result as i32)
639                        );
640                    } else {
641                        buffer.data().write_all(&msg).unwrap();
642                        buffer.commit().unwrap();
643                    }
644                }
645                info!("stopping publisher thread");
646            })
647        };
648
649        let count = Arc::new(AtomicUsize::new(0usize));
650        let count_copy = Arc::clone(&count);
651        let stop2 = stop.clone();
652
653        struct FragmentHandler {
654            count_copy: Arc<AtomicUsize>,
655            stop2: Arc<AtomicBool>,
656            string_len: usize,
657        }
658
659        impl AeronFragmentHandlerCallback for FragmentHandler {
660            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
661                assert_eq!(STREAM_ID, header.get_values().unwrap().frame.stream_id);
662                let header = header.get_values().unwrap();
663                let frame = header.frame();
664                let stream_id = frame.stream_id();
665                assert_eq!(STREAM_ID, stream_id);
666
667                self.count_copy.fetch_add(1, Ordering::SeqCst);
668
669                if buffer.len() != self.string_len {
670                    self.stop2.store(true, Ordering::SeqCst);
671                    error!(
672                        "ERROR: message was {} but was expecting {} [header={:?}]",
673                        buffer.len(),
674                        self.string_len,
675                        header
676                    );
677                    sleep(Duration::from_secs(1));
678                }
679
680                assert_eq!(buffer.len(), self.string_len);
681                assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
682            }
683        }
684
685        let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
686            count_copy,
687            stop2,
688            string_len,
689        })?;
690        let start_time = Instant::now();
691
692        loop {
693            if start_time.elapsed() > Duration::from_secs(30) {
694                info!("Failed: exceeded 30-second timeout");
695                return Err(Box::new(std::io::Error::new(
696                    std::io::ErrorKind::TimedOut,
697                    "Timeout exceeded",
698                )));
699            }
700            let c = count.load(Ordering::SeqCst);
701            if c > 100 {
702                break;
703            }
704            subscription.poll(Some(&closure), 128)?;
705        }
706
707        info!("stopping client");
708
709        stop.store(true, Ordering::SeqCst);
710
711        let _ = publisher_handler.join().unwrap();
712        let _ = driver_handle.join().unwrap();
713        Ok(())
714    }
715
716    #[test]
717    #[serial]
718    pub fn counters() -> Result<(), Box<dyn error::Error>> {
719        let _ = env_logger::Builder::new()
720            .is_test(true)
721            .filter_level(log::LevelFilter::Info)
722            .try_init();
723        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
724        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
725        media_driver_ctx.set_dir_delete_on_start(true)?;
726        media_driver_ctx.set_dir(
727            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
728        )?;
729        let (stop, driver_handle) =
730            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
731
732        let ctx = AeronContext::new()?;
733        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
734        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
735        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
736        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
737
738        struct AvailableCounterHandler {
739            found_counter: bool,
740        }
741
742        impl AeronAvailableCounterCallback for AvailableCounterHandler {
743            fn handle_aeron_on_available_counter(
744                &mut self,
745                counters_reader: AeronCountersReader,
746                registration_id: i64,
747                counter_id: i32,
748            ) -> () {
749                info!(
750            "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
751            String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
752            counters_reader.get_counter_label(counter_id, 1000),
753            counters_reader.addr(counter_id)
754        );
755
756                assert_eq!(
757                    counters_reader.counter_registration_id(counter_id).unwrap(),
758                    registration_id
759                );
760
761                if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
762                    if label == "label_buffer" {
763                        self.found_counter = true;
764                        assert_eq!(
765                            &counters_reader.get_counter_key(counter_id).unwrap(),
766                            "key".as_bytes()
767                        );
768                    }
769                }
770            }
771        }
772
773        let handler = &Handler::leak(AvailableCounterHandler {
774            found_counter: false,
775        });
776        ctx.set_on_available_counter(Some(handler))?;
777
778        info!("creating client");
779        let aeron = Aeron::new(&ctx)?;
780        info!("starting client");
781
782        aeron.start()?;
783        info!("client started [counters test]");
784
785        let counter = aeron.add_counter(
786            123,
787            "key".as_bytes(),
788            "label_buffer",
789            Duration::from_secs(5),
790        )?;
791        let constants = counter.get_constants()?;
792        let counter_id = constants.counter_id;
793
794        let publisher_handler = {
795            let stop = stop.clone();
796            let counter = counter.clone();
797            std::thread::spawn(move || {
798                for _ in 0..150 {
799                    if stop.load(Ordering::Acquire) || counter.is_closed() {
800                        break;
801                    }
802                    counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
803                }
804                info!("stopping publisher thread");
805            })
806        };
807
808        let now = Instant::now();
809        while counter.addr_atomic().load(Ordering::SeqCst) < 100
810            && now.elapsed() < Duration::from_secs(10)
811        {
812            sleep(Duration::from_micros(10));
813        }
814
815        assert!(now.elapsed() < Duration::from_secs(10));
816
817        info!(
818            "counter is {}",
819            counter.addr_atomic().load(Ordering::SeqCst)
820        );
821
822        info!("stopping client");
823
824        #[cfg(not(target_os = "windows"))] // not sure why windows version doesn't fire event
825        assert!(handler.found_counter);
826
827        stop.store(true, Ordering::SeqCst);
828
829        let reader = aeron.counters_reader();
830        assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
831        assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
832        let buffers = AeronCountersReaderBuffers::default();
833        reader.get_buffers(&buffers)?;
834
835        let _ = publisher_handler.join().unwrap();
836        let _ = driver_handle.join().unwrap();
837        Ok(())
838    }
839
840    /// A simple error counter for testing error callback invocation.
841    #[derive(Default, Debug)]
842    struct TestErrorCount {
843        pub error_count: usize,
844    }
845
846    impl Drop for TestErrorCount {
847        fn drop(&mut self) {
848            info!("TestErrorCount dropped with {} errors", self.error_count);
849        }
850    }
851
852    impl AeronErrorHandlerCallback for TestErrorCount {
853        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
854            error!("Aeron error {}: {}", error_code, msg);
855            self.error_count += 1;
856        }
857    }
858
859    #[test]
860    #[serial]
861    pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
862        let _ = env_logger::Builder::new()
863            .is_test(true)
864            .filter_level(log::LevelFilter::Info)
865            .try_init();
866
867        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
868        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
869        media_driver_ctx.set_dir_delete_on_start(true)?;
870        media_driver_ctx.set_dir(
871            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
872        )?;
873        let (stop, driver_handle) =
874            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
875
876        let ctx = AeronContext::new()?;
877        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
878        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
879
880        let aeron = Aeron::new(&ctx)?;
881        aeron.start()?;
882
883        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
884        let subscription = aeron.add_subscription(
885            AERON_IPC_STREAM,
886            123,
887            Handlers::no_available_image_handler(),
888            Handlers::no_unavailable_image_handler(),
889            Duration::from_secs(5),
890        )?;
891
892        let count = Arc::new(AtomicUsize::new(0));
893        let start_time = Instant::now();
894
895        // Spawn a publisher thread that repeatedly sends "test" messages.
896        let publisher_thread = {
897            let stop = stop.clone();
898            std::thread::spawn(move || {
899                while !stop.load(Ordering::Acquire) {
900                    let msg = b"test";
901                    let result =
902                        publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
903                    // If backpressure is encountered, sleep a bit.
904                    if result == AeronErrorType::PublicationBackPressured.code() as i64 {
905                        sleep(Duration::from_millis(50));
906                    }
907                }
908            })
909        };
910
911        // Poll using the inline closure via poll_once until we receive at least 50 messages.
912        while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < Duration::from_secs(10) {
913            let _ = subscription.poll_once(
914                |_msg, _header| {
915                    count.fetch_add(1, Ordering::SeqCst);
916                },
917                128,
918            )?;
919        }
920
921        stop.store(true, Ordering::SeqCst);
922        publisher_thread.join().unwrap();
923        let _ = driver_handle.join().unwrap();
924
925        assert!(
926            count.load(Ordering::SeqCst) >= 50,
927            "Expected at least 50 messages received"
928        );
929        Ok(())
930    }
931
932    #[test]
933    #[serial]
934    pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
935        let _ = env_logger::Builder::new()
936            .is_test(true)
937            .filter_level(log::LevelFilter::Info)
938            .try_init();
939
940        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
941        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
942        media_driver_ctx.set_dir_delete_on_start(true)?;
943        media_driver_ctx.set_dir(
944            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
945        )?;
946        let (_stop, driver_handle) =
947            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
948
949        let ctx = AeronContext::new()?;
950        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
951        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
952
953        let aeron = Aeron::new(&ctx)?;
954        aeron.start()?;
955        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
956
957        // Create two subscriptions on the same channel.
958        let subscription1 = aeron.add_subscription(
959            AERON_IPC_STREAM,
960            123,
961            Handlers::no_available_image_handler(),
962            Handlers::no_unavailable_image_handler(),
963            Duration::from_secs(5),
964        )?;
965        let subscription2 = aeron.add_subscription(
966            AERON_IPC_STREAM,
967            123,
968            Handlers::no_available_image_handler(),
969            Handlers::no_unavailable_image_handler(),
970            Duration::from_secs(5),
971        )?;
972
973        let count1 = Arc::new(AtomicUsize::new(0));
974        let count2 = Arc::new(AtomicUsize::new(0));
975
976        // Publish a single message.
977        let msg = b"hello multi-subscription";
978        let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
979        assert!(
980            result >= msg.len() as i64,
981            "Message should be sent successfully"
982        );
983
984        let start_time = Instant::now();
985        // Poll both subscriptions with inline closures until each has received at least one message.
986        while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
987            && start_time.elapsed() < Duration::from_secs(5)
988        {
989            let _ = subscription1.poll_once(
990                |_msg, _header| {
991                    count1.fetch_add(1, Ordering::SeqCst);
992                },
993                128,
994            )?;
995            let _ = subscription2.poll_once(
996                |_msg, _header| {
997                    count2.fetch_add(1, Ordering::SeqCst);
998                },
999                128,
1000            )?;
1001        }
1002
1003        assert!(
1004            count1.load(Ordering::SeqCst) >= 1,
1005            "Subscription 1 did not receive the message"
1006        );
1007        assert!(
1008            count2.load(Ordering::SeqCst) >= 1,
1009            "Subscription 2 did not receive the message"
1010        );
1011
1012        _stop.store(true, Ordering::SeqCst);
1013        let _ = driver_handle.join().unwrap();
1014        Ok(())
1015    }
1016
1017    #[test]
1018    #[serial]
1019    pub fn should_be_able_to_drop_after_close_manually_being_closed(
1020    ) -> Result<(), Box<dyn error::Error>> {
1021        let _ = env_logger::Builder::new()
1022            .is_test(true)
1023            .filter_level(log::LevelFilter::Info)
1024            .try_init();
1025
1026        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1027        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1028        media_driver_ctx.set_dir_delete_on_start(true)?;
1029        media_driver_ctx.set_dir(
1030            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1031        )?;
1032        let (_stop, driver_handle) =
1033            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1034
1035        let ctx = AeronContext::new()?;
1036        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1037        ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;
1038
1039        let aeron = Aeron::new(&ctx)?;
1040        aeron.start()?;
1041
1042        {
1043            let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1044            info!("created publication [sessionId={}]", publisher.session_id());
1045            publisher.close_with_no_args()?;
1046            drop(publisher);
1047        }
1048
1049        {
1050            let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
1051            info!("created publication [sessionId={}]", publisher.session_id());
1052            publisher.close(Handlers::no_notification_handler())?;
1053            drop(publisher);
1054        }
1055
1056        {
1057            let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
1058            publisher.close_once(|| println!("on close"))?;
1059            info!("created publication [sessionId={}]", publisher.session_id());
1060            drop(publisher);
1061        }
1062
1063        Ok(())
1064    }
1065
1066    #[test]
1067    #[serial]
1068    pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
1069        let _ = env_logger::Builder::new()
1070            .is_test(true)
1071            .filter_level(log::LevelFilter::Info)
1072            .try_init();
1073
1074        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1075        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1076        media_driver_ctx.set_dir_delete_on_start(true)?;
1077        media_driver_ctx.set_dir(
1078            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1079        )?;
1080        let (_stop, driver_handle) =
1081            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1082
1083        let ctx = AeronContext::new()?;
1084        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1085        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
1086
1087        let aeron = Aeron::new(&ctx)?;
1088        aeron.start()?;
1089        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1090
1091        // Close the publication immediately.
1092        publisher.close(Handlers::no_notification_handler())?;
1093
1094        // Attempt to send a message after the publication is closed.
1095        let result = publisher.offer(
1096            b"should fail",
1097            Handlers::no_reserved_value_supplier_handler(),
1098        );
1099        assert!(
1100            result < 0,
1101            "Offering on a closed publication should return a negative error code"
1102        );
1103
1104        _stop.store(true, Ordering::SeqCst);
1105        let _ = driver_handle.join().unwrap();
1106        Ok(())
1107    }
1108
1109    /// Test sending and receiving an empty (zero-length) message using inline closures with poll_once.
1110    #[test]
1111    #[serial]
1112    pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
1113        let _ = env_logger::Builder::new()
1114            .is_test(true)
1115            .filter_level(log::LevelFilter::Info)
1116            .try_init();
1117
1118        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1119        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1120        media_driver_ctx.set_dir_delete_on_start(true)?;
1121        media_driver_ctx.set_dir(
1122            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
1123        )?;
1124        let (_stop, driver_handle) =
1125            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1126
1127        let ctx = AeronContext::new()?;
1128        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
1129        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
1130
1131        let aeron = Aeron::new(&ctx)?;
1132        aeron.start()?;
1133        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
1134        let subscription = aeron.add_subscription(
1135            AERON_IPC_STREAM,
1136            123,
1137            Handlers::no_available_image_handler(),
1138            Handlers::no_unavailable_image_handler(),
1139            Duration::from_secs(5),
1140        )?;
1141
1142        let empty_received = Arc::new(AtomicBool::new(false));
1143        let start_time = Instant::now();
1144
1145        let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
1146        assert!(result > 0);
1147
1148        while !empty_received.load(Ordering::SeqCst)
1149            && start_time.elapsed() < Duration::from_secs(5)
1150        {
1151            let _ = subscription.poll_once(
1152                |msg, _header| {
1153                    if msg.is_empty() {
1154                        empty_received.store(true, Ordering::SeqCst);
1155                    }
1156                },
1157                128,
1158            )?;
1159        }
1160
1161        assert!(
1162            empty_received.load(Ordering::SeqCst),
1163            "Empty message was not received"
1164        );
1165        _stop.store(true, Ordering::SeqCst);
1166        let _ = driver_handle.join().unwrap();
1167        Ok(())
1168    }
1169
1170    #[test]
1171    #[serial]
1172    #[ignore] // need to work to get tags working properly, its more of testing issue then tag issue
1173    pub fn tags() -> Result<(), Box<dyn error::Error>> {
1174        let _ = env_logger::Builder::new()
1175            .is_test(true)
1176            .filter_level(log::LevelFilter::Debug)
1177            .try_init();
1178
1179        let (md_ctx, stop, md) = start_media_driver(1)?;
1180
1181        let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
1182
1183        info!("creating suscriber 1");
1184        let sub = aeron_sub
1185            .add_subscription(
1186                &"aeron:udp?tags=100".into_c_string(),
1187                123,
1188                Handlers::no_available_image_handler(),
1189                Handlers::no_unavailable_image_handler(),
1190                Duration::from_secs(50),
1191            )
1192            .map_err(|e| {
1193                error!("aeron error={}", Aeron::errmsg());
1194                e
1195            })?;
1196
1197        let ctx = AeronContext::new()?;
1198        ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
1199        let aeron = Aeron::new(&ctx)?;
1200        aeron.start()?;
1201
1202        info!("creating suscriber 2");
1203        let sub2 = aeron_sub.add_subscription(
1204            &"aeron:udp?tags=100".into_c_string(),
1205            123,
1206            Handlers::no_available_image_handler(),
1207            Handlers::no_unavailable_image_handler(),
1208            Duration::from_secs(50),
1209        )?;
1210
1211        let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
1212        info!("creating publisher");
1213        assert!(!aeron_pub.is_closed());
1214        let publisher = aeron_pub
1215            .add_publication(
1216                &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
1217                123,
1218                Duration::from_secs(5),
1219            )
1220            .map_err(|e| {
1221                error!("aeron error={}", Aeron::errmsg());
1222                e
1223            })?;
1224
1225        info!("publishing msg");
1226
1227        loop {
1228            let result = publisher.offer(
1229                "213".as_bytes(),
1230                Handlers::no_reserved_value_supplier_handler(),
1231            );
1232            if result < 0 {
1233                error!(
1234                    "failed to publish {:?}",
1235                    AeronCError::from_code(result as i32)
1236                );
1237            } else {
1238                break;
1239            }
1240        }
1241
1242        sub.poll_once(
1243            |msg, _header| {
1244                println!("Received message: {:?}", msg);
1245            },
1246            128,
1247        )?;
1248        sub2.poll_once(
1249            |msg, _header| {
1250                println!("Received message: {:?}", msg);
1251            },
1252            128,
1253        )?;
1254
1255        stop.store(true, Ordering::SeqCst);
1256
1257        Ok(())
1258    }
1259
1260    fn create_client(
1261        media_driver_ctx: &AeronDriverContext,
1262    ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
1263        let dir = media_driver_ctx.get_dir();
1264        info!("creating aeron client [dir={}]", dir);
1265        let ctx = AeronContext::new()?;
1266        ctx.set_dir(&dir.into_c_string())?;
1267        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
1268        let aeron = Aeron::new(&ctx)?;
1269        aeron.start()?;
1270        Ok((ctx, aeron))
1271    }
1272
1273    fn start_media_driver(
1274        instance: u64,
1275    ) -> Result<
1276        (
1277            AeronDriverContext,
1278            Arc<AtomicBool>,
1279            JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
1280        ),
1281        Box<dyn Error>,
1282    > {
1283        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
1284        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
1285        media_driver_ctx.set_dir_delete_on_start(true)?;
1286        media_driver_ctx.set_dir(
1287            &format!(
1288                "{}{}-{}",
1289                media_driver_ctx.get_dir(),
1290                Aeron::epoch_clock(),
1291                instance
1292            )
1293            .into_c_string(),
1294        )?;
1295        let (stop, driver_handle) =
1296            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
1297        Ok((media_driver_ctx, stop, driver_handle))
1298    }
1299
    // Attaches the README two directories up as the documentation of this
    // otherwise empty module.
    // NOTE(review): presumably here so the README's code samples are compiled as
    // doctests — confirm this still takes effect inside the test-only module.
    #[doc = include_str!("../../README.md")]
    mod readme_tests {}
1302}