1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt::Formatter;
6use std::mem::MaybeUninit;
7use std::ops::{Deref, DerefMut};
8pub enum CResource<T> {
9 OwnedOnHeap(std::rc::Rc<ManagedCResource<T>>),
10 OwnedOnStack(std::mem::MaybeUninit<T>),
12 Borrowed(*mut T),
13}
14
15impl<T: Clone> Clone for CResource<T> {
16 fn clone(&self) -> Self {
17 unsafe {
18 match self {
19 CResource::OwnedOnHeap(r) => CResource::OwnedOnHeap(r.clone()),
20 CResource::OwnedOnStack(r) => {
21 CResource::OwnedOnStack(MaybeUninit::new(r.assume_init_ref().clone()))
22 }
23 CResource::Borrowed(r) => CResource::Borrowed(r.clone()),
24 }
25 }
26 }
27}
28
29impl<T> CResource<T> {
30 #[inline]
31 pub fn get(&self) -> *mut T {
32 match self {
33 CResource::OwnedOnHeap(r) => r.get(),
34 CResource::OwnedOnStack(r) => r.as_ptr() as *mut T,
35 CResource::Borrowed(r) => *r,
36 }
37 }
38
39 #[inline]
40 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
42 match self {
43 CResource::OwnedOnHeap(r) => r.add_dependency(dep),
44 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => {
45 unreachable!("only owned on heap")
46 }
47 }
48 }
49 #[inline]
50 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
51 match self {
52 CResource::OwnedOnHeap(r) => r.get_dependency(),
53 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
54 }
55 }
56
57 #[inline]
58 pub fn as_owned(&self) -> Option<&std::rc::Rc<ManagedCResource<T>>> {
59 match self {
60 CResource::OwnedOnHeap(r) => Some(r),
61 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
62 }
63 }
64}
65
66impl<T> std::fmt::Debug for CResource<T> {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 let name = std::any::type_name::<T>();
69
70 match self {
71 CResource::OwnedOnHeap(r) => {
72 write!(f, "{name} heap({:?})", r)
73 }
74 CResource::OwnedOnStack(r) => {
75 write!(f, "{name} stack({:?})", *r)
76 }
77 CResource::Borrowed(r) => {
78 write!(f, "{name} borrowed ({:?})", r)
79 }
80 }
81 }
82}
83
/// Wrapper around a raw `*mut T` obtained from an init callback, together with the
/// state needed to clean it up exactly once.
///
/// Field order is significant: fields drop in declaration order, so `dependencies`
/// are released after the `cleanup` closure.
#[allow(dead_code)]
pub struct ManagedCResource<T> {
    /// Raw pointer filled in by the `init` closure; `close` resets it to null.
    resource: *mut T,
    /// Optional cleanup callback. `close` takes it out and calls it with a pointer to
    /// `resource`; a negative return value is reported as an error.
    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
    /// When `true`, `Drop` additionally reclaims the pointee with `Box::from_raw`.
    cleanup_struct: bool,
    /// Set once `close` has run (or `Drop` has finished its close step).
    close_already_called: std::cell::Cell<bool>,
    /// Optional predicate that reports whether the resource is already closed.
    check_for_is_closed: Option<fn(*mut T) -> bool>,
    /// Initialised to `false` by `new`; not read by any code visible in this file.
    auto_close: std::cell::Cell<bool>,
    /// Flag exposed through `is_resource_released` / `mark_resource_released`.
    resource_released: std::cell::Cell<bool>,
    /// Values kept alive for as long as this resource; appended by `add_dependency`.
    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn std::any::Any>>>,
}
107
108impl<T> std::fmt::Debug for ManagedCResource<T> {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 let mut debug_struct = f.debug_struct("ManagedCResource");
111
112 if !self.close_already_called.get()
113 && !self.resource.is_null()
114 && !self
115 .check_for_is_closed
116 .as_ref()
117 .map_or(false, |f| f(self.resource))
118 {
119 debug_struct.field("resource", &self.resource);
120 }
121
122 debug_struct
123 .field("type", &std::any::type_name::<T>())
124 .finish()
125 }
126}
127
128impl<T> ManagedCResource<T> {
129 pub fn new(
136 init: impl FnOnce(*mut *mut T) -> i32,
137 cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
138 cleanup_struct: bool,
139 check_for_is_closed: Option<fn(*mut T) -> bool>,
140 ) -> Result<Self, AeronCError> {
141 let resource = Self::initialise(init)?;
142
143 let result = Self {
144 resource,
145 cleanup,
146 cleanup_struct,
147 close_already_called: std::cell::Cell::new(false),
148 check_for_is_closed,
149 auto_close: std::cell::Cell::new(false),
150 resource_released: std::cell::Cell::new(false),
151 dependencies: UnsafeCell::new(vec![]),
152 };
153 #[cfg(feature = "extra-logging")]
154 log::info!("created c resource: {:?}", result);
155 Ok(result)
156 }
157
158 pub fn initialise(
159 init: impl FnOnce(*mut *mut T) -> i32 + Sized,
160 ) -> Result<*mut T, AeronCError> {
161 let mut resource: *mut T = std::ptr::null_mut();
162 let result = init(&mut resource);
163 if result < 0 || resource.is_null() {
164 return Err(AeronCError::from_code(result));
165 }
166 Ok(resource)
167 }
168
169 pub fn is_closed_already_called(&self) -> bool {
170 self.close_already_called.get()
171 || self.resource.is_null()
172 || self
173 .check_for_is_closed
174 .as_ref()
175 .map_or(false, |f| f(self.resource))
176 }
177
178 #[inline(always)]
180 pub fn get(&self) -> *mut T {
181 self.resource
182 }
183
184 #[inline(always)]
185 pub fn get_mut(&self) -> &mut T {
186 unsafe { &mut *self.resource }
187 }
188
189 #[inline]
190 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
192 if let Some(dep) =
193 (&dep as &dyn std::any::Any).downcast_ref::<std::rc::Rc<dyn std::any::Any>>()
194 {
195 unsafe {
196 (*self.dependencies.get()).push(dep.clone());
197 }
198 } else {
199 unsafe {
200 (*self.dependencies.get()).push(std::rc::Rc::new(dep));
201 }
202 }
203 }
204
205 #[inline]
206 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
207 unsafe {
208 (*self.dependencies.get())
209 .iter()
210 .filter_map(|x| x.as_ref().downcast_ref::<V>().cloned())
211 .next()
212 }
213 }
214
215 #[inline]
216 pub fn is_resource_released(&self) -> bool {
217 self.resource_released.get()
218 }
219
220 #[inline]
221 pub fn mark_resource_released(&self) {
222 self.resource_released.set(true);
223 }
224
225 pub fn close(&mut self) -> Result<(), AeronCError> {
229 if self.close_already_called.get() {
230 return Ok(());
231 }
232 self.close_already_called.set(true);
233
234 let already_closed = self
235 .check_for_is_closed
236 .as_ref()
237 .map_or(false, |f| f(self.resource));
238
239 if let Some(mut cleanup) = self.cleanup.take() {
240 if !self.resource.is_null() {
241 if !already_closed {
242 let result = cleanup(&mut self.resource);
243 if result < 0 {
244 return Err(AeronCError::from_code(result));
245 }
246 }
247 self.resource = std::ptr::null_mut();
248 }
249 }
250
251 Ok(())
252 }
253}
254
255impl<T> Drop for ManagedCResource<T> {
256 fn drop(&mut self) {
257 if !self.resource.is_null() {
258 let already_closed = self.close_already_called.get()
259 || self
260 .check_for_is_closed
261 .as_ref()
262 .map_or(false, |f| f(self.resource));
263
264 let resource = if already_closed {
265 self.resource
266 } else {
267 self.resource.clone()
268 };
269
270 if !already_closed {
271 #[cfg(feature = "extra-logging")]
273 log::info!("closing c resource: {:?}", self);
274 let _ = self.close(); }
276 self.close_already_called.set(true);
277
278 if self.cleanup_struct {
279 #[cfg(feature = "extra-logging")]
280 log::info!("closing rust struct resource: {:?}", resource);
281 unsafe {
282 let _ = Box::from_raw(resource);
283 }
284 }
285 }
286 }
287}
288
/// Classification of the integer status codes handled by `AeronErrorType::from_code`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)]
pub enum AeronErrorType {
    /// Code `-1`.
    GenericError,
    /// Code `-1000`.
    ClientErrorDriverTimeout,
    /// Code `-1001`.
    ClientErrorClientTimeout,
    /// Code `-1002`.
    ClientErrorConductorServiceTimeout,
    /// Code `-1003`.
    ClientErrorBufferFull,
    /// Code `-2`.
    PublicationBackPressured,
    /// Code `-3`.
    PublicationAdminAction,
    /// Code `-4`.
    PublicationClosed,
    /// Code `-5`.
    PublicationMaxPositionExceeded,
    /// Code `-6`.
    PublicationError,
    /// Code `-234324`.
    TimedOut,
    /// Any other code, carried verbatim.
    Unknown(i32),
}
304
305impl From<AeronErrorType> for AeronCError {
306 fn from(value: AeronErrorType) -> Self {
307 AeronCError::from_code(value.code())
308 }
309}
310
311impl AeronErrorType {
312 pub fn code(&self) -> i32 {
313 match self {
314 AeronErrorType::GenericError => -1,
315 AeronErrorType::ClientErrorDriverTimeout => -1000,
316 AeronErrorType::ClientErrorClientTimeout => -1001,
317 AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
318 AeronErrorType::ClientErrorBufferFull => -1003,
319 AeronErrorType::PublicationBackPressured => -2,
320 AeronErrorType::PublicationAdminAction => -3,
321 AeronErrorType::PublicationClosed => -4,
322 AeronErrorType::PublicationMaxPositionExceeded => -5,
323 AeronErrorType::PublicationError => -6,
324 AeronErrorType::TimedOut => -234324,
325 AeronErrorType::Unknown(code) => *code,
326 }
327 }
328
329 pub fn is_back_pressured(&self) -> bool {
330 self == &AeronErrorType::PublicationBackPressured
331 }
332
333 pub fn is_admin_action(&self) -> bool {
334 self == &AeronErrorType::PublicationAdminAction
335 }
336
337 pub fn is_back_pressured_or_admin_action(&self) -> bool {
338 self.is_back_pressured() || self.is_admin_action()
339 }
340
341 pub fn from_code(code: i32) -> Self {
342 match code {
343 -1 => AeronErrorType::GenericError,
344 -1000 => AeronErrorType::ClientErrorDriverTimeout,
345 -1001 => AeronErrorType::ClientErrorClientTimeout,
346 -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
347 -1003 => AeronErrorType::ClientErrorBufferFull,
348 -2 => AeronErrorType::PublicationBackPressured,
349 -3 => AeronErrorType::PublicationAdminAction,
350 -4 => AeronErrorType::PublicationClosed,
351 -5 => AeronErrorType::PublicationMaxPositionExceeded,
352 -6 => AeronErrorType::PublicationError,
353 -234324 => AeronErrorType::TimedOut,
354 _ => Unknown(code),
355 }
356 }
357
358 pub fn to_string(&self) -> &'static str {
359 match self {
360 AeronErrorType::GenericError => "Generic Error",
361 AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
362 AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
363 AeronErrorType::ClientErrorConductorServiceTimeout => {
364 "Client Error Conductor Service Timeout"
365 }
366 AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
367 AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
368 AeronErrorType::PublicationAdminAction => "Publication Admin Action",
369 AeronErrorType::PublicationClosed => "Publication Closed",
370 AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
371 AeronErrorType::PublicationError => "Publication Error",
372 AeronErrorType::TimedOut => "Timed Out",
373 AeronErrorType::Unknown(_) => "Unknown Error",
374 }
375 }
376}
377
/// Error value that carries the raw integer status code.
///
/// Use `kind()` to classify the code as an `AeronErrorType`.
#[derive(Clone, PartialEq, Eq)]
pub struct AeronCError {
    /// The raw status code this error was created from.
    pub code: i32,
}
386
387impl AeronCError {
388 pub fn from_code(code: i32) -> Self {
392 #[cfg(feature = "backtrace")]
393 {
394 if code < 0 {
395 let backtrace = Backtrace::capture();
396 let backtrace = format!("{:?}", backtrace);
397
398 let re =
399 regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
400 let mut lines = String::new();
401 re.captures_iter(&backtrace).for_each(|cap| {
402 let function = &cap[1];
403 let mut file = cap[2].to_string();
404 let line = &cap[3];
405 if file.starts_with("./") {
406 file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
407 } else if file.starts_with("/rustc/") {
408 file = file.split("/").last().unwrap().to_string();
409 }
410 lines.push_str(&format!(" {file}:{line} in {function}\n"));
412 });
413
414 log::error!(
415 "Aeron C error code: {}, kind: '{:?}'\n{}",
416 code,
417 AeronErrorType::from_code(code),
418 lines
419 );
420 }
421 }
422 AeronCError { code }
423 }
424
425 pub fn kind(&self) -> AeronErrorType {
426 AeronErrorType::from_code(self.code)
427 }
428
429 pub fn is_back_pressured(&self) -> bool {
430 self.kind().is_back_pressured()
431 }
432
433 pub fn is_admin_action(&self) -> bool {
434 self.kind().is_admin_action()
435 }
436
437 pub fn is_back_pressured_or_admin_action(&self) -> bool {
438 self.kind().is_back_pressured_or_admin_action()
439 }
440}
441
/// Raw-pointer wrapper around a heap-allocated `T`, with a flag that says whether
/// `release()` is responsible for freeing it.
pub struct Handler<T> {
    /// Pointer to the wrapped value; may be null (see `is_none`).
    raw_ptr: *mut T,
    /// `true` while `release()` still has to free `raw_ptr`.
    should_drop: bool,
}

// NOTE(review): these impls are unconditional, so `Handler<T>` is `Send`/`Sync` even when
// `T` is not. The justification is not visible in this file; confirm it with the callers.
unsafe impl<T> Send for Handler<T> {}
unsafe impl<T> Sync for Handler<T> {}
466
/// Unit struct with no fields; no methods are defined for it in the code visible here.
// NOTE(review): presumably a namespace for helper constructors defined elsewhere; confirm.
pub struct Handlers;
469
470impl<T> Handler<T> {
471 pub fn leak(handler: T) -> Self {
472 let raw_ptr = Box::into_raw(Box::new(handler)) as *mut _;
473 #[cfg(feature = "extra-logging")]
474 log::info!("creating handler {:?}", raw_ptr);
475 Self {
476 raw_ptr,
477 should_drop: true,
478 }
479 }
480
481 pub fn is_none(&self) -> bool {
482 self.raw_ptr.is_null()
483 }
484
485 pub fn as_raw(&self) -> *mut std::os::raw::c_void {
486 self.raw_ptr as *mut std::os::raw::c_void
487 }
488
489 pub fn release(&mut self) {
490 if self.should_drop && !self.raw_ptr.is_null() {
491 unsafe {
492 #[cfg(feature = "extra-logging")]
493 log::info!("dropping handler {:?}", self.raw_ptr);
494 let _ = Box::from_raw(self.raw_ptr as *mut T);
495 self.should_drop = false;
496 }
497 }
498 }
499
500 pub unsafe fn new(raw_ptr: *mut T, should_drop: bool) -> Self {
501 Self {
502 raw_ptr,
503 should_drop,
504 }
505 }
506}
507
508impl<T> Drop for Handler<T> {
509 fn drop(&mut self) {
510 if self.should_drop && !self.raw_ptr.is_null() {
511 log::error!(
512 "Handler<{}> at {:?} is being dropped but release() was never called — \
513 memory leak: {} bytes. Call release() explicitly when the C side no longer holds the pointer.",
514 std::any::type_name::<T>(),
515 self.raw_ptr,
516 std::mem::size_of::<T>(),
517 );
518 }
519 }
520}
521
522impl<T> Deref for Handler<T> {
523 type Target = T;
524
525 fn deref(&self) -> &Self::Target {
526 unsafe { &*self.raw_ptr as &T }
527 }
528}
529
530impl<T> DerefMut for Handler<T> {
531 fn deref_mut(&mut self) -> &mut Self::Target {
532 unsafe { &mut *self.raw_ptr as &mut T }
533 }
534}
535
536pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
537 let end_port = u16::MAX;
538
539 for port in start_port..=end_port {
540 if is_udp_port_available(port) {
541 return Some(port);
542 }
543 }
544
545 None
546}
547
/// Reports whether a UDP socket can currently be bound to `127.0.0.1:port`.
///
/// Port `0` is always reported as unavailable: binding it makes the OS pick an
/// arbitrary ephemeral port, so a successful bind says nothing about port 0 itself.
///
/// The probe socket is closed before this function returns, so another process may
/// take the port between this check and its later use.
pub fn is_udp_port_available(port: u16) -> bool {
    if port == 0 {
        return false;
    }
    std::net::UdpSocket::bind(("127.0.0.1", port)).is_ok()
}
551
/// Field-less type that only serves as a home for the channel-URI constants below.
pub struct ChannelUri {}

impl ChannelUri {
    /// The string `"aeron"`.
    pub const AERON_SCHEME: &'static str = "aeron";
    /// The string `"aeron-spy"`.
    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
    /// Length limit of 4095; per the name, the maximum length of a URI.
    pub const MAX_URI_LENGTH: usize = 4095;
}
560
/// Default driver timeout; per the name, expressed in milliseconds.
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
/// Property name `aeron.dir`.
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
/// Channel prefix for the IPC media, `aeron:ipc`.
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
/// Channel prefix for the UDP media, `aeron:udp`.
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
/// Prefix `aeron-spy:`.
pub const SPY_PREFIX: &str = "aeron-spy:";
/// Prefix `tag:`.
pub const TAG_PREFIX: &str = "tag:";
567
/// Media kinds known to this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Media {
    /// Rendered as `"ipc"`.
    Ipc,
    /// Rendered as `"udp"`.
    Udp,
}

impl Media {
    /// Returns the lower-case string token for this media kind.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ipc => "ipc",
            Self::Udp => "udp",
        }
    }
}
583
/// Control modes known to this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMode {
    /// Rendered as `"manual"`.
    Manual,
    /// Rendered as `"dynamic"`.
    Dynamic,
    /// Rendered as `"response"`.
    Response,
}

impl ControlMode {
    /// Returns the lower-case string token for this control mode.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Manual => "manual",
            Self::Dynamic => "dynamic",
            Self::Response => "response",
        }
    }
}
602
/// Test-only allocation tracking: installs a counting global allocator and provides
/// a helper that asserts a closure leaves the allocation count (nearly) unchanged.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) mod test_alloc {
    use std::alloc::{GlobalAlloc, Layout, System};
    use std::env;
    use std::fs::OpenOptions;
    // `OpenOptions::mode` comes from this Unix-only extension trait. Gate the import so
    // the module still compiles on the non-Unix targets it otherwise caters for.
    #[cfg(unix)]
    use std::os::unix::fs::OpenOptionsExt;
    use std::sync::atomic::{AtomicIsize, Ordering};

    /// Allocator that forwards to `System` while keeping a net count of live allocations.
    pub struct TrackingAllocator {
        /// Incremented by `alloc`/`alloc_zeroed`, decremented by `dealloc`.
        allocs: AtomicIsize,
    }

    impl TrackingAllocator {
        /// Creates a tracker whose counter starts at zero.
        pub const fn new() -> Self {
            Self {
                allocs: AtomicIsize::new(0),
            }
        }

        /// Returns the current net allocation count.
        pub fn current(&self) -> isize {
            self.allocs.load(Ordering::SeqCst)
        }
    }

    unsafe impl GlobalAlloc for TrackingAllocator {
        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
            self.allocs.fetch_add(1, Ordering::SeqCst);
            System.alloc(layout)
        }

        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
            self.allocs.fetch_sub(1, Ordering::SeqCst);
            System.dealloc(ptr, layout)
        }

        unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
            self.allocs.fetch_add(1, Ordering::SeqCst);
            System.alloc_zeroed(layout)
        }

        // A reallocation neither adds nor removes a live allocation, so it is not counted.
        unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
            System.realloc(ptr, layout, new_size)
        }
    }

    #[global_allocator]
    static GLOBAL: TrackingAllocator = TrackingAllocator::new();

    /// Returns the net allocation count recorded by the global tracking allocator.
    pub fn current_allocs() -> isize {
        GLOBAL.current()
    }

    /// Runs `f` under an exclusive file lock and asserts that the net allocation count
    /// changed by fewer than 50 across the call.
    ///
    /// The lock file lives in the system temp directory, so concurrent callers that use
    /// the same file run one at a time.
    ///
    /// # Panics
    /// Panics if the lock file cannot be opened or locked, or if the count moved by 50
    /// or more.
    pub fn assert_no_allocation<F: FnOnce()>(f: F) {
        let tmp = env::temp_dir().join("rusteron_allocation.lck");

        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true);
        // Restrict the lock file to its owner where permission bits exist.
        #[cfg(unix)]
        options.mode(0o600);
        let file = options
            .open(&tmp)
            .expect("Failed to open allocation lock file");

        let mut file_lock = fd_lock::RwLock::new(file);
        let guard = file_lock.write().expect("Failed to acquire file lock");

        let before = current_allocs();
        f();
        let after = current_allocs();
        let diff = (after - before).abs();
        assert!(
            diff < 50,
            "Expected no allocation leak, but alloc count changed from {} to {} (diff {})",
            before,
            after,
            diff
        );

        drop(guard)
    }
}
699
/// Conversion of string-like values into an owned `CString`.
pub trait IntoCString {
    /// Consumes `self` and returns it as a `CString`.
    fn into_c_string(self) -> std::ffi::CString;
}

impl IntoCString for std::ffi::CString {
    /// Returns the value unchanged.
    fn into_c_string(self) -> Self {
        self
    }
}

impl IntoCString for &str {
    /// Copies the text into a new `CString`.
    ///
    /// # Panics
    /// Panics with "failed to create CString" if the text contains an interior NUL byte.
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}

impl IntoCString for String {
    /// Converts the owned string into a `CString`.
    ///
    /// # Panics
    /// Panics with "failed to create CString" if the text contains an interior NUL byte.
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}