cellos_sink_jetstream/lib.rs
1//! JetStream [`cellos_core::ports::EventSink`] — publishes JSON payloads to a configured subject.
2//!
3//! ## Reconnect / circuit-breaker model (P3-01)
4//!
5//! `JetStreamEventSink` keeps a small in-memory state machine in front of the
6//! underlying NATS connection so an extended broker outage cannot stall the
7//! supervisor's `emit()` path:
8//!
9//! - **`Connected`** — last publish succeeded. Next `emit()` calls
10//! [`Publisher::publish`] directly (with a small bounded retry, see
11//! [`with_retry`]).
//! - **`Reconnecting { attempt, next_after }`** — last publish failed.
//!   `emit()` returns `Err` immediately while `Instant::now() < next_after`
//!   (the circuit is "open"). Once `next_after` has elapsed, `emit()` calls
//!   are let through again as probes, each using the same bounded retry as the
//!   fast path (calls arriving concurrently may probe at the same time). On
//!   success the state resets to `Connected`; on failure `attempt` is
//!   incremented and `next_after` is pushed forward by an
//!   exponentially-backed-off interval.
18//!
19//! The async-nats `Client` performs its own TCP-level reconnect under the
20//! hood — this state machine is layered on top so that the supervisor's
21//! `emit()` returns Err quickly during an outage instead of being held by
22//! the underlying client's blocking publish path. Operators decide what to
23//! do with that Err (typically log and continue).
24//!
//! Backoff schedule: `100ms * 2^attempt`, with the resulting wait clamped to a
//! 3-minute cap (`180s`). The sequence is therefore
//! `100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, …, 102.4s, 180s, 180s, …`.
28//! Backoff resets to `attempt = 0` on the first successful publish after an
29//! outage (acceptance: "Backoff resets on successful publish").
30
31use std::fmt::Display;
32use std::future::Future;
33use std::path::Path;
34use std::sync::Arc;
35use std::time::Duration;
36
37use async_trait::async_trait;
38use bytes::Bytes;
39use tokio::sync::Mutex;
40use tokio::time::Instant;
41use tracing::{debug, instrument, warn};
42
43use async_nats::jetstream;
44
45use cellos_core::ports::EventSink;
46use cellos_core::{redact_url_if_echoed_in_text, CellosError, CloudEventV1};
47
/// Maximum number of publish attempts [`with_retry`] makes within a single
/// `emit()` call before giving up (the first try plus up to two retries).
const PUBLISH_MAX_ATTEMPTS: u32 = 3;
/// Base backoff between publish attempts. It is multiplied by 4 for each
/// further retry (`base * 4^(n-1)` is slept after the `n`-th failure), so with
/// the 100ms base and three attempts the waits are 100ms, then 400ms.
const PUBLISH_BACKOFF_BASE: Duration = Duration::from_millis(100);
53
/// Starting wait of the reconnect-state backoff (P3-01).
///
/// This is the delay scheduled after the first failed publish
/// (`attempt = 0`); each further consecutive failure doubles it, up to
/// [`RECONNECT_BACKOFF_CAP`].
const RECONNECT_BACKOFF_BASE: Duration = Duration::from_millis(100);

/// Upper bound on the reconnect-state backoff (P3-01 acceptance): the sink
/// never schedules its next probe more than 3 minutes out.
const RECONNECT_BACKOFF_CAP: Duration = Duration::from_secs(3 * 60);
63
64/// Compute the backoff for reconnect attempt `attempt` (0-indexed).
65///
66/// `attempt = 0` → `RECONNECT_BACKOFF_BASE` (100ms).
67/// `attempt = N` → `RECONNECT_BACKOFF_BASE * 2^N`, clamped to
68/// [`RECONNECT_BACKOFF_CAP`] (180s). Saturates safely on overflow.
69pub(crate) fn reconnect_backoff(attempt: u32) -> Duration {
70 // 2^attempt, saturating; clamp to cap to avoid Duration overflow.
71 let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
72 let raw = RECONNECT_BACKOFF_BASE
73 .checked_mul(u32::try_from(factor).unwrap_or(u32::MAX))
74 .unwrap_or(RECONNECT_BACKOFF_CAP);
75 raw.min(RECONNECT_BACKOFF_CAP)
76}
77
/// Token in the subject template that stands for the event's tenant (A2-03).
///
/// [`resolve_tenant_subject`] swaps it for the event's `data.tenantId` at
/// publish time. A template that does not contain the token is published to
/// verbatim, so single-tenant deployments put exactly the same subject on the
/// wire as builds from before A2-03.
pub const TENANT_ID_PLACEHOLDER: &str = "{tenantId}";

/// Token written in place of [`TENANT_ID_PLACEHOLDER`] for events that carry
/// no tenant.
///
/// It is a single NATS subject token (no `.`, no wildcards), so substituting it
/// always yields a valid subject. Multi-tenant operators who must keep
/// untenanted events off tenant subjects can exclude `cellos.events.single.*`
/// in their JetStream stream filter.
///
/// Note: a tenant whose id is literally `single` resolves to the same subject
/// as untenanted events.
pub const TENANT_ID_DEFAULT_TOKEN: &str = "single";
93
94/// Resolve a JetStream subject for `event` against `template`.
95///
96/// - Templates without [`TENANT_ID_PLACEHOLDER`] are returned unchanged
97/// (single-tenant noop, byte-identical to pre-A2-03 behaviour).
98/// - When the placeholder is present, it is replaced with `event.data.tenantId`
99/// if the field is set, or [`TENANT_ID_DEFAULT_TOKEN`] otherwise.
100///
101/// The tenant id read here comes from `data.tenantId`, which the cellos-core
102/// event constructors mirror from `spec.correlation.tenantId` (A2-03). See
103/// `cellos_core::events::lifecycle_started_data_v1`.
104pub fn resolve_tenant_subject(template: &str, event: &CloudEventV1) -> String {
105 if !template.contains(TENANT_ID_PLACEHOLDER) {
106 return template.to_string();
107 }
108 let tenant = event
109 .data
110 .as_ref()
111 .and_then(|d| d.get("tenantId"))
112 .and_then(|v| v.as_str())
113 .unwrap_or(TENANT_ID_DEFAULT_TOKEN);
114 template.replace(TENANT_ID_PLACEHOLDER, tenant)
115}
116
117/// Run `f` up to `max_attempts` times with exponential backoff on failure.
118///
119/// Backoff between attempt N and attempt N+1 is `base * 4^(N-1)` (using a
120/// factor of 4 so attempts 1→2→3 wait 100ms then 400ms with `base = 100ms`).
121/// Each retry emits a `tracing::warn!` with the attempt number and the error.
122/// On the final failure the error is returned to the caller; no warn is
123/// emitted for that final failure (the caller handles terminal logging).
124///
125/// `f` is called fresh for each attempt — it must be a closure that produces
126/// a new future, not a single future polled multiple times.
127pub async fn with_retry<F, Fut, T, E>(max_attempts: u32, mut f: F) -> Result<T, E>
128where
129 F: FnMut() -> Fut,
130 Fut: Future<Output = Result<T, E>>,
131 E: Display,
132{
133 let mut attempt: u32 = 1;
134 loop {
135 match f().await {
136 Ok(value) => return Ok(value),
137 Err(err) if attempt >= max_attempts => return Err(err),
138 Err(err) => {
139 // base * 4^(attempt-1): 1→100ms, 2→400ms, 3→1.6s, ...
140 let backoff = PUBLISH_BACKOFF_BASE.saturating_mul(4u32.saturating_pow(attempt - 1));
141 warn!(
142 attempt,
143 max_attempts,
144 backoff_ms = backoff.as_millis() as u64,
145 error = %err,
146 "publish attempt failed; retrying after backoff"
147 );
148 tokio::time::sleep(backoff).await;
149 attempt += 1;
150 }
151 }
152 }
153}
154
155/// Reconnect state for [`JetStreamEventSink`].
156///
157/// `Connected` is the steady state. We move to `Reconnecting { .. }` after a
158/// publish error and stay there until a probe publish succeeds.
159///
160/// Public + `#[doc(hidden)]` so `tests/reconnect.rs` can pattern-match on
161/// the snapshot returned by [`JetStreamEventSink::debug_state`]; not part
162/// of the stable surface area.
163#[doc(hidden)]
164#[derive(Debug, Clone)]
165pub enum ReconnectState {
166 /// Last observed publish succeeded; emit takes the fast path.
167 Connected,
168 /// Last observed publish failed. `attempt` is the 0-indexed reconnect
169 /// attempt count (incremented on each consecutive failure). `next_after`
170 /// is the earliest [`Instant`] at which the sink will take its next
171 /// probe attempt; before that, `emit()` returns Err immediately.
172 Reconnecting {
173 /// Number of consecutive failed reconnect probes (0 means "we just
174 /// transitioned to Reconnecting and the next probe is allowed once
175 /// `next_after` elapses").
176 attempt: u32,
177 /// Earliest instant at which the next probe attempt is allowed.
178 next_after: Instant,
179 },
180}
181
182/// Trait abstraction over JetStream publish, so the reconnect state machine
183/// can be exercised by integration tests with a mock that simulates broker
184/// outages.
185///
186/// This is `pub` (with `#[doc(hidden)]`) only so `tests/reconnect.rs` can
187/// drive the state machine without a real NATS server; production wiring
188/// uses the inherent `connect*` constructors and never sees this trait.
189#[doc(hidden)]
190#[async_trait]
191pub trait Publisher: Send + Sync {
192 /// Attempt to publish `payload` to `subject`.
193 async fn publish(&self, subject: String, payload: Bytes) -> Result<(), CellosError>;
194}
195
196/// Real `Publisher` backed by an `async_nats::jetstream::Context`.
197struct JetStreamPublisher {
198 context: jetstream::Context,
199}
200
201#[async_trait]
202impl Publisher for JetStreamPublisher {
203 async fn publish(&self, subject: String, payload: Bytes) -> Result<(), CellosError> {
204 self.context
205 .publish(subject, payload)
206 .await
207 .map_err(|e| CellosError::EventSink(format!("jetstream publish: {e}")))?;
208 Ok(())
209 }
210}
211
212/// Publishes serialized [`CloudEventV1`] to JetStream.
213///
214/// See module docs for the reconnect / circuit-breaker semantics.
215pub struct JetStreamEventSink {
216 publisher: Arc<dyn Publisher>,
217 subject: String,
218 state: Arc<Mutex<ReconnectState>>,
219}
220
221impl JetStreamEventSink {
222 /// Connect to NATS and wrap the JetStream context. Stream must exist or server auto-creates per NATS config.
223 pub async fn connect(nats_url: &str, subject: impl Into<String>) -> Result<Self, CellosError> {
224 Self::connect_with_root_ca(nats_url, subject, None).await
225 }
226
227 /// Like [`Self::connect`], but trust an extra PEM root (e.g. dev CA via `NATS_CA_FILE` for `tls://` URLs).
228 pub async fn connect_with_root_ca(
229 nats_url: &str,
230 subject: impl Into<String>,
231 root_ca_pem_file: Option<&Path>,
232 ) -> Result<Self, CellosError> {
233 let mut opts = async_nats::ConnectOptions::new();
234 if let Some(path) = root_ca_pem_file {
235 opts = opts.add_root_certificates(path.to_path_buf());
236 }
237 let conn = opts.connect(nats_url).await.map_err(|e| {
238 let msg = redact_url_if_echoed_in_text(&e.to_string(), nats_url);
239 CellosError::EventSink(format!("nats connect: {msg}"))
240 })?;
241 let context = jetstream::new(conn);
242 Ok(Self {
243 publisher: Arc::new(JetStreamPublisher { context }),
244 subject: subject.into(),
245 state: Arc::new(Mutex::new(ReconnectState::Connected)),
246 })
247 }
248
249 /// Test-only constructor: build a sink around an arbitrary `Publisher`
250 /// (used by `tests/reconnect.rs` to simulate broker outages without
251 /// running a real NATS server). Hidden from rustdoc and not intended
252 /// for production callers — production wiring uses [`Self::connect`].
253 #[doc(hidden)]
254 pub fn from_publisher(publisher: Arc<dyn Publisher>, subject: impl Into<String>) -> Self {
255 Self {
256 publisher,
257 subject: subject.into(),
258 state: Arc::new(Mutex::new(ReconnectState::Connected)),
259 }
260 }
261
262 /// Snapshot the current reconnect state. Hidden from rustdoc; exists
263 /// so the reconnect integration test can verify state transitions.
264 #[doc(hidden)]
265 pub async fn debug_state(&self) -> ReconnectState {
266 self.state.lock().await.clone()
267 }
268}
269
270#[async_trait]
271impl EventSink for JetStreamEventSink {
272 #[instrument(skip(self, event), fields(ce_id = %event.id, ce_type = %event.ty))]
273 async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
274 let payload = serde_json::to_vec(event)
275 .map_err(|e| CellosError::EventSink(format!("serialize CloudEvent: {e}")))?;
276 let payload = Bytes::from(payload);
277
278 // Circuit-breaker check: if we are mid-outage and the next probe is
279 // not yet due, return Err immediately. This is the bound that keeps
280 // the supervisor's emit() path from blocking on broker outages.
281 {
282 let state = self.state.lock().await;
283 if let ReconnectState::Reconnecting {
284 attempt,
285 next_after,
286 } = *state
287 {
288 let now = Instant::now();
289 if now < next_after {
290 let wait_ms = next_after.saturating_duration_since(now).as_millis() as u64;
291 return Err(CellosError::EventSink(format!(
292 "jetstream sink in reconnecting state (attempt={attempt}, next probe in {wait_ms}ms)"
293 )));
294 }
295 }
296 }
297
298 // Resolve `{tenantId}` substitution per event (A2-03). When the
299 // configured subject does not contain the placeholder this is a
300 // copy-only noop: single-tenant deployments still publish to the
301 // exact subject they configured.
302 let resolved_subject = resolve_tenant_subject(&self.subject, event);
303
304 // Probe / steady-state publish path. `with_retry` handles transient
305 // hiccups inside a single emit() call; persistent failure is what
306 // drives the reconnect state machine below.
307 let publish_result = with_retry(PUBLISH_MAX_ATTEMPTS, || {
308 let publisher = Arc::clone(&self.publisher);
309 let subject = resolved_subject.clone();
310 let payload = payload.clone();
311 async move { publisher.publish(subject, payload).await }
312 })
313 .await;
314
315 match publish_result {
316 Ok(()) => {
317 // Success path: reset state to Connected (resetting backoff
318 // is the explicit P3-01 acceptance).
319 let mut state = self.state.lock().await;
320 if matches!(*state, ReconnectState::Reconnecting { .. }) {
321 debug!("jetstream sink: publish recovered; resetting backoff to Connected");
322 }
323 *state = ReconnectState::Connected;
324 Ok(())
325 }
326 Err(e) => {
327 // Failure path: bump the reconnect-state attempt counter and
328 // schedule the next probe.
329 let mut state = self.state.lock().await;
330 let next_attempt = match *state {
331 ReconnectState::Connected => 0,
332 ReconnectState::Reconnecting { attempt, .. } => attempt.saturating_add(1),
333 };
334 let backoff = reconnect_backoff(next_attempt);
335 let next_after = Instant::now() + backoff;
336 warn!(
337 attempt = next_attempt,
338 backoff_ms = backoff.as_millis() as u64,
339 error = %e,
340 "jetstream sink: publish failed; entering reconnecting state"
341 );
342 *state = ReconnectState::Reconnecting {
343 attempt: next_attempt,
344 next_after,
345 };
346 Err(e)
347 }
348 }
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Build a single-threaded runtime with tokio's clock paused, so backoff
    /// sleeps cost no real wall time. Whenever nothing else is runnable, the
    /// runtime auto-advances the clock to the next pending timer.
    fn rt() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .start_paused(true)
            .build()
            .unwrap()
    }

    #[test]
    fn with_retry_succeeds_on_first_try() {
        let calls = Arc::new(AtomicU32::new(0));
        let result: Result<u32, &'static str> = rt().block_on(async {
            let calls = calls.clone();
            with_retry(3, || {
                let calls = calls.clone();
                async move {
                    calls.fetch_add(1, Ordering::SeqCst);
                    Ok::<u32, &'static str>(42)
                }
            })
            .await
        });
        assert_eq!(result, Ok(42));
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn with_retry_recovers_after_transient_failures() {
        let calls = Arc::new(AtomicU32::new(0));
        let calls_for_assert = calls.clone();
        let result: Result<&'static str, &'static str> = rt().block_on(async move {
            with_retry(3, || {
                let calls = calls.clone();
                async move {
                    let n = calls.fetch_add(1, Ordering::SeqCst) + 1;
                    if n < 3 {
                        Err("transient")
                    } else {
                        Ok("ok")
                    }
                }
            })
            .await
        });
        assert_eq!(result, Ok("ok"));
        assert_eq!(calls_for_assert.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn with_retry_returns_last_error_after_exhaustion() {
        let calls = Arc::new(AtomicU32::new(0));
        let calls_for_assert = calls.clone();
        let result: Result<(), String> = rt().block_on(async move {
            with_retry(3, || {
                let calls = calls.clone();
                async move {
                    let n = calls.fetch_add(1, Ordering::SeqCst) + 1;
                    Err::<(), String>(format!("fail-{n}"))
                }
            })
            .await
        });
        assert_eq!(result, Err("fail-3".into()));
        assert_eq!(calls_for_assert.load(Ordering::SeqCst), 3);
    }

    /// Three failing attempts wait `base * 4^0 = 100ms`, then
    /// `base * 4^1 = 400ms`, with no sleep after the final failure: about 500ms
    /// of virtual time. The upper bound leaves room for timer-tick rounding but
    /// would catch a stray third sleep (1.6s).
    #[test]
    fn with_retry_sleeps_documented_backoff_between_attempts() {
        let elapsed = rt().block_on(async {
            let start = Instant::now();
            let result = with_retry(3, || async { Err::<(), &'static str>("down") }).await;
            assert_eq!(result, Err("down"));
            start.elapsed()
        });
        assert!(elapsed >= Duration::from_millis(500), "elapsed {elapsed:?}");
        assert!(elapsed < Duration::from_millis(600), "elapsed {elapsed:?}");
    }

    /// `max_attempts = 0` still runs `f` exactly once (there is no "zero
    /// calls" mode).
    #[test]
    fn with_retry_zero_attempts_still_calls_once() {
        let calls = Arc::new(AtomicU32::new(0));
        let calls_for_assert = calls.clone();
        let result: Result<(), &'static str> = rt().block_on(async move {
            with_retry(0, || {
                let calls = calls.clone();
                async move {
                    calls.fetch_add(1, Ordering::SeqCst);
                    Err::<(), &'static str>("nope")
                }
            })
            .await
        });
        assert_eq!(result, Err("nope"));
        assert_eq!(calls_for_assert.load(Ordering::SeqCst), 1);
    }

    fn ce(data: Option<serde_json::Value>) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "ce-1".into(),
            source: "test".into(),
            ty: "dev.cellos.events.cell.lifecycle.v1.started".into(),
            datacontenttype: Some("application/json".into()),
            data,
            time: None,
            traceparent: None,
        }
    }

    #[test]
    fn resolve_tenant_subject_template_without_placeholder_is_passthrough() {
        let event = ce(Some(serde_json::json!({"tenantId": "acme"})));
        assert_eq!(
            resolve_tenant_subject("cellos.events.v1", &event),
            "cellos.events.v1"
        );
    }

    #[test]
    fn resolve_tenant_subject_substitutes_when_tenant_present() {
        let event = ce(Some(serde_json::json!({"tenantId": "acme"})));
        assert_eq!(
            resolve_tenant_subject("cellos.events.{tenantId}.v1", &event),
            "cellos.events.acme.v1"
        );
    }

    #[test]
    fn resolve_tenant_subject_uses_sentinel_when_tenant_absent() {
        let event = ce(Some(serde_json::json!({"cellId": "c1"})));
        assert_eq!(
            resolve_tenant_subject("cellos.events.{tenantId}.v1", &event),
            "cellos.events.single.v1"
        );
    }

    #[test]
    fn resolve_tenant_subject_uses_sentinel_when_data_missing() {
        let event = ce(None);
        assert_eq!(
            resolve_tenant_subject("cellos.events.{tenantId}.v1", &event),
            "cellos.events.single.v1"
        );
    }

    #[test]
    fn with_retry_single_attempt_does_not_retry() {
        let calls = Arc::new(AtomicU32::new(0));
        let calls_for_assert = calls.clone();
        let result: Result<(), &'static str> = rt().block_on(async move {
            with_retry(1, || {
                let calls = calls.clone();
                async move {
                    calls.fetch_add(1, Ordering::SeqCst);
                    Err::<(), &'static str>("nope")
                }
            })
            .await
        });
        assert_eq!(result, Err("nope"));
        assert_eq!(calls_for_assert.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn reconnect_backoff_schedule_is_exponential_and_capped() {
        // 0 → 100ms, 1 → 200ms, 2 → 400ms, 3 → 800ms, 4 → 1.6s, ...
        assert_eq!(reconnect_backoff(0), Duration::from_millis(100));
        assert_eq!(reconnect_backoff(1), Duration::from_millis(200));
        assert_eq!(reconnect_backoff(2), Duration::from_millis(400));
        assert_eq!(reconnect_backoff(3), Duration::from_millis(800));
        assert_eq!(reconnect_backoff(4), Duration::from_millis(1600));
        // Cap at 3 minutes regardless of how high the attempt count climbs.
        assert_eq!(reconnect_backoff(20), RECONNECT_BACKOFF_CAP);
        assert_eq!(reconnect_backoff(u32::MAX), RECONNECT_BACKOFF_CAP);
    }

    /// Pins the documented tail of the schedule (`…, 102.4s, 180s`) and the
    /// multiplier / shift overflow edges.
    #[test]
    fn reconnect_backoff_steps_from_102_4s_to_cap() {
        assert_eq!(reconnect_backoff(10), Duration::from_millis(102_400));
        assert_eq!(reconnect_backoff(11), RECONNECT_BACKOFF_CAP);
        assert_eq!(reconnect_backoff(32), RECONNECT_BACKOFF_CAP);
        assert_eq!(reconnect_backoff(64), RECONNECT_BACKOFF_CAP);
    }
}