#[macro_use]
extern crate serde_derive;
use chrono::offset::Utc;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, Time};
use roperator::config::{ClientConfig, Credentials, KubeConfig};
use roperator::handler::failable::{DefaultFailableHandler, FailableHandler, HandlerResult};
use roperator::prelude::{k8s_types, ChildConfig, K8sType, OperatorConfig, SyncRequest};
use roperator::serde_json::{json, Value};
use serde::de::{self, Deserialize, Deserializer};
use std::time::Duration;
const OPERATOR_NAME: &str = "temp-namespace-example";
static PARENT_TYPE: &K8sType = &K8sType {
api_version: "example.roperator.com/v1alpha1",
kind: "TempNamespace",
plural_kind: "tempnamespaces",
};
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub struct TempNamespace {
pub metadata: ObjectMeta,
pub spec: TempNamespaceSpec,
pub status: Option<TempNamespaceStatus>,
}
impl TempNamespace {
fn get_time_remaining(&self) -> Option<Duration> {
let created_at = self
.status
.as_ref()
.and_then(|status| status.created_at.as_ref().map(|time| time.0))
.unwrap_or(Utc::now());
let elapsed = Utc::now()
.signed_duration_since(created_at)
.to_std()
.unwrap_or_else(|_| Duration::from_secs(0));
if self.spec.delete_after > elapsed {
Some(self.spec.delete_after - elapsed)
} else {
None
}
}
}
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub struct TempNamespaceSpec {
#[serde(rename = "deleteAfter", deserialize_with = "deserialize_duration")]
delete_after: Duration,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct TempNamespaceStatus {
#[serde(skip_serializing_if = "Option::is_none")]
created_at: Option<Time>,
#[serde(skip_serializing_if = "Option::is_none")]
deleted_at: Option<Time>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
struct TempNsHandler;
impl FailableHandler for TempNsHandler {
type Validated = TempNamespace;
type Error = serde_json::Error;
type Status = TempNamespaceStatus;
fn validate(&self, request: &SyncRequest) -> Result<Self::Validated, Self::Error> {
request.deserialize_parent::<TempNamespace>()
}
fn sync_children(
&self,
validated: &mut TempNamespace,
request: &SyncRequest,
) -> Result<Vec<Value>, Self::Error> {
let time_remaining = validated.get_time_remaining();
log::info!(
"Namespace: '{}' has {:?} time remaining",
request.parent.name(),
time_remaining
);
if time_remaining.is_some() {
Ok(vec![namespace(&validated)])
} else {
Ok(Vec::new())
}
}
fn determine_status(
&self,
request: &SyncRequest,
result: HandlerResult<Self::Validated, Self::Error>,
) -> Self::Status {
let parent = request.deserialize_parent::<TempNamespace>().ok();
let time_remaining = parent.as_ref().and_then(|p| p.get_time_remaining());
let prev_status = parent.and_then(|p| p.status).unwrap_or_default();
let existing_namespace = request
.children()
.of_type(k8s_types::core::v1::Namespace)
.first();
let created_at = prev_status
.created_at
.clone()
.or_else(|| existing_namespace.as_ref().map(|_| Time(Utc::now())));
let deleted_at = prev_status.deleted_at.clone().or_else(|| {
if time_remaining.is_none() {
Some(Time(Utc::now()))
} else {
None
}
});
let error = result
.into_error()
.map(|e| format!("invalid TempNamepsace resource: {}", e));
TempNamespaceStatus {
created_at,
deleted_at,
error,
}
}
}
fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "roperator=info,temp_namespace=debug,warn");
}
env_logger::init();
let operator_config = OperatorConfig::new(OPERATOR_NAME, PARENT_TYPE)
.with_child(k8s_types::core::v1::Namespace, ChildConfig::on_delete());
let client_config_result = if let Ok(token) = std::env::var("ROPERATOR_AUTH_TOKEN") {
let credentials = Credentials::base64_bearer_token(token);
let (kubeconfig, kubeconfig_path) = KubeConfig::load().expect("failed to load kubeconfig");
let kubeconfig_parent_path = kubeconfig_path.parent().unwrap();
kubeconfig.create_client_config_with_credentials(
OPERATOR_NAME.to_string(),
kubeconfig_parent_path,
credentials,
)
} else {
ClientConfig::from_kubeconfig(OPERATOR_NAME.to_string())
};
let client_config =
client_config_result.expect("failed to resolve cluster data from kubeconfig");
let handler =
DefaultFailableHandler::wrap(TempNsHandler).with_regular_resync(Duration::from_secs(30));
let err =
roperator::runner::run_operator_with_client_config(operator_config, client_config, handler);
log::error!("Error running operator: {}", err);
std::process::exit(1);
}
fn namespace(parent: &TempNamespace) -> Value {
json!({
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {
"name": parent.metadata.name.as_ref().expect("parent name is missing"),
}
})
}
fn deserialize_duration<'de, D>(deserailizer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let as_str = String::deserialize(deserailizer)?;
if as_str.len() < 2 {
return Err(de::Error::custom(format!(
"invalid duration: '{:?}'",
as_str
)));
}
let (digits, unit) = as_str.split_at(as_str.len() - 1);
let quantity = digits
.parse::<u64>()
.map_err(|e| de::Error::custom(format!("invalid number: '{}'", e)))?;
let multiplier = match unit.to_ascii_uppercase().as_str() {
"S" => 1,
"M" => 60,
"H" => 60 * 60,
"D" => 60 * 60 * 24,
other => return Err(de::Error::custom(format!("invalid unit: '{:?}'", other))),
};
Ok(Duration::from_secs(quantity * multiplier))
}