use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_common::{DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    used: AtomicUsize,
}
impl MemoryPool for UnboundedMemoryPool {
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }
    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.grow(reservation, additional);
        Ok(())
    }
    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
#[derive(Debug)]
pub struct GreedyMemoryPool {
    pool_size: usize,
    used: AtomicUsize,
}
impl GreedyMemoryPool {
    pub fn new(pool_size: usize) -> Self {
        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
        Self {
            pool_size,
            used: AtomicUsize::new(0),
        }
    }
}
impl MemoryPool for GreedyMemoryPool {
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }
    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                let new_used = used + additional;
                (new_used <= self.pool_size).then_some(new_used)
            })
            .map_err(|used| {
                insufficient_capacity_err(
                    reservation,
                    additional,
                    self.pool_size.saturating_sub(used),
                )
            })?;
        Ok(())
    }
    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
#[derive(Debug)]
pub struct FairSpillPool {
    pool_size: usize,
    state: Mutex<FairSpillPoolState>,
}
#[derive(Debug)]
struct FairSpillPoolState {
    num_spill: usize,
    spillable: usize,
    unspillable: usize,
}
impl FairSpillPool {
    pub fn new(pool_size: usize) -> Self {
        debug!("Created new FairSpillPool(pool_size={pool_size})");
        Self {
            pool_size,
            state: Mutex::new(FairSpillPoolState {
                num_spill: 0,
                spillable: 0,
                unspillable: 0,
            }),
        }
    }
}
impl MemoryPool for FairSpillPool {
    fn register(&self, consumer: &MemoryConsumer) {
        if consumer.can_spill {
            self.state.lock().num_spill += 1;
        }
    }
    fn unregister(&self, consumer: &MemoryConsumer) {
        if consumer.can_spill {
            let mut state = self.state.lock();
            state.num_spill = state.num_spill.checked_sub(1).unwrap();
        }
    }
    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        let mut state = self.state.lock();
        match reservation.registration.consumer.can_spill {
            true => state.spillable += additional,
            false => state.unspillable += additional,
        }
    }
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        let mut state = self.state.lock();
        match reservation.registration.consumer.can_spill {
            true => state.spillable -= shrink,
            false => state.unspillable -= shrink,
        }
    }
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        let mut state = self.state.lock();
        match reservation.registration.consumer.can_spill {
            true => {
                let spill_available = self.pool_size.saturating_sub(state.unspillable);
                let available = spill_available
                    .checked_div(state.num_spill)
                    .unwrap_or(spill_available);
                if reservation.size + additional > available {
                    return Err(insufficient_capacity_err(
                        reservation,
                        additional,
                        available,
                    ));
                }
                state.spillable += additional;
            }
            false => {
                let available = self
                    .pool_size
                    .saturating_sub(state.unspillable + state.spillable);
                if available < additional {
                    return Err(insufficient_capacity_err(
                        reservation,
                        additional,
                        available,
                    ));
                }
                state.unspillable += additional;
            }
        }
        Ok(())
    }
    fn reserved(&self) -> usize {
        let state = self.state.lock();
        state.spillable + state.unspillable
    }
}
fn insufficient_capacity_err(
    reservation: &MemoryReservation,
    additional: usize,
    available: usize,
) -> DataFusionError {
    DataFusionError::ResourcesExhausted(format!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;
        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);
        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        r2.grow(2000);
        assert_eq!(pool.reserved(), 4000);
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");
        r1.shrink(1990);
        r2.shrink(2000);
        assert_eq!(pool.reserved(), 10);
        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);
        r2.shrink(70);
        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);
        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);
        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20");
    }
}