1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt::Formatter;
6use std::mem::MaybeUninit;
7use std::ops::{Deref, DerefMut};
8
/// Handle to a C-side value of type `T`, held in one of three storage forms.
///
/// `CResource::get` returns a raw `*mut T` for whichever form is active.
pub enum CResource<T> {
    /// Shared, reference-counted `ManagedCResource`; the only variant that
    /// accepts dependencies (`add_dependency` panics on the other two).
    OwnedOnHeap(std::rc::Rc<ManagedCResource<T>>),
    /// Value stored inline inside a `MaybeUninit<T>`.
    OwnedOnStack(std::mem::MaybeUninit<T>),
    /// Raw pointer stored as-is.
    /// NOTE(review): presumably owned elsewhere, going by the variant name.
    Borrowed(*mut T),
}
15
16impl<T: Clone> Clone for CResource<T> {
17 fn clone(&self) -> Self {
18 unsafe {
19 match self {
20 CResource::OwnedOnHeap(r) => CResource::OwnedOnHeap(r.clone()),
21 CResource::OwnedOnStack(r) => {
22 CResource::OwnedOnStack(MaybeUninit::new(r.assume_init_ref().clone()))
23 }
24 CResource::Borrowed(r) => CResource::Borrowed(r.clone()),
25 }
26 }
27 }
28}
29
30impl<T> CResource<T> {
31 #[inline]
32 pub fn get(&self) -> *mut T {
33 match self {
34 CResource::OwnedOnHeap(r) => r.get(),
35 CResource::OwnedOnStack(r) => r.as_ptr() as *mut T,
36 CResource::Borrowed(r) => *r,
37 }
38 }
39
40 #[inline]
41 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
43 match self {
44 CResource::OwnedOnHeap(r) => r.add_dependency(dep),
45 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => {
46 unreachable!("only owned on heap")
47 }
48 }
49 }
50 #[inline]
51 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
52 match self {
53 CResource::OwnedOnHeap(r) => r.get_dependency(),
54 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
55 }
56 }
57
58 #[inline]
59 pub fn as_owned(&self) -> Option<&std::rc::Rc<ManagedCResource<T>>> {
60 match self {
61 CResource::OwnedOnHeap(r) => Some(r),
62 CResource::OwnedOnStack(_) | CResource::Borrowed(_) => None,
63 }
64 }
65}
66
67impl<T> std::fmt::Debug for CResource<T> {
68 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
69 let name = std::any::type_name::<T>();
70
71 match self {
72 CResource::OwnedOnHeap(r) => {
73 write!(f, "{name} heap({:?})", r)
74 }
75 CResource::OwnedOnStack(r) => {
76 write!(f, "{name} stack({:?})", *r)
77 }
78 CResource::Borrowed(r) => {
79 write!(f, "{name} borrowed ({:?})", r)
80 }
81 }
82 }
83}
84
/// Wrapper around a raw `*mut T` obtained from an init callback, together with
/// the state needed to close it and to keep dependent values alive.
#[allow(dead_code)]
pub struct ManagedCResource<T> {
    /// Raw pointer produced by the init callback; set to null once `close`
    /// has run the cleanup path.
    resource: *mut T,
    /// Optional cleanup callback; taken out of the `Option` by `close`, so it
    /// is invoked at most once. A negative return value is treated as an error.
    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
    /// When `true`, `Drop` reclaims `resource` with `Box::from_raw`.
    cleanup_struct: bool,
    /// Set once `close` has been entered or `Drop` has run its cleanup path.
    close_already_called: std::cell::Cell<bool>,
    /// Optional predicate; when it returns `true` for `resource` the resource
    /// is treated as already closed.
    check_for_is_closed: Option<fn(*mut T) -> bool>,
    /// Initialised to `false` by `new`; not read anywhere in the visible code.
    auto_close: std::cell::Cell<bool>,
    /// Values stored via `add_dependency` and looked up by type via
    /// `get_dependency`; they are dropped together with this struct.
    dependencies: UnsafeCell<Vec<std::rc::Rc<dyn std::any::Any>>>,
}
106
107impl<T> std::fmt::Debug for ManagedCResource<T> {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 let mut debug_struct = f.debug_struct("ManagedCResource");
110
111 if !self.close_already_called.get()
112 && !self.resource.is_null()
113 && !self
114 .check_for_is_closed
115 .as_ref()
116 .map_or(false, |f| f(self.resource))
117 {
118 debug_struct.field("resource", &self.resource);
119 }
120
121 debug_struct
122 .field("type", &std::any::type_name::<T>())
123 .finish()
124 }
125}
126
127impl<T> ManagedCResource<T> {
128 pub fn new(
135 init: impl FnOnce(*mut *mut T) -> i32,
136 cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
137 cleanup_struct: bool,
138 check_for_is_closed: Option<fn(*mut T) -> bool>,
139 ) -> Result<Self, AeronCError> {
140 let resource = Self::initialise(init)?;
141
142 let result = Self {
143 resource,
144 cleanup,
145 cleanup_struct,
146 close_already_called: std::cell::Cell::new(false),
147 check_for_is_closed,
148 auto_close: std::cell::Cell::new(false),
149 dependencies: UnsafeCell::new(vec![]),
150 };
151 #[cfg(feature = "extra-logging")]
152 log::info!("created c resource: {:?}", result);
153 Ok(result)
154 }
155
156 pub fn initialise(
157 init: impl FnOnce(*mut *mut T) -> i32 + Sized,
158 ) -> Result<*mut T, AeronCError> {
159 let mut resource: *mut T = std::ptr::null_mut();
160 let result = init(&mut resource);
161 if result < 0 || resource.is_null() {
162 return Err(AeronCError::from_code(result));
163 }
164 Ok(resource)
165 }
166
167 pub fn is_closed_already_called(&self) -> bool {
168 self.close_already_called.get()
169 || self.resource.is_null()
170 || self
171 .check_for_is_closed
172 .as_ref()
173 .map_or(false, |f| f(self.resource))
174 }
175
176 #[inline(always)]
178 pub fn get(&self) -> *mut T {
179 self.resource
180 }
181
182 #[inline(always)]
183 pub fn get_mut(&self) -> &mut T {
184 unsafe { &mut *self.resource }
185 }
186
187 #[inline]
188 pub fn add_dependency<D: std::any::Any>(&self, dep: D) {
190 if let Some(dep) =
191 (&dep as &dyn std::any::Any).downcast_ref::<std::rc::Rc<dyn std::any::Any>>()
192 {
193 unsafe {
194 (*self.dependencies.get()).push(dep.clone());
195 }
196 } else {
197 unsafe {
198 (*self.dependencies.get()).push(std::rc::Rc::new(dep));
199 }
200 }
201 }
202
203 #[inline]
204 pub fn get_dependency<V: Clone + 'static>(&self) -> Option<V> {
205 unsafe {
206 (*self.dependencies.get())
207 .iter()
208 .filter_map(|x| x.as_ref().downcast_ref::<V>().cloned())
209 .next()
210 }
211 }
212
213 pub fn close(&mut self) -> Result<(), AeronCError> {
217 if self.close_already_called.get() {
218 return Ok(());
219 }
220 self.close_already_called.set(true);
221
222 let already_closed = self
223 .check_for_is_closed
224 .as_ref()
225 .map_or(false, |f| f(self.resource));
226
227 if let Some(mut cleanup) = self.cleanup.take() {
228 if !self.resource.is_null() {
229 if !already_closed {
230 let result = cleanup(&mut self.resource);
231 if result < 0 {
232 return Err(AeronCError::from_code(result));
233 }
234 }
235 self.resource = std::ptr::null_mut();
236 }
237 }
238
239 Ok(())
240 }
241}
242
243impl<T> Drop for ManagedCResource<T> {
244 fn drop(&mut self) {
245 if !self.resource.is_null() {
246 let already_closed = self.close_already_called.get()
247 || self
248 .check_for_is_closed
249 .as_ref()
250 .map_or(false, |f| f(self.resource));
251
252 let resource = if already_closed {
253 self.resource
254 } else {
255 self.resource.clone()
256 };
257
258 if !already_closed {
259 #[cfg(feature = "extra-logging")]
261 log::info!("closing c resource: {:?}", self);
262 let _ = self.close(); }
264 self.close_already_called.set(true);
265
266 if self.cleanup_struct {
267 #[cfg(feature = "extra-logging")]
268 log::info!("closing rust struct resource: {:?}", resource);
269 unsafe {
270 let _ = Box::from_raw(resource);
271 }
272 }
273 }
274 }
275}
276
/// Classification of an Aeron C error code.
///
/// Each variant maps to one integer code through `code` / `from_code`;
/// any other code is carried by `Unknown`.
#[derive(Debug, PartialOrd, Eq, PartialEq, Clone)]
pub enum AeronErrorType {
    /// Code `-1`.
    NullOrNotConnected,
    /// Code `-1000`.
    ClientErrorDriverTimeout,
    /// Code `-1001`.
    ClientErrorClientTimeout,
    /// Code `-1002`.
    ClientErrorConductorServiceTimeout,
    /// Code `-1003`.
    ClientErrorBufferFull,
    /// Code `-2`.
    PublicationBackPressured,
    /// Code `-3`.
    PublicationAdminAction,
    /// Code `-4`.
    PublicationClosed,
    /// Code `-5`.
    PublicationMaxPositionExceeded,
    /// Code `-6`.
    PublicationError,
    /// Code `-234324`.
    TimedOut,
    /// Any code without a dedicated variant; carries the raw code.
    Unknown(i32),
}
292
293impl From<AeronErrorType> for AeronCError {
294 fn from(value: AeronErrorType) -> Self {
295 AeronCError::from_code(value.code())
296 }
297}
298
299impl AeronErrorType {
300 pub fn code(&self) -> i32 {
301 match self {
302 AeronErrorType::NullOrNotConnected => -1,
303 AeronErrorType::ClientErrorDriverTimeout => -1000,
304 AeronErrorType::ClientErrorClientTimeout => -1001,
305 AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
306 AeronErrorType::ClientErrorBufferFull => -1003,
307 AeronErrorType::PublicationBackPressured => -2,
308 AeronErrorType::PublicationAdminAction => -3,
309 AeronErrorType::PublicationClosed => -4,
310 AeronErrorType::PublicationMaxPositionExceeded => -5,
311 AeronErrorType::PublicationError => -6,
312 AeronErrorType::TimedOut => -234324,
313 AeronErrorType::Unknown(code) => *code,
314 }
315 }
316
317 pub fn is_back_pressured(&self) -> bool {
318 self == &AeronErrorType::PublicationBackPressured
319 }
320
321 pub fn is_admin_action(&self) -> bool {
322 self == &AeronErrorType::PublicationAdminAction
323 }
324
325 pub fn is_back_pressured_or_admin_action(&self) -> bool {
326 self.is_back_pressured() || self.is_admin_action()
327 }
328
329 pub fn from_code(code: i32) -> Self {
330 match code {
331 -1 => AeronErrorType::NullOrNotConnected,
332 -1000 => AeronErrorType::ClientErrorDriverTimeout,
333 -1001 => AeronErrorType::ClientErrorClientTimeout,
334 -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
335 -1003 => AeronErrorType::ClientErrorBufferFull,
336 -2 => AeronErrorType::PublicationBackPressured,
337 -3 => AeronErrorType::PublicationAdminAction,
338 -4 => AeronErrorType::PublicationClosed,
339 -5 => AeronErrorType::PublicationMaxPositionExceeded,
340 -6 => AeronErrorType::PublicationError,
341 -234324 => AeronErrorType::TimedOut,
342 _ => Unknown(code),
343 }
344 }
345
346 pub fn to_string(&self) -> &'static str {
347 match self {
348 AeronErrorType::NullOrNotConnected => "Null Value or Not Connected",
349 AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
350 AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
351 AeronErrorType::ClientErrorConductorServiceTimeout => {
352 "Client Error Conductor Service Timeout"
353 }
354 AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
355 AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
356 AeronErrorType::PublicationAdminAction => "Publication Admin Action",
357 AeronErrorType::PublicationClosed => "Publication Closed",
358 AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
359 AeronErrorType::PublicationError => "Publication Error",
360 AeronErrorType::TimedOut => "Timed Out",
361 AeronErrorType::Unknown(_) => "Unknown Error",
362 }
363 }
364}
365
/// Error value wrapping a raw integer code returned by the Aeron C API.
///
/// The code is classified on demand by `kind`.
#[derive(Eq, PartialEq, Clone)]
pub struct AeronCError {
    /// Raw integer code as passed to `from_code`.
    pub code: i32,
}
374
375impl AeronCError {
376 pub fn from_code(code: i32) -> Self {
380 #[cfg(feature = "backtrace")]
381 {
382 if code < 0 {
383 let backtrace = Backtrace::capture();
384 let backtrace = format!("{:?}", backtrace);
385
386 let re =
387 regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
388 let mut lines = String::new();
389 re.captures_iter(&backtrace).for_each(|cap| {
390 let function = &cap[1];
391 let mut file = cap[2].to_string();
392 let line = &cap[3];
393 if file.starts_with("./") {
394 file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
395 } else if file.starts_with("/rustc/") {
396 file = file.split("/").last().unwrap().to_string();
397 }
398 lines.push_str(&format!(" {file}:{line} in {function}\n"));
400 });
401
402 log::error!(
403 "Aeron C error code: {}, kind: '{:?}'\n{}",
404 code,
405 AeronErrorType::from_code(code),
406 lines
407 );
408 }
409 }
410 AeronCError { code }
411 }
412
413 pub fn kind(&self) -> AeronErrorType {
414 AeronErrorType::from_code(self.code)
415 }
416
417 pub fn is_back_pressured(&self) -> bool {
418 self.kind().is_back_pressured()
419 }
420
421 pub fn is_admin_action(&self) -> bool {
422 self.kind().is_admin_action()
423 }
424
425 pub fn is_back_pressured_or_admin_action(&self) -> bool {
426 self.kind().is_back_pressured_or_admin_action()
427 }
428}
429
430impl std::fmt::Display for AeronCError {
431 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432 write!(f, "Aeron error {}: {:?}", self.code, self.kind())
433 }
434}
435
436impl std::fmt::Debug for AeronCError {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 f.debug_struct("AeronCError")
439 .field("code", &self.code)
440 .field("kind", &self.kind())
441 .finish()
442 }
443}
444
// Marker impl using the default `Error` methods; no `source` is provided.
impl std::error::Error for AeronCError {}
446
/// Wrapper around a heap-allocated handler of type `T`, exposed as a raw
/// pointer so it can be passed across the C boundary.
///
/// The allocation is not freed automatically in the visible code; call
/// [`Handler::release`] to free it.
pub struct Handler<T> {
    /// Pointer obtained from `Box::into_raw` in `leak`.
    raw_ptr: *mut T,
    /// `true` while `release` still has to free the allocation.
    should_drop: bool,
}

// SAFETY: NOTE(review) — these impls are unconditional on `T`, so thread
// safety of the pointed-to handler is the caller's responsibility.
unsafe impl<T> Send for Handler<T> {}
unsafe impl<T> Sync for Handler<T> {}

/// Unit struct with no fields or methods in the visible code.
/// NOTE(review): presumably a namespace for helpers defined elsewhere.
pub struct Handlers;

impl<T> Handler<T> {
    /// Moves `handler` onto the heap and keeps the raw pointer to it.
    ///
    /// The allocation stays alive until `release` is called.
    pub fn leak(handler: T) -> Self {
        let boxed = Box::new(handler);
        let raw_ptr: *mut T = Box::into_raw(boxed);
        #[cfg(feature = "extra-logging")]
        log::info!("creating handler {:?}", raw_ptr);
        Self {
            raw_ptr,
            should_drop: true,
        }
    }

    /// Returns `true` when the stored pointer is null.
    pub fn is_none(&self) -> bool {
        self.raw_ptr.is_null()
    }

    /// Returns the stored pointer as an untyped `c_void` pointer.
    pub fn as_raw(&self) -> *mut std::os::raw::c_void {
        self.raw_ptr.cast()
    }

    /// Frees the heap allocation created by `leak`.
    ///
    /// Calling it again is a no-op. The stored pointer is left unchanged, so
    /// it dangles afterwards and must not be dereferenced.
    pub fn release(&mut self) {
        let owns_allocation = self.should_drop && !self.raw_ptr.is_null();
        if !owns_allocation {
            return;
        }
        #[cfg(feature = "extra-logging")]
        log::info!("dropping handler {:?}", self.raw_ptr);
        // SAFETY: `should_drop` is only `true` for pointers created by
        // `Box::into_raw` in `leak`, and it is cleared right after the free,
        // so the box is reconstructed at most once.
        unsafe {
            drop(Box::from_raw(self.raw_ptr));
        }
        self.should_drop = false;
    }
}
506
507impl<T> Deref for Handler<T> {
508 type Target = T;
509
510 fn deref(&self) -> &Self::Target {
511 unsafe { &*self.raw_ptr as &T }
512 }
513}
514
515impl<T> DerefMut for Handler<T> {
516 fn deref_mut(&mut self) -> &mut Self::Target {
517 unsafe { &mut *self.raw_ptr as &mut T }
518 }
519}
520
521pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
522 let end_port = u16::MAX;
523
524 for port in start_port..=end_port {
525 if is_udp_port_available(port) {
526 return Some(port);
527 }
528 }
529
530 None
531}
532
/// Reports whether a UDP socket can currently be bound to `127.0.0.1:port`.
///
/// The probe socket is dropped before returning, so the result is only a
/// snapshot.
pub fn is_udp_port_available(port: u16) -> bool {
    let loopback = std::net::SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, port);
    match std::net::UdpSocket::bind(loopback) {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
536
/// Holder for channel-URI constants; the struct itself has no fields.
pub struct ChannelUri {}

impl ChannelUri {
    /// The string `"aeron"`.
    pub const AERON_SCHEME: &'static str = "aeron";
    /// The string `"aeron-spy"`.
    pub const SPY_QUALIFIER: &'static str = "aeron-spy";
    /// NOTE(review): presumably the maximum accepted URI length; the unit
    /// (bytes vs. characters) is not shown here.
    pub const MAX_URI_LENGTH: usize = 4095;
}
545
/// Default driver timeout; milliseconds, going by the constant's name.
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
/// The property name `aeron.dir`.
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
/// The channel prefix `aeron:ipc`.
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
/// The channel prefix `aeron:udp`.
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
/// The prefix `aeron-spy:`.
pub const SPY_PREFIX: &str = "aeron-spy:";
/// The prefix `tag:`.
pub const TAG_PREFIX: &str = "tag:";
552
/// Media kind, rendered as a lowercase string by [`Media::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Media {
    /// Rendered as `"ipc"`.
    Ipc,
    /// Rendered as `"udp"`.
    Udp,
}

impl Media {
    /// Returns the lowercase string for this media kind.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Udp => "udp",
            Self::Ipc => "ipc",
        }
    }
}
568
/// Control mode, rendered as a lowercase string by [`ControlMode::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMode {
    /// Rendered as `"manual"`.
    Manual,
    /// Rendered as `"dynamic"`.
    Dynamic,
    /// Rendered as `"response"`.
    Response,
}

impl ControlMode {
    /// Returns the lowercase string for this control mode.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Response => "response",
            Self::Dynamic => "dynamic",
            Self::Manual => "manual",
        }
    }
}
587
/// Test-only global allocator that tracks the number of live allocations.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) mod test_alloc {
    use std::alloc::{GlobalAlloc, Layout, System};
    use std::sync::atomic::{AtomicIsize, Ordering};

    /// Forwards to the system allocator while counting allocations minus
    /// deallocations.
    pub struct CountingAllocator {
        /// Incremented on every `alloc`, decremented on every `dealloc`.
        allocs: AtomicIsize,
    }

    impl CountingAllocator {
        /// Creates an allocator whose counter starts at zero.
        pub const fn new() -> Self {
            CountingAllocator {
                allocs: AtomicIsize::new(0),
            }
        }

        /// Current value of the live-allocation counter.
        fn current(&self) -> isize {
            let counter = &self.allocs;
            counter.load(Ordering::SeqCst)
        }
    }

    unsafe impl GlobalAlloc for CountingAllocator {
        /// Counts the allocation, then delegates to `System`.
        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
            let _previous = self.allocs.fetch_add(1, Ordering::SeqCst);
            System.alloc(layout)
        }

        /// Counts the deallocation, then delegates to `System`.
        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
            let _previous = self.allocs.fetch_sub(1, Ordering::SeqCst);
            System.dealloc(ptr, layout)
        }
    }

    #[global_allocator]
    static GLOBAL: CountingAllocator = CountingAllocator::new();

    /// Number of allocations currently live according to the global counter.
    pub fn current_allocs() -> isize {
        GLOBAL.current()
    }
}
633
/// Conversion of a string-like value into an owned `CString`.
pub trait IntoCString {
    /// Consumes `self` and returns it as a `CString`.
    fn into_c_string(self) -> std::ffi::CString;
}

impl IntoCString for std::ffi::CString {
    /// Identity conversion; no allocation.
    fn into_c_string(self) -> Self {
        self
    }
}

impl IntoCString for &str {
    /// Copies the string into a new `CString`.
    ///
    /// # Panics
    ///
    /// Panics when the string contains an interior NUL byte.
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}

impl IntoCString for String {
    /// Converts the owned string into a `CString`.
    ///
    /// # Panics
    ///
    /// Panics when the string contains an interior NUL byte.
    fn into_c_string(self) -> std::ffi::CString {
        #[cfg(feature = "extra-logging")]
        log::info!("created c string on heap: {:?}", self);

        let converted = std::ffi::CString::new(self);
        converted.expect("failed to create CString")
    }
}
660}