#[macro_use]
extern crate serde_derive;
use roperator::config::{ClientConfig, Credentials, KubeConfig};
use roperator::prelude::{
k8s_types, ChildConfig, Error, K8sType, OperatorConfig, SyncRequest, SyncResponse,
};
use roperator::serde_json::{json, Value};
use std::time::Duration;
/// Name of this operator. It is passed to `OperatorConfig::new` and to the client
/// configuration constructors in `main`.
const OPERATOR_NAME: &str = "echoserver-example";
/// Kubernetes type description of the parent custom resource: kind `EchoServer` with
/// apiVersion `example.roperator.com/v1alpha1`. Passed to `OperatorConfig::new` in `main`.
static PARENT_TYPE: &K8sType = &K8sType {
api_version: "example.roperator.com/v1alpha1",
kind: "EchoServer",
plural_kind: "echoservers",
};
/// Deserialized form of the parent `EchoServer` resource, as obtained by
/// `get_desired_children` through `SyncRequest::deserialize_parent`.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub struct EchoServer {
/// Name and namespace of the resource.
pub metadata: Metadata,
/// Settings taken from the resource's `spec`.
pub spec: EchoServerSpec,
/// Optional status of the resource.
pub status: Option<EchoServerStatus>,
}
/// The parts of a resource's `metadata` that this operator reads.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub struct Metadata {
/// Namespace of the resource; reused as the namespace of the generated Pod and Service.
pub namespace: String,
/// Name of the resource; reused as the name of the generated Pod and Service.
pub name: String,
}
/// The `spec` section of an `EchoServer` resource.
///
/// Field names are converted to camelCase for deserialization, so `service_name` is read
/// from the key `serviceName`.
#[derive(Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct EchoServerSpec {
/// Used by `get_desired_children` as the `app.kubernetes.io/name` label value on the Pod,
/// as the Service selector value, and as the container's `SERVICE_NAME` environment
/// variable.
pub service_name: String,
}
/// The `status` section of an `EchoServer` resource.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub struct EchoServerStatus {
/// Status message string.
pub message: String,
}
/// Entry point: sets up logging, builds the operator and client configuration, and runs
/// the operator.
///
/// Steps:
/// 1. If `RUST_LOG` is not set, default it to `roperator=info,warn`, then initialize
///    `env_logger`.
/// 2. Build an `OperatorConfig` for `PARENT_TYPE` with `Pod` and `Service` child types.
/// 3. Build the client configuration. If `ROPERATOR_AUTH_TOKEN` is set, its value is used
///    as a bearer token together with the loaded kubeconfig; otherwise the configuration
///    comes from the kubeconfig alone.
/// 4. Run the operator with `handle_sync` and `handle_error` as handlers. The value
///    returned by the runner is logged as an error and the process exits with status 1.
///
/// # Panics
///
/// Panics if the kubeconfig cannot be loaded, if the kubeconfig path has no parent
/// directory, or if the client configuration cannot be resolved.
fn main() {
    // Only provide a default log filter when the user has not chosen one.
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "roperator=info,warn");
    }
    env_logger::init();

    let operator_config = OperatorConfig::new(OPERATOR_NAME, PARENT_TYPE)
        .with_child(k8s_types::core::v1::Pod, ChildConfig::recreate())
        .with_child(k8s_types::core::v1::Service, ChildConfig::replace());

    let client_config_result = if let Ok(token) = std::env::var("ROPERATOR_AUTH_TOKEN") {
        // NOTE(review): the constructor name suggests the token is expected to be
        // base64-encoded; confirm against the roperator documentation.
        let credentials = Credentials::base64_bearer_token(token);
        let (kubeconfig, kubeconfig_path) = KubeConfig::load().expect("failed to load kubeconfig");
        // The directory containing the kubeconfig is handed to the client config builder.
        // State the invariant instead of a bare `unwrap()` so a failure is diagnosable.
        let kubeconfig_parent_path = kubeconfig_path
            .parent()
            .expect("kubeconfig path has no parent directory");
        kubeconfig.create_client_config_with_credentials(
            OPERATOR_NAME.to_string(),
            kubeconfig_parent_path,
            credentials,
        )
    } else {
        ClientConfig::from_kubeconfig(OPERATOR_NAME.to_string())
    };
    let client_config =
        client_config_result.expect("failed to resolve cluster data from kubeconfig");

    // The runner hands back an error value when it stops; report it and exit non-zero.
    let err = roperator::runner::run_operator_with_client_config(
        operator_config,
        client_config,
        (handle_sync, handle_error),
    );
    log::error!("Error running operator: {}", err);
    std::process::exit(1);
}
fn handle_sync(request: &SyncRequest) -> Result<SyncResponse, Error> {
log::info!("Got sync request: {:?}", request);
let status = json!({
"message": get_current_status_message(request),
"phase": "Running",
});
let children = get_desired_children(request)?;
Ok(SyncResponse {
status,
children,
resync: None,
})
}
/// Error handler registered with the operator runner in `main`.
///
/// Logs the failed request together with its cause and returns a status object holding
/// the error's display text under `"message"` and the phase `"Error"`. The second tuple
/// element is always `None`.
fn handle_error(request: &SyncRequest, err: Error) -> (Value, Option<Duration>) {
    log::error!("Failed to process request: {:?}\nCause: {:?}", request, err);

    let message = err.to_string();
    (
        json!({
            "message": message,
            "phase": "Error",
        }),
        None,
    )
}
/// Returns the status message to report for the parent resource.
///
/// Looks up the first `v1`/`Pod` child in the request and reads the string at the JSON
/// pointer `/status/message`. If there is no such child, or the value is missing or is
/// not a string, the fallback text `"Waiting for Pod to be initialized"` is returned.
fn get_current_status_message(request: &SyncRequest) -> String {
    let reported = request
        .children()
        .of_type(("v1", "Pod"))
        .first()
        .and_then(|pod| pod.pointer("/status/message"))
        .and_then(Value::as_str);

    match reported {
        Some(text) => text.to_owned(),
        None => "Waiting for Pod to be initialized".to_owned(),
    }
}
/// Builds the desired child resources for the parent `EchoServer` in `request`.
///
/// The parent is deserialized into an `EchoServer`, and two JSON resources are produced,
/// both using the parent's name and namespace:
///
/// * a `v1` `Pod` labelled `app.kubernetes.io/name: <service name>`, running a single
///   `echo-server` container with the `SERVICE_NAME` environment variable set to the
///   service name;
/// * a `v1` `Service` of type `ClusterIP` that selects on the same label and maps port
///   80 to target port 3000.
///
/// The returned vector holds the Pod first and the Service second.
///
/// # Errors
///
/// Returns the error from `deserialize_parent` if the parent cannot be deserialized.
fn get_desired_children(request: &SyncRequest) -> Result<Vec<Value>, Error> {
    let parent: EchoServer = request.deserialize_parent()?;
    let name = parent.metadata.name.as_str();
    let namespace = parent.metadata.namespace.as_str();
    let service_name = parent.spec.service_name.as_str();

    // The Pod's labels and the Service's selector are the same object, so build it once.
    let name_label = json!({
        "app.kubernetes.io/name": service_name,
    });

    let container = json!({
        "name": "echo-server",
        "image": "kennship/http-echo:latest",
        "env": [
            {
                "name": "SERVICE_NAME",
                "value": service_name,
            }
        ],
        "resources": {
            "requests": {
                "cpu": "100m",
                "memory": "50Mi"
            }
        }
    });

    let pod = json!({
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "namespace": namespace,
            "name": name,
            "labels": name_label,
        },
        "spec": {
            "containers": [container]
        }
    });

    let service = json!({
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {
            "namespace": namespace,
            "name": name,
        },
        "spec": {
            "type": "ClusterIP",
            "selector": name_label,
            "ports": [
                {
                    "port": 80,
                    "targetPort": 3000,
                }
            ]
        }
    });

    Ok(vec![pod, service])
}