use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
use super::alloc::BlockAllocator;
use super::types::{
Alert, AlertType, OvercommitPolicy, PhysicalBlock, PoolConfig, PoolStats, ThinError,
ThinResult, ThresholdLevel, Thresholds, VirtualBlock, VolumeConfig,
};
use super::volume::ThinVolume;
/// Callback invoked synchronously whenever the pool raises an [`Alert`].
pub type AlertCallback = fn(&Alert);

/// A thin-provisioned storage pool: maps per-volume virtual blocks onto a
/// shared physical [`BlockAllocator`], tracks usage statistics and threshold
/// levels, and raises alerts when capacity thresholds are crossed.
#[derive(Debug)]
pub struct ThinPool {
    /// Unique pool id (assigned by the caller / registry).
    id: u64,
    /// Human-readable pool name, copied from the config.
    name: String,
    /// Physical block allocator backing every volume in this pool.
    allocator: BlockAllocator,
    /// Volumes and snapshots, keyed by volume id.
    volumes: BTreeMap<u64, ThinVolume>,
    /// Name → volume-id index for lookup by name.
    volume_names: BTreeMap<String, u64>,
    /// Next id handed out to a volume or snapshot; starts at 1.
    next_volume_id: u64,
    config: PoolConfig,
    /// Cached statistics, refreshed by `update_stats()` after mutations.
    stats: PoolStats,
    /// Threshold level seen on the previous stats update; used to detect
    /// upward crossings that should raise an alert.
    last_level: ThresholdLevel,
    /// Bounded alert history, oldest first.
    alerts: Vec<Alert>,
    /// Maximum number of alerts retained in `alerts`.
    max_alerts: usize,
    /// Optional user callback fired for every raised alert.
    alert_callback: Option<AlertCallback>,
}
impl ThinPool {
    /// Builds a pool from `config`, carving the configured metadata reserve
    /// out of the physical block budget before any data allocation.
    pub fn new(id: u64, config: PoolConfig) -> Self {
        let total_blocks = config.physical_blocks();
        let reserved = (total_blocks * config.metadata_reserve_percent as u64) / 100;
        let allocator = BlockAllocator::new(total_blocks, config.block_size, reserved);
        Self {
            id,
            name: config.name.clone(),
            allocator,
            volumes: BTreeMap::new(),
            volume_names: BTreeMap::new(),
            next_volume_id: 1,
            stats: PoolStats::new(config.capacity),
            config,
            last_level: ThresholdLevel::Normal,
            alerts: Vec::new(),
            max_alerts: 100,
            alert_callback: None,
        }
    }

    /// Pool id.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Pool name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Pool configuration.
    pub fn config(&self) -> &PoolConfig {
        &self.config
    }

    /// Current statistics; refreshed after every mutating operation.
    pub fn stats(&self) -> &PoolStats {
        &self.stats
    }

    /// Registers a callback invoked synchronously for every raised alert.
    pub fn set_alert_callback(&mut self, callback: AlertCallback) {
        self.alert_callback = Some(callback);
    }

    /// Alert history, oldest first (capped at `max_alerts` entries).
    pub fn alerts(&self) -> &[Alert] {
        &self.alerts
    }

    /// Discards the alert history.
    pub fn clear_alerts(&mut self) {
        self.alerts.clear();
    }

    /// Creates a thin volume and returns its id.
    ///
    /// # Errors
    /// - [`ThinError::VolumeExists`] if `config.name` is already in use.
    /// - [`ThinError::AllocationFailed`] if a non-zero reservation exceeds
    ///   the currently free physical space.
    pub fn create_volume(&mut self, config: VolumeConfig) -> ThinResult<u64> {
        if self.volume_names.contains_key(&config.name) {
            return Err(ThinError::VolumeExists(config.name));
        }
        if config.reservation > 0 {
            // NOTE(review): the reservation is only validated against free
            // space here, not actually set aside in the allocator — later
            // allocations by other volumes can still consume it. Confirm
            // whether BlockAllocator is expected to track reservations.
            let available = self.allocator.free_bytes();
            if config.reservation > available {
                return Err(ThinError::AllocationFailed);
            }
        }
        // All checks passed; only now consume a volume id.
        let id = self.next_volume_id;
        self.next_volume_id += 1;
        let name = config.name.clone();
        let virtual_size = config.virtual_size;
        let volume = ThinVolume::new(id, config);
        self.volumes.insert(id, volume);
        self.volume_names.insert(name, id);
        self.stats.volume_count += 1;
        self.stats.total_virtual += virtual_size;
        self.update_stats();
        Ok(id)
    }

    /// Deletes a volume. Deletion is refused while the volume has snapshots.
    ///
    /// NOTE(review): the volume's mapped physical blocks are not returned to
    /// the allocator here, so deleting a populated volume leaks its blocks —
    /// confirm whether ThinVolume exposes its mappings so they can be freed.
    ///
    /// # Errors
    /// - [`ThinError::VolumeNotFound`] if `id` is unknown.
    /// - [`ThinError::NotPermitted`] if the volume has live snapshots.
    pub fn delete_volume(&mut self, id: u64) -> ThinResult<()> {
        let volume = self
            .volumes
            .remove(&id)
            .ok_or(ThinError::VolumeNotFound(id.to_string()))?;
        if !volume.snapshots().is_empty() {
            // Put the volume back untouched and refuse the deletion.
            self.volumes.insert(id, volume);
            return Err(ThinError::NotPermitted);
        }
        self.volume_names.remove(volume.name());
        self.stats.volume_count = self.stats.volume_count.saturating_sub(1);
        self.stats.total_virtual = self
            .stats
            .total_virtual
            .saturating_sub(volume.virtual_size());
        // physical_used / physical_free are recomputed from the allocator in
        // update_stats(), so no manual adjustment is needed here.
        self.update_stats();
        Ok(())
    }

    /// Looks up a volume by id.
    pub fn get_volume(&self, id: u64) -> Option<&ThinVolume> {
        self.volumes.get(&id)
    }

    /// Looks up a volume by id, mutably.
    pub fn get_volume_mut(&mut self, id: u64) -> Option<&mut ThinVolume> {
        self.volumes.get_mut(&id)
    }

    /// Looks up a volume by name.
    pub fn get_volume_by_name(&self, name: &str) -> Option<&ThinVolume> {
        self.volume_names
            .get(name)
            .and_then(|id| self.volumes.get(id))
    }

    /// Looks up a volume by name, mutably.
    pub fn get_volume_by_name_mut(&mut self, name: &str) -> Option<&mut ThinVolume> {
        let id = self.volume_names.get(name).copied();
        id.and_then(move |id| self.volumes.get_mut(&id))
    }

    /// Ids of all volumes and snapshots, in ascending order.
    pub fn volume_ids(&self) -> Vec<u64> {
        self.volumes.keys().copied().collect()
    }

    /// Names of all volumes and snapshots, in lexicographic order.
    pub fn volume_names(&self) -> Vec<String> {
        self.volume_names.keys().cloned().collect()
    }

    /// Number of volumes including snapshots.
    pub fn volume_count(&self) -> usize {
        self.volumes.len()
    }

    /// Creates a snapshot of `volume_id` named `name` and returns its id.
    ///
    /// # Errors
    /// - [`ThinError::VolumeExists`] if `name` is already in use.
    /// - [`ThinError::VolumeNotFound`] if `volume_id` is unknown.
    pub fn create_snapshot(&mut self, volume_id: u64, name: String) -> ThinResult<u64> {
        if self.volume_names.contains_key(&name) {
            return Err(ThinError::VolumeExists(name));
        }
        // Verify the origin exists BEFORE consuming an id, so a failed call
        // no longer leaks a volume id (previously next_volume_id was
        // incremented first and never rolled back on error).
        if !self.volumes.contains_key(&volume_id) {
            return Err(ThinError::VolumeNotFound(volume_id.to_string()));
        }
        let snapshot_id = self.next_volume_id;
        self.next_volume_id += 1;
        let volume = self
            .volumes
            .get_mut(&volume_id)
            .expect("existence checked above");
        let snapshot = volume.create_snapshot(snapshot_id, name.clone());
        let virtual_size = snapshot.virtual_size();
        self.volumes.insert(snapshot_id, snapshot);
        self.volume_names.insert(name, snapshot_id);
        self.stats.snapshot_count += 1;
        self.stats.total_virtual += virtual_size;
        self.update_stats();
        Ok(snapshot_id)
    }

    /// Allocates a physical block and maps it to `vblock` in the volume.
    ///
    /// # Errors
    /// - [`ThinError::PoolFull`] if the overcommit policy blocks writes at
    ///   the current threshold level.
    /// - [`ThinError::VolumeNotFound`] if `volume_id` is unknown.
    /// - Any allocator error from [`BlockAllocator::allocate`].
    pub fn allocate_block(
        &mut self,
        volume_id: u64,
        vblock: VirtualBlock,
    ) -> ThinResult<PhysicalBlock> {
        self.check_allocation_allowed()?;
        // Verify the volume exists before touching the allocator; previously
        // a bad volume_id silently allocated (and leaked) a physical block
        // that no volume mapped.
        if !self.volumes.contains_key(&volume_id) {
            return Err(ThinError::VolumeNotFound(volume_id.to_string()));
        }
        let pblock = self.allocator.allocate()?;
        self.volumes
            .get_mut(&volume_id)
            .expect("existence checked above")
            .map_block(vblock, pblock);
        // update_stats() recomputes physical_used/free from the allocator,
        // so no manual byte accounting is needed here.
        self.update_stats();
        Ok(pblock)
    }

    /// Unmaps `vblock` from the volume and, if it was mapped, returns the
    /// backing physical block to the allocator. Returns the freed physical
    /// block, or `None` if the volume or mapping does not exist.
    pub fn free_block(&mut self, volume_id: u64, vblock: VirtualBlock) -> Option<PhysicalBlock> {
        let pblock = self
            .volumes
            .get_mut(&volume_id)
            .and_then(|volume| volume.unmap_block(vblock));
        if let Some(pb) = pblock {
            self.allocator.free(pb);
            // Stats are derived from the allocator inside update_stats().
            self.update_stats();
        }
        pblock
    }

    /// Checks the overcommit policy against the current threshold level,
    /// raising a PoolFull alert and failing if writes are blocked.
    fn check_allocation_allowed(&mut self) -> ThinResult<()> {
        let level = self.current_threshold_level();
        if !self.config.overcommit_policy.allows_write(level) {
            self.raise_alert(
                AlertType::PoolFull,
                level,
                "Pool threshold exceeded, writes blocked",
            );
            return Err(ThinError::PoolFull);
        }
        Ok(())
    }

    /// Threshold level derived from the allocator's current usage percent.
    pub fn current_threshold_level(&self) -> ThresholdLevel {
        let percent = self.allocator.usage_percent();
        self.config.thresholds.level(percent)
    }

    /// Ratio of promised virtual capacity to physical capacity
    /// (0.0 when the pool has no capacity).
    pub fn overcommit_ratio(&self) -> f64 {
        if self.stats.total_capacity == 0 {
            return 0.0;
        }
        self.stats.total_virtual as f64 / self.stats.total_capacity as f64
    }

    /// Physical usage as an integer percentage.
    pub fn usage_percent(&self) -> u8 {
        self.allocator.usage_percent()
    }

    /// Free physical capacity in bytes.
    pub fn free_bytes(&self) -> u64 {
        self.allocator.free_bytes()
    }

    /// Free physical capacity in blocks.
    pub fn free_blocks(&self) -> u64 {
        self.allocator.free_blocks()
    }

    /// Recomputes allocator-derived statistics and handles threshold
    /// transitions. Alerts are raised only on upward crossings; falling back
    /// to a lower level updates the recorded level silently.
    fn update_stats(&mut self) {
        self.stats.physical_used = self.allocator.allocated_blocks() * self.config.block_size;
        self.stats.physical_free = self.allocator.free_bytes();
        self.stats.update_overcommit();
        let current_level = self.current_threshold_level();
        if current_level != self.last_level {
            if current_level > self.last_level {
                self.raise_threshold_alert(current_level);
            }
            self.last_level = current_level;
            self.stats.threshold_level = current_level;
        }
    }

    /// Raises a threshold-crossed alert describing the new level.
    fn raise_threshold_alert(&mut self, level: ThresholdLevel) {
        let percent = self.usage_percent();
        let msg = alloc::format!("Pool usage at {}%, entering {} level", percent, level);
        self.raise_alert(AlertType::ThresholdCrossed, level, &msg);
    }

    /// Builds an alert, fires the callback (if any), and appends it to the
    /// bounded history, dropping the oldest entry on overflow.
    fn raise_alert(&mut self, alert_type: AlertType, level: ThresholdLevel, message: &str) {
        let alert =
            Alert::new(alert_type, level, &self.name, message).with_usage(self.usage_percent());
        if let Some(callback) = self.alert_callback {
            callback(&alert);
        }
        self.alerts.push(alert);
        if self.alerts.len() > self.max_alerts {
            self.alerts.remove(0);
        }
    }

    /// Returns whether a volume with the given reservation could be created
    /// right now. The virtual size is intentionally not checked: thin pools
    /// overcommit virtual capacity by design.
    pub fn can_create_volume(&self, _virtual_size: u64, reservation: u64) -> bool {
        reservation == 0 || reservation <= self.allocator.free_bytes()
    }

    /// Point-in-time capacity snapshot for reporting.
    pub fn capacity_summary(&self) -> CapacitySummary {
        CapacitySummary {
            total_physical: self.stats.total_capacity,
            used_physical: self.stats.physical_used,
            free_physical: self.stats.physical_free,
            total_virtual: self.stats.total_virtual,
            overcommit_ratio: self.overcommit_ratio(),
            usage_percent: self.usage_percent(),
            threshold_level: self.current_threshold_level(),
        }
    }

    /// NOTE(review): despite taking `&mut self`, this only reports the
    /// number of free blocks and performs no discard/compaction pass —
    /// confirm whether this is a stub awaiting implementation.
    pub fn trim(&mut self) -> u64 {
        self.allocator.free_blocks()
    }
}
/// Point-in-time snapshot of a pool's capacity figures, as returned by
/// [`ThinPool::capacity_summary`]. All byte figures refer to physical
/// capacity except `total_virtual`, which is the sum of volume sizes.
#[derive(Debug, Clone)]
pub struct CapacitySummary {
    /// Total physical capacity in bytes.
    pub total_physical: u64,
    /// Physically allocated bytes.
    pub used_physical: u64,
    /// Physically free bytes.
    pub free_physical: u64,
    /// Sum of all volumes' virtual sizes in bytes (may exceed physical).
    pub total_virtual: u64,
    /// total_virtual / total_physical (0.0 for an empty pool).
    pub overcommit_ratio: f64,
    /// Physical usage as an integer percentage.
    pub usage_percent: u8,
    /// Threshold level at the time of the snapshot.
    pub threshold_level: ThresholdLevel,
}
lazy_static! {
    /// Process-wide pool registry guarded by a spin lock (no_std context).
    /// All free functions below operate on this singleton.
    static ref POOLS: Mutex<PoolRegistry> = Mutex::new(PoolRegistry::new());
}
/// Backing storage for the global pool registry: pools keyed by id plus a
/// name → id index, and the next id to hand out.
#[derive(Debug)]
struct PoolRegistry {
    /// Registered pools, keyed by pool id.
    pools: BTreeMap<u64, ThinPool>,
    /// Name → pool-id index for lookup by name.
    names: BTreeMap<String, u64>,
    /// Next pool id to assign; starts at 1.
    next_id: u64,
}
impl PoolRegistry {
fn new() -> Self {
Self {
pools: BTreeMap::new(),
names: BTreeMap::new(),
next_id: 1,
}
}
}
/// Registers a new pool in the global registry and returns its id.
///
/// # Errors
/// Returns [`ThinError::InvalidConfig`] if a pool with the same name is
/// already registered.
pub fn create_pool(config: PoolConfig) -> ThinResult<u64> {
    let mut registry = POOLS.lock();
    if registry.names.contains_key(&config.name) {
        return Err(ThinError::InvalidConfig("Pool name already exists".into()));
    }
    // Name is free; consume an id and register under both indexes.
    let pool_id = registry.next_id;
    registry.next_id += 1;
    let pool_name = config.name.clone();
    registry.pools.insert(pool_id, ThinPool::new(pool_id, config));
    registry.names.insert(pool_name, pool_id);
    Ok(pool_id)
}
/// Removes a pool from the global registry. Refused while the pool still
/// contains volumes, in which case the registry is left unchanged.
///
/// # Errors
/// - [`ThinError::PoolNotFound`] if `id` is unknown.
/// - [`ThinError::NotPermitted`] if the pool is non-empty.
pub fn delete_pool(id: u64) -> ThinResult<()> {
    let mut registry = POOLS.lock();
    let Some(pool) = registry.pools.remove(&id) else {
        return Err(ThinError::PoolNotFound(id.to_string()));
    };
    if pool.volume_count() > 0 {
        // Re-register the pool untouched and refuse the deletion.
        registry.pools.insert(id, pool);
        return Err(ThinError::NotPermitted);
    }
    registry.names.remove(pool.name());
    Ok(())
}
/// Returns whether a pool with this id is registered.
pub fn pool_exists(id: u64) -> bool {
    let registry = POOLS.lock();
    registry.pools.contains_key(&id)
}

/// Returns whether a pool with this name is registered.
pub fn pool_exists_by_name(name: &str) -> bool {
    let registry = POOLS.lock();
    registry.names.contains_key(name)
}

/// Returns the name of the pool with this id, if registered.
pub fn get_pool_name(id: u64) -> Option<String> {
    let registry = POOLS.lock();
    registry.pools.get(&id).map(|pool| pool.name().to_string())
}

/// Returns the id of the pool with this name, if registered.
pub fn get_pool_id(name: &str) -> Option<u64> {
    let registry = POOLS.lock();
    registry.names.get(name).copied()
}
/// Runs `f` with mutable access to the pool `id` while holding the registry
/// lock, returning `f`'s result.
///
/// # Errors
/// Returns [`ThinError::PoolNotFound`] if `id` is unknown.
pub fn with_pool<F, R>(id: u64, f: F) -> ThinResult<R>
where
    F: FnOnce(&mut ThinPool) -> R,
{
    let mut registry = POOLS.lock();
    match registry.pools.get_mut(&id) {
        Some(pool) => Ok(f(pool)),
        None => Err(ThinError::PoolNotFound(id.to_string())),
    }
}
/// Ids of all registered pools, in ascending order.
pub fn list_pools() -> Vec<u64> {
    let registry = POOLS.lock();
    registry.pools.keys().copied().collect()
}

/// Number of registered pools.
pub fn pool_count() -> usize {
    let registry = POOLS.lock();
    registry.pools.len()
}

/// Removes every pool from the registry (used by tests for isolation).
pub fn clear_pools() {
    let mut registry = POOLS.lock();
    registry.pools.clear();
    registry.names.clear();
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Resets the global registry so registry-level tests start clean.
    ///
    /// NOTE(review): tests touching the global `POOLS` registry share mutable
    /// state and can interfere when the harness runs tests in parallel —
    /// consider serializing them.
    fn setup() {
        clear_pools();
    }

    /// Standalone 100 GiB pool used by the per-pool tests (not registered).
    fn create_test_pool() -> ThinPool {
        let config = PoolConfig::new("test-pool", 100 * 1024 * 1024 * 1024);
        ThinPool::new(1, config)
    }

    #[test]
    fn test_pool_creation() {
        let pool = create_test_pool();
        assert_eq!(pool.id(), 1);
        assert_eq!(pool.name(), "test-pool");
        assert_eq!(pool.volume_count(), 0);
    }

    #[test]
    fn test_volume_creation() {
        let mut pool = create_test_pool();
        let config = VolumeConfig::new("vol1", 10 * 1024 * 1024 * 1024);
        let id = pool.create_volume(config).unwrap();
        assert_eq!(pool.volume_count(), 1);
        assert!(pool.get_volume(id).is_some());
        assert!(pool.get_volume_by_name("vol1").is_some());
    }

    #[test]
    fn test_volume_creation_duplicate_name() {
        let mut pool = create_test_pool();
        let config1 = VolumeConfig::new("vol1", 10 * 1024 * 1024 * 1024);
        pool.create_volume(config1).unwrap();
        let config2 = VolumeConfig::new("vol1", 5 * 1024 * 1024 * 1024);
        let result = pool.create_volume(config2);
        assert!(matches!(result, Err(ThinError::VolumeExists(_))));
    }

    #[test]
    fn test_volume_deletion() {
        let mut pool = create_test_pool();
        let config = VolumeConfig::new("vol1", 10 * 1024 * 1024 * 1024);
        let id = pool.create_volume(config).unwrap();
        pool.delete_volume(id).unwrap();
        assert_eq!(pool.volume_count(), 0);
    }

    #[test]
    fn test_snapshot_creation() {
        let mut pool = create_test_pool();
        let config = VolumeConfig::new("vol1", 10 * 1024 * 1024 * 1024);
        let vol_id = pool.create_volume(config).unwrap();
        let snap_id = pool.create_snapshot(vol_id, "snap1".into()).unwrap();
        // A snapshot counts as a volume in volume_count().
        assert_eq!(pool.volume_count(), 2);
        assert!(pool.get_volume(snap_id).unwrap().is_snapshot());
        assert_eq!(pool.stats().snapshot_count, 1);
    }

    #[test]
    fn test_block_allocation() {
        let mut pool = create_test_pool();
        let config = VolumeConfig::new("vol1", 10 * 1024 * 1024 * 1024);
        let vol_id = pool.create_volume(config).unwrap();
        let vblock = VirtualBlock::new(100);
        let pblock = pool.allocate_block(vol_id, vblock).unwrap();
        assert!(pblock.0 > 0);
        assert!(pool.get_volume(vol_id).unwrap().is_allocated(vblock));
    }

    #[test]
    fn test_block_freeing() {
        let mut pool = create_test_pool();
        let config = VolumeConfig::new("vol1", 10 * 1024 * 1024 * 1024);
        let vol_id = pool.create_volume(config).unwrap();
        let vblock = VirtualBlock::new(100);
        pool.allocate_block(vol_id, vblock).unwrap();
        let initial_free = pool.free_blocks();
        pool.free_block(vol_id, vblock);
        assert_eq!(pool.free_blocks(), initial_free + 1);
    }

    #[test]
    fn test_overcommit_ratio() {
        let mut pool = create_test_pool();
        // 5 × 100 GiB of virtual capacity over 100 GiB physical → ratio ≈ 5.
        for i in 0..5 {
            let config = VolumeConfig::new(alloc::format!("vol{}", i), 100 * 1024 * 1024 * 1024);
            pool.create_volume(config).unwrap();
        }
        let ratio = pool.overcommit_ratio();
        assert!(ratio > 4.0 && ratio < 6.0);
    }

    #[test]
    fn test_threshold_levels() {
        let config =
            PoolConfig::new("test", 1000 * 128 * 1024).with_thresholds(Thresholds::new(80, 90, 95));
        let mut pool = ThinPool::new(1, config);
        assert_eq!(pool.current_threshold_level(), ThresholdLevel::Normal);
        let vol_config = VolumeConfig::new("vol1", 1024 * 1024 * 1024);
        let vol_id = pool.create_volume(vol_config).unwrap();
        // Fill past the 80% warning mark; late allocations may fail once the
        // policy blocks writes, hence the ignored results.
        for i in 0..800 {
            let _ = pool.allocate_block(vol_id, VirtualBlock::new(i));
        }
        let level = pool.current_threshold_level();
        assert!(level >= ThresholdLevel::Warning);
    }

    #[test]
    fn test_capacity_summary() {
        let mut pool = create_test_pool();
        let config = VolumeConfig::new("vol1", 50 * 1024 * 1024 * 1024);
        pool.create_volume(config).unwrap();
        let summary = pool.capacity_summary();
        assert!(summary.total_physical > 0);
        assert!(summary.total_virtual > 0);
        assert!(summary.overcommit_ratio > 0.0);
    }

    #[test]
    fn test_global_pool_registry() {
        setup();
        let config = PoolConfig::new("global-test", 10 * 1024 * 1024 * 1024);
        let id = create_pool(config).unwrap();
        assert!(pool_exists(id));
        assert!(pool_exists_by_name("global-test"));
        assert_eq!(pool_count(), 1);
        delete_pool(id).unwrap();
        assert_eq!(pool_count(), 0);
    }

    #[test]
    fn test_with_pool() {
        setup();
        let config = PoolConfig::new("with-pool-test", 10 * 1024 * 1024 * 1024);
        let id = create_pool(config).unwrap();
        let result = with_pool(id, |pool| pool.volume_count()).unwrap();
        assert_eq!(result, 0);
    }
}