#[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
feature = "detect-features",
any(target_arch = "x86", target_arch = "x86_64")
))]
use std::arch::is_x86_feature_detected;
#[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
feature = "detect-features",
any(target_arch = "x86", target_arch = "x86_64")
))]
use std::sync::OnceLock;
#[allow(dead_code)]
mod portable_simd;
mod wasm32_simd128;
mod x86_avx2;
mod x86_sse2;
mod x86_ssse3;
#[cfg(all(test, feature = "tests-slow"))]
mod fuzzer;
#[allow(dead_code)]
pub(crate) mod naive {
    /// Map a bucket value to its 2-bit quartile code.
    ///
    /// The result is `3` if `value > q3`, otherwise `2` if `value > q2`,
    /// otherwise `1` if `value > q1`, and `0` in every other case.
    ///
    /// This function requires that:
    /// * `q1 <= q2`
    /// * `q2 <= q3`
    #[inline(always)]
    pub(super) const fn get_quartile(value: u32, q1: u32, q2: u32, q3: u32) -> u8 {
        debug_assert!(q1 <= q2);
        debug_assert!(q2 <= q3);
        // Compare against the boundaries from the highest one downwards;
        // the first boundary exceeded decides the code.
        if value > q3 {
            return 3;
        }
        if value > q2 {
            return 2;
        }
        if value > q1 {
            return 1;
        }
        0
    }

    // Generates one portable (non-SIMD) aggregation function per
    // `name = (output bytes, bucket count);` entry.
    macro_rules! aggregation_func_template {
        {$($name:ident = ($size_small:literal, $size_large:literal);)*} => {
            $(
                #[doc = concat!(
                    "Aggregate ",
                    stringify!($size_large),
                    " buckets into the ",
                    stringify!($size_small),
                    "-byte digest based on three quartiles.\n",
                    "\n",
                    "This function requires that:\n",
                    "* `q1 <= q2`\n",
                    "* `q2 <= q3`"
                )]
                #[inline]
                pub fn $name(out: &mut [u8; $size_small], buckets: &[u32; $size_large], q1: u32, q2: u32, q3: u32) {
                    // Output bytes are filled back to front: the first group of
                    // four buckets goes to the last byte of `out`.
                    let groups = buckets.chunks_exact(4);
                    for (slot, group) in out.iter_mut().rev().zip(groups) {
                        // Walk the group backwards so that its first bucket
                        // ends up in the two least significant bits.
                        let mut packed = 0u8;
                        for &bucket in group.iter().rev() {
                            packed = (packed << 2) | get_quartile(bucket, q1, q2, q3);
                        }
                        *slot = packed;
                    }
                }
            )*
        }
    }

    aggregation_func_template! {
        aggregate_48 = (12, 48);
        aggregate_128 = (32, 128);
        aggregate_256 = (64, 256);
    }
}
// Generates the public bucket aggregation entry points.
//
// Each `name = (output bytes, bucket count, dispatch static);` entry expands to:
// * a `OnceLock` static named `$dispatch` that caches the selected
//   implementation (emitted only when the `detect-features` configuration
//   below is active on x86 / x86_64), and
// * a public function `$name` that forwards its arguments to the
//   implementation chosen by the `cfg_if!` chain in its body.
//
// Only `//` comments are used inside the macro body on purpose: `///` would be
// turned into `#[doc]` attributes and become part of the expansion.
macro_rules! aggregation_func_template {
{$($name:ident = ($size_small:literal, $size_large:literal, $dispatch:ident);)*} => {
$(
#[doc = concat!(
stringify!($size_large),
"-bucket aggregation function (to be dynamically dispatched).\n",
"\n",
"By default, this is a reference to [`naive::aggregate_",
stringify!($size_large),
"()`].\n",
"\n",
"If the platform is detected to have specific features ",
"(e.g. SIMD instructions), this is overridden with a reference to the ",
"suitable function (or its wrapper)."
)]
#[allow(clippy::type_complexity)]
#[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
feature = "detect-features",
any(target_arch = "x86", target_arch = "x86_64")
))]
#[cfg_attr(
feature = "unstable",
doc(cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
feature = "detect-features"
)))
)]
// Holds a `&'static` reference to the chosen implementation; it is filled
// in by `get_or_init` inside `$name`.
static $dispatch: OnceLock<
&'static (dyn Fn(&mut [u8; $size_small], &[u32; $size_large], u32, u32, u32) + Sync),
> = OnceLock::new();
#[doc = concat!(
"Aggregate ",
stringify!($size_large),
" buckets into the ",
stringify!($size_small),
"-byte digest based on three quartiles.\n",
"\n",
"This function requires that:\n",
"* `q1 <= q2`\n",
"* `q2 <= q3`"
)]
#[inline]
pub fn $name(out: &mut [u8; $size_small], buckets: &[u32; $size_large], q1: u32, q2: u32, q3: u32) {
// The quartile ordering is a documented precondition; it is only verified
// when debug assertions are enabled.
debug_assert!(q1 <= q2);
debug_assert!(q2 <= q3);
// Exactly one branch of this chain is compiled in: the first one whose
// `cfg` condition holds. The final `else` is the portable fallback.
cfg_if::cfg_if! {
// Branch 1: runtime CPU feature detection on x86 / x86_64.
if #[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
feature = "detect-features",
any(target_arch = "x86", target_arch = "x86_64")
))] {
// The closure passed to `get_or_init` picks the implementation; the
// stored reference is then called with the arguments of this function.
$dispatch.get_or_init(|| {
// NOTE: this inner `cfg` is already implied by the condition of the
// enclosing branch.
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
// Features are probed in the order AVX2, SSSE3, SSE2; the first
// detected one returns early. None of the closures captures anything.
if is_x86_feature_detected!("avx2") {
return &|out, buckets, q1, q2, q3| {
// SAFETY: AVX2 was detected at runtime just above (assumes AVX2
// support is the only requirement of `x86_avx2::$name` — TODO
// confirm against that module).
#[allow(unsafe_code)]
unsafe {
x86_avx2::$name(out, buckets, q1, q2, q3)
}
};
}
if is_x86_feature_detected!("ssse3") {
return &|out, buckets, q1, q2, q3| {
// SAFETY: SSSE3 was detected at runtime just above (assumes SSSE3
// support is the only requirement of `x86_ssse3::$name` — TODO
// confirm against that module).
#[allow(unsafe_code)]
unsafe {
x86_ssse3::$name(out, buckets, q1, q2, q3)
}
};
}
if is_x86_feature_detected!("sse2") {
return &|out, buckets, q1, q2, q3| {
// SAFETY: SSE2 was detected at runtime just above (assumes SSE2
// support is the only requirement of `x86_sse2::$name` — TODO
// confirm against that module).
#[allow(unsafe_code)]
unsafe {
x86_sse2::$name(out, buckets, q1, q2, q3)
}
};
}
}
// None of the probed features is available: use the portable version.
&naive::$name
})(out, buckets, q1, q2, q3)
}
// Branches 2-4: no runtime detection; the implementation is fixed at build
// time from the enabled `target_feature` (AVX2, then SSSE3, then SSE2).
else if #[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
not(feature = "detect-features"),
any(target_arch = "x86", target_arch = "x86_64"),
target_feature = "avx2"
))] {
// SAFETY: this branch exists only when `target_feature = "avx2"` is
// enabled for the build (assumes that is all `x86_avx2::$name` needs —
// TODO confirm).
#[allow(unsafe_code)]
unsafe {
x86_avx2::$name(out, buckets, q1, q2, q3)
}
}
else if #[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
not(feature = "detect-features"),
any(target_arch = "x86", target_arch = "x86_64"),
target_feature = "ssse3"
))] {
// SAFETY: this branch exists only when `target_feature = "ssse3"` is
// enabled for the build (assumes that is all `x86_ssse3::$name` needs —
// TODO confirm).
#[allow(unsafe_code)]
unsafe {
x86_ssse3::$name(out, buckets, q1, q2, q3)
}
}
else if #[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
not(feature = "detect-features"),
any(target_arch = "x86", target_arch = "x86_64"),
target_feature = "sse2"
))] {
// SAFETY: this branch exists only when `target_feature = "sse2"` is
// enabled for the build (assumes that is all `x86_sse2::$name` needs —
// TODO confirm).
#[allow(unsafe_code)]
unsafe {
x86_sse2::$name(out, buckets, q1, q2, q3)
}
}
// Branch 5: WebAssembly with the `simd128` target feature. Unlike the x86
// branches above, this one does not look at `detect-features`.
else if #[cfg(all(
feature = "simd-per-arch",
feature = "opt-simd-bucket-aggregation",
target_arch = "wasm32",
target_feature = "simd128"
))] {
// SAFETY: this branch exists only when `target_feature = "simd128"` is
// enabled for the build (assumes that is all `wasm32_simd128::$name`
// needs — TODO confirm).
#[allow(unsafe_code)]
unsafe {
wasm32_simd128::$name(out, buckets, q1, q2, q3)
}
}
// Branch 6: the `simd-portable` implementation, called without `unsafe`.
else if #[cfg(all(
feature = "simd-portable",
feature = "opt-simd-bucket-aggregation"
))] {
portable_simd::$name(out, buckets, q1, q2, q3)
}
// Fallback: the portable scalar implementation.
else {
naive::$name(out, buckets, q1, q2, q3)
}
}
}
)*
}
}
// Instantiate the dispatching entry points for the 48-, 128- and 256-bucket
// variants (12-, 32- and 64-byte outputs respectively). The third item of each
// entry names the static used to cache the implementation selected at runtime.
aggregation_func_template! {
aggregate_48 = (12, 48, DISPATCH_AGGREGATE_48);
aggregate_128 = (32, 128, DISPATCH_AGGREGATE_128);
aggregate_256 = (64, 256, DISPATCH_AGGREGATE_256);
}
mod tests;