koprs-external 0.9.1

Generic polling watchers for external sources such as HTTP APIs and object stores, designed as a companion to koprs Kubernetes operators.


Kubernetes operators often need to reconcile cluster state with resources that live outside the cluster: a remote configuration endpoint, an object store, or a third-party API. koprs-external provides a lightweight polling abstraction that fits naturally alongside koprs controllers, using the same channel-based pattern as koprs::watcher.

Architecture Overview

koprs-external sits alongside your operator and bridges external HTTP or object-store sources into the same mpsc channel model used by koprs::watcher.

+------------------------------------------------------+
|                 Your Operator App                    |
|  (Reconcile Kubernetes state from external sources)  |
+------------------------------------------------------+
          |                            |
          v                            v
+------------------+       +---------------------------+
|    koprs         |       |    koprs-external         |
|  (Kubernetes     |       |  (HTTP polling,           |
|   API watcher)   |       |   object-store diffing)   |
+------------------+       +---------------------------+
          |                            |
          v                            v
+------------------+       +---------------------------+
|    kube-rs       |       |  reqwest / object_store   |
+------------------+       +---------------------------+

What koprs-external provides

| Area | koprs-external | Rolling your own |
|---|---|---|
| Polling loop | watch_external spawns a background task that ticks on a configurable interval | You write the loop, interval logic, and missed-tick handling |
| Change detection | ETag / 304 Not Modified support; falls back to Last-Modified; tracks added, modified, and removed items | You diff results yourself on every poll |
| Event model | ExternalEvent<T> with Added, Modified, Removed variants (same shape as koprs::WatchEvent) | You define and maintain your own event type |
| Authentication | Bearer token and arbitrary request headers via fluent builder | You build headers on every request |
| Object store support | ETag-based diffing over any object_store-compatible backend: S3, GCS, Azure, local, HTTP, in-memory (feature-gated) | You call the list API and diff the results yourself |
| Error handling | Transparent exponential backoff on consecutive poll failures: the wait doubles from interval up to max_backoff and resets on success | You decide whether to panic, log, or back off |
| Tracing | All poll activity and errors are emitted as structured tracing spans | You wire up logging yourself |

Features

| Cargo feature | What it enables |
|---|---|
| (default) | HttpPoller: polls any HTTP or HTTPS endpoint |
| object-store | ObjectStorePoller: lists and diffs any object_store-compatible backend (S3, GCS, Azure Blob, local filesystem, in-memory) |
| integration | Enables tests/integration.rs (requires --features integration to compile) |

Object store backends are opt-in via object_store's own feature flags. Add the backend you need in your application's Cargo.toml (e.g. object_store = { version = "0.11", features = ["aws"] } for S3).


Module overview

| Module | Description |
|---|---|
| watcher | ExternalSource trait, ExternalEvent<T> enum, WatchConfig, watch_external / watch_external_with_config spawners |
| http | HttpPoller: polls a single HTTP endpoint; ETag / Last-Modified change detection |
| store | ObjectStorePoller: lists and diffs any object_store backend (requires object-store feature) |
| error | ExternalError enum |

Installation

[dependencies]
koprs-external = { path = "../koprs-external" }
# or once published:
# koprs-external = "<version>"

# Optional object store support (S3, GCS, Azure Blob, local filesystem, in-memory)
# koprs-external = { version = "<version>", features = ["object-store"] }

Usage

HTTP API — polling a REST endpoint

HttpPoller polls a single URL. The first successful response emits Added, subsequent changes emit Modified, and a 404 after a prior success emits Removed. ETag-based conditional requests (304 Not Modified) are handled automatically.

use std::time::Duration;
use koprs_external::http::HttpPoller;
use koprs_external::watcher::{watch_external, ExternalEvent};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);

    let poller = HttpPoller::new("https://api.example.com/config")
        .with_bearer_token("my-token")
        .with_name("config-api");

    let _handle = watch_external(poller, Duration::from_secs(30), tx);

    while let Some(event) = rx.recv().await {
        match event {
            ExternalEvent::Added(r)    => println!("appeared: {} bytes", r.body.len()),
            ExternalEvent::Modified(r) => println!("changed:  {} bytes", r.body.len()),
            ExternalEvent::Removed(_)  => println!("gone"),
        }
    }
}
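
The event rules described above can be read as a small state machine. The following is a minimal, self-contained sketch of that reading, not HttpPoller's actual implementation. The names `Event`, `State`, `Response`, and `step` are hypothetical. Two behaviours are assumptions rather than documented facts: a 404 before any successful response produces no event, and a resource that reappears after removal is reported as Added again.

```rust
/// Hypothetical stand-in for the three ExternalEvent variants.
#[derive(Debug, PartialEq)]
enum Event {
    Added,
    Modified,
    Removed,
}

/// What a poller would need to remember between polls (assumed shape).
#[derive(Default)]
struct State {
    present: bool,
    etag: Option<String>,
}

/// Simplified view of one poll result.
enum Response<'a> {
    Ok { etag: &'a str },
    NotModified, // HTTP 304
    NotFound,    // HTTP 404
}

/// Apply one poll result to the remembered state and return the event, if any.
fn step(state: &mut State, resp: Response<'_>) -> Option<Event> {
    match resp {
        // First successful response (or first after a removal).
        Response::Ok { etag } if !state.present => {
            state.present = true;
            state.etag = Some(etag.to_string());
            Some(Event::Added)
        }
        // Successful response whose ETag differs from the remembered one.
        Response::Ok { etag } if state.etag.as_deref() != Some(etag) => {
            state.etag = Some(etag.to_string());
            Some(Event::Modified)
        }
        // Same ETag, or the server answered 304: nothing changed.
        Response::Ok { .. } | Response::NotModified => None,
        // 404 after a prior success.
        Response::NotFound if state.present => {
            state.present = false;
            state.etag = None;
            Some(Event::Removed)
        }
        // 404 before any success: assumed to be silent.
        Response::NotFound => None,
    }
}
```

Feeding the sequence 200 (ETag v1), 304, 200 (ETag v2), 404 through `step` yields Added, no event, Modified, Removed, which matches the order of events the consumer loop above would see.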

Backoff — custom retry ceiling

watch_external uses WatchConfig::new(interval) with max_backoff = interval × 32. Use watch_external_with_config when you need a specific ceiling — for example, capping retries at 5 minutes regardless of how large the base interval is:

use std::time::Duration;
use koprs_external::http::HttpPoller;
use koprs_external::watcher::{WatchConfig, watch_external_with_config, ExternalEvent};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);

    let poller = HttpPoller::new("https://api.example.com/config");
    let config = WatchConfig::new(Duration::from_secs(30))
        .with_max_backoff(Duration::from_secs(300)); // cap at 5 min

    let _handle = watch_external_with_config(poller, config, tx);

    while let Some(event) = rx.recv().await {
        println!("{event:?}");
    }
}

On the first error the retry wait is 30 s × 2 = 60 s. Each subsequent failure doubles it until it reaches the 5-minute cap. A successful poll resets the wait back to the base 30 s.
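
To make the schedule concrete, the rule can be written as a small pure function. This is only a sketch of the arithmetic described above, assuming the wait after n consecutive failures is interval × 2^n clamped to max_backoff. `retry_wait` is a hypothetical helper and not part of the koprs-external API; the crate's internal calculation may differ in detail.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of koprs-external): the wait before the
/// next poll after `failures` consecutive failed polls.
fn retry_wait(interval: Duration, max_backoff: Duration, failures: u32) -> Duration {
    if failures == 0 {
        // Healthy source: poll on the base interval.
        return interval;
    }
    // 2^failures, falling back to u32::MAX if the power overflows.
    let factor = 2u32.checked_pow(failures).unwrap_or(u32::MAX);
    // interval * factor, clamped to the configured ceiling.
    interval
        .checked_mul(factor)
        .unwrap_or(max_backoff)
        .min(max_backoff)
}
```

With a 30 s interval and a 300 s ceiling, consecutive failures give waits of 60 s, 120 s, 240 s, and then 300 s for every further failure; zero failures gives the base 30 s again.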

HTTP API — custom TLS (e.g. Kubernetes API server)

Bring your own reqwest::Client for mutual TLS, custom CA certificates, or connection timeouts. This pattern works against the Kubernetes REST API when combined with a bearer token:

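use koprs_external::http::HttpPoller;

// Assumption: the code runs in-cluster, where the service-account CA bundle
// and token are mounted at the standard paths. Adjust for your environment.
const SA_DIR: &str = "/var/run/secrets/kubernetes.io/serviceaccount";
let ca_pem = std::fs::read(format!("{SA_DIR}/ca.crt")).unwrap();
let token = std::fs::read_to_string(format!("{SA_DIR}/token")).unwrap();

// Trust the cluster CA in addition to the default roots.
let client = reqwest::Client::builder()
    .add_root_certificate(reqwest::Certificate::from_pem(&ca_pem).unwrap())
    .build()
    .unwrap();

let poller = HttpPoller::new("https://kubernetes.default.svc/api/v1/namespaces/default/configmaps/my-config")
    .with_client(client)
    .with_bearer_token(token.trim());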

HTTP API — implementing a custom source

For sources that do not fit the single-URL model, implement ExternalSource directly:

use futures::future::BoxFuture;
use koprs_external::error::Result;
use koprs_external::watcher::{ExternalEvent, ExternalSource};

struct MyApiPoller { /* ... */ }

impl ExternalSource for MyApiPoller {
    type Item = MyData;

    fn poll(&mut self) -> BoxFuture<'_, Result<Vec<ExternalEvent<MyData>>>> {
        Box::pin(async move {
            // fetch, diff against self.last_state, return events
            todo!()
        })
    }

    fn name(&self) -> &str { "my-api" }
}
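
The "diff against last state" step inside a custom `poll` usually comes down to comparing two snapshots keyed by item identifier. The sketch below shows one way to do that with only the standard library. It is an illustration under assumptions, not code from koprs-external: `Event` is a local stand-in for `ExternalEvent`, `diff` is a hypothetical helper, and each snapshot is assumed to map an item key to a version tag such as an ETag.

```rust
use std::collections::BTreeMap;

/// Local stand-in for ExternalEvent, carrying only the item key.
#[derive(Debug, PartialEq)]
enum Event {
    Added(String),
    Modified(String),
    Removed(String),
}

/// Compare the previous snapshot with the current one (key -> version tag)
/// and return the events that describe the difference.
fn diff(last: &BTreeMap<String, String>, current: &BTreeMap<String, String>) -> Vec<Event> {
    let mut events = Vec::new();

    // Keys in the current snapshot are either new, changed, or unchanged.
    for (key, tag) in current {
        match last.get(key) {
            None => events.push(Event::Added(key.clone())),
            Some(old) if old != tag => events.push(Event::Modified(key.clone())),
            Some(_) => {} // same tag: no event
        }
    }

    // Keys that were present before but are missing now were removed.
    for key in last.keys() {
        if !current.contains_key(key) {
            events.push(Event::Removed(key.clone()));
        }
    }

    events
}
```

A poller built this way would call `diff` with its stored snapshot and the freshly fetched one, return the resulting events, and then replace the stored snapshot with the new one. A BTreeMap is used here so that events come out in a stable key order; a HashMap works equally well when order does not matter.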

Object store — polling for object changes

ObjectStorePoller accepts any Arc<dyn ObjectStore>. The backend is configured entirely outside the poller — swap S3 for GCS or a local directory without changing any polling code.

AWS S3 (add object_store = { version = "0.11", features = ["aws"] } to your Cargo.toml):

// Requires koprs-external's `object-store` feature and object_store's `aws` feature.
use std::sync::Arc;
use std::time::Duration;

use koprs_external::store::ObjectStorePoller;
use koprs_external::watcher::{watch_external, ExternalEvent};
use object_store::aws::AmazonS3Builder;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let store = Arc::new(
        AmazonS3Builder::from_env()
            .with_bucket_name("my-bucket")
            .build()
            .unwrap(),
    );

    let (tx, mut rx) = mpsc::channel(16);
    let poller = ObjectStorePoller::new(store).with_prefix("configs/");
    let _handle = watch_external(poller, Duration::from_secs(60), tx);

    while let Some(event) = rx.recv().await {
        match event {
            ExternalEvent::Added(obj)    => println!("new:     {} ({} B)", obj.path, obj.size),
            ExternalEvent::Modified(obj) => println!("changed: {}", obj.path),
            ExternalEvent::Removed(obj)  => println!("deleted: {}", obj.path),
        }
    }
}

The same poller works unchanged against GCS, Azure Blob, or a local directory — just swap the builder:

// Google Cloud Storage
use object_store::gcp::GoogleCloudStorageBuilder;
let store = Arc::new(GoogleCloudStorageBuilder::from_env().with_bucket_name("my-bucket").build().unwrap());

// Local filesystem
use object_store::local::LocalFileSystem;
let store = Arc::new(LocalFileSystem::new_with_prefix("/data/configs").unwrap());

Testing

Unit tests

Unit tests use an in-process axum HTTP server and the object_store::memory::InMemory backend — no external services required. Backoff behaviour is covered by dedicated tests that use a FlakySource which fails a configurable number of times then recovers.

cargo test -p koprs-external
cargo test -p koprs-external --features object-store

Integration tests

The HTTP integration tests spin up a local server and exercise the full polling loop end-to-end. They do not require a Kubernetes cluster or AWS credentials.

cargo test -p koprs-external --features integration --test integration

Kubernetes integration test

One test (kubernetes_configmap_lifecycle_via_http_poller) is marked #[ignore] and requires a reachable cluster with a service-account token. See the header of tests/integration.rs for step-by-step setup instructions using kind.

# After completing the setup in tests/integration.rs:
cargo test -p koprs-external --features integration --test integration \
    -- --include-ignored

Object store tests

ObjectStorePoller unit tests use the built-in InMemory backend from object_store — no AWS credentials, no LocalStack, no external services required. All event types (Added, Modified, Removed) and prefix filtering are covered by these tests.

cargo test -p koprs-external --features object-store