Skip to main content

cellos_sink_jetstream/
lib.rs

1//! JetStream [`cellos_core::ports::EventSink`] — publishes JSON payloads to a configured subject.
2//!
3//! ## Reconnect / circuit-breaker model (P3-01)
4//!
5//! `JetStreamEventSink` keeps a small in-memory state machine in front of the
6//! underlying NATS connection so an extended broker outage cannot stall the
7//! supervisor's `emit()` path:
8//!
9//! - **`Connected`** — last publish succeeded. Next `emit()` calls
10//!   [`Publisher::publish`] directly (with a small bounded retry, see
11//!   [`with_retry`]).
12//! - **`Reconnecting { attempt, next_after }`** — last publish failed.
13//!   `emit()` returns `Err` immediately while `Instant::now() < next_after`
14//!   (the circuit is "open"). Once `next_after` has elapsed the sink takes
15//!   one bounded probe attempt; on success the state resets to `Connected`,
16//!   on failure `attempt` is incremented and `next_after` is pushed forward
17//!   by an exponentially-backed-off interval.
18//!
19//! The async-nats `Client` performs its own TCP-level reconnect under the
20//! hood — this state machine is layered on top so that the supervisor's
21//! `emit()` returns Err quickly during an outage instead of being held by
22//! the underlying client's blocking publish path. Operators decide what to
23//! do with that Err (typically log and continue).
24//!
//! Backoff schedule: `100ms * 2^attempt`, with the resulting wait clamped
//! to a cap of 3 minutes (`180s`). So the sequence is
//! `100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, …, 102.4s, 180s, 180s, …`.
28//! Backoff resets to `attempt = 0` on the first successful publish after an
29//! outage (acceptance: "Backoff resets on successful publish").
30
31use std::fmt::Display;
32use std::future::Future;
33use std::path::Path;
34use std::sync::Arc;
35use std::time::Duration;
36
37use async_trait::async_trait;
38use bytes::Bytes;
39use tokio::sync::Mutex;
40use tokio::time::Instant;
41use tracing::{debug, instrument, warn};
42
43use async_nats::jetstream;
44
45use cellos_core::ports::EventSink;
46use cellos_core::{redact_url_if_echoed_in_text, CellosError, CloudEventV1};
47
48/// Maximum number of publish attempts before giving up.
49const PUBLISH_MAX_ATTEMPTS: u32 = 3;
/// Base backoff between attempts; multiplied by 4 on each subsequent retry.
/// With base = 100ms and three attempts the waits are 100ms, then 400ms.
52const PUBLISH_BACKOFF_BASE: Duration = Duration::from_millis(100);
53
/// Starting interval of the reconnect-state backoff schedule (P3-01).
///
/// Reconnect attempt 0 waits exactly this long (100ms); every later attempt
/// doubles the wait until [`RECONNECT_BACKOFF_CAP`] is reached.
const RECONNECT_BACKOFF_BASE: Duration = Duration::from_millis(100);

/// Upper bound on the reconnect-state backoff: the sink never waits more
/// than 3 minutes before its next probe (P3-01 acceptance).
const RECONNECT_BACKOFF_CAP: Duration = Duration::from_secs(180);

/// Backoff to wait before reconnect probe number `attempt` (0-indexed).
///
/// Yields `RECONNECT_BACKOFF_BASE * 2^attempt` (100ms for `attempt = 0`),
/// never more than [`RECONNECT_BACKOFF_CAP`] (180s). Arbitrarily large
/// `attempt` values are safe: the multiplier saturates instead of wrapping.
pub(crate) fn reconnect_backoff(attempt: u32) -> Duration {
    // 2^attempt as a u32; exponents of 32 and above saturate to u32::MAX.
    let multiplier = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    match RECONNECT_BACKOFF_BASE.checked_mul(multiplier) {
        Some(wait) if wait < RECONNECT_BACKOFF_CAP => wait,
        // Either at/over the cap, or the multiplication overflowed `Duration`.
        _ => RECONNECT_BACKOFF_CAP,
    }
}
77
78/// Subject template placeholder for the per-tenant isolation dimension (A2-03).
79///
80/// When the configured subject template contains this token, [`resolve_tenant_subject`]
81/// substitutes the event's `data.tenantId` field at publish time. Templates without
82/// the placeholder are forwarded verbatim — single-tenant deployments produce
83/// byte-identical wire output to pre-A2-03 builds.
84pub const TENANT_ID_PLACEHOLDER: &str = "{tenantId}";
85
86/// Sentinel value substituted for `{tenantId}` when the event carries no tenant.
87///
88/// Chosen so the resulting subject is always a valid NATS token (no dots, no
89/// wildcards). Operators running multi-tenant streams who want a hard guarantee
90/// that untenanted events never land on a tenant subject can configure their
91/// JetStream stream filter to exclude `cellos.events.single.*`.
92pub const TENANT_ID_DEFAULT_TOKEN: &str = "single";
93
94/// Resolve a JetStream subject for `event` against `template`.
95///
96/// - Templates without [`TENANT_ID_PLACEHOLDER`] are returned unchanged
97///   (single-tenant noop, byte-identical to pre-A2-03 behaviour).
98/// - When the placeholder is present, it is replaced with `event.data.tenantId`
99///   if the field is set, or [`TENANT_ID_DEFAULT_TOKEN`] otherwise.
100///
101/// The tenant id read here comes from `data.tenantId`, which the cellos-core
102/// event constructors mirror from `spec.correlation.tenantId` (A2-03). See
103/// `cellos_core::events::lifecycle_started_data_v1`.
104pub fn resolve_tenant_subject(template: &str, event: &CloudEventV1) -> String {
105    if !template.contains(TENANT_ID_PLACEHOLDER) {
106        return template.to_string();
107    }
108    let tenant = event
109        .data
110        .as_ref()
111        .and_then(|d| d.get("tenantId"))
112        .and_then(|v| v.as_str())
113        .unwrap_or(TENANT_ID_DEFAULT_TOKEN);
114    template.replace(TENANT_ID_PLACEHOLDER, tenant)
115}
116
117/// Run `f` up to `max_attempts` times with exponential backoff on failure.
118///
119/// Backoff between attempt N and attempt N+1 is `base * 4^(N-1)` (using a
120/// factor of 4 so attempts 1→2→3 wait 100ms then 400ms with `base = 100ms`).
121/// Each retry emits a `tracing::warn!` with the attempt number and the error.
122/// On the final failure the error is returned to the caller; no warn is
123/// emitted for that final failure (the caller handles terminal logging).
124///
125/// `f` is called fresh for each attempt — it must be a closure that produces
126/// a new future, not a single future polled multiple times.
127pub async fn with_retry<F, Fut, T, E>(max_attempts: u32, mut f: F) -> Result<T, E>
128where
129    F: FnMut() -> Fut,
130    Fut: Future<Output = Result<T, E>>,
131    E: Display,
132{
133    let mut attempt: u32 = 1;
134    loop {
135        match f().await {
136            Ok(value) => return Ok(value),
137            Err(err) if attempt >= max_attempts => return Err(err),
138            Err(err) => {
139                // base * 4^(attempt-1): 1→100ms, 2→400ms, 3→1.6s, ...
140                let backoff = PUBLISH_BACKOFF_BASE.saturating_mul(4u32.saturating_pow(attempt - 1));
141                warn!(
142                    attempt,
143                    max_attempts,
144                    backoff_ms = backoff.as_millis() as u64,
145                    error = %err,
146                    "publish attempt failed; retrying after backoff"
147                );
148                tokio::time::sleep(backoff).await;
149                attempt += 1;
150            }
151        }
152    }
153}
154
155/// Reconnect state for [`JetStreamEventSink`].
156///
157/// `Connected` is the steady state. We move to `Reconnecting { .. }` after a
158/// publish error and stay there until a probe publish succeeds.
159///
160/// Public + `#[doc(hidden)]` so `tests/reconnect.rs` can pattern-match on
161/// the snapshot returned by [`JetStreamEventSink::debug_state`]; not part
162/// of the stable surface area.
163#[doc(hidden)]
164#[derive(Debug, Clone)]
165pub enum ReconnectState {
166    /// Last observed publish succeeded; emit takes the fast path.
167    Connected,
168    /// Last observed publish failed. `attempt` is the 0-indexed reconnect
169    /// attempt count (incremented on each consecutive failure). `next_after`
170    /// is the earliest [`Instant`] at which the sink will take its next
171    /// probe attempt; before that, `emit()` returns Err immediately.
172    Reconnecting {
173        /// Number of consecutive failed reconnect probes (0 means "we just
174        /// transitioned to Reconnecting and the next probe is allowed once
175        /// `next_after` elapses").
176        attempt: u32,
177        /// Earliest instant at which the next probe attempt is allowed.
178        next_after: Instant,
179    },
180}
181
182/// Trait abstraction over JetStream publish, so the reconnect state machine
183/// can be exercised by integration tests with a mock that simulates broker
184/// outages.
185///
186/// This is `pub` (with `#[doc(hidden)]`) only so `tests/reconnect.rs` can
187/// drive the state machine without a real NATS server; production wiring
188/// uses the inherent `connect*` constructors and never sees this trait.
189#[doc(hidden)]
190#[async_trait]
191pub trait Publisher: Send + Sync {
192    /// Attempt to publish `payload` to `subject`.
193    async fn publish(&self, subject: String, payload: Bytes) -> Result<(), CellosError>;
194}
195
196/// Real `Publisher` backed by an `async_nats::jetstream::Context`.
197struct JetStreamPublisher {
198    context: jetstream::Context,
199}
200
201#[async_trait]
202impl Publisher for JetStreamPublisher {
203    async fn publish(&self, subject: String, payload: Bytes) -> Result<(), CellosError> {
204        self.context
205            .publish(subject, payload)
206            .await
207            .map_err(|e| CellosError::EventSink(format!("jetstream publish: {e}")))?;
208        Ok(())
209    }
210}
211
212/// Publishes serialized [`CloudEventV1`] to JetStream.
213///
214/// See module docs for the reconnect / circuit-breaker semantics.
215pub struct JetStreamEventSink {
216    publisher: Arc<dyn Publisher>,
217    subject: String,
218    state: Arc<Mutex<ReconnectState>>,
219}
220
221impl JetStreamEventSink {
222    /// Connect to NATS and wrap the JetStream context. Stream must exist or server auto-creates per NATS config.
223    pub async fn connect(nats_url: &str, subject: impl Into<String>) -> Result<Self, CellosError> {
224        Self::connect_with_root_ca(nats_url, subject, None).await
225    }
226
227    /// Like [`Self::connect`], but trust an extra PEM root (e.g. dev CA via `NATS_CA_FILE` for `tls://` URLs).
228    pub async fn connect_with_root_ca(
229        nats_url: &str,
230        subject: impl Into<String>,
231        root_ca_pem_file: Option<&Path>,
232    ) -> Result<Self, CellosError> {
233        let mut opts = async_nats::ConnectOptions::new();
234        if let Some(path) = root_ca_pem_file {
235            opts = opts.add_root_certificates(path.to_path_buf());
236        }
237        let conn = opts.connect(nats_url).await.map_err(|e| {
238            let msg = redact_url_if_echoed_in_text(&e.to_string(), nats_url);
239            CellosError::EventSink(format!("nats connect: {msg}"))
240        })?;
241        let context = jetstream::new(conn);
242        Ok(Self {
243            publisher: Arc::new(JetStreamPublisher { context }),
244            subject: subject.into(),
245            state: Arc::new(Mutex::new(ReconnectState::Connected)),
246        })
247    }
248
249    /// Test-only constructor: build a sink around an arbitrary `Publisher`
250    /// (used by `tests/reconnect.rs` to simulate broker outages without
251    /// running a real NATS server). Hidden from rustdoc and not intended
252    /// for production callers — production wiring uses [`Self::connect`].
253    #[doc(hidden)]
254    pub fn from_publisher(publisher: Arc<dyn Publisher>, subject: impl Into<String>) -> Self {
255        Self {
256            publisher,
257            subject: subject.into(),
258            state: Arc::new(Mutex::new(ReconnectState::Connected)),
259        }
260    }
261
262    /// Snapshot the current reconnect state. Hidden from rustdoc; exists
263    /// so the reconnect integration test can verify state transitions.
264    #[doc(hidden)]
265    pub async fn debug_state(&self) -> ReconnectState {
266        self.state.lock().await.clone()
267    }
268}
269
270#[async_trait]
271impl EventSink for JetStreamEventSink {
272    #[instrument(skip(self, event), fields(ce_id = %event.id, ce_type = %event.ty))]
273    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
274        let payload = serde_json::to_vec(event)
275            .map_err(|e| CellosError::EventSink(format!("serialize CloudEvent: {e}")))?;
276        let payload = Bytes::from(payload);
277
278        // Circuit-breaker check: if we are mid-outage and the next probe is
279        // not yet due, return Err immediately. This is the bound that keeps
280        // the supervisor's emit() path from blocking on broker outages.
281        {
282            let state = self.state.lock().await;
283            if let ReconnectState::Reconnecting {
284                attempt,
285                next_after,
286            } = *state
287            {
288                let now = Instant::now();
289                if now < next_after {
290                    let wait_ms = next_after.saturating_duration_since(now).as_millis() as u64;
291                    return Err(CellosError::EventSink(format!(
292                        "jetstream sink in reconnecting state (attempt={attempt}, next probe in {wait_ms}ms)"
293                    )));
294                }
295            }
296        }
297
298        // Resolve `{tenantId}` substitution per event (A2-03). When the
299        // configured subject does not contain the placeholder this is a
300        // copy-only noop: single-tenant deployments still publish to the
301        // exact subject they configured.
302        let resolved_subject = resolve_tenant_subject(&self.subject, event);
303
304        // Probe / steady-state publish path. `with_retry` handles transient
305        // hiccups inside a single emit() call; persistent failure is what
306        // drives the reconnect state machine below.
307        let publish_result = with_retry(PUBLISH_MAX_ATTEMPTS, || {
308            let publisher = Arc::clone(&self.publisher);
309            let subject = resolved_subject.clone();
310            let payload = payload.clone();
311            async move { publisher.publish(subject, payload).await }
312        })
313        .await;
314
315        match publish_result {
316            Ok(()) => {
317                // Success path: reset state to Connected (resetting backoff
318                // is the explicit P3-01 acceptance).
319                let mut state = self.state.lock().await;
320                if matches!(*state, ReconnectState::Reconnecting { .. }) {
321                    debug!("jetstream sink: publish recovered; resetting backoff to Connected");
322                }
323                *state = ReconnectState::Connected;
324                Ok(())
325            }
326            Err(e) => {
327                // Failure path: bump the reconnect-state attempt counter and
328                // schedule the next probe.
329                let mut state = self.state.lock().await;
330                let next_attempt = match *state {
331                    ReconnectState::Connected => 0,
332                    ReconnectState::Reconnecting { attempt, .. } => attempt.saturating_add(1),
333                };
334                let backoff = reconnect_backoff(next_attempt);
335                let next_after = Instant::now() + backoff;
336                warn!(
337                    attempt = next_attempt,
338                    backoff_ms = backoff.as_millis() as u64,
339                    error = %e,
340                    "jetstream sink: publish failed; entering reconnecting state"
341                );
342                *state = ReconnectState::Reconnecting {
343                    attempt: next_attempt,
344                    next_after,
345                };
346                Err(e)
347            }
348        }
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Subject template shared by the `{tenantId}` substitution tests.
    const TENANT_TEMPLATE: &str = "cellos.events.{tenantId}.v1";

    /// Current-thread runtime whose clock starts paused, so backoff sleeps
    /// cost no real wall time: pending sleeps are auto-advanced as soon as
    /// nothing else is runnable on the runtime.
    fn rt() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        builder.enable_time().start_paused(true);
        builder.build().unwrap()
    }

    /// Run `with_retry` on the paused runtime with an operation whose outcome
    /// is decided by `outcome_for(call_number)` (1-based). Returns the final
    /// result together with the number of times the operation was invoked.
    fn run_counted<T, E, Op>(max_attempts: u32, outcome_for: Op) -> (Result<T, E>, u32)
    where
        E: std::fmt::Display,
        Op: Fn(u32) -> Result<T, E>,
    {
        let calls = Arc::new(AtomicU32::new(0));
        let counter = Arc::clone(&calls);
        let result = rt().block_on(with_retry(max_attempts, || {
            let n = counter.fetch_add(1, Ordering::SeqCst) + 1;
            let outcome = outcome_for(n);
            async move { outcome }
        }));
        (result, calls.load(Ordering::SeqCst))
    }

    #[test]
    fn with_retry_succeeds_on_first_try() {
        let (result, calls) = run_counted(3, |_| Ok::<u32, &'static str>(42));
        assert_eq!(result, Ok(42));
        assert_eq!(calls, 1);
    }

    #[test]
    fn with_retry_recovers_after_transient_failures() {
        let (result, calls): (Result<&'static str, &'static str>, u32) =
            run_counted(3, |n| if n < 3 { Err("transient") } else { Ok("ok") });
        assert_eq!(result, Ok("ok"));
        assert_eq!(calls, 3);
    }

    #[test]
    fn with_retry_returns_last_error_after_exhaustion() {
        let (result, calls) = run_counted(3, |n| Err::<(), String>(format!("fail-{n}")));
        assert_eq!(result, Err("fail-3".to_string()));
        assert_eq!(calls, 3);
    }

    /// Minimal lifecycle-started event carrying the given `data` payload.
    fn ce(data: Option<serde_json::Value>) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: "ce-1".into(),
            source: "test".into(),
            ty: "dev.cellos.events.cell.lifecycle.v1.started".into(),
            datacontenttype: Some("application/json".into()),
            data,
            time: None,
            traceparent: None,
        }
    }

    #[test]
    fn resolve_tenant_subject_template_without_placeholder_is_passthrough() {
        let with_tenant = ce(Some(serde_json::json!({"tenantId": "acme"})));
        let subject = resolve_tenant_subject("cellos.events.v1", &with_tenant);
        assert_eq!(subject, "cellos.events.v1");
    }

    #[test]
    fn resolve_tenant_subject_substitutes_when_tenant_present() {
        let with_tenant = ce(Some(serde_json::json!({"tenantId": "acme"})));
        let subject = resolve_tenant_subject(TENANT_TEMPLATE, &with_tenant);
        assert_eq!(subject, "cellos.events.acme.v1");
    }

    #[test]
    fn resolve_tenant_subject_uses_sentinel_when_tenant_absent() {
        let without_tenant = ce(Some(serde_json::json!({"cellId": "c1"})));
        let subject = resolve_tenant_subject(TENANT_TEMPLATE, &without_tenant);
        assert_eq!(subject, "cellos.events.single.v1");
    }

    #[test]
    fn resolve_tenant_subject_uses_sentinel_when_data_missing() {
        let without_data = ce(None);
        let subject = resolve_tenant_subject(TENANT_TEMPLATE, &without_data);
        assert_eq!(subject, "cellos.events.single.v1");
    }

    #[test]
    fn with_retry_single_attempt_does_not_retry() {
        let (result, calls) = run_counted(1, |_| Err::<(), &'static str>("nope"));
        assert_eq!(result, Err("nope"));
        assert_eq!(calls, 1);
    }

    #[test]
    fn reconnect_backoff_schedule_is_exponential_and_capped() {
        // (attempt, expected wait in ms): the wait doubles from 100ms.
        let doubling: [(u32, u64); 5] = [(0, 100), (1, 200), (2, 400), (3, 800), (4, 1600)];
        for (attempt, expected_ms) in doubling {
            assert_eq!(
                reconnect_backoff(attempt),
                Duration::from_millis(expected_ms)
            );
        }
        // Capped at 3 minutes no matter how high the attempt count climbs.
        for attempt in [20, u32::MAX] {
            assert_eq!(reconnect_backoff(attempt), RECONNECT_BACKOFF_CAP);
        }
    }
}