use tokio::{
runtime::Builder,
sync::{mpsc, Barrier},
};
use tracing::{info_span, Instrument};
use tracing_subscriber::{layer::SubscriberExt, Registry};
use tracking_allocator::{
AllocationGroupId, AllocationGroupToken, AllocationLayer, AllocationRegistry,
AllocationTracker, Allocator,
};
use std::{alloc::System, sync::Arc};
// Install `tracking_allocator`'s `Allocator`, wrapping the standard `System` allocator, as
// this program's global allocator.
#[global_allocator]
static GLOBAL: Allocator<System> = Allocator::system();
/// Allocation tracker that reports every event it is handed on standard output.
struct StdoutTracker;

impl AllocationTracker for StdoutTracker {
    /// Prints a single line describing an allocation event: the address (in hex), the
    /// object size, the wrapped size, and the group ID passed in with the event.
    fn allocated(
        &self,
        addr: usize,
        object_size: usize,
        wrapped_size: usize,
        group_id: AllocationGroupId,
    ) {
        println!(
            "allocation -> addr=0x{:0x} object_size={} wrapped_size={} group_id={:?}",
            addr,
            object_size,
            wrapped_size,
            group_id,
        );
    }

    /// Prints a single line describing a deallocation event: the address (in hex), the
    /// object size, the wrapped size, and both the source and current group IDs passed in
    /// with the event.
    fn deallocated(
        &self,
        addr: usize,
        object_size: usize,
        wrapped_size: usize,
        source_group_id: AllocationGroupId,
        current_group_id: AllocationGroupId,
    ) {
        println!(
            "deallocation -> addr=0x{:0x} object_size={} wrapped_size={} source_group_id={:?} current_group_id={:?}",
            addr,
            object_size,
            wrapped_size,
            source_group_id,
            current_group_id,
        );
    }
}
/// Entry point: wires up tracing and the stdout allocation tracker, then runs two
/// ping-pong tasks on a current-thread Tokio runtime, each instrumented with a span that
/// has its own allocation group token attached.
///
/// # Panics
///
/// Panics if the tracing subscriber or the global tracker cannot be installed, if an
/// allocation group cannot be registered, if the runtime fails to build, or if either
/// spawned task fails to join.
fn main() {
    // Tracing subscriber composed of the default registry plus the allocation layer.
    let subscriber = Registry::default().with(AllocationLayer::new());
    tracing::subscriber::set_global_default(subscriber)
        .expect("failed to install tracing subscriber");

    // Route allocation events to the stdout tracker.
    AllocationRegistry::set_global_tracker(StdoutTracker)
        .expect("no other global tracker should be set yet");

    // One allocation group per task, registered before tracking is switched on.
    let task1_token =
        AllocationGroupToken::register().expect("failed to register allocation group");
    let task2_token =
        AllocationGroupToken::register().expect("failed to register allocation group");

    AllocationRegistry::enable_tracking();

    let runtime = Builder::new_current_thread()
        .build()
        .expect("failed to build current-thread runtime");

    runtime.block_on(async move {
        // Two-party barrier shared by both tasks; each task waits on it before its first
        // round.
        let task1_barrier = Arc::new(Barrier::new(2));
        let task2_barrier = Arc::clone(&task1_barrier);

        // Cross-wired channels: whatever task 1 sends is received by task 2, and vice
        // versa.
        let (tx1, rx2) = mpsc::channel(1);
        let (tx2, rx1) = mpsc::channel(1);

        // Attach each group token to a span and instrument the task's future with it.
        let task1_span = info_span!("task1");
        task1_token.attach_to_span(&task1_span);
        let task1 = ping_pong(task1_barrier, 16, tx1, rx1).instrument(task1_span);

        let task2_span = info_span!("task2");
        task2_token.attach_to_span(&task2_span);
        let task2 = ping_pong(task2_barrier, 128, tx2, rx2).instrument(task2_span);

        // Spawn both tasks before awaiting either, then wait for each in turn.
        let handle1 = tokio::spawn(task1);
        let handle2 = tokio::spawn(task2);

        handle1.await.expect("task1 panicked unexpectedly");
        handle2.await.expect("task2 panicked unexpectedly");

        println!("Done.");
    });

    AllocationRegistry::disable_tracking();
}
/// Exchanges freshly allocated buffers with a peer for three rounds.
///
/// The task first waits on `barrier`. In each round it then allocates an empty
/// `Vec<String>` with capacity `buf_size`, sends it on `tx`, receives one buffer from
/// `rx`, and drops the received buffer.
///
/// # Panics
///
/// Panics if sending on `tx` fails or if `rx` yields `None`.
async fn ping_pong(
    barrier: Arc<Barrier>,
    buf_size: usize,
    tx: mpsc::Sender<Vec<String>>,
    mut rx: mpsc::Receiver<Vec<String>>,
) {
    barrier.wait().await;

    for _ in 0..3 {
        let outgoing = Vec::<String>::with_capacity(buf_size);
        tx.send(outgoing).await.expect("tx send should not fail");

        let received = rx.recv().await.expect("rx recv should not be empty");
        // Release the peer's buffer right away, before the next round begins.
        drop(received);
    }
}