veilid_tools/
tools.rs

1use super::*;
2
3use std::io;
4use std::path::Path;
5
6//////////////////////////////////////////////////////////////////////////////////////////////////////////////
7
/// Asserts that an expression evaluates to `Err(..)`.
///
/// When the expression is `Ok(value)` the macro panics and includes the
/// `Debug` rendering of `value` in the message; an `Err` is discarded.
#[macro_export]
macro_rules! assert_err {
    ($result:expr) => {
        match $result {
            Ok(unexpected) => {
                panic!("assertion failed, expected Err(..), got {:?}", unexpected)
            }
            Err(_) => {}
        }
    };
}
16
/// Builds a `std::io::Error` of kind `ErrorKind::Other` whose payload is the
/// string produced by calling `.to_string()` on `$msg`.
///
/// The expansion uses absolute `::std::io` paths so the macro works at any
/// call site, whether or not the caller has `io` imported.
#[macro_export]
macro_rules! io_error_other {
    ($msg:expr) => {
        ::std::io::Error::new(::std::io::ErrorKind::Other, $msg.to_string())
    };
}
23
/// Wraps an arbitrary error value in an `io::Error` of kind `ErrorKind::Other`.
///
/// The original error is stored as the payload of the returned `io::Error`.
pub fn to_io_error_other<E>(x: E) -> io::Error
where
    E: std::error::Error + Send + Sync + 'static,
{
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, x)
}
27
/// Returns early from the enclosing function with an `io::Result::Err`
/// holding an `ErrorKind::Other` error built from `$msg.to_string()`.
///
/// The expansion uses absolute `::std::io` paths so the macro works at any
/// call site, whether or not the caller has `io` imported.
#[macro_export]
macro_rules! bail_io_error_other {
    ($msg:expr) => {
        return ::std::io::Result::Err(::std::io::Error::new(
            ::std::io::ErrorKind::Other,
            $msg.to_string(),
        ))
    };
}
34
35cfg_if::cfg_if! {
36    if #[cfg(feature="rt-tokio")] {
37        #[macro_export]
38        macro_rules! asyncmutex_try_lock {
39            ($x:expr) => {
40                $x.try_lock().ok()
41            };
42        }
43
44        #[macro_export]
45        macro_rules! asyncmutex_lock_arc {
46            ($x:expr) => {
47                $x.clone().lock_owned().await
48            };
49        }
50
51        #[macro_export]
52        macro_rules! asyncmutex_try_lock_arc {
53            ($x:expr) => {
54                $x.clone().try_lock_owned().ok()
55            };
56        }
57
58        // #[macro_export]
59        // macro_rules! asyncrwlock_try_read {
60        //     ($x:expr) => {
61        //         $x.try_read().ok()
62        //     };
63        // }
64
65        // #[macro_export]
66        // macro_rules! asyncrwlock_try_write {
67        //     ($x:expr) => {
68        //         $x.try_write().ok()
69        //     };
70        // }
71    } else {
72        #[macro_export]
73        macro_rules! asyncmutex_try_lock {
74            ($x:expr) => {
75                $x.try_lock()
76            };
77        }
78        #[macro_export]
79        macro_rules! asyncmutex_lock_arc {
80            ($x:expr) => {
81                $x.lock_arc().await
82            };
83        }
84        #[macro_export]
85        macro_rules! asyncmutex_try_lock_arc {
86            ($x:expr) => {
87                $x.try_lock_arc()
88            };
89        }
90
91    }
92}
93
/// Forwards to `try_read()` on the lock expression and yields whatever that
/// call returns.
#[macro_export]
macro_rules! asyncrwlock_try_read {
    ($lock:expr) => {
        $lock.try_read()
    };
}
/// Forwards to `try_write()` on the lock expression and yields whatever that
/// call returns.
#[macro_export]
macro_rules! asyncrwlock_try_write {
    ($lock:expr) => {
        $lock.try_write()
    };
}
106
/// Forwards to `try_read_arc()` on the lock expression and yields whatever
/// that call returns.
#[macro_export]
macro_rules! asyncrwlock_try_read_arc {
    ($lock:expr) => {
        $lock.try_read_arc()
    };
}
/// Forwards to `try_write_arc()` on the lock expression and yields whatever
/// that call returns.
#[macro_export]
macro_rules! asyncrwlock_try_write_arc {
    ($lock:expr) => {
        $lock.try_write_arc()
    };
}
119
120//////////////////////////////////////////////////////////////////////////////////////////////////////////////
121
/// Heap-allocates and pins `future` with `Box::pin`, returning it as a
/// `SendPinBoxFutureLifetime<'a, Out>`.
///
/// The input must be `Send` and valid for `'a`; its output type is preserved.
// NOTE(review): `SendPinBoxFutureLifetime` is declared elsewhere; presumably a
// pinned, boxed `dyn Future + Send + 'a` alias — confirm against its definition.
pub fn system_boxed<'a, Out>(
    future: impl Future<Output = Out> + Send + 'a,
) -> SendPinBoxFutureLifetime<'a, Out> {
    Box::pin(future)
}
127
128//////////////////////////////////////////////////////////////////////////////////////////////////////////////
129
130cfg_if! {
131    if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
132        pub fn get_concurrency() -> u32 {
133            std::thread::available_parallelism()
134                .map(|x| x.get())
135                .unwrap_or_else(|e| {
136                    warn!("unable to get concurrency defaulting to single core: {}", e);
137                    1
138                }) as u32
139        }
140    }
141}
142
143//////////////////////////////////////////////////////////////////////////////////////////////////////////////
144
/// Splits `name` at its last `':'` into a host part and a numeric port.
///
/// * No `':'` present: returns the whole input as the host with `None`.
/// * `':'` present: everything after the last colon must parse as a `u16`.
///
/// # Errors
/// Returns `"invalid port: ..."` when the text after the last colon is not a
/// valid `u16`.
pub fn split_port(name: &str) -> Result<(String, Option<u16>), String> {
    match name.rsplit_once(':') {
        None => Ok((name.to_string(), None)),
        Some((host, port_text)) => port_text
            .parse::<u16>()
            .map(|port| (host.to_string(), Some(port)))
            .map_err(|e| format!("invalid port: {}", e)),
    }
}
158
/// Returns `s` unchanged when it already begins with `'/'`; otherwise
/// returns a new string consisting of `"/"` followed by `s`.
pub fn prepend_slash(s: String) -> String {
    if s.starts_with('/') {
        s
    } else {
        format!("/{}", s)
    }
}
167
/// Converts a microsecond count to fractional seconds by dividing by 1,000,000.
pub fn timestamp_to_secs(ts: u64) -> f64 {
    const MICROS_PER_SEC: f64 = 1_000_000.0;
    (ts as f64) / MICROS_PER_SEC
}
171
/// Converts fractional seconds to a microsecond count by multiplying by
/// 1,000,000 and casting to `u64`.
///
/// The `as` cast truncates toward zero and saturates, so negative and NaN
/// inputs yield `0`.
pub fn secs_to_timestamp(secs: f64) -> u64 {
    const MICROS_PER_SEC: f64 = 1_000_000.0;
    let micros = secs * MICROS_PER_SEC;
    micros as u64
}
175
/// Converts milliseconds to microseconds. The `u32` input is widened to
/// `u64` before multiplying, so the result cannot overflow.
pub fn ms_to_us(ms: u32) -> u64 {
    u64::from(ms) * 1_000
}
179
/// Converts microseconds to whole milliseconds (integer division by 1000).
///
/// # Errors
/// Returns `"could not convert microseconds: ..."` when the millisecond
/// value does not fit in a `u32`.
pub fn us_to_ms(us: u64) -> Result<u32, String> {
    let whole_ms = us / 1_000;
    let converted: Result<u32, _> = u32::try_from(whole_ms);
    converted.map_err(|err| format!("could not convert microseconds: {}", err))
}
183
184// Calculate retry attempt with logarhythmic falloff
185pub fn retry_falloff_log(
186    last_us: u64,
187    cur_us: u64,
188    interval_start_us: u64,
189    interval_max_us: u64,
190    interval_multiplier_us: f64,
191) -> bool {
192    //
193    if cur_us < interval_start_us {
194        // Don't require a retry within the first 'interval_start_us' microseconds of the reliable time period
195        false
196    } else if cur_us >= last_us + interval_max_us {
197        // Retry at least every 'interval_max_us' microseconds
198        true
199    } else {
200        // Exponential falloff between 'interval_start_us' and 'interval_max_us' microseconds
201        last_us <= secs_to_timestamp(timestamp_to_secs(cur_us) / interval_multiplier_us)
202    }
203}
204
/// Applies `closure` to items of `things` in order and returns the first
/// `Some` result, giving up after `max` items have been tried.
///
/// Returns `None` when every tried item yields `None`, when `things` is
/// empty, or when `max` is `0` (in which case `closure` is never called).
pub fn try_at_most_n_things<T, I, C, R>(max: usize, things: I, closure: C) -> Option<R>
where
    I: IntoIterator<Item = T>,
    C: Fn(T) -> Option<R>,
{
    // `take(max)` bounds the number of attempts, including the `max == 0` case
    things.into_iter().take(max).find_map(closure)
}
222
/// Async counterpart of `try_at_most_n_things`: awaits `closure` on items of
/// `things` one at a time, in order, and returns the first `Some` result,
/// giving up after `max` items have been tried.
///
/// Returns `None` when every tried item yields `None`, when `things` is
/// empty, or when `max` is `0` (in which case `closure` is never called).
pub async fn async_try_at_most_n_things<T, I, C, R, F>(
    max: usize,
    things: I,
    closure: C,
) -> Option<R>
where
    I: IntoIterator<Item = T>,
    C: Fn(T) -> F,
    F: Future<Output = Option<R>>,
{
    // `take(max)` bounds the number of attempts, including the `max == 0` case
    for thing in things.into_iter().take(max) {
        if let Some(found) = closure(thing).await {
            return Some(found);
        }
    }
    None
}
245
/// In-place minimum/maximum assignment for any totally ordered type.
pub trait CmpAssign {
    /// Overwrites `self` with `other` when `other` is strictly less than `self`.
    fn min_assign(&mut self, other: Self);
    /// Overwrites `self` with `other` when `other` is strictly greater than `self`.
    fn max_assign(&mut self, other: Self);
}

impl<T: core::cmp::Ord> CmpAssign for T {
    fn min_assign(&mut self, other: Self) {
        if other < *self {
            *self = other;
        }
    }

    fn max_assign(&mut self, other: Self) {
        if other > *self {
            *self = other;
        }
    }
}
266
/// Returns the unspecified address with port `0` in the same address family
/// as `socket_addr`: `0.0.0.0:0` for IPv4 and `[::]:0` for IPv6.
pub fn compatible_unspecified_socket_addr(socket_addr: &SocketAddr) -> SocketAddr {
    let unspecified_ip = if socket_addr.is_ipv4() {
        IpAddr::V4(Ipv4Addr::UNSPECIFIED)
    } else {
        IpAddr::V6(Ipv6Addr::UNSPECIFIED)
    };
    SocketAddr::new(unspecified_ip, 0)
}
273
274cfg_if! {
275    if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
276        use std::net::UdpSocket;
277
278        static IPV6_IS_SUPPORTED: Mutex<Option<bool>> = Mutex::new(None);
279
280        pub fn is_ipv6_supported() -> bool {
281            let mut opt_supp = IPV6_IS_SUPPORTED.lock();
282            if let Some(supp) = *opt_supp {
283                return supp;
284            }
285            // Not exhaustive but for our use case it should be sufficient. If no local ports are available for binding, Veilid isn't going to work anyway :P
286            let supp = UdpSocket::bind(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0)).is_ok();
287            *opt_supp = Some(supp);
288            supp
289        }
290    }
291}
292
293pub fn available_unspecified_addresses() -> Vec<IpAddr> {
294    if is_ipv6_supported() {
295        vec![
296            IpAddr::V4(Ipv4Addr::UNSPECIFIED),
297            IpAddr::V6(Ipv6Addr::UNSPECIFIED),
298        ]
299    } else {
300        vec![IpAddr::V4(Ipv4Addr::UNSPECIFIED)]
301    }
302}
303
/// Expands a listen-address string into the socket addresses to bind.
///
/// Accepted forms, checked in this order:
/// * `":<port>"` — the port on every address from `available_unspecified_addresses()`
/// * `"<port>"` (the whole string parses as a `u16`) — same as above
/// * anything else — treated as an address; `":0"` is appended when the
///   string contains no `':'`, then it is parsed (wasm32-unknown) or
///   resolved via `to_socket_addrs()` (all other targets)
///
/// # Errors
/// Returns a message when a `":<port>"` string has an invalid port, or when
/// the address cannot be parsed/resolved.
pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<SocketAddr>, String> {
    // If no address is specified, but the port is, use ipv4 and ipv6 unspecified
    // If the address is specified, only use the specified port and fail otherwise

    // Computed up front; only the two port-only branches below use it
    let ip_addrs = available_unspecified_addresses();

    Ok(if let Some(portstr) = listen_address.strip_prefix(':') {
        // ":<port>" form
        let port = portstr
            .parse::<u16>()
            .map_err(|e| format!("Invalid port format in udp listen address: {}", e))?;
        ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
    } else if let Ok(port) = listen_address.parse::<u16>() {
        // Bare "<port>" form
        ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
    } else {
        // Address form: any ':' is taken to mean a port is already present
        let listen_address_with_port = if listen_address.contains(':') {
            listen_address.to_string()
        } else {
            format!("{}:0", listen_address)
        };
        cfg_if! {
            if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
                // This target parses the string directly as a single socket address
                use core::str::FromStr;
                vec![SocketAddr::from_str(&listen_address_with_port).map_err(|e| format!("Unable to parse address: {}",e))?]
            } else {
                // Resolution may yield several socket addresses; all are returned
                listen_address_with_port
                    .to_socket_addrs()
                    .map_err(|e| format!("Unable to resolve address: {}", e))?
                    .collect()
            }
        }
    })
}
336
/// Order-preserving de-duplication that does not require the collection to
/// be sorted: the first occurrence of each value is kept, later equal values
/// are dropped.
pub trait RemoveDuplicates<T: PartialEq + Clone> {
    /// Removes every element equal to an earlier element, in place.
    fn remove_duplicates(&mut self);
}

impl<T: PartialEq + Clone> RemoveDuplicates<T> for Vec<T> {
    fn remove_duplicates(&mut self) {
        // Only `PartialEq` is available, so membership is a linear scan over
        // clones of the elements kept so far.
        let mut kept: Vec<T> = Vec::new();
        self.retain(|candidate| {
            if kept.contains(candidate) {
                return false;
            }
            kept.push(candidate.clone());
            true
        })
    }
}
354
355cfg_if::cfg_if! {
356    if #[cfg(unix)] {
357        use std::os::unix::fs::MetadataExt;
358        use std::os::unix::prelude::PermissionsExt;
359        use nix::unistd::{Uid, Gid};
360
361        pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
362        {
363            let path = path.as_ref();
364            if !path.is_file() {
365                return Ok(());
366            }
367
368            let uid = Uid::effective();
369            let gid = Gid::effective();
370            let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
371
372            if meta.mode() != 0o600 {
373                std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
374            }
375            if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
376                return Err("path has incorrect owner/group".to_owned());
377            }
378            Ok(())
379        }
380
381        pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, group_read: bool) -> Result<(), String>
382        {
383            let path = path.as_ref();
384            if !path.is_dir() {
385                return Ok(());
386            }
387
388            let uid = Uid::effective();
389            let gid = Gid::effective();
390            let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
391
392            let perm = if group_read {
393                0o750
394            } else {
395                0o700
396            };
397
398            if meta.mode() != perm {
399                std::fs::set_permissions(path,std::fs::Permissions::from_mode(perm)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
400            }
401            if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
402                return Err("path has incorrect owner/group".to_owned());
403            }
404            Ok(())
405        }
406    } else if #[cfg(windows)] {
407        //use std::os::windows::fs::MetadataExt;
408        //use windows_permissions::*;
409
410        pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
411        {
412            let path = path.as_ref();
413            if !path.is_file() {
414                return Ok(());
415            }
416
417            Ok(())
418        }
419
420        pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
421        {
422            let path = path.as_ref();
423            if !path.is_dir() {
424                return Ok(());
425            }
426
427            Ok(())
428        }
429
430    } else {
431        pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
432        {
433            let path = path.as_ref();
434            if !path.is_file() {
435                return Ok(());
436            }
437
438            Ok(())
439        }
440
441        pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
442        {
443            let path = path.as_ref();
444            if !path.is_dir() {
445                return Ok(());
446            }
447
448            Ok(())
449        }
450    }
451}
452
/// Eight-byte unit with 8-byte alignment; used only so the allocator hands
/// back storage aligned to 8 bytes.
#[repr(C, align(8))]
struct AlignToEight([u8; 8]);

/// Returns a `Vec<u8>` of length `n_bytes` whose buffer was allocated as a
/// `Vec<AlignToEight>`, i.e. with 8-byte alignment. The contents are not
/// initialized.
///
/// # Safety
/// Ensure you immediately initialize this vector as it could contain sensitive data
// NOTE(review): the buffer is allocated with align 8 but is owned by a `Vec<u8>`
// (align 1) afterwards. `Vec::from_raw_parts` documents that `T` must have the
// same alignment the pointer was allocated with — confirm this is acceptable
// for the allocators in use.
pub unsafe fn aligned_8_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
    // Round the byte count up to a whole number of 8-byte units
    let n_units = (n_bytes + mem::size_of::<AlignToEight>() - 1) / mem::size_of::<AlignToEight>();
    let mut aligned: Vec<AlignToEight> = Vec::with_capacity(n_units);
    let ptr = aligned.as_mut_ptr();
    let cap_units = aligned.capacity();
    // Give up ownership without freeing; the returned Vec takes over the buffer
    mem::forget(aligned);

    Vec::from_raw_parts(
        ptr as *mut u8,
        n_bytes,
        // Capacity in bytes, derived from the capacity actually allocated
        cap_units * mem::size_of::<AlignToEight>(),
    )
}
471
/// Returns a `Vec<u8>` of length `n_bytes` whose contents are not initialized.
///
/// # Safety
/// Ensure you immediately initialize this vector as it could contain sensitive data.
/// The bytes must not be read before they have been written.
pub unsafe fn unaligned_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
    let mut unaligned: Vec<u8> = Vec::with_capacity(n_bytes);
    // SAFETY: `with_capacity` reserved at least `n_bytes`, so the new length is
    // within capacity. The vector keeps its real capacity instead of assuming it
    // equals `n_bytes`. Initializing the bytes is delegated to the caller per
    // this function's contract.
    unaligned.set_len(n_bytes);
    unaligned
}
481
482pub fn debug_backtrace() -> String {
483    let bt = backtrace::Backtrace::new();
484    format!("{:?}", bt)
485}
486
487pub fn debug_print_backtrace() {
488    if is_debug_backtrace_enabled() {
489        debug!("{}", debug_backtrace());
490    }
491}
492
/// Reports whether debug backtraces are enabled.
///
/// In builds without `debug_assertions` this is always `false`. Otherwise it
/// is `true` when the `RUST_BACKTRACE` setting equals `"1"` or `"full"`.
pub fn is_debug_backtrace_enabled() -> bool {
    cfg_if! {
        if #[cfg(debug_assertions)] {
            // Fetch the setting from the platform-appropriate source; a missing
            // or unreadable value becomes the empty string
            cfg_if! {
                if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
                    let rbenv = get_wasm_global_string_value("RUST_BACKTRACE").unwrap_or_default();
                }
                else
                {
                    let rbenv = std::env::var("RUST_BACKTRACE").unwrap_or_default();
                }
            }
            rbenv == "1" || rbenv == "full"
        } else {
            false
        }
    }
}
511
512#[track_caller]
513pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(f: T) -> impl Future<Output = R> {
514    let location = std::panic::Location::caller();
515    async move {
516        let t1 = get_timestamp();
517        let out = f().await;
518        let t2 = get_timestamp();
519        debug!("duration@{}: {}", location, display_duration(t2 - t1));
520        out
521    }
522}
523
/// Returns the compiler-provided type name of the referenced value's static
/// type `T`. The value itself is not inspected.
pub fn type_name_of_val<T: ?Sized>(_val: &T) -> &'static str {
    use std::any::type_name;
    type_name::<T>()
}
527
/// Converts `arg` to a `String` via its `ToString` implementation. As a free
/// function it can be passed directly to adaptors such as `map`/`map_err`.
pub fn map_to_string<X: ToString>(arg: X) -> String {
    ToString::to_string(&arg)
}
531
532//////////////////////////////////////////////////////////////////////////////////////////////////////////////
533
/// Scope guard for debugging: increments a shared counter on creation and
/// decrements it on drop, printing the resulting count to stderr each time.
pub struct DebugGuard {
    // Label printed in the enter/exit messages
    name: &'static str,
    // Shared count of guards currently alive for this label
    counter: &'static AtomicUsize,
}

impl DebugGuard {
    /// Increments `counter`, prints `"<name> entered: <count>"` to stderr and
    /// returns the guard.
    pub fn new(name: &'static str, counter: &'static AtomicUsize) -> Self {
        // `fetch_add` returns the previous value, so add one for the new count
        let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
        eprintln!("{} entered: {}", name, active);
        Self { name, counter }
    }
}

impl Drop for DebugGuard {
    /// Decrements the counter and prints `"<name> exited: <count>"` to stderr.
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value, so subtract one for the new count
        let remaining = self.counter.fetch_sub(1, Ordering::SeqCst) - 1;
        eprintln!("{} exited: {}", self.name, remaining);
    }
}
552}