#![allow(missing_docs)]
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use crate::DEFAULT_RUNTIME_MINIMUM_FREE_DISK;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use crate::disk::{DiskPressureProbe, HostDiskPressureProbe, RuntimeDiskPressureGuard};
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use crate::restore::{
ActiveSessionReservation, RuntimeCubeSandboxCreateConfig, RuntimeSnapshotRestore,
SnapshotRestoreError, SnapshotSessionLauncher, disk_pressure_to_capacity_error,
snapshot_output_disk_root, write_snapshot_artifact_sidecars,
};
#[allow(unused_imports)]
use async_trait::async_trait;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use base64::Engine as _;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_admission::CapacityError;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_admission::{CapacityLedger, ResourceBudget};
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_artifacts::ContinuationSnapshotPlan;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_artifacts::ContinuationSnapshotReason;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_artifacts::SnapshotArtifactManifest;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_e2b_contract::RuntimeSandbox;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_e2b_contract::StartSandboxRequest;
#[allow(unused_imports)]
use firkin_e2b_contract::{DEFAULT_CODE_INTERPRETER_PORT, DEFAULT_MCP_PORT};
#[allow(unused_imports)]
use firkin_envd::DEFAULT_ENVD_PORT;
#[allow(unused_imports)]
use firkin_template::SnapshotSinkError;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_template::TemplateSnapshotSink;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_trace::BenchmarkSample;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use firkin_trace::{BenchmarkMetricKind, BenchmarkUnit};
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use std::io;
#[allow(unused_imports)]
use std::path::{Path, PathBuf};
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use std::time::Duration;
#[cfg(feature = "snapshot")]
#[allow(unused_imports)]
use thiserror::Error as ThisError;
/// Serializable copy of a container's snapshot state.
///
/// `CoreContainerSnapshotSink` writes this as pretty-printed JSON next to a saved
/// snapshot, so the staging directory, machine identifier and `network_macs` list can be
/// turned back into a `firkin_core::ContainerSnapshotState` later.
#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct PersistedContainerSnapshotState {
    /// Directory the container used to stage snapshot data.
    staging_dir: PathBuf,
    /// Raw machine-identifier bytes recorded for the snapshotted container.
    machine_identifier: Vec<u8>,
    /// MAC address strings, in the order reported by `firkin_core`.
    network_macs: Vec<String>,
}
impl PersistedContainerSnapshotState {
    /// Copies every field out of a live `firkin_core::ContainerSnapshotState`.
    #[cfg(feature = "snapshot")]
    #[must_use]
    pub fn from_snapshot_state(state: &firkin_core::ContainerSnapshotState) -> Self {
        let staging_dir = state.staging_dir().to_path_buf();
        let machine_identifier = state.machine_identifier().to_vec();
        let network_macs = state.network_macs().to_vec();
        Self {
            staging_dir,
            machine_identifier,
            network_macs,
        }
    }

    /// Rebuilds a `firkin_core::ContainerSnapshotState` from the persisted fields.
    #[cfg(feature = "snapshot")]
    #[must_use]
    pub fn to_snapshot_state(&self) -> firkin_core::ContainerSnapshotState {
        let Self {
            staging_dir,
            machine_identifier,
            network_macs,
        } = self;
        firkin_core::ContainerSnapshotState::new(
            staging_dir.clone(),
            machine_identifier.clone(),
            network_macs.clone(),
        )
    }

    /// Staging directory recorded at snapshot time.
    #[must_use]
    pub fn staging_dir(&self) -> &Path {
        self.staging_dir.as_path()
    }

    /// Machine-identifier bytes recorded at snapshot time.
    #[must_use]
    pub fn machine_identifier(&self) -> &[u8] {
        self.machine_identifier.as_slice()
    }

    /// MAC address strings recorded at snapshot time.
    #[must_use]
    pub fn network_macs(&self) -> &[String] {
        self.network_macs.as_slice()
    }
}
/// A source that can persist a continuation snapshot to disk.
///
/// Implementors must be `Send + Sync`; methods are async via `#[async_trait]`.
#[async_trait]
pub trait RuntimeContinuationSnapshotSource: Send + Sync {
    /// Saves a continuation snapshot to `path`.
    ///
    /// # Errors
    ///
    /// Implementation-defined; failures are reported as a `SnapshotSinkError`.
    async fn save_continuation_snapshot(&self, path: &Path) -> Result<(), SnapshotSinkError>;
    /// Hook for discarding snapshot staging data.
    ///
    /// The default implementation does nothing and returns `Ok(())`. The
    /// `firkin_core::Container` implementation in this file deletes the staging directory.
    /// NOTE(review): going by the name, this is meant for staging data that never became a
    /// snapshot. The call site is not visible here, so confirm with callers.
    async fn cleanup_unsnapshotted_staging(&self) -> Result<(), SnapshotSinkError> {
        Ok(())
    }
}
#[cfg(feature = "snapshot")]
#[async_trait]
impl<S> RuntimeContinuationSnapshotSource for firkin_core::Container<S>
where
    S: firkin_core::ContainerStdio + Send + Sync,
{
    /// Delegates to a default `CoreContainerSnapshotSink`.
    ///
    /// That sink saves the snapshot at `path` and writes its JSON state sidecar beside it.
    async fn save_continuation_snapshot(&self, path: &Path) -> Result<(), SnapshotSinkError> {
        let sink = CoreContainerSnapshotSink::new(self);
        sink.save_snapshot(path).await
    }

    /// Deletes the container's snapshot staging directory.
    ///
    /// A directory that is already gone counts as success.
    ///
    /// NOTE(review): `remove_dir_all` is blocking filesystem I/O run directly on the async task.
    async fn cleanup_unsnapshotted_staging(&self) -> Result<(), SnapshotSinkError> {
        let snapshot_state = match self.snapshot_state().await {
            Ok(snapshot_state) => snapshot_state,
            Err(source) => return Err(Box::new(source) as SnapshotSinkError),
        };
        match std::fs::remove_dir_all(snapshot_state.staging_dir()) {
            // Only a real failure is reported; `NotFound` means there is nothing to clean.
            Err(error) if error.kind() != io::ErrorKind::NotFound => {
                Err(Box::new(error) as SnapshotSinkError)
            }
            _ => Ok(()),
        }
    }
}
/// Configuration for a follow-up (snapshot-restored) Cube sandbox.
///
/// This is currently the same type as `RuntimeCubeSandboxCreateConfig`.
#[cfg(feature = "snapshot")]
pub type RuntimeCubeSandboxFollowupConfig = RuntimeCubeSandboxCreateConfig;
/// Location of the persisted continuation snapshot for `snapshot_id`.
///
/// The id bytes are encoded with URL-safe, unpadded base64, giving a file name drawn from
/// `A-Z a-z 0-9 - _`. The result is `<root>/<encoded>.vz`, where `<root>` comes from
/// `crate::default_runtime_continuation_root()`.
///
/// NOTE(review): base64 output is case-sensitive. On a case-insensitive filesystem, two
/// distinct ids could in principle map to the same file, so confirm the target filesystem.
#[cfg(feature = "snapshot")]
pub(crate) fn runtime_continuation_snapshot_path(snapshot_id: &str) -> PathBuf {
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;

    let continuation_root = crate::default_runtime_continuation_root();
    let mut file_name = URL_SAFE_NO_PAD.encode(snapshot_id.as_bytes());
    file_name.push_str(".vz");
    continuation_root.join(file_name)
}
/// Outcome of a successful continuation-snapshot capture.
#[cfg(feature = "snapshot")]
#[derive(Clone, Debug, PartialEq)]
pub struct ContinuationSnapshotReport {
    /// Manifest describing the written snapshot artifact.
    pub(crate) manifest: SnapshotArtifactManifest,
    /// Why the snapshot was taken, as recorded on the plan.
    pub(crate) reason: ContinuationSnapshotReason,
    /// Timing samples collected for the capture.
    pub(crate) benchmark_samples: Vec<BenchmarkSample>,
}
#[cfg(feature = "snapshot")]
impl ContinuationSnapshotReport {
    /// Assembles a report from its parts.
    #[must_use]
    pub fn new(
        manifest: SnapshotArtifactManifest,
        reason: ContinuationSnapshotReason,
        benchmark_samples: Vec<BenchmarkSample>,
    ) -> Self {
        Self {
            benchmark_samples,
            reason,
            manifest,
        }
    }

    /// Manifest of the captured snapshot.
    #[must_use]
    pub const fn manifest(&self) -> &SnapshotArtifactManifest {
        &self.manifest
    }

    /// Reason the snapshot was captured.
    #[must_use]
    pub const fn reason(&self) -> ContinuationSnapshotReason {
        self.reason
    }

    /// Timing samples gathered while capturing.
    #[must_use]
    pub fn benchmark_samples(&self) -> &[BenchmarkSample] {
        self.benchmark_samples.as_slice()
    }
}
/// Error returned by `RuntimeContinuationSnapshotCapture`'s execute methods.
///
/// NOTE(review): both variants print the inner error in their `Display` text and also
/// expose it through `Error::source()` (`#[from]` implies `#[source]`). Chained error
/// reporters will therefore show it twice.
#[cfg(feature = "snapshot")]
#[derive(Debug, ThisError)]
pub enum ContinuationSnapshotError {
    /// Capacity admission failed.
    ///
    /// In this file, this happens when the disk-pressure guard rejects a capture.
    #[error("continuation snapshot capacity admission failed: {0}")]
    Capacity(#[from] CapacityError),
    /// Saving the snapshot or writing its artifact sidecars failed.
    #[error("continuation snapshot sink failed: {source}")]
    Snapshot {
        /// Underlying sink error.
        #[source]
        source: SnapshotSinkError,
    },
}
/// Result of restoring a session from a continuation snapshot.
#[cfg(feature = "snapshot")]
#[derive(Clone, Debug, PartialEq)]
pub struct ContinuationSnapshotRestoreReport<S> {
    /// Session produced by the snapshot launcher.
    pub(crate) session: S,
    /// Reason recorded on the continuation plan.
    pub(crate) reason: ContinuationSnapshotReason,
    /// Active-session reservation returned by the restore.
    pub(crate) reservation: ActiveSessionReservation,
    /// Timing samples gathered during the restore.
    pub(crate) benchmark_samples: Vec<BenchmarkSample>,
}
#[cfg(feature = "snapshot")]
impl<S> ContinuationSnapshotRestoreReport<S> {
    /// Assembles a report from its parts.
    #[must_use]
    pub fn new(
        session: S,
        reason: ContinuationSnapshotReason,
        reservation: ActiveSessionReservation,
        benchmark_samples: Vec<BenchmarkSample>,
    ) -> Self {
        Self {
            benchmark_samples,
            reservation,
            reason,
            session,
        }
    }

    /// Borrow of the restored session.
    #[must_use]
    pub const fn session(&self) -> &S {
        &self.session
    }

    /// Reason the snapshot was taken.
    #[must_use]
    pub const fn reason(&self) -> ContinuationSnapshotReason {
        self.reason
    }

    /// Borrow of the session reservation.
    #[must_use]
    pub const fn reservation(&self) -> &ActiveSessionReservation {
        &self.reservation
    }

    /// Timing samples gathered during the restore.
    #[must_use]
    pub fn benchmark_samples(&self) -> &[BenchmarkSample] {
        self.benchmark_samples.as_slice()
    }

    /// Consumes the report, yielding `(session, reason, reservation, benchmark_samples)`.
    #[must_use]
    pub fn into_parts(
        self,
    ) -> (
        S,
        ContinuationSnapshotReason,
        ActiveSessionReservation,
        Vec<BenchmarkSample>,
    ) {
        let Self {
            session,
            reason,
            reservation,
            benchmark_samples,
        } = self;
        (session, reason, reservation, benchmark_samples)
    }
}
/// Result of a Cube sandbox follow-up: the restored session plus its sandbox description.
#[cfg(feature = "snapshot")]
#[derive(Clone, Debug, PartialEq)]
pub struct RuntimeCubeSandboxFollowupReport<S> {
    /// Runtime config and exposed ports of the follow-up sandbox.
    pub(crate) runtime_sandbox: RuntimeSandbox,
    /// Session produced by the snapshot launcher.
    pub(crate) session: S,
    /// Reason recorded on the continuation plan.
    pub(crate) reason: ContinuationSnapshotReason,
    /// Active-session reservation returned by the restore.
    pub(crate) reservation: ActiveSessionReservation,
    /// Timing samples gathered during the restore.
    pub(crate) benchmark_samples: Vec<BenchmarkSample>,
}
#[cfg(feature = "snapshot")]
impl<S> RuntimeCubeSandboxFollowupReport<S> {
    /// Assembles a report from its parts.
    #[must_use]
    pub fn new(
        runtime_sandbox: RuntimeSandbox,
        session: S,
        reason: ContinuationSnapshotReason,
        reservation: ActiveSessionReservation,
        benchmark_samples: Vec<BenchmarkSample>,
    ) -> Self {
        Self {
            benchmark_samples,
            reservation,
            reason,
            session,
            runtime_sandbox,
        }
    }

    /// Borrow of the sandbox description.
    #[must_use]
    pub const fn runtime_sandbox(&self) -> &RuntimeSandbox {
        &self.runtime_sandbox
    }

    /// Borrow of the restored session.
    #[must_use]
    pub const fn session(&self) -> &S {
        &self.session
    }

    /// Reason the snapshot was taken.
    #[must_use]
    pub const fn reason(&self) -> ContinuationSnapshotReason {
        self.reason
    }

    /// Borrow of the session reservation.
    #[must_use]
    pub const fn reservation(&self) -> &ActiveSessionReservation {
        &self.reservation
    }

    /// Timing samples gathered during the restore.
    #[must_use]
    pub fn benchmark_samples(&self) -> &[BenchmarkSample] {
        self.benchmark_samples.as_slice()
    }

    /// Consumes the report, keeping only the session and its reservation.
    #[must_use]
    pub fn into_session_reservation(self) -> (S, ActiveSessionReservation) {
        let Self {
            session,
            reservation,
            ..
        } = self;
        (session, reservation)
    }
}
/// Restores a Cube sandbox from a continuation snapshot and describes the resulting sandbox.
#[cfg(feature = "snapshot")]
#[derive(Debug)]
pub struct RuntimeCubeSandboxFollowup<'a> {
    /// Capacity ledger handed to the snapshot restore.
    pub(crate) ledger: &'a mut CapacityLedger,
    /// Start request for the sandbox; carried but not otherwise read by `execute_with_elapsed`.
    pub(crate) request: &'a StartSandboxRequest,
    /// Continuation plan supplying the snapshot manifest and reason.
    pub(crate) plan: &'a ContinuationSnapshotPlan,
    /// Resource budget handed to the snapshot restore.
    pub(crate) budget: ResourceBudget,
    /// Configuration converted into the follow-up `RuntimeSandbox` config.
    #[allow(missing_docs)]
    pub config: RuntimeCubeSandboxFollowupConfig,
}
#[cfg(feature = "snapshot")]
impl<'a> RuntimeCubeSandboxFollowup<'a> {
    /// Bundles everything needed for a follow-up restore.
    #[must_use]
    pub const fn new(
        ledger: &'a mut CapacityLedger,
        request: &'a StartSandboxRequest,
        plan: &'a ContinuationSnapshotPlan,
        budget: ResourceBudget,
        config: RuntimeCubeSandboxFollowupConfig,
    ) -> Self {
        Self {
            ledger,
            request,
            plan,
            budget,
            config,
        }
    }

    /// Restores the plan's snapshot and builds the follow-up sandbox report.
    ///
    /// The restore is delegated to `RuntimeContinuationSnapshotRestore`. The returned
    /// `RuntimeSandbox` uses the follow-up config and exposes the envd, code-interpreter and
    /// MCP default ports.
    ///
    /// # Errors
    ///
    /// Propagates any `SnapshotRestoreError` from the underlying restore.
    pub async fn execute_with_elapsed<L>(
        self,
        launcher: &mut L,
        elapsed: Duration,
    ) -> Result<RuntimeCubeSandboxFollowupReport<L::Session>, SnapshotRestoreError<L::Error>>
    where
        L: SnapshotSessionLauncher,
    {
        let Self {
            ledger,
            request,
            plan,
            budget,
            config,
        } = self;
        // Binding `request` keeps the field counted as read; it is not used otherwise yet.
        let _ = request;

        let restored = RuntimeContinuationSnapshotRestore::new(ledger, plan, budget)
            .execute_with_elapsed(launcher, elapsed)
            .await?;

        let exposed_ports = vec![
            DEFAULT_ENVD_PORT,
            DEFAULT_CODE_INTERPRETER_PORT,
            DEFAULT_MCP_PORT,
        ];
        let runtime_sandbox = RuntimeSandbox {
            config: config.into_runtime_config(),
            exposed_ports,
        };

        let (session, reason, reservation, benchmark_samples) = restored.into_parts();
        Ok(RuntimeCubeSandboxFollowupReport::new(
            runtime_sandbox,
            session,
            reason,
            reservation,
            benchmark_samples,
        ))
    }
}
/// Captures the continuation snapshot described by a `ContinuationSnapshotPlan`.
#[cfg(feature = "snapshot")]
#[derive(Debug)]
pub struct RuntimeContinuationSnapshotCapture<'a> {
    /// Plan supplying the output path, manifest and reason.
    pub(crate) plan: &'a ContinuationSnapshotPlan,
}
#[cfg(feature = "snapshot")]
impl<'a> RuntimeContinuationSnapshotCapture<'a> {
    /// Wraps `plan` for a later capture.
    #[must_use]
    pub const fn new(plan: &'a ContinuationSnapshotPlan) -> Self {
        Self { plan }
    }

    /// Captures the snapshot, checking disk pressure with a fresh `HostDiskPressureProbe`.
    ///
    /// # Errors
    ///
    /// Same as `execute_with_disk_probe_elapsed`.
    pub async fn execute_with_elapsed<S>(
        self,
        snapshot_sink: &S,
        elapsed: Duration,
    ) -> Result<ContinuationSnapshotReport, ContinuationSnapshotError>
    where
        S: TemplateSnapshotSink,
    {
        let mut host_probe = HostDiskPressureProbe::new();
        self.execute_with_disk_probe_elapsed(snapshot_sink, elapsed, &mut host_probe)
            .await
    }

    /// Captures the snapshot using a caller-supplied disk-pressure probe.
    ///
    /// The steps run in this order:
    /// 1. Check disk pressure on the output path's disk root against
    ///    `DEFAULT_RUNTIME_MINIMUM_FREE_DISK`.
    /// 2. Save the snapshot to the plan's output path.
    /// 3. Write the manifest sidecars.
    /// 4. Report a single `snapshot_save` sample, whose value is `elapsed` in milliseconds.
    ///
    /// # Errors
    ///
    /// - `ContinuationSnapshotError::Capacity` when the disk-pressure guard rejects the capture.
    /// - `ContinuationSnapshotError::Snapshot` when the sink or the sidecar write fails.
    pub async fn execute_with_disk_probe_elapsed<S, P>(
        self,
        snapshot_sink: &S,
        elapsed: Duration,
        disk_probe: &mut P,
    ) -> Result<ContinuationSnapshotReport, ContinuationSnapshotError>
    where
        S: TemplateSnapshotSink,
        P: DiskPressureProbe,
    {
        let plan = self.plan;
        let output_path = plan.snapshot_output_path();

        // `?` converts the `CapacityError` into `ContinuationSnapshotError::Capacity`
        // through the `#[from]` conversion on that variant.
        RuntimeDiskPressureGuard::new(
            snapshot_output_disk_root(output_path),
            DEFAULT_RUNTIME_MINIMUM_FREE_DISK,
        )
        .check(disk_probe)
        .map_err(|pressure| disk_pressure_to_capacity_error(&pressure))?;

        snapshot_sink
            .save_snapshot(output_path)
            .await
            .map_err(|source| ContinuationSnapshotError::Snapshot { source })?;

        let manifest = plan.snapshot_manifest();
        write_snapshot_artifact_sidecars(&manifest)
            .map_err(|source| ContinuationSnapshotError::Snapshot { source })?;

        let save_millis = elapsed.as_secs_f64() * 1000.0;
        let save_sample = BenchmarkSample::new(
            "snapshot_save",
            BenchmarkMetricKind::LifecycleLatency,
            BenchmarkUnit::Milliseconds,
            save_millis,
        );
        Ok(ContinuationSnapshotReport::new(
            manifest,
            plan.reason(),
            vec![save_sample],
        ))
    }
}
/// Restores a session from the snapshot described by a `ContinuationSnapshotPlan`.
#[cfg(feature = "snapshot")]
#[derive(Debug)]
pub struct RuntimeContinuationSnapshotRestore<'a> {
    /// Capacity ledger handed to `RuntimeSnapshotRestore`.
    pub(crate) ledger: &'a mut CapacityLedger,
    /// Plan providing the snapshot manifest and reason.
    pub(crate) plan: &'a ContinuationSnapshotPlan,
    /// Resource budget handed to `RuntimeSnapshotRestore`.
    pub(crate) budget: ResourceBudget,
}
#[cfg(feature = "snapshot")]
impl<'a> RuntimeContinuationSnapshotRestore<'a> {
    /// Bundles the ledger, plan and budget for a later restore.
    ///
    /// This is `const` and `#[must_use]`, consistent with the other constructors in this
    /// module (e.g. `RuntimeCubeSandboxFollowup::new`, which takes the same parameter types).
    #[must_use]
    pub const fn new(
        ledger: &'a mut CapacityLedger,
        plan: &'a ContinuationSnapshotPlan,
        budget: ResourceBudget,
    ) -> Self {
        Self {
            ledger,
            plan,
            budget,
        }
    }

    /// Restores the plan's snapshot manifest and tags the result with the plan's reason.
    ///
    /// The restore itself is performed by `RuntimeSnapshotRestore::execute_with_elapsed`.
    /// Its benchmark samples are copied into the returned report.
    ///
    /// # Errors
    ///
    /// Propagates any `SnapshotRestoreError` from `RuntimeSnapshotRestore`.
    pub async fn execute_with_elapsed<L>(
        self,
        launcher: &mut L,
        elapsed: Duration,
    ) -> Result<ContinuationSnapshotRestoreReport<L::Session>, SnapshotRestoreError<L::Error>>
    where
        L: SnapshotSessionLauncher,
    {
        let manifest = self.plan.snapshot_manifest();
        let report = RuntimeSnapshotRestore::new(self.ledger, &manifest, self.budget)
            .execute_with_elapsed(launcher, elapsed)
            .await?;
        // `into_parts` only yields (session, reservation), so copy the samples out first.
        let benchmark_samples = report.benchmark_samples().to_vec();
        let (session, reservation) = report.into_parts();
        Ok(ContinuationSnapshotRestoreReport::new(
            session,
            self.plan.reason(),
            reservation,
            benchmark_samples,
        ))
    }
}
/// `TemplateSnapshotSink` backed by a live `firkin_core::Container`.
///
/// Besides the container snapshot, the sink writes a JSON sidecar with the container's
/// snapshot state. `state_path_for_snapshot` determines where that sidecar goes.
#[cfg(feature = "snapshot")]
#[derive(Debug)]
pub struct CoreContainerSnapshotSink<'a, S = firkin_core::Streams> {
    /// Container whose snapshot is saved.
    pub(crate) container: &'a firkin_core::Container<S>,
    /// Explicit state-sidecar path; `None` derives it from the snapshot path.
    pub(crate) state_path: Option<PathBuf>,
}
#[cfg(feature = "snapshot")]
impl<'a, S> CoreContainerSnapshotSink<'a, S> {
    /// Creates a sink for `container` with the default (derived) state path.
    #[must_use]
    pub const fn new(container: &'a firkin_core::Container<S>) -> Self {
        Self {
            state_path: None,
            container,
        }
    }

    /// Overrides where the state sidecar is written.
    #[must_use]
    pub fn with_state_path(self, state_path: impl Into<PathBuf>) -> Self {
        Self {
            state_path: Some(state_path.into()),
            ..self
        }
    }

    /// The container this sink snapshots.
    #[must_use]
    pub const fn container(&self) -> &'a firkin_core::Container<S> {
        self.container
    }

    /// Path of the state sidecar for a snapshot written to `snapshot_path`.
    ///
    /// Without an override, the last extension is replaced with `state.json`
    /// (e.g. `snap.vz` becomes `snap.state.json`).
    #[must_use]
    pub fn state_path_for_snapshot(&self, snapshot_path: &Path) -> PathBuf {
        match &self.state_path {
            Some(explicit) => explicit.clone(),
            None => snapshot_path.with_extension("state.json"),
        }
    }
}
#[cfg(feature = "snapshot")]
#[async_trait]
impl<S> TemplateSnapshotSink for CoreContainerSnapshotSink<'_, S>
where
    S: firkin_core::ContainerStdio + Send + Sync,
{
    /// Saves the container snapshot to `path`, then writes its state sidecar.
    ///
    /// The state is serialized as pretty-printed JSON of `PersistedContainerSnapshotState`
    /// and written to `state_path_for_snapshot(path)`.
    ///
    /// # Errors
    ///
    /// Any failure from the container save, the state query, JSON serialization or the
    /// file write is boxed into a `SnapshotSinkError`.
    ///
    /// NOTE(review): the sidecar is written with `std::fs::write`, which is neither atomic
    /// nor async. An interrupted write can leave a truncated JSON file.
    async fn save_snapshot(&self, path: &std::path::Path) -> Result<(), SnapshotSinkError> {
        if let Err(source) = self.container.save_snapshot(path).await {
            return Err(Box::new(source) as SnapshotSinkError);
        }
        let live_state = match self.container.snapshot_state().await {
            Ok(live_state) => live_state,
            Err(source) => return Err(Box::new(source) as SnapshotSinkError),
        };
        let destination = self.state_path_for_snapshot(path);
        let json = serde_json::to_vec_pretty(
            &PersistedContainerSnapshotState::from_snapshot_state(&live_state),
        )
        .map_err(|source| Box::new(source) as SnapshotSinkError)?;
        std::fs::write(destination, json).map_err(|source| Box::new(source) as SnapshotSinkError)
    }
}