veilid_tools/
tools.rs

1use super::*;
2
3use std::io;
4use std::path::Path;
5
6//////////////////////////////////////////////////////////////////////////////////////////////////////////////
7
/// Asserts that an expression evaluates to `Err(..)`.
///
/// If the expression evaluates to `Ok(value)` instead, this panics and includes
/// the `Debug` rendering of `value` in the panic message.
#[macro_export]
macro_rules! assert_err {
    ($ex:expr) => {
        match $ex {
            Ok(unexpected) => {
                panic!("assertion failed, expected Err(..), got {:?}", unexpected)
            }
            Err(_) => {}
        }
    };
}
16
/// Builds a `std::io::Error` of kind `ErrorKind::Other` whose payload is
/// `$msg.to_string()`.
///
/// The expansion uses absolute `::std::io` paths so that the macro can be
/// invoked from any module, including ones that have not imported `std::io`.
#[macro_export]
macro_rules! io_error_other {
    ($msg:expr) => {
        ::std::io::Error::other($msg.to_string())
    };
}
23
/// Wraps an arbitrary error value in an `io::Error` of kind `ErrorKind::Other`.
///
/// Handy as a `map_err` adapter when an `io::Result` is required.
pub fn to_io_error_other<E>(x: E) -> io::Error
where
    E: std::error::Error + Send + Sync + 'static,
{
    io::Error::other(x)
}
27
/// Returns early from the enclosing function with an `io::Result::Err` whose
/// error has kind `ErrorKind::Other` and payload `$msg.to_string()`.
///
/// The expansion uses absolute `::std::io` paths so that the macro can be
/// invoked from any module, including ones that have not imported `std::io`.
#[macro_export]
macro_rules! bail_io_error_other {
    ($msg:expr) => {
        return ::std::io::Result::Err(::std::io::Error::other($msg.to_string()))
    };
}
34
// Async mutex helper macros.
//
// Two sets of definitions are provided, selected by the `rt-tokio` feature, so that
// call sites can use the same macro names regardless of which branch is compiled.
cfg_if::cfg_if! {
    if #[cfg(feature="rt-tokio")] {
        /// Non-waiting lock attempt (`rt-tokio` variant).
        ///
        /// Expands to `$x.try_lock().ok()`, i.e. the result of `try_lock()` is
        /// converted with `.ok()` before being handed to the caller.
        #[macro_export]
        macro_rules! asyncmutex_try_lock {
            ($x:expr) => {
                $x.try_lock().ok()
            };
        }

        /// Owned lock acquisition (`rt-tokio` variant).
        ///
        /// Expands to `$x.clone().lock_owned().await`, so it must be used in an
        /// async context and `$x` must be cloneable.
        #[macro_export]
        macro_rules! asyncmutex_lock_arc {
            ($x:expr) => {
                $x.clone().lock_owned().await
            };
        }

        /// Non-waiting owned lock attempt (`rt-tokio` variant).
        ///
        /// Expands to `$x.clone().try_lock_owned().ok()`.
        #[macro_export]
        macro_rules! asyncmutex_try_lock_arc {
            ($x:expr) => {
                $x.clone().try_lock_owned().ok()
            };
        }

        // Disabled tokio-specific rwlock variants, kept for reference. The active
        // `asyncrwlock_try_read` / `asyncrwlock_try_write` macros are defined outside
        // this `cfg_if` block.
        // #[macro_export]
        // macro_rules! asyncrwlock_try_read {
        //     ($x:expr) => {
        //         $x.try_read().ok()
        //     };
        // }

        // #[macro_export]
        // macro_rules! asyncrwlock_try_write {
        //     ($x:expr) => {
        //         $x.try_write().ok()
        //     };
        // }
    } else {
        /// Non-waiting lock attempt (non-`rt-tokio` variant).
        ///
        /// Expands to `$x.try_lock()` and passes its result through unchanged.
        #[macro_export]
        macro_rules! asyncmutex_try_lock {
            ($x:expr) => {
                $x.try_lock()
            };
        }
        /// Owned lock acquisition (non-`rt-tokio` variant).
        ///
        /// Expands to `$x.lock_arc().await`, so it must be used in an async context.
        #[macro_export]
        macro_rules! asyncmutex_lock_arc {
            ($x:expr) => {
                $x.lock_arc().await
            };
        }
        /// Non-waiting owned lock attempt (non-`rt-tokio` variant).
        ///
        /// Expands to `$x.try_lock_arc()` and passes its result through unchanged.
        #[macro_export]
        macro_rules! asyncmutex_try_lock_arc {
            ($x:expr) => {
                $x.try_lock_arc()
            };
        }

    }
}
93
/// Non-waiting read-lock attempt on an async rwlock.
///
/// Expands to `$x.try_read()` and passes its result through unchanged.
#[macro_export]
macro_rules! asyncrwlock_try_read {
    ($x:expr) => {
        $x.try_read()
    };
}
/// Non-waiting write-lock attempt on an async rwlock.
///
/// Expands to `$x.try_write()` and passes its result through unchanged.
#[macro_export]
macro_rules! asyncrwlock_try_write {
    ($x:expr) => {
        $x.try_write()
    };
}
106
/// Non-waiting owned read-lock attempt on an async rwlock.
///
/// Expands to `$x.try_read_arc()` and passes its result through unchanged.
#[macro_export]
macro_rules! asyncrwlock_try_read_arc {
    ($x:expr) => {
        $x.try_read_arc()
    };
}
/// Non-waiting owned write-lock attempt on an async rwlock.
///
/// Expands to `$x.try_write_arc()` and passes its result through unchanged.
#[macro_export]
macro_rules! asyncrwlock_try_write_arc {
    ($x:expr) => {
        $x.try_write_arc()
    };
}
119
120//////////////////////////////////////////////////////////////////////////////////////////////////////////////
121
122cfg_if! {
123    if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
124        #[must_use]
125        pub fn get_concurrency() -> u32 {
126            std::thread::available_parallelism()
127                .map(|x| x.get())
128                .unwrap_or_else(|e| {
129                    warn!("unable to get concurrency defaulting to single core: {}", e);
130                    1
131                }) as u32
132        }
133    }
134}
135
136//////////////////////////////////////////////////////////////////////////////////////////////////////////////
137
/// Splits `name` at its last `':'` into a host part and a numeric port.
///
/// * No `':'` present: returns the whole input as the host with `None` for the port.
/// * Otherwise everything after the last `':'` must parse as a `u16`.
///
/// # Errors
/// Returns `"invalid port: ..."` when the text after the last `':'` is not a valid `u16`.
pub fn split_port(name: &str) -> Result<(String, Option<u16>), String> {
    match name.rsplit_once(':') {
        None => Ok((name.to_string(), None)),
        Some((host, port_text)) => {
            let port = port_text
                .parse::<u16>()
                .map_err(|e| format!("invalid port: {}", e))?;
            Ok((host.to_string(), Some(port)))
        }
    }
}
151
/// Returns `s` with a leading `'/'`, adding one only if it is not already there.
#[must_use]
pub fn prepend_slash(s: String) -> String {
    if s.starts_with('/') {
        s
    } else {
        format!("/{}", s)
    }
}
161
/// Converts a microsecond count into fractional seconds.
#[must_use]
pub fn timestamp_to_secs(ts: u64) -> f64 {
    const MICROS_PER_SEC: f64 = 1_000_000.0;
    (ts as f64) / MICROS_PER_SEC
}
166
/// Converts fractional seconds into a microsecond count.
///
/// The float-to-integer `as` cast truncates toward zero and saturates, so
/// negative or NaN inputs yield `0`.
#[must_use]
pub fn secs_to_timestamp(secs: f64) -> u64 {
    const MICROS_PER_SEC: f64 = 1_000_000.0;
    let micros = secs * MICROS_PER_SEC;
    micros as u64
}
171
/// Converts milliseconds to microseconds.
///
/// The input is widened to `u64` first, so the multiplication cannot overflow.
#[must_use]
pub fn ms_to_us(ms: u32) -> u64 {
    u64::from(ms) * 1_000
}
176
/// Converts microseconds to whole milliseconds, discarding the sub-millisecond remainder.
///
/// # Errors
/// Returns `"could not convert microseconds: ..."` when the millisecond count
/// does not fit in a `u32`.
pub fn us_to_ms(us: u64) -> Result<u32, String> {
    let whole_ms = us / 1_000;
    match u32::try_from(whole_ms) {
        Ok(ms) => Ok(ms),
        Err(e) => Err(format!("could not convert microseconds: {}", e)),
    }
}
180
181// Calculate retry attempt with logarhythmic falloff
182#[must_use]
183pub fn retry_falloff_log(
184    last_us: u64,
185    cur_us: u64,
186    interval_start_us: u64,
187    interval_max_us: u64,
188    interval_multiplier_us: f64,
189) -> bool {
190    //
191    if cur_us < interval_start_us {
192        // Don't require a retry within the first 'interval_start_us' microseconds of the reliable time period
193        false
194    } else if cur_us >= last_us + interval_max_us {
195        // Retry at least every 'interval_max_us' microseconds
196        true
197    } else {
198        // Exponential falloff between 'interval_start_us' and 'interval_max_us' microseconds
199        last_us <= secs_to_timestamp(timestamp_to_secs(cur_us) / interval_multiplier_us)
200    }
201}
202
/// Applies `closure` to successive items of `things` and returns the first
/// `Some(..)` result, giving up after `max` items have been tried.
///
/// * `max` - upper bound on the number of items passed to `closure`; with
///   `max == 0` nothing is tried and `None` is returned
/// * `things` - candidate items, consumed in iteration order
/// * `closure` - attempt function; `Some(r)` stops the search with `r`
///
/// Returns `None` if every attempted item yielded `None` or there were no items.
pub fn try_at_most_n_things<T, I, C, R>(max: usize, things: I, closure: C) -> Option<R>
where
    I: IntoIterator<Item = T>,
    C: Fn(T) -> Option<R>,
{
    // `take(max)` bounds the number of attempts (including the `max == 0` case),
    // and `find_map` stops at the first success.
    things.into_iter().take(max).find_map(closure)
}
220
/// Async counterpart of `try_at_most_n_things`: awaits `closure` on successive
/// items of `things` and returns the first `Some(..)` result, giving up after
/// `max` items have been tried.
///
/// * `max` - upper bound on the number of items passed to `closure`; with
///   `max == 0` nothing is tried and `None` is returned
/// * `things` - candidate items, consumed in iteration order
/// * `closure` - produces the future for one attempt; a `Some(r)` output stops
///   the search with `r`
///
/// Attempts run one at a time: each future is awaited to completion before the
/// next item is taken.
pub async fn async_try_at_most_n_things<T, I, C, R, F>(
    max: usize,
    things: I,
    closure: C,
) -> Option<R>
where
    I: IntoIterator<Item = T>,
    C: Fn(T) -> F,
    F: Future<Output = Option<R>>,
{
    // `take(max)` bounds the number of attempts, including the `max == 0` case.
    for thing in things.into_iter().take(max) {
        if let Some(r) = closure(thing).await {
            return Some(r);
        }
    }
    None
}
243
/// In-place minimum / maximum assignment.
pub trait CmpAssign {
    /// Replaces `self` with `other` when `other` is strictly smaller.
    fn min_assign(&mut self, other: Self);
    /// Replaces `self` with `other` when `other` is strictly greater.
    fn max_assign(&mut self, other: Self);
}

/// Blanket implementation for every totally ordered type.
impl<T> CmpAssign for T
where
    T: core::cmp::Ord,
{
    fn min_assign(&mut self, other: Self) {
        if other < *self {
            *self = other;
        }
    }

    fn max_assign(&mut self, other: Self) {
        if other > *self {
            *self = other;
        }
    }
}
264
/// Returns the unspecified socket address (all-zero IP, port `0`) of the same
/// address family as `socket_addr`.
#[must_use]
pub fn compatible_unspecified_socket_addr(socket_addr: &SocketAddr) -> SocketAddr {
    let unspecified_ip = match socket_addr {
        SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
        SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
    };
    SocketAddr::new(unspecified_ip, 0)
}
272
273cfg_if! {
274    if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
275        use std::net::UdpSocket;
276
277        static IPV6_IS_SUPPORTED: Mutex<Option<bool>> = Mutex::new(None);
278
279        pub fn is_ipv6_supported() -> bool {
280            let mut opt_supp = IPV6_IS_SUPPORTED.lock();
281            if let Some(supp) = *opt_supp {
282                return supp;
283            }
284            // Not exhaustive but for our use case it should be sufficient. If no local ports are available for binding, Veilid isn't going to work anyway :P
285            let supp = UdpSocket::bind(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0)).is_ok();
286            *opt_supp = Some(supp);
287            supp
288        }
289    }
290}
291
292#[must_use]
293pub fn available_unspecified_addresses() -> Vec<IpAddr> {
294    if is_ipv6_supported() {
295        vec![
296            IpAddr::V4(Ipv4Addr::UNSPECIFIED),
297            IpAddr::V6(Ipv6Addr::UNSPECIFIED),
298        ]
299    } else {
300        vec![IpAddr::V4(Ipv4Addr::UNSPECIFIED)]
301    }
302}
303
/// Converts a textual listen address into the socket addresses to bind.
///
/// Accepted forms, checked in this order:
/// * `":<port>"` - the port is paired with every address from
///   `available_unspecified_addresses()`
/// * `"<port>"` (anything that parses as a `u16`) - same as above
/// * anything else - treated as an address; `":0"` is appended when the text
///   contains no `':'`. On `wasm32-unknown` targets it is parsed directly as a
///   `SocketAddr`, elsewhere it is resolved with `to_socket_addrs()`.
///
/// # Errors
/// Returns a descriptive `String` when the port after a leading `':'` is not a
/// valid `u16`, or when the address cannot be parsed / resolved.
pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<SocketAddr>, String> {
    // If no address is specified, but the port is, use ipv4 and ipv6 unspecified
    // If the address is specified, only use the specified port and fail otherwise

    // Fetched up front; only the two port-only branches below use it.
    let ip_addrs = available_unspecified_addresses();

    Ok(if let Some(portstr) = listen_address.strip_prefix(':') {
        // ":<port>" form
        let port = portstr
            .parse::<u16>()
            .map_err(|e| format!("Invalid port format in udp listen address: {}", e))?;
        ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
    } else if let Ok(port) = listen_address.parse::<u16>() {
        // Bare "<port>" form
        ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
    } else {
        // Explicit address: make sure there is a port component before parsing/resolving
        let listen_address_with_port = if listen_address.contains(':') {
            listen_address.to_string()
        } else {
            format!("{}:0", listen_address)
        };
        cfg_if! {
            if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
                // This branch parses a literal socket address; no resolution step is performed here
                use core::str::FromStr;
                vec![SocketAddr::from_str(&listen_address_with_port).map_err(|e| format!("Unable to parse address: {}",e))?]
            } else {
                // May yield several addresses for one input
                listen_address_with_port
                    .to_socket_addrs()
                    .map_err(|e| format!("Unable to resolve address: {}", e))?
                    .collect()
            }
        }
    })
}
336
/// Order-preserving de-duplication that works on unsorted vectors.
///
/// Only `PartialEq` is required of the element type, so every element is compared
/// against the elements already kept.
pub trait RemoveDuplicates<T: PartialEq + Clone> {
    /// Removes every element equal to an earlier one, keeping first occurrences in place.
    fn remove_duplicates(&mut self);
}

impl<T: PartialEq + Clone> RemoveDuplicates<T> for Vec<T> {
    fn remove_duplicates(&mut self) {
        let mut kept: Vec<T> = Vec::new();
        self.retain(|candidate| {
            if kept.contains(candidate) {
                return false;
            }
            kept.push(candidate.clone());
            true
        });
    }
}
354
355cfg_if::cfg_if! {
356    if #[cfg(unix)] {
357        use std::os::unix::fs::MetadataExt;
358        use std::os::unix::prelude::PermissionsExt;
359        use nix::unistd::{Uid, Gid};
360
361        pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
362        {
363            let path = path.as_ref();
364            if !path.is_file() {
365                return Ok(());
366            }
367
368            let uid = Uid::effective();
369            let gid = Gid::effective();
370            let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
371
372            if meta.mode() != 0o600 {
373                std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
374            }
375            if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
376                return Err("path has incorrect owner/group".to_owned());
377            }
378            Ok(())
379        }
380
381        pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, group_read: bool) -> Result<(), String>
382        {
383            let path = path.as_ref();
384            if !path.is_dir() {
385                return Ok(());
386            }
387
388            let uid = Uid::effective();
389            let gid = Gid::effective();
390            let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
391
392            let perm = if group_read {
393                0o750
394            } else {
395                0o700
396            };
397
398            if meta.mode() != perm {
399                std::fs::set_permissions(path,std::fs::Permissions::from_mode(perm)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
400            }
401            if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
402                return Err("path has incorrect owner/group".to_owned());
403            }
404            Ok(())
405        }
406    } else if #[cfg(windows)] {
407        //use std::os::windows::fs::MetadataExt;
408        //use windows_permissions::*;
409
410        pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
411        {
412            let path = path.as_ref();
413            if !path.is_file() {
414                return Ok(());
415            }
416
417            Ok(())
418        }
419
420        pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
421        {
422            let path = path.as_ref();
423            if !path.is_dir() {
424                return Ok(());
425            }
426
427            Ok(())
428        }
429
430    } else {
431        pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
432        {
433            let path = path.as_ref();
434            if !path.is_file() {
435                return Ok(());
436            }
437
438            Ok(())
439        }
440
441        pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
442        {
443            let path = path.as_ref();
444            if !path.is_dir() {
445                return Ok(());
446            }
447
448            Ok(())
449        }
450    }
451}
452
/// Allocation unit used to obtain 8-byte-aligned storage: eight bytes with 8-byte alignment.
#[repr(C, align(8))]
struct AlignToEight([u8; 8]);

/// Returns a `Vec<u8>` of length `n_bytes` whose buffer was allocated as a
/// `Vec<AlignToEight>`, i.e. with 8-byte alignment. The contents are uninitialized.
///
/// The reported capacity is the allocated unit count times 8, so it may exceed `n_bytes`.
///
/// # Safety
/// Ensure you immediately initialize this vector as it could contain sensitive data
#[must_use]
pub unsafe fn aligned_8_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
    // Round the byte count up to whole 8-byte units
    let n_units = n_bytes.div_ceil(mem::size_of::<AlignToEight>());
    let mut aligned: Vec<AlignToEight> = Vec::with_capacity(n_units);
    let ptr = aligned.as_mut_ptr();
    // Record the real capacity; `with_capacity` may allocate more than requested
    let cap_units = aligned.capacity();
    // Give up ownership without freeing so the buffer can be re-owned below
    mem::forget(aligned);

    // NOTE(review): `Vec::from_raw_parts` documents that the element type must have the
    // same alignment the pointer was allocated with. The buffer here is allocated for
    // `AlignToEight` (align 8) but re-owned as `Vec<u8>` (align 1) - confirm this is
    // acceptable for the allocators in use.
    Vec::from_raw_parts(
        ptr as *mut u8,
        n_bytes,
        cap_units * mem::size_of::<AlignToEight>(),
    )
}
472
/// Returns a `Vec<u8>` of length `n_bytes` whose contents are uninitialized.
///
/// The reported capacity is whatever the allocation actually provided, which is
/// at least `n_bytes`.
///
/// # Safety
/// Ensure you immediately initialize this vector as it could contain sensitive data
#[must_use]
pub unsafe fn unaligned_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
    let mut unaligned: Vec<u8> = Vec::with_capacity(n_bytes);
    let ptr = unaligned.as_mut_ptr();
    // `with_capacity` only guarantees *at least* `n_bytes`, while `from_raw_parts`
    // requires the exact allocated capacity, so carry the real value over.
    let cap_bytes = unaligned.capacity();
    // Give up ownership without freeing so the buffer can be re-owned below
    mem::forget(unaligned);

    Vec::from_raw_parts(ptr, n_bytes, cap_bytes)
}
483
484#[must_use]
485pub fn debug_backtrace() -> String {
486    let bt = backtrace::Backtrace::new();
487    format!("{:?}", bt)
488}
489
490pub fn debug_print_backtrace() {
491    if is_debug_backtrace_enabled() {
492        debug!("{}", debug_backtrace());
493    }
494}
495
/// Reports whether debug backtraces should be produced.
///
/// In builds with `debug_assertions`, this is `true` when the `RUST_BACKTRACE`
/// setting is exactly `"1"` or `"full"`. On `wasm32-unknown` targets the setting is
/// read through `get_wasm_global_string_value`, elsewhere from the process
/// environment. A missing or unreadable setting is treated as the empty string.
///
/// In builds without `debug_assertions` this always returns `false`.
#[must_use]
pub fn is_debug_backtrace_enabled() -> bool {
    cfg_if! {
        if #[cfg(debug_assertions)] {
            // Pick the source of the RUST_BACKTRACE value for the current target
            cfg_if! {
                if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
                    let rbenv = get_wasm_global_string_value("RUST_BACKTRACE").unwrap_or_default();
                }
                else
                {
                    let rbenv = std::env::var("RUST_BACKTRACE").unwrap_or_default();
                }
            }
            rbenv == "1" || rbenv == "full"
        } else {
            false
        }
    }
}
515
516#[track_caller]
517pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(
518    f: T,
519    opt_timeout_us: Option<u64>,
520) -> impl Future<Output = R> {
521    let location = core::panic::Location::caller();
522    async move {
523        let t1 = get_timestamp();
524        let out = f().await;
525        let t2 = get_timestamp();
526        let duration_us = t2 - t1;
527        if let Some(timeout_us) = opt_timeout_us {
528            if duration_us > timeout_us {
529                #[cfg(not(feature = "debug-duration-timeout"))]
530                debug!(
531                    "Excessive duration: {}\n{:?}",
532                    display_duration(duration_us),
533                    backtrace::Backtrace::new()
534                );
535                #[cfg(feature = "debug-duration-timeout")]
536                panic!(format!(
537                    "Duration panic timeout exceeded: {}",
538                    display_duration(duration_us)
539                ));
540            }
541        } else {
542            debug!("Duration: {} = {}", location, display_duration(duration_us),);
543        }
544        out
545    }
546}
547
/// Returns the compiler-provided name of the static type of `_val`.
///
/// Only the type is used; the value itself is never inspected.
pub fn type_name_of_val<T: ?Sized>(_val: &T) -> &'static str {
    core::any::type_name::<T>()
}
551
/// Converts any `ToString` value into a `String`.
///
/// Being a plain function, it can be passed by name to adapters such as `map_err`.
pub fn map_to_string<X: ToString>(arg: X) -> String {
    ToString::to_string(&arg)
}
555
556//////////////////////////////////////////////////////////////////////////////////////////////////////////////
557
/// Scope guard that tracks how many instances are alive for a given counter.
///
/// Creating a guard increments the counter and dropping it decrements the counter;
/// both events print the label and the resulting count to stderr.
pub struct DebugGuard {
    /// Label printed with every enter/exit message.
    name: &'static str,
    /// Shared counter of currently live guards.
    counter: &'static AtomicUsize,
}

impl DebugGuard {
    /// Increments `counter`, prints `"<name> entered: <count>"` to stderr, and
    /// returns the guard.
    pub fn new(name: &'static str, counter: &'static AtomicUsize) -> Self {
        // `fetch_add` yields the previous value, so add one to get the new count
        let now_active = counter.fetch_add(1, Ordering::SeqCst) + 1;
        eprintln!("{} entered: {}", name, now_active);
        Self { name, counter }
    }
}

impl Drop for DebugGuard {
    /// Decrements the counter and prints `"<name> exited: <count>"` to stderr.
    fn drop(&mut self) {
        // `fetch_sub` yields the previous value, so subtract one to get the new count
        let still_active = self.counter.fetch_sub(1, Ordering::SeqCst) - 1;
        eprintln!("{} exited: {}", self.name, still_active);
    }
}
576}