1use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::time::Duration;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct NatsConfig {
12 pub url: String,
14
15 pub cluster_urls: Vec<String>,
17
18 pub jetstream_domain: String,
20
21 #[serde(with = "duration_serde")]
23 pub connection_timeout: Duration,
24
25 #[serde(with = "duration_serde")]
27 pub request_timeout: Duration,
28
29 pub tls: Option<TlsConfig>,
31
32 pub auth: Option<AuthConfig>,
34
35 pub performance: NatsPerformanceConfig,
37
38 pub streams: HashMap<String, StreamConfig>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct TlsConfig {
45 pub cert_file: Option<PathBuf>,
47
48 pub key_file: Option<PathBuf>,
50
51 pub ca_file: Option<PathBuf>,
53
54 pub insecure: bool,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct AuthConfig {
61 pub username: Option<String>,
63
64 pub password: Option<String>,
66
67 pub jwt: Option<String>,
69
70 pub nkey_seed: Option<PathBuf>,
72
73 pub credentials_file: Option<PathBuf>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct NatsPerformanceConfig {
80 pub max_messages_per_second: u64,
82
83 pub target_latency_ms: u64,
85
86 pub max_message_size: usize,
88
89 pub connection_pool_size: usize,
91
92 pub enable_compression: bool,
94
95 pub batch_size: usize,
97
98 #[serde(with = "duration_serde")]
100 pub flush_interval: Duration,
101
102 pub reconnect: ReconnectConfig,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ReconnectConfig {
109 pub max_attempts: u32,
111
112 #[serde(with = "duration_serde")]
114 pub initial_delay: Duration,
115
116 #[serde(with = "duration_serde")]
118 pub max_delay: Duration,
119
120 pub backoff_multiplier: f64,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct StreamConfig {
127 pub name: String,
129
130 pub subjects: Vec<String>,
132
133 pub max_age: String,
135
136 pub max_bytes: String,
138
139 pub max_messages: Option<i64>,
141
142 pub storage: String,
144
145 pub retention: String,
147
148 pub replicas: u32,
150
151 pub consumers: HashMap<String, ConsumerConfig>,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct ConsumerConfig {
158 pub name: String,
160
161 pub deliver_subject: Option<String>,
163
164 pub deliver_policy: String,
166
167 pub ack_policy: String,
169
170 pub ack_wait: String,
172
173 pub max_deliver: u32,
175
176 pub filter_subject: Option<String>,
178
179 pub replay_policy: String,
181}
182
183impl Default for NatsConfig {
184 fn default() -> Self {
185 let mut streams = HashMap::new();
186
187 streams.insert(
189 "intents".to_string(),
190 StreamConfig {
191 name: "INTENTS".to_string(),
192 subjects: vec!["smith.intents.>".to_string()],
193 max_age: "10m".to_string(),
194 max_bytes: "1GB".to_string(),
195 max_messages: None,
196 storage: "file".to_string(),
197 retention: "limits".to_string(),
198 replicas: 1,
199 consumers: {
200 let mut consumers = HashMap::new();
201 consumers.insert(
202 "executor".to_string(),
203 ConsumerConfig {
204 name: "executor".to_string(),
205 deliver_subject: None, deliver_policy: "new".to_string(),
207 ack_policy: "explicit".to_string(),
208 ack_wait: "30s".to_string(),
209 max_deliver: 3,
210 filter_subject: None,
211 replay_policy: "instant".to_string(),
212 },
213 );
214 consumers
215 },
216 },
217 );
218
219 streams.insert(
221 "results".to_string(),
222 StreamConfig {
223 name: "RESULTS".to_string(),
224 subjects: vec!["smith.results.>".to_string()],
225 max_age: "5m".to_string(),
226 max_bytes: "512MB".to_string(),
227 max_messages: None,
228 storage: "file".to_string(),
229 retention: "limits".to_string(),
230 replicas: 1,
231 consumers: {
232 let mut consumers = HashMap::new();
233 consumers.insert(
234 "http".to_string(),
235 ConsumerConfig {
236 name: "http".to_string(),
237 deliver_subject: None,
238 deliver_policy: "new".to_string(),
239 ack_policy: "explicit".to_string(),
240 ack_wait: "10s".to_string(),
241 max_deliver: 2,
242 filter_subject: None,
243 replay_policy: "instant".to_string(),
244 },
245 );
246 consumers
247 },
248 },
249 );
250
251 Self {
252 url: "nats://127.0.0.1:4222".to_string(),
253 cluster_urls: vec![],
254 jetstream_domain: "JS".to_string(),
255 connection_timeout: Duration::from_secs(5),
256 request_timeout: Duration::from_millis(100),
257 tls: None,
258 auth: None,
259 performance: NatsPerformanceConfig::default(),
260 streams,
261 }
262 }
263}
264
265impl Default for NatsPerformanceConfig {
266 fn default() -> Self {
267 Self {
268 max_messages_per_second: 1000,
269 target_latency_ms: 20,
270 max_message_size: 1024 * 1024, connection_pool_size: 4,
272 enable_compression: false, batch_size: 10,
274 flush_interval: Duration::from_millis(10),
275 reconnect: ReconnectConfig::default(),
276 }
277 }
278}
279
280impl Default for ReconnectConfig {
281 fn default() -> Self {
282 Self {
283 max_attempts: 10,
284 initial_delay: Duration::from_millis(100),
285 max_delay: Duration::from_secs(10),
286 backoff_multiplier: 2.0,
287 }
288 }
289}
290
291impl NatsConfig {
292 pub fn validate(&self) -> Result<()> {
294 if self.url.is_empty() {
296 return Err(anyhow::anyhow!("NATS URL cannot be empty"));
297 }
298
299 url::Url::parse(&self.url)
300 .map_err(|e| anyhow::anyhow!("Invalid NATS URL '{}': {}", self.url, e))?;
301
302 for url in &self.cluster_urls {
304 url::Url::parse(url)
305 .map_err(|e| anyhow::anyhow!("Invalid cluster URL '{}': {}", url, e))?;
306 }
307
308 if self.jetstream_domain.is_empty() {
310 return Err(anyhow::anyhow!("JetStream domain cannot be empty"));
311 }
312
313 if self.connection_timeout.as_millis() == 0 {
315 return Err(anyhow::anyhow!("Connection timeout must be > 0"));
316 }
317
318 if self.request_timeout.as_millis() == 0 {
319 return Err(anyhow::anyhow!("Request timeout must be > 0"));
320 }
321
322 if let Some(ref tls) = self.tls {
324 tls.validate()?;
325 }
326
327 if let Some(ref auth) = self.auth {
329 auth.validate()?;
330 }
331
332 self.performance.validate()?;
334
335 for (name, stream) in &self.streams {
337 stream
338 .validate()
339 .map_err(|e| anyhow::anyhow!("Stream '{}' validation failed: {}", name, e))?;
340 }
341
342 Ok(())
343 }
344
345 pub fn development() -> Self {
347 Self {
348 url: "nats://127.0.0.1:4222".to_string(),
349 performance: NatsPerformanceConfig {
350 target_latency_ms: 50, ..Default::default()
352 },
353 ..Default::default()
354 }
355 }
356
357 pub fn production() -> Self {
359 Self {
360 url: "nats://nats-cluster:4222".to_string(),
361 cluster_urls: vec![
362 "nats://nats-1:4222".to_string(),
363 "nats://nats-2:4222".to_string(),
364 "nats://nats-3:4222".to_string(),
365 ],
366 connection_timeout: Duration::from_secs(10),
367 request_timeout: Duration::from_millis(50),
368 performance: NatsPerformanceConfig {
369 max_messages_per_second: 2000,
370 target_latency_ms: 10,
371 connection_pool_size: 8,
372 ..Default::default()
373 },
374 streams: {
375 let mut streams = HashMap::new();
376
377 streams.insert(
379 "intents".to_string(),
380 StreamConfig {
381 name: "INTENTS".to_string(),
382 subjects: vec!["smith.intents.>".to_string()],
383 max_age: "10m".to_string(),
384 max_bytes: "5GB".to_string(),
385 max_messages: None,
386 storage: "file".to_string(),
387 retention: "limits".to_string(),
388 replicas: 3,
389 consumers: HashMap::new(),
390 },
391 );
392
393 streams.insert(
394 "results".to_string(),
395 StreamConfig {
396 name: "RESULTS".to_string(),
397 subjects: vec!["smith.results.>".to_string()],
398 max_age: "5m".to_string(),
399 max_bytes: "2GB".to_string(),
400 max_messages: None,
401 storage: "file".to_string(),
402 retention: "limits".to_string(),
403 replicas: 3,
404 consumers: HashMap::new(),
405 },
406 );
407
408 streams
409 },
410 ..Default::default()
411 }
412 }
413
414 pub fn testing() -> Self {
416 Self {
417 url: "nats://127.0.0.1:4222".to_string(),
418 request_timeout: Duration::from_millis(500), performance: NatsPerformanceConfig {
420 max_messages_per_second: 100, batch_size: 5, ..Default::default()
423 },
424 streams: HashMap::new(), ..Default::default()
426 }
427 }
428}
429
430impl TlsConfig {
431 pub fn validate(&self) -> Result<()> {
433 if let Some(ref cert_file) = self.cert_file {
434 if !cert_file.exists() {
435 return Err(anyhow::anyhow!(
436 "TLS cert file does not exist: {}",
437 cert_file.display()
438 ));
439 }
440 }
441
442 if let Some(ref key_file) = self.key_file {
443 if !key_file.exists() {
444 return Err(anyhow::anyhow!(
445 "TLS key file does not exist: {}",
446 key_file.display()
447 ));
448 }
449 }
450
451 if let Some(ref ca_file) = self.ca_file {
452 if !ca_file.exists() {
453 return Err(anyhow::anyhow!(
454 "TLS CA file does not exist: {}",
455 ca_file.display()
456 ));
457 }
458 }
459
460 Ok(())
461 }
462}
463
464impl AuthConfig {
465 pub fn validate(&self) -> Result<()> {
467 let auth_methods = [
469 self.username.is_some() && self.password.is_some(),
470 self.jwt.is_some(),
471 self.nkey_seed.is_some(),
472 self.credentials_file.is_some(),
473 ];
474
475 let auth_count = auth_methods.iter().filter(|&&x| x).count();
476 if auth_count > 1 {
477 return Err(anyhow::anyhow!(
478 "Multiple authentication methods configured. Use only one."
479 ));
480 }
481
482 if let Some(ref nkey_file) = self.nkey_seed {
483 if !nkey_file.exists() {
484 return Err(anyhow::anyhow!(
485 "NKey seed file does not exist: {}",
486 nkey_file.display()
487 ));
488 }
489 }
490
491 if let Some(ref creds_file) = self.credentials_file {
492 if !creds_file.exists() {
493 return Err(anyhow::anyhow!(
494 "Credentials file does not exist: {}",
495 creds_file.display()
496 ));
497 }
498 }
499
500 Ok(())
501 }
502}
503
504impl NatsPerformanceConfig {
505 pub fn validate(&self) -> Result<()> {
507 if self.max_messages_per_second == 0 {
508 return Err(anyhow::anyhow!("Max messages per second must be > 0"));
509 }
510
511 if self.target_latency_ms > 1000 {
512 tracing::warn!("Target latency > 1000ms may impact system performance");
513 }
514
515 if self.max_message_size < 1024 {
516 return Err(anyhow::anyhow!("Max message size must be >= 1KB"));
517 }
518
519 if self.connection_pool_size == 0 {
520 return Err(anyhow::anyhow!("Connection pool size must be > 0"));
521 }
522
523 if self.batch_size == 0 {
524 return Err(anyhow::anyhow!("Batch size must be > 0"));
525 }
526
527 self.reconnect.validate()?;
528
529 Ok(())
530 }
531}
532
533impl ReconnectConfig {
534 pub fn validate(&self) -> Result<()> {
536 if self.initial_delay.as_millis() == 0 {
537 return Err(anyhow::anyhow!("Initial delay must be > 0"));
538 }
539
540 if self.max_delay < self.initial_delay {
541 return Err(anyhow::anyhow!("Max delay must be >= initial delay"));
542 }
543
544 if self.backoff_multiplier <= 1.0 {
545 return Err(anyhow::anyhow!("Backoff multiplier must be > 1.0"));
546 }
547
548 Ok(())
549 }
550}
551
552impl StreamConfig {
553 pub fn validate(&self) -> Result<()> {
555 if self.name.is_empty() {
556 return Err(anyhow::anyhow!("Stream name cannot be empty"));
557 }
558
559 if self.subjects.is_empty() {
560 return Err(anyhow::anyhow!("Stream must have at least one subject"));
561 }
562
563 if !["file", "memory"].contains(&self.storage.as_str()) {
565 return Err(anyhow::anyhow!(
566 "Invalid storage type: {}. Must be 'file' or 'memory'",
567 self.storage
568 ));
569 }
570
571 if !["limits", "interest", "workqueue"].contains(&self.retention.as_str()) {
573 return Err(anyhow::anyhow!(
574 "Invalid retention policy: {}. Must be 'limits', 'interest', or 'workqueue'",
575 self.retention
576 ));
577 }
578
579 if self.replicas == 0 {
580 return Err(anyhow::anyhow!("Stream replicas must be > 0"));
581 }
582
583 for (name, consumer) in &self.consumers {
585 consumer
586 .validate()
587 .map_err(|e| anyhow::anyhow!("Consumer '{}' validation failed: {}", name, e))?;
588 }
589
590 Ok(())
591 }
592}
593
594impl ConsumerConfig {
595 pub fn validate(&self) -> Result<()> {
597 if self.name.is_empty() {
598 return Err(anyhow::anyhow!("Consumer name cannot be empty"));
599 }
600
601 let valid_policies = ["all", "last", "new", "by_start_sequence", "by_start_time"];
603 if !valid_policies.contains(&self.deliver_policy.as_str()) {
604 return Err(anyhow::anyhow!(
605 "Invalid deliver policy: {}. Must be one of: {}",
606 self.deliver_policy,
607 valid_policies.join(", ")
608 ));
609 }
610
611 if !["none", "all", "explicit"].contains(&self.ack_policy.as_str()) {
613 return Err(anyhow::anyhow!(
614 "Invalid ack policy: {}. Must be 'none', 'all', or 'explicit'",
615 self.ack_policy
616 ));
617 }
618
619 if !["instant", "original"].contains(&self.replay_policy.as_str()) {
621 return Err(anyhow::anyhow!(
622 "Invalid replay policy: {}. Must be 'instant' or 'original'",
623 self.replay_policy
624 ));
625 }
626
627 if self.max_deliver == 0 {
628 return Err(anyhow::anyhow!("Max deliver must be > 0"));
629 }
630
631 Ok(())
632 }
633}
634
635pub(crate) mod duration_serde {
637 use serde::{Deserialize, Deserializer, Serializer};
638 use std::time::Duration;
639
640 pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
642 where
643 S: Serializer,
644 {
645 serializer.serialize_u64(duration.as_millis() as u64)
646 }
647
648 pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
650 where
651 D: Deserializer<'de>,
652 {
653 let millis = u64::deserialize(deserializer)?;
654 Ok(Duration::from_millis(millis))
655 }
656}