Kuberator - Kubernetes Operator Framework
Kuberator is a Kubernetes Operator Framework designed to simplify the process of
building Kubernetes Operators. It is still in its early stages and a work in progress.
Features
- Trait-based Architecture - Clean separation of concerns with Finalize, Context, and Reconcile traits
- API Caching - Lock-free caching strategies for optimal performance (Strict, Adhoc, Extendable)
- Generic Repository - Zero-boilerplate K8sRepository implementation
- Event Emission - Built-in Kubernetes Event creation via events.k8s.io/v1 with automatic deduplication
- Status Management - Observed Generation Pattern with error handling support
- Graceful Shutdown - Signal-based controller termination
- Concurrent Reconciliation - Configurable parallelism with start_concurrent()
Usage
It's best to follow an example to understand how to use Kuberator in its current form.
use Arc;
use async_trait;
use Action;
use Config;
use Api;
use Client;
use CustomResource;
use StaticApiProvider;
use CachingStrategy;
use K8sRepository;
use Result as KubeResult;
use Context;
use Finalize;
use Reconcile;
use JsonSchema;
use Deserialize;
use Serialize;
// First, we need to define a custom resource that the operator will manage.
// The core of the operator is the implementation of the [Reconcile] trait, which requires
// us to first implement the [Context] and [Finalize] traits for certain structs.
// Option 1: Use the generic K8sRepository (recommended for simple cases)
type MyK8sRepo = ;
// Option 2: Create a custom repository (if you need custom state or methods)
// struct MyK8sRepo {
//     api_provider: StaticApiProvider<MyCrd>,
// }
//
// impl Finalize<MyCrd, StaticApiProvider<MyCrd>> for MyK8sRepo {
//     fn api_provider(&self) -> &StaticApiProvider<MyCrd> {
//         &self.api_provider
//     }
// }
// The [Context] trait must be implemented on a struct that serves as the core of the
// operator. It contains the logic for handling the custom resource object, including
// creation, updates, and deletion.
// The final step is to implement the [Reconcile] trait on a struct that holds the context.
// The Reconciler is responsible for starting the controller runtime and managing the
// reconciliation loop.
// The [destruct] function is used to retrieve the Api, Config, and context.
// And that’s basically it!
// Now we can wire everything together in the main function and start the reconciler.
// It will continuously watch for custom resource objects and invoke the [handle_apply] and
// [handle_cleanup] functions as part of the reconciliation loop.
async
Caching Strategies
Kuberator provides flexible API caching strategies through the CachingStrategy enum to optimize
performance based on your deployment requirements:
CachingStrategy::Strict (Recommended for Production)
- Lock-free - Zero contention, fastest possible (~5ns per lookup)
- Pre-caches all specified namespaces at startup
- Returns error if accessing uncached namespace
- Use when: All namespaces are known upfront in production deployments
new
CachingStrategy::Adhoc
- Lock-free - Pre-caches specified namespaces (~5ns), creates others on-the-fly (~100ns)
- Like Strict but doesn't error on uncached namespaces - creates them as needed
- APIs are NOT cached after creation (created fresh each time)
- Use when: You know some namespaces upfront but need flexibility for others
// Pre-cache common namespaces, allow others dynamically
new
CachingStrategy::Extendable
- RwLock-based - Lazy loading with automatic caching (~10-15ns after first access)
- Caches new namespaces as they're discovered
- Use when: Namespaces are discovered at runtime but will be reused
new
For advanced use cases, consider using CachedApiProvider, which provides dynamic namespace
discovery with RwLock-based lazy loading.
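The behavioral difference between the three strategies can be illustrated with a small toy model. This is a sketch under stated assumptions, not Kuberator's implementation: ApiHandle, ToyCache, and precache are hypothetical names introduced here, and the model only mirrors the lookup rules listed above (error on miss, create without storing, create and store).

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical stand-in for a namespaced API handle.
#[derive(Clone, Debug, PartialEq)]
struct ApiHandle {
    namespace: String,
}

impl ApiHandle {
    fn new(namespace: &str) -> Self {
        ApiHandle { namespace: namespace.to_string() }
    }
}

/// Toy model of the three strategies; not Kuberator's real types.
enum ToyCache {
    Strict(HashMap<String, ApiHandle>),
    Adhoc(HashMap<String, ApiHandle>),
    Extendable(RwLock<HashMap<String, ApiHandle>>),
}

/// Builds the map of handles that are created up front.
fn precache(namespaces: &[&str]) -> HashMap<String, ApiHandle> {
    namespaces
        .iter()
        .map(|ns| (ns.to_string(), ApiHandle::new(ns)))
        .collect()
}

impl ToyCache {
    fn get(&self, ns: &str) -> Result<ApiHandle, String> {
        match self {
            // Strict: plain map lookup, unknown namespaces are an error.
            ToyCache::Strict(map) => map
                .get(ns)
                .cloned()
                .ok_or_else(|| format!("namespace '{ns}' is not cached")),
            // Adhoc: fall back to a fresh handle that is not stored.
            ToyCache::Adhoc(map) => {
                Ok(map.get(ns).cloned().unwrap_or_else(|| ApiHandle::new(ns)))
            }
            // Extendable: read lock first, write lock only on a miss.
            ToyCache::Extendable(lock) => {
                let hit = lock.read().unwrap().get(ns).cloned();
                if let Some(handle) = hit {
                    return Ok(handle);
                }
                let mut map = lock.write().unwrap();
                let handle = map
                    .entry(ns.to_string())
                    .or_insert_with(|| ApiHandle::new(ns));
                Ok(handle.clone())
            }
        }
    }

    /// Number of handles currently held in the cache.
    fn cached_len(&self) -> usize {
        match self {
            ToyCache::Strict(map) | ToyCache::Adhoc(map) => map.len(),
            ToyCache::Extendable(lock) => lock.read().unwrap().len(),
        }
    }
}
```

In this model only the Extendable variant grows after startup, which is why it is the only one that needs a lock around the map.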
Graceful Shutdown
The second parameter of the start and start_concurrent methods is an optional graceful shutdown
signal. You can pass a future that resolves when the reconciler should shut down, for example
when the process receives an OS signal such as SIGTERM.
use signal;
use select;
let shutdown_signal = async ;
reconciler.start.await;
Event Emission
Emit Kubernetes Events via the events.k8s.io/v1 API with built-in deduplication. Events appear in
kubectl describe and kubectl events output and provide visibility into operator operations.
Identical events (same object, type, reason, and action) within a 6-minute window are automatically
deduplicated: the first occurrence creates a new Event, subsequent ones PATCH the existing Event's
series.count.
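The deduplication rule described above can be sketched as a small in-memory model. This is only an illustration: ToyRecorder, EventKey, and Emission are hypothetical names, timestamps are passed in as plain seconds, and measuring the window from the first occurrence is an assumption, since the text does not say whether the window slides.

```rust
use std::collections::HashMap;

/// Deduplication window described above: 6 minutes, in seconds.
const DEDUP_WINDOW_SECS: u64 = 6 * 60;

/// Fields that make two events "identical" (object, type, reason, action).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct EventKey {
    object: String,
    event_type: String,
    reason: String,
    action: String,
}

/// What the recorder would do for an emitted event.
#[derive(Debug, PartialEq)]
enum Emission {
    /// First occurrence: a new Event is created.
    Created,
    /// Repeat inside the window: the series count is patched to this value.
    Patched { count: u32 },
}

/// Toy in-memory recorder; hypothetical, not Kuberator's event recorder.
#[derive(Default)]
struct ToyRecorder {
    // key -> (timestamp of first occurrence, number of occurrences)
    seen: HashMap<EventKey, (u64, u32)>,
}

impl ToyRecorder {
    fn emit(&mut self, key: EventKey, now_secs: u64) -> Emission {
        if let Some((first_seen, count)) = self.seen.get_mut(&key) {
            // Assumption: the window is measured from the first occurrence.
            if now_secs.saturating_sub(*first_seen) < DEDUP_WINDOW_SECS {
                *count += 1;
                return Emission::Patched { count: *count };
            }
        }
        // Unknown key or expired window: start a new series.
        self.seen.insert(key, (now_secs, 1));
        Emission::Created
    }
}
```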
Quick Example
use ;
use ;
// Define your event reasons
// Setup event recorder (note: use events::v1::Event, not core::v1::Event)
let event_api_provider = new;
let event_recorder = new;
// Emit events in reconciliation
event_recorder.emit.await;
Event emission never fails reconciliation; errors are logged as warnings. Use MockEventRecorder for testing.
RBAC Requirements
Your operator's ClusterRole needs permission to create and patch events in the events.k8s.io API group:
- apiGroups: ["events.k8s.io"]
  resources: ["events"]
  verbs: ["create", "patch"]
Viewing Events
Use kubectl events (not kubectl get events) to see events.k8s.io/v1 events with dedup counts.
See module documentation for detailed usage.
Error Handling
Kuberator provides a dedicated error type. When implementing [Reconcile::handle_apply] and [Reconcile::handle_cleanup], you must return this error in your Result, or use [kuberator::error::Result] directly.
To convert your custom error into a Kuberator error, implement From<YourError> for the
Kuberator error type and wrap your error using Error::Anyhow.
Your error.rs file could look something like this:
use Debug;
use Error as KubeError;
use Error as ThisError;
pub type Result<T> = Result;
With this approach, you can conveniently handle your custom errors using the ? operator
and return them as Kuberator errors.
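The conversion mechanics can be sketched with the standard library alone. In this sketch KubeError is a local stand-in for Kuberator's error type (its Anyhow variant here wraps a boxed std error, which is an assumption made to keep the example dependency-free), and MyError, validate, and reconcile_step are hypothetical names.

```rust
use std::error::Error as StdError;
use std::fmt;

/// Local stand-in for Kuberator's error type (hypothetical shape).
#[derive(Debug)]
enum KubeError {
    Anyhow(Box<dyn StdError + Send + Sync>),
}

type KubeResult<T> = Result<T, KubeError>;

/// Custom operator error (hypothetical).
#[derive(Debug)]
enum MyError {
    MissingSpecField(&'static str),
}

impl fmt::Display for MyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MyError::MissingSpecField(name) => write!(f, "missing spec field: {name}"),
        }
    }
}

impl StdError for MyError {}

/// The From impl is what lets `?` turn a MyError into a KubeError.
impl From<MyError> for KubeError {
    fn from(error: MyError) -> Self {
        KubeError::Anyhow(Box::new(error))
    }
}

/// Domain logic that only knows about the custom error.
fn validate(replicas: Option<u32>) -> Result<u32, MyError> {
    replicas.ok_or(MyError::MissingSpecField("replicas"))
}

/// Simplified stand-in for a reconcile handler returning the framework error.
fn reconcile_step(replicas: Option<u32>) -> KubeResult<u32> {
    // `?` calls From<MyError> for KubeError on the error path.
    let replicas = validate(replicas)?;
    Ok(replicas)
}
```

Because MyError is defined in your own crate, the same From impl is permitted against a foreign error type under Rust's orphan rule.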
Status Object Handling
Kuberator provides helper methods to facilitate the Observed Generation Pattern. To use this pattern, you need to implement the ObserveGeneration trait for your status object.
Let's say this is your status.rs file:
use ObserveGeneration;
With this implementation, you can utilize the update_status() method provided by the
[Finalize] trait.
This allows you to:
(a) Keep your resource status up to date.
(b) Compare it against the current generation of the resource (object.meta().generation)
to determine whether you have already processed this version or whether it is a new
generation that still needs to be reconciled.
This pattern is particularly useful for ensuring idempotency in your reconciliation logic.
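The comparison at the heart of the pattern can be sketched as follows. Meta, MyStatus, and MyResource are simplified, hypothetical stand-ins for the real Kubernetes and Kuberator types, and needs_reconcile and mark_observed are illustrative helpers, not part of the library.

```rust
/// Simplified stand-in for object metadata (hypothetical).
#[derive(Debug, Default)]
struct Meta {
    generation: Option<i64>,
}

/// Simplified status object that remembers the last processed generation.
#[derive(Debug, Default)]
struct MyStatus {
    observed_generation: Option<i64>,
}

/// Simplified custom resource (hypothetical).
#[derive(Debug, Default)]
struct MyResource {
    meta: Meta,
    status: Option<MyStatus>,
}

impl MyResource {
    /// True when no generation was observed yet or the spec changed since.
    fn needs_reconcile(&self) -> bool {
        let observed = self.status.as_ref().and_then(|s| s.observed_generation);
        observed.is_none() || observed != self.meta.generation
    }

    /// Records the generation that was just processed in the status.
    fn mark_observed(&mut self) {
        let generation = self.meta.generation;
        self.status
            .get_or_insert_with(MyStatus::default)
            .observed_generation = generation;
    }
}
```

A handler built this way can return early when needs_reconcile() is false, so repeated reconciliations of an unchanged spec do no extra work.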
Status Object Error Handling
If you want to handle errors in your status object, you can implement the WithStatusError trait.
This trait allows you to define how errors should be handled and reported in the status object.
use WithStatusError;
use AsStatusError;
use Deserialize;
use Serialize;
use JsonSchema;
// You have a custom error type
// Additionally, you have your status object with a specific status error
// Implement the `AsStatusError` trait for your custom error type
// And implement the `WithStatusError` trait that links your error and status error
With this implementation, you can utilize the update_status_with_error() method provided by the
[Finalize] trait. This method takes care of adding the (optional) error to your status object.
This way, you can handle errors in your status object and report them accordingly. You control how
errors are represented in the status object from the source up, by implementing AsStatusError for
your error type.
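The division of responsibility between the two traits can be sketched with local stand-ins. The traits ToStatusError and AcceptStatusError below are hypothetical and deliberately named differently from AsStatusError and WithStatusError, because the real signatures are not shown here; MyError, MyStatusError, MyStatus, and apply_error are likewise illustrative.

```rust
/// Stand-in for the "error knows its status representation" side (hypothetical).
trait ToStatusError<S> {
    fn to_status_error(&self) -> S;
}

/// Stand-in for the "status can carry an error" side (hypothetical).
trait AcceptStatusError<S> {
    fn set_error(&mut self, error: Option<S>);
}

/// Custom operator error.
#[derive(Debug)]
enum MyError {
    QuotaExceeded { requested: u32, limit: u32 },
}

/// How an error is represented inside the status object.
#[derive(Clone, Debug, PartialEq)]
struct MyStatusError {
    reason: String,
    message: String,
}

/// Status object with an optional error field.
#[derive(Debug, Default)]
struct MyStatus {
    error: Option<MyStatusError>,
}

impl ToStatusError<MyStatusError> for MyError {
    fn to_status_error(&self) -> MyStatusError {
        match self {
            MyError::QuotaExceeded { requested, limit } => MyStatusError {
                reason: "QuotaExceeded".to_string(),
                message: format!("requested {requested} replicas, limit is {limit}"),
            },
        }
    }
}

impl AcceptStatusError<MyStatusError> for MyStatus {
    fn set_error(&mut self, error: Option<MyStatusError>) {
        self.error = error;
    }
}

/// Toy counterpart of a status update that carries an optional error:
/// Some(error) is converted and stored, None clears a previous error.
fn apply_error<E, S, T>(status: &mut T, error: Option<&E>)
where
    E: ToStatusError<S>,
    T: AcceptStatusError<S>,
{
    status.set_error(error.map(|e| e.to_status_error()));
}
```

Keeping the conversion on the error type means the status object never needs to know about individual error variants.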