use anyhow::{Context, Result};
use async_nats::jetstream::{self, consumer::PullConsumer};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio_retry::{strategy::ExponentialBackoff, Retry};
use tracing::{debug, error, info, warn};
pub mod consumer;
pub mod lag_monitor;
pub mod publisher;
pub mod sharding;
pub mod streams;
pub mod subjects;
#[cfg(test)]
mod consumer_tests;
#[cfg(test)]
mod lag_monitor_tests;
#[cfg(test)]
mod lib_tests;
#[cfg(test)]
mod publisher_tests;
#[cfg(test)]
mod smith_bus_tests;
#[cfg(test)]
mod streams_tests;
pub use consumer::Consumer;
pub use lag_monitor::*;
pub use publisher::Publisher;
pub use sharding::*;
pub use streams::StreamManager;
pub use subjects::*;
/// Handle that bundles a core NATS client with the JetStream context built from it.
///
/// Derives `Clone`, so a clone carries copies of both handles.
#[derive(Clone)]
pub struct SmithBus {
/// Core NATS client returned by `async_nats::connect`.
nats_client: async_nats::Client,
/// JetStream context created from a clone of `nats_client`.
jetstream: jetstream::Context,
}
impl SmithBus {
    /// Connects to the NATS server at `nats_url` and wraps the connection in a JetStream context.
    ///
    /// # Errors
    /// Returns the underlying connection error, annotated with the target URL.
    pub async fn connect(nats_url: &str) -> Result<Self> {
        info!("Connecting to NATS server: {}", nats_url);

        let client = async_nats::connect(nats_url)
            .await
            .with_context(|| format!("Failed to connect to NATS server: {}", nats_url))?;
        let js_context = jetstream::new(client.clone());

        info!("Successfully connected to NATS server");
        Ok(Self {
            nats_client: client,
            jetstream: js_context,
        })
    }

    /// Builds a [`Publisher`] backed by a clone of this bus's JetStream context.
    pub fn publisher(&self) -> Publisher {
        let js_context = self.jetstream.clone();
        Publisher::new(js_context)
    }

    /// Creates a [`Consumer`] for `capability` using `config`.
    ///
    /// # Errors
    /// Propagates whatever `Consumer::new` reports.
    pub async fn consumer(&self, capability: &str, config: ConsumerConfig) -> Result<Consumer> {
        let js_context = self.jetstream.clone();
        Consumer::new(js_context, capability, config).await
    }

    /// Builds a [`StreamManager`] backed by a clone of this bus's JetStream context.
    pub fn stream_manager(&self) -> StreamManager {
        let js_context = self.jetstream.clone();
        StreamManager::new(js_context)
    }

    /// Borrows the underlying core NATS client.
    pub fn nats_client(&self) -> &async_nats::Client {
        &self.nats_client
    }

    /// Borrows the underlying JetStream context.
    pub fn jetstream(&self) -> &jetstream::Context {
        &self.jetstream
    }

    /// Probes the NATS connection state and JetStream reachability.
    ///
    /// The connection state is read first, then JetStream is probed. This method itself
    /// never returns `Err`; failed probes are reported as `false` fields in the status.
    pub async fn health_check(&self) -> Result<HealthStatus> {
        let status = HealthStatus {
            nats_connected: Self::check_nats_connectivity(&self.nats_client),
            jetstream_available: self.check_jetstream_availability().await,
        };
        Ok(status)
    }

    /// Returns `true` only when the client reports the `Connected` state.
    fn check_nats_connectivity(nats_client: &async_nats::Client) -> bool {
        let state = nats_client.connection_state();
        matches!(state, async_nats::connection::State::Connected)
    }

    /// Tries to list JetStream stream names, giving up after one second.
    ///
    /// Both a listing error and a timeout are logged at `warn` level and reported as `false`.
    async fn check_jetstream_availability(&self) -> bool {
        let probe_limit = Duration::from_secs(1);
        let listing = async {
            self.jetstream
                .stream_names()
                .try_collect::<Vec<String>>()
                .await
        };

        match tokio::time::timeout(probe_limit, listing).await {
            Err(_elapsed) => {
                warn!("JetStream timeout - may not be available");
                false
            }
            Ok(Err(e)) => {
                warn!("JetStream not available: {}", e);
                false
            }
            Ok(Ok(_names)) => true,
        }
    }

    /// Publishes `message` on `subject` through a freshly built [`Publisher`].
    ///
    /// # Errors
    /// Propagates the publisher's error.
    pub async fn publish<T: Serialize>(&self, subject: String, message: &T) -> Result<()> {
        let publisher = self.publisher();
        publisher.publish(subject, message).await
    }

    /// Publishes `message` on `subject` with the given `headers` through a freshly built
    /// [`Publisher`].
    ///
    /// # Errors
    /// Propagates the publisher's error.
    pub async fn publish_with_headers<T: Serialize>(
        &self,
        subject: String,
        headers: async_nats::HeaderMap,
        message: &T,
    ) -> Result<()> {
        let publisher = self.publisher();
        publisher
            .publish_with_headers(subject, headers, message)
            .await
    }
}
/// Result of `SmithBus::health_check`: one flag per probed component.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HealthStatus {
/// `true` when the NATS client reported the `Connected` state.
pub nats_connected: bool,
/// `true` when listing JetStream stream names succeeded within the probe timeout.
pub jetstream_available: bool,
}
impl HealthStatus {
    /// Returns `true` only when both the NATS and the JetStream flags are set.
    pub fn is_healthy(&self) -> bool {
        if !self.nats_connected {
            return false;
        }
        self.jetstream_available
    }
}
/// Settings handed to `SmithBus::consumer` when creating a `Consumer`.
///
/// The defaults quoted below come from the `Default` impl. How each field is applied is
/// decided by the `consumer` module, so field meanings here are hedged where they are
/// inferred from names only.
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
/// Consumer name; defaults to `consumer-<random UUID v4>`.
pub name: String,
/// Optional consumer group; `None` by default.
pub consumer_group: Option<String>,
/// Defaults to 3. Presumably the maximum number of delivery attempts per message — confirm
/// against the `consumer` module.
pub max_deliver: i64,
/// Defaults to 30 seconds. Presumably how long to wait for an ack before redelivery — confirm.
pub ack_wait: Duration,
/// Defaults to 24 hours; `None` is allowed. NOTE(review): exact meaning is not visible here.
pub max_age: Option<Duration>,
/// Where consumption starts; defaults to `ConsumerStartSequence::Latest`.
pub start_sequence: ConsumerStartSequence,
/// Defaults to 1. Presumably the number of concurrent workers — confirm.
pub worker_count: usize,
}
impl Default for ConsumerConfig {
    /// Builds a configuration with a unique generated name, no consumer group, three
    /// deliveries, a 30-second ack wait, a one-day max age, `Latest` start and one worker.
    fn default() -> Self {
        let generated_name = format!("consumer-{}", uuid::Uuid::new_v4());
        let one_day = Duration::from_secs(24 * 60 * 60);

        Self {
            name: generated_name,
            consumer_group: None,
            max_deliver: 3,
            ack_wait: Duration::from_secs(30),
            max_age: Some(one_day),
            start_sequence: ConsumerStartSequence::Latest,
            worker_count: 1,
        }
    }
}
/// Starting position selector stored in `ConsumerConfig::start_sequence`.
///
/// NOTE(review): the mapping of each variant onto a JetStream deliver policy lives in the
/// `consumer` module and is not visible here; the descriptions below are inferred from names.
#[derive(Debug, Clone)]
pub enum ConsumerStartSequence {
/// Presumably: start from the first available message.
First,
/// Presumably: start from the most recent position. This is the `ConsumerConfig` default.
Latest,
/// Presumably: start from the given stream sequence number.
Sequence(u64),
/// Presumably: start from the given UTC timestamp.
Time(chrono::DateTime<chrono::Utc>),
}
/// A payload of type `T` paired with the raw JetStream message it came from.
pub struct Message<T> {
/// The payload value carried by this message.
pub payload: T,
/// Raw JetStream message; `ack`, `nack` and `term` are sent through it.
pub jetstream_message: jetstream::Message,
/// Delivery counter; any value above 1 makes `is_redelivery` return `true`.
pub delivery_count: u64,
/// Subject string associated with this message.
pub subject: String,
}
impl<T> Message<T> {
    /// Acknowledges the message.
    ///
    /// # Errors
    /// Returns an error describing the failed acknowledgement.
    pub async fn ack(&self) -> Result<()> {
        match self.jetstream_message.ack().await {
            Ok(()) => Ok(()),
            Err(e) => Err(anyhow::anyhow!("Failed to ack message: {}", e)),
        }
    }

    /// Negatively acknowledges the message (`Nak` without a delay).
    ///
    /// # Errors
    /// Returns an error describing the failed negative acknowledgement.
    pub async fn nack(&self) -> Result<()> {
        let kind = jetstream::AckKind::Nak(None);
        match self.jetstream_message.ack_with(kind).await {
            Ok(()) => Ok(()),
            Err(e) => Err(anyhow::anyhow!("Failed to nack message: {}", e)),
        }
    }

    /// Sends a `Term` acknowledgement for the message.
    ///
    /// # Errors
    /// Returns an error describing the failed termination.
    pub async fn term(&self) -> Result<()> {
        let kind = jetstream::AckKind::Term;
        match self.jetstream_message.ack_with(kind).await {
            Ok(()) => Ok(()),
            Err(e) => Err(anyhow::anyhow!("Failed to terminate message: {}", e)),
        }
    }

    /// Returns `true` when the message has been delivered more than once.
    pub fn is_redelivery(&self) -> bool {
        self.delivery_count >= 2
    }
}
/// Pull-based reader over a JetStream pull consumer.
pub struct WorkQueue {
/// Pull consumer that every pull request is issued against.
consumer: PullConsumer,
/// Upper bound passed as `max_messages` for batch pulls.
batch_size: usize,
/// Value passed as `expires` for batch pulls.
timeout: Duration,
}
impl WorkQueue {
    /// How long a single-message pull waits for a message, both as the server-side
    /// request expiry and as the client-side guard timeout.
    const SINGLE_PULL_WAIT: Duration = Duration::from_millis(100);

    /// Creates a work queue over `consumer`.
    ///
    /// `batch_size` bounds the number of messages per [`pull_batch`](Self::pull_batch) and
    /// `timeout` is the expiry of each batch request.
    pub fn new(consumer: PullConsumer, batch_size: usize, timeout: Duration) -> Self {
        Self {
            consumer,
            batch_size,
            timeout,
        }
    }

    /// Pulls up to `batch_size` messages, retrying failed attempts with backoff.
    ///
    /// An empty vector means the request expired without any message being available.
    ///
    /// # Errors
    /// Returns the last attempt's error, with context, once all retries are exhausted.
    pub async fn pull_batch(&mut self) -> Result<Vec<jetstream::Message>> {
        let retry_strategy = Self::create_batch_retry_strategy();
        Retry::spawn(retry_strategy, || self.attempt_pull_batch())
            .await
            .context("Failed to pull message batch after retries")
    }

    /// Backoff schedule for batch pulls: at most three retries, each delay capped at 5 s.
    ///
    /// NOTE(review): tokio-retry treats the `from_millis` argument as an exponent base
    /// (base, base², base³, …), so after the cap this is expected to be 100 ms, 5 s, 5 s.
    /// Confirm that this is the intended schedule.
    fn create_batch_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(5))
            .take(3)
    }

    /// Issues one batch request and collects every message it yields.
    async fn attempt_pull_batch(&self) -> Result<Vec<jetstream::Message>> {
        let batch = self
            .consumer
            .batch()
            .max_messages(self.batch_size)
            .expires(self.timeout)
            .messages()
            .await
            .map_err(|e| {
                error!("Failed to pull message batch: {}", e);
                e
            })?;

        let messages: Vec<jetstream::Message> = batch
            .try_collect()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to collect messages: {}", e))?;

        Self::log_batch_result(&messages);
        Ok(messages)
    }

    /// Logs at `debug` level how many messages a batch pull produced.
    fn log_batch_result(messages: &[jetstream::Message]) {
        if messages.is_empty() {
            debug!("No messages available in batch");
        } else {
            debug!("Pulled batch of {} messages", messages.len());
        }
    }

    /// Pulls at most one message, retrying failed attempts with backoff.
    ///
    /// Returns `Ok(None)` when no message became available within the wait window.
    ///
    /// # Errors
    /// Returns the last attempt's error, with context, once all retries are exhausted.
    pub async fn pull_one(&mut self) -> Result<Option<jetstream::Message>> {
        let retry_strategy = Self::create_single_retry_strategy();
        Retry::spawn(retry_strategy, || self.attempt_pull_single())
            .await
            .context("Failed to pull message after retries")
    }

    /// Backoff schedule for single pulls: at most three retries, each delay capped at 2 s.
    ///
    /// NOTE(review): as with the batch schedule, tokio-retry uses the `from_millis`
    /// argument as an exponent base, so the expected delays are 50 ms, 2 s, 2 s.
    fn create_single_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(50)
            .max_delay(Duration::from_secs(2))
            .take(3)
    }

    /// Requests exactly one message from the server and waits briefly for it.
    ///
    /// A bounded batch request (`max_messages(1)`) is used instead of the continuous
    /// `messages()` stream. That stream asks the server for messages in bulk, so reading
    /// one message and dropping the stream would leave the rest delivered but unprocessed
    /// until their ack wait expires, consuming redelivery attempts.
    async fn attempt_pull_single(&self) -> Result<Option<jetstream::Message>> {
        let messages = self
            .consumer
            .batch()
            .max_messages(1)
            .expires(Self::SINGLE_PULL_WAIT)
            .messages()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to get messages stream: {}", e))?;

        Self::try_get_next_message(messages).await
    }

    /// Waits up to [`Self::SINGLE_PULL_WAIT`] for the next item of `messages`.
    ///
    /// Returns `Ok(Some(_))` for a message, `Ok(None)` when the stream ends or the wait
    /// times out, and `Err` when the stream yields an error. The function is generic over
    /// the stream's error type, so it accepts both batch and continuous message streams.
    async fn try_get_next_message<E>(
        mut messages: impl futures::Stream<Item = Result<jetstream::Message, E>> + Unpin,
    ) -> Result<Option<jetstream::Message>>
    where
        E: std::fmt::Display,
    {
        // The client-side timeout stays as a guard in case the server's expiry notice is late.
        match tokio::time::timeout(Self::SINGLE_PULL_WAIT, messages.try_next()).await {
            Ok(Ok(Some(message))) => {
                debug!("Pulled single message: {}", message.subject);
                Ok(Some(message))
            }
            Ok(Ok(None)) => {
                debug!("No messages available");
                Ok(None)
            }
            Ok(Err(e)) => {
                error!("Failed to pull message: {}", e);
                Err(anyhow::anyhow!("Message stream error: {}", e))
            }
            Err(_) => {
                debug!("No messages available (timeout)");
                Ok(None)
            }
        }
    }
}
/// Parameters for the exponential backoff produced by [`create_backoff_strategy`].
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Delay before the first retry.
    pub initial_delay: Duration,
    /// Upper bound applied to every delay.
    pub max_delay: Duration,
    /// Number of delays the strategy yields, i.e. the maximum number of retries.
    pub max_retries: usize,
    /// Growth factor between consecutive delays; fractional values such as `1.5` are honoured.
    pub multiplier: f64,
    /// Jitter fraction. Not applied by [`create_backoff_strategy`]; callers that want
    /// randomised delays must add them on top of the returned iterator.
    pub jitter: f64,
}

impl Default for BackoffConfig {
    /// Starts at 100 ms, doubles each retry, caps at 30 s, and allows five retries.
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            max_retries: 5,
            multiplier: 2.0,
            jitter: 0.1,
        }
    }
}

/// Builds the retry delay sequence described by `config`.
///
/// The n-th delay (starting at 0) is `initial_delay * multiplier^n`, capped at `max_delay`,
/// and the iterator yields exactly `max_retries` delays. With the default configuration
/// that is 100 ms, 200 ms, 400 ms, 800 ms, 1.6 s.
///
/// The delays are computed directly rather than through tokio-retry's `ExponentialBackoff`.
/// That type uses its constructor argument as an exponent base and `factor` as a constant
/// integer multiplier, which can express neither "start at `initial_delay`" nor a
/// fractional `multiplier`.
///
/// A multiplier that is NaN, infinite, or below `1.0` cannot describe a growing backoff
/// and is treated as `1.0` (constant delay). `config.jitter` is not applied.
pub fn create_backoff_strategy(config: &BackoffConfig) -> impl Iterator<Item = Duration> {
    let max_delay = config.max_delay;
    let max_nanos = max_delay.as_nanos() as f64;
    let multiplier = if config.multiplier.is_finite() && config.multiplier >= 1.0 {
        config.multiplier
    } else {
        1.0
    };
    // An initial delay above the cap is clamped so that no yielded delay exceeds `max_delay`.
    let first_delay = config.initial_delay.min(max_delay);

    // Only copied values are moved into the closure, so the iterator does not borrow `config`.
    std::iter::successors(Some(first_delay), move |previous| {
        let grown_nanos = previous.as_nanos() as f64 * multiplier;
        let next_delay = if grown_nanos >= max_nanos {
            max_delay
        } else {
            // `grown_nanos` is below the cap here; rounding keeps exact products exact.
            Duration::from_nanos(grown_nanos.round() as u64)
        };
        Some(next_delay)
    })
    .take(config.max_retries)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A status is healthy only when both flags are set.
    #[tokio::test]
    async fn test_health_status() {
        let fully_up = HealthStatus {
            nats_connected: true,
            jetstream_available: true,
        };
        assert!(fully_up.is_healthy());

        let jetstream_down = HealthStatus {
            nats_connected: true,
            jetstream_available: false,
        };
        assert!(!jetstream_down.is_healthy());
    }

    /// The default consumer configuration exposes the documented limits.
    #[test]
    fn test_consumer_config_defaults() {
        let ConsumerConfig {
            max_deliver,
            ack_wait,
            worker_count,
            ..
        } = ConsumerConfig::default();

        assert_eq!(max_deliver, 3);
        assert_eq!(ack_wait, Duration::from_secs(30));
        assert_eq!(worker_count, 1);
    }

    /// The default backoff configuration can be turned into a strategy and keeps its values.
    #[test]
    fn test_backoff_config() {
        let config = BackoffConfig::default();
        let _strategy = create_backoff_strategy(&config);

        let BackoffConfig {
            max_retries,
            multiplier,
            ..
        } = config;
        assert_eq!(max_retries, 5);
        assert_eq!(multiplier, 2.0);
    }
}