use super::Pod;
use crate::container::make_initial_container_status;
use k8s_openapi::api::core::v1::ContainerStatus as KubeContainerStatus;
use k8s_openapi::api::core::v1::Pod as KubePod;
use k8s_openapi::api::core::v1::PodCondition as KubePodCondition;
use k8s_openapi::api::core::v1::PodStatus as KubePodStatus;
use krator::{Manifest, ObjectStatus};
use kube::api::PatchParams;
use kube::Api;
use tracing::{debug, warn};
/// Sends `status` to the API server as a strategic patch on the status of
/// the pod called `name`.
///
/// This is best-effort: a failed patch is logged at `warn` level and
/// otherwise ignored, so the caller never sees an error from this function.
pub async fn patch_status(api: &Api<KubePod>, name: &str, status: Status) {
    let patch = status.json_patch();
    debug!("Applying status patch to Pod {}: '{:?}'", name, patch);

    let params = PatchParams::default();
    let body = kube::api::Patch::Strategic(patch);
    if let Err(e) = api.patch_status(name, &params, &body).await {
        warn!("Pod {} error patching status: {:?}", name, e);
    }
}
/// Number of times the "Registered" status is (re-)patched before
/// initialization is declared to have timed out.
const MAX_STATUS_INIT_RETRIES: usize = 5;

/// Registers initial container statuses for a pod and waits until they are
/// visible in the pod manifest.
///
/// Each round patches the status produced by [`make_registered_status`] and
/// then polls the latest manifest up to ten times, sleeping one second after
/// every unsuccessful poll. The function returns `Ok(())` as soon as the
/// number of reported container and init-container statuses both equal the
/// number of containers and init containers in the pod.
///
/// # Errors
///
/// After `MAX_STATUS_INIT_RETRIES` unsuccessful rounds the pod status is
/// patched to `Failed` and an error carrying the timeout message is returned.
pub async fn initialize_pod_container_statuses(
    name: String,
    pod: Manifest<Pod>,
    api: &Api<KubePod>,
) -> anyhow::Result<()> {
    for _round in 0..MAX_STATUS_INIT_RETRIES {
        // Scope the snapshot so it is released before the polling phase.
        let (expected, expected_init) = {
            let snapshot = pod.latest();
            patch_status(api, &name, make_registered_status(&snapshot)).await;
            let expected = snapshot.containers().len();
            let expected_init = snapshot.init_containers().len();
            (expected, expected_init)
        };

        for _poll in 0..10 {
            // Re-read the manifest on every poll to pick up status changes.
            let status = pod
                .latest()
                .as_kube_pod()
                .status
                .clone()
                .unwrap_or_default();
            let reported = status.container_statuses.as_ref().map_or(0, Vec::len);
            let reported_init = status
                .init_container_statuses
                .as_ref()
                .map_or(0, Vec::len);

            if reported == expected && reported_init == expected_init {
                return Ok(());
            }

            debug!("Pod {} waiting for status to populate: {:?}", name, status);
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    }

    // Every round was exhausted: record the failure on the pod, then report it.
    let status = make_status(
        Phase::Failed,
        "Timed out while initializing container statuses.",
    );
    patch_status(api, &name, status).await;
    Err(anyhow::anyhow!(
        "Timed out while initializing container statuses."
    ))
}
/// Builds the initial status for a freshly registered pod.
///
/// The result has phase `Pending`, reason `"Registered"`, and one initial
/// container status (from `make_initial_container_status`) for every init
/// container and every regular container of `pod`.
pub fn make_registered_status(pod: &Pod) -> Status {
    let init_statuses = pod
        .init_containers()
        .iter()
        .map(make_initial_container_status)
        .collect::<Vec<KubeContainerStatus>>();
    let app_statuses = pod
        .containers()
        .iter()
        .map(make_initial_container_status)
        .collect::<Vec<KubeContainerStatus>>();

    make_status_with_containers(Phase::Pending, "Registered", app_statuses, init_statuses)
}
/// Builds a status carrying only a phase and a reason.
///
/// `reason` is stored in both the `reason` and the `message` fields.
pub fn make_status(phase: Phase, reason: &str) -> Status {
    let builder = StatusBuilder::new().phase(phase);
    builder.reason(reason).message(reason).build()
}
/// Builds a status with a phase, a reason, and explicit container and
/// init-container statuses.
///
/// Unlike [`make_status`], the `message` field is left unset.
pub fn make_status_with_containers(
    phase: Phase,
    reason: &str,
    container_statuses: Vec<KubeContainerStatus>,
    init_container_statuses: Vec<KubeContainerStatus>,
) -> Status {
    let headline = StatusBuilder::new().phase(phase).reason(reason);
    let with_containers = headline
        .container_statuses(container_statuses)
        .init_container_statuses(init_container_statuses);
    with_containers.build()
}
/// Pod status wrapper around a Kubernetes `PodStatus`.
///
/// Values are produced by `StatusBuilder::build`; the `ObjectStatus`
/// implementation turns the fields that are set into a JSON patch.
#[derive(Debug, Default)]
pub struct Status(KubePodStatus);
#[derive(Default)]
pub struct StatusBuilder(KubePodStatus);
impl StatusBuilder {
pub fn new() -> Self {
StatusBuilder(Default::default())
}
pub fn phase(mut self, phase: Phase) -> StatusBuilder {
self.0.phase = Some(format!("{}", phase));
self
}
pub fn reason(mut self, reason: &str) -> StatusBuilder {
self.0.reason = Some(reason.to_string());
self
}
pub fn message(mut self, message: &str) -> StatusBuilder {
self.0.message = Some(message.to_string());
self
}
pub fn container_statuses(
mut self,
container_statuses: Vec<KubeContainerStatus>,
) -> StatusBuilder {
self.0.container_statuses = Some(container_statuses);
self
}
pub fn init_container_statuses(
mut self,
init_container_statuses: Vec<KubeContainerStatus>,
) -> StatusBuilder {
self.0.init_container_statuses = Some(init_container_statuses);
self
}
pub fn conditions(mut self, conditions: Vec<KubePodCondition>) -> StatusBuilder {
self.0.conditions = Some(conditions);
self
}
pub fn build(self) -> Status {
Status(self.0)
}
}
/// Pod phase stored, via `StatusBuilder::phase`, as a string in the `phase`
/// field of a pod status.
///
/// NOTE(review): the variant names presumably mirror the Kubernetes pod
/// phase values — confirm against the Kubernetes API documentation.
#[derive(Clone, Debug, serde::Serialize)]
pub enum Phase {
/// Phase used for the initial "Registered" status.
Pending,
/// Presumably: the pod's containers are running — TODO confirm.
Running,
/// Phase used when reporting an error, e.g. by `ObjectStatus::failed` and
/// by the status-initialization timeout.
Failed,
/// Presumably: the pod ran to completion without error — TODO confirm.
Succeeded,
/// Fallback phase; this is the value returned by `Phase::default()`.
Unknown,
}
impl std::fmt::Display for Phase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", serde_json::json!(self).as_str().unwrap())
}
}
impl Default for Phase {
fn default() -> Self {
Self::Unknown
}
}
impl ObjectStatus for Status {
    /// Builds the JSON patch document for this status.
    ///
    /// Only fields that are set (`Some`) on the wrapped `PodStatus` appear
    /// under the `status` key, so unset fields are not touched by the patch.
    /// The fields are serialized by reference; nothing is cloned apart from
    /// the three short strings that the JSON value must own.
    fn json_patch(&self) -> serde_json::Value {
        let pod_status = &self.0;
        let mut status = serde_json::Map::new();

        if let Some(phase) = &pod_status.phase {
            status.insert(
                "phase".to_string(),
                serde_json::Value::String(phase.clone()),
            );
        }
        if let Some(message) = &pod_status.message {
            status.insert(
                "message".to_string(),
                serde_json::Value::String(message.clone()),
            );
        }
        if let Some(reason) = &pod_status.reason {
            status.insert(
                "reason".to_string(),
                serde_json::Value::String(reason.clone()),
            );
        }
        if let Some(statuses) = &pod_status.container_statuses {
            status.insert("containerStatuses".to_string(), serde_json::json!(statuses));
        }
        if let Some(statuses) = &pod_status.init_container_statuses {
            status.insert(
                "initContainerStatuses".to_string(),
                serde_json::json!(statuses),
            );
        }
        if let Some(conditions) = &pod_status.conditions {
            status.insert("conditions".to_string(), serde_json::json!(conditions));
        }

        // NOTE(review): the empty `resourceVersion` presumably keeps the patch
        // from being rejected on a version conflict — confirm.
        serde_json::json!(
            {
                "metadata": {
                    "resourceVersion": "",
                },
                "status": serde_json::Value::Object(status)
            }
        )
    }

    /// Builds a `Failed` status whose `message` and `reason` are both `e`.
    fn failed(e: &str) -> Self {
        StatusBuilder::new()
            .phase(Phase::Failed)
            .message(e)
            .reason(e)
            .build()
    }
}