Skip to main content

rusteron_code_gen/
common.rs

1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt::Formatter;
6use std::mem::MaybeUninit;
7use std::ops::{Deref, DerefMut};
/// Handle to a C-side value of type `T`, tagged by how the storage is held.
///
/// `CResource::get` yields the raw pointer for every variant. Dependency tracking
/// (`add_dependency` / `get_dependency`) is only backed by the `OwnedOnHeap` variant.
pub enum CResource<T> {
    /// Reference-counted handle to a `ManagedCResource`.
    OwnedOnHeap(std::rc::Rc<ManagedCResource<T>>),
    /// stored on stack, unsafe, use with care
    OwnedOnStack(std::mem::MaybeUninit<T>),
    /// Plain raw pointer; this enum attaches no clean-up bookkeeping to it.
    Borrowed(*mut T),
}
14
15impl<T: Clone> Clone for CResource<T> {
16    fn clone(&self) -> Self {
17        unsafe {
18            match self {
19                CResource::OwnedOnHeap(r) => CResource::OwnedOnHeap(r.clone()),
20                CResource::OwnedOnStack(r) => {
21                    CResource::OwnedOnStack(MaybeUninit::new(r.assume_init_ref().clone()))
22                }
23                CResource::Borrowed(r) => CResource::Borrowed(r.clone()),
24            }
25        }
26    }
27}
28
29impl<T> CResource<T> {
30    #[inline]
31    pub fn get(&self) -> *mut T {
32        match self {
33            CResource::OwnedOnHeap(r) => r.get(),
34            CResource::OwnedOnStack(r) => r.as_ptr() as *mut T,
35            CResource::Borrowed(r) => *r,
36        }
37    }
38
39    #[inline]
40    // to prevent the dependencies from being dropped as you have a copy here
41    pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
42        match self {
43            CResource::OwnedOnHeap(r) => r.add_dependency(dep),
44            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => {
45                unreachable!("only owned on heap")
46            }
47        }
48    }
49    #[inline]
50    pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
51        match self {
52            CResource::OwnedOnHeap(r) => r.get_dependency(),
53            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
54        }
55    }
56
57    #[inline]
58    pub fn as_owned(&self) -> Option<&std::rc::Rc<ManagedCResource<T>>> {
59        match self {
60            CResource::OwnedOnHeap(r) => Some(r),
61            CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
62        }
63    }
64}
65
66impl<T> std::fmt::Debug for CResource<T> {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        let name = std::any::type_name::<T>();
69
70        match self {
71            CResource::OwnedOnHeap(r) => {
72                write!(f, "{name} heap({:?})", r)
73            }
74            CResource::OwnedOnStack(r) => {
75                write!(f, "{name} stack({:?})", *r)
76            }
77            CResource::Borrowed(r) => {
78                write!(f, "{name} borrowed ({:?})", r)
79            }
80        }
81    }
82}
83
/// A custom struct for managing C resources with automatic cleanup.
///
/// It handles initialisation and clean-up of the resource and ensures that resources
/// are properly released when they go out of scope.
#[allow(dead_code)]
pub struct ManagedCResource<T> {
    /// Raw pointer to the C resource; `close` resets it to null once the cleanup callback ran.
    resource: *mut T,
    /// Optional release callback; it receives the address of `resource` and reports failure
    /// with a negative return value. `close` takes it out, so it runs at most once.
    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
    /// When `true`, dropping this struct also frees the pointee as a `Box<T>`.
    cleanup_struct: bool,
    /// Set once `close` has been called, whether by a user or by `Drop`.
    close_already_called: std::cell::Cell<bool>,
    /// Optional C function reporting whether someone else has already closed the resource;
    /// only a few structs have this functionality.
    check_for_is_closed: Option<fn(*mut T) -> bool>,
    /// this will be called if closed hasn't already happened even if its borrowed
    /// NOTE(review): only initialised to `false` in `new` within this file; confirm where it is read.
    auto_close: std::cell::Cell<bool>,
    /// indicates if the underlying resource has already been handed off and should not be re-polled
    resource_released: std::cell::Cell<bool>,
    /// to prevent the dependencies from being dropped as you have a copy here,
    /// for example, you want to have a dependency to aeron for any async jobs so aeron doesn't get dropped first
    /// when you have a publication/subscription
    /// Note empty vec does not allocate on heap
    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn std::any::Any>>>,
}
107
108impl<T> std::fmt::Debug for ManagedCResource<T> {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        let mut debug_struct = f.debug_struct("ManagedCResource");
111
112        if !self.close_already_called.get()
113            && !self.resource.is_null()
114            && !self
115                .check_for_is_closed
116                .as_ref()
117                .map_or(false, |f| f(self.resource))
118        {
119            debug_struct.field("resource", &self.resource);
120        }
121
122        debug_struct
123            .field("type", &std::any::type_name::<T>())
124            .finish()
125    }
126}
127
128impl<T> ManagedCResource<T> {
129    /// Creates a new ManagedCResource with a given initializer and cleanup function.
130    ///
131    /// The initializer is a closure that attempts to initialize the resource.
132    /// If initialization fails, the initializer should return an error code.
133    /// The cleanup function is used to release the resource when it's no longer needed.
134    /// `cleanup_struct` where it should clean up the struct in rust
135    pub fn new(
136        init: impl FnOnce(*mut *mut T) -> i32,
137        cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
138        cleanup_struct: bool,
139        check_for_is_closed: Option<fn(*mut T) -> bool>,
140    ) -> Result<Self, AeronCError> {
141        let resource = Self::initialise(init)?;
142
143        let result = Self {
144            resource,
145            cleanup,
146            cleanup_struct,
147            close_already_called: std::cell::Cell::new(false),
148            check_for_is_closed,
149            auto_close: std::cell::Cell::new(false),
150            resource_released: std::cell::Cell::new(false),
151            dependencies: UnsafeCell::new(vec![]),
152        };
153        #[cfg(feature = "extra-logging")]
154        log::info!("created c resource: {:?}", result);
155        Ok(result)
156    }
157
158    pub fn initialise(
159        init: impl FnOnce(*mut *mut T) -> i32 + Sized,
160    ) -> Result<*mut T, AeronCError> {
161        let mut resource: *mut T = std::ptr::null_mut();
162        let result = init(&mut resource);
163        if result < 0 || resource.is_null() {
164            return Err(AeronCError::from_code(result));
165        }
166        Ok(resource)
167    }
168
169    pub fn is_closed_already_called(&self) -> bool {
170        self.close_already_called.get()
171            || self.resource.is_null()
172            || self
173                .check_for_is_closed
174                .as_ref()
175                .map_or(false, |f| f(self.resource))
176    }
177
178    /// Gets a raw pointer to the resource.
179    #[inline(always)]
180    pub fn get(&self) -> *mut T {
181        self.resource
182    }
183
184    #[inline(always)]
185    pub fn get_mut(&self) -> &mut T {
186        unsafe { &mut *self.resource }
187    }
188
189    #[inline]
190    // to prevent the dependencies from being dropped as you have a copy here
191    pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
192        if let Some(dep) =
193            (&dep as &dyn std::any::Any).downcast_ref::<std::rc::Rc<dyn std::any::Any>>()
194        {
195            unsafe {
196                (*self.dependencies.get()).push(dep.clone());
197            }
198        } else {
199            unsafe {
200                (*self.dependencies.get()).push(std::rc::Rc::new(dep));
201            }
202        }
203    }
204
205    #[inline]
206    pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
207        unsafe {
208            (*self.dependencies.get())
209                .iter()
210                .filter_map(|x| x.as_ref().downcast_ref::<V>().cloned())
211                .next()
212        }
213    }
214
215    #[inline]
216    pub fn is_resource_released(&self) -> bool {
217        self.resource_released.get()
218    }
219
220    #[inline]
221    pub fn mark_resource_released(&self) {
222        self.resource_released.set(true);
223    }
224
225    /// Closes the resource by calling the cleanup function.
226    ///
227    /// If cleanup fails, it returns an `AeronError`.
228    pub fn close(&mut self) -> Result<(), AeronCError> {
229        if self.close_already_called.get() {
230            return Ok(());
231        }
232        self.close_already_called.set(true);
233
234        let already_closed = self
235            .check_for_is_closed
236            .as_ref()
237            .map_or(false, |f| f(self.resource));
238
239        if let Some(mut cleanup) = self.cleanup.take() {
240            if !self.resource.is_null() {
241                if !already_closed {
242                    let result = cleanup(&mut self.resource);
243                    if result < 0 {
244                        return Err(AeronCError::from_code(result));
245                    }
246                }
247                self.resource = std::ptr::null_mut();
248            }
249        }
250
251        Ok(())
252    }
253}
254
255impl<T> Drop for ManagedCResource<T> {
256    fn drop(&mut self) {
257        if !self.resource.is_null() {
258            let already_closed = self.close_already_called.get()
259                || self
260                    .check_for_is_closed
261                    .as_ref()
262                    .map_or(false, |f| f(self.resource));
263
264            let resource = if already_closed {
265                self.resource
266            } else {
267                self.resource.clone()
268            };
269
270            if !already_closed {
271                // Ensure the clean-up function is called when the resource is dropped.
272                #[cfg(feature = "extra-logging")]
273                log::info!("closing c resource: {:?}", self);
274                let _ = self.close(); // Ignore errors during an automatic drop to avoid panics.
275            }
276            self.close_already_called.set(true);
277
278            if self.cleanup_struct {
279                #[cfg(feature = "extra-logging")]
280                log::info!("closing rust struct resource: {:?}", resource);
281                unsafe {
282                    let _ = Box::from_raw(resource);
283                }
284            }
285        }
286    }
287}
288
/// Classification of the integer codes handled by `AeronCError`.
///
/// `code` and `from_code` convert between a variant and its numeric value; any value
/// without a dedicated variant is carried by `Unknown`. The variant order matters for
/// the derived `PartialOrd`.
#[derive(Debug, PartialOrd, Eq, PartialEq, Clone)]
pub enum AeronErrorType {
    /// Code `-1`.
    GenericError,
    /// Code `-1000`.
    ClientErrorDriverTimeout,
    /// Code `-1001`.
    ClientErrorClientTimeout,
    /// Code `-1002`.
    ClientErrorConductorServiceTimeout,
    /// Code `-1003`.
    ClientErrorBufferFull,
    /// Code `-2`.
    PublicationBackPressured,
    /// Code `-3`.
    PublicationAdminAction,
    /// Code `-4`.
    PublicationClosed,
    /// Code `-5`.
    PublicationMaxPositionExceeded,
    /// Code `-6`.
    PublicationError,
    /// Code `-234324`.
    TimedOut,
    /// Any other code; the raw value is kept as the payload.
    Unknown(i32),
}
304
305impl From<AeronErrorType> for AeronCError {
306    fn from(value: AeronErrorType) -> Self {
307        AeronCError::from_code(value.code())
308    }
309}
310
311impl AeronErrorType {
312    pub fn code(&self) -> i32 {
313        match self {
314            AeronErrorType::GenericError => -1,
315            AeronErrorType::ClientErrorDriverTimeout => -1000,
316            AeronErrorType::ClientErrorClientTimeout => -1001,
317            AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
318            AeronErrorType::ClientErrorBufferFull => -1003,
319            AeronErrorType::PublicationBackPressured => -2,
320            AeronErrorType::PublicationAdminAction => -3,
321            AeronErrorType::PublicationClosed => -4,
322            AeronErrorType::PublicationMaxPositionExceeded => -5,
323            AeronErrorType::PublicationError => -6,
324            AeronErrorType::TimedOut => -234324,
325            AeronErrorType::Unknown(code) => *code,
326        }
327    }
328
329    pub fn is_back_pressured(&self) -> bool {
330        self == &AeronErrorType::PublicationBackPressured
331    }
332
333    pub fn is_admin_action(&self) -> bool {
334        self == &AeronErrorType::PublicationAdminAction
335    }
336
337    pub fn is_back_pressured_or_admin_action(&self) -> bool {
338        self.is_back_pressured() || self.is_admin_action()
339    }
340
341    pub fn from_code(code: i32) -> Self {
342        match code {
343            -1 => AeronErrorType::GenericError,
344            -1000 => AeronErrorType::ClientErrorDriverTimeout,
345            -1001 => AeronErrorType::ClientErrorClientTimeout,
346            -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
347            -1003 => AeronErrorType::ClientErrorBufferFull,
348            -2 => AeronErrorType::PublicationBackPressured,
349            -3 => AeronErrorType::PublicationAdminAction,
350            -4 => AeronErrorType::PublicationClosed,
351            -5 => AeronErrorType::PublicationMaxPositionExceeded,
352            -6 => AeronErrorType::PublicationError,
353            -234324 => AeronErrorType::TimedOut,
354            _ => Unknown(code),
355        }
356    }
357
358    pub fn to_string(&self) -> &'static str {
359        match self {
360            AeronErrorType::GenericError => "Generic Error",
361            AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
362            AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
363            AeronErrorType::ClientErrorConductorServiceTimeout => {
364                "Client Error Conductor Service Timeout"
365            }
366            AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
367            AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
368            AeronErrorType::PublicationAdminAction => "Publication Admin Action",
369            AeronErrorType::PublicationClosed => "Publication Closed",
370            AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
371            AeronErrorType::PublicationError => "Publication Error",
372            AeronErrorType::TimedOut => "Timed Out",
373            AeronErrorType::Unknown(_) => "Unknown Error",
374        }
375    }
376}
377
/// Represents an Aeron-specific error identified by its integer code.
///
/// The error code is derived from Aeron C API calls. The struct stores only that code;
/// no message is kept alongside it.
/// NOTE(review): `get_last_err_message()` is the documented way to retrieve the last
/// human-readable message, but it is not defined in this file — confirm where it lives.
#[derive(Eq, PartialEq, Clone)]
pub struct AeronCError {
    /// Raw code; `AeronCError::kind` maps it to an `AeronErrorType`.
    pub code: i32,
}
386
387impl AeronCError {
388    /// Creates an AeronError from the error code returned by Aeron.
389    ///
390    /// Error codes below zero are considered failure.
391    pub fn from_code(code: i32) -> Self {
392        #[cfg(feature = "backtrace")]
393        {
394            if code < 0 {
395                let backtrace = Backtrace::capture();
396                let backtrace = format!("{:?}", backtrace);
397
398                let re =
399                    regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
400                let mut lines = String::new();
401                re.captures_iter(&backtrace).for_each(|cap| {
402                    let function = &cap[1];
403                    let mut file = cap[2].to_string();
404                    let line = &cap[3];
405                    if file.starts_with("./") {
406                        file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
407                    } else if file.starts_with("/rustc/") {
408                        file = file.split("/").last().unwrap().to_string();
409                    }
410                    // log in intellij friendly error format so can hyperlink to source code in stack trace
411                    lines.push_str(&format!(" {file}:{line} in {function}\n"));
412                });
413
414                log::error!(
415                    "Aeron C error code: {}, kind: '{:?}'\n{}",
416                    code,
417                    AeronErrorType::from_code(code),
418                    lines
419                );
420            }
421        }
422        AeronCError { code }
423    }
424
425    pub fn kind(&self) -> AeronErrorType {
426        AeronErrorType::from_code(self.code)
427    }
428
429    pub fn is_back_pressured(&self) -> bool {
430        self.kind().is_back_pressured()
431    }
432
433    pub fn is_admin_action(&self) -> bool {
434        self.kind().is_admin_action()
435    }
436
437    pub fn is_back_pressured_or_admin_action(&self) -> bool {
438        self.kind().is_back_pressured_or_admin_action()
439    }
440}
441
/// # Handler
///
/// `Handler` is a struct that wraps a raw pointer and a drop flag.
///
/// Memory is freed automatically when `Handler` goes out of scope (via `Drop`).
/// You must ensure the `Handler` outlives the Aeron session that uses it, since Aeron
/// holds a raw `clientd` pointer to the boxed value and will call callbacks until closed.
///
/// Call `release()` early if you want to free the memory before the `Handler` drops.
///
/// ## Example
///
/// ```no_compile
/// use rusteron_code_gen::Handler;
/// let handler = Handler::leak(your_value);
/// // handler is freed automatically when it goes out of scope
/// ```
pub struct Handler<T> {
    /// Pointer created by `Box::into_raw` in `leak`, or supplied by the caller of `new`.
    raw_ptr: *mut T,
    /// `true` while this handle is still responsible for freeing `raw_ptr`.
    should_drop: bool,
}

// NOTE(review): implemented for every `T` without bounds; thread-safety of the boxed
// value is left to the user of the handler.
unsafe impl<T> Send for Handler<T> {}
unsafe impl<T> Sync for Handler<T> {}

/// Utility method for setting empty handlers
pub struct Handlers;

impl<T> Handler<T> {
    /// Moves `handler` onto the heap and returns a handle that owns the allocation.
    ///
    /// The allocation is freed by `release()` or when the returned `Handler` is dropped,
    /// whichever happens first.
    pub fn leak(handler: T) -> Self {
        let raw_ptr = Box::into_raw(Box::new(handler));
        #[cfg(feature = "extra-logging")]
        log::info!("creating handler {:?}", raw_ptr);
        Self {
            raw_ptr,
            should_drop: true,
        }
    }

    /// Returns `true` when the wrapped pointer is null.
    pub fn is_none(&self) -> bool {
        self.raw_ptr.is_null()
    }

    /// Returns the wrapped pointer as an untyped `c_void` pointer.
    ///
    /// The pointer value is not reset by `release()`, so it must not be used afterwards.
    pub fn as_raw(&self) -> *mut std::os::raw::c_void {
        self.raw_ptr as *mut std::os::raw::c_void
    }

    /// Frees the boxed value if this handle still owns it.
    ///
    /// Calling it again (or dropping the handle afterwards) is a no-op, because the
    /// drop flag is cleared once the value has been freed.
    pub fn release(&mut self) {
        if self.should_drop && !self.raw_ptr.is_null() {
            #[cfg(feature = "extra-logging")]
            log::info!("dropping handler {:?}", self.raw_ptr);
            // SAFETY: `should_drop` is only set for pointers this handle is responsible
            // for (from `leak`, or vouched for by the caller of `new`), and it is cleared
            // right below so the allocation is freed at most once.
            unsafe {
                drop(Box::from_raw(self.raw_ptr));
            }
            self.should_drop = false;
        }
    }

    /// Wraps an existing raw pointer.
    ///
    /// # Safety
    /// When `should_drop` is `true`, `raw_ptr` must be null or come from `Box::into_raw`
    /// for a `Box<T>` that nobody else frees. For `Deref`/`DerefMut` to be usable the
    /// pointer must point to a valid `T` for as long as the handler is dereferenced.
    pub unsafe fn new(raw_ptr: *mut T, should_drop: bool) -> Self {
        Self {
            raw_ptr,
            should_drop,
        }
    }
}

/// Frees the boxed value when the handle goes out of scope, unless `release()` already
/// did so or the handle was created as non-owning.
impl<T> Drop for Handler<T> {
    fn drop(&mut self) {
        self.release();
    }
}

impl<T> Deref for Handler<T> {
    type Target = T;

    /// Borrows the boxed value; the pointer must be non-null and not yet released.
    fn deref(&self) -> &Self::Target {
        unsafe { &*self.raw_ptr as &T }
    }
}

impl<T> DerefMut for Handler<T> {
    /// Mutably borrows the boxed value; the pointer must be non-null and not yet released.
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { &mut *self.raw_ptr as &mut T }
    }
}
535
536pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
537    let end_port = u16::MAX;
538
539    for port in start_port..=end_port {
540        if is_udp_port_available(port) {
541            return Some(port);
542        }
543    }
544
545    None
546}
547
/// Reports whether a UDP socket can currently be bound to `127.0.0.1:port`.
///
/// The probe socket is closed again before the function returns.
pub fn is_udp_port_available(port: u16) -> bool {
    match std::net::UdpSocket::bind(("127.0.0.1", port)) {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
551
/// Represents the Aeron URI parser and handler.
///
/// In this file the type has no fields and only serves as a namespace for the
/// URI-related constants below.
pub struct ChannelUri {}

impl ChannelUri {
    /// The `aeron` scheme string.
    pub const AERON_SCHEME: &'static str = "aeron";
    /// The `aeron-spy` qualifier string.
    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
    /// Upper bound on URI length (presumably counted in bytes — confirm against users of this constant).
    pub const MAX_URI_LENGTH: usize = 4095;
}
560
/// Default driver timeout; the `_MS` in the name indicates milliseconds.
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
/// The property name `aeron.dir` (presumably the key selecting the Aeron directory — confirm against callers).
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
/// Channel string for the IPC media.
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
/// Channel string for the UDP media.
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
/// Prefix string `aeron-spy:`.
pub const SPY_PREFIX: &str = "aeron-spy:";
/// Prefix string `tag:`.
pub const TAG_PREFIX: &str = "tag:";
567
/// Enum for media types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Media {
    Ipc,
    Udp,
}

impl Media {
    /// Lower-case name of the media type (`"ipc"` or `"udp"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Udp => "udp",
            Self::Ipc => "ipc",
        }
    }
}
583
/// Enum for control modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMode {
    Manual,
    Dynamic,
    /// this is a beta feature useful when dealing with docker containers and networking
    Response,
}

impl ControlMode {
    /// Lower-case name of the control mode (`"manual"`, `"dynamic"` or `"response"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Response => "response",
            Self::Dynamic => "dynamic",
            Self::Manual => "manual",
        }
    }
}
602
/// Test-only allocation tracking: installs a counting global allocator and offers
/// `assert_no_allocation` to check that a closure leaves the net count (almost) unchanged.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) mod test_alloc {
    use std::alloc::{GlobalAlloc, Layout, System};
    use std::env;
    use std::fs::OpenOptions;
    // `std::os::unix` only exists on Unix targets, so the import must be gated just like
    // the `.mode(..)` call that needs it; otherwise the non-Unix branch below cannot build.
    #[cfg(unix)]
    use std::os::unix::fs::OpenOptionsExt;
    use std::sync::atomic::{AtomicIsize, Ordering};

    /// A simple global allocator that tracks the net allocation count.
    /// Used mainly for testing memory leaks or unintended allocations.
    pub struct TrackingAllocator {
        /// Allocations minus deallocations seen so far.
        allocs: AtomicIsize,
    }

    impl TrackingAllocator {
        /// Creates a tracker starting at zero.
        pub const fn new() -> Self {
            Self {
                allocs: AtomicIsize::new(0),
            }
        }
        /// Current net allocation count.
        pub fn current(&self) -> isize {
            self.allocs.load(Ordering::SeqCst)
        }
    }

    // Every call is forwarded to `System`; only the counter is added on top.
    unsafe impl GlobalAlloc for TrackingAllocator {
        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
            self.allocs.fetch_add(1, Ordering::SeqCst);
            System.alloc(layout)
        }
        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
            self.allocs.fetch_sub(1, Ordering::SeqCst);
            System.dealloc(ptr, layout)
        }
        unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
            self.allocs.fetch_add(1, Ordering::SeqCst);
            System.alloc_zeroed(layout)
        }
        // A reallocation keeps the number of live allocations the same, so it is not counted.
        unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
            System.realloc(ptr, layout, new_size)
        }
    }

    #[global_allocator]
    static GLOBAL: TrackingAllocator = TrackingAllocator::new();

    /// Returns the current number of net allocations
    pub fn current_allocs() -> isize {
        GLOBAL.current()
    }

    /// Asserts that running the provided closure does not change the net allocation
    /// count by 50 or more (a small drift is tolerated).
    /// Uses a file lock to ensure exclusive access across threads/tests.
    ///
    /// # Panics
    /// Panics when the lock file cannot be opened or locked, or when the count drifts
    /// by 50 or more.
    pub fn assert_no_allocation<F: FnOnce()>(f: F) {
        let tmp = env::temp_dir().join("rusteron_allocation.lck");

        #[cfg(unix)]
        let file = {
            OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .mode(0o600)
                .open(&tmp)
                .expect("Failed to open allocation lock file")
        };
        #[cfg(not(unix))]
        let file = {
            OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(&tmp)
                .expect("Failed to open allocation lock file")
        };

        let mut lock = fd_lock::RwLock::new(file);
        let lock = lock.write().expect("Failed to acquire file lock");

        let before = current_allocs();
        f();
        let after = current_allocs();
        let diff = (after - before).abs();
        assert!(
            diff < 50,
            "Expected no allocation leak, but alloc count changed from {} to {} (diff {})",
            before,
            after,
            diff
        );

        // Hold the lock until the measurement is finished.
        drop(lock)
    }
}
699
/// Conversion of an owned or borrowed string value into a `std::ffi::CString`.
pub trait IntoCString {
    /// Consumes `self` and returns the corresponding `CString`.
    fn into_c_string(self) -> std::ffi::CString;
}
703
704impl IntoCString for std::ffi::CString {
705    fn into_c_string(self) -> std::ffi::CString {
706        self
707    }
708}
709
710impl IntoCString for &str {
711    fn into_c_string(self) -> std::ffi::CString {
712        #[cfg(feature = "extra-logging")]
713        log::info!("created c string on heap: {:?}", self);
714
715        std::ffi::CString::new(self).expect("failed to create CString")
716    }
717}
718
719impl IntoCString for String {
720    fn into_c_string(self) -> std::ffi::CString {
721        #[cfg(feature = "extra-logging")]
722        log::info!("created c string on heap: {:?}", self);
723
724        std::ffi::CString::new(self).expect("failed to create CString")
725    }
726}