1use crate::AeronErrorType::Unknown;
2#[cfg(feature = "backtrace")]
3use std::backtrace::Backtrace;
4use std::fmt::{Debug, Formatter};
5use std::ops::{Deref, DerefMut};
6use std::{any, fmt, ptr};
7
/// Wrapper around a raw pointer to a resource of type `T`, together with the
/// bookkeeping needed to release it.
pub struct ManagedCResource<T> {
    /// Raw pointer to the wrapped resource. `close` resets it to null after a
    /// successful cleanup call.
    resource: *mut T,
    /// Optional release callback. It receives the address of `resource` and
    /// returns a status code; negative values are treated as failure.
    cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
    /// When `true`, `Drop` additionally reclaims the pointee with `Box::from_raw`.
    cleanup_struct: bool,
    /// When `true`, `Drop` leaves the resource completely untouched.
    borrowed: bool,
}
18
19impl<T> Debug for ManagedCResource<T> {
20 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
21 f.debug_struct("ManagedCResource")
22 .field("resource", &self.resource)
23 .field("type", &any::type_name::<T>())
24 .finish()
25 }
26}
27
28impl<T> ManagedCResource<T> {
29 pub fn new(
36 init: impl FnOnce(*mut *mut T) -> i32,
37 cleanup: Option<Box<dyn FnMut(*mut *mut T) -> i32>>,
38 cleanup_struct: bool,
39 ) -> Result<Self, AeronCError> {
40 let mut resource: *mut T = ptr::null_mut();
41 let result = init(&mut resource);
42 if result < 0 || resource.is_null() {
43 return Err(AeronCError::from_code(result));
44 }
45
46 let result = Self {
47 resource,
48 cleanup,
49 cleanup_struct,
50 borrowed: false,
51 };
52 #[cfg(feature = "extra-logging")]
53 log::debug!("created c resource: {:?}", result);
54 Ok(result)
55 }
56
57 pub fn new_borrowed(value: *const T) -> Self {
58 Self {
59 resource: value as *mut _,
60 cleanup: None,
61 cleanup_struct: false,
62 borrowed: true,
63 }
64 }
65
66 #[inline(always)]
68 pub fn get(&self) -> *mut T {
69 self.resource
70 }
71
72 #[inline(always)]
73 pub fn get_mut(&self) -> &mut T {
74 unsafe { &mut *self.resource }
75 }
76
77 pub fn close(&mut self) -> Result<(), AeronCError> {
81 if let Some(mut cleanup) = self.cleanup.take() {
82 if !self.resource.is_null() {
83 let result = cleanup(&mut self.resource);
84 if result < 0 {
85 return Err(AeronCError::from_code(result));
86 }
87 self.resource = std::ptr::null_mut();
88 }
89 }
90
91 Ok(())
92 }
93}
94
95impl<T> Drop for ManagedCResource<T> {
96 fn drop(&mut self) {
97 if !self.resource.is_null() && !self.borrowed {
98 let resource = self.resource.clone();
99 #[cfg(feature = "extra-logging")]
101 log::debug!("closing c resource: {:?}", self);
102 let _ = self.close(); if self.cleanup_struct {
105 #[cfg(feature = "extra-logging")]
106 log::debug!("closing rust struct resource: {:?}", resource);
107 unsafe {
108 let _ = Box::from_raw(resource);
109 }
110 }
111 }
112 }
113}
114
/// Classification of the numeric codes carried by `AeronCError`.
///
/// Each variant corresponds to one fixed code (see `AeronErrorType::code`);
/// any other code is preserved in [`AeronErrorType::Unknown`].
///
/// `Clone`, `PartialEq` and `Eq` are derived so that callers can compare and
/// copy the result of `AeronCError::kind()` directly instead of matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AeronErrorType {
    /// Code `-1`.
    NullOrNotConnected,
    /// Code `-1000`.
    ClientErrorDriverTimeout,
    /// Code `-1001`.
    ClientErrorClientTimeout,
    /// Code `-1002`.
    ClientErrorConductorServiceTimeout,
    /// Code `-1003`.
    ClientErrorBufferFull,
    /// Code `-2`.
    PublicationBackPressured,
    /// Code `-3`.
    PublicationAdminAction,
    /// Code `-4`.
    PublicationClosed,
    /// Code `-5`.
    PublicationMaxPositionExceeded,
    /// Code `-6`.
    PublicationError,
    /// Code `-234324`.
    TimedOut,
    /// Any code without a dedicated variant; the original value is kept.
    Unknown(i32),
}
130
131impl From<AeronErrorType> for AeronCError {
132 fn from(value: AeronErrorType) -> Self {
133 AeronCError::from_code(value.code())
134 }
135}
136
137impl AeronErrorType {
138 pub fn code(&self) -> i32 {
139 match self {
140 AeronErrorType::NullOrNotConnected => -1,
141 AeronErrorType::ClientErrorDriverTimeout => -1000,
142 AeronErrorType::ClientErrorClientTimeout => -1001,
143 AeronErrorType::ClientErrorConductorServiceTimeout => -1002,
144 AeronErrorType::ClientErrorBufferFull => -1003,
145 AeronErrorType::PublicationBackPressured => -2,
146 AeronErrorType::PublicationAdminAction => -3,
147 AeronErrorType::PublicationClosed => -4,
148 AeronErrorType::PublicationMaxPositionExceeded => -5,
149 AeronErrorType::PublicationError => -6,
150 AeronErrorType::TimedOut => -234324,
151 AeronErrorType::Unknown(code) => *code,
152 }
153 }
154
155 pub fn from_code(code: i32) -> Self {
156 match code {
157 -1 => AeronErrorType::NullOrNotConnected,
158 -1000 => AeronErrorType::ClientErrorDriverTimeout,
159 -1001 => AeronErrorType::ClientErrorClientTimeout,
160 -1002 => AeronErrorType::ClientErrorConductorServiceTimeout,
161 -1003 => AeronErrorType::ClientErrorBufferFull,
162 -2 => AeronErrorType::PublicationBackPressured,
163 -3 => AeronErrorType::PublicationAdminAction,
164 -4 => AeronErrorType::PublicationClosed,
165 -5 => AeronErrorType::PublicationMaxPositionExceeded,
166 -6 => AeronErrorType::PublicationError,
167 -234324 => AeronErrorType::TimedOut,
168 _ => Unknown(code),
169 }
170 }
171
172 pub fn to_string(&self) -> &'static str {
173 match self {
174 AeronErrorType::NullOrNotConnected => "Null Value or Not Connected",
175 AeronErrorType::ClientErrorDriverTimeout => "Client Error Driver Timeout",
176 AeronErrorType::ClientErrorClientTimeout => "Client Error Client Timeout",
177 AeronErrorType::ClientErrorConductorServiceTimeout => {
178 "Client Error Conductor Service Timeout"
179 }
180 AeronErrorType::ClientErrorBufferFull => "Client Error Buffer Full",
181 AeronErrorType::PublicationBackPressured => "Publication Back Pressured",
182 AeronErrorType::PublicationAdminAction => "Publication Admin Action",
183 AeronErrorType::PublicationClosed => "Publication Closed",
184 AeronErrorType::PublicationMaxPositionExceeded => "Publication Max Position Exceeded",
185 AeronErrorType::PublicationError => "Publication Error",
186 AeronErrorType::TimedOut => "Timed Out",
187 AeronErrorType::Unknown(_) => "Unknown Error",
188 }
189 }
190}
191
/// Error value wrapping a raw `i32` status code.
///
/// Two errors are equal exactly when their codes are equal.
#[derive(PartialEq, Eq)]
pub struct AeronCError {
    /// The raw numeric code this error was created with.
    pub code: i32,
}
200
201impl AeronCError {
202 pub fn from_code(code: i32) -> Self {
206 #[cfg(feature = "backtrace")]
207 {
208 if code < 0 {
209 let backtrace = Backtrace::capture();
210 let backtrace = format!("{:?}", backtrace);
211
212 let re =
213 regex::Regex::new(r#"fn: "([^"]+)", file: "([^"]+)", line: (\d+)"#).unwrap();
214 let mut lines = String::new();
215 re.captures_iter(&backtrace).for_each(|cap| {
216 let function = &cap[1];
217 let mut file = cap[2].to_string();
218 let line = &cap[3];
219 if file.starts_with("./") {
220 file = format!("{}/{}", env!("CARGO_MANIFEST_DIR"), &file[2..]);
221 } else if file.starts_with("/rustc/") {
222 file = file.split("/").last().unwrap().to_string();
223 }
224 lines.push_str(&format!(" {file}:{line} in {function}\n"));
226 });
227
228 log::error!(
229 "Aeron C error code: {}, kind: '{:?}'\n{}",
230 code,
231 AeronErrorType::from_code(code),
232 lines
233 );
234 }
235 }
236 AeronCError { code }
237 }
238
239 pub fn kind(&self) -> AeronErrorType {
240 AeronErrorType::from_code(self.code)
241 }
242}
243
244impl fmt::Display for AeronCError {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 write!(f, "Aeron error {}: {:?}", self.code, self.kind())
247 }
248}
249
250impl fmt::Debug for AeronCError {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 f.debug_struct("AeronCError")
253 .field("code", &self.code)
254 .field("kind", &self.kind())
255 .finish()
256 }
257}
258
259impl std::error::Error for AeronCError {}
260
/// Raw-pointer handle to a handler value of type `T`.
pub struct Handler<T> {
    /// Pointer to the handler value; may be null.
    raw_ptr: *mut T,
    /// Whether `release` is allowed to free the pointee.
    should_drop: bool,
}

// NOTE(review): these marker impls are unconditional — they hold even when `T`
// itself is neither `Send` nor `Sync`. Nothing in this type synchronizes access,
// so callers are presumably expected to guarantee thread-safety themselves —
// TODO confirm.
unsafe impl<T> Send for Handler<T> {}
unsafe impl<T> Sync for Handler<T> {}
283
/// Field-less marker type. No behavior is attached to it in this part of the
/// file; presumably it serves as a namespace for handler helpers defined
/// elsewhere — TODO confirm.
pub struct Handlers;
286
287impl<T> Handler<T> {
288 pub fn leak(handler: T) -> Self {
289 let raw_ptr = Box::into_raw(Box::new(handler)) as *mut _;
290 Self {
291 raw_ptr,
292 should_drop: true,
293 }
294 }
295
296 pub fn is_none(&self) -> bool {
297 self.raw_ptr.is_null()
298 }
299
300 pub fn as_raw(&self) -> *mut std::os::raw::c_void {
301 self.raw_ptr as *mut std::os::raw::c_void
302 }
303
304 pub fn release(&mut self) {
305 if self.should_drop && !self.raw_ptr.is_null() {
306 unsafe {
307 #[cfg(feature = "extra-logging")]
308 log::debug!("dropping handler {:?}", self.raw_ptr);
309 let _ = Box::from_raw(self.raw_ptr as *mut Box<T>);
310 self.should_drop = false;
311 }
312 }
313 }
314}
315
316impl<T> Deref for Handler<T> {
317 type Target = T;
318
319 fn deref(&self) -> &Self::Target {
320 unsafe { &*self.raw_ptr as &T }
321 }
322}
323
324impl<T> DerefMut for Handler<T> {
325 fn deref_mut(&mut self) -> &mut Self::Target {
326 unsafe { &mut *self.raw_ptr as &mut T }
327 }
328}
329
330pub fn find_unused_udp_port(start_port: u16) -> Option<u16> {
331 let end_port = u16::MAX;
332
333 for port in start_port..=end_port {
334 if is_udp_port_available(port) {
335 return Some(port);
336 }
337 }
338
339 None
340}
341
/// Reports whether a UDP socket can currently be bound to `127.0.0.1:port`.
///
/// The probe socket is closed again before the function returns, so the result
/// is only a snapshot.
pub fn is_udp_port_available(port: u16) -> bool {
    let probe = std::net::UdpSocket::bind(("127.0.0.1", port));
    probe.is_ok()
}
345
/// Field-less type that groups channel-URI related constants.
pub struct ChannelUri {}

impl ChannelUri {
    /// The `aeron` scheme name.
    pub const AERON_SCHEME: &str = "aeron";
    /// The `aeron-spy` qualifier (without a trailing colon).
    pub const SPY_QUALIFIER: &str = "aeron-spy";
    /// Maximum URI length; presumably measured in bytes — TODO confirm.
    pub const MAX_URI_LENGTH: usize = 4095;
}
354
/// Default driver timeout, in milliseconds (per the constant's name).
pub const DRIVER_TIMEOUT_MS_DEFAULT: u64 = 10_000;
/// Name of the property that holds the Aeron directory.
pub const AERON_DIR_PROP_NAME: &str = "aeron.dir";
/// Channel prefix selecting the IPC media.
pub const AERON_IPC_MEDIA: &str = "aeron:ipc";
/// Channel prefix selecting the UDP media.
pub const AERON_UDP_MEDIA: &str = "aeron:udp";
/// Spy prefix, including the trailing colon.
pub const SPY_PREFIX: &str = "aeron-spy:";
/// Tag prefix, including the trailing colon.
pub const TAG_PREFIX: &str = "tag:";
361
/// Media kinds that have a textual form in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Media {
    /// Rendered as `"ipc"`.
    Ipc,
    /// Rendered as `"udp"`.
    Udp,
}

impl Media {
    /// Returns the lowercase textual form of this media kind.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ipc => "ipc",
            Self::Udp => "udp",
        }
    }
}
377
/// Control modes that have a textual form in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ControlMode {
    /// Rendered as `"manual"`.
    Manual,
    /// Rendered as `"dynamic"`.
    Dynamic,
    /// Rendered as `"response"`.
    Response,
}

impl ControlMode {
    /// Returns the lowercase textual form of this control mode.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Manual => "manual",
            Self::Dynamic => "dynamic",
            Self::Response => "response",
        }
    }
}