#![cfg(test)]
#![expect(clippy::cast_possible_wrap)]
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);
thread_local! {
static LIVE_BYTES_IN_THREAD: AtomicUsize = const { AtomicUsize::new(0) };
}
pub struct TrackingAllocator {
allocator: std::alloc::System,
}
#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
allocator: std::alloc::System,
};
#[expect(unsafe_code)]
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);
unsafe { self.allocator.alloc(layout) }
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);
unsafe { self.allocator.dealloc(ptr, layout) };
}
}
fn live_bytes_local() -> usize {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
}
fn live_bytes_global() -> usize {
LIVE_BYTES_GLOBAL.load(Relaxed)
}
fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
let used_bytes_start_local = live_bytes_local();
let used_bytes_start_global = live_bytes_global();
let ret = run();
let bytes_used_local = live_bytes_local() - used_bytes_start_local;
let bytes_used_global = live_bytes_global() - used_bytes_start_global;
drop(ret);
(bytes_used_global, bytes_used_local)
}
use re_chunk::external::crossbeam::channel::TryRecvError;
use re_chunk::{BatcherHooks, ChunkBatcher, ChunkBatcherConfig, PendingRow};
use re_chunk_store::{ChunkStore, ChunkStoreConfig};
use re_log_types::{TimePoint, Timeline};
use re_sdk_types::components::Scalar;
use re_sdk_types::{Loggable as _, SerializedComponentBatch, archetypes};
#[test]
fn scalar_memory_overhead() {
re_log::setup_logging();
re_log::warn!(
"THIS TEST HAS TO ACCOUNT FOR THE MEMORY OF ALL RUNNING THREADS -- IT MUST BE RUN ON ITS OWN, WITH NO OTHER TESTS RUNNING IN PARALLEL: `cargo t --all-features -p re_chunk_store memory_tests -- scalar_memory_overhead`"
);
const NUM_SCALARS: usize = 1024 * 1024;
let (total_mem_use_global, _total_mem_use_local) = memory_use(|| {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording, "test_app"),
ChunkStoreConfig::default(),
);
let batcher = ChunkBatcher::new(
ChunkBatcherConfig {
flush_num_rows: 1000,
..ChunkBatcherConfig::NEVER
},
BatcherHooks::NONE,
)
.unwrap();
for i in 0..NUM_SCALARS {
let entity_path = re_log_types::entity_path!("scalar");
let timepoint = TimePoint::default().with(Timeline::log_time(), i as i64);
let scalars = Scalar::to_arrow([Scalar::from(i as f64)]).unwrap();
let row = PendingRow::from_iter(
timepoint,
std::iter::once(SerializedComponentBatch::new(
scalars,
archetypes::Scalars::descriptor_scalars(),
)),
);
batcher.push_row(entity_path.clone(), row);
}
let chunks_rx = batcher.chunks();
drop(batcher);
loop {
let chunk = match chunks_rx.try_recv() {
Ok(chunk) => chunk,
Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
Err(TryRecvError::Disconnected) => break,
};
_ = store.insert_chunk(&Arc::new(chunk)).unwrap();
}
store
});
insta::assert_debug_snapshot!(
"scalars_on_one_timeline_new",
[
format!("{NUM_SCALARS} scalars"),
format!(
"{} MiB in total",
(total_mem_use_global as f64 / 1024.0 / 1024.0).round() ),
format!(
"{} per row",
re_format::format_bytes(total_mem_use_global as f64 / NUM_SCALARS as f64)
),
]
);
}