use crate::container::{Container, ContainerKey};
use crate::pod::Pod;
use chrono::{DateTime, Utc};
use k8s_openapi::api::core::v1::{
ContainerState, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting,
ContainerStatus as KubeContainerStatus, Pod as KubePod,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use tracing::{debug, warn};
/// The state of a container as tracked by this module.
///
/// Each variant corresponds to one of the Kubernetes container states
/// (waiting, running, terminated) that `Status::to_kubernetes` fills in.
#[derive(Clone, Debug)]
pub enum Status {
/// The container is waiting.
Waiting {
/// Time associated with this state. `Status::waiting` sets it to the
/// current UTC time; `Status::to_kubernetes` does not report it.
timestamp: DateTime<Utc>,
/// Message reported as the waiting state's `message`.
message: String,
},
/// The container is running.
Running {
/// Time reported as the running state's `started_at`. `Status::running`
/// sets it to the current UTC time.
timestamp: DateTime<Utc>,
},
/// The container has terminated.
Terminated {
/// Time reported as the terminated state's `finished_at`.
/// `Status::terminated` sets it to the current UTC time.
timestamp: DateTime<Utc>,
/// Message reported as the terminated state's `message`.
message: String,
/// Whether the container failed. Reported as exit code `1` when `true`
/// and `0` when `false`.
failed: bool,
},
}
impl Status {
pub fn waiting(message: &str) -> Self {
Status::Waiting {
timestamp: Utc::now(),
message: message.to_string(),
}
}
pub fn running() -> Self {
Status::Running {
timestamp: Utc::now(),
}
}
pub fn terminated(message: &str, failed: bool) -> Self {
Status::Terminated {
timestamp: Utc::now(),
message: message.to_string(),
failed,
}
}
pub fn to_kubernetes(&self, container_name: &str) -> KubeContainerStatus {
let mut state = ContainerState::default();
match self {
Self::Waiting { message, .. } => {
state.waiting.replace(ContainerStateWaiting {
message: Some(message.clone()),
..Default::default()
});
}
Self::Running { timestamp } => {
state.running.replace(ContainerStateRunning {
started_at: Some(Time(*timestamp)),
});
}
Self::Terminated {
timestamp,
message,
failed,
} => {
state.terminated.replace(ContainerStateTerminated {
finished_at: Some(Time(*timestamp)),
message: Some(message.clone()),
exit_code: *failed as i32,
..Default::default()
});
}
};
let ready = state.running.is_some();
KubeContainerStatus {
state: Some(state),
name: container_name.to_string(),
ready,
started: Some(true),
..Default::default()
}
}
}
/// Patches the Kubernetes status of one container of `pod`.
///
/// The container is looked up in `pod` by `key`. If the pod already has a
/// status entry for it, the entry's `state`, `ready` and `started` fields are
/// replaced in place; otherwise a complete status entry is appended. Init
/// containers are patched under `/status/initContainerStatuses`, all others
/// under `/status/containerStatuses`.
///
/// If `pod` has no container matching `key`, a warning is logged and
/// `Ok(())` is returned without contacting the API server.
///
/// # Errors
///
/// Returns an error if the status patch request made through `client` fails.
pub async fn patch_container_status(
    client: &kube::Api<KubePod>,
    pod: &Pod,
    key: &ContainerKey,
    status: &Status,
) -> anyhow::Result<()> {
    let container = match pod.find_container(key) {
        Some(container) => container,
        None => {
            warn!(
                "Container status update for unknown container {}.",
                key.name()
            );
            return Ok(());
        }
    };

    let kube_status = status.to_kubernetes(container.name());

    // Init and regular containers live in separate status arrays.
    let statuses_path = if key.is_init() {
        "/status/initContainerStatuses"
    } else {
        "/status/containerStatuses"
    };

    let patches = match pod.container_status_index(key) {
        // An entry already exists: update only the fields derived from `status`.
        Some(idx) => {
            let path_prefix = format!("{}/{}", statuses_path, idx);
            vec![
                json_patch::PatchOperation::Replace(json_patch::ReplaceOperation {
                    path: format!("{}/state", path_prefix),
                    // `to_kubernetes` always sets `state` to `Some`, and a `Some`
                    // serializes to the same JSON as its inner value, so no
                    // `unwrap` is needed here.
                    value: serde_json::json!(kube_status.state),
                }),
                json_patch::PatchOperation::Replace(json_patch::ReplaceOperation {
                    path: format!("{}/ready", path_prefix),
                    value: serde_json::json!(kube_status.ready),
                }),
                json_patch::PatchOperation::Replace(json_patch::ReplaceOperation {
                    path: format!("{}/started", path_prefix),
                    value: serde_json::json!(true),
                }),
            ]
        }
        // No entry yet: the JSON Patch `-` index appends to the end of the array.
        None => vec![json_patch::PatchOperation::Add(json_patch::AddOperation {
            path: format!("{}/-", statuses_path),
            value: serde_json::json!(kube_status),
        })],
    };

    let patch = json_patch::Patch(patches);
    let params = kube::api::PatchParams::default();
    debug!(
        "Patching container status {} {}: '{:?}'",
        pod.name(),
        container.name(),
        patch
    );
    client
        .patch_status(pod.name(), &params, &kube::api::Patch::<()>::Json(patch))
        .await?;
    Ok(())
}
/// Builds the first status reported for `container`.
///
/// The result carries the container's name, a waiting state whose `message`
/// and `reason` are both `"Registered"`, `ready` set to `false` and `started`
/// set to `Some(false)`. All remaining fields keep their defaults.
pub fn make_initial_container_status(container: &Container) -> KubeContainerStatus {
    let registered = || Some(String::from("Registered"));
    let waiting = ContainerStateWaiting {
        message: registered(),
        reason: registered(),
    };
    let initial_state = ContainerState {
        waiting: Some(waiting),
        ..Default::default()
    };

    KubeContainerStatus {
        name: container.name().to_string(),
        state: Some(initial_state),
        ready: false,
        started: Some(false),
        ..Default::default()
    }
}