rusteron_code_gen/
common.rs

1use crate::AeronErrorType::Unknown;
2use std::any::Any;
3#[cfg(feature = "backtrace")]
4use std::backtrace::Backtrace;
5use std::cell::UnsafeCell;
6use std::ops::{Deref, DerefMut};
7
8/// A custom struct for managing C resources with automatic cleanup.
9///
10/// It handles initialisation and clean-up of the resource and ensures that resources
11/// are properly released when they go out of scope.
12#[allow(dead_code)]
13pub struct ManagedCResource<T> {
14    resource: *mut T,
15    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
16    cleanup_struct: bool,
17    borrowed: bool,
18    /// if someone externally rusteron calls close
19    close_already_called: std::cell::Cell<bool>,
20    /// if there is a c method to verify it someone has closed it, only few structs have this functionality
21    check_for_is_closed: Option<Box<dyn Fn(*mut T) -> bool>>,
22    /// this will be called if closed hasn't already happened even if its borrowed
23    auto_close: std::cell::Cell<bool>,
24    // to prevent the dependencies from being dropped as you have a copy here
25    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn Any>>>,
26}
27
28impl<T> std::fmt::Debug for ManagedCResource<T> {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        let mut debug_struct = f.debug_struct("ManagedCResource");
31
32        if !self.close_already_called.get()
33            && !self.resource.is_null()
34            && !self
35                .check_for_is_closed
36                .as_ref()
37                .map_or(false, |f| f(self.resource))
38        {
39            debug_struct.field("resource", &self.resource);
40        }
41
42        debug_struct
43            .field("type", &std::any::type_name::<T>())
44            .finish()
45    }
46}
47
48impl<T> ManagedCResource<T> {
49    /// Creates a new ManagedCResource with a given initializer and cleanup function.
50    ///
51    /// The initializer is a closure that attempts to initialize the resource.
52    /// If initialization fails, the initializer should return an error code.
53    /// The cleanup function is used to release the resource when it's no longer needed.
54    /// `cleanup_struct` where it should clean up the struct in rust
55    pub fn new(
56        init: impl FnOnce(*mut *mut T) -> i32,
57        cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
58        cleanup_struct: bool,
59        check_for_is_closed: Option<Box<dyn Fn(*mut T) -> bool>>,
60    ) -> Result<Self, AeronCError> {
61        let mut resource: *mut T = std::ptr::null_mut();
62        let result = init(&mut resource);
63        if result < 0 || resource.is_null() {
64            return Err(AeronCError::from_code(result));
65        }
66
67        let result = Self {
68            resource,
69            cleanup,
70            cleanup_struct,
71            borrowed: false,
72            close_already_called: std::cell::Cell::new(false),
73            check_for_is_closed,
74            auto_close: std::cell::Cell::new(false),
75            dependencies: UnsafeCell::new(vec![]),
76        };
77        #[cfg(feature = "extra-logging")]
78        log::debug!("created c resource: {:?}", result);
79        Ok(result)
80    }
81
82    pub fn is_closed_already_called(&self) -> bool {
83        self.close_already_called.get()
84            || self.resource.is_null()
85            || self
86                .check_for_is_closed
87                .as_ref()
88                .map_or(false, |f| f(self.resource))
89    }
90
91    pub fn new_borrowed(
92        value: *const T,
93        check_for_is_closed: Option<Box<dyn Fn(*mut T) -> bool>>,
94    ) -> Self {
95        Self {
96            resource: value as *mut _,
97            cleanup: None,
98            cleanup_struct: false,
99            borrowed: true,
100            close_already_called: std::cell::Cell::new(false),
101            check_for_is_closed,
102            auto_close: std::cell::Cell::new(false),
103            dependencies: UnsafeCell::new(vec![]),
104        }
105    }
106
107    /// Gets a raw pointer to the resource.
108    #[inline(always)]
109    pub fn get(&self) -> *mut T {
110        self.resource
111    }
112
113    #[inline(always)]
114    pub fn get_mut(&self) -> &mut T {
115        unsafe { &mut *self.resource }
116    }
117
118    // to prevent the dependencies from being dropped as you have a copy here
119    pub fn add_dependency<D: Any>(&self, dep: D) {
120        unsafe {
121            (*self.dependencies.get()).push(std::rc::Rc::new(dep));
122        }
123    }
124
125    /// Closes the resource by calling the cleanup function.
126    ///
127    /// If cleanup fails, it returns an `AeronError`.
128    pub fn close(&mut self) -> Result<(), AeronCError> {
129        if self.close_already_called.get() {
130            return Ok(());
131        }
132        self.close_already_called.set(true);
133
134        let already_closed = self
135            .check_for_is_closed
136            .as_ref()
137            .map_or(false, |f| f(self.resource));
138
139        if let Some(mut cleanup) = self.cleanup.take() {
140            if !self.resource.is_null() {
141                if !already_closed {
142                    let result = cleanup(&mut self.resource);
143                    if result < 0 {
144                        return Err(AeronCError::from_code(result));
145                    }
146                }
147                self.resource = std::ptr::null_mut();
148            }
149        }
150
151        Ok(())
152    }
153}
154
155impl<T> Drop for ManagedCResource<T> {
156    fn drop(&mut self) {
157        if !self.resource.is_null() {
158            let already_closed = self.close_already_called.get()
159                || self
160                    .check_for_is_closed
161                    .as_ref()
162                    .map_or(false, |f| f(self.resource));
163            if !self.borrowed {
164                self.close_already_called.set(true);
165                let resource = if already_closed {
166                    self.resource
167                } else {
168                    self.resource.clone()
169                };
170
171                if !already_closed {
172                    // Ensure the clean-up function is called when the resource is dropped.
173                    #[cfg(feature = "extra-logging")]
174                    log::debug!("closing c resource: {:?}", self);
175                    let _ = self.close(); // Ignore errors during an automatic drop to avoid panics.
176                }
177
178                if self.cleanup_struct {
179                    #[cfg(feature = "extra-logging")]
180                    log::debug!("closing rust struct resource: {:?}", resource);
181                    unsafe {
182                        let _ = Box::from_raw(resource);
183                    }
184                }
185            }
186        }
187    }
188}
189
190#[derive(Debug)]
191pub enum AeronErrorType {
192    NullOrNotConnected,
193    ClientErrorDriverTimeout,
194    ClientErrorClientTimeout,
195    ClientErrorConductorServiceTimeout,
196    ClientErrorBufferFull,
197    PublicationBackPressured,
198    PublicationAdminAction,
199    PublicationClosed,
200    PublicationMaxPositionExceeded,
201    PublicationError,
202    TimedOut,
203    Unknown(i32),
204}
205
206impl From<AeronErrorType> for AeronCError {
207    fn from(value: AeronErrorType) -> Self {
208        AeronCError::from_code(value.code())
209    }
210}
211
212impl AeronErrorType {
213    pub fn code(&self) -> i32 {
214        match self {
215            AeronErrorType::NullOrNotConnected => -1,
216            AeronErrorType::ClientErrorDriverTimeout => -1000,
217            AeronErrorType::ClientErrorClientTimeout => -1001,
218            AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
219            AeronErrorType::ClientErrorBufferFull => -1003,
220            AeronErrorType::PublicationBackPressured => -2,
221            AeronErrorType::PublicationAdminAction => -3,
222            AeronErrorType::PublicationClosed => -4,
223            AeronErrorType::PublicationMaxPositionExceeded => -5,
224            AeronErrorType::PublicationError => -6,
225            AeronErrorType::TimedOut => -234324,
226            AeronErrorType::Unknown(code) => *code,
227        }
228    }
229
230    pub fn from_code(code: i32) -> Self {
231        match code {
232            -1 => AeronErrorType::NullOrNotConnected,
233            -1000 => AeronErrorType::ClientErrorDriverTimeout,
234            -1001 => AeronErrorType::ClientErrorClientTimeout,
235            -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
236            -1003 => AeronErrorType::ClientErrorBufferFull,
237            -2 => AeronErrorType::PublicationBackPressured,
238            -3 => AeronErrorType::PublicationAdminAction,
239            -4 => AeronErrorType::PublicationClosed,
240            -5 => AeronErrorType::PublicationMaxPositionExceeded,
241            -6 => AeronErrorType::PublicationError,
242            -234324 => AeronErrorType::TimedOut,
243            _ => Unknown(code),
244        }
245    }
246
247    pub fn to_string(&self) -> &'static str {
248        match self {
249            AeronErrorType::NullOrNotConnected => "Null Value or Not Connected",
250            AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
251            AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
252            AeronErrorType::ClientErrorConductorServiceTimeout => {
253                "Client Error Conductor Service Timeout"
254            }
255            AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
256            AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
257            AeronErrorType::PublicationAdminAction => "Publication Admin Action",
258            AeronErrorType::PublicationClosed => "Publication Closed",
259            AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
260            AeronErrorType::PublicationError => "Publication Error",
261            AeronErrorType::TimedOut => "Timed Out",
262            AeronErrorType::Unknown(_) => "Unknown Error",
263        }
264    }
265}
266
267/// Represents an Aeron-specific error with a code and an optional message.
268///
269/// The error code is derived from Aeron C API calls.
270/// Use `get_message()` to retrieve a human-readable message, if available.
271#[derive(Eq, PartialEq)]
272pub struct AeronCError {
273    pub code: i32,
274}
275
276impl AeronCError {
277    /// Creates an AeronError from the error code returned by Aeron.
278    ///
279    /// Error codes below zero are considered failure.
280    pub fn from_code(code: i32) -> Self {
281        #[cfg(feature = "backtrace")]
282        {
283            if code < 0 {
284                let backtrace = Backtrace::capture();
285                let backtrace = format!("{:?}", backtrace);
286
287                let re =
288                    regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
289                let mut lines = String::new();
290                re.captures_iter(&backtrace).for_each(|cap| {
291                    let function = &cap[1];
292                    let mut file = cap[2].to_string();
293                    let line = &cap[3];
294                    if file.starts_with("./") {
295                        file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
296                    } else if file.starts_with("/rustc/") {
297                        file = file.split("/").last().unwrap().to_string();
298                    }
299                    // log in intellij friendly error format so can hyperlink to source code in stack trace
300                    lines.push_str(&format!(" {file}:{line} in {function}\n"));
301                });
302
303                log::error!(
304                    "Aeron C error code: {}, kind: '{:?}'\n{}",
305                    code,
306                    AeronErrorType::from_code(code),
307                    lines
308                );
309            }
310        }
311        AeronCError { code }
312    }
313
314    pub fn kind(&self) -> AeronErrorType {
315        AeronErrorType::from_code(self.code)
316    }
317}
318
319impl std::fmt::Display for AeronCError {
320    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321        write!(f, "Aeron error {}: {:?}", self.code, self.kind())
322    }
323}
324
325impl std::fmt::Debug for AeronCError {
326    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327        f.debug_struct("AeronCError")
328            .field("code", &self.code)
329            .field("kind", &self.kind())
330            .finish()
331    }
332}
333
334impl std::error::Error for AeronCError {}
335
336/// # Handler
337///
338/// `Handler` is a struct that wraps a raw pointer and a drop flag.
339///
340/// **Important:** `Handler` does not get dropped automatically.
341/// You need to call the `release` method if you want to clear the memory manually.
342///
343/// ## Example
344///
345/// ```no_compile
346/// use rusteron_code_gen::Handler;
347/// let handler = Handler::leak(your_value);
348/// // When you are done with the handler
349/// handler.release();
350/// ```
351pub struct Handler<T> {
352    raw_ptr: *mut T,
353    should_drop: bool,
354}
355
356unsafe impl<T> Send for Handler<T> {}
357unsafe impl<T> Sync for Handler<T> {}
358
359/// Utility method for setting empty handlers
360pub struct Handlers;
361
362impl<T> Handler<T> {
363    pub fn leak(handler: T) -> Self {
364        let raw_ptr = Box::into_raw(Box::new(handler)) as *mut _;
365        Self {
366            raw_ptr,
367            should_drop: true,
368        }
369    }
370
371    pub fn is_none(&self) -> bool {
372        self.raw_ptr.is_null()
373    }
374
375    pub fn as_raw(&self) -> *mut std::os::raw::c_void {
376        self.raw_ptr as *mut std::os::raw::c_void
377    }
378
379    pub fn release(&mut self) {
380        if self.should_drop && !self.raw_ptr.is_null() {
381            unsafe {
382                #[cfg(feature = "extra-logging")]
383                log::debug!("dropping handler {:?}", self.raw_ptr);
384                let _ = Box::from_raw(self.raw_ptr as *mut Box<T>);
385                self.should_drop = false;
386            }
387        }
388    }
389}
390
391impl<T> Deref for Handler<T> {
392    type Target = T;
393
394    fn deref(&self) -> &Self::Target {
395        unsafe { &*self.raw_ptr as &T }
396    }
397}
398
399impl<T> DerefMut for Handler<T> {
400    fn deref_mut(&mut self) -> &mut Self::Target {
401        unsafe { &mut *self.raw_ptr as &mut T }
402    }
403}
404
405pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
406    let end_port = u16::MAX;
407
408    for port in start_port..=end_port {
409        if is_udp_port_available(port) {
410            return Some(port);
411        }
412    }
413
414    None
415}
416
417pub fn is_udp_port_available(port: u16) -> bool {
418    std::net::UdpSocket::bind(("127.0.0.1", port)).is_ok()
419}
420
421/// Represents the Aeron URI parser and handler.
422pub struct ChannelUri {}
423
424impl ChannelUri {
425    pub const AERON_SCHEME: &'static str = "aeron";
426    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
427    pub const MAX_URI_LENGTH: usize = 4095;
428}
429
430pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
431pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
432pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
433pub const AERON_UDP_MEDIA: &str = "aeron:udp";
434pub const SPY_PREFIX: &str = "aeron-spy:";
435pub const TAG_PREFIX: &str = "tag:";
436
437/// Enum for media types.
438#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
439pub enum Media {
440    Ipc,
441    Udp,
442}
443
444impl Media {
445    pub fn as_str(&self) -> &'static str {
446        match self {
447            Media::Ipc => "ipc",
448            Media::Udp => "udp",
449        }
450    }
451}
452
453/// Enum for control modes.
454#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
455pub enum ControlMode {
456    Manual,
457    Dynamic,
458    /// this is a beta feature useful when dealing with docker containers and networking
459    Response,
460}
461
462impl ControlMode {
463    pub fn as_str(&self) -> &'static str {
464        match self {
465            ControlMode::Manual => "manual",
466            ControlMode::Dynamic => "dynamic",
467            ControlMode::Response => "response",
468        }
469    }
470}