koprs-external
Generic polling watchers for external sources such as HTTP REST APIs and object stores, designed as a companion to koprs Kubernetes operators.
Kubernetes operators often need to reconcile cluster state with resources that live outside the cluster, a remote configuration endpoint, an object store, or a third-party API. koprs-external provides a lightweight polling abstraction that fits naturally alongside koprs controllers, using the same channel-based pattern as koprs::watcher.
Architecture Overview
koprs-external sits alongside your operator and bridges external HTTP or object-store sources into the same mpsc channel model used by koprs::watcher.
| |
| () |
| |
| | | |
| ( | | ( |
| ) | | ) |
| |
| | | |
What koprs-external provides
| Area | koprs-external | rolling your own |
|---|---|---|
| Polling loop | watch_external spawns a background task that ticks on a configurable interval |
You write the loop, interval logic, and missed-tick handling |
| Change detection | ETag / 304 Not Modified support; falls back to Last-Modified; tracks added, modified, and removed items |
You diff results yourself on every poll |
| Event model | ExternalEvent<T> with Added, Modified, Removed variants — same shape as koprs::WatchEvent |
You define and maintain your own event type |
| Authentication | Bearer token and arbitrary request headers via fluent builder | You build headers on every request |
| Object store support | ETag-based diffing over any object_store-compatible backend — S3, GCS, Azure, local, HTTP, in-memory (feature-gated) |
You call the list API and diff the results yourself |
| Error handling | Transparent exponential backoff on consecutive poll failures — wait doubles from interval up to max_backoff, resets on success |
You decide whether to panic, log, or back off |
| Tracing | All poll activity and errors are emitted as structured tracing spans |
You wire up logging yourself |
Features
| Cargo feature | What it enables |
|---|---|
| (default) | HttpPoller — polls any HTTP or HTTPS endpoint |
object-store |
ObjectStorePoller — lists and diffs any object_store-compatible backend (S3, GCS, Azure Blob, local filesystem, in-memory) |
integration |
Enables tests/integration.rs (requires --features integration to compile) |
Object store backends are opt-in via
object_store's own feature flags. Add the backend you need in your application'sCargo.toml(e.g.object_store = { version = "0.11", features = ["aws"] }for S3).
Module overview
| Module | Description |
|---|---|
watcher |
ExternalSource trait, ExternalEvent<T> enum, WatchConfig, watch_external / watch_external_with_config spawners |
http |
HttpPoller — polls a single HTTP endpoint; ETag / Last-Modified change detection |
store |
ObjectStorePoller — lists and diffs any object_store backend (requires object-store feature) |
error |
ExternalError enum |
Installation
[]
= { = "../koprs-external" }
# or once published:
# koprs-external = "<version>"
# Optional S3 support
# koprs-external = { version = "<version>", features = ["object-store"] }
Usage
HTTP API — polling a REST endpoint
HttpPoller polls a single URL. The first successful response emits Added, subsequent changes emit Modified, and a 404 after a prior success emits Removed. ETag-based conditional requests (304 Not Modified) are handled automatically.
use Duration;
use HttpPoller;
use ;
use mpsc;
async
Backoff — custom retry ceiling
watch_external uses WatchConfig::new(interval) with max_backoff = interval × 32. Use
watch_external_with_config when you need a specific ceiling — for example, capping retries
at 5 minutes regardless of how large the base interval is:
use Duration;
use HttpPoller;
use ;
use mpsc;
async
On the first error the retry wait is 30 s × 2 = 60 s. Each subsequent failure
doubles it until it reaches the 5-minute cap. A successful poll resets the wait
back to the base 30 s.
HTTP API — custom TLS (e.g. Kubernetes API server)
Bring your own reqwest::Client for mutual TLS, custom CA certificates, or
connection timeouts. This pattern works against the Kubernetes REST API when
combined with a bearer token:
use HttpPoller;
let client = builder
.add_root_certificate
.build
.unwrap;
let poller = new
.with_client
.with_bearer_token;
HTTP API — implementing a custom source
For sources that do not fit the single-URL model, implement ExternalSource
directly:
use BoxFuture;
use Result;
use ;
Object store — polling for object changes
ObjectStorePoller accepts any Arc<dyn ObjectStore>. The backend is
configured entirely outside the poller — swap S3 for GCS or a local
directory without changing any polling code.
AWS S3 (add object_store = { version = "0.11", features = ["aws"] } to your Cargo.toml):
use Arc;
use AmazonS3Builder;
use ObjectStorePoller;
use ;
use Duration;
use mpsc;
async
The same poller works unchanged against GCS, Azure Blob, or a local directory — just swap the builder:
// Google Cloud Storage
use GoogleCloudStorageBuilder;
let store = new;
// Local filesystem
use LocalFileSystem;
let store = new;
Testing
Unit tests
Unit tests use an in-process axum HTTP server and the object_store::memory::InMemory
backend — no external services required. Backoff behaviour is covered by dedicated
tests that use a FlakySource which fails a configurable number of times then recovers.
Integration tests
The HTTP integration tests spin up a local server and exercise the full polling loop end-to-end. They do not require a Kubernetes cluster or AWS credentials.
Kubernetes integration test
One test (kubernetes_configmap_lifecycle_via_http_poller) is marked
#[ignore] and requires a reachable cluster with a service-account token. See
the header of tests/integration.rs for step-by-step setup instructions using
kind.
# After completing the setup in tests/integration.rs:
Object store tests
ObjectStorePoller unit tests use the built-in InMemory backend from
object_store — no AWS credentials, no LocalStack, no external services
required. All event types (Added, Modified, Removed) and prefix filtering
are covered by these tests.