koprs 0.7.0

KOPRS - Kubernetes Operators Rust

A reusable, ergonomic library that streamlines Kubernetes operator development. By providing generic implementations for the most common operator patterns, it eliminates widespread boilerplate across your codebase. It integrates tightly with the kube-rs ecosystem to handle repetitive operational scaffolding, allowing developers to build reliable controllers with significantly less code.

Architecture Overview

koprs is an opinionated, high-level orchestration framework built directly on top of kube and kube-runtime. While kube provides type-safe Kubernetes API bindings and kube-runtime delivers the controller primitives, koprs abstracts away the repetitive boilerplate required to build production-ready controllers.

It moves infrastructure orchestration loops, Server-Side Apply (SSA) patterns, and background garbage collection out of your controller's core codebase. It also streamlines state synchronization with ready-to-use watcher logic and provides a strongly typed error model, so you do not have to build custom Kubernetes error variants from scratch. Every generic operation is instrumented with structured tracing, giving you visibility into your controller's execution paths without additional setup.

By lifting these structural requirements off your shoulders, koprs leaves you free to focus purely on your custom business logic.

+-------------------------------------------------------+
|                 Your Operator App                     |
|  (Business Logic, Sync Mode Matching, Storage Rules)  |
+-------------------------------------------------------+
                           |
                           v  [Turbofish Types Passed Down]
+-------------------------------------------------------+
|                         koprs                         |
|  (Generic SSA, Lifecycle Helpers, Status Patching)    |
+-------------------------------------------------------+
                           |
                           v
+-------------------------------------------------------+
|                      kube-rs                          |
|         (Low-level Kubernetes API Engine)             |
+-------------------------------------------------------+

Features

  • Apply & delete — cluster-scoped and namespaced resources via Server-Side Apply (SSA)
  • Get — fetch a single resource by name, returning Option<T> (None on 404)
  • Status patching — patch the /status subresource of any CRD, cluster-scoped or namespaced
  • Finalizers — add and remove finalizers on cluster-scoped and namespaced resources
  • Garbage collection — diff-based GC for orphaned cluster and namespaced resources, with stuck-termination recovery
  • Watchers — watch any resource type with optional label filtering, signal-based via mpsc
  • Listing — list resources across namespaces or within a namespace, with or without label selectors
  • Ownership & controller wiring — build OwnerReferences, set owner refs on children, generate ObjectRef sets, and create mapper closures for cross-resource reconcile triggers
  • Status conditions: make_condition builds a Condition with the current timestamp; upsert_condition merges it while preserving lastTransitionTime. Include conditions in your status struct and patch them with patch_status_*
  • Patch labels / annotations — merge labels or annotations onto any resource without replacing existing ones
  • Persist to disk — fetch a resource list and write it as JSON to a file
  • Typed errors: KubeGenericError enum via thiserror, pattern-matchable by callers

Installation

[dependencies]
koprs = "0.7.0"
# or, when working from a local checkout:
# koprs = { path = "../koprs" }

Module overview

Module      Description
resources   Apply, delete, get, list, poll, patch labels/annotations, and fetch resources
status      Patch /status subresource via SSA; make_condition and upsert_condition helpers
finalizers  Add and remove finalizers
gc          Garbage collect orphaned resources
watcher     Watch resources for changes via mpsc signals
owners      Owner references, child wiring, ObjectRef sets, and mapper closures
scope       Cluster and Namespaced scope markers for compile-time API selection
traits      KubeResource, NamespacedResource, ClusterResource trait aliases
error       KubeGenericError enum

Usage

Most operations come in three forms: _namespaced (most common), _cluster, and a generic scope form that accepts Namespaced("ns") or Cluster. The examples below show the namespaced form; the others follow the same pattern, with the _cluster form omitting the namespace argument.
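One way to picture the generic scope form is a small trait implemented by two marker types. The following is a minimal, std-only sketch; the Scope trait, the struct layouts, and collection_path are hypothetical stand-ins for illustration and are not taken from koprs itself. The URL shapes shown are those of the Kubernetes core v1 API group.

```rust
/// Hypothetical stand-in for a cluster-wide scope marker.
pub struct Cluster;

/// Hypothetical stand-in for a namespaced scope marker.
pub struct Namespaced<'a>(pub &'a str);

/// Hypothetical trait: each scope reports the namespace it targets, if any.
pub trait Scope {
    fn namespace(&self) -> Option<&str>;
}

impl Scope for Cluster {
    fn namespace(&self) -> Option<&str> {
        None
    }
}

impl<'a> Scope for Namespaced<'a> {
    fn namespace(&self) -> Option<&str> {
        Some(self.0)
    }
}

/// One generic function serves both scopes; the marker type chosen by the
/// caller decides which URL shape is produced (core v1 group only).
pub fn collection_path<S: Scope>(scope: &S, plural: &str) -> String {
    match scope.namespace() {
        Some(ns) => format!("/api/v1/namespaces/{ns}/{plural}"),
        None => format!("/api/v1/{plural}"),
    }
}
```

Under these assumptions, collection_path(&Namespaced("my-ns"), "configmaps") and collection_path(&Cluster, "namespaces") share one implementation, and the scope is fixed by the type the caller passes.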

Apply and delete

use koprs::resources::{apply_namespaced_resource, delete_namespaced_resource};

apply_namespaced_resource::<MyCR>(client.clone(), "my-ns", &resource, "my-operator").await?;

// Returns Ok(false) if the resource was already gone
let deleted = delete_namespaced_resource::<MyCR>(client.clone(), "my-ns", "my-cr").await?;

Finalizers

use koprs::finalizers::{add_finalizer_namespaced, remove_finalizers_namespaced};

// No-op if the finalizer is already present — safe to call on every reconcile.
add_finalizer_namespaced::<MyCR>(client.clone(), &cr, "my-operator/cleanup").await?;
remove_finalizers_namespaced::<MyCR>(client.clone(), "my-ns", "my-cr").await?;

Status

Include all status fields — scalars and conditions — in a single patch_status_namespaced call. Using separate patches with the same field manager causes each one to drop the fields from the other on every reconcile, producing an endless watch-event loop.

Preserve lastTransitionTime when the condition status has not changed so the patch is idempotent and does not bump resourceVersion.

use chrono::SecondsFormat;
use koprs::status::patch_status_namespaced;
use serde::Serialize;

#[derive(Serialize)]
struct MyStatus {
    ready: bool,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    conditions: Vec<MyCondition>,
}

let last_transition_time = cr.status.as_ref()
    .and_then(|s| s.conditions.iter().find(|c| c.type_ == "Ready" && c.status == "True"))
    .map(|c| c.last_transition_time.clone())
    .unwrap_or_else(|| chrono::Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true));

patch_status_namespaced::<MyCR, _>(
    client.clone(), "my-ns", "my-cr",
    MyStatus {
        ready: true,
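        // `..Default::default()` fills the remaining fields; this assumes MyCondition implements Default.
        conditions: vec![MyCondition { type_: "Ready".into(), status: "True".into(), last_transition_time, ..Default::default() }],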
    },
    "my-operator",
).await?;
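The reason for a single patch per field manager can be illustrated with a toy model of Server-Side Apply ownership. This is a simplified, std-only sketch rather than the real API server algorithm: the Status type and its apply method are hypothetical, and values are plain strings.

```rust
use std::collections::BTreeMap;

/// Toy model of a status object: field name -> (value, owning field manager).
#[derive(Default)]
pub struct Status {
    fields: BTreeMap<String, (String, String)>,
}

impl Status {
    /// Simplified apply: the patch is the manager's full intent. Fields the
    /// manager owned before but omits now are removed; fields in the patch
    /// are set and owned by the manager. Other managers' fields are untouched.
    pub fn apply(&mut self, manager: &str, patch: &[(&str, &str)]) {
        self.fields.retain(|name, (_, owner)| {
            owner.as_str() != manager || patch.iter().any(|(k, _)| *k == name.as_str())
        });
        for (k, v) in patch {
            self.fields
                .insert(k.to_string(), (v.to_string(), manager.to_string()));
        }
    }

    /// Field names currently present, in sorted order.
    pub fn names(&self) -> Vec<&str> {
        self.fields.keys().map(String::as_str).collect()
    }
}
```

In this model, applying ready and then conditions in two separate calls as "my-operator" leaves only conditions behind, while one call carrying both fields keeps both. Each dropped field is a change to the object, which is what feeds the watch-event loop described above.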

make_condition and upsert_condition from koprs::status are available as pure helpers when you need to build or merge Condition values (k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition) before converting to your CRD's own condition type.
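The merge rule itself is small enough to sketch. The version below is a std-only illustration of upserting with lastTransitionTime preservation; the trimmed-down Condition struct and the upsert function are hypothetical and do not reproduce the signatures of the koprs helpers.

```rust
/// Hypothetical, trimmed-down condition type for illustration only.
#[derive(Clone, Debug, PartialEq)]
pub struct Condition {
    pub type_: String,
    pub status: String,
    pub last_transition_time: String,
}

/// Insert `new` into `conditions`, or replace the entry with the same type.
/// When the status is unchanged, the existing lastTransitionTime is kept so
/// that repeated reconciles produce an identical status.
pub fn upsert(conditions: &mut Vec<Condition>, mut new: Condition) {
    match conditions.iter_mut().find(|c| c.type_ == new.type_) {
        Some(existing) => {
            if existing.status == new.status {
                new.last_transition_time = existing.last_transition_time.clone();
            }
            *existing = new;
        }
        None => conditions.push(new),
    }
}
```

Keeping the old timestamp when nothing changed is what makes the resulting status patch idempotent.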

Garbage collection

gc_namespaced_resources accepts a keep-predicate: every resource that matches the label selector and for which the predicate returns false is deleted.

use k8s_openapi::api::core::v1::ConfigMap;
use koprs::gc::gc_namespaced_resources;
use kube::ResourceExt; // provides name_any()

gc_namespaced_resources::<ConfigMap>(
    client.clone(), "my-ns", "app=my-operator",
    |r| desired_names.contains(&r.name_any()),
).await?;
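The selection step behind a keep-predicate can be sketched without a cluster. This std-only example only computes which names would be deleted; select_orphans is a hypothetical helper and not part of koprs, and the actual deletion and stuck-termination handling are left out.

```rust
/// Return the names that a keep-predicate based GC pass would delete:
/// everything in `existing` for which `keep` returns false.
pub fn select_orphans<'a, F>(existing: &'a [String], keep: F) -> Vec<&'a str>
where
    F: Fn(&str) -> bool,
{
    existing
        .iter()
        .map(String::as_str)
        .filter(|name| !keep(*name))
        .collect()
}
```

A predicate written this way expresses the desired state ("keep what I still want") rather than a delete list, so a resource the operator no longer knows about is removed by default.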

Watcher

use koprs::watcher::watch_namespaced_by_label;
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(16);
let _handle = watch_namespaced_by_label::<MyCR>(client.clone(), "my-ns", "app=my-operator", tx).await?;

while let Some(()) = rx.recv().await { /* resource changed */ }
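Because each signal is a bare () with no payload, a burst of changes can be collapsed into a single reconcile pass on the consumer side. The sketch below shows that idea with std::sync::mpsc to stay dependency-free; next_batch is a hypothetical helper, and an async version would await the first signal and then drain with the channel's non-blocking receive.

```rust
use std::sync::mpsc::Receiver;

/// Block until at least one signal arrives, then drain any further pending
/// signals. Returns how many signals were collapsed into this batch, or
/// None once every sender has been dropped and the queue is empty.
pub fn next_batch(rx: &Receiver<()>) -> Option<usize> {
    rx.recv().ok()?;
    let mut n = 1;
    while rx.try_recv().is_ok() {
        n += 1;
    }
    Some(n)
}
```

Each Some(_) returned would correspond to one reconcile, regardless of how many change signals arrived in the meantime.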

List and poll

use koprs::resources::{list_namespaced_resources, list_resource_names, wait_for_resources_namespaced};
use std::time::Duration;

let items = list_namespaced_resources::<MyCR>(client.clone(), "my-ns").await?;
let names = list_resource_names::<MyCR>(client.clone(), "app=my-operator").await?;
let items = wait_for_resources_namespaced::<MyCR>(client.clone(), "my-ns", Duration::from_secs(10)).await?;

Ownership and controller wiring

use koprs::owners::{controller_ref, set_owner_refs, make_object_refs_namespaced, make_object_ref_mapper};
use std::sync::Arc;

let oref = controller_ref(&parent_cr)?;
set_owner_refs(&mut child, vec![oref]);

let refs   = make_object_refs_namespaced::<MyCR>(client.clone(), "my-ns").await?;
let mapper = make_object_ref_mapper::<TriggerType, _>(Arc::new(refs));
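A mapper closure of this kind can be pictured as a function that captures a shared set of references and returns it for every trigger event. The sketch below is std-only and makes that assumption explicit; ObjRef and make_mapper are hypothetical stand-ins, and the real make_object_ref_mapper may behave differently.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a reference to an object that should be reconciled.
#[derive(Clone, Debug, PartialEq)]
pub struct ObjRef {
    pub namespace: String,
    pub name: String,
}

/// Build a closure that maps any trigger event to the full, pre-computed set
/// of objects to reconcile. The trigger's contents are ignored on purpose.
pub fn make_mapper<T>(refs: Arc<Vec<ObjRef>>) -> impl Fn(&T) -> Vec<ObjRef> {
    move |_trigger: &T| refs.as_ref().clone()
}
```

Sharing the set through an Arc lets the closure be cloned cheaply and moved into a controller without copying the references up front.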

Labels, annotations, and namespaces

use koprs::resources::{patch_labels_namespaced, patch_annotations_namespaced, ensure_namespace};

patch_labels_namespaced::<MyCR>(client.clone(), "my-ns", "my-cr", &[("app.kubernetes.io/managed-by", "my-operator")]).await?;
patch_annotations_namespaced::<MyCR>(client.clone(), "my-ns", "my-cr", &[("my-operator/synced", "true")]).await?;
ensure_namespace(client.clone(), "my-ns", "my-operator").await?;
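The merge semantics for labels and annotations can be sketched on a plain map. This std-only example assumes JSON-merge-patch-like behavior, where keys in the patch are added or overwritten and every other key is left alone; merge_labels is a hypothetical helper and the exact koprs behavior for an existing key is not confirmed here.

```rust
use std::collections::BTreeMap;

/// Merge `patch` into `existing`: keys in the patch are added or overwritten,
/// all other existing keys are left untouched.
pub fn merge_labels(existing: &mut BTreeMap<String, String>, patch: &[(&str, &str)]) {
    for (k, v) in patch {
        existing.insert((*k).to_string(), (*v).to_string());
    }
}
```

The point of merging rather than replacing is that labels set by users or other controllers survive the operator's patch.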

Error handling

All functions return Result<T, KubeGenericError>:

pub enum KubeGenericError {
    Kube(kube::Error),
    MissingMetadata(String),
    Serialization(serde_json::Error),
    Io(std::io::Error),
    Internal(String),
}

KubeGenericError implements std::error::Error via thiserror and composes with the ? operator. Variants are pattern-matchable for cases where you need to handle specific failures — for example, distinguishing a missing resource from a permission error:

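use k8s_openapi::api::core::v1::Namespace;
use koprs::error::KubeGenericError;
use koprs::resources::delete_cluster_resource; // path assumed, next to delete_namespaced_resource
use tracing::{error, info}; // assumed logging macros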

match delete_cluster_resource::<Namespace>(client, "my-resource").await {
    Ok(true)  => info!("deleted"),
    Ok(false) => info!("already gone"),
    Err(KubeGenericError::Kube(kube::Error::Api(e))) if e.code == 403 => {
        error!("permission denied");
    }
    Err(e) => return Err(e.into()),
}
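The final arm, Err(e.into()), relies on a From conversion into the caller's own error type. The sketch below shows that wiring with std-only stand-ins; LibError, AppError, and run are hypothetical and model only two variants, so treat it as an illustration of the pattern rather than of KubeGenericError itself.

```rust
use std::fmt;

/// Hypothetical stand-in for the library error; only two variants are modelled.
#[derive(Debug)]
pub enum LibError {
    Api { code: u16 },
    Internal(String),
}

/// Hypothetical application-level error.
#[derive(Debug)]
pub enum AppError {
    Forbidden,
    Other(LibError),
}

impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AppError::Forbidden => write!(f, "permission denied"),
            AppError::Other(e) => write!(f, "operator error: {e:?}"),
        }
    }
}

impl std::error::Error for AppError {}

/// Classify once, in one place: a 403 becomes a dedicated variant.
impl From<LibError> for AppError {
    fn from(e: LibError) -> Self {
        match e {
            LibError::Api { code: 403 } => AppError::Forbidden,
            other => AppError::Other(other),
        }
    }
}

/// `?` calls the From impl above, so call sites stay free of match arms.
pub fn run(step: Result<bool, LibError>) -> Result<&'static str, AppError> {
    let deleted = step?;
    Ok(if deleted { "deleted" } else { "already gone" })
}
```

Centralizing the classification in From keeps reconcile code short while still letting callers match on the specific failure.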

Testing

Unit tests

Unit tests use tower_test::mock to intercept HTTP requests and inject hand-crafted JSON responses — no cluster or kubeconfig needed:

cargo test

Enable log output:

RUST_LOG=koprs=debug cargo test -- --nocapture

Tests are organised one file per module under src/tests/:

src/tests/
├── mod.rs
├── resources.rs
├── status.rs
├── finalizers.rs
├── gc.rs
├── owners.rs
├── watcher.rs
├── scope.rs
├── traits.rs
└── error.rs

To write your own tests, create a mock (Client, Handle) pair with tower_test::mock::pair and serve responses from a background task:

use http::{Request, Response, StatusCode};
use kube::client::Body;
use kube::Client;
use serde_json::json;
use tower_test::mock;

type MockHandle = mock::Handle<Request<Body>, Response<Body>>;

fn mock_client() -> (Client, MockHandle) {
    let (svc, handle) = mock::pair::<Request<Body>, Response<Body>>();
    (Client::new(svc, "default"), handle)
}

#[tokio::test]
async fn my_test() {
    let (client, mut handle) = mock_client();
    // Serve the response in a background task — both sides must run
    // concurrently because the client blocks waiting for a response.
    let server = tokio::spawn(async move {
        let (req, send) = handle.next_request().await.unwrap();
        assert_eq!(req.method(), http::Method::GET);
        let body = serde_json::to_vec(&json!({
            "apiVersion": "v1",
            "kind": "ConfigMapList",
            "metadata": {},
            "items": []
        }))
        .unwrap();
        send.send_response(
            Response::builder()
                .status(StatusCode::OK)
                .header("Content-Type", "application/json")
                .body(Body::from(body))
                .unwrap(),
        );
    });
    // Call the function under test using the mock client.
    let result = koprs::resources::list_resources::<k8s_openapi::api::core::v1::ConfigMap>(
        client,
    )
    .await
    .unwrap();
    assert!(result.items.is_empty());
    server.await.unwrap();
}

The mock handle serves requests in FIFO order. Functions that make multiple API calls (such as the GC loop: list → delete → patch) require one handle.next_request() call per request in the correct sequence.
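For multi-request functions it can help to write the expected sequence down as data before wiring it to the mock handle. The following std-only sketch models such a script; Step and Script are hypothetical helpers, not part of koprs or tower_test, and in a real test each serve call would sit next to one handle.next_request() call.

```rust
use std::collections::VecDeque;

/// One scripted exchange: the HTTP method expected next and the status to reply with.
pub struct Step {
    pub method: &'static str,
    pub reply_status: u16,
}

/// A FIFO list of expected exchanges.
pub struct Script {
    steps: VecDeque<Step>,
}

impl Script {
    pub fn new(steps: Vec<Step>) -> Self {
        Self { steps: steps.into() }
    }

    /// Consume the next step. Err describes an out-of-order or unexpected request.
    pub fn serve(&mut self, method: &str) -> Result<u16, String> {
        match self.steps.pop_front() {
            Some(step) if step.method == method => Ok(step.reply_status),
            Some(step) => Err(format!("expected {}, got {}", step.method, method)),
            None => Err(format!("unexpected extra {method} request")),
        }
    }

    /// True once every scripted step has been consumed.
    pub fn is_done(&self) -> bool {
        self.steps.is_empty()
    }
}
```

Checking is_done at the end of the test catches the opposite mistake, where the function under test made fewer requests than the script expected.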


Integration tests

Integration tests run against a real cluster and are gated behind the integration feature flag. The test functions are always compiled so type errors are caught by cargo check, but they only execute when the feature is enabled:

# Verify the integration tests compile without a cluster
cargo test --features integration --test integration --no-run
# Create a local cluster
kind create cluster --name koprs-test
# Run
cargo test --features integration --test integration
# Tear down
kind delete cluster --name koprs-test

Each test creates resources with a unique name suffix and cleans up after itself, so the suite is safe to run with --test-threads greater than one.
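A unique name suffix of this kind can be produced with the standard library alone. The sketch below is one possible approach, not the suite's actual helper: unique_name is hypothetical and combines a timestamp with a process-wide counter, yielding names of the form prefix, hex milliseconds, counter.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Process-wide counter so that names generated in the same millisecond differ.
static COUNTER: AtomicU64 = AtomicU64::new(0);

/// Build a resource name that is unique within this process (counter) and
/// unlikely to clash across runs (timestamp). With a lowercase prefix, the
/// result uses only lowercase letters, digits, and '-'.
pub fn unique_name(prefix: &str) -> String {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{prefix}-{millis:x}-{n}")
}
```

The atomic counter is what keeps names distinct when several test threads ask for one at the same instant.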


License

MIT