use std::collections::HashMap;
use std::sync::Mutex;
use super::{Error, Result, SandboxResources, SingleNodeSchedulerConfig};
use firkin_admission::{CapacityError, CapacityLedger, ResourceBudget};
use firkin_types::Size;
#[derive(Debug)]
pub struct SingleNodeScheduler {
config: SingleNodeSchedulerConfig,
inner: Mutex<SchedulerState>,
}
impl SingleNodeScheduler {
#[must_use]
pub fn new(config: SingleNodeSchedulerConfig) -> Self {
Self {
config,
inner: Mutex::new(SchedulerState::new(config.resources())),
}
}
pub fn admit(&self, sandbox_id: &str, resources: SandboxResources) -> Result<()> {
let mut inner = self.lock()?;
if inner.admitted.contains_key(sandbox_id) {
return Ok(());
}
if inner.admitted.len() >= self.config.max_sessions() {
return Err(Error::CapacityRejected(format!(
"single-node scheduler rejected sandbox `{sandbox_id}`: max sessions {} reached",
self.config.max_sessions()
)));
}
inner
.ledger
.reserve_active(resources.to_budget())
.map_err(|error| capacity_error(sandbox_id, error))?;
inner.admitted.insert(sandbox_id.to_owned(), resources);
Ok(())
}
pub fn release(&self, sandbox_id: &str) -> Result<()> {
let mut inner = self.lock()?;
if let Some(resources) = inner.admitted.remove(sandbox_id) {
inner.ledger.release_active(resources.to_budget());
}
Ok(())
}
pub fn admitted_len(&self) -> Result<usize> {
Ok(self.lock()?.admitted.len())
}
pub fn available_resources(&self) -> Result<SandboxResources> {
Ok(SandboxResources::from_budget(
self.lock()?.ledger.available(),
))
}
fn lock(&self) -> Result<std::sync::MutexGuard<'_, SchedulerState>> {
self.inner
.lock()
.map_err(|_| Error::CapacityRejected("single-node scheduler lock poisoned".to_owned()))
}
}
impl Default for SingleNodeScheduler {
fn default() -> Self {
Self::new(SingleNodeSchedulerConfig::default())
}
}
#[derive(Debug)]
struct SchedulerState {
admitted: HashMap<String, SandboxResources>,
ledger: CapacityLedger,
}
impl SchedulerState {
fn new(resources: SandboxResources) -> Self {
Self {
admitted: HashMap::new(),
ledger: CapacityLedger::new(resources.to_budget()),
}
}
}
impl SandboxResources {
fn to_budget(self) -> ResourceBudget {
ResourceBudget::new(self.vcpus, Size::bytes(self.memory_bytes), Size::bytes(0))
}
fn from_budget(budget: ResourceBudget) -> Self {
Self::new(budget.cpus(), budget.memory())
}
}
fn capacity_error(sandbox_id: &str, error: CapacityError) -> Error {
match error {
CapacityError::Cpu {
requested,
available,
} => Error::CapacityRejected(format!(
"single-node scheduler rejected sandbox `{sandbox_id}`: CPU requested {requested}, available {available}"
)),
CapacityError::Memory {
requested,
available,
} => Error::CapacityRejected(format!(
"single-node scheduler rejected sandbox `{sandbox_id}`: memory requested {requested}, available {available}"
)),
CapacityError::Disk {
requested,
available,
} => Error::CapacityRejected(format!(
"single-node scheduler rejected sandbox `{sandbox_id}`: disk requested {requested}, available {available}"
)),
}
}