koprs 0.6.1

A reusable, ergonomic library that streamlines Kubernetes operator development, allowing developers to build controllers with significantly less code.
Documentation

KOPRS - Kubernetes Operators Rust

A reusable, ergonomic library that streamlines Kubernetes operator development. By providing generic implementations for the most common operator patterns, it eliminates widespread boilerplate across your codebase. It integrates tightly with the kube-rs ecosystem to handle repetitive operational scaffolding, allowing developers to build reliable controllers with significantly less code.

Architecture Overview

koprs is an opinionated, high-level orchestration framework built directly on top of kube and kube-runtime. While kube provides type-safe Kubernetes API bindings and kube-runtime delivers the controller primitives, koprs abstracts away the repetitive boilerplate required to build production ready controllers.

It encapsulates complex infrastructure orchestration loops, robust Server-Side Apply (SSA) patterns, and automated background garbage collection/cleanup processes out of your controller's core codebase. Additionally, it streamlines state synchronization with ready to use watcher logic and provides a strongly typed error handling model that removes the friction of building custom Kubernetes error variants from scratch. Every generic operation comes out of the box with structured, built-in tracing instrumentation, giving you deep visibility into your controller's execution paths without additional setup.

By lifting these structural requirements off your shoulders, koprs leaves you free to focus purely on your custom business logic.

+-------------------------------------------------------+
|                 Your Operator App                     |
|  (Business Logic, Sync Mode Matching, Storage Rules)  |
+-------------------------------------------------------+
                           |
                           v  [Turbofish Types Passed Down]
+-------------------------------------------------------+
|                    koprs                        |
|  (Generic SSA, Lifecycle Helpers, Status Patching)    |
+-------------------------------------------------------+
                           |
                           v
+-------------------------------------------------------+
|                      kube-rs                          |
|         (Low-level Kubernetes API Engine)             |
+-------------------------------------------------------+

Features

  • Apply & delete — cluster-scoped and namespaced resources via Server-Side Apply (SSA)
  • Status patching — patch the /status subresource of any CRD, cluster-scoped or namespaced
  • Finalizers — add and remove finalizers on cluster-scoped and namespaced resources
  • Garbage collection — diff-based GC for orphaned cluster and namespaced resources, with stuck-termination recovery
  • Watchers — watch any resource type with optional label filtering, signal-based via mpsc
  • Listing — list resources across namespaces or within a namespace, with or without label selectors
  • ObjectRefs — build ObjectRef sets for cross-resource reconcile triggers
  • Persist to disk — fetch a resource list and write it as JSON to a file
  • Typed errorsKubeGenericError enum via thiserror, pattern-matchable by callers

Installation

[dependencies]
koprs = { path = "../koprs" }
# or once published:
# koprs = "<version>"

Module overview

Module Description
resources Apply, delete, list, poll, and fetch resources (cluster + namespaced)
status Patch /status subresource via SSA
finalizers Add and remove finalizers
gc Garbage collect orphaned resources
watcher Watch resources for changes via mpsc signals
scope Cluster and Namespaced scope markers for compile-time API selection
traits KubeResource, NamespacedResource, ClusterResource trait aliases
error KubeGenericError enum

Usage

Apply a resource

The library exposes three layers of functions for applying resources:

Function Use when
apply_namespaced_resource Your CRD is namespace-scoped (most common)
apply_cluster_resource Your CRD is cluster-scoped
apply_resource Scope is generic or passed through from a caller
use koprs::resources::{
    apply_resource, apply_cluster_resource, apply_namespaced_resource,
};
use koprs::scope::{Cluster, Namespaced};

// Namespace-scoped CRD (convenience wrapper)
apply_namespaced_resource::(
    client.clone(),
    "my-namespace",
    &resource,
    "my-operator",
).await?;

// Cluster-scoped CRD (convenience wrapper)
apply_cluster_resource::(
    client.clone(),
    &resource,
    "my-operator",
).await?;

// Generic form — when scope is determined at runtime or passed through
apply_resource::(
    client.clone(),
    Namespaced("my-namespace"),
    &resource,
    "my-operator",
).await?;

apply_resource::(
    client.clone(),
    Cluster,
    &resource,
    "my-operator",
).await?;

Delete a resource

The library exposes three layers of functions for deleting resources:

Function Use when
delete_namespaced_resource Your CRD is namespace-scoped (most common)
delete_cluster_resource Your CRD is cluster-scoped
delete_resource Scope is generic or passed through from a caller
use koprs::resources::{
    delete_resource, delete_cluster_resource, delete_namespaced_resource,
};
use koprs::scope::{Cluster, Namespaced};

// Namespace-scoped CRD (convenience wrapper)
// Returns Ok(false) if the resource did not exist
let deleted = delete_namespaced_resource::(
    client.clone(),
    "my-namespace",
    "my-resource",
).await?;

// Cluster-scoped CRD (convenience wrapper)
let deleted = delete_cluster_resource::(
    client.clone(),
    "my-resource",
).await?;

// Generic form — when scope is determined at runtime or passed through
let deleted = delete_resource::(
    client.clone(),
    Namespaced("my-namespace"),
    "my-resource",
).await?;

let deleted = delete_resource::(
    client.clone(),
    Cluster,
    "my-resource",
).await?;

Ensure a namespace exists

use koprs::resources::ensure_namespace;

ensure_namespace(client.clone(), "my-namespace", "my-operator").await?;

Patch status

The library exposes three layers of functions for patching status:

Function Use when
patch_status_namespaced Your CRD is namespace-scoped (most common)
patch_status_cluster Your CRD is cluster-scoped
patch_status Scope is generic or passed through from a caller
use koprs::status::{patch_status, patch_status_cluster, patch_status_namespaced};
use serde::Serialize;

#[derive(Serialize)]
struct MyStatus { ready: bool, message: String }

// Namespace-scoped CRD (convenience wrapper)
patch_status_namespaced::(
    client.clone(),
    "my-namespace",
    "my-resource",
    MyStatus { ready: true, message: "Reconciled".into() },
    "my-operator",
).await?;

// Cluster-scoped CRD (convenience wrapper)
patch_status_cluster::(
    client.clone(),
    "my-resource",
    MyStatus { ready: true, message: "Reconciled".into() },
    "my-operator",
).await?;

// Generic form — when scope is determined at runtime or passed through
use koprs::scope::{Cluster, Namespaced};

patch_status::(
    client.clone(),
    Namespaced("my-namespace"),
    "my-resource",
    MyStatus { ready: true, message: "Reconciled".into() },
    "my-operator",
).await?;

Finalizers

The library exposes three layers of functions for managing finalizers:

Function Use when
add_finalizer_namespaced / remove_finalizers_namespaced Your CRD is namespace-scoped (most common)
add_finalizer_cluster / remove_finalizers_cluster Your CRD is cluster-scoped
add_finalizer / remove_finalizers Scope is generic or passed through from a caller
use koprs::finalizers::{
    add_finalizer, add_finalizer_cluster, add_finalizer_namespaced,
    remove_finalizers, remove_finalizers_cluster, remove_finalizers_namespaced,
};

// Namespace-scoped CRD (convenience wrapper)
add_finalizer_namespaced::(
    client.clone(),
    "my-namespace",
    "my-resource",
    "my-operator/cleanup",
).await?;

remove_finalizers_namespaced::(
    client.clone(),
    "my-namespace",
    "my-resource",
).await?;

// Cluster-scoped CRD (convenience wrapper)
add_finalizer_cluster::(
    client.clone(),
    "my-resource",
    "my-operator/cleanup",
).await?;

remove_finalizers_cluster::(
    client.clone(),
    "my-resource",
).await?;

// Generic form — when scope is determined at runtime or passed through
use koprs::scope::{Cluster, Namespaced};

add_finalizer::(
    client.clone(),
    Namespaced("my-namespace"),
    "my-resource",
    "my-operator/cleanup",
).await?;

remove_finalizers::(
    client.clone(),
    Cluster,
    "my-resource",
).await?;

Garbage collection

The library exposes three layers of functions for garbage collecting orphaned resources:

Function Use when
gc_namespaced_resources Your CRD is namespace-scoped (most common)
gc_cluster_resources Your CRD is cluster-scoped
gc_resources Scope is generic, or you need a custom runtime scope

All three variations accept a predicate closure (Fn(&T) -> bool) that evaluates whether a given resource should be kept. Any resource matching the label selector for which the predicate returns false will be garbage collected.

use std::collections::HashSet;
use kube::ResourceExt;
use koprs::gc::{gc_cluster_resources, gc_namespaced_resources, gc_resources};
use koprs::scope::{Cluster, Namespaced};

// Namespace-scoped CRD (convenience wrapper)
let desired = HashSet::from([
    ("default".to_string(), "my-resource-a".to_string()),
]);

gc_namespaced_resources::(
    client.clone(),
    "default",
    "app=my-operator",
    |r| {
        let ns = r.namespace().unwrap_or_default();
        desired.contains(&(ns, r.name_any()))
    },
).await?;

// Cluster-scoped CRD (convenience wrapper)
let desired = HashSet::from(["my-cluster-resource".to_string()]);

gc_cluster_resources::(
    client.clone(),
    "app=my-operator",
    |r| desired.contains(&r.name_any()),
).await?;

// Generic form — when scope is determined at runtime or passed through
let desired = HashSet::from([
    ("default".to_string(), "my-resource-b".to_string()),
]);

gc_resources::(
    client.clone(),
    Namespaced("default"),
    "app=my-operator",
    |r| {
        let ns = r.namespace().unwrap_or_default();
        desired.contains(&(ns, r.name_any()))
    },
).await?;

Watcher

The library exposes three layers of functions for watching resources:

Function Use when
watch_namespaced / watch_namespaced_by_label Your CRD is namespace-scoped (most common)
watch_cluster / watch_cluster_by_label Your CRD is cluster-scoped
watch Scope is generic or passed through from a caller
use koprs::watcher::{
    watch, watch_cluster, watch_cluster_by_label,
    watch_namespaced, watch_namespaced_by_label,
};
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(16);

// Namespace-scoped CRD (convenience wrapper)
let _handle = watch_namespaced::(
    client.clone(),
    "my-namespace",
    tx.clone(),
).await?;

// Namespace-scoped CRD with label filter
let _handle = watch_namespaced_by_label::(
    client.clone(),
    "my-namespace",
    "app=my-operator",
    tx.clone(),
).await?;

// Cluster-scoped CRD (convenience wrapper)
let _handle = watch_cluster::(
    client.clone(),
    tx.clone(),
).await?;

// Cluster-scoped CRD with label filter
let _handle = watch_cluster_by_label::(
    client.clone(),
    "app=my-operator",
    tx.clone(),
).await?;

// Generic form — when scope is determined at runtime or passed through
use koprs::scope::{Cluster, Namespaced};

let _handle = watch::(
    client.clone(),
    Namespaced("my-namespace"),
    None,
    tx.clone(),
).await?;

let _handle = watch::(
    client.clone(),
    Cluster,
    Some("app=my-operator"),
    tx.clone(),
).await?;

// Consume signals from any of the above
while let Some(()) = rx.recv().await {
    println!("Resource changed!");
}

List resources

use koprs::resources::{
    list_resources, list_resources_by_label, list_namespaced_resources, list_resource_names,
};
use k8s_openapi::api::core::v1::ConfigMap;

let all     = list_resources::(client.clone()).await?;
let labeled = list_resources_by_label::(client.clone(), "app=my-operator").await?;
let in_ns   = list_namespaced_resources::(client.clone(), "my-namespace").await?;
let names   = list_resource_names::(client.clone(), "app=my-operator").await?;

Polling

The library exposes three layers of functions for polling until resources exist:

Function Use when
wait_for_resources_namespaced Your CRD is namespace-scoped (most common)
wait_for_resources_cluster Your CRD is cluster-scoped
wait_for_resources Scope is generic or passed through from a caller
use koprs::resources::{
    wait_for_resources, wait_for_resources_cluster, wait_for_resources_namespaced,
};
use std::time::Duration;

// Namespace-scoped CRD (convenience wrapper)
let resources = wait_for_resources_namespaced::(
    client.clone(),
    "my-namespace",
    Duration::from_secs(10),
).await?;

// Cluster-scoped CRD (convenience wrapper)
let resources = wait_for_resources_cluster::(
    client.clone(),
    Duration::from_secs(10),
).await?;

// Generic form — when scope is determined at runtime or passed through
use koprs::scope::{Cluster, Namespaced};

let resources = wait_for_resources::(
    client.clone(),
    Namespaced("my-namespace"),
    Duration::from_secs(10),
).await?;

let resources = wait_for_resources::(
    client.clone(),
    Cluster,
    Duration::from_secs(10),
).await?;

// Cardinality policy stays in your operator — the library returns Vec
match resources.len() {
    1 => { /* exactly one CR, proceed */ }
    n => { /* zero is unreachable here; handle n > 1 as your domain requires */ }
}

ObjectRefs

use koprs::resources::{
    make_object_refs, make_object_refs_cluster, make_object_refs_namespaced,
    make_object_ref_mapper,
};
use koprs::scope::{Cluster, Namespaced};
use std::sync::Arc;

// Namespace-scoped (convenience wrapper)
let refs = make_object_refs_namespaced::(client.clone(), "my-namespace").await?;

// Cluster-scoped (convenience wrapper)
let refs = make_object_refs_cluster::(client.clone()).await?;

// Generic form — when scope is determined at runtime or passed through
let refs = make_object_refs::(client.clone(), Namespaced("my-namespace")).await?;
let refs = make_object_refs::(client.clone(), Cluster).await?;

// Build a mapper for cross-resource reconcile triggers
let mapper = make_object_ref_mapper::(Arc::new(refs));

Persist to disk

use koprs::resources::fetch_and_write_to_file;
use k8s_openapi::api::core::v1::Pod;

fetch_and_write_to_file::(client.clone(), "/tmp", "pods.json").await?;

Error handling

All functions return Result<T, KubeGenericError>:

pub enum KubeGenericError {
    Kube(kube::Error),
    MissingMetadata(String),
    Serialization(serde_json::Error),
    Other(anyhow::Error),
}

KubeGenericError implements std::error::Error via thiserror, so it composes naturally with anyhow and the ? operator. Variants are pattern-matchable for cases where you need to handle specific failures — for example, distinguishing a missing resource from a permission error:

use koprs::error::KubeGenericError;

match delete_cluster_resource::<Namespace>(client, "my-resource").await {
    Ok(true)  => info!("deleted"),
    Ok(false) => info!("already gone"),
    Err(KubeGenericError::Kube(kube::Error::Api(e))) if e.code == 403 => {
        error!("permission denied");
    }
    Err(e) => return Err(e.into()),
}

Testing

Unit tests

Unit tests use tower_test::mock to intercept HTTP requests and inject hand-crafted JSON responses — no cluster or kubeconfig needed:

cargo test

Enable log output:

RUST_LOG=koprs=debug cargo test -- --nocapture

Tests are organised one file per module under src/tests/:

src/tests/
├── mod.rs
├── common.rs        # shared mock harness and fixture builders
├── resources.rs
├── status.rs
├── finalizers.rs
├── gc.rs
├── watcher.rs
├── scope.rs
├── traits.rs
└── error.rs

To write your own tests, create a mock (Client, Handle) pair with tower_test::mock::pair and serve responses from a background task:

use http::{Request, Response, StatusCode};
use kube::client::Body;
use kube::Client;
use serde_json::json;
use tower_test::mock;
type MockHandle = mock::Handle<Request<Body>, Response<Body>>;
fn mock_client() -> (Client, MockHandle) {
    let (svc, handle) = mock::pair::<Request<Body>, Response<Body>>();
    (Client::new(svc, "default"), handle)
}
#[tokio::test]
async fn my_test() {
    let (client, mut handle) = mock_client();
    // Serve the response in a background task — both sides must run
    // concurrently because the client blocks waiting for a response.
    let server = tokio::spawn(async move {
        let (req, send) = handle.next_request().await.unwrap();
        assert_eq!(req.method(), http::Method::GET);
        let body = serde_json::to_vec(&json!({
            "apiVersion": "v1",
            "kind": "ConfigMapList",
            "metadata": {},
            "items": []
        }))
        .unwrap();
        send.send_response(
            Response::builder()
                .status(StatusCode::OK)
                .header("Content-Type", "application/json")
                .body(Body::from(body))
                .unwrap(),
        );
    });
    // Call the function under test using the mock client.
    let result = koprs::resources::list_resources::<k8s_openapi::api::core::v1::ConfigMap>(
        client,
    )
    .await
    .unwrap();
    assert!(result.items.is_empty());
    server.await.unwrap();
}

The mock handle serves requests in FIFO order. Functions that make multiple API calls (such as the GC loop: list → delete → patch) require one handle.next_request() call per request in the correct sequence.


Integration tests

Integration tests run against a real cluster and are gated behind the integration feature flag. The test functions are always compiled so type errors are caught by cargo check, but they only execute when the feature is enabled:

# Verify the integration tests compile without a cluster
cargo test --features integration --test integration --no-run
# Create a local cluster
kind create cluster --name koprs-test
# Run
cargo test --features integration --test integration
# Tear down
kind delete cluster --name koprs-test

Each test creates resources with a unique name suffix and cleans up after itself, so the suite is safe to run with --test-threads greater than one.


Integration tests

Integration tests run against a real cluster and are gated behind the integration feature flag. The test functions are always compiled so type errors are caught by cargo check, but they only execute when the feature is enabled:

# Verify the integration tests compile without a cluster
cargo test --features integration --test integration --no-run

# Create a local cluster
kind create cluster --name koprs-test

# Run
cargo test --features integration --test integration

# Tear down
kind delete cluster --name koprs-test

Each test creates resources with a unique name suffix and cleans up after itself, so the suite is safe to run with --test-threads greater than one.


License

MIT