cellos-sink-jetstream 0.5.1

NATS JetStream EventSink for CellOS — durable, ordered CloudEvent delivery with 90-day retention floor (FC-74).

cellos-sink-jetstream

EventSink that publishes CloudEvents JSON to NATS JetStream. The production-default transport for cellos-supervisor (and cellos-server per ADR-0011).

What it is

Implements cellos_core::ports::EventSink. Each emit serializes a CloudEventV1 to JSON and publishes it to a configured subject through an async-nats JetStream client.

In front of the underlying async_nats::Client (which does its own TCP-level reconnect) the sink keeps a small Connected / Reconnecting { attempt, next_after } state machine (P3-01). On failure the circuit opens, emit returns Err fast (instead of being held by the client's blocking publish path), and a single probe attempt fires once the backoff elapses. Backoff is 100ms * 2^attempt, capped at 3 min, reset to zero on the first successful publish.
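The backoff rule and the two-state machine described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's code: the `Breaker` type and its method names are hypothetical, the first failure is assumed to start at attempt 0, and the sketch does not enforce the single-probe guarantee under concurrent callers.

```rust
use std::time::{Duration, Instant};

// Base delay and cap as stated in the text above.
const BASE: Duration = Duration::from_millis(100);
const CAP: Duration = Duration::from_secs(180);

/// Delay before the next probe: 100ms * 2^attempt, saturating at the cap.
pub fn backoff(attempt: u32) -> Duration {
    // checked_shl returns None once the shift exceeds the width of u64.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let millis = (BASE.as_millis() as u64).saturating_mul(factor);
    Duration::from_millis(millis).min(CAP)
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Connected,
    Reconnecting { attempt: u32, next_after: Instant },
}

/// Hypothetical wrapper around the state; names are illustrative only.
pub struct Breaker {
    pub state: State,
}

impl Breaker {
    pub fn new() -> Self {
        Self { state: State::Connected }
    }

    /// True if a publish (or a probe) may be attempted at `now`.
    pub fn allow(&self, now: Instant) -> bool {
        match self.state {
            State::Connected => true,
            State::Reconnecting { next_after, .. } => now >= next_after,
        }
    }

    /// A successful publish closes the circuit and resets the attempt count.
    pub fn on_success(&mut self) {
        self.state = State::Connected;
    }

    /// A failed publish opens the circuit or lengthens the wait.
    pub fn on_failure(&mut self, now: Instant) {
        let attempt = match self.state {
            State::Connected => 0, // assumption: first failure waits 100ms
            State::Reconnecting { attempt, .. } => attempt.saturating_add(1),
        };
        self.state = State::Reconnecting {
            attempt,
            next_after: now + backoff(attempt),
        };
    }
}
```

With these constants the delay reaches the 3 min cap at attempt 11 (100ms * 2^11 = 204.8s, clamped to 180s). While `allow` returns false, a caller would return `Err` immediately instead of touching the client.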

Selected as the default in cellos-supervisor::composition::build_primary_event_sink; no environment variable is needed to opt in. If the JetStream connect fails, the supervisor falls back to NoopEventSink unless CELL_OS_REQUIRE_JETSTREAM=1 is set, in which case startup fails.

What it does NOT do:

  • It does not create streams or consumers — the operator pre-stages the JetStream stream that captures the configured subject.
  • It does not redact or sign payloads — those are separate wrapper sinks (cellos-sink-redact, SigningEventSink).
  • It does not buffer to disk on outage — pair with cellos-sink-dlq for that.

Public API surface

| Symbol | Purpose |
| --- | --- |
| JetStreamEventSink | The sink. |
| JetStreamEventSink::connect(url, subject) | Connect with default async-nats client settings. |
| JetStreamEventSink::connect_with_root_ca(url, subject, ca_path) | Connect over TLS with a private root CA. |
| JetStreamEventSink::from_publisher(publisher, subject) | Inject a fake Publisher for tests. |
| JetStreamEventSink::debug_state() | Inspect the reconnect state (tests). |
| TENANT_ID_PLACEHOLDER / TENANT_ID_DEFAULT_TOKEN | Subject-template placeholder for per-tenant isolation (A2-03). |
| resolve_tenant_subject(template, event) | Pure helper. |
| with_retry(max_attempts, f) | Bounded retry helper used internally. |

Source: src/lib.rs.
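To illustrate what a bounded retry helper of this shape generally looks like, here is a synchronous sketch. It is not the crate's with_retry: the real signature is not shown in this README (it is likely async and may wait between attempts), so the name `with_retry_sketch`, the attempt index passed to the closure, and the treatment of `max_attempts == 0` are all assumptions.

```rust
/// Calls `f` until it succeeds or `max_attempts` calls have been made,
/// returning the first Ok or the last Err.
/// Assumption: a `max_attempts` of 0 is treated as 1 so there is always
/// a result to return.
pub fn with_retry_sketch<T, E>(
    max_attempts: u32,
    mut f: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let limit = max_attempts.max(1);
    let mut attempt = 0;
    loop {
        match f(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the most recent error.
            Err(err) if attempt + 1 >= limit => return Err(err),
            // Otherwise try again with the next attempt index.
            Err(_) => attempt += 1,
        }
    }
}
```

The bound matters here because an unbounded loop would hold the caller for the length of an outage, which is the behaviour the reconnect state machine is meant to avoid.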

Configuration

| Env var | Default | Description |
| --- | --- | --- |
| NATS_URL | nats://127.0.0.1:4222 | NATS server URL (credentials are stripped from logs). |
| NATS_CA_FILE | (none) | PEM CA bundle for TLS to private NATS endpoints. |
| CELL_OS_REQUIRE_JETSTREAM | unset | If set, startup fails on connect error instead of degrading to noop. |
| CELL_OS_USE_NOOP_SINK | unset | Explicit opt-in to noop; skips JetStream entirely. |
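The interaction between the two CELL_OS_* flags can be read as a small decision function. The sketch below is one plausible reading, not the supervisor's code: `choose_sink`, `SinkChoice`, and `flag_set` are hypothetical names, the connect step is stubbed as a closure, and it assumes CELL_OS_USE_NOOP_SINK takes precedence when both flags are set.

```rust
#[derive(Debug, PartialEq)]
pub enum SinkChoice {
    JetStream,
    Noop,
}

/// Decide which sink to use given the two flags and a connect attempt.
pub fn choose_sink(
    use_noop: bool,
    require_jetstream: bool,
    connect: impl FnOnce() -> Result<(), String>,
) -> Result<SinkChoice, String> {
    // Explicit opt-in to noop: JetStream is never contacted.
    if use_noop {
        return Ok(SinkChoice::Noop);
    }
    match connect() {
        Ok(()) => Ok(SinkChoice::JetStream),
        // Strict mode: a connect error aborts startup.
        Err(e) if require_jetstream => {
            Err(format!("JetStream required but connect failed: {e}"))
        }
        // Default mode: degrade to the noop sink.
        Err(_) => Ok(SinkChoice::Noop),
    }
}

/// Hypothetical flag parsing: any non-empty value counts as set.
pub fn flag_set(name: &str) -> bool {
    std::env::var(name).map(|v| !v.is_empty()).unwrap_or(false)
}
```

A caller would pass `flag_set("CELL_OS_USE_NOOP_SINK")` and `flag_set("CELL_OS_REQUIRE_JETSTREAM")` as the first two arguments. Degrading silently to noop drops events, so production deployments that depend on the audit trail would presumably want the strict flag set.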

The subject is derived from spec.id, runId, and an optional tenant ID (when spec.correlation.tenantId is present and the subject template uses {tenantId}).
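As a rough illustration of the tenant substitution, the sketch below fills a `{tenantId}` placeholder in a subject template. It is not resolve_tenant_subject itself: the function takes the tenant ID directly rather than an event, the default token value `"default"` and the template `cellos.{tenantId}.events` used in the usage note are invented for the example, and falling back to the default token for unusable tenant IDs is an assumption.

```rust
// Placeholder as named in the text above; the default token is an assumed value.
const PLACEHOLDER: &str = "{tenantId}";
const DEFAULT_TOKEN: &str = "default";

/// A NATS subject token must be non-empty and must not contain the token
/// separator '.', the wildcards '*' and '>', or whitespace.
fn is_valid_token(t: &str) -> bool {
    !t.is_empty()
        && !t
            .chars()
            .any(|c| c == '.' || c == '*' || c == '>' || c.is_whitespace())
}

/// Substitute the tenant ID into `template`, or return the template
/// unchanged when it has no placeholder.
pub fn resolve_subject_sketch(template: &str, tenant_id: Option<&str>) -> String {
    if !template.contains(PLACEHOLDER) {
        return template.to_string();
    }
    let token = match tenant_id {
        Some(t) if is_valid_token(t) => t,
        // Missing or unusable tenant ID: fall back to the default token.
        _ => DEFAULT_TOKEN,
    };
    template.replace(PLACEHOLDER, token)
}
```

For example, `resolve_subject_sketch("cellos.{tenantId}.events", Some("acme"))` yields `cellos.acme.events`. Rejecting IDs that contain `.` keeps a tenant value from adding extra subject levels, which would otherwise weaken per-tenant isolation.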

Examples

export NATS_URL=nats://nats.cellos.svc.cluster.local:4222
export NATS_CA_FILE=/run/secrets/nats-ca.pem
cellos-supervisor --spec cell.yaml

Sink composition order in the supervisor (build_primary_event_sink):

DlqSink( Sign( Redact( JetStreamEventSink ) ) )
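The nesting above means the outermost wrapper sees each event first and the JetStream sink sees it last. The sketch below models only that call order with a simplified synchronous trait; it is not the real cellos_core::ports::EventSink. `Leaf`, `Wrap`, `Trace`, and `build_chain` are hypothetical, the wrappers do no actual redaction or signing, and returning Ok after a spill is an assumption about the DLQ wrapper.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared log of which layer ran, in order.
pub type Trace = Rc<RefCell<Vec<String>>>;

/// Simplified synchronous stand-in for the real sink trait.
pub trait EventSink {
    fn emit(&self, event: &str) -> Result<(), String>;
}

/// Stand-in for the JetStream sink; fails when `up` is false.
pub struct Leaf {
    pub up: bool,
    pub trace: Trace,
}

impl EventSink for Leaf {
    fn emit(&self, _event: &str) -> Result<(), String> {
        self.trace.borrow_mut().push("jetstream".into());
        if self.up { Ok(()) } else { Err("circuit open".into()) }
    }
}

/// Generic wrapper: records its name, then delegates to the inner sink.
pub struct Wrap<S> {
    pub name: &'static str,
    pub inner: S,
    pub trace: Trace,
    /// True for the DLQ-like layer, which absorbs inner failures.
    pub swallow_errors: bool,
}

impl<S: EventSink> EventSink for Wrap<S> {
    fn emit(&self, event: &str) -> Result<(), String> {
        self.trace.borrow_mut().push(self.name.into());
        match self.inner.emit(event) {
            Err(e) if self.swallow_errors => {
                let note = format!("{}: spilled ({e})", self.name);
                self.trace.borrow_mut().push(note);
                Ok(())
            }
            other => other,
        }
    }
}

/// Builds Dlq( Sign( Redact( Leaf ) ) ), innermost first.
pub fn build_chain(up: bool, trace: Trace) -> impl EventSink {
    let leaf = Leaf { up, trace: trace.clone() };
    let redact = Wrap { name: "redact", inner: leaf, trace: trace.clone(), swallow_errors: false };
    let sign = Wrap { name: "sign", inner: redact, trace: trace.clone(), swallow_errors: false };
    Wrap { name: "dlq", inner: sign, trace, swallow_errors: true }
}
```

Emitting through `build_chain(false, trace)` records the layers in the order dlq, sign, redact, jetstream, followed by the spill note, which is why a JetStream failure only reaches the disk fallback when the DLQ wrapper is outermost.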

Testing

cargo test -p cellos-sink-jetstream

The crate's tests use the injected-Publisher constructor; no live NATS broker is required.

Related crates

  • cellos-sink-jsonl — file-based mirror / dev sink.
  • cellos-sink-redact — wraps this sink to strip secrets.
  • cellos-sink-dlq — wraps this sink to spill failures to disk.
  • cellos-supervisor — selects this sink in build_primary_event_sink.
  • cellos-core — defines EventSink and CloudEventV1.

ADRs

  • ADR-0011 — cellos-server publishes through the same sink chain.
  • P3-01 — reconnect / circuit-breaker model.
  • A2-03 — per-tenant subject templating.
  • FC-74 — 90-day audit retention floor (applies to JetStream stream).