local_buf 0.1.1

local_buf 是一个支持多线程异步的线程缓冲区
Documentation
#![cfg(feature = "stats")]

use crate::tests::*;
use crate::LocalBuf;
use rand::{thread_rng, Rng};
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;
use tokio::sync::Barrier;

/// 异步场景:随机 buffer 大小 + 随机 sleep 触发(30% 概率),
/// 最接近真实异步 I/O 工作负载
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_cache_hit_rate_async_random() {
    let (start_a, start_h) = LocalBuf::stats();
    let total_ops = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(ASYNC_TASK_COUNT + 1));
    let handles: Vec<_> = (0..ASYNC_TASK_COUNT)
        .map(|_| {
            let total_ops = total_ops.clone();
            let barrier = barrier.clone();
            tokio::spawn(async move {
                barrier.wait().await;
                let data: Vec<(usize, bool)> = {
                    let mut rng = thread_rng();
                    (0..ASYNC_OPS_PER_TASK)
                        .map(|_| (rng.gen_range(32..16384), rng.gen_bool(0.3)))
                        .collect()
                };
                for (cap, should_sleep) in data {
                    let _buf = LocalBuf::with_capacity(cap);
                    if should_sleep {
                        for _ in 0..2 {
                            tokio::time::sleep(Duration::from_micros(1)).await;
                        }
                    }
                    total_ops.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    barrier.wait().await;
    for h in handles {
        h.await.unwrap();
    }
    let (end_a, end_h) = LocalBuf::stats();
    report(
        "异步随机(大小+频率)",
        end_a - start_a,
        end_h - start_h,
        total_ops.load(Ordering::Relaxed),
    );
}