dbsp 0.301.0

Continuous streaming analytics engine
Documentation
mod advance_retreat;
mod sample;
//pub(crate) mod tests;
pub(crate) mod binary_heap;
mod consolidation;
mod graph;
mod is_none;
mod sort;
mod supports_roaring;
pub mod tuple;

#[cfg(test)]
mod vec_ext;

#[cfg(test)]
pub(crate) mod test;

mod dot;

use memory_stats::memory_stats;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::{fmt::Display, hint::unreachable_unchecked};

pub use advance_retreat::{
    advance, advance_erased, dyn_advance, dyn_retreat, retreat, retreat_erased,
};
pub use consolidation::{
    ConsolidatePairedSlices, consolidate, consolidate_from, consolidate_paired_slices,
    consolidate_payload_from, consolidate_slice,
};
pub use graph::components;

pub use is_none::IsNone;
pub use supports_roaring::SupportsRoaring;

#[allow(unused_imports)]
pub use dot::{DotEdgeAttributes, DotNodeAttributes};

#[cfg(test)]
pub use consolidation::consolidate_pairs;

use itertools::Itertools as _;
pub use sample::sample_slice;
pub use sort::{stable_sort, stable_sort_by};
pub use tuple::{
    ArchivedTup0, ArchivedTup1, ArchivedTup2, ArchivedTup3, ArchivedTup4, ArchivedTup5,
    ArchivedTup6, ArchivedTup7, ArchivedTup8, ArchivedTup9, ArchivedTup10, Tup0, Tup1, Tup2, Tup3,
    Tup4, Tup5, Tup6, Tup7, Tup8, Tup9, Tup10, TupleBitmap, TupleFormat,
};

// mod unstable_sort;

/// Sorts `slice` using `T`'s [`Ord`] implementation.
///
/// Despite its name, this currently forwards to [`stable_sort`] (see the
/// FIXME below), so callers get whatever guarantees that function provides.
// FIXME: unstable sort implementation is currently broken; use stable sorting instead.
pub fn unstable_sort<T: Ord + Debug>(slice: &mut [T]) {
    stable_sort(slice);
}

/// Sorts `slice` according to the comparator `cmp`.
///
/// Despite its name, this currently forwards to [`stable_sort_by`] (see the
/// FIXME below), so callers get whatever guarantees that function provides.
// FIXME: unstable sort implementation is currently broken; use stable sorting instead.
pub fn unstable_sort_by<T, F: Fn(&T, &T) -> Ordering>(slice: &mut [T], cmp: F) {
    stable_sort_by(slice, cmp);
}

#[cfg(test)]
pub(crate) use vec_ext::VecExt;

/// Promises the optimizer that `cond` holds, so it may drop checks that
/// `cond` makes redundant.
///
/// When debug assertions are enabled, a false `cond` panics first, and the
/// panic reports the caller's location.
///
/// # Safety
///
/// The caller must guarantee that `cond` is `true`; passing `false` is
/// undefined behavior.
#[inline(always)]
#[deny(unsafe_op_in_unsafe_fn)]
#[cfg_attr(debug_assertions, track_caller)]
pub(crate) unsafe fn assume(cond: bool) {
    debug_assert!(cond, "called `assume()` on a false condition");

    if cond {
        return;
    }

    // SAFETY: the caller promised `cond` is true, so control never reaches here.
    unsafe { unreachable_unchecked() }
}

/// Panics to report that a cursor sits at `position`, which lies past the
/// end of a leaf holding only `length` elements.
///
/// The function is out of line and marked cold, which keeps this diverging
/// path away from callers' hot code.
#[cold]
#[inline(never)]
pub(crate) fn cursor_position_oob<P: Display>(position: P, length: usize) -> ! {
    panic!(
        "the cursor was at the invalid position {} while the leaf was only {} elements long",
        position, length
    )
}

/// Views the memory backing `slice` as raw bytes.
///
/// The result covers exactly `size_of_val(slice)` bytes, starting at the
/// slice's first element, and borrows from `slice`. Bytes are exposed as
/// `MaybeUninit<u8>` because `T` may contain padding, whose contents are
/// uninitialized.
#[inline]
pub(crate) fn bytes_of<T>(slice: &[T]) -> &[std::mem::MaybeUninit<u8>] {
    use std::mem::MaybeUninit;

    let byte_len = std::mem::size_of_val(slice);
    let first_byte = slice.as_ptr().cast::<MaybeUninit<u8>>();

    // SAFETY: `first_byte` points at the `byte_len` bytes owned by `slice`,
    // which stays borrowed for the returned lifetime. `MaybeUninit<u8>` has
    // alignment 1 and accepts any byte, including uninitialized padding.
    unsafe { std::slice::from_raw_parts(first_byte, byte_len) }
}

/// Returns `s` with every line prefixed by `indent` spaces.
///
/// Lines are split with [`str::lines`] and rejoined with `"\n"`. As a result:
/// - a trailing newline in `s` is not preserved;
/// - `"\r\n"` endings become `"\n"`;
/// - blank lines are padded too.
pub fn indent(s: &str, indent: usize) -> String {
    let padding = " ".repeat(indent);
    s.lines().map(|line| format!("{padding}{line}")).join("\n")
}

/// Returns the current process RSS in bytes.
pub fn process_rss_bytes() -> Option<u64> {
    #[cfg(test)]
    if let Ok(mock_rss_bytes) = std::env::var("MOCK_PROCESS_RSS_BYTES")
        && let Ok(mock_rss_bytes) = mock_rss_bytes.parse::<u64>()
    {
        return Some(mock_rss_bytes);
    }

    memory_stats().map(|usage| usage.physical_mem as u64)
}

/// Outcome of running a closure while recording its [tracing] output.
#[cfg(test)]
pub(crate) struct LogCapture<T> {
    /// Value returned by the closure passed to [LogCapture::new].
    pub retval: T,

    /// Formatted log text emitted while that closure ran.
    pub log: String,
}

#[cfg(test)]
impl<T> LogCapture<T> {
    /// Runs `f` with a [tracing] subscriber installed as the current thread's
    /// default. The subscriber writes formatted events, without ANSI colors or
    /// timestamps, into an in-memory buffer. Returns `f`'s result together
    /// with everything that was written.
    ///
    /// # Panics
    ///
    /// Panics if the buffer's mutex is poisoned or the captured bytes are not
    /// valid UTF-8.
    pub fn new<F>(f: F) -> Self
    where
        F: FnOnce() -> T,
    {
        use std::io::{self, Write};
        use std::sync::{Arc, Mutex};

        use tracing::subscriber::with_default;

        /// In-memory sink shared between the subscriber and this function.
        #[derive(Debug, Default)]
        struct SharedBuffer(Mutex<Vec<u8>>);

        impl SharedBuffer {
            /// Empties the buffer and decodes its former contents as UTF-8.
            fn drain_to_string(&self) -> String {
                let bytes = std::mem::take(&mut *self.0.lock().unwrap());
                String::from_utf8(bytes).unwrap()
            }
        }

        impl Write for &SharedBuffer {
            fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
                let mut guard = self.0.lock().unwrap();
                guard.extend_from_slice(buf);
                Ok(buf.len())
            }

            fn flush(&mut self) -> io::Result<()> {
                Ok(())
            }
        }

        // Without this, capture occasionally records nothing at all. There
        // might be some kind of race between initializing the global and
        // per-thread subscribers.
        let _ = tracing_subscriber::fmt::try_init();

        let sink = Arc::new(SharedBuffer::default());
        let subscriber = tracing_subscriber::FmtSubscriber::builder()
            .with_ansi(false)
            .without_time()
            .with_writer(Arc::clone(&sink))
            .finish();
        let retval = with_default(subscriber, f);
        let log = sink.drain_to_string();

        Self { retval, log }
    }

    /// Splits the capture into `(retval, log)`.
    pub fn into_parts(self) -> (T, String) {
        let Self { retval, log } = self;
        (retval, log)
    }
}