1use super::*;
2
3use std::io;
4use std::path::Path;
5
/// Asserts that the given expression evaluates to something other than `Ok(..)`.
///
/// Panics with the `Debug` rendering of the unexpected `Ok` payload otherwise.
#[macro_export]
macro_rules! assert_err {
    ($ex:expr) => {
        match $ex {
            Ok(unexpected) => {
                panic!("assertion failed, expected Err(..), got {:?}", unexpected);
            }
            _ => {}
        }
    };
}
16
/// Builds a `std::io::Error` of kind `Other` whose payload is `$msg.to_string()`.
///
/// All paths are fully qualified so the macro expands correctly even at call
/// sites that have not imported `std::io`.
#[macro_export]
macro_rules! io_error_other {
    ($msg:expr) => {
        ::std::io::Error::new(::std::io::ErrorKind::Other, $msg.to_string())
    };
}
23
/// Wraps any thread-safe, `'static` error value in an `io::Error` of kind `Other`.
pub fn to_io_error_other<E>(x: E) -> io::Error
where
    E: std::error::Error + Send + Sync + 'static,
{
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, x)
}
27
/// Returns early from the enclosing function with an `std::io::Error` of kind
/// `Other` carrying `$msg.to_string()`.
///
/// The enclosing function must return an `io::Result<_>`. All paths are fully
/// qualified so the macro also works where `std::io` has not been imported.
#[macro_export]
macro_rules! bail_io_error_other {
    ($msg:expr) => {
        return ::std::io::Result::Err(::std::io::Error::new(
            ::std::io::ErrorKind::Other,
            $msg.to_string(),
        ))
    };
}
34
35cfg_if::cfg_if! {
36 if #[cfg(feature="rt-tokio")] {
37 #[macro_export]
38 macro_rules! asyncmutex_try_lock {
39 ($x:expr) => {
40 $x.try_lock().ok()
41 };
42 }
43
44 #[macro_export]
45 macro_rules! asyncmutex_lock_arc {
46 ($x:expr) => {
47 $x.clone().lock_owned().await
48 };
49 }
50
51 #[macro_export]
52 macro_rules! asyncmutex_try_lock_arc {
53 ($x:expr) => {
54 $x.clone().try_lock_owned().ok()
55 };
56 }
57
58 } else {
72 #[macro_export]
73 macro_rules! asyncmutex_try_lock {
74 ($x:expr) => {
75 $x.try_lock()
76 };
77 }
78 #[macro_export]
79 macro_rules! asyncmutex_lock_arc {
80 ($x:expr) => {
81 $x.lock_arc().await
82 };
83 }
84 #[macro_export]
85 macro_rules! asyncmutex_try_lock_arc {
86 ($x:expr) => {
87 $x.try_lock_arc()
88 };
89 }
90
91 }
92}
93
/// Non-blocking read-lock attempt; forwards to `try_read()` on the given lock.
#[macro_export]
macro_rules! asyncrwlock_try_read {
    ($lock:expr) => {
        $lock.try_read()
    };
}
/// Non-blocking write-lock attempt; forwards to `try_write()` on the given lock.
#[macro_export]
macro_rules! asyncrwlock_try_write {
    ($lock:expr) => {
        $lock.try_write()
    };
}
106
/// Non-blocking read-lock attempt; forwards to `try_read_arc()` on the given lock.
#[macro_export]
macro_rules! asyncrwlock_try_read_arc {
    ($lock:expr) => {
        $lock.try_read_arc()
    };
}
/// Non-blocking write-lock attempt; forwards to `try_write_arc()` on the given lock.
#[macro_export]
macro_rules! asyncrwlock_try_write_arc {
    ($lock:expr) => {
        $lock.try_write_arc()
    };
}
119
120pub fn system_boxed<'a, Out>(
123 future: impl Future<Output = Out> + Send + 'a,
124) -> SendPinBoxFutureLifetime<'a, Out> {
125 Box::pin(future)
126}
127
128cfg_if! {
131 if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
132 pub fn get_concurrency() -> u32 {
133 std::thread::available_parallelism()
134 .map(|x| x.get())
135 .unwrap_or_else(|e| {
136 warn!("unable to get concurrency defaulting to single core: {}", e);
137 1
138 }) as u32
139 }
140 }
141}
142
/// Splits a `host[:port]` string into its host part and optional port.
///
/// The split happens at the last `:` in `name`. A fully bracketed IPv6 literal
/// with no trailing port (for example `"[::1]"`) is returned unchanged with
/// `None` as the port, because its colons belong to the address itself.
///
/// # Errors
///
/// Returns `Err` with an `"invalid port: ..."` message when the text after the
/// last `:` does not parse as a `u16`.
pub fn split_port(name: &str) -> Result<(String, Option<u16>), String> {
    // "[...]" with nothing after the closing bracket has no port component;
    // splitting on an inner colon would yield a bogus "invalid port" error.
    let is_bare_bracketed = name.starts_with('[') && name.ends_with(']');

    match name.rsplit_once(':') {
        Some((hoststr, portstr)) if !is_bare_bracketed => {
            let port = portstr
                .parse::<u16>()
                .map_err(|e| format!("invalid port: {}", e))?;
            Ok((hoststr.to_string(), Some(port)))
        }
        _ => Ok((name.to_string(), None)),
    }
}
158
/// Returns `s` with a leading `/`, adding one only if it is not already present.
pub fn prepend_slash(s: String) -> String {
    if s.starts_with('/') {
        s
    } else {
        format!("/{}", s)
    }
}
167
/// Converts a timestamp to fractional seconds by dividing it by 1,000,000
/// (i.e. the timestamp is treated as a microsecond count).
pub fn timestamp_to_secs(ts: u64) -> f64 {
    const TICKS_PER_SEC: f64 = 1000000.0f64;
    (ts as f64) / TICKS_PER_SEC
}
171
/// Converts fractional seconds to a timestamp by multiplying by 1,000,000 and
/// casting to `u64` (the `as` cast truncates toward zero and saturates, so
/// negative or NaN inputs yield `0`).
pub fn secs_to_timestamp(secs: f64) -> u64 {
    const TICKS_PER_SEC: f64 = 1000000.0f64;
    let ticks = secs * TICKS_PER_SEC;
    ticks as u64
}
175
/// Converts milliseconds to microseconds; the widening to `u64` makes the
/// multiplication overflow-free for every `u32` input.
pub fn ms_to_us(ms: u32) -> u64 {
    let wide = u64::from(ms);
    wide * 1000u64
}
179
/// Converts microseconds to whole milliseconds (integer division by 1000).
///
/// # Errors
///
/// Returns `Err` with a `"could not convert microseconds: ..."` message when the
/// millisecond count does not fit in a `u32`.
pub fn us_to_ms(us: u64) -> Result<u32, String> {
    let whole_ms = us / 1000u64;
    match u32::try_from(whole_ms) {
        Ok(ms) => Ok(ms),
        Err(e) => Err(format!("could not convert microseconds: {}", e)),
    }
}
183
184pub fn retry_falloff_log(
186 last_us: u64,
187 cur_us: u64,
188 interval_start_us: u64,
189 interval_max_us: u64,
190 interval_multiplier_us: f64,
191) -> bool {
192 if cur_us < interval_start_us {
194 false
196 } else if cur_us >= last_us + interval_max_us {
197 true
199 } else {
200 last_us <= secs_to_timestamp(timestamp_to_secs(cur_us) / interval_multiplier_us)
202 }
203}
204
/// Applies `closure` to at most `max` items of `things`, in order, and returns
/// the first `Some` result.
///
/// Returns `None` when none of the attempted items produced a value, when
/// `things` is empty, or when `max` is `0` (in which case `closure` is never
/// called).
pub fn try_at_most_n_things<T, I, C, R>(max: usize, things: I, closure: C) -> Option<R>
where
    I: IntoIterator<Item = T>,
    C: Fn(T) -> Option<R>,
{
    // `take(max)` caps the number of attempts, including the `max == 0` case.
    things.into_iter().take(max).find_map(closure)
}
222
/// Async variant of `try_at_most_n_things`: awaits `closure` on at most `max`
/// items of `things`, one after another, and returns the first `Some` result.
///
/// Returns `None` when none of the attempted items produced a value, when
/// `things` is empty, or when `max` is `0` (in which case `closure` is never
/// called).
pub async fn async_try_at_most_n_things<T, I, C, R, F>(
    max: usize,
    things: I,
    closure: C,
) -> Option<R>
where
    I: IntoIterator<Item = T>,
    C: Fn(T) -> F,
    F: Future<Output = Option<R>>,
{
    // `take(max)` caps the number of attempts, including the `max == 0` case.
    for thing in things.into_iter().take(max) {
        if let Some(r) = closure(thing).await {
            return Some(r);
        }
    }
    None
}
245
/// In-place minimum / maximum assignment.
pub trait CmpAssign {
    /// Replaces `self` with `other` when `other` is strictly smaller.
    fn min_assign(&mut self, other: Self);
    /// Replaces `self` with `other` when `other` is strictly greater.
    fn max_assign(&mut self, other: Self);
}

/// Blanket implementation for every totally ordered type.
impl<T> CmpAssign for T
where
    T: core::cmp::Ord,
{
    fn min_assign(&mut self, other: Self) {
        if other < *self {
            *self = other;
        }
    }

    fn max_assign(&mut self, other: Self) {
        if other > *self {
            *self = other;
        }
    }
}
266
/// Returns the unspecified ("any") socket address with port `0` in the same
/// address family (IPv4 or IPv6) as `socket_addr`.
pub fn compatible_unspecified_socket_addr(socket_addr: &SocketAddr) -> SocketAddr {
    let any_ip = if socket_addr.is_ipv4() {
        IpAddr::V4(Ipv4Addr::UNSPECIFIED)
    } else {
        IpAddr::V6(Ipv6Addr::UNSPECIFIED)
    };
    SocketAddr::new(any_ip, 0)
}
273
274cfg_if! {
275 if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
276 use std::net::UdpSocket;
277
278 static IPV6_IS_SUPPORTED: Mutex<Option<bool>> = Mutex::new(None);
279
280 pub fn is_ipv6_supported() -> bool {
281 let mut opt_supp = IPV6_IS_SUPPORTED.lock();
282 if let Some(supp) = *opt_supp {
283 return supp;
284 }
285 let supp = UdpSocket::bind(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0)).is_ok();
287 *opt_supp = Some(supp);
288 supp
289 }
290 }
291}
292
293pub fn available_unspecified_addresses() -> Vec<IpAddr> {
294 if is_ipv6_supported() {
295 vec![
296 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
297 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
298 ]
299 } else {
300 vec![IpAddr::V4(Ipv4Addr::UNSPECIFIED)]
301 }
302}
303
304pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<SocketAddr>, String> {
305 let ip_addrs = available_unspecified_addresses();
309
310 Ok(if let Some(portstr) = listen_address.strip_prefix(':') {
311 let port = portstr
312 .parse::<u16>()
313 .map_err(|e| format!("Invalid port format in udp listen address: {}", e))?;
314 ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
315 } else if let Ok(port) = listen_address.parse::<u16>() {
316 ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
317 } else {
318 let listen_address_with_port = if listen_address.contains(':') {
319 listen_address.to_string()
320 } else {
321 format!("{}:0", listen_address)
322 };
323 cfg_if! {
324 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
325 use core::str::FromStr;
326 vec![SocketAddr::from_str(&listen_address_with_port).map_err(|e| format!("Unable to parse address: {}",e))?]
327 } else {
328 listen_address_with_port
329 .to_socket_addrs()
330 .map_err(|e| format!("Unable to resolve address: {}", e))?
331 .collect()
332 }
333 }
334 })
335}
336
/// Order-preserving de-duplication for collections whose items only support
/// equality comparison (no hashing or ordering required).
pub trait RemoveDuplicates<T: PartialEq + Clone> {
    /// Removes every item equal to an earlier one, keeping first occurrences.
    fn remove_duplicates(&mut self);
}

impl<T: PartialEq + Clone> RemoveDuplicates<T> for Vec<T> {
    fn remove_duplicates(&mut self) {
        // Clones of the items kept so far, searched linearly for each candidate.
        let mut kept: Vec<T> = Vec::new();
        self.retain(|candidate| {
            if kept.contains(candidate) {
                return false;
            }
            kept.push(candidate.clone());
            true
        });
    }
}
354
355cfg_if::cfg_if! {
356 if #[cfg(unix)] {
357 use std::os::unix::fs::MetadataExt;
358 use std::os::unix::prelude::PermissionsExt;
359 use nix::unistd::{Uid, Gid};
360
361 pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
362 {
363 let path = path.as_ref();
364 if !path.is_file() {
365 return Ok(());
366 }
367
368 let uid = Uid::effective();
369 let gid = Gid::effective();
370 let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
371
372 if meta.mode() != 0o600 {
373 std::fs::set_permissions(path,std::fs::Permissions::from_mode(0o600)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
374 }
375 if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
376 return Err("path has incorrect owner/group".to_owned());
377 }
378 Ok(())
379 }
380
381 pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, group_read: bool) -> Result<(), String>
382 {
383 let path = path.as_ref();
384 if !path.is_dir() {
385 return Ok(());
386 }
387
388 let uid = Uid::effective();
389 let gid = Gid::effective();
390 let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
391
392 let perm = if group_read {
393 0o750
394 } else {
395 0o700
396 };
397
398 if meta.mode() != perm {
399 std::fs::set_permissions(path,std::fs::Permissions::from_mode(perm)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
400 }
401 if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
402 return Err("path has incorrect owner/group".to_owned());
403 }
404 Ok(())
405 }
406 } else if #[cfg(windows)] {
407 pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
411 {
412 let path = path.as_ref();
413 if !path.is_file() {
414 return Ok(());
415 }
416
417 Ok(())
418 }
419
420 pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
421 {
422 let path = path.as_ref();
423 if !path.is_dir() {
424 return Ok(());
425 }
426
427 Ok(())
428 }
429
430 } else {
431 pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
432 {
433 let path = path.as_ref();
434 if !path.is_file() {
435 return Ok(());
436 }
437
438 Ok(())
439 }
440
441 pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
442 {
443 let path = path.as_ref();
444 if !path.is_dir() {
445 return Ok(());
446 }
447
448 Ok(())
449 }
450 }
451}
452
/// Eight-byte unit used only to obtain an 8-byte-aligned heap allocation.
#[repr(C, align(8))]
struct AlignToEight([u8; 8]);

/// Returns a `Vec<u8>` of length `n_bytes` whose buffer was allocated as a
/// `Vec<AlignToEight>` and is therefore 8-byte aligned. The contents are
/// uninitialized and the capacity is rounded up to a multiple of eight bytes.
///
/// # Safety
///
/// The returned bytes are uninitialized; the caller must write them before
/// reading.
///
/// NOTE(review): `Vec::from_raw_parts` documents that the pointer must have been
/// allocated with the same alignment as the element type of the resulting
/// vector; here the allocation is 8-aligned while `u8` is 1-aligned. Confirm this
/// is acceptable for the allocator in use.
pub unsafe fn aligned_8_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
    const UNIT: usize = mem::size_of::<AlignToEight>();

    // Round up to whole 8-byte units.
    let n_units = (n_bytes + UNIT - 1) / UNIT;
    // ManuallyDrop keeps the allocation alive so ownership can move to the byte vec.
    let mut units = mem::ManuallyDrop::new(Vec::<AlignToEight>::with_capacity(n_units));
    let base = units.as_mut_ptr();
    let cap_bytes = units.capacity() * UNIT;

    Vec::from_raw_parts(base as *mut u8, n_bytes, cap_bytes)
}
471
/// Returns a `Vec<u8>` of length `n_bytes` with uninitialized contents.
///
/// The length is set directly on the freshly allocated vector, so the vector
/// keeps the capacity its allocation really has (`Vec::with_capacity` is allowed
/// to reserve more than requested).
///
/// # Safety
///
/// The returned bytes are uninitialized; the caller must write them before
/// reading.
pub unsafe fn unaligned_u8_vec_uninit(n_bytes: usize) -> Vec<u8> {
    let mut buf: Vec<u8> = Vec::with_capacity(n_bytes);
    // SAFETY: `n_bytes <= buf.capacity()` is guaranteed by `with_capacity`; the
    // caller is responsible for initializing the bytes before reading them.
    buf.set_len(n_bytes);
    buf
}
481
482pub fn debug_backtrace() -> String {
483 let bt = backtrace::Backtrace::new();
484 format!("{:?}", bt)
485}
486
487pub fn debug_print_backtrace() {
488 if is_debug_backtrace_enabled() {
489 debug!("{}", debug_backtrace());
490 }
491}
492
493pub fn is_debug_backtrace_enabled() -> bool {
494 cfg_if! {
495 if #[cfg(debug_assertions)] {
496 cfg_if! {
497 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
498 let rbenv = get_wasm_global_string_value("RUST_BACKTRACE").unwrap_or_default();
499 }
500 else
501 {
502 let rbenv = std::env::var("RUST_BACKTRACE").unwrap_or_default();
503 }
504 }
505 rbenv == "1" || rbenv == "full"
506 } else {
507 false
508 }
509 }
510}
511
512#[track_caller]
513pub fn debug_duration<R, F: Future<Output = R>, T: FnOnce() -> F>(f: T) -> impl Future<Output = R> {
514 let location = std::panic::Location::caller();
515 async move {
516 let t1 = get_timestamp();
517 let out = f().await;
518 let t2 = get_timestamp();
519 debug!("duration@{}: {}", location, display_duration(t2 - t1));
520 out
521 }
522}
523
/// Returns the compiler-provided type name of the referenced value's static
/// type. The value itself is not inspected.
pub fn type_name_of_val<T>(_val: &T) -> &'static str
where
    T: ?Sized,
{
    let name: &'static str = std::any::type_name::<T>();
    name
}
527
/// Converts `arg` to a `String` via its `ToString` implementation; handy as a
/// function value for adapters such as `map_err`.
pub fn map_to_string<X: ToString>(arg: X) -> String {
    ToString::to_string(&arg)
}
531
/// Scope guard that tracks how many guards sharing a counter are alive and
/// prints the running count to stderr on creation and on drop.
pub struct DebugGuard {
    /// Label printed in the enter/exit messages.
    name: &'static str,
    /// Shared counter of currently live guards.
    counter: &'static AtomicUsize,
}

impl DebugGuard {
    /// Increments `counter`, prints the new count to stderr and returns the guard.
    pub fn new(name: &'static str, counter: &'static AtomicUsize) -> Self {
        let now_inside = counter.fetch_add(1, Ordering::SeqCst) + 1;
        eprintln!("{} entered: {}", name, now_inside);
        DebugGuard { name, counter }
    }
}

impl Drop for DebugGuard {
    /// Decrements the counter and prints the remaining count to stderr.
    fn drop(&mut self) {
        let still_inside = self.counter.fetch_sub(1, Ordering::SeqCst) - 1;
        eprintln!("{} exited: {}", self.name, still_inside);
    }
}