pub mod cache;
pub mod error;
pub mod events;
pub mod k8s;
use std::error::Error as StdError;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::stream::StreamExt;
use futures::TryFuture;
use k8s_openapi::NamespaceResourceScope;
use kube::api::ObjectMeta;
use kube::api::Patch;
use kube::api::PatchParams;
use kube::runtime::controller::Action;
use kube::runtime::controller::Error as KubeControllerError;
use kube::runtime::finalizer;
use kube::runtime::finalizer::Event;
use kube::runtime::reflector::ObjectRef;
use kube::runtime::watcher::Config;
use kube::runtime::Controller;
use kube::Api;
use kube::Resource;
use kube::ResourceExt;
use schemars::JsonSchema;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde::Serialize;
use crate::cache::ProvideApi;
use crate::error::Error;
use crate::error::Result;
const REQUEUE_AFTER_ERROR_SECONDS: u64 = 60;
type ReconciliationResult<R, RE, QE> = StdResult<(ObjectRef<R>, Action), KubeControllerError<RE, QE>>;
#[async_trait]
pub trait Reconcile<R, C, F, P>: Sized
where
R: Resource<Scope = NamespaceResourceScope> + Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static,
R::DynamicType: Default + Eq + Hash + Clone + Debug + Unpin,
C: Context<R, F, P> + 'static,
F: Finalize<R, P>,
P: ProvideApi<R> + 'static,
{
async fn start<G>(self, graceful_trigger: Option<G>)
where
G: Future<Output = ()> + Send + Sync + 'static,
{
let (crd_api, config, context) = self.destruct();
controller(crd_api, config, graceful_trigger)
.run(Self::reconcile, Self::error_policy, context)
.for_each(Self::handle_reconciliation_result)
.await;
}
async fn start_concurrent<G>(self, limit: Option<usize>, graceful_trigger: Option<G>)
where
G: Future<Output = ()> + Send + Sync + 'static,
{
let (crd_api, config, context) = self.destruct();
controller(crd_api, config, graceful_trigger)
.run(Self::reconcile, Self::error_policy, context)
.for_each_concurrent(limit, Self::handle_reconciliation_result)
.await;
}
#[tracing::instrument(
name = "kuberator.reconcile",
skip(resource, context),
fields(
resource_name = %resource.try_name().unwrap_or_default(),
resource_namespace = %resource.try_namespace().unwrap_or_default(),
resource_generation = %resource.meta().generation.unwrap_or(0),
)
)]
async fn reconcile(resource: Arc<R>, context: Arc<C>) -> Result<Action> {
tracing::info!("Reconciliation started");
Ok(context.handle_reconciliation(resource).await?)
}
#[tracing::instrument(
name = "kuberator.error_policy",
skip(resource, error, context),
fields(
resource_name = %resource.try_name().unwrap_or_default(),
resource_namespace = %resource.try_namespace().unwrap_or_default(),
)
)]
fn error_policy(resource: Arc<R>, error: &Error, context: Arc<C>) -> Action {
context.handle_error(resource, error, Self::requeue_after_error_seconds())
}
async fn handle_reconciliation_result<RE, QE>(reconciliation_result: ReconciliationResult<R, RE, QE>)
where
RE: Debug + Send,
QE: Debug + Send,
{
match reconciliation_result {
Ok(resource) => {
tracing::info!("Reconciliation successful. Resource: {resource:?}");
}
Err(error) => {
tracing::error!("Reconciliation error: {error:?}");
}
}
}
fn requeue_after_error_seconds() -> Option<Duration> {
Some(Duration::from_secs(REQUEUE_AFTER_ERROR_SECONDS))
}
fn destruct(self) -> (Api<R>, Config, Arc<C>);
}
fn controller<R, G>(crd_api: Api<R>, config: Config, graceful_trigger: Option<G>) -> Controller<R>
where
R: Resource<Scope = NamespaceResourceScope> + Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static,
R::DynamicType: Default + Eq + Hash + Clone + Debug + Unpin,
G: Future<Output = ()> + Send + Sync + 'static,
{
let controller = Controller::new(crd_api, config);
if let Some(trigger) = graceful_trigger {
return controller.graceful_shutdown_on(trigger);
};
controller
}
#[async_trait]
pub trait Context<R, F, P>: Send + Sync
where
R: Resource<Scope = NamespaceResourceScope> + Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static,
R::DynamicType: Default,
F: Finalize<R, P>,
P: ProvideApi<R>,
{
#[tracing::instrument(
name = "kuberator.handle_reconciliation",
skip(self, object),
fields(
resource_name = %object.try_name().unwrap_or_default(),
resource_namespace = %object.try_namespace().unwrap_or_default(),
finalizer = %self.finalizer(),
has_deletion_timestamp = %object.meta().deletion_timestamp.is_some(),
)
)]
async fn handle_reconciliation(&self, object: Arc<R>) -> Result<Action> {
let action = self
.k8s_repository()
.finalize(self.finalizer(), object, |event| async {
match event {
Event::Apply(object) => self.handle_apply(object).await,
Event::Cleanup(object) => self.handle_cleanup(object).await,
}
})
.await?;
Ok(action)
}
fn handle_error(&self, object: Arc<R>, error: &Error, requeue: Option<Duration>) -> Action {
tracing::error!(
resource_name = %object.try_name().unwrap_or_default(),
resource_namespace = %object.try_namespace().unwrap_or_default(),
error = ?error,
"Reconciliation error"
);
requeue.map_or_else(Action::await_change, Action::requeue)
}
fn k8s_repository(&self) -> Arc<F>;
fn finalizer(&self) -> &'static str;
async fn handle_apply(&self, object: Arc<R>) -> Result<Action>;
async fn handle_cleanup(&self, object: Arc<R>) -> Result<Action>;
}
#[async_trait]
pub trait Finalize<R, P>: Send + Sync
where
R: Resource<Scope = NamespaceResourceScope> + Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static,
R::DynamicType: Default,
P: ProvideApi<R>,
{
fn api_provider(&self) -> &P;
#[tracing::instrument(
name = "kuberator.finalize",
skip(self, object, reconcile),
fields(
resource_name = %object.try_name().unwrap_or_default(),
resource_namespace = %object.try_namespace().unwrap_or_default(),
finalizer = %finalizer_name,
)
)]
async fn finalize<ReconcileFut>(
&self,
finalizer_name: &str,
object: Arc<R>,
reconcile: impl FnOnce(Event<R>) -> ReconcileFut + Send,
) -> Result<Action>
where
ReconcileFut: TryFuture<Ok = Action> + Send,
ReconcileFut::Error: StdError + Send + 'static,
{
let api = self.api_provider().get(&object.try_namespace()?)?;
finalizer(&api, finalizer_name, object, reconcile)
.await
.map_err(Error::from)
}
#[tracing::instrument(
name = "kuberator.update_status",
skip(self, object, status),
fields(
resource_name = %object.try_name().unwrap_or_default(),
resource_namespace = %object.try_namespace().unwrap_or_default(),
resource_generation = %object.meta().generation.unwrap_or(0),
)
)]
async fn update_status<S>(&self, object: &R, mut status: S) -> Result<()>
where
S: Serialize + ObserveGeneration + Debug + Send + Sync,
{
let api = self.api_provider().get(&object.try_namespace()?)?;
status.with_observed_gen(object.meta());
let new_status = Status { status };
api.patch_status(object.try_name()?, &PatchParams::default(), &Patch::Merge(&new_status))
.await?;
tracing::debug!("Status updated successfully");
Ok(())
}
#[tracing::instrument(
name = "kuberator.update_status_with_error",
skip(self, object, status, error),
fields(
resource_name = %object.try_name().unwrap_or_default(),
resource_namespace = %object.try_namespace().unwrap_or_default(),
resource_generation = %object.meta().generation.unwrap_or(0),
has_error = %error.is_some(),
)
)]
async fn update_status_with_error<S, A, E>(&self, object: &R, mut status: S, error: Option<&A>) -> Result<()>
where
S: Serialize + ObserveGeneration + WithStatusError<A, E> + Debug + Send + Sync,
A: AsStatusError<E> + Send + Sync,
E: Serialize + Debug + PartialEq + Clone + JsonSchema,
E: for<'de> Deserialize<'de>,
{
if let Some(error) = error {
status.with_status_error(error);
}
self.update_status(object, status).await
}
}
#[derive(Debug, Serialize)]
struct Status<S>
where
S: Serialize + ObserveGeneration + Debug,
{
status: S,
}
pub trait ObserveGeneration {
fn add(&mut self, observed_generation: i64);
fn with_observed_gen(&mut self, meta: &ObjectMeta) {
let observed_generation = meta.generation;
if let Some(observed_generation) = observed_generation {
self.add(observed_generation)
}
}
}
pub trait AsStatusError<E>
where
E: Serialize + Debug + PartialEq + Clone + JsonSchema,
E: for<'de> Deserialize<'de>,
{
fn as_status_error(&self) -> E;
}
pub trait WithStatusError<A, E>
where
A: AsStatusError<E>,
E: Serialize + Debug + PartialEq + Clone + JsonSchema,
E: for<'de> Deserialize<'de>,
{
fn add(&mut self, error: E);
fn with_status_error(&mut self, error: &A) {
self.add(error.as_status_error());
}
}
pub trait TryResource {
fn try_name(&self) -> Result<&str>;
fn try_namespace(&self) -> Result<String>;
}
impl<R> TryResource for R
where
R: Resource,
{
fn try_name(&self) -> Result<&str> {
self.meta().name.as_deref().ok_or(Error::UnnamedObject)
}
fn try_namespace(&self) -> Result<String> {
self.namespace().ok_or(Error::UserInput({
"Expected resource to be namespaced. Can't deploy to an unknown namespace.".to_owned()
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use k8s_openapi::api::core::v1::ConfigMap;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone)]
struct TestStatus {
#[allow(dead_code)]
state: String,
observed_generation: Option<i64>,
}
impl ObserveGeneration for TestStatus {
fn add(&mut self, observed_generation: i64) {
self.observed_generation = Some(observed_generation);
}
}
#[derive(Debug, PartialEq, Clone)]
enum TestError {
NotFound,
InvalidInput(String),
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, JsonSchema)]
struct TestStatusError {
message: String,
code: i32,
}
impl AsStatusError<TestStatusError> for TestError {
fn as_status_error(&self) -> TestStatusError {
match self {
TestError::NotFound => TestStatusError {
message: "Resource not found".to_string(),
code: 404,
},
TestError::InvalidInput(msg) => TestStatusError {
message: format!("Invalid input: {}", msg),
code: 400,
},
}
}
}
#[derive(Debug, Clone)]
struct TestStatusWithError {
#[allow(dead_code)]
state: String,
error: Option<TestStatusError>,
}
impl WithStatusError<TestError, TestStatusError> for TestStatusWithError {
fn add(&mut self, error: TestStatusError) {
self.error = Some(error);
}
}
#[tokio::test]
async fn test_observe_generation_add() {
let mut status = TestStatus {
state: "Running".to_string(),
observed_generation: None,
};
status.add(42);
assert_eq!(status.observed_generation, Some(42));
}
#[tokio::test]
async fn test_observe_generation_with_observed_gen() {
let mut status = TestStatus {
state: "Running".to_string(),
observed_generation: None,
};
let meta = ObjectMeta {
generation: Some(123),
..Default::default()
};
status.with_observed_gen(&meta);
assert_eq!(status.observed_generation, Some(123));
}
#[tokio::test]
async fn test_observe_generation_with_observed_gen_no_generation() {
let mut status = TestStatus {
state: "Running".to_string(),
observed_generation: None,
};
let meta = ObjectMeta {
generation: None,
..Default::default()
};
status.with_observed_gen(&meta);
assert_eq!(status.observed_generation, None);
}
#[tokio::test]
async fn test_as_status_error_not_found() {
let error = TestError::NotFound;
let status_error = error.as_status_error();
assert_eq!(status_error.message, "Resource not found");
assert_eq!(status_error.code, 404);
}
#[tokio::test]
async fn test_as_status_error_invalid_input() {
let error = TestError::InvalidInput("Bad format".to_string());
let status_error = error.as_status_error();
assert_eq!(status_error.message, "Invalid input: Bad format");
assert_eq!(status_error.code, 400);
}
#[tokio::test]
async fn test_with_status_error_add() {
let mut status = TestStatusWithError {
state: "Running".to_string(),
error: None,
};
let error = TestStatusError {
message: "Something went wrong".to_string(),
code: 500,
};
status.add(error.clone());
assert_eq!(status.error, Some(error));
}
#[tokio::test]
async fn test_with_status_error_with_status_error() {
let mut status = TestStatusWithError {
state: "Running".to_string(),
error: None,
};
let error = TestError::InvalidInput("Missing field".to_string());
status.with_status_error(&error);
assert!(status.error.is_some());
let status_error = status.error.unwrap();
assert_eq!(status_error.message, "Invalid input: Missing field");
assert_eq!(status_error.code, 400);
}
#[tokio::test]
async fn test_try_resource_try_name_success() {
let config_map = ConfigMap {
metadata: ObjectMeta {
name: Some("my-config".to_string()),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
let result = config_map.try_name();
assert!(result.is_ok());
assert_eq!(result.unwrap(), "my-config");
}
#[tokio::test]
async fn test_try_resource_try_name_unnamed() {
let config_map = ConfigMap {
metadata: ObjectMeta {
name: None,
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
let result = config_map.try_name();
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Error::UnnamedObject));
}
#[tokio::test]
async fn test_try_resource_try_namespace_success() {
let config_map = ConfigMap {
metadata: ObjectMeta {
name: Some("my-config".to_string()),
namespace: Some("production".to_string()),
..Default::default()
},
..Default::default()
};
let result = config_map.try_namespace();
assert!(result.is_ok());
assert_eq!(result.unwrap(), "production");
}
#[tokio::test]
async fn test_try_resource_try_namespace_missing() {
let config_map = ConfigMap {
metadata: ObjectMeta {
name: Some("my-config".to_string()),
namespace: None,
..Default::default()
},
..Default::default()
};
let result = config_map.try_namespace();
assert!(result.is_err());
if let Err(Error::UserInput(msg)) = result {
assert!(msg.contains("Expected resource to be namespaced"));
} else {
panic!("Expected UserInputError");
}
}
use crate::cache::{CachingStrategy, StaticApiProvider};
use kube::client::Body;
use kube::runtime::controller::Action;
use kube::CustomResource;
use kube::{Api, Client};
use std::time::Duration;
use tower_test::mock;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)]
#[kube(
group = "test.kuberator.io",
version = "v1",
kind = "TestResource",
plural = "testresources",
namespaced
)]
struct TestResourceSpec {
value: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)]
#[allow(dead_code)]
struct TestResourceStatus {
state: String,
observed_generation: Option<i64>,
}
impl ObserveGeneration for TestResourceStatus {
fn add(&mut self, observed_generation: i64) {
self.observed_generation = Some(observed_generation);
}
}
struct MockFinalize {
api_provider: StaticApiProvider<TestResource>,
}
#[async_trait]
impl Finalize<TestResource, StaticApiProvider<TestResource>> for MockFinalize {
fn api_provider(&self) -> &StaticApiProvider<TestResource> {
&self.api_provider
}
}
struct MockContext {
finalize: Arc<MockFinalize>,
apply_called: Arc<std::sync::Mutex<bool>>,
cleanup_called: Arc<std::sync::Mutex<bool>>,
}
#[async_trait]
impl Context<TestResource, MockFinalize, StaticApiProvider<TestResource>> for MockContext {
fn k8s_repository(&self) -> Arc<MockFinalize> {
Arc::clone(&self.finalize)
}
fn finalizer(&self) -> &'static str {
"test.kuberator.io/finalizer"
}
async fn handle_apply(&self, _object: Arc<TestResource>) -> Result<Action> {
*self.apply_called.lock().unwrap() = true;
Ok(Action::await_change())
}
async fn handle_cleanup(&self, _object: Arc<TestResource>) -> Result<Action> {
*self.cleanup_called.lock().unwrap() = true;
Ok(Action::await_change())
}
}
fn test_client_for_traits() -> Client {
let (mock_service, _handle) = mock::pair::<http::Request<Body>, http::Response<hyper::body::Incoming>>();
Client::new(mock_service, "default")
}
#[tokio::test]
async fn test_finalize_api_provider() {
let client = test_client_for_traits();
let api_provider = StaticApiProvider::new(client, vec!["default"], CachingStrategy::Strict);
let finalize = MockFinalize { api_provider };
let provider = finalize.api_provider();
let result = provider.get("default");
assert!(result.is_ok());
}
#[tokio::test]
async fn test_context_k8s_repository() {
let client = test_client_for_traits();
let api_provider = StaticApiProvider::new(client, vec!["default"], CachingStrategy::Strict);
let finalize = Arc::new(MockFinalize { api_provider });
let context = MockContext {
finalize: Arc::clone(&finalize),
apply_called: Arc::new(std::sync::Mutex::new(false)),
cleanup_called: Arc::new(std::sync::Mutex::new(false)),
};
let repo = context.k8s_repository();
assert!(repo.api_provider().get("default").is_ok());
}
#[tokio::test]
async fn test_context_finalizer() {
let client = test_client_for_traits();
let api_provider = StaticApiProvider::new(client, vec!["default"], CachingStrategy::Strict);
let finalize = Arc::new(MockFinalize { api_provider });
let context = MockContext {
finalize,
apply_called: Arc::new(std::sync::Mutex::new(false)),
cleanup_called: Arc::new(std::sync::Mutex::new(false)),
};
let finalizer_name = context.finalizer();
assert_eq!(finalizer_name, "test.kuberator.io/finalizer");
}
#[tokio::test]
async fn test_context_handle_apply() {
let client = test_client_for_traits();
let api_provider = StaticApiProvider::new(client, vec!["default"], CachingStrategy::Strict);
let finalize = Arc::new(MockFinalize { api_provider });
let apply_called = Arc::new(std::sync::Mutex::new(false));
let context = MockContext {
finalize,
apply_called: Arc::clone(&apply_called),
cleanup_called: Arc::new(std::sync::Mutex::new(false)),
};
let test_resource = TestResource::new(
"test-resource",
TestResourceSpec {
value: "test-value".to_string(),
},
);
let result = context.handle_apply(Arc::new(test_resource)).await;
assert!(result.is_ok());
assert!(*apply_called.lock().unwrap());
}
#[tokio::test]
async fn test_context_handle_cleanup() {
let client = test_client_for_traits();
let api_provider = StaticApiProvider::new(client, vec!["default"], CachingStrategy::Strict);
let finalize = Arc::new(MockFinalize { api_provider });
let cleanup_called = Arc::new(std::sync::Mutex::new(false));
let context = MockContext {
finalize,
apply_called: Arc::new(std::sync::Mutex::new(false)),
cleanup_called: Arc::clone(&cleanup_called),
};
let test_resource = TestResource::new(
"test-resource",
TestResourceSpec {
value: "test-value".to_string(),
},
);
let result = context.handle_cleanup(Arc::new(test_resource)).await;
assert!(result.is_ok());
assert!(*cleanup_called.lock().unwrap());
}
#[tokio::test]
async fn test_context_handle_error() {
let client = test_client_for_traits();
let api_provider = StaticApiProvider::new(client, vec!["default"], CachingStrategy::Strict);
let finalize = Arc::new(MockFinalize { api_provider });
let context = MockContext {
finalize,
apply_called: Arc::new(std::sync::Mutex::new(false)),
cleanup_called: Arc::new(std::sync::Mutex::new(false)),
};
let test_resource = TestResource::new(
"test-resource",
TestResourceSpec {
value: "test-value".to_string(),
},
);
let error = Error::UserInput("Test error".to_string());
let action = context.handle_error(Arc::new(test_resource.clone()), &error, Some(Duration::from_secs(30)));
let expected_requeue = Action::requeue(Duration::from_secs(30));
assert_eq!(action, expected_requeue);
let action_no_requeue = context.handle_error(Arc::new(test_resource), &error, None);
let expected_await_change = Action::await_change();
assert_eq!(action_no_requeue, expected_await_change);
}
#[tokio::test]
async fn test_reconcile_requeue_after_error_seconds() {
struct MockReconcile;
impl Reconcile<TestResource, MockContext, MockFinalize, StaticApiProvider<TestResource>> for MockReconcile {
fn destruct(self) -> (Api<TestResource>, kube::runtime::watcher::Config, Arc<MockContext>) {
unimplemented!("Not needed for this test")
}
}
let duration = MockReconcile::requeue_after_error_seconds();
assert_eq!(duration, Some(Duration::from_secs(60)));
}
}