use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsConfig {
pub url: String,
pub cluster_urls: Vec<String>,
pub jetstream_domain: String,
#[serde(with = "duration_serde")]
pub connection_timeout: Duration,
#[serde(with = "duration_serde")]
pub request_timeout: Duration,
pub tls: Option<TlsConfig>,
pub auth: Option<AuthConfig>,
pub performance: NatsPerformanceConfig,
pub streams: HashMap<String, StreamConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
pub cert_file: Option<PathBuf>,
pub key_file: Option<PathBuf>,
pub ca_file: Option<PathBuf>,
pub insecure: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
pub username: Option<String>,
pub password: Option<String>,
pub jwt: Option<String>,
pub nkey_seed: Option<PathBuf>,
pub credentials_file: Option<PathBuf>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsPerformanceConfig {
pub max_messages_per_second: u64,
pub target_latency_ms: u64,
pub max_message_size: usize,
pub connection_pool_size: usize,
pub enable_compression: bool,
pub batch_size: usize,
#[serde(with = "duration_serde")]
pub flush_interval: Duration,
pub reconnect: ReconnectConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReconnectConfig {
pub max_attempts: u32,
#[serde(with = "duration_serde")]
pub initial_delay: Duration,
#[serde(with = "duration_serde")]
pub max_delay: Duration,
pub backoff_multiplier: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
pub name: String,
pub subjects: Vec<String>,
pub max_age: String,
pub max_bytes: String,
pub max_messages: Option<i64>,
pub storage: String,
pub retention: String,
pub replicas: u32,
pub consumers: HashMap<String, ConsumerConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerConfig {
pub name: String,
pub deliver_subject: Option<String>,
pub deliver_policy: String,
pub ack_policy: String,
pub ack_wait: String,
pub max_deliver: u32,
pub filter_subject: Option<String>,
pub replay_policy: String,
}
impl Default for NatsConfig {
fn default() -> Self {
let mut streams = HashMap::new();
streams.insert(
"intents".to_string(),
StreamConfig {
name: "INTENTS".to_string(),
subjects: vec!["smith.intents.>".to_string()],
max_age: "10m".to_string(),
max_bytes: "1GB".to_string(),
max_messages: None,
storage: "file".to_string(),
retention: "limits".to_string(),
replicas: 1,
consumers: {
let mut consumers = HashMap::new();
consumers.insert(
"executor".to_string(),
ConsumerConfig {
name: "executor".to_string(),
deliver_subject: None, deliver_policy: "new".to_string(),
ack_policy: "explicit".to_string(),
ack_wait: "30s".to_string(),
max_deliver: 3,
filter_subject: None,
replay_policy: "instant".to_string(),
},
);
consumers
},
},
);
streams.insert(
"results".to_string(),
StreamConfig {
name: "RESULTS".to_string(),
subjects: vec!["smith.results.>".to_string()],
max_age: "5m".to_string(),
max_bytes: "512MB".to_string(),
max_messages: None,
storage: "file".to_string(),
retention: "limits".to_string(),
replicas: 1,
consumers: {
let mut consumers = HashMap::new();
consumers.insert(
"http".to_string(),
ConsumerConfig {
name: "http".to_string(),
deliver_subject: None,
deliver_policy: "new".to_string(),
ack_policy: "explicit".to_string(),
ack_wait: "10s".to_string(),
max_deliver: 2,
filter_subject: None,
replay_policy: "instant".to_string(),
},
);
consumers
},
},
);
Self {
url: "nats://127.0.0.1:4222".to_string(),
cluster_urls: vec![],
jetstream_domain: "JS".to_string(),
connection_timeout: Duration::from_secs(5),
request_timeout: Duration::from_millis(100),
tls: None,
auth: None,
performance: NatsPerformanceConfig::default(),
streams,
}
}
}
impl Default for NatsPerformanceConfig {
fn default() -> Self {
Self {
max_messages_per_second: 1000,
target_latency_ms: 20,
max_message_size: 1024 * 1024, connection_pool_size: 4,
enable_compression: false, batch_size: 10,
flush_interval: Duration::from_millis(10),
reconnect: ReconnectConfig::default(),
}
}
}
impl Default for ReconnectConfig {
fn default() -> Self {
Self {
max_attempts: 10,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
backoff_multiplier: 2.0,
}
}
}
impl NatsConfig {
pub fn validate(&self) -> Result<()> {
if self.url.is_empty() {
return Err(anyhow::anyhow!("NATS URL cannot be empty"));
}
url::Url::parse(&self.url)
.map_err(|e| anyhow::anyhow!("Invalid NATS URL '{}': {}", self.url, e))?;
for url in &self.cluster_urls {
url::Url::parse(url)
.map_err(|e| anyhow::anyhow!("Invalid cluster URL '{}': {}", url, e))?;
}
if self.jetstream_domain.is_empty() {
return Err(anyhow::anyhow!("JetStream domain cannot be empty"));
}
if self.connection_timeout.as_millis() == 0 {
return Err(anyhow::anyhow!("Connection timeout must be > 0"));
}
if self.request_timeout.as_millis() == 0 {
return Err(anyhow::anyhow!("Request timeout must be > 0"));
}
if let Some(ref tls) = self.tls {
tls.validate()?;
}
if let Some(ref auth) = self.auth {
auth.validate()?;
}
self.performance.validate()?;
for (name, stream) in &self.streams {
stream
.validate()
.map_err(|e| anyhow::anyhow!("Stream '{}' validation failed: {}", name, e))?;
}
Ok(())
}
pub fn development() -> Self {
Self {
url: "nats://127.0.0.1:4222".to_string(),
performance: NatsPerformanceConfig {
target_latency_ms: 50, ..Default::default()
},
..Default::default()
}
}
pub fn production() -> Self {
Self {
url: "nats://nats-cluster:4222".to_string(),
cluster_urls: vec![
"nats://nats-1:4222".to_string(),
"nats://nats-2:4222".to_string(),
"nats://nats-3:4222".to_string(),
],
connection_timeout: Duration::from_secs(10),
request_timeout: Duration::from_millis(50),
performance: NatsPerformanceConfig {
max_messages_per_second: 2000,
target_latency_ms: 10,
connection_pool_size: 8,
..Default::default()
},
streams: {
let mut streams = HashMap::new();
streams.insert(
"intents".to_string(),
StreamConfig {
name: "INTENTS".to_string(),
subjects: vec!["smith.intents.>".to_string()],
max_age: "10m".to_string(),
max_bytes: "5GB".to_string(),
max_messages: None,
storage: "file".to_string(),
retention: "limits".to_string(),
replicas: 3,
consumers: HashMap::new(),
},
);
streams.insert(
"results".to_string(),
StreamConfig {
name: "RESULTS".to_string(),
subjects: vec!["smith.results.>".to_string()],
max_age: "5m".to_string(),
max_bytes: "2GB".to_string(),
max_messages: None,
storage: "file".to_string(),
retention: "limits".to_string(),
replicas: 3,
consumers: HashMap::new(),
},
);
streams
},
..Default::default()
}
}
pub fn testing() -> Self {
Self {
url: "nats://127.0.0.1:4222".to_string(),
request_timeout: Duration::from_millis(500), performance: NatsPerformanceConfig {
max_messages_per_second: 100, batch_size: 5, ..Default::default()
},
streams: HashMap::new(), ..Default::default()
}
}
}
impl TlsConfig {
pub fn validate(&self) -> Result<()> {
if let Some(ref cert_file) = self.cert_file {
if !cert_file.exists() {
return Err(anyhow::anyhow!(
"TLS cert file does not exist: {}",
cert_file.display()
));
}
}
if let Some(ref key_file) = self.key_file {
if !key_file.exists() {
return Err(anyhow::anyhow!(
"TLS key file does not exist: {}",
key_file.display()
));
}
}
if let Some(ref ca_file) = self.ca_file {
if !ca_file.exists() {
return Err(anyhow::anyhow!(
"TLS CA file does not exist: {}",
ca_file.display()
));
}
}
Ok(())
}
}
impl AuthConfig {
pub fn validate(&self) -> Result<()> {
let auth_methods = [
self.username.is_some() && self.password.is_some(),
self.jwt.is_some(),
self.nkey_seed.is_some(),
self.credentials_file.is_some(),
];
let auth_count = auth_methods.iter().filter(|&&x| x).count();
if auth_count > 1 {
return Err(anyhow::anyhow!(
"Multiple authentication methods configured. Use only one."
));
}
if let Some(ref nkey_file) = self.nkey_seed {
if !nkey_file.exists() {
return Err(anyhow::anyhow!(
"NKey seed file does not exist: {}",
nkey_file.display()
));
}
}
if let Some(ref creds_file) = self.credentials_file {
if !creds_file.exists() {
return Err(anyhow::anyhow!(
"Credentials file does not exist: {}",
creds_file.display()
));
}
}
Ok(())
}
}
impl NatsPerformanceConfig {
pub fn validate(&self) -> Result<()> {
if self.max_messages_per_second == 0 {
return Err(anyhow::anyhow!("Max messages per second must be > 0"));
}
if self.target_latency_ms > 1000 {
tracing::warn!("Target latency > 1000ms may impact system performance");
}
if self.max_message_size < 1024 {
return Err(anyhow::anyhow!("Max message size must be >= 1KB"));
}
if self.connection_pool_size == 0 {
return Err(anyhow::anyhow!("Connection pool size must be > 0"));
}
if self.batch_size == 0 {
return Err(anyhow::anyhow!("Batch size must be > 0"));
}
self.reconnect.validate()?;
Ok(())
}
}
impl ReconnectConfig {
pub fn validate(&self) -> Result<()> {
if self.initial_delay.as_millis() == 0 {
return Err(anyhow::anyhow!("Initial delay must be > 0"));
}
if self.max_delay < self.initial_delay {
return Err(anyhow::anyhow!("Max delay must be >= initial delay"));
}
if self.backoff_multiplier <= 1.0 {
return Err(anyhow::anyhow!("Backoff multiplier must be > 1.0"));
}
Ok(())
}
}
impl StreamConfig {
pub fn validate(&self) -> Result<()> {
if self.name.is_empty() {
return Err(anyhow::anyhow!("Stream name cannot be empty"));
}
if self.subjects.is_empty() {
return Err(anyhow::anyhow!("Stream must have at least one subject"));
}
if !["file", "memory"].contains(&self.storage.as_str()) {
return Err(anyhow::anyhow!(
"Invalid storage type: {}. Must be 'file' or 'memory'",
self.storage
));
}
if !["limits", "interest", "workqueue"].contains(&self.retention.as_str()) {
return Err(anyhow::anyhow!(
"Invalid retention policy: {}. Must be 'limits', 'interest', or 'workqueue'",
self.retention
));
}
if self.replicas == 0 {
return Err(anyhow::anyhow!("Stream replicas must be > 0"));
}
for (name, consumer) in &self.consumers {
consumer
.validate()
.map_err(|e| anyhow::anyhow!("Consumer '{}' validation failed: {}", name, e))?;
}
Ok(())
}
}
impl ConsumerConfig {
pub fn validate(&self) -> Result<()> {
if self.name.is_empty() {
return Err(anyhow::anyhow!("Consumer name cannot be empty"));
}
let valid_policies = ["all", "last", "new", "by_start_sequence", "by_start_time"];
if !valid_policies.contains(&self.deliver_policy.as_str()) {
return Err(anyhow::anyhow!(
"Invalid deliver policy: {}. Must be one of: {}",
self.deliver_policy,
valid_policies.join(", ")
));
}
if !["none", "all", "explicit"].contains(&self.ack_policy.as_str()) {
return Err(anyhow::anyhow!(
"Invalid ack policy: {}. Must be 'none', 'all', or 'explicit'",
self.ack_policy
));
}
if !["instant", "original"].contains(&self.replay_policy.as_str()) {
return Err(anyhow::anyhow!(
"Invalid replay policy: {}. Must be 'instant' or 'original'",
self.replay_policy
));
}
if self.max_deliver == 0 {
return Err(anyhow::anyhow!("Max deliver must be > 0"));
}
Ok(())
}
}
pub(crate) mod duration_serde {
use serde::{Deserialize, Deserializer, Serializer};
use std::time::Duration;
pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u64(duration.as_millis() as u64)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let millis = u64::deserialize(deserializer)?;
Ok(Duration::from_millis(millis))
}
}