#![allow(missing_docs)]
#[allow(unused_imports)]
use crate::DEFAULT_RUNTIME_WARM_POOL_MINIMUM_FREE_DISK;
#[allow(unused_imports)]
use crate::disk::{DiskPressureProbe, HostDiskPressureProbe, RuntimeDiskPressureGuard};
#[allow(unused_imports)]
use crate::restore::{
ActiveSessionReservation, SnapshotRestoreRequest, SnapshotSessionLauncher,
disk_pressure_to_capacity_error,
};
#[allow(unused_imports)]
use firkin_admission::CapacityError;
#[allow(unused_imports)]
use firkin_admission::WarmPoolEntry;
#[allow(unused_imports)]
use firkin_admission::WarmPoolReplenishmentSkip;
#[allow(unused_imports)]
use firkin_admission::{CapacityLedger, WarmPoolLedger};
#[allow(unused_imports)]
use firkin_admission::{WarmPoolReplenishmentPlan, WarmPoolReplenishmentTarget};
#[allow(unused_imports)]
use firkin_trace::BenchmarkSample;
#[allow(unused_imports)]
use firkin_trace::{BenchmarkMetricKind, BenchmarkUnit};
#[allow(unused_imports)]
use std::collections::BTreeMap;
#[allow(unused_imports)]
use std::collections::VecDeque;
#[allow(unused_imports)]
use std::path::Path;
#[allow(unused_imports)]
use std::sync::Arc;
#[allow(unused_imports)]
use std::time::Duration;
#[allow(unused_imports)]
use thiserror::Error as ThisError;
#[allow(unused_imports)]
use tokio::sync::{Mutex, oneshot};
#[allow(unused_imports)]
use tokio::task::JoinHandle;
#[allow(unused_imports)]
use {
firkin_admission::{ResourceBudget, WarmPoolKey},
firkin_artifacts::SnapshotArtifactManifest,
};
/// Borrowed inputs handed to a [`WarmPoolSessionLauncher`] when one warm-pool
/// entry is restored.
///
/// The request is `Copy` (two references plus a copied [`ResourceBudget`]), so
/// every accessor takes `self` by value.
#[derive(Clone, Copy, Debug)]
pub struct WarmPoolRestoreRequest<'a> {
    /// Pool key the restored entry belongs to.
    key: &'a WarmPoolKey,
    /// Snapshot manifest to restore from.
    pub(crate) manifest: &'a SnapshotArtifactManifest,
    /// Resource budget recorded for the entry.
    pub(crate) budget: ResourceBudget,
}

impl<'a> WarmPoolRestoreRequest<'a> {
    /// Bundles a pool key, a snapshot manifest and a resource budget.
    #[must_use]
    pub const fn new(
        key: &'a WarmPoolKey,
        manifest: &'a SnapshotArtifactManifest,
        budget: ResourceBudget,
    ) -> Self {
        Self { key, manifest, budget }
    }

    /// Pool key the restored entry belongs to.
    #[must_use]
    pub const fn key(self) -> &'a WarmPoolKey {
        self.key
    }

    /// Snapshot manifest to restore from.
    #[must_use]
    pub const fn manifest(self) -> &'a SnapshotArtifactManifest {
        self.manifest
    }

    /// Resource budget recorded for the entry.
    #[must_use]
    pub const fn budget(self) -> ResourceBudget {
        self.budget
    }
}
/// Restores warm-pool sessions; used by [`RuntimeWarmPoolMaintain::execute`].
pub trait WarmPoolSessionLauncher {
    /// Session handle produced by a successful restore.
    type Session;
    /// Error produced when a restore fails.
    type Error;

    /// Restores one session described by `request`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the restore fails; callers in this module wrap
    /// it in [`WarmPoolMaintenanceError::Launch`].
    fn restore_warm_pool_entry(
        &mut self,
        request: &WarmPoolRestoreRequest<'_>,
    ) -> Result<Self::Session, Self::Error>;
}
/// Borrowed input handed to a [`WarmPoolSessionCheckout`]: the ledger entry
/// that was just checked out of the pool.
#[derive(Clone, Copy, Debug)]
pub struct WarmPoolCheckoutRequest<'a> {
    /// Entry removed from the warm-pool ledger.
    entry: &'a WarmPoolEntry,
}

impl<'a> WarmPoolCheckoutRequest<'a> {
    /// Wraps a checked-out ledger entry.
    #[must_use]
    pub const fn new(entry: &'a WarmPoolEntry) -> Self {
        Self { entry }
    }

    /// Entry removed from the warm-pool ledger.
    #[must_use]
    pub const fn entry(self) -> &'a WarmPoolEntry {
        self.entry
    }
}
/// Turns a checked-out warm-pool entry into a live session; used by
/// [`RuntimeWarmPoolCheckout::execute_with_elapsed`].
pub trait WarmPoolSessionCheckout {
    /// Session handle produced by a successful checkout.
    type Session;
    /// Error produced when the checkout fails.
    type Error;

    /// Produces a session for `request.entry()`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the checkout fails; the caller then wraps it in
    /// [`WarmPoolCheckoutError::Checkout`].
    fn checkout_warm_pool_entry(
        &mut self,
        request: &WarmPoolCheckoutRequest<'_>,
    ) -> Result<Self::Session, Self::Error>;
}
/// Failure of a warm-pool maintenance (restore + admit) operation.
///
/// `E` is the launcher's own error type.
#[derive(Clone, Debug, PartialEq, Eq, ThisError)]
pub enum WarmPoolMaintenanceError<E> {
    /// Capacity admission was refused (ledger admission, or a disk-pressure
    /// rejection converted via `disk_pressure_to_capacity_error`).
    #[error("warm-pool capacity admission failed: {0}")]
    Capacity(#[from] CapacityError),
    /// The launcher failed to restore a session.
    #[error("warm-pool launcher failed: {source}")]
    Launch { source: E },
}
/// Failure of a ledger-backed warm-pool checkout.
///
/// `E` is the checkout implementation's own error type.
#[derive(Clone, Debug, PartialEq, Eq, ThisError)]
pub enum WarmPoolCheckoutError<E> {
    /// The ledger could not promote the entry's capacity.
    #[error("warm-pool checkout capacity promotion failed: {0}")]
    Capacity(#[from] CapacityError),
    /// The [`WarmPoolSessionCheckout`] implementation failed.
    #[error("warm-pool checkout failed: {source}")]
    Checkout { source: E },
}
/// A restored warm-pool session paired with the ledger entry admitted for it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WarmPoolMaintenanceReport<S> {
    /// The restored session.
    pub(crate) session: S,
    /// Ledger entry admitted alongside the session.
    entry: WarmPoolEntry,
}

impl<S> WarmPoolMaintenanceReport<S> {
    /// Pairs `session` with its ledger `entry`.
    #[must_use]
    pub const fn new(session: S, entry: WarmPoolEntry) -> Self {
        Self { session, entry }
    }

    /// Borrows the restored session.
    #[must_use]
    pub const fn session(&self) -> &S {
        &self.session
    }

    /// Borrows the ledger entry.
    #[must_use]
    pub const fn entry(&self) -> &WarmPoolEntry {
        &self.entry
    }

    /// Consumes the report, yielding `(session, entry)`.
    #[must_use]
    pub fn into_parts(self) -> (S, WarmPoolEntry) {
        let Self { session, entry } = self;
        (session, entry)
    }
}
/// Outcome of a successful warm-pool checkout.
///
/// Holds the live session, the entry it came from, the active reservation it
/// now owns, and benchmark samples (checkouts in this module record a single
/// `warm_pool_checkout` latency sample).
#[derive(Clone, Debug, PartialEq)]
pub struct WarmPoolCheckoutReport<S> {
    /// The checked-out session.
    pub(crate) session: S,
    /// Entry the session was checked out from.
    entry: WarmPoolEntry,
    /// Active-capacity reservation now held by the session.
    pub(crate) reservation: ActiveSessionReservation,
    /// Samples recorded while checking out.
    pub(crate) benchmark_samples: Vec<BenchmarkSample>,
}

impl<S> WarmPoolCheckoutReport<S> {
    /// Assembles a report from its parts.
    #[must_use]
    pub fn new(
        session: S,
        entry: WarmPoolEntry,
        reservation: ActiveSessionReservation,
        benchmark_samples: Vec<BenchmarkSample>,
    ) -> Self {
        Self { session, entry, reservation, benchmark_samples }
    }

    /// Borrows the checked-out session.
    #[must_use]
    pub const fn session(&self) -> &S {
        &self.session
    }

    /// Borrows the entry the session came from.
    #[must_use]
    pub const fn entry(&self) -> &WarmPoolEntry {
        &self.entry
    }

    /// Borrows the active-capacity reservation.
    #[must_use]
    pub const fn reservation(&self) -> &ActiveSessionReservation {
        &self.reservation
    }

    /// Benchmark samples recorded during checkout.
    #[must_use]
    pub fn benchmark_samples(&self) -> &[BenchmarkSample] {
        self.benchmark_samples.as_slice()
    }

    /// Consumes the report, yielding `(session, reservation)`.
    ///
    /// The entry and benchmark samples are discarded.
    #[must_use]
    pub fn into_parts(self) -> (S, ActiveSessionReservation) {
        let Self { session, reservation, .. } = self;
        (session, reservation)
    }
}

/// Return type of [`RuntimeWarmPoolCheckout::execute_with_elapsed`]: `None`
/// when the pool held no entry for the key, otherwise the checkout outcome.
pub type WarmPoolCheckoutResult<S, E> =
    Option<Result<WarmPoolCheckoutReport<S>, WarmPoolCheckoutError<E>>>;
/// One-shot operation that restores a session through a
/// [`WarmPoolSessionLauncher`] and admits a matching entry into a
/// [`WarmPoolLedger`].
#[derive(Debug)]
pub struct RuntimeWarmPoolMaintain<'a> {
    /// Capacity ledger charged on admission.
    capacity: &'a mut CapacityLedger,
    /// Ledger the new entry is admitted into.
    pool: &'a mut WarmPoolLedger,
    /// Key the entry is filed under.
    key: WarmPoolKey,
    /// Snapshot manifest to restore from.
    pub(crate) manifest: &'a SnapshotArtifactManifest,
    /// Budget recorded for the entry.
    pub(crate) budget: ResourceBudget,
}

impl<'a> RuntimeWarmPoolMaintain<'a> {
    /// Prepares a maintenance operation; nothing runs until [`Self::execute`].
    pub fn new(
        capacity: &'a mut CapacityLedger,
        pool: &'a mut WarmPoolLedger,
        key: WarmPoolKey,
        manifest: &'a SnapshotArtifactManifest,
        budget: ResourceBudget,
    ) -> Self {
        Self { capacity, pool, key, manifest, budget }
    }

    /// Restores a session via `launcher`, then admits the entry into the ledger.
    ///
    /// The launcher runs before ledger admission. If admission then fails, the
    /// restored session is dropped inside this method.
    /// NOTE(review): confirm that dropping `L::Session` releases its resources.
    ///
    /// # Errors
    ///
    /// - [`WarmPoolMaintenanceError::Launch`] when the launcher fails.
    /// - [`WarmPoolMaintenanceError::Capacity`] when ledger admission fails.
    pub fn execute<L>(
        self,
        launcher: &mut L,
    ) -> Result<WarmPoolMaintenanceReport<L::Session>, WarmPoolMaintenanceError<L::Error>>
    where
        L: WarmPoolSessionLauncher,
    {
        let Self { capacity, pool, key, manifest, budget } = self;
        let session = {
            let request = WarmPoolRestoreRequest::new(&key, manifest, budget);
            launcher
                .restore_warm_pool_entry(&request)
                .map_err(|source| WarmPoolMaintenanceError::Launch { source })?
        };
        let logical_id = manifest.logical_id().to_owned();
        let entry = WarmPoolEntry::new(key, logical_id, budget);
        pool.maintain(entry.clone(), capacity)?;
        Ok(WarmPoolMaintenanceReport::new(session, entry))
    }
}
/// One-shot operation that checks an entry out of a [`WarmPoolLedger`] and turns
/// it into a live session through a [`WarmPoolSessionCheckout`].
#[derive(Debug)]
pub struct RuntimeWarmPoolCheckout<'a> {
    /// Capacity ledger whose reservation is promoted on checkout.
    capacity: &'a mut CapacityLedger,
    /// Ledger the entry is taken from.
    pool: &'a mut WarmPoolLedger,
    /// Key to check out.
    key: &'a WarmPoolKey,
}

impl<'a> RuntimeWarmPoolCheckout<'a> {
    /// Prepares a checkout; nothing runs until [`Self::execute_with_elapsed`].
    pub fn new(
        capacity: &'a mut CapacityLedger,
        pool: &'a mut WarmPoolLedger,
        key: &'a WarmPoolKey,
    ) -> Self {
        Self {
            capacity,
            pool,
            key,
        }
    }

    /// Checks out one entry for the key and hands it to `checkout`.
    ///
    /// `elapsed` is not measured here. It is recorded verbatim, converted to
    /// milliseconds, as a `warm_pool_checkout` lifecycle-latency sample.
    ///
    /// Returns:
    /// - `None` when the ledger has no entry for the key.
    /// - `Some(Err(WarmPoolCheckoutError::Capacity(_)))` when the ledger checkout
    ///   fails.
    /// - `Some(Err(WarmPoolCheckoutError::Checkout { .. }))` when `checkout`
    ///   fails. In that case `release_active` is called with the entry's budget
    ///   and the entry is put back into the pool on a best-effort basis.
    /// - `Some(Ok(report))` otherwise, with an active reservation for the entry's
    ///   budget.
    pub fn execute_with_elapsed<C>(
        self,
        checkout: &mut C,
        elapsed: Duration,
    ) -> WarmPoolCheckoutResult<C::Session, C::Error>
    where
        C: WarmPoolSessionCheckout,
    {
        let entry = match self.pool.checkout(self.key, self.capacity) {
            Ok(Some(entry)) => entry,
            Ok(None) => return None,
            Err(error) => return Some(Err(WarmPoolCheckoutError::Capacity(error))),
        };
        let request = WarmPoolCheckoutRequest::new(&entry);
        let session = match checkout.checkout_warm_pool_entry(&request) {
            Ok(session) => session,
            Err(source) => {
                // Undo the promotion, then try to return the entry to the pool.
                // Re-pooling is best-effort: a capacity error here is ignored so
                // the caller still sees the original checkout failure. `entry` is
                // not used again on this path, so it is moved rather than cloned.
                self.capacity.release_active(entry.budget());
                let _ = self.pool.maintain(entry, self.capacity);
                return Some(Err(WarmPoolCheckoutError::Checkout { source }));
            }
        };
        let reservation = ActiveSessionReservation::new(entry.budget());
        let sample = BenchmarkSample::new(
            "warm_pool_checkout",
            BenchmarkMetricKind::LifecycleLatency,
            BenchmarkUnit::Milliseconds,
            elapsed.as_secs_f64() * 1000.0,
        );
        Some(Ok(WarmPoolCheckoutReport::new(
            session,
            entry,
            reservation,
            vec![sample],
        )))
    }
}
/// Errors returned by [`RuntimeSnapshotWarmPool::checkout_with_elapsed`].
#[derive(Clone, Debug, PartialEq, Eq, ThisError)]
pub enum SnapshotWarmPoolCheckoutError {
    /// The ledger refused to promote the entry's capacity.
    #[error("warm-pool checkout capacity promotion failed: {0}")]
    Capacity(#[from] CapacityError),
    /// The ledger produced an entry, but no restored session was queued for
    /// its key.
    #[error("warm-pool metadata existed without a retained restored session")]
    MissingRetainedSession,
}
/// Warm pool that owns its capacity ledger, its entry ledger, and the restored
/// sessions backing each ledger entry.
///
/// Sessions are queued per key: maintenance appends to the back of a key's
/// queue, and checkout takes from the front.
#[derive(Clone, Debug)]
pub struct RuntimeSnapshotWarmPool<S> {
    /// Capacity ledger charged on maintenance and promoted on checkout.
    capacity: CapacityLedger,
    /// Metadata for the warm entries admitted against `capacity`.
    pub(crate) ledger: WarmPoolLedger,
    /// Restored sessions retained for each key, oldest first.
    sessions: BTreeMap<WarmPoolKey, VecDeque<WarmPoolMaintenanceReport<S>>>,
}

impl<S> RuntimeSnapshotWarmPool<S> {
    /// Creates an empty pool that admits entries against `capacity`.
    #[must_use]
    pub fn new(capacity: CapacityLedger) -> Self {
        Self {
            capacity,
            ledger: WarmPoolLedger::default(),
            sessions: BTreeMap::new(),
        }
    }

    /// Returns a copy of the pool's capacity ledger.
    #[must_use]
    pub const fn capacity(&self) -> CapacityLedger {
        self.capacity
    }

    /// Returns `true` when at least one restored session is retained for `key`.
    #[must_use]
    pub fn contains(&self, key: &WarmPoolKey) -> bool {
        self.sessions
            .get(key)
            .is_some_and(|entries| !entries.is_empty())
    }

    /// Same as [`Self::maintain_with_disk_probe_elapsed`], but uses a fresh
    /// [`HostDiskPressureProbe`].
    ///
    /// # Errors
    ///
    /// See [`Self::maintain_with_disk_probe_elapsed`].
    pub async fn maintain_with_elapsed<L>(
        &mut self,
        key: WarmPoolKey,
        manifest: &SnapshotArtifactManifest,
        budget: ResourceBudget,
        launcher: &mut L,
        elapsed: Duration,
    ) -> Result<&WarmPoolMaintenanceReport<S>, WarmPoolMaintenanceError<L::Error>>
    where
        L: SnapshotSessionLauncher<Session = S>,
    {
        let mut probe = HostDiskPressureProbe::new();
        self.maintain_with_disk_probe_elapsed(key, manifest, budget, launcher, elapsed, &mut probe)
            .await
    }

    /// Restores one session from `manifest`, admits it into the ledger under
    /// `key`, and retains it at the back of that key's queue.
    ///
    /// The free-disk check runs first. It uses
    /// `DEFAULT_RUNTIME_WARM_POOL_MINIMUM_FREE_DISK` and probes the directory
    /// containing the manifest: the current directory for a bare relative file
    /// name, or `/` when the path has no parent. `_elapsed` is currently unused.
    ///
    /// Returns a reference to the newly retained report.
    ///
    /// # Errors
    ///
    /// - [`WarmPoolMaintenanceError::Capacity`] when the disk-pressure guard
    ///   rejects the restore or ledger admission fails. In the admission case
    ///   the session has already been restored and is dropped here.
    ///   NOTE(review): confirm that dropping `S` tears the session down.
    /// - [`WarmPoolMaintenanceError::Launch`] when `launcher` fails.
    pub async fn maintain_with_disk_probe_elapsed<L, P>(
        &mut self,
        key: WarmPoolKey,
        manifest: &SnapshotArtifactManifest,
        budget: ResourceBudget,
        launcher: &mut L,
        _elapsed: Duration,
        disk_probe: &mut P,
    ) -> Result<&WarmPoolMaintenanceReport<S>, WarmPoolMaintenanceError<L::Error>>
    where
        L: SnapshotSessionLauncher<Session = S>,
        P: DiskPressureProbe,
    {
        let disk_root = match manifest.path().parent() {
            // `Path::parent` yields an *empty* path for a bare relative file name
            // such as `snapshot.img`; probing "" would not name a real
            // directory, so probe the current directory the file lives in.
            Some(parent) if parent.as_os_str().is_empty() => Path::new("."),
            Some(parent) => parent,
            None => Path::new("/"),
        };
        RuntimeDiskPressureGuard::new(disk_root, DEFAULT_RUNTIME_WARM_POOL_MINIMUM_FREE_DISK)
            .check(disk_probe)
            .map_err(|error| {
                WarmPoolMaintenanceError::Capacity(disk_pressure_to_capacity_error(&error))
            })?;
        let request = SnapshotRestoreRequest::new(manifest, budget);
        let session = launcher
            .restore_from_snapshot(&request)
            .await
            .map_err(|source| WarmPoolMaintenanceError::Launch { source })?;
        let entry = WarmPoolEntry::new(key.clone(), manifest.logical_id().to_owned(), budget);
        // `CapacityError` converts via the `#[from]` on `Capacity`.
        self.ledger.maintain(entry.clone(), &mut self.capacity)?;
        // Keep the queue borrow to return the new report without a second lookup.
        let retained = self.sessions.entry(key).or_default();
        retained.push_back(WarmPoolMaintenanceReport::new(session, entry));
        Ok(retained
            .back()
            .expect("warm-pool session was just inserted"))
    }

    /// Checks out the oldest retained session for `key`.
    ///
    /// The ledger checkout runs first. The returned report carries an active
    /// reservation for the ledger entry's budget, plus `elapsed` recorded as a
    /// `warm_pool_checkout` latency sample in milliseconds.
    ///
    /// Returns `Ok(None)` when the ledger has no entry for `key`.
    ///
    /// # Errors
    ///
    /// - [`SnapshotWarmPoolCheckoutError::Capacity`] when the ledger checkout
    ///   fails.
    /// - [`SnapshotWarmPoolCheckoutError::MissingRetainedSession`] when the
    ///   ledger produced an entry but no session was queued for `key`. The
    ///   entry's active budget is released before returning.
    pub fn checkout_with_elapsed(
        &mut self,
        key: &WarmPoolKey,
        elapsed: Duration,
    ) -> Result<Option<WarmPoolCheckoutReport<S>>, SnapshotWarmPoolCheckoutError> {
        let Some(entry) = self.ledger.checkout(key, &mut self.capacity)? else {
            return Ok(None);
        };
        let Some(reports) = self.sessions.get_mut(key) else {
            self.capacity.release_active(entry.budget());
            return Err(SnapshotWarmPoolCheckoutError::MissingRetainedSession);
        };
        let Some(report) = reports.pop_front() else {
            self.sessions.remove(key);
            self.capacity.release_active(entry.budget());
            return Err(SnapshotWarmPoolCheckoutError::MissingRetainedSession);
        };
        // Drop empty queues so `contains` and the map stay tidy.
        if reports.is_empty() {
            self.sessions.remove(key);
        }
        let (session, retained_entry) = report.into_parts();
        let reservation = ActiveSessionReservation::new(entry.budget());
        let sample = BenchmarkSample::new(
            "warm_pool_checkout",
            BenchmarkMetricKind::LifecycleLatency,
            BenchmarkUnit::Milliseconds,
            elapsed.as_secs_f64() * 1000.0,
        );
        Ok(Some(WarmPoolCheckoutReport::new(
            session,
            retained_entry,
            reservation,
            vec![sample],
        )))
    }

    /// Same as [`Self::replenish_with_disk_probe_elapsed`], but uses a fresh
    /// [`HostDiskPressureProbe`].
    pub async fn replenish_with_elapsed<L>(
        &mut self,
        targets: &[WarmPoolReplenishmentTarget],
        launcher: &mut L,
        elapsed: Duration,
    ) -> RuntimeWarmPoolReplenishmentReport<L::Error>
    where
        L: SnapshotSessionLauncher<Session = S>,
    {
        let mut disk_probe = HostDiskPressureProbe::new();
        self.replenish_with_disk_probe_elapsed(targets, launcher, elapsed, &mut disk_probe)
            .await
    }

    /// Runs one replenishment pass over `targets`.
    ///
    /// A [`WarmPoolReplenishmentPlan`] is built from the targets, the current
    /// ledger and a copy of the capacity ledger. Every target the plan selects
    /// is maintained in order. Successes are reported by key, the plan's skips
    /// are copied through, and per-target failures are collected rather than
    /// aborting the pass.
    pub async fn replenish_with_disk_probe_elapsed<L, P>(
        &mut self,
        targets: &[WarmPoolReplenishmentTarget],
        launcher: &mut L,
        elapsed: Duration,
        disk_probe: &mut P,
    ) -> RuntimeWarmPoolReplenishmentReport<L::Error>
    where
        L: SnapshotSessionLauncher<Session = S>,
        P: DiskPressureProbe,
    {
        let plan = WarmPoolReplenishmentPlan::from_targets(targets, &self.ledger, self.capacity);
        let mut maintained = Vec::new();
        let skipped = plan.skipped().to_vec();
        let mut failed = Vec::new();
        for target in plan.maintain() {
            match self
                .maintain_with_disk_probe_elapsed(
                    target.key().clone(),
                    target.manifest(),
                    target.budget(),
                    launcher,
                    elapsed,
                    disk_probe,
                )
                .await
            {
                Ok(report) => maintained.push(report.entry().key().clone()),
                Err(error) => failed.push(RuntimeWarmPoolReplenishmentFailure::new(
                    target.key().clone(),
                    error,
                )),
            }
        }
        RuntimeWarmPoolReplenishmentReport::new(maintained, skipped, failed)
    }
}
/// A replenishment target whose maintenance failed, with the resulting error.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RuntimeWarmPoolReplenishmentFailure<E> {
    /// Key of the target that failed.
    key: WarmPoolKey,
    /// Why maintenance failed.
    pub(crate) error: WarmPoolMaintenanceError<E>,
}

impl<E> RuntimeWarmPoolReplenishmentFailure<E> {
    /// Records that maintaining `key` failed with `error`.
    #[must_use]
    pub const fn new(key: WarmPoolKey, error: WarmPoolMaintenanceError<E>) -> Self {
        Self { key, error }
    }

    /// Key of the target that failed.
    #[must_use]
    pub const fn key(&self) -> &WarmPoolKey {
        &self.key
    }

    /// Why maintenance failed.
    #[must_use]
    pub const fn error(&self) -> &WarmPoolMaintenanceError<E> {
        &self.error
    }
}
/// Summary of one replenishment pass: the keys that were maintained, the
/// targets the plan skipped, and the targets whose maintenance failed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RuntimeWarmPoolReplenishmentReport<E> {
    /// Keys successfully maintained, in the order they were processed.
    pub(crate) maintained: Vec<WarmPoolKey>,
    /// Targets the replenishment plan decided not to maintain.
    skipped: Vec<WarmPoolReplenishmentSkip>,
    /// Targets whose maintenance returned an error.
    failed: Vec<RuntimeWarmPoolReplenishmentFailure<E>>,
}

impl<E> RuntimeWarmPoolReplenishmentReport<E> {
    /// Assembles a report from its three lists.
    #[must_use]
    pub const fn new(
        maintained: Vec<WarmPoolKey>,
        skipped: Vec<WarmPoolReplenishmentSkip>,
        failed: Vec<RuntimeWarmPoolReplenishmentFailure<E>>,
    ) -> Self {
        Self { maintained, skipped, failed }
    }

    /// Keys successfully maintained.
    #[must_use]
    pub fn maintained(&self) -> &[WarmPoolKey] {
        self.maintained.as_slice()
    }

    /// Targets the plan skipped.
    #[must_use]
    pub fn skipped(&self) -> &[WarmPoolReplenishmentSkip] {
        self.skipped.as_slice()
    }

    /// Targets whose maintenance failed.
    #[must_use]
    pub fn failed(&self) -> &[RuntimeWarmPoolReplenishmentFailure<E>] {
        self.failed.as_slice()
    }
}
/// Fixed replenishment targets plus the pause between replenishment cycles.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RuntimeWarmPoolSupervisor {
    /// Targets handed to every replenishment cycle.
    pub(crate) targets: Vec<WarmPoolReplenishmentTarget>,
    /// Pause between consecutive cycles.
    pub(crate) interval: Duration,
}

impl RuntimeWarmPoolSupervisor {
    /// Creates a supervisor for `targets` that pauses `interval` between cycles.
    #[must_use]
    pub fn new(targets: Vec<WarmPoolReplenishmentTarget>, interval: Duration) -> Self {
        Self { targets, interval }
    }

    /// Targets handed to every replenishment cycle.
    #[must_use]
    pub fn targets(&self) -> &[WarmPoolReplenishmentTarget] {
        self.targets.as_slice()
    }

    /// Pause between consecutive cycles.
    #[must_use]
    pub const fn interval(&self) -> Duration {
        self.interval
    }

    /// Same as [`Self::run_cycles_with_disk_probe`], but uses a fresh
    /// [`HostDiskPressureProbe`].
    pub async fn run_cycles<S, L>(
        &self,
        pool: &mut RuntimeSnapshotWarmPool<S>,
        launcher: &mut L,
        cycles: usize,
    ) -> Vec<RuntimeWarmPoolReplenishmentReport<L::Error>>
    where
        L: SnapshotSessionLauncher<Session = S>,
    {
        let mut host_probe = HostDiskPressureProbe::new();
        self.run_cycles_with_disk_probe(pool, launcher, cycles, &mut host_probe)
            .await
    }

    /// Runs `cycles` replenishment passes against `pool`, returning one report
    /// per pass.
    ///
    /// The first pass starts immediately. Later passes are preceded by a
    /// `tokio::time::sleep(interval)` unless the interval is zero. Each pass
    /// records `Duration::ZERO` as its elapsed time.
    pub async fn run_cycles_with_disk_probe<S, L, P>(
        &self,
        pool: &mut RuntimeSnapshotWarmPool<S>,
        launcher: &mut L,
        cycles: usize,
        disk_probe: &mut P,
    ) -> Vec<RuntimeWarmPoolReplenishmentReport<L::Error>>
    where
        L: SnapshotSessionLauncher<Session = S>,
        P: DiskPressureProbe,
    {
        let mut reports = Vec::with_capacity(cycles);
        for cycle in 0..cycles {
            let should_pause = cycle != 0 && !self.interval.is_zero();
            if should_pause {
                tokio::time::sleep(self.interval).await;
            }
            let report = pool
                .replenish_with_disk_probe_elapsed(
                    &self.targets,
                    launcher,
                    Duration::ZERO,
                    disk_probe,
                )
                .await;
            reports.push(report);
        }
        reports
    }
}
#[derive(Clone, Debug)]
pub struct RuntimeWarmPoolService<S, L> {
pool: RuntimeSnapshotWarmPool<S>,
supervisor: RuntimeWarmPoolSupervisor,
pub(crate) launcher: L,
}
impl<S, L> RuntimeWarmPoolService<S, L> {
#[must_use]
pub const fn new(
pool: RuntimeSnapshotWarmPool<S>,
supervisor: RuntimeWarmPoolSupervisor,
launcher: L,
) -> Self {
Self {
pool,
supervisor,
launcher,
}
}
#[must_use]
pub const fn capacity(&self) -> CapacityLedger {
self.pool.capacity()
}
#[must_use]
pub const fn supervisor(&self) -> &RuntimeWarmPoolSupervisor {
&self.supervisor
}
#[must_use]
pub const fn launcher(&self) -> &L {
&self.launcher
}
#[must_use]
pub fn contains(&self, key: &WarmPoolKey) -> bool {
self.pool.contains(key)
}
pub async fn tick(&mut self, elapsed: Duration) -> RuntimeWarmPoolReplenishmentReport<L::Error>
where
L: SnapshotSessionLauncher<Session = S>,
{
let mut disk_probe = HostDiskPressureProbe::new();
self.tick_with_disk_probe(elapsed, &mut disk_probe).await
}
pub async fn tick_with_disk_probe<P>(
&mut self,
elapsed: Duration,
disk_probe: &mut P,
) -> RuntimeWarmPoolReplenishmentReport<L::Error>
where
L: SnapshotSessionLauncher<Session = S>,
P: DiskPressureProbe,
{
self.pool
.replenish_with_disk_probe_elapsed(
self.supervisor.targets(),
&mut self.launcher,
elapsed,
disk_probe,
)
.await
}
pub fn checkout_with_elapsed(
&mut self,
key: &WarmPoolKey,
elapsed: Duration,
) -> Result<Option<WarmPoolCheckoutReport<S>>, SnapshotWarmPoolCheckoutError> {
self.pool.checkout_with_elapsed(key, elapsed)
}
#[must_use]
pub fn spawn(self) -> RuntimeWarmPoolServiceHandle<S, L>
where
S: Send + 'static,
L: SnapshotSessionLauncher<Session = S> + Send + 'static,
L::Error: Send,
{
self.spawn_with_disk_probe(HostDiskPressureProbe::new())
}
#[must_use]
pub fn spawn_with_disk_probe<P>(self, mut disk_probe: P) -> RuntimeWarmPoolServiceHandle<S, L>
where
S: Send + 'static,
L: SnapshotSessionLauncher<Session = S> + Send + 'static,
L::Error: Send,
P: DiskPressureProbe + Send + 'static,
{
let interval = self.supervisor.interval();
let state = Arc::new(Mutex::new(self));
let task_state = Arc::clone(&state);
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move {
loop {
{
let mut service = task_state.lock().await;
service
.tick_with_disk_probe(Duration::ZERO, &mut disk_probe)
.await;
}
tokio::select! {
_ = & mut shutdown_rx => break, () = tokio::time::sleep(interval) =>
{}
}
}
});
RuntimeWarmPoolServiceHandle {
state,
shutdown: Some(shutdown_tx),
task,
}
}
}
/// Handle to a [`RuntimeWarmPoolService`] running on a background tokio task.
///
/// The service state is shared with the task behind a `tokio::sync::Mutex`, and
/// every accessor here acquires that lock.
#[derive(Debug)]
pub struct RuntimeWarmPoolServiceHandle<S, L> {
    /// Service state shared with the background task.
    pub(crate) state: Arc<Mutex<RuntimeWarmPoolService<S, L>>>,
    /// Stop signal for the background loop.
    pub(crate) shutdown: Option<oneshot::Sender<()>>,
    /// The background replenishment task.
    pub(crate) task: JoinHandle<()>,
}

impl<S, L> RuntimeWarmPoolServiceHandle<S, L> {
    /// Returns a copy of the pool's capacity ledger.
    pub async fn capacity(&self) -> CapacityLedger {
        let service = self.state.lock().await;
        service.capacity()
    }

    /// Returns `true` when the pool retains at least one session for `key`.
    pub async fn contains(&self, key: &WarmPoolKey) -> bool {
        let service = self.state.lock().await;
        service.contains(key)
    }

    /// Checks out the oldest retained session for `key`.
    ///
    /// # Errors
    ///
    /// Propagates [`SnapshotWarmPoolCheckoutError`] from the underlying pool.
    pub async fn checkout_with_elapsed(
        &self,
        key: &WarmPoolKey,
        elapsed: Duration,
    ) -> Result<Option<WarmPoolCheckoutReport<S>>, SnapshotWarmPoolCheckoutError> {
        let mut service = self.state.lock().await;
        service.checkout_with_elapsed(key, elapsed)
    }

    /// Stops the background loop, waits for the task, and returns the service.
    ///
    /// # Errors
    ///
    /// Returns the task's [`tokio::task::JoinError`] if it did not run to
    /// completion (for example, it panicked).
    ///
    /// # Panics
    ///
    /// Panics if the shared state is still referenced elsewhere after the task
    /// finished. This is not expected: the task's only clone was moved into its
    /// future.
    pub async fn shutdown(self) -> Result<RuntimeWarmPoolService<S, L>, tokio::task::JoinError> {
        let Self { state, shutdown, task } = self;
        if let Some(stop) = shutdown {
            // A send error only means the receiver (owned by the task) is
            // already gone; joining below covers that case.
            let _ = stop.send(());
        }
        task.await?;
        match Arc::try_unwrap(state) {
            Ok(mutex) => Ok(mutex.into_inner()),
            Err(_) => panic!("warm-pool service handle owns the only remaining state reference"),
        }
    }
}