1use crate::AeronErrorType::Unknown;
2use std::any::Any;
3#[cfg(feature = "backtrace")]
4use std::backtrace::Backtrace;
5use std::cell::UnsafeCell;
6use std::ops::{Deref, DerefMut};
7
8#[allow(dead_code)]
13pub struct ManagedCResource<T> {
14 resource: *mut T,
15 cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
16 cleanup_struct: bool,
17 borrowed: bool,
18 close_already_called: std::cell::Cell<bool>,
20 check_for_is_closed: Option<Box<dyn Fn(*mut T) -> bool>>,
22 auto_close: std::cell::Cell<bool>,
24 dependencies: UnsafeCell<Vec<std::rc::Rc<dyn Any>>>,
26}
27
28impl<T> std::fmt::Debug for ManagedCResource<T> {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 let mut debug_struct = f.debug_struct("ManagedCResource");
31
32 if !self.close_already_called.get()
33 && !self.resource.is_null()
34 && !self
35 .check_for_is_closed
36 .as_ref()
37 .map_or(false, |f| f(self.resource))
38 {
39 debug_struct.field("resource", &self.resource);
40 }
41
42 debug_struct
43 .field("type", &std::any::type_name::<T>())
44 .finish()
45 }
46}
47
48impl<T> ManagedCResource<T> {
49 pub fn new(
56 init: impl FnOnce(*mut *mut T) -> i32,
57 cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
58 cleanup_struct: bool,
59 check_for_is_closed: Option<Box<dyn Fn(*mut T) -> bool>>,
60 ) -> Result<Self, AeronCError> {
61 let mut resource: *mut T = std::ptr::null_mut();
62 let result = init(&mut resource);
63 if result < 0 || resource.is_null() {
64 return Err(AeronCError::from_code(result));
65 }
66
67 let result = Self {
68 resource,
69 cleanup,
70 cleanup_struct,
71 borrowed: false,
72 close_already_called: std::cell::Cell::new(false),
73 check_for_is_closed,
74 auto_close: std::cell::Cell::new(false),
75 dependencies: UnsafeCell::new(vec![]),
76 };
77 #[cfg(feature = "extra-logging")]
78 log::debug!("created c resource: {:?}", result);
79 Ok(result)
80 }
81
82 pub fn is_closed_already_called(&self) -> bool {
83 self.close_already_called.get()
84 || self.resource.is_null()
85 || self
86 .check_for_is_closed
87 .as_ref()
88 .map_or(false, |f| f(self.resource))
89 }
90
91 pub fn new_borrowed(
92 value: *const T,
93 check_for_is_closed: Option<Box<dyn Fn(*mut T) -> bool>>,
94 ) -> Self {
95 Self {
96 resource: value as *mut _,
97 cleanup: None,
98 cleanup_struct: false,
99 borrowed: true,
100 close_already_called: std::cell::Cell::new(false),
101 check_for_is_closed,
102 auto_close: std::cell::Cell::new(false),
103 dependencies: UnsafeCell::new(vec![]),
104 }
105 }
106
107 #[inline(always)]
109 pub fn get(&self) -> *mut T {
110 self.resource
111 }
112
113 #[inline(always)]
114 pub fn get_mut(&self) -> &mut T {
115 unsafe { &mut *self.resource }
116 }
117
118 pub fn add_dependency<D: Any>(&self, dep: D) {
120 unsafe {
121 (*self.dependencies.get()).push(std::rc::Rc::new(dep));
122 }
123 }
124
125 pub fn close(&mut self) -> Result<(), AeronCError> {
129 if self.close_already_called.get() {
130 return Ok(());
131 }
132 self.close_already_called.set(true);
133
134 let already_closed = self
135 .check_for_is_closed
136 .as_ref()
137 .map_or(false, |f| f(self.resource));
138
139 if let Some(mut cleanup) = self.cleanup.take() {
140 if !self.resource.is_null() {
141 if !already_closed {
142 let result = cleanup(&mut self.resource);
143 if result < 0 {
144 return Err(AeronCError::from_code(result));
145 }
146 }
147 self.resource = std::ptr::null_mut();
148 }
149 }
150
151 Ok(())
152 }
153}
154
155impl<T> Drop for ManagedCResource<T> {
156 fn drop(&mut self) {
157 if !self.resource.is_null() {
158 let already_closed = self.close_already_called.get()
159 || self
160 .check_for_is_closed
161 .as_ref()
162 .map_or(false, |f| f(self.resource));
163 if !self.borrowed {
164 self.close_already_called.set(true);
165 let resource = if already_closed {
166 self.resource
167 } else {
168 self.resource.clone()
169 };
170
171 if !already_closed {
172 #[cfg(feature = "extra-logging")]
174 log::debug!("closing c resource: {:?}", self);
175 let _ = self.close(); }
177
178 if self.cleanup_struct {
179 #[cfg(feature = "extra-logging")]
180 log::debug!("closing rust struct resource: {:?}", resource);
181 unsafe {
182 let _ = Box::from_raw(resource);
183 }
184 }
185 }
186 }
187 }
188}
189
190#[derive(Debug)]
191pub enum AeronErrorType {
192 NullOrNotConnected,
193 ClientErrorDriverTimeout,
194 ClientErrorClientTimeout,
195 ClientErrorConductorServiceTimeout,
196 ClientErrorBufferFull,
197 PublicationBackPressured,
198 PublicationAdminAction,
199 PublicationClosed,
200 PublicationMaxPositionExceeded,
201 PublicationError,
202 TimedOut,
203 Unknown(i32),
204}
205
206impl From<AeronErrorType> for AeronCError {
207 fn from(value: AeronErrorType) -> Self {
208 AeronCError::from_code(value.code())
209 }
210}
211
212impl AeronErrorType {
213 pub fn code(&self) -> i32 {
214 match self {
215 AeronErrorType::NullOrNotConnected => -1,
216 AeronErrorType::ClientErrorDriverTimeout => -1000,
217 AeronErrorType::ClientErrorClientTimeout => -1001,
218 AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
219 AeronErrorType::ClientErrorBufferFull => -1003,
220 AeronErrorType::PublicationBackPressured => -2,
221 AeronErrorType::PublicationAdminAction => -3,
222 AeronErrorType::PublicationClosed => -4,
223 AeronErrorType::PublicationMaxPositionExceeded => -5,
224 AeronErrorType::PublicationError => -6,
225 AeronErrorType::TimedOut => -234324,
226 AeronErrorType::Unknown(code) => *code,
227 }
228 }
229
230 pub fn from_code(code: i32) -> Self {
231 match code {
232 -1 => AeronErrorType::NullOrNotConnected,
233 -1000 => AeronErrorType::ClientErrorDriverTimeout,
234 -1001 => AeronErrorType::ClientErrorClientTimeout,
235 -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
236 -1003 => AeronErrorType::ClientErrorBufferFull,
237 -2 => AeronErrorType::PublicationBackPressured,
238 -3 => AeronErrorType::PublicationAdminAction,
239 -4 => AeronErrorType::PublicationClosed,
240 -5 => AeronErrorType::PublicationMaxPositionExceeded,
241 -6 => AeronErrorType::PublicationError,
242 -234324 => AeronErrorType::TimedOut,
243 _ => Unknown(code),
244 }
245 }
246
247 pub fn to_string(&self) -> &'static str {
248 match self {
249 AeronErrorType::NullOrNotConnected => "Null Value or Not Connected",
250 AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
251 AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
252 AeronErrorType::ClientErrorConductorServiceTimeout => {
253 "Client Error Conductor Service Timeout"
254 }
255 AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
256 AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
257 AeronErrorType::PublicationAdminAction => "Publication Admin Action",
258 AeronErrorType::PublicationClosed => "Publication Closed",
259 AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
260 AeronErrorType::PublicationError => "Publication Error",
261 AeronErrorType::TimedOut => "Timed Out",
262 AeronErrorType::Unknown(_) => "Unknown Error",
263 }
264 }
265}
266
267#[derive(Eq, PartialEq)]
272pub struct AeronCError {
273 pub code: i32,
274}
275
276impl AeronCError {
277 pub fn from_code(code: i32) -> Self {
281 #[cfg(feature = "backtrace")]
282 {
283 if code < 0 {
284 let backtrace = Backtrace::capture();
285 let backtrace = format!("{:?}", backtrace);
286
287 let re =
288 regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
289 let mut lines = String::new();
290 re.captures_iter(&backtrace).for_each(|cap| {
291 let function = &cap[1];
292 let mut file = cap[2].to_string();
293 let line = &cap[3];
294 if file.starts_with("./") {
295 file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
296 } else if file.starts_with("/rustc/") {
297 file = file.split("/").last().unwrap().to_string();
298 }
299 lines.push_str(&format!(" {file}:{line} in {function}\n"));
301 });
302
303 log::error!(
304 "Aeron C error code: {}, kind: '{:?}'\n{}",
305 code,
306 AeronErrorType::from_code(code),
307 lines
308 );
309 }
310 }
311 AeronCError { code }
312 }
313
314 pub fn kind(&self) -> AeronErrorType {
315 AeronErrorType::from_code(self.code)
316 }
317}
318
319impl std::fmt::Display for AeronCError {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 write!(f, "Aeron error {}: {:?}", self.code, self.kind())
322 }
323}
324
325impl std::fmt::Debug for AeronCError {
326 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327 f.debug_struct("AeronCError")
328 .field("code", &self.code)
329 .field("kind", &self.kind())
330 .finish()
331 }
332}
333
334impl std::error::Error for AeronCError {}
335
336pub struct Handler<T> {
352 raw_ptr: *mut T,
353 should_drop: bool,
354}
355
356unsafe impl<T> Send for Handler<T> {}
357unsafe impl<T> Sync for Handler<T> {}
358
359pub struct Handlers;
361
362impl<T> Handler<T> {
363 pub fn leak(handler: T) -> Self {
364 let raw_ptr = Box::into_raw(Box::new(handler)) as *mut _;
365 Self {
366 raw_ptr,
367 should_drop: true,
368 }
369 }
370
371 pub fn is_none(&self) -> bool {
372 self.raw_ptr.is_null()
373 }
374
375 pub fn as_raw(&self) -> *mut std::os::raw::c_void {
376 self.raw_ptr as *mut std::os::raw::c_void
377 }
378
379 pub fn release(&mut self) {
380 if self.should_drop && !self.raw_ptr.is_null() {
381 unsafe {
382 #[cfg(feature = "extra-logging")]
383 log::debug!("dropping handler {:?}", self.raw_ptr);
384 let _ = Box::from_raw(self.raw_ptr as *mut Box<T>);
385 self.should_drop = false;
386 }
387 }
388 }
389}
390
391impl<T> Deref for Handler<T> {
392 type Target = T;
393
394 fn deref(&self) -> &Self::Target {
395 unsafe { &*self.raw_ptr as &T }
396 }
397}
398
399impl<T> DerefMut for Handler<T> {
400 fn deref_mut(&mut self) -> &mut Self::Target {
401 unsafe { &mut *self.raw_ptr as &mut T }
402 }
403}
404
405pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
406 let end_port = u16::MAX;
407
408 for port in start_port..=end_port {
409 if is_udp_port_available(port) {
410 return Some(port);
411 }
412 }
413
414 None
415}
416
417pub fn is_udp_port_available(port: u16) -> bool {
418 std::net::UdpSocket::bind(("127.0.0.1", port)).is_ok()
419}
420
421pub struct ChannelUri {}
423
424impl ChannelUri {
425 pub const AERON_SCHEME: &'static str = "aeron";
426 pub const SPY_QUALIFIER: &'static str = "aeron-spy";
427 pub const MAX_URI_LENGTH: usize = 4095;
428}
429
430pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
431pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
432pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
433pub const AERON_UDP_MEDIA: &str = "aeron:udp";
434pub const SPY_PREFIX: &str = "aeron-spy:";
435pub const TAG_PREFIX: &str = "tag:";
436
437#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
439pub enum Media {
440 Ipc,
441 Udp,
442}
443
444impl Media {
445 pub fn as_str(&self) -> &'static str {
446 match self {
447 Media::Ipc => "ipc",
448 Media::Udp => "udp",
449 }
450 }
451}
452
453#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
455pub enum ControlMode {
456 Manual,
457 Dynamic,
458 Response,
460}
461
462impl ControlMode {
463 pub fn as_str(&self) -> &'static str {
464 match self {
465 ControlMode::Manual => "manual",
466 ControlMode::Dynamic => "dynamic",
467 ControlMode::Response => "response",
468 }
469 }
470}