1use super::*;
2
3use std::io;
4use std::path::Path;
5
6#[macro_export]
9macro_rules! assert_err {
10 ($ex:expr) => {
11 if let Ok(v) = $ex {
12 panic!("assertion failed, expected Err(..), got {:?}", v);
13 }
14 };
15}
16
17#[macro_export]
18macro_rules! io_error_other {
19 ($msg:expr) => {
20 io::Error::new(io::ErrorKind::Other, $msg.to_string())
21 };
22}
23
24pub fn to_io_error_other<E: std::error::Error + Send + Sync + 'static>(x: E) -> io::Error {
25 io::Error::other(x)
26}
27
28#[macro_export]
29macro_rules! bail_io_error_other {
30 ($msg:expr) => {
31 return io::Result::Err(io::Error::new(io::ErrorKind::Other, $msg.to_string()))
32 };
33}
34
35cfg_if::cfg_if! {
36 if #[cfg(feature="rt-tokio")] {
37 #[macro_export]
38 macro_rules! asyncmutex_try_lock {
39 ($x:expr) => {
40 $x.try_lock().ok()
41 };
42 }
43
44 #[macro_export]
45 macro_rules! asyncmutex_lock_arc {
46 ($x:expr) => {
47 $x.clone().lock_owned().await
48 };
49 }
50
51 #[macro_export]
52 macro_rules! asyncmutex_try_lock_arc {
53 ($x:expr) => {
54 $x.clone().try_lock_owned().ok()
55 };
56 }
57
58 } else {
72 #[macro_export]
73 macro_rules! asyncmutex_try_lock {
74 ($x:expr) => {
75 $x.try_lock()
76 };
77 }
78 #[macro_export]
79 macro_rules! asyncmutex_lock_arc {
80 ($x:expr) => {
81 $x.lock_arc().await
82 };
83 }
84 #[macro_export]
85 macro_rules! asyncmutex_try_lock_arc {
86 ($x:expr) => {
87 $x.try_lock_arc()
88 };
89 }
90
91 }
92}
93
94#[macro_export]
95macro_rules! asyncrwlock_try_read {
96 ($x:expr) => {
97 $x.try_read()
98 };
99}
100#[macro_export]
101macro_rules! asyncrwlock_try_write {
102 ($x:expr) => {
103 $x.try_write()
104 };
105}
106
107#[macro_export]
108macro_rules! asyncrwlock_try_read_arc {
109 ($x:expr) => {
110 $x.try_read_arc()
111 };
112}
113#[macro_export]
114macro_rules! asyncrwlock_try_write_arc {
115 ($x:expr) => {
116 $x.try_write_arc()
117 };
118}
119
120cfg_if! {
123 if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
124 #[must_use]
125 pub fn get_concurrency() -> u32 {
126 std::thread::available_parallelism()
127 .map(|x| x.get())
128 .unwrap_or_else(|e| {
129 warn!("unable to get concurrency defaulting to single core: {}", e);
130 1
131 }) as u32
132 }
133 }
134}
135
136pub fn split_port(name: &str) -> Result<(String, Option<u16>), String> {
139 if let Some(split) = name.rfind(':') {
140 let hoststr = &name[0..split];
141 let portstr = &name[split + 1..];
142 let port: u16 = portstr
143 .parse::<u16>()
144 .map_err(|e| format!("invalid port: {}", e))?;
145
146 Ok((hoststr.to_string(), Some(port)))
147 } else {
148 Ok((name.to_string(), None))
149 }
150}
151
152#[must_use]
153pub fn prepend_slash(s: String) -> String {
154 if s.starts_with('/') {
155 return s;
156 }
157 let mut out = "/".to_owned();
158 out.push_str(s.as_str());
159 out
160}
161
162#[must_use]
163pub fn timestamp_to_secs(ts: u64) -> f64 {
164 ts as f64 / 1000000.0f64
165}
166
167#[must_use]
168pub fn secs_to_timestamp(secs: f64) -> u64 {
169 (secs * 1000000.0f64) as u64
170}
171
172#[must_use]
173pub fn ms_to_us(ms: u32) -> u64 {
174 (ms as u64) * 1000u64
175}
176
177pub fn us_to_ms(us: u64) -> Result<u32, String> {
178 u32::try_from(us / 1000u64).map_err(|e| format!("could not convert microseconds: {}", e))
179}
180
181#[must_use]
183pub fn retry_falloff_log(
184 last_us: u64,
185 cur_us: u64,
186 interval_start_us: u64,
187 interval_max_us: u64,
188 interval_multiplier_us: f64,
189) -> bool {
190 if cur_us < interval_start_us {
192 false
194 } else if cur_us >= last_us + interval_max_us {
195 true
197 } else {
198 last_us <= secs_to_timestamp(timestamp_to_secs(cur_us) / interval_multiplier_us)
200 }
201}
202
203pub fn try_at_most_n_things<T, I, C, R>(max: usize, things: I, closure: C) -> Option<R>
204where
205 I: IntoIterator<Item = T>,
206 C: Fn(T) -> Option<R>,
207{
208 let mut fails = 0usize;
209 for thing in things.into_iter() {
210 if let Some(r) = closure(thing) {
211 return Some(r);
212 }
213 fails += 1;
214 if fails >= max {
215 break;
216 }
217 }
218 None
219}
220
221pub async fn async_try_at_most_n_things<T, I, C, R, F>(
222 max: usize,
223 things: I,
224 closure: C,
225) -> Option<R>
226where
227 I: IntoIterator<Item = T>,
228 C: Fn(T) -> F,
229 F: Future<Output = Option<R>>,
230{
231 let mut fails = 0usize;
232 for thing in things.into_iter() {
233 if let Some(r) = closure(thing).await {
234 return Some(r);
235 }
236 fails += 1;
237 if fails >= max {
238 break;
239 }
240 }
241 None
242}
243
244pub trait CmpAssign {
245 fn min_assign(&mut self, other: Self);
246 fn max_assign(&mut self, other: Self);
247}
248
249impl<T> CmpAssign for T
250where
251 T: core::cmp::Ord,
252{
253 fn min_assign(&mut self, other: Self) {
254 if &other < self {
255 *self = other;
256 }
257 }
258 fn max_assign(&mut self, other: Self) {
259 if &other > self {
260 *self = other;
261 }
262 }
263}
264
265#[must_use]
266pub fn compatible_unspecified_socket_addr(socket_addr: &SocketAddr) -> SocketAddr {
267 match socket_addr {
268 SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
269 SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0),
270 }
271}
272
273cfg_if! {
274 if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
275 use std::net::UdpSocket;
276
277 static IPV6_IS_SUPPORTED: Mutex<Option<bool>> = Mutex::new(None);
278
279 pub fn is_ipv6_supported() -> bool {
280 let mut opt_supp = IPV6_IS_SUPPORTED.lock();
281 if let Some(supp) = *opt_supp {
282 return supp;
283 }
284 let supp = UdpSocket::bind(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0)).is_ok();
286 *opt_supp = Some(supp);
287 supp
288 }
289 }
290}
291
292#[must_use]
293pub fn available_unspecified_addresses() -> Vec<IpAddr> {
294 if is_ipv6_supported() {
295 vec![
296 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
297 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
298 ]
299 } else {
300 vec![IpAddr::V4(Ipv4Addr::UNSPECIFIED)]
301 }
302}
303
304pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<SocketAddr>, String> {
305 let ip_addrs = available_unspecified_addresses();
309
310 Ok(if let Some(portstr) = listen_address.strip_prefix(':') {
311 let port = portstr
312 .parse::<u16>()
313 .map_err(|e| format!("Invalid port format in udp listen address: {}", e))?;
314 ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
315 } else if let Ok(port) = listen_address.parse::<u16>() {
316 ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
317 } else {
318 let listen_address_with_port = if listen_address.contains(':') {
319 listen_address.to_string()
320 } else {
321 format!("{}:0", listen_address)
322 };
323 cfg_if! {
324 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
325 use core::str::FromStr;
326 vec![SocketAddr::from_str(&listen_address_with_port).map_err(|e| format!("Unable to parse address: {}",e))?]
327 } else {
328 listen_address_with_port
329 .to_socket_addrs()
330 .map_err(|e| format!("Unable to resolve address: {}", e))?
331 .collect()
332 }
333 }
334 })
335}
336
337pub trait RemoveDuplicates<T: PartialEq + Clone> {
339 fn remove_duplicates(&mut self);
340}
341
342impl<T: PartialEq + Clone> RemoveDuplicates<T> for Vec<T> {
343 fn remove_duplicates(&mut self) {
344 let mut already_seen = Vec::new();
345 self.retain(|item| match already_seen.contains(item) {
346 true => false,
347 _ => {
348 already_seen.push(item.clone());
349 true
350 }
351 })
352 }
353}
354
355cfg_if::cfg_if! {
356 if #[cfg(unix)] {
357 use std::os::unix::fs::MetadataExt;
358 use std::os::unix::prelude::PermissionsExt;
359 use nix::unistd::{Uid, Gid};
360
361 pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
362 {
363 let path = path.as_ref();
364 if !path.is_file() {
365 return Ok(());
366 }
367
368 let uid = Uid::effective();
369 let gid = Gid::effective();
370 let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
371
372 if meta.mode() != 0o600 {
373 std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
374 }
375 if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
376 return Err("path has incorrect owner/group".to_owned());
377 }
378 Ok(())
379 }
380
381 pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, group_read: bool) -> Result<(), String>
382 {
383 let path = path.as_ref();
384 if !path.is_dir() {
385 return Ok(());
386 }
387
388 let uid = Uid::effective();
389 let gid = Gid::effective();
390 let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
391
392 let perm = if group_read {
393 0o750
394 } else {
395 0o700
396 };
397
398 if meta.mode() != perm {
399 std::fs::set_permissions(path,std::fs::Permissions::from_mode(perm)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
400 }
401 if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
402 return Err("path has incorrect owner/group".to_owned());
403 }
404 Ok(())
405 }
406 } else if #[cfg(windows)] {
407 pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
411 {
412 let path = path.as_ref();
413 if !path.is_file() {
414 return Ok(());
415 }
416
417 Ok(())
418 }
419
420 pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
421 {
422 let path = path.as_ref();
423 if !path.is_dir() {
424 return Ok(());
425 }
426
427 Ok(())
428 }
429
430 } else {
431 pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
432 {
433 let path = path.as_ref();
434 if !path.is_file() {
435 return Ok(());
436 }
437
438 Ok(())
439 }
440
441 pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
442 {
443 let path = path.as_ref();
444 if !path.is_dir() {
445 return Ok(());
446 }
447
448 Ok(())
449 }
450 }
451}
452
453#[repr(C, align(8))]
454struct AlignToEight([u8; 8]);
455
456#[must_use]
459pub unsafe fn aligned_8_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
460 let n_units = n_bytes.div_ceil(mem::size_of::<AlignToEight>());
461 let mut aligned: Vec<AlignToEight> = Vec::with_capacity(n_units);
462 let ptr = aligned.as_mut_ptr();
463 let cap_units = aligned.capacity();
464 mem::forget(aligned);
465
466 Vec::from_raw_parts(
467 ptr as *mut u8,
468 n_bytes,
469 cap_units * mem::size_of::<AlignToEight>(),
470 )
471}
472
473#[must_use]
476pub unsafe fn unaligned_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
477 let mut unaligned: Vec<u8> = Vec::with_capacity(n_bytes);
478 let ptr = unaligned.as_mut_ptr();
479 mem::forget(unaligned);
480
481 Vec::from_raw_parts(ptr, n_bytes, n_bytes)
482}
483
484#[must_use]
485pub fn debug_backtrace() -> String {
486 let bt = backtrace::Backtrace::new();
487 format!("{:?}", bt)
488}
489
490pub fn debug_print_backtrace() {
491 if is_debug_backtrace_enabled() {
492 debug!("{}", debug_backtrace());
493 }
494}
495
496#[must_use]
497pub fn is_debug_backtrace_enabled() -> bool {
498 cfg_if! {
499 if #[cfg(debug_assertions)] {
500 cfg_if! {
501 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
502 let rbenv = get_wasm_global_string_value("RUST_BACKTRACE").unwrap_or_default();
503 }
504 else
505 {
506 let rbenv = std::env::var("RUST_BACKTRACE").unwrap_or_default();
507 }
508 }
509 rbenv == "1" || rbenv == "full"
510 } else {
511 false
512 }
513 }
514}
515
516#[track_caller]
517pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(
518 f: T,
519 opt_timeout_us: Option<u64>,
520) -> impl Future<Output = R> {
521 let location = core::panic::Location::caller();
522 async move {
523 let t1 = get_timestamp();
524 let out = f().await;
525 let t2 = get_timestamp();
526 let duration_us = t2 - t1;
527 if let Some(timeout_us) = opt_timeout_us {
528 if duration_us > timeout_us {
529 #[cfg(not(feature = "debug-duration-timeout"))]
530 debug!(
531 "Excessive duration: {}\n{:?}",
532 display_duration(duration_us),
533 backtrace::Backtrace::new()
534 );
535 #[cfg(feature = "debug-duration-timeout")]
536 panic!(format!(
537 "Duration panic timeout exceeded: {}",
538 display_duration(duration_us)
539 ));
540 }
541 } else {
542 debug!("Duration: {} = {}", location, display_duration(duration_us),);
543 }
544 out
545 }
546}
547
548pub fn type_name_of_val<T: ?Sized>(_val: &T) -> &'static str {
549 std::any::type_name::<T>()
550}
551
552pub fn map_to_string<X: ToString>(arg: X) -> String {
553 arg.to_string()
554}
555
556pub struct DebugGuard {
559 name: &'static str,
560 counter: &'static AtomicUsize,
561}
562
563impl DebugGuard {
564 pub fn new(name: &'static str, counter: &'static AtomicUsize) -> Self {
565 let c = counter.fetch_add(1, Ordering::SeqCst);
566 eprintln!("{} entered: {}", name, c + 1);
567 Self { name, counter }
568 }
569}
570
571impl Drop for DebugGuard {
572 fn drop(&mut self) {
573 let c = self.counter.fetch_sub(1, Ordering::SeqCst);
574 eprintln!("{} exited: {}", self.name, c - 1);
575 }
576}