use std::time::Duration;
use anyhow::Context as _;
use async_nats::jetstream::{
self,
consumer::{
pull::{Config as PullConfig, Stream as PullStream},
AckPolicy, Consumer, DeliverPolicy,
},
context::Context,
stream::{Config as StreamConfig, RetentionPolicy, StorageType, Stream},
};
use futures_util::StreamExt;
use tracing::{debug, info, warn};
use crate::state::{AppState, ApplyOutcome};
/// Name of the JetStream stream that this module creates, replays and tails.
pub const STREAM_NAME: &str = "CELLOS_EVENTS";
/// Subject pattern registered on the stream by [`ensure_stream`].
pub const STREAM_SUBJECT: &str = "cellos.events.>";
/// `max_age` written into the stream config: 90 days, expressed in seconds.
const RETENTION_MAX_AGE: Duration = Duration::from_secs(90 * 24 * 60 * 60);
/// Maximum number of messages requested per fetch while replaying.
const REPLAY_BATCH: usize = 256;
/// Expiry applied to each replay fetch request (250 ms).
const REPLAY_BATCH_TIMEOUT: Duration = Duration::from_millis(250);
pub async fn ensure_stream(client: &async_nats::Client) -> anyhow::Result<Context> {
let ctx = jetstream::new(client.clone());
let config = StreamConfig {
name: STREAM_NAME.to_string(),
subjects: vec![STREAM_SUBJECT.to_string()],
retention: RetentionPolicy::Limits,
storage: StorageType::File,
max_age: RETENTION_MAX_AGE,
..Default::default()
};
ctx.get_or_create_stream(config)
.await
.context("get_or_create CELLOS_EVENTS stream")?;
Ok(ctx)
}
/// Maps an optional "last seen" sequence onto a JetStream delivery policy.
///
/// * `None` yields [`DeliverPolicy::All`].
/// * `Some(seq)` yields [`DeliverPolicy::ByStartSequence`] starting at `seq + 1`. The addition
///   saturates, so `u64::MAX` maps to a start sequence of `u64::MAX` instead of overflowing.
pub fn deliver_policy_for(start_seq: Option<u64>) -> DeliverPolicy {
    start_seq.map_or(DeliverPolicy::All, |last_seen| {
        DeliverPolicy::ByStartSequence {
            start_sequence: last_seen.saturating_add(1),
        }
    })
}
/// Delivery policy used when no resume sequence is supplied: always [`DeliverPolicy::New`].
pub fn deliver_policy_live_tail() -> DeliverPolicy {
DeliverPolicy::New
}
/// `inactive_threshold` placed on every config built by [`ephemeral_consumer_config_for`]:
/// five minutes. The unit tests require it to be non-zero (so the broker garbage-collects dead
/// consumers) and at least 60 seconds (so it rides out a transient reconnect).
const EPHEMERAL_INACTIVE_THRESHOLD: Duration = Duration::from_secs(5 * 60);
pub fn ephemeral_consumer_config_for(policy: DeliverPolicy, subject: Option<&str>) -> PullConfig {
PullConfig {
durable_name: None,
name: None,
deliver_policy: policy,
ack_policy: AckPolicy::None,
filter_subject: subject.unwrap_or("").to_string(),
inactive_threshold: EPHEMERAL_INACTIVE_THRESHOLD,
..Default::default()
}
}
/// Creates an ephemeral pull consumer on `stream`.
///
/// The consumer configuration comes from [`ephemeral_consumer_config_for`] using the given
/// delivery `policy` and optional `subject` filter.
///
/// # Errors
///
/// Returns the consumer-creation error wrapped with context.
pub async fn create_ephemeral_consumer(
    stream: &Stream,
    policy: DeliverPolicy,
    subject: Option<&str>,
) -> anyhow::Result<Consumer<PullConfig>> {
    let consumer = stream
        .create_consumer(ephemeral_consumer_config_for(policy, subject))
        .await
        .context("create ephemeral pull consumer")?;

    Ok(consumer)
}
/// Rebuilds projection state by replaying `CELLOS_EVENTS` from the start of the stream.
///
/// Setting the `CELLOS_SERVER_SKIP_REPLAY` environment variable to `1` or `true` (any ASCII
/// case) skips the replay entirely. Otherwise an ephemeral consumer with the
/// `deliver_policy_for(None)` policy is created and drained in fetches of at most
/// `REPLAY_BATCH` messages, each request expiring after `REPLAY_BATCH_TIMEOUT`. The first fetch
/// that yields no messages ends the replay.
///
/// For every message whose JetStream metadata can be read, the payload is handed to
/// `AppState::apply_event_payload` and the stream sequence to `AppState::bump_cursor`; the
/// cursor is bumped whether or not the apply succeeded. Messages without readable metadata and
/// apply failures are logged at `warn` level and skipped.
///
/// # Errors
///
/// Fails when the stream lookup, the consumer creation, a fetch request, or reading a message
/// out of a batch fails.
pub async fn replay_projection(state: &AppState, ctx: &Context) -> anyhow::Result<()> {
    let skip_requested = match std::env::var("CELLOS_SERVER_SKIP_REPLAY") {
        Ok(flag) => flag == "1" || flag.eq_ignore_ascii_case("true"),
        Err(_) => false,
    };
    if skip_requested {
        info!("CELLOS_SERVER_SKIP_REPLAY set; skipping projection replay");
        return Ok(());
    }

    let events = ctx
        .get_stream(STREAM_NAME)
        .await
        .context("get_stream CELLOS_EVENTS for replay")?;
    let replay_policy = deliver_policy_for(None);
    let consumer = create_ephemeral_consumer(&events, replay_policy, None).await?;

    let mut seen: u64 = 0;
    let mut applied: u64 = 0;
    let mut highest: u64 = 0;

    loop {
        let mut batch = consumer
            .fetch()
            .max_messages(REPLAY_BATCH)
            .expires(REPLAY_BATCH_TIMEOUT)
            .messages()
            .await
            .context("replay: pull batch")?;

        let mut received: usize = 0;
        while let Some(item) = batch.next().await {
            let msg = match item {
                Ok(msg) => msg,
                Err(e) => {
                    return Err(anyhow::anyhow!("replay: read message from batch: {e}"));
                }
            };
            // Counted before the metadata check, so skipped messages still show up as seen.
            received += 1;
            seen += 1;

            let seq = match msg.info().map(|meta| meta.stream_sequence) {
                Ok(seq) => seq,
                Err(e) => {
                    warn!(error = %e, "replay: message missing JetStream info; skipping");
                    continue;
                }
            };
            highest = highest.max(seq);

            let outcome = state.apply_event_payload(&msg.payload).await;
            match outcome {
                Ok(ApplyOutcome::Applied) => applied += 1,
                Ok(ApplyOutcome::Ignored) => {}
                Err(e) => {
                    warn!(seq, error = %e, "replay: failed to apply event; skipping");
                }
            }
            // The cursor advances even when the apply failed.
            state.bump_cursor(seq);
        }

        // An empty fetch ends the replay loop.
        if received == 0 {
            break;
        }
        debug!(
            batch_size = received,
            total_seen = seen,
            "replay batch consumed"
        );
    }

    let formation_count = state.formations.read().await.len();
    info!(
        cursor = highest,
        formations = formation_count,
        events_seen = seen,
        events_applied = applied,
        "replay_complete"
    );
    Ok(())
}
/// Opens a pull-message stream over `CELLOS_EVENTS` for a WebSocket subscriber.
///
/// With `since = Some(seq)` delivery follows [`deliver_policy_for`] (start at `seq + 1`);
/// with `since = None` it follows [`deliver_policy_live_tail`]. `subject` is forwarded as the
/// consumer's optional subject filter.
///
/// # Errors
///
/// Fails when the stream lookup, the ephemeral consumer creation, or opening the consumer's
/// message stream fails.
pub async fn open_ws_message_stream(
    ctx: &Context,
    subject: Option<&str>,
    since: Option<u64>,
) -> anyhow::Result<PullStream> {
    let events = ctx
        .get_stream(STREAM_NAME)
        .await
        .context("get_stream CELLOS_EVENTS for ws")?;

    let policy = if since.is_some() {
        deliver_policy_for(since)
    } else {
        deliver_policy_live_tail()
    };

    let consumer = create_ephemeral_consumer(&events, policy, subject).await?;
    let messages = consumer
        .messages()
        .await
        .context("open consumer messages stream")?;

    Ok(messages)
}
pub async fn stream_first_seq(ctx: &Context) -> Option<u64> {
match ctx.get_stream(STREAM_NAME).await {
Ok(mut stream) => match stream.info().await {
Ok(info) => Some(info.state.first_sequence),
Err(e) => {
debug!(error = %e, "stream_first_seq: info() failed");
None
}
},
Err(e) => {
debug!(error = %e, "stream_first_seq: get_stream failed");
None
}
}
}
/// Text heuristic that decides whether `err` looks like a "requested sequence is gone" failure.
///
/// The error is rendered with the alternate `{:#}` format, lower-cased (ASCII), and searched for
/// any of a fixed set of marker substrings. Returns `true` on the first marker found.
pub fn looks_like_retention_exhausted(err: &anyhow::Error) -> bool {
    const MARKERS: [&str; 4] = [
        "optional start sequence",
        "too low",
        "no such sequence",
        "not found in stream",
    ];

    let rendered = format!("{err:#}").to_ascii_lowercase();
    MARKERS.iter().any(|marker| rendered.contains(*marker))
}
#[cfg(test)]
mod tests {
    use super::*;

    // No start sequence means replay from the beginning of the stream.
    #[test]
    fn deliver_policy_none_is_all() {
        let policy = deliver_policy_for(None);
        assert!(matches!(policy, DeliverPolicy::All));
    }

    // A supplied sequence resumes at the following sequence number.
    #[test]
    fn deliver_policy_some_starts_at_seq_plus_one() {
        let policy = deliver_policy_for(Some(42));
        if let DeliverPolicy::ByStartSequence { start_sequence } = policy {
            assert_eq!(start_sequence, 43);
        } else {
            panic!("expected ByStartSequence(43), got {policy:?}");
        }
    }

    // Sequence zero resumes at one.
    #[test]
    fn deliver_policy_since_zero_starts_at_one() {
        let policy = deliver_policy_for(Some(0));
        if let DeliverPolicy::ByStartSequence { start_sequence } = policy {
            assert_eq!(start_sequence, 1);
        } else {
            panic!("expected ByStartSequence(1), got {policy:?}");
        }
    }

    // The `+ 1` must saturate rather than wrap at the top of the range.
    #[test]
    fn deliver_policy_since_saturates_at_u64_max() {
        let policy = deliver_policy_for(Some(u64::MAX));
        if let DeliverPolicy::ByStartSequence { start_sequence } = policy {
            assert_eq!(start_sequence, u64::MAX);
        } else {
            panic!("expected ByStartSequence(u64::MAX), got {policy:?}");
        }
    }

    // Live tail delivers only new messages.
    #[test]
    fn live_tail_is_deliver_new() {
        let policy = deliver_policy_live_tail();
        assert!(matches!(policy, DeliverPolicy::New));
    }

    // Each of these messages contains at least one retention-exhausted marker.
    #[test]
    fn retention_exhausted_matches_known_messages() {
        let cases = [
            "consumer create failed: optional start sequence 5 is too low",
            "no such sequence in stream",
            "sequence 7 not found in stream CELLOS_EVENTS",
            "ErrConsumerCreate: optional start sequence too low",
        ];
        for case in cases {
            let err = anyhow::Error::msg(case);
            let matched = looks_like_retention_exhausted(&err);
            assert!(matched, "expected match for {case:?}");
        }
    }

    // Pins the properties every ephemeral consumer config must keep.
    #[test]
    fn ephemeral_consumer_config_sets_inactive_threshold() {
        let cfg = ephemeral_consumer_config_for(DeliverPolicy::New, None);
        let threshold = cfg.inactive_threshold;

        assert!(
            threshold > Duration::ZERO,
            "inactive_threshold must be non-zero so broker GCs dead consumers; got {:?}",
            threshold,
        );
        assert!(
            threshold >= Duration::from_secs(60),
            "inactive_threshold must ride out transient reconnect; got {:?}",
            threshold,
        );
        assert!(cfg.durable_name.is_none(), "consumer must be ephemeral");
        assert!(
            matches!(cfg.ack_policy, AckPolicy::None),
            "consumer must remain AckPolicy::None for read-only projection feed"
        );
    }

    // Unrelated failures, including "stream not found", must not be classified as exhaustion.
    #[test]
    fn retention_exhausted_ignores_unrelated_errors() {
        let cases = [
            "broker connection refused",
            "timed out",
            "stream not found",
        ];
        for case in cases {
            let err = anyhow::Error::msg(case);
            let matched = looks_like_retention_exhausted(&err);
            assert!(!matched, "unexpected match for {case:?}");
        }
    }
}