oneringbuf 0.7.0

A lock-free single-producer, single-consumer (SPSC) ring buffer with in-place mutability, asynchronous support, and virtual memory optimisation.
Documentation
#[cfg(cpal)]
fn main() {
    use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
    use cpal::{InputCallbackInfo, OutputCallbackInfo, StreamConfig};
    use oneringbuf::ORBIterator;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::Relaxed;
    use std::thread;

    const BUF_SIZE: usize = 4096 * 50;
    const DELAY_MS: usize = 500;
    const DECAY: f32 = 0.5;

    let host = cpal::default_host();
    let (in_dev, out_dev) = (
        host.default_input_device().unwrap(),
        host.default_output_device().unwrap(),
    );
    let (in_cfg, out_cfg) = (
        StreamConfig::from(in_dev.default_input_config().unwrap()),
        StreamConfig::from(out_dev.default_output_config().unwrap()),
    );

    let delay_samples = DELAY_MS * (in_cfg.sample_rate.0 as usize / 1000);

    #[cfg(feature = "alloc")]
    let buf = oneringbuf::SharedHeapRBMut::from(vec![0.; BUF_SIZE]);
    #[cfg(not(feature = "alloc"))]
    let mut buf = oneringbuf::SharedStackRBMut::<f32, BUF_SIZE>::default();

    let (mut prod, mut work, mut cons) = buf.split_mut();

    let stop_worker = Arc::new(AtomicBool::new(false));
    let stop_clone = stop_worker.clone();
    let worker = thread::spawn(move || {
        let mut acc = vec![0f32; delay_samples];

        while !stop_clone.load(Relaxed) {
            if let Some((h, t)) = work.get_mut_slice_exact(delay_samples) {
                let len = h.len() + t.len();
                h.swap_with_slice(&mut acc[..h.len()]);
                t.swap_with_slice(&mut acc[h.len()..]);

                for (v, w) in h.iter_mut().chain(t).zip(&acc) {
                    *v *= DECAY;
                    *v += *w;
                }

                unsafe { work.advance(len) };
            }
        }

        // work was moved by thread::spawn, so, returning it here will allow to use it after call to join
        work
    });

    let in_stream = in_dev
        .build_input_stream(
            &in_cfg,
            move |slice: &[f32], _info: &InputCallbackInfo| {
                if prod.push_slice(slice).is_none() {
                    println!("Input iter fell behind!");
                }
            },
            move |err| println!("INPUT ERROR: {}", err),
            None,
        )
        .expect("Cannot create input stream");

    let out_stream = out_dev
        .build_output_stream(
            &out_cfg,
            move |slice: &mut [f32], _info: &OutputCallbackInfo| {
                let len = slice.len();

                if let Some((h, t)) = cons.peek_slice(len) {
                    slice[..h.len()].copy_from_slice(h);
                    slice[h.len()..].copy_from_slice(t);

                    unsafe { cons.advance(len) };
                }
            },
            move |err| println!("OUTPUT ERROR: {}", err),
            None,
        )
        .expect("Cannot create output stream");

    in_stream.play().unwrap();
    out_stream.play().unwrap();

    println!("Playing for 15 seconds... ");
    thread::sleep(std::time::Duration::from_secs(15));
    stop_worker.store(true, Relaxed); // Stop worker thread

    let work = worker.join().unwrap();

    drop(in_stream); // This also drops prod
    assert_eq!(work.alive_iters(), 2);
    drop(out_stream); // This also drops cons
    assert_eq!(work.alive_iters(), 1);
    drop(work); // Drop work

    // => All iterators are now dropped, so buf gets dropped as well.
}

#[cfg(not(cpal))]
pub fn main() {
    eprintln!("To run this example, please, use the following command:");
    println!(r#"RUSTFLAGS="--cfg cpal" cargo run --example cpal"#);
}