cellos-sink-jetstream
EventSink that publishes CloudEvents JSON to NATS JetStream. The
production-default transport for cellos-supervisor (and cellos-server
per ADR-0011).
What it is
Implements cellos_core::ports::EventSink. Each emit serializes a
CloudEventV1 to JSON and publishes it to a configured subject through
an async-nats JetStream client.
In front of the underlying async_nats::Client (which does its own
TCP-level reconnect) the sink keeps a small Connected /
Reconnecting { attempt, next_after } state machine (P3-01). On
failure the circuit opens, emit returns Err fast (instead of being
held by the client's blocking publish path), and a single probe attempt
fires once the backoff elapses. Backoff is 100ms * 2^attempt, capped
at 3 min, reset to zero on the first successful publish.
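The backoff schedule described above can be sketched as follows. This is a minimal illustration, not the crate's code: `SketchState`, `backoff_delay`, `on_failure`, and `on_success` are hypothetical names, the sketch stores the computed delay rather than a `next_after` instant, and it assumes the first failure counts as attempt 0.

```rust
use std::time::Duration;

/// Base delay and cap taken from the description above.
const BASE_DELAY: Duration = Duration::from_millis(100);
const MAX_DELAY: Duration = Duration::from_secs(180);

/// Illustrative mirror of the Connected / Reconnecting state machine.
/// The real type's fields may differ (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SketchState {
    Connected,
    Reconnecting { attempt: u32, delay: Duration },
}

/// 100ms * 2^attempt, saturating at the 3-minute cap.
fn backoff_delay(attempt: u32) -> Duration {
    // 2^attempt overflows u32 for attempt >= 32; treat that as "very large".
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    BASE_DELAY
        .checked_mul(factor)
        .map_or(MAX_DELAY, |d| d.min(MAX_DELAY))
}

/// A failed publish opens the circuit or lengthens the backoff.
fn on_failure(state: SketchState) -> SketchState {
    let attempt = match state {
        // Assumption: the first failure is attempt 0.
        SketchState::Connected => 0,
        SketchState::Reconnecting { attempt, .. } => attempt.saturating_add(1),
    };
    SketchState::Reconnecting { attempt, delay: backoff_delay(attempt) }
}

/// A successful publish closes the circuit and resets the attempt counter.
fn on_success(_state: SketchState) -> SketchState {
    SketchState::Connected
}
```

Under these assumptions the delay doubles from 100 ms and reaches the 3-minute cap at attempt 11, since 100 ms * 2^11 is already 204.8 s.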
Selected in cellos-supervisor::composition::build_primary_event_sink
as the default — there is no enum env var to opt in. The supervisor
falls back to NoopEventSink if the JetStream connect fails, unless
CELL_OS_REQUIRE_JETSTREAM=1 is set (in which case startup fails).
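The selection rule above can be read as a small decision function. The sketch below is illustrative only: `SinkChoice`, `choose_sink`, and `env_flag` are hypothetical names, the real logic lives in build_primary_event_sink, and the sketch assumes that an explicit noop opt-in takes precedence over the require flag and that a flag counts as set when its value is `1`.

```rust
use std::env;

/// Which sink the sketch would select.
#[derive(Debug, PartialEq, Eq)]
enum SinkChoice {
    JetStream,
    Noop,
}

/// Hypothetical helper: treats a variable as enabled when it equals "1".
/// The supervisor's actual parsing rules may be more lenient (assumption).
fn env_flag(name: &str) -> bool {
    env::var(name).map(|v| v == "1").unwrap_or(false)
}

/// Decide which sink to use given the two flags and the connect outcome.
fn choose_sink(
    use_noop: bool,
    require_jetstream: bool,
    connect_ok: bool,
) -> Result<SinkChoice, String> {
    if use_noop {
        // Explicit opt-in: JetStream is skipped entirely.
        return Ok(SinkChoice::Noop);
    }
    if connect_ok {
        return Ok(SinkChoice::JetStream);
    }
    if require_jetstream {
        // Startup fails instead of degrading silently.
        return Err("JetStream connect failed and CELL_OS_REQUIRE_JETSTREAM is set".to_string());
    }
    // Default degradation path.
    Ok(SinkChoice::Noop)
}
```

A caller would pass `env_flag("CELL_OS_USE_NOOP_SINK")` and `env_flag("CELL_OS_REQUIRE_JETSTREAM")` together with the result of the connect attempt.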
What it does NOT do:
- It does not create streams or consumers; the operator pre-stages the JetStream stream that captures the configured subject.
- It does not redact or sign payloads; those are separate wrapper sinks (cellos-sink-redact, SigningEventSink).
- It does not buffer to disk on outage; pair with cellos-sink-dlq for that.
Public API surface
| Symbol | Purpose |
|---|---|
| JetStreamEventSink | The sink. |
| JetStreamEventSink::connect(url, subject) | Connect with default reqwest/NATS settings. |
| JetStreamEventSink::connect_with_root_ca(url, subject, ca_path) | Connect over TLS with a private root CA. |
| JetStreamEventSink::from_publisher(publisher, subject) | Inject a fake Publisher for tests. |
| JetStreamEventSink::debug_state() | Inspect the reconnect state (tests). |
| TENANT_ID_PLACEHOLDER / TENANT_ID_DEFAULT_TOKEN | Subject-template placeholder for per-tenant isolation (A2-03). |
| resolve_tenant_subject(template, event) | Pure helper. |
| with_retry(max_attempts, f) | Bounded retry helper used internally. |
Source: src/lib.rs.
Configuration
| Env var | Default | Description |
|---|---|---|
| NATS_URL | nats://127.0.0.1:4222 | NATS server URL (credentials are stripped from logs). |
| NATS_CA_FILE | (none) | PEM CA bundle for TLS to private NATS endpoints. |
| CELL_OS_REQUIRE_JETSTREAM | unset | If set, startup fails on connect error instead of degrading to noop. |
| CELL_OS_USE_NOOP_SINK | unset | Explicit opt-in to noop; skips JetStream entirely. |
The subject is derived from spec.id, runId, and an optional tenant
ID (when spec.correlation.tenantId is present and the subject template
uses {tenantId}).
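The tenant substitution can be pictured with a small pure function. This is a sketch under stated assumptions, not the crate's resolve_tenant_subject: `resolve_subject_sketch` is a hypothetical name, it takes the tenant ID directly instead of an event, and the fallback token is passed in because the value of TENANT_ID_DEFAULT_TOKEN is not given here. The subject strings in the usage note are made up for illustration.

```rust
/// Placeholder text as it appears in the subject template.
const PLACEHOLDER: &str = "{tenantId}";

/// Substitute the tenant ID into a subject template.
///
/// - Templates without the placeholder are returned unchanged.
/// - A missing or empty tenant ID falls back to `fallback`
///   (assumption: this mirrors the role of the default token).
fn resolve_subject_sketch(template: &str, tenant_id: Option<&str>, fallback: &str) -> String {
    if !template.contains(PLACEHOLDER) {
        return template.to_string();
    }
    let token = tenant_id.filter(|t| !t.is_empty()).unwrap_or(fallback);
    template.replace(PLACEHOLDER, token)
}
```

For example, `resolve_subject_sketch("cellos.{tenantId}.events", Some("acme"), "shared")` yields `cellos.acme.events`. The sketch does not validate the tenant ID; since `.` separates NATS subject tokens and `*` and `>` are wildcards, a real implementation would presumably need to guard against those characters.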
Examples
Sink composition order in the supervisor
(build_primary_event_sink):
DlqSink( Sign( Redact( JetStreamEventSink ) ) )
Testing
The crate's tests use the injected-Publisher constructor; no live
NATS broker is required.
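The injected-publisher approach can be sketched as below. Everything here is hypothetical and simplified: `PublisherSketch`, `RecordingPublisher`, and `SinkSketch` are stand-ins, the crate's actual Publisher trait is async and its signature is not shown in this README, and the subject and payload used in the usage note are invented.

```rust
use std::sync::Mutex;

/// Hypothetical, synchronous stand-in for the crate's Publisher abstraction.
trait PublisherSketch {
    fn publish(&self, subject: &str, payload: &[u8]) -> Result<(), String>;
}

/// Fake publisher that records every publish and can simulate an outage.
struct RecordingPublisher {
    sent: Mutex<Vec<(String, Vec<u8>)>>,
    fail: bool,
}

impl RecordingPublisher {
    fn new(fail: bool) -> Self {
        Self { sent: Mutex::new(Vec::new()), fail }
    }
}

impl PublisherSketch for RecordingPublisher {
    fn publish(&self, subject: &str, payload: &[u8]) -> Result<(), String> {
        if self.fail {
            return Err("simulated broker outage".to_string());
        }
        self.sent
            .lock()
            .unwrap()
            .push((subject.to_string(), payload.to_vec()));
        Ok(())
    }
}

/// Minimal sink that is generic over its publisher, so tests can inject a fake.
struct SinkSketch<P: PublisherSketch> {
    publisher: P,
    subject: String,
}

impl<P: PublisherSketch> SinkSketch<P> {
    fn emit(&self, event_json: &str) -> Result<(), String> {
        self.publisher.publish(&self.subject, event_json.as_bytes())
    }
}
```

A test then builds `SinkSketch { publisher: RecordingPublisher::new(false), subject: "cellos.events".to_string() }`, calls `emit`, and inspects `publisher.sent`; constructing the fake with `fail = true` exercises the error path without a broker.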
Related crates
- cellos-sink-jsonl: file-based mirror / dev sink.
- cellos-sink-redact: wraps this sink to strip secrets.
- cellos-sink-dlq: wraps this sink to spill failures to disk.
- cellos-supervisor: selects this sink in build_primary_event_sink.
- cellos-core: defines EventSink and CloudEventV1.
ADRs
- ADR-0011: cellos-server publishes through the same sink chain.
- P3-01: reconnect / circuit-breaker model.
- A2-03: per-tenant subject templating.
- FC-74: 90-day audit retention floor (applies to JetStream stream).