use crate::memory_pool::{
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size,
};
use datafusion_common::HashMap;
use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
use log::debug;
use parking_lot::Mutex;
use std::fmt::{Display, Formatter};
use std::{
num::NonZeroUsize,
sync::atomic::{AtomicUsize, Ordering},
};
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
used: AtomicUsize,
}
impl MemoryPool for UnboundedMemoryPool {
fn name(&self) -> &str {
"unbounded"
}
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
self.used.fetch_add(additional, Ordering::Relaxed);
}
fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
self.used.fetch_sub(shrink, Ordering::Relaxed);
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
self.grow(reservation, additional);
Ok(())
}
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}
fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Infinite
}
}
impl Display for UnboundedMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let used = self.used.load(Ordering::Relaxed);
write!(f, "{}(used: {})", &self.name(), human_readable_size(used))
}
}
#[derive(Debug)]
pub struct GreedyMemoryPool {
pool_size: usize,
used: AtomicUsize,
}
impl GreedyMemoryPool {
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
used: AtomicUsize::new(0),
}
}
}
impl MemoryPool for GreedyMemoryPool {
fn name(&self) -> &str {
"greedy"
}
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
self.used.fetch_add(additional, Ordering::Relaxed);
}
fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
self.used.fetch_sub(shrink, Ordering::Relaxed);
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
self.used
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
let new_used = used + additional;
(new_used <= self.pool_size).then_some(new_used)
})
.map_err(|used| {
insufficient_capacity_err(
reservation,
additional,
self.pool_size.saturating_sub(used),
self,
)
})?;
Ok(())
}
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}
fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Finite(self.pool_size)
}
}
impl Display for GreedyMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let used = self.used.load(Ordering::Relaxed);
write!(
f,
"{}(used: {}, pool_size: {})",
&self.name(),
human_readable_size(used),
human_readable_size(self.pool_size)
)
}
}
#[derive(Debug)]
pub struct FairSpillPool {
pool_size: usize,
state: Mutex<FairSpillPoolState>,
}
#[derive(Debug)]
struct FairSpillPoolState {
num_spill: usize,
spillable: usize,
unspillable: usize,
}
impl FairSpillPool {
pub fn new(pool_size: usize) -> Self {
debug!("Created new FairSpillPool(pool_size={pool_size})");
Self {
pool_size,
state: Mutex::new(FairSpillPoolState {
num_spill: 0,
spillable: 0,
unspillable: 0,
}),
}
}
}
impl MemoryPool for FairSpillPool {
fn name(&self) -> &str {
"fair"
}
fn register(&self, consumer: &MemoryConsumer) {
if consumer.can_spill {
self.state.lock().num_spill += 1;
}
}
fn unregister(&self, consumer: &MemoryConsumer) {
if consumer.can_spill {
let mut state = self.state.lock();
state.num_spill = state.num_spill.checked_sub(1).unwrap();
}
}
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => state.spillable += additional,
false => state.unspillable += additional,
}
}
fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => state.spillable -= shrink,
false => state.unspillable -= shrink,
}
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => {
let spill_available = self.pool_size.saturating_sub(state.unspillable);
let available = spill_available
.checked_div(state.num_spill)
.unwrap_or(spill_available);
if reservation.size() + additional > available {
return Err(insufficient_capacity_err(
reservation,
additional,
available,
self,
));
}
state.spillable += additional;
}
false => {
let available = self
.pool_size
.saturating_sub(state.unspillable + state.spillable);
if available < additional {
return Err(insufficient_capacity_err(
reservation,
additional,
available,
self,
));
}
state.unspillable += additional;
}
}
Ok(())
}
fn reserved(&self) -> usize {
let state = self.state.lock();
state.spillable + state.unspillable
}
fn memory_limit(&self) -> MemoryLimit {
MemoryLimit::Finite(self.pool_size)
}
}
impl Display for FairSpillPool {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}(pool_size: {})",
&self.name(),
human_readable_size(self.pool_size),
)
}
}
#[inline(always)]
fn insufficient_capacity_err(
reservation: &MemoryReservation,
additional: usize,
available: usize,
pool: &impl MemoryPool,
) -> DataFusionError {
resources_datafusion_err!(
"Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total memory pool: {}",
human_readable_size(additional),
reservation.registration.consumer.name,
human_readable_size(reservation.size()),
human_readable_size(available),
pool
)
}
#[derive(Debug)]
struct TrackedConsumer {
name: String,
can_spill: bool,
reserved: AtomicUsize,
peak: AtomicUsize,
}
impl TrackedConsumer {
fn reserved(&self) -> usize {
self.reserved.load(Ordering::Relaxed)
}
fn peak(&self) -> usize {
self.peak.load(Ordering::Relaxed)
}
fn grow(&self, additional: usize) {
self.reserved.fetch_add(additional, Ordering::Relaxed);
self.peak.fetch_max(self.reserved(), Ordering::Relaxed);
}
fn shrink(&self, shrink: usize) {
self.reserved.fetch_sub(shrink, Ordering::Relaxed);
}
}
#[derive(Debug, Clone)]
pub struct MemoryConsumerMetrics {
pub name: String,
pub can_spill: bool,
pub reserved: usize,
pub peak: usize,
}
impl From<&TrackedConsumer> for MemoryConsumerMetrics {
fn from(tracked: &TrackedConsumer) -> Self {
Self {
name: tracked.name.clone(),
can_spill: tracked.can_spill,
reserved: tracked.reserved(),
peak: tracked.peak(),
}
}
}
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
inner: I,
top: NonZeroUsize,
tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}
impl<I: MemoryPool> Display for TrackConsumersPool<I> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}(inner_pool: {}, num_of_top_consumers: {})",
&self.name(),
&self.inner,
&self.top,
)
}
}
impl<I: MemoryPool> TrackConsumersPool<I> {
pub fn new(inner: I, top: NonZeroUsize) -> Self {
Self {
inner,
top,
tracked_consumers: Default::default(),
}
}
pub fn inner(&self) -> &I {
&self.inner
}
pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
self.tracked_consumers
.lock()
.values()
.map(Into::into)
.collect()
}
pub fn report_top(&self, top: usize) -> String {
let mut consumers = self
.tracked_consumers
.lock()
.iter()
.map(|(consumer_id, tracked_consumer)| {
(
(
*consumer_id,
tracked_consumer.name.to_owned(),
tracked_consumer.can_spill,
tracked_consumer.peak(),
),
tracked_consumer.reserved(),
)
})
.collect::<Vec<_>>();
consumers.sort_by_key(|consumer| std::cmp::Reverse(consumer.1));
consumers[0..std::cmp::min(top, consumers.len())]
.iter()
.map(|((id, name, can_spill, peak), size)| {
format!(
" {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
human_readable_size(*size),
human_readable_size(*peak),
)
})
.collect::<Vec<_>>()
.join(",\n")
+ "."
}
}
impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
fn name(&self) -> &str {
"track_consumers"
}
fn register(&self, consumer: &MemoryConsumer) {
self.inner.register(consumer);
let mut guard = self.tracked_consumers.lock();
let existing = guard.insert(
consumer.id(),
TrackedConsumer {
name: consumer.name().to_string(),
can_spill: consumer.can_spill(),
reserved: Default::default(),
peak: Default::default(),
},
);
debug_assert!(
existing.is_none(),
"Registered was called twice on the same consumer"
);
}
fn unregister(&self, consumer: &MemoryConsumer) {
self.inner.unregister(consumer);
self.tracked_consumers.lock().remove(&consumer.id());
}
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
self.inner.grow(reservation, additional);
self.tracked_consumers
.lock()
.entry(reservation.consumer().id())
.and_modify(|tracked_consumer| {
tracked_consumer.grow(additional);
});
}
fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
self.inner.shrink(reservation, shrink);
self.tracked_consumers
.lock()
.entry(reservation.consumer().id())
.and_modify(|tracked_consumer| {
tracked_consumer.shrink(shrink);
});
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
self.inner
.try_grow(reservation, additional)
.map_err(|e| match e {
DataFusionError::ResourcesExhausted(e) => {
DataFusionError::ResourcesExhausted(
provide_top_memory_consumers_to_error_msg(
&reservation.consumer().name,
&e,
&self.report_top(self.top.into()),
),
)
}
_ => e,
})?;
self.tracked_consumers
.lock()
.entry(reservation.consumer().id())
.and_modify(|tracked_consumer| {
tracked_consumer.grow(additional);
});
Ok(())
}
fn reserved(&self) -> usize {
self.inner.reserved()
}
fn memory_limit(&self) -> MemoryLimit {
self.inner.memory_limit()
}
}
fn provide_top_memory_consumers_to_error_msg(
consumer_name: &str,
error_msg: &str,
top_consumers: &str,
) -> String {
format!(
"Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}"
)
}
#[cfg(test)]
mod tests {
use super::*;
use insta::{Settings, allow_duplicates, assert_snapshot, with_settings};
use std::sync::Arc;
fn make_settings() -> Settings {
let mut settings = Settings::clone_current();
settings.add_filter(
r"([^\s]+)\#\d+\(can spill: (true|false)\)",
"$1#[ID](can spill: $2)",
);
settings
}
#[test]
fn test_fair() {
let pool = Arc::new(FairSpillPool::new(100)) as _;
let r1 = MemoryConsumer::new("unspillable").register(&pool);
r1.grow(2000);
assert_eq!(pool.reserved(), 2000);
let r2 = MemoryConsumer::new("r2")
.with_can_spill(true)
.register(&pool);
r2.grow(2000);
assert_eq!(pool.reserved(), 4000);
let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
r1.shrink(1990);
r2.shrink(2000);
assert_eq!(pool.reserved(), 10);
r1.try_grow(10).unwrap();
assert_eq!(pool.reserved(), 20);
r2.try_grow(80).unwrap();
assert_eq!(pool.reserved(), 100);
r2.shrink(70);
assert_eq!(r1.size(), 20);
assert_eq!(r2.size(), 10);
assert_eq!(pool.reserved(), 30);
let r3 = MemoryConsumer::new("r3")
.with_can_spill(true)
.register(&pool);
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
r2.free();
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
drop(r2);
assert_eq!(pool.reserved(), 20);
r3.try_grow(80).unwrap();
assert_eq!(pool.reserved(), 100);
r1.free();
assert_eq!(pool.reserved(), 80);
let r4 = MemoryConsumer::new("s4").register(&pool);
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
}
#[test]
fn test_tracked_consumers_pool() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
let r1 = MemoryConsumer::new("r1").register(&pool);
r1.grow(50);
r1.grow(20);
r1.shrink(20);
let r2 = MemoryConsumer::new("r2").register(&pool);
r2.try_grow(15)
.expect("should succeed in memory allotment for r2");
let r3 = MemoryConsumer::new("r3").register(&pool);
r3.try_resize(25)
.expect("should succeed in memory allotment for r3");
r3.try_resize(20)
.expect("should succeed in memory allotment for r3");
let r4 = MemoryConsumer::new("r4").register(&pool);
r4.grow(10);
let r5 = MemoryConsumer::new("r5").register(&pool);
let res = r5.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total memory pool: greedy(used: 95.0 B, pool_size: 100.0 B)
");
}
#[test]
fn test_tracked_consumers_pool_register() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
let same_name = "foo";
let r0 = MemoryConsumer::new(same_name).register(&pool);
let res = r0.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total memory pool: greedy(used: 0.0 B, pool_size: 100.0 B)
");
r0.grow(10); let new_consumer_same_name = MemoryConsumer::new(same_name);
let r1 = new_consumer_same_name.register(&pool);
let res = r1.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: greedy(used: 10.0 B, pool_size: 100.0 B)
");
r1.grow(20);
let res = r1.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
");
let consumer_with_same_name_but_different_hash =
MemoryConsumer::new(same_name).with_can_spill(true);
let r2 = consumer_with_same_name_but_different_hash.register(&pool);
let res = r2.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
");
}
#[test]
fn test_tracked_consumers_pool_deregister() {
fn test_per_pool_type<P: MemoryPool + 'static>(pool: Arc<TrackConsumersPool<P>>) {
with_settings!({
snapshot_suffix => pool.inner().name().to_string(),
filters => vec![
(
r"([^\s]+)\#\d+\(can spill: (true|false)\)",
"$1#[ID](can spill: $2)",
),
(
r"for the total memory pool: [^\n]+",
"for the total memory pool: [INNER_POOL]",
),
],
}, {
let memory_pool: Arc<dyn MemoryPool> = Arc::<TrackConsumersPool<P>>::clone(&pool);
let r0 = MemoryConsumer::new("r0").register(&memory_pool);
r0.grow(10);
let r1 = MemoryConsumer::new("r1").register(&memory_pool);
r1.grow(20);
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: [INNER_POOL]
");
drop(r1);
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
");
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
");
let error = r0.try_grow(150).unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
");
}
);
}
allow_duplicates! {
let tracked_spill_pool = Arc::new(TrackConsumersPool::new(
FairSpillPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
test_per_pool_type(tracked_spill_pool);
let tracked_greedy_pool = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
test_per_pool_type(tracked_greedy_pool);
}
}
#[test]
fn test_track_consumers_pool_metrics() {
let track_consumers_pool = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(1000),
NonZeroUsize::new(3).unwrap(),
));
let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;
assert!(track_consumers_pool.metrics().is_empty());
let r1 = MemoryConsumer::new("spilling")
.with_can_spill(true)
.register(&memory_pool);
let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);
r1.grow(100);
r1.grow(50);
r1.shrink(50);
r2.grow(200);
let mut metrics = track_consumers_pool.metrics();
metrics.sort_by_key(|m| m.name.clone());
assert_eq!(metrics.len(), 2);
let m_non = &metrics[0];
assert_eq!(m_non.name, "non-spilling");
assert!(!m_non.can_spill);
assert_eq!(m_non.reserved, 200);
assert_eq!(m_non.peak, 200);
let m_spill = &metrics[1];
assert_eq!(m_spill.name, "spilling");
assert!(m_spill.can_spill);
assert_eq!(m_spill.reserved, 100);
assert_eq!(m_spill.peak, 150);
drop(r2);
let metrics = track_consumers_pool.metrics();
assert_eq!(metrics.len(), 1);
assert_eq!(metrics[0].name, "spilling");
}
#[test]
fn test_tracked_consumers_pool_use_beyond_errors() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let upcasted: Arc<dyn std::any::Any + Send + Sync> =
Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
.downcast::<TrackConsumersPool<GreedyMemoryPool>>()
.unwrap();
let r1 = MemoryConsumer::new("r1").register(&pool);
r1.grow(20);
let r2 = MemoryConsumer::new("r2").register(&pool);
r2.grow(15);
let r3 = MemoryConsumer::new("r3").register(&pool);
r3.grow(45);
let downcasted = upcasted
.downcast::<TrackConsumersPool<GreedyMemoryPool>>()
.unwrap();
let res = downcasted.report_top(2);
assert_snapshot!(res, @r"
r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
");
}
#[test]
fn test_memory_pool_display_fmt() {
let top = NonZeroUsize::new(5).unwrap();
let unbounded = UnboundedMemoryPool::default();
assert_eq!(
unbounded.to_string(),
"unbounded(used: 0.0 B)",
"UnboundedMemoryPool Display"
);
let unbounded_arc: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
let r = MemoryConsumer::new("u").register(&unbounded_arc);
r.grow(2048);
assert_eq!(
unbounded_arc.as_ref().to_string(),
"unbounded(used: 2.0 KB)",
"UnboundedMemoryPool Display with reservations"
);
let greedy = GreedyMemoryPool::new(100);
assert_eq!(
greedy.to_string(),
"greedy(used: 0.0 B, pool_size: 100.0 B)",
"GreedyMemoryPool Display"
);
let greedy_arc: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
let r = MemoryConsumer::new("g").register(&greedy_arc);
r.grow(50);
assert_eq!(
greedy_arc.as_ref().to_string(),
"greedy(used: 50.0 B, pool_size: 100.0 B)",
"GreedyMemoryPool Display with reservations"
);
let fair = FairSpillPool::new(4096);
assert_eq!(
fair.to_string(),
"fair(pool_size: 4.0 KB)",
"FairSpillPool Display"
);
let tracked_greedy = TrackConsumersPool::new(GreedyMemoryPool::new(128), top);
assert_eq!(
tracked_greedy.to_string(),
"track_consumers(inner_pool: greedy(used: 0.0 B, pool_size: 128.0 B), num_of_top_consumers: 5)",
"TrackConsumersPool<GreedyMemoryPool> Display"
);
let tracked_fair = TrackConsumersPool::new(FairSpillPool::new(256), top);
assert_eq!(
tracked_fair.to_string(),
"track_consumers(inner_pool: fair(pool_size: 256.0 B), num_of_top_consumers: 5)",
"TrackConsumersPool<FairSpillPool> Display"
);
let tracked_unbounded =
TrackConsumersPool::new(UnboundedMemoryPool::default(), top);
assert_eq!(
tracked_unbounded.to_string(),
"track_consumers(inner_pool: unbounded(used: 0.0 B), num_of_top_consumers: 5)",
"TrackConsumersPool<UnboundedMemoryPool> Display"
);
}
}