use crate::{
error::{AppError, AppResult},
types::{ResourceCapacity, RuntimeResourcesResponse, TaskResourceReservation},
};
#[derive(Debug, Clone)]
pub struct ResourceLedger {
capacity: ResourceCapacity,
}
impl ResourceLedger {
pub fn new(capacity: ResourceCapacity) -> Self {
Self { capacity }
}
pub fn capacity(&self) -> &ResourceCapacity {
&self.capacity
}
pub fn ensure_within_capacity(&self, reservation: &TaskResourceReservation) -> AppResult<()> {
if reservation.task_slots > self.capacity.task_slots {
return Err(AppError::InsufficientResources(format!(
"task requires {} task slots but runtime capacity is {}",
reservation.task_slots, self.capacity.task_slots
)));
}
if let (Some(requested), Some(capacity)) =
(reservation.memory_bytes, self.capacity.memory_bytes)
{
if requested > capacity {
return Err(AppError::InsufficientResources(format!(
"task requires {requested} memory_bytes but runtime capacity is {capacity}"
)));
}
}
if let (Some(requested), Some(capacity)) = (reservation.pids, self.capacity.pids) {
if requested > capacity {
return Err(AppError::InsufficientResources(format!(
"task requires {requested} pids but runtime capacity is {capacity}"
)));
}
}
Ok(())
}
pub fn can_reserve(
&self,
currently_reserved: &ResourceCapacity,
reservation: &TaskResourceReservation,
) -> bool {
if currently_reserved
.task_slots
.saturating_add(reservation.task_slots)
> self.capacity.task_slots
{
return false;
}
if let (Some(reserved), Some(requested), Some(capacity)) = (
currently_reserved.memory_bytes,
reservation.memory_bytes,
self.capacity.memory_bytes,
) {
if reserved.saturating_add(requested) > capacity {
return false;
}
}
if let (Some(reserved), Some(requested), Some(capacity)) = (
currently_reserved.pids,
reservation.pids,
self.capacity.pids,
) {
if reserved.saturating_add(requested) > capacity {
return false;
}
}
true
}
pub fn reserved_capacity<'a, I>(&self, reservations: I) -> ResourceCapacity
where
I: IntoIterator<Item = &'a TaskResourceReservation>,
{
let mut task_slots = 0u64;
let mut memory = if self.capacity.memory_bytes.is_some() {
Some(0u64)
} else {
None
};
let mut pids = if self.capacity.pids.is_some() {
Some(0u64)
} else {
None
};
for reservation in reservations {
task_slots = task_slots.saturating_add(reservation.task_slots);
if let Some(value) = reservation.memory_bytes {
let current = memory.unwrap_or(0);
memory = Some(current.saturating_add(value));
}
if let Some(value) = reservation.pids {
let current = pids.unwrap_or(0);
pids = Some(current.saturating_add(value));
}
}
ResourceCapacity {
task_slots,
memory_bytes: memory,
pids,
}
}
pub fn available_capacity(&self, reserved: &ResourceCapacity) -> ResourceCapacity {
ResourceCapacity {
task_slots: self.capacity.task_slots.saturating_sub(reserved.task_slots),
memory_bytes: self
.capacity
.memory_bytes
.map(|capacity| capacity.saturating_sub(reserved.memory_bytes.unwrap_or(0))),
pids: self
.capacity
.pids
.map(|capacity| capacity.saturating_sub(reserved.pids.unwrap_or(0))),
}
}
pub fn empty_snapshot(&self, runtime_id: String) -> RuntimeResourcesResponse {
let reserved = self.reserved_capacity(std::iter::empty::<&TaskResourceReservation>());
RuntimeResourcesResponse {
runtime_id,
capacity: self.capacity.clone(),
available: self.available_capacity(&reserved),
reserved,
active_reservations: Vec::new(),
accepted_waiting_tasks: 0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detects_capacity_overflow() {
let ledger = ResourceLedger::new(ResourceCapacity {
task_slots: 2,
memory_bytes: Some(100),
pids: Some(8),
});
let reserved = ResourceCapacity {
task_slots: 1,
memory_bytes: Some(50),
pids: Some(2),
};
let reservation = TaskResourceReservation {
task_slots: 2,
memory_bytes: Some(60),
pids: Some(1),
};
assert!(!ledger.can_reserve(&reserved, &reservation));
}
}