KOPRS - Kubernetes Operators Rust
A reusable, ergonomic library that eliminates Kubernetes operator boilerplate by providing generic implementations of the most common patterns on top of kube-rs.
Architecture Overview
koprs is an opinionated, high-level orchestration framework built directly on top of kube and kube-runtime. It encapsulates server-side apply (SSA) patterns, controller lifecycle management, garbage collection, watcher logic, and a strongly typed error model, all with built-in tracing instrumentation, so you can focus on your custom business logic.
What koprs adds over plain kube-rs
| Area | koprs | plain kube / kube-runtime |
|---|---|---|
| Controller bootstrap | ControllerBuilder: health probes, leader election, graceful shutdown, timeouts, concurrency, secondary watches, all composable | Raw Controller::new().run(...) stream, no operational skeleton |
| Apply / ensure | apply_resource, ensure_resource (SSA), EnsureOutcome<T> (Created / Updated / Unchanged) | api.patch(); caller builds every PatchParams and branches on 404 manually |
| Status patching | patch_status_namespaced / cluster variants; KoprsCondition derives JsonSchema for direct CRD embedding | api.patch_status() exists; no ready-made condition type with JsonSchema |
| Finalizers | add_finalizer / remove_finalizers: idempotent merge-patch, no-op if already present/absent | No helpers; callers patch the finalizer list themselves |
| Garbage collection | gc_resources: list by label selector, delete orphans, clear finalizers on stuck-terminating resources | Not provided |
| Event recording | record_event with EventType::Normal / Warning | Not provided |
| Owner references | owner_ref, controller_ref, set_owner_refs; make_object_ref_mapper, owner_label_mapper for cross-resource reconcile triggers | OwnerReference struct exists; no builder or mapper helpers |
| Scope markers | Cluster / Namespaced compile-time markers resolve to the right Api constructor | Callers choose Api::all vs Api::namespaced at every call site |
| Metadata builder | Fluent ObjectMetaBuilder | ObjectMeta { name: Some(...), labels: Some(BTreeMap::from([...])), ..Default::default() } |
| Watcher abstraction | watch (signal), watch_objects (resource data), watch_events (applied + deleted); mpsc channels, backoff, tracing included | Raw watcher() stream; callers wire mpsc, backoff, and error handling themselves |
| Generic bounds | KubeResource blanket trait collapses Clone + Debug + Resource<DynamicType=()> + DeserializeOwned + Serialize + Send + Sync + 'static to one name | Full bound wall on every generic function |
| Error type | KubeGenericError unifies kube::Error, serde_json::Error, io::Error, and internal errors | Each operator defines its own error type |
Features
Controller framework
- Reconciler trait: implement reconcile for your CRD; error_policy defaults to requeue after 30 s
- ControllerBuilder: one fluent builder that wires the reconcile loop; all methods are optional and composable:
| Method | What it provides |
|---|---|
| .health_port(port) | GET /healthz (liveness) + GET /readyz (readiness) HTTP server |
| .graceful_shutdown() | Clean stop on SIGTERM or Ctrl+C |
| .leader_election(ns, name) | Kubernetes Lease-based HA; only one replica reconciles at a time |
| .leader_election_timings(dur, renew, retry) | Override lease duration, renew period, and retry period (call after .leader_election()) |
| .reconcile_timeout(dur) | Cancel and requeue reconciles that exceed the duration |
| .concurrency(n) | Cap concurrent reconciles across all objects (default: unbounded; a single object is never reconciled concurrently regardless) |
| .watch(api, config, mapper) | Trigger re-queues from a secondary resource via a mapper function; calls compose |
| .owns(api, config) | Trigger re-queues from a child resource via Kubernetes owner references |
| .with_watches(fn) | Raw kube_runtime::Controller access for advanced watch configuration |
| .label_selector(selector) | Filter the primary resource watch by label |
| .watcher_config(config) | Replace the default watcher configuration for the primary watch |
use std::time::Duration;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::ConfigMap;
use ;
use ;
use koprs::owners::owner_label_mapper;
let client = kube::Client::try_default().await?;
let ctx = new;
new
// re-queue the CR whenever a managed Deployment changes (owner reference)
.owns
// re-queue the CR whenever a managed ConfigMap carrying the owner label changes
.watch
.health_port
.graceful_shutdown
.leader_election
.leader_election_timings
.reconcile_timeout
.concurrency // at most 4 objects reconciled simultaneously
.run
.await?;
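The "implement reconcile, inherit a default error_policy" split can be illustrated with a small std-only model. This is a sketch under stated assumptions, not the koprs trait: `Action`, `NameChecker`, and `run_once` are hypothetical names, and the model is synchronous where a real reconciler is async.

```rust
use std::time::Duration;

/// What the controller should do after a reconcile attempt (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Action {
    /// Run reconcile again after the given delay.
    Requeue(Duration),
    /// Do nothing until the next watch event arrives.
    AwaitChange,
}

/// Hypothetical, synchronous stand-in for a reconciler trait.
pub trait Reconciler {
    type Resource;
    type Error;

    /// Required: the business logic for one object.
    fn reconcile(&self, obj: &Self::Resource) -> Result<Action, Self::Error>;

    /// Optional: defaults to a fixed 30-second requeue on any error.
    fn error_policy(&self, _obj: &Self::Resource, _err: &Self::Error) -> Action {
        Action::Requeue(Duration::from_secs(30))
    }
}

/// Example implementor that supplies only `reconcile`.
pub struct NameChecker;

impl Reconciler for NameChecker {
    type Resource = String;
    type Error = String;

    fn reconcile(&self, obj: &String) -> Result<Action, String> {
        if obj.is_empty() {
            Err("empty name".to_string())
        } else {
            Ok(Action::AwaitChange)
        }
    }
}

/// Runs one reconcile and falls back to the error policy on failure.
pub fn run_once<R: Reconciler>(r: &R, obj: &R::Resource) -> Action {
    match r.reconcile(obj) {
        Ok(action) => action,
        Err(err) => r.error_policy(obj, &err),
    }
}
```

An implementor that needs a different backoff would override `error_policy`; everything else stays untouched.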
Installation
[dependencies]
koprs = { path = "../koprs" }
# or once published:
# koprs = "<version>"
Module overview
| Module | Description |
|---|---|
| resources | Apply, delete, get, list, poll, patch labels/annotations, and fetch resources |
| status | Patch /status subresource via SSA; KoprsCondition type, make_condition and upsert_condition helpers |
| meta | ObjectMetaBuilder, a fluent builder for ObjectMeta |
| finalizers | Add and remove finalizers |
| gc | Garbage collect orphaned resources |
| watcher | watch (signal), watch_objects (resource data), watch_events (applied + deleted); WatchEvent<T> type |
| owners | Owner references, child wiring, ObjectRef sets, owner_label_mapper, and mapper closures |
| scope | Cluster and Namespaced scope markers for compile-time API selection |
| traits | KubeResource, NamespacedResource, ClusterResource trait aliases; is_being_deleted helper |
| error | KubeGenericError enum |
Usage
Every operation takes an explicit scope argument — either Namespaced("ns") for namespace-scoped resources or Cluster for cluster-scoped ones. The scope is passed at the call site rather than encoded in the function name, so the routing is always visible.
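How one generic function can serve both scopes is easiest to see in a toy model. The following is a minimal std-only sketch, not the koprs implementation: the `Scope` trait and `resource_url` are hypothetical, and the URL layout shown is the one the Kubernetes core API group uses.

```rust
/// Cluster-wide scope marker.
pub struct Cluster;

/// Namespace scope marker carrying the namespace name.
pub struct Namespaced<'a>(pub &'a str);

/// Hypothetical trait: each scope knows how to build its collection path.
pub trait Scope {
    fn path(&self, plural: &str) -> String;
}

impl Scope for Cluster {
    fn path(&self, plural: &str) -> String {
        format!("/api/v1/{plural}")
    }
}

impl Scope for Namespaced<'_> {
    fn path(&self, plural: &str) -> String {
        format!("/api/v1/namespaces/{}/{plural}", self.0)
    }
}

/// One generic function; the scope argument at the call site picks the route.
pub fn resource_url<S: Scope>(scope: &S, plural: &str, name: &str) -> String {
    format!("{}/{name}", scope.path(plural))
}
```

Because the scope is a type, the routing decision is resolved at compile time and stays visible wherever the function is called.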
Apply and delete
use ;
use ;
// Namespaced resource
.await?;
// Returns Ok(false) if the resource was already gone
let deleted = .await?;
// Cluster-scoped resource — same function, different scope marker
let deleted = .await?;
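The semantics described here (an ensure call that reports Created / Updated / Unchanged, and a delete that returns Ok(false) when the object is already gone) can be modelled without a cluster. This is an illustrative sketch only: `Store` is a hypothetical in-memory stand-in for the API server, and `Outcome` merely mirrors the variant names listed for EnsureOutcome.

```rust
use std::collections::BTreeMap;

/// Result of an ensure call, mirroring the Created / Updated / Unchanged split.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Created,
    Updated,
    Unchanged,
}

/// Hypothetical in-memory stand-in for the API server: name -> spec.
#[derive(Default)]
pub struct Store {
    objects: BTreeMap<String, String>,
}

impl Store {
    /// Writes the desired spec and reports what actually happened.
    pub fn ensure(&mut self, name: &str, spec: &str) -> Outcome {
        match self.objects.insert(name.to_string(), spec.to_string()) {
            None => Outcome::Created,
            Some(old) if old == spec => Outcome::Unchanged,
            Some(_) => Outcome::Updated,
        }
    }

    /// Deletes by name; a missing object is not an error, it yields `false`.
    pub fn delete(&mut self, name: &str) -> Result<bool, String> {
        Ok(self.objects.remove(name).is_some())
    }
}
```

Treating "already gone" as a successful `false` keeps cleanup paths free of 404 special-casing, which matters when a reconcile is retried after a partial failure.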
Finalizers
use ;
use koprs::scope::Namespaced;
// add_finalizer_namespaced extracts the namespace from the resource metadata —
// no-op if the finalizer is already present, safe to call on every reconcile.
.await?;
// Removing finalizers uses the generic scope form — pass Cluster for cluster-scoped resources.
.await?;
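The idempotency guarantee boils down to "only change the list when there is something to change". A minimal sketch of that logic on a plain finalizer list follows; `add_if_absent` and `remove_all` are hypothetical local helpers, not the koprs functions, and they return whether a patch would be needed at all.

```rust
/// Adds `name` unless it is already present.
/// Returns true when the list changed, i.e. when a patch would be sent.
pub fn add_if_absent(finalizers: &mut Vec<String>, name: &str) -> bool {
    if finalizers.iter().any(|f| f == name) {
        return false;
    }
    finalizers.push(name.to_string());
    true
}

/// Removes every entry listed in `names`.
/// Returns true when the list changed.
pub fn remove_all(finalizers: &mut Vec<String>, names: &[&str]) -> bool {
    let before = finalizers.len();
    finalizers.retain(|f| !names.contains(&f.as_str()));
    finalizers.len() != before
}
```

Finalizers owned by other controllers are left in place, which is why removal filters by name instead of clearing the list.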
Status
KoprsCondition derives JsonSchema so it can be embedded directly in a CustomResource-derived status struct — no mirror type or manual conversions required.
Include all status fields — scalars and conditions — in a single patch_status_namespaced call. Using separate patches with the same field manager causes each one to drop the other's fields on every reconcile, producing an endless watch-event loop.
upsert_condition preserves lastTransitionTime when the condition status has not changed, so the patch is idempotent and does not bump resourceVersion unnecessarily.
use ;
use schemars::JsonSchema;
use ;
// KoprsCondition is used directly — no mirror type needed.
let mut conditions = cr.status.as_ref
.map
.unwrap_or_default;
upsert_condition;
.await?;
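The lastTransitionTime rule can be shown on a simplified condition type. This is a std-only sketch of the behaviour described above, not the koprs code: `Condition`, `cond`, and `upsert` are hypothetical, and timestamps are plain RFC 3339 strings.

```rust
/// Simplified condition; the real type carries more fields.
#[derive(Clone, Debug, PartialEq)]
pub struct Condition {
    pub type_: String,
    pub status: String, // "True", "False", or "Unknown"
    pub reason: String,
    pub last_transition_time: String, // RFC 3339 timestamp
}

/// Convenience constructor used in the examples.
pub fn cond(type_: &str, status: &str, reason: &str, time: &str) -> Condition {
    Condition {
        type_: type_.to_string(),
        status: status.to_string(),
        reason: reason.to_string(),
        last_transition_time: time.to_string(),
    }
}

/// Inserts or replaces the condition with the same `type_`.
/// The stored transition time is kept unless `status` actually flips.
pub fn upsert(conditions: &mut Vec<Condition>, mut new: Condition) {
    if let Some(i) = conditions.iter().position(|c| c.type_ == new.type_) {
        if conditions[i].status == new.status {
            new.last_transition_time = conditions[i].last_transition_time.clone();
        }
        conditions[i] = new;
    } else {
        conditions.push(new);
    }
}
```

With the timestamp preserved, two consecutive reconciles that observe the same state produce byte-identical status, so the second patch changes nothing.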
Metadata builder
ObjectMetaBuilder replaces the verbose ObjectMeta { name: Some(...), labels: Some(BTreeMap::from([...])), ..Default::default() } construction pattern:
use koprs::meta::ObjectMetaBuilder;
let meta = new
.name
.namespace
.label
.label
.build;
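For readers unfamiliar with the fluent-builder pattern, here is a self-contained model of it. The sketch assumes a cut-down `Meta` struct with three optional fields; `Meta` and `MetaBuilder` are hypothetical names and do not reproduce the koprs or k8s-openapi types.

```rust
use std::collections::BTreeMap;

/// Cut-down metadata struct with the same all-optional shape as ObjectMeta.
#[derive(Debug, Default, PartialEq)]
pub struct Meta {
    pub name: Option<String>,
    pub namespace: Option<String>,
    pub labels: Option<BTreeMap<String, String>>,
}

/// Hypothetical fluent builder: each setter consumes and returns `self`.
#[derive(Default)]
pub struct MetaBuilder {
    meta: Meta,
}

impl MetaBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn name(mut self, name: &str) -> Self {
        self.meta.name = Some(name.to_string());
        self
    }

    pub fn namespace(mut self, namespace: &str) -> Self {
        self.meta.namespace = Some(namespace.to_string());
        self
    }

    /// Creates the label map on first use, then inserts the pair.
    pub fn label(mut self, key: &str, value: &str) -> Self {
        self.meta
            .labels
            .get_or_insert_with(BTreeMap::new)
            .insert(key.to_string(), value.to_string());
        self
    }

    pub fn build(self) -> Meta {
        self.meta
    }
}
```

The builder hides the `Some(...)` wrapping and the lazy creation of the label map, which is where most of the noise in the struct-literal form comes from.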
Deletion guard
Use is_being_deleted at the top of the reconcile loop to branch into the cleanup path:
use koprs::traits::is_being_deleted;
use koprs::finalizers::remove_finalizers;
use koprs::scope::Namespaced;
if is_being_deleted
Garbage collection
gc_resources accepts a keep-predicate: any resource matching the label selector for which the predicate returns false is deleted.
use koprs::gc::gc_resources;
use ;
// Namespaced
.await?;
// Cluster-scoped — same function, Cluster scope
.await?;
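The selection step (label selector first, then keep-predicate) can be sketched on an in-memory list. This is an assumption-laden model, not the koprs implementation: `Item` and `orphans` are hypothetical, the selector is a single key/value equality match, and the finalizer clearing for stuck-terminating resources is not modelled.

```rust
use std::collections::BTreeMap;

/// Hypothetical listed resource: just a name and its labels.
pub struct Item {
    pub name: String,
    pub labels: BTreeMap<String, String>,
}

impl Item {
    /// Builds an item carrying a single label.
    pub fn labeled(name: &str, key: &str, value: &str) -> Self {
        Item {
            name: name.to_string(),
            labels: BTreeMap::from([(key.to_string(), value.to_string())]),
        }
    }
}

/// Returns the names that would be deleted: items matching the selector
/// for which the keep-predicate returns false.
pub fn orphans<'a>(
    items: &'a [Item],
    selector: (&str, &str),
    keep: impl Fn(&Item) -> bool,
) -> Vec<&'a str> {
    let (key, value) = selector;
    items
        .iter()
        .filter(|item| item.labels.get(key).map(String::as_str) == Some(value))
        .filter(|item| !keep(*item))
        .map(|item| item.name.as_str())
        .collect()
}
```

Resources outside the selector are never candidates, so a predicate that is too strict can only affect objects the operator already labelled as its own.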
Watcher
Three functions cover progressively richer data, all sharing the same scope + optional label-selector signature:
Signal only — watch
Cheapest option. Sends () on every ADDED or MODIFIED event. Use this to re-queue a reconcile when a child resource changes.
use koprs::watcher::watch;
use koprs::scope::Namespaced;
use tokio::sync::mpsc;
let = channel;
let _handle = .await?;
while let Some = rx.recv.await
Resource data on applies — watch_objects
Sends the full resource T on every ADDED or MODIFIED event. Use this to maintain a local cache without a follow-up GET. Deletions are not reported.
use koprs::watcher::watch_objects;
use koprs::scope::Namespaced;
use tokio::sync::mpsc;
let = channel;
let _handle = .await?;
while let Some = rx.recv.await
Full event model — watch_events
Sends WatchEvent<T> for every event, including deletions. Objects observed during a watch restart arrive as Applied so the stream is always consistent.
use ;
use ;
use tokio::sync::mpsc;
let = channel;
// Namespaced with label filter
let _handle = .await?;
// Cluster-scoped, no filter
let _handle = .await?;
while let Some = rx.recv.await
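What a consumer does with the full event model is typically "fold events into a cache". The sketch below shows that fold with a blocking `std::sync::mpsc` channel so it stays dependency-free; the `WatchEvent` enum here is a local stand-in whose `Applied` / `Deleted` variant names are assumed from the description above, and `apply_to_cache` is hypothetical.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;

/// Local stand-in for an applied-or-deleted event.
#[derive(Clone, Debug, PartialEq)]
pub enum WatchEvent<T> {
    Applied(T),
    Deleted(T),
}

/// Folds every queued event into a name -> value cache.
pub fn apply_to_cache(
    cache: &mut BTreeMap<String, u32>,
    rx: &mpsc::Receiver<WatchEvent<(String, u32)>>,
) {
    // try_iter drains whatever is currently queued without blocking.
    for event in rx.try_iter() {
        match event {
            WatchEvent::Applied((name, value)) => {
                cache.insert(name, value);
            }
            WatchEvent::Deleted((name, _)) => {
                cache.remove(&name);
            }
        }
    }
}
```

Because re-listed objects arrive as `Applied`, replaying them after a watch restart simply overwrites cache entries with the same values.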
List and poll
use ;
use ;
use kube::api::ListParams;
use std::time::Duration;
// List in a namespace with a label filter
let items = .await?;
// List across all namespaces (or cluster-scoped resources) with a field filter
let items = .await?;
// Names only — useful for GC diffing
let names = .await?;
// Poll until at least one resource exists
let items = .await?;
Cross-resource watches and ownership
.watch() — secondary trigger wiring
Use .watch() on ControllerBuilder to re-queue a CR whenever a secondary resource changes. Multiple calls compose — all watches are active simultaneously.
owner_label_mapper covers the most common pattern: the trigger resource carries a label whose value is the name of the CR to re-queue, and its namespace is where the CR lives.
use ;
use koprs::owners::owner_label_mapper;
new
// Re-queue owning CR when a managed ConfigMap changes.
.watch
// Chain a second watch for a different resource type — both are active.
.watch
// Use .with_watches() for full kube-runtime Controller access when needed.
.with_watches
.run
.await?;
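The mapping rule behind the owner-label pattern (label value gives the CR name, the trigger's namespace gives the CR namespace) fits in a few lines. This is a std-only sketch of that rule, not the koprs function: `Trigger`, `ObjectRef`, `label_mapper`, and the label key used in the usage note are all hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical secondary resource that may carry an owner label.
pub struct Trigger {
    pub namespace: String,
    pub labels: BTreeMap<String, String>,
}

/// Hypothetical reference to the CR that should be re-queued.
#[derive(Debug, PartialEq)]
pub struct ObjectRef {
    pub namespace: String,
    pub name: String,
}

/// Builds a mapper closure for the given label key.
/// Resources without the label map to nothing, so no reconcile is triggered.
pub fn label_mapper(key: String) -> impl Fn(&Trigger) -> Option<ObjectRef> {
    move |trigger: &Trigger| {
        trigger.labels.get(&key).map(|owner| ObjectRef {
            namespace: trigger.namespace.clone(),
            name: owner.clone(),
        })
    }
}
```

A ConfigMap in namespace `prod` labelled `app.example.com/owner=my-cr` would therefore re-queue the CR `prod/my-cr`, while an unlabelled ConfigMap is ignored.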
Owner references
use ;
use koprs::scope::Namespaced;
use std::sync::Arc;
let oref = controller_ref?;
set_owner_refs;
let refs = .await?;
let mapper = ;
Labels, annotations, and namespaces
use ;
use koprs::scope::Namespaced;
.await?;
.await?;
ensure_namespace.await?;
Error handling
All functions return Result<T, KubeGenericError>. KubeGenericError implements std::error::Error via thiserror and composes with the ? operator. Variants are pattern-matchable for cases where you need to handle specific failures, for example distinguishing a missing resource from a permission error:
use koprs::error::KubeGenericError;
use koprs::resources::delete_resource;
use koprs::scope::Cluster;
match .await
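The general shape of a unified, pattern-matchable error can be sketched with the standard library alone. `OpError` below is a hypothetical two-variant enum, not KubeGenericError, whose variants are not shown in this document; the `Display`, `Error`, and `From` impls are written by hand here where thiserror would normally generate them.

```rust
use std::fmt;
use std::io;

/// Hypothetical unified error with one API variant and one I/O variant.
#[derive(Debug)]
pub enum OpError {
    /// The API server answered with an HTTP error status.
    Api { code: u16, message: String },
    /// A local I/O failure.
    Io(io::Error),
}

impl fmt::Display for OpError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            OpError::Api { code, message } => write!(f, "api error {code}: {message}"),
            OpError::Io(e) => write!(f, "io error: {e}"),
        }
    }
}

impl std::error::Error for OpError {}

/// Lets `?` convert io::Error into OpError automatically.
impl From<io::Error> for OpError {
    fn from(e: io::Error) -> Self {
        OpError::Io(e)
    }
}

/// Distinguishes a missing resource from a permission error.
pub fn classify(result: Result<bool, OpError>) -> &'static str {
    match result {
        Ok(true) => "deleted",
        Ok(false) => "already gone",
        Err(OpError::Api { code: 404, .. }) => "not found",
        Err(OpError::Api { code: 403, .. }) => "forbidden",
        Err(_) => "other failure",
    }
}

/// Shows `?` composing a std error into the unified type.
pub fn read_config(path: &str) -> Result<String, OpError> {
    Ok(std::fs::read_to_string(path)?)
}
```

Matching on the status code inside a variant keeps the common path on `?` while still allowing targeted handling where a 404 and a 403 need different reactions.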
Testing
Unit tests
Unit tests use tower_test::mock to intercept HTTP requests and inject
hand-crafted JSON responses — no cluster or kubeconfig needed:
Enable log output:
RUST_LOG=koprs=debug
Tests are organised one file per module under src/tests/:
src/tests/
├── mod.rs
├── resources.rs
├── status.rs
├── meta.rs
├── finalizers.rs
├── gc.rs
├── owners.rs
├── watcher.rs
├── scope.rs
├── traits.rs
└── error.rs
To write your own tests, create a mock (Client, Handle) pair with
tower_test::mock::pair and serve responses from a background task:
use ;
use Body;
use kube::Client;
use serde_json::json;
use tower_test::mock;
type MockHandle = Handle;
async
The mock handle serves requests in FIFO order. Functions that make multiple
API calls (such as the GC loop: list → delete → patch) require one
handle.next_request() call per request in the correct sequence.
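The FIFO requirement is easier to reason about with a toy model of a scripted server. The sketch below is not tower_test; `MockServer` is a hypothetical std-only queue of expected (method, path, status) triples that fails as soon as a request arrives out of order, and the paths are illustrative.

```rust
use std::collections::VecDeque;

/// Hypothetical scripted server: expected requests are served strictly in order.
pub struct MockServer {
    expected: VecDeque<(String, String, u16)>,
}

impl MockServer {
    /// Builds the script from (method, path, status) triples.
    pub fn new(script: &[(&str, &str, u16)]) -> Self {
        let expected = script
            .iter()
            .map(|(method, path, status)| (method.to_string(), path.to_string(), *status))
            .collect();
        Self { expected }
    }

    /// Serves the next request; fails when the call is out of order or unscripted.
    pub fn handle(&mut self, method: &str, path: &str) -> Result<u16, String> {
        match self.expected.pop_front() {
            Some((m, p, status)) if m == method && p == path => Ok(status),
            Some((m, p, _)) => Err(format!("expected {m} {p}, got {method} {path}")),
            None => Err(format!("unexpected request {method} {path}")),
        }
    }

    /// True once every scripted request has been consumed.
    pub fn is_drained(&self) -> bool {
        self.expected.is_empty()
    }
}
```

Asserting `is_drained()` at the end of a test catches the opposite mistake as well: code under test that makes fewer calls than the script expects.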
Integration tests
Integration tests run against a real cluster and are gated behind the
integration feature flag. The test functions are always compiled so type
errors are caught by cargo check, but they only execute when the feature
is enabled:
# Verify the integration tests compile without a cluster
# Create a local cluster
# Run
# Tear down
Each test creates resources with a unique name suffix and cleans up after
itself, so the suite is safe to run with --test-threads greater than one.
License
MIT