Skip to main content

smith_config/
nats.rs

1//! NATS and JetStream configuration
2
3use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::time::Duration;
8
9/// NATS connection and JetStream configuration
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct NatsConfig {
12    /// NATS server URL (nats://, tls://, or ws://)
13    pub url: String,
14
15    /// Additional NATS server URLs for clustering
16    pub cluster_urls: Vec<String>,
17
18    /// JetStream domain
19    pub jetstream_domain: String,
20
21    /// Connection timeout
22    #[serde(with = "duration_serde")]
23    pub connection_timeout: Duration,
24
25    /// Request timeout for pub/sub operations
26    #[serde(with = "duration_serde")]
27    pub request_timeout: Duration,
28
29    /// TLS configuration
30    pub tls: Option<TlsConfig>,
31
32    /// Authentication configuration
33    pub auth: Option<AuthConfig>,
34
35    /// Performance tuning
36    pub performance: NatsPerformanceConfig,
37
38    /// Stream and consumer configuration templates
39    pub streams: HashMap<String, StreamConfig>,
40}
41
42/// TLS configuration for NATS connection
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct TlsConfig {
45    /// Client certificate file path
46    pub cert_file: Option<PathBuf>,
47
48    /// Client private key file path
49    pub key_file: Option<PathBuf>,
50
51    /// CA certificate file path
52    pub ca_file: Option<PathBuf>,
53
54    /// Skip certificate verification (dangerous, only for development)
55    pub insecure: bool,
56}
57
58/// Authentication configuration
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct AuthConfig {
61    /// Username for basic auth
62    pub username: Option<String>,
63
64    /// Password for basic auth
65    pub password: Option<String>,
66
67    /// JWT token for JWT auth
68    pub jwt: Option<String>,
69
70    /// Seed file path for NKey auth
71    pub nkey_seed: Option<PathBuf>,
72
73    /// Credentials file path
74    pub credentials_file: Option<PathBuf>,
75}
76
77/// NATS performance configuration
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct NatsPerformanceConfig {
80    /// Maximum messages per second rate limit
81    pub max_messages_per_second: u64,
82
83    /// Target round-trip latency in milliseconds
84    pub target_latency_ms: u64,
85
86    /// Maximum message size in bytes
87    pub max_message_size: usize,
88
89    /// Connection pool size
90    pub connection_pool_size: usize,
91
92    /// Enable message compression
93    pub enable_compression: bool,
94
95    /// Batch size for bulk operations
96    pub batch_size: usize,
97
98    /// Flush interval for batched messages
99    #[serde(with = "duration_serde")]
100    pub flush_interval: Duration,
101
102    /// Reconnection configuration
103    pub reconnect: ReconnectConfig,
104}
105
106/// Reconnection configuration
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ReconnectConfig {
109    /// Maximum reconnection attempts (0 = unlimited)
110    pub max_attempts: u32,
111
112    /// Initial reconnection delay
113    #[serde(with = "duration_serde")]
114    pub initial_delay: Duration,
115
116    /// Maximum reconnection delay
117    #[serde(with = "duration_serde")]
118    pub max_delay: Duration,
119
120    /// Backoff multiplier (exponential backoff)
121    pub backoff_multiplier: f64,
122}
123
124/// JetStream stream configuration template
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct StreamConfig {
127    /// Stream name
128    pub name: String,
129
130    /// Stream subjects pattern
131    pub subjects: Vec<String>,
132
133    /// Maximum age for messages
134    pub max_age: String,
135
136    /// Maximum bytes for stream
137    pub max_bytes: String,
138
139    /// Maximum messages in stream
140    pub max_messages: Option<i64>,
141
142    /// Storage type (file or memory)
143    pub storage: String,
144
145    /// Retention policy
146    pub retention: String,
147
148    /// Number of replicas
149    pub replicas: u32,
150
151    /// Consumer configuration
152    pub consumers: HashMap<String, ConsumerConfig>,
153}
154
155/// JetStream consumer configuration
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct ConsumerConfig {
158    /// Consumer name
159    pub name: String,
160
161    /// Delivery subject (for push consumers)
162    pub deliver_subject: Option<String>,
163
164    /// Delivery policy (all, last, new, by_start_sequence, by_start_time)
165    pub deliver_policy: String,
166
167    /// Ack policy (none, all, explicit)
168    pub ack_policy: String,
169
170    /// Ack wait timeout
171    pub ack_wait: String,
172
173    /// Maximum delivery attempts
174    pub max_deliver: u32,
175
176    /// Filter subject for consuming subset of stream
177    pub filter_subject: Option<String>,
178
179    /// Replay policy (instant, original)
180    pub replay_policy: String,
181}
182
183impl Default for NatsConfig {
184    fn default() -> Self {
185        let mut streams = HashMap::new();
186
187        // Default intent stream configuration
188        streams.insert(
189            "intents".to_string(),
190            StreamConfig {
191                name: "INTENTS".to_string(),
192                subjects: vec!["smith.intents.>".to_string()],
193                max_age: "10m".to_string(),
194                max_bytes: "1GB".to_string(),
195                max_messages: None,
196                storage: "file".to_string(),
197                retention: "limits".to_string(),
198                replicas: 1,
199                consumers: {
200                    let mut consumers = HashMap::new();
201                    consumers.insert(
202                        "executor".to_string(),
203                        ConsumerConfig {
204                            name: "executor".to_string(),
205                            deliver_subject: None, // Pull consumer
206                            deliver_policy: "new".to_string(),
207                            ack_policy: "explicit".to_string(),
208                            ack_wait: "30s".to_string(),
209                            max_deliver: 3,
210                            filter_subject: None,
211                            replay_policy: "instant".to_string(),
212                        },
213                    );
214                    consumers
215                },
216            },
217        );
218
219        // Default results stream configuration
220        streams.insert(
221            "results".to_string(),
222            StreamConfig {
223                name: "RESULTS".to_string(),
224                subjects: vec!["smith.results.>".to_string()],
225                max_age: "5m".to_string(),
226                max_bytes: "512MB".to_string(),
227                max_messages: None,
228                storage: "file".to_string(),
229                retention: "limits".to_string(),
230                replicas: 1,
231                consumers: {
232                    let mut consumers = HashMap::new();
233                    consumers.insert(
234                        "http".to_string(),
235                        ConsumerConfig {
236                            name: "http".to_string(),
237                            deliver_subject: None,
238                            deliver_policy: "new".to_string(),
239                            ack_policy: "explicit".to_string(),
240                            ack_wait: "10s".to_string(),
241                            max_deliver: 2,
242                            filter_subject: None,
243                            replay_policy: "instant".to_string(),
244                        },
245                    );
246                    consumers
247                },
248            },
249        );
250
251        Self {
252            url: "nats://127.0.0.1:4222".to_string(),
253            cluster_urls: vec![],
254            jetstream_domain: "JS".to_string(),
255            connection_timeout: Duration::from_secs(5),
256            request_timeout: Duration::from_millis(100),
257            tls: None,
258            auth: None,
259            performance: NatsPerformanceConfig::default(),
260            streams,
261        }
262    }
263}
264
265impl Default for NatsPerformanceConfig {
266    fn default() -> Self {
267        Self {
268            max_messages_per_second: 1000,
269            target_latency_ms: 20,
270            max_message_size: 1024 * 1024, // 1MB
271            connection_pool_size: 4,
272            enable_compression: false, // Latency over bandwidth
273            batch_size: 10,
274            flush_interval: Duration::from_millis(10),
275            reconnect: ReconnectConfig::default(),
276        }
277    }
278}
279
280impl Default for ReconnectConfig {
281    fn default() -> Self {
282        Self {
283            max_attempts: 10,
284            initial_delay: Duration::from_millis(100),
285            max_delay: Duration::from_secs(10),
286            backoff_multiplier: 2.0,
287        }
288    }
289}
290
291impl NatsConfig {
292    /// Validate the top-level NATS connection, TLS, and stream settings.
293    pub fn validate(&self) -> Result<()> {
294        // Validate URL
295        if self.url.is_empty() {
296            return Err(anyhow::anyhow!("NATS URL cannot be empty"));
297        }
298
299        url::Url::parse(&self.url)
300            .map_err(|e| anyhow::anyhow!("Invalid NATS URL '{}': {}", self.url, e))?;
301
302        // Validate cluster URLs
303        for url in &self.cluster_urls {
304            url::Url::parse(url)
305                .map_err(|e| anyhow::anyhow!("Invalid cluster URL '{}': {}", url, e))?;
306        }
307
308        // Validate JetStream domain
309        if self.jetstream_domain.is_empty() {
310            return Err(anyhow::anyhow!("JetStream domain cannot be empty"));
311        }
312
313        // Validate timeouts
314        if self.connection_timeout.as_millis() == 0 {
315            return Err(anyhow::anyhow!("Connection timeout must be > 0"));
316        }
317
318        if self.request_timeout.as_millis() == 0 {
319            return Err(anyhow::anyhow!("Request timeout must be > 0"));
320        }
321
322        // Validate TLS config if present
323        if let Some(ref tls) = self.tls {
324            tls.validate()?;
325        }
326
327        // Validate auth config if present
328        if let Some(ref auth) = self.auth {
329            auth.validate()?;
330        }
331
332        // Validate performance config
333        self.performance.validate()?;
334
335        // Validate stream configurations
336        for (name, stream) in &self.streams {
337            stream
338                .validate()
339                .map_err(|e| anyhow::anyhow!("Stream '{}' validation failed: {}", name, e))?;
340        }
341
342        Ok(())
343    }
344
345    /// Development profile optimized for localhost experimentation.
346    pub fn development() -> Self {
347        Self {
348            url: "nats://127.0.0.1:4222".to_string(),
349            performance: NatsPerformanceConfig {
350                target_latency_ms: 50, // Relaxed for development
351                ..Default::default()
352            },
353            ..Default::default()
354        }
355    }
356
357    /// Production profile with clustering and higher throughput defaults.
358    pub fn production() -> Self {
359        Self {
360            url: "nats://nats-cluster:4222".to_string(),
361            cluster_urls: vec![
362                "nats://nats-1:4222".to_string(),
363                "nats://nats-2:4222".to_string(),
364                "nats://nats-3:4222".to_string(),
365            ],
366            connection_timeout: Duration::from_secs(10),
367            request_timeout: Duration::from_millis(50),
368            performance: NatsPerformanceConfig {
369                max_messages_per_second: 2000,
370                target_latency_ms: 10,
371                connection_pool_size: 8,
372                ..Default::default()
373            },
374            streams: {
375                let mut streams = HashMap::new();
376
377                // Production intent stream with replication
378                streams.insert(
379                    "intents".to_string(),
380                    StreamConfig {
381                        name: "INTENTS".to_string(),
382                        subjects: vec!["smith.intents.>".to_string()],
383                        max_age: "10m".to_string(),
384                        max_bytes: "5GB".to_string(),
385                        max_messages: None,
386                        storage: "file".to_string(),
387                        retention: "limits".to_string(),
388                        replicas: 3,
389                        consumers: HashMap::new(),
390                    },
391                );
392
393                streams.insert(
394                    "results".to_string(),
395                    StreamConfig {
396                        name: "RESULTS".to_string(),
397                        subjects: vec!["smith.results.>".to_string()],
398                        max_age: "5m".to_string(),
399                        max_bytes: "2GB".to_string(),
400                        max_messages: None,
401                        storage: "file".to_string(),
402                        retention: "limits".to_string(),
403                        replicas: 3,
404                        consumers: HashMap::new(),
405                    },
406                );
407
408                streams
409            },
410            ..Default::default()
411        }
412    }
413
414    /// Lightweight profile used in CI where infrastructure is mocked.
415    pub fn testing() -> Self {
416        Self {
417            url: "nats://127.0.0.1:4222".to_string(),
418            request_timeout: Duration::from_millis(500), // Generous for CI
419            performance: NatsPerformanceConfig {
420                max_messages_per_second: 100, // Limited for tests
421                batch_size: 5,                // Smaller batches
422                ..Default::default()
423            },
424            streams: HashMap::new(), // No default streams for tests
425            ..Default::default()
426        }
427    }
428}
429
430impl TlsConfig {
431    /// Ensure referenced TLS artifacts exist on disk.
432    pub fn validate(&self) -> Result<()> {
433        if let Some(ref cert_file) = self.cert_file {
434            if !cert_file.exists() {
435                return Err(anyhow::anyhow!(
436                    "TLS cert file does not exist: {}",
437                    cert_file.display()
438                ));
439            }
440        }
441
442        if let Some(ref key_file) = self.key_file {
443            if !key_file.exists() {
444                return Err(anyhow::anyhow!(
445                    "TLS key file does not exist: {}",
446                    key_file.display()
447                ));
448            }
449        }
450
451        if let Some(ref ca_file) = self.ca_file {
452            if !ca_file.exists() {
453                return Err(anyhow::anyhow!(
454                    "TLS CA file does not exist: {}",
455                    ca_file.display()
456                ));
457            }
458        }
459
460        Ok(())
461    }
462}
463
464impl AuthConfig {
465    /// Ensure exactly one authentication mechanism is configured correctly.
466    pub fn validate(&self) -> Result<()> {
467        // At most one auth method should be configured
468        let auth_methods = [
469            self.username.is_some() && self.password.is_some(),
470            self.jwt.is_some(),
471            self.nkey_seed.is_some(),
472            self.credentials_file.is_some(),
473        ];
474
475        let auth_count = auth_methods.iter().filter(|&&x| x).count();
476        if auth_count > 1 {
477            return Err(anyhow::anyhow!(
478                "Multiple authentication methods configured. Use only one."
479            ));
480        }
481
482        if let Some(ref nkey_file) = self.nkey_seed {
483            if !nkey_file.exists() {
484                return Err(anyhow::anyhow!(
485                    "NKey seed file does not exist: {}",
486                    nkey_file.display()
487                ));
488            }
489        }
490
491        if let Some(ref creds_file) = self.credentials_file {
492            if !creds_file.exists() {
493                return Err(anyhow::anyhow!(
494                    "Credentials file does not exist: {}",
495                    creds_file.display()
496                ));
497            }
498        }
499
500        Ok(())
501    }
502}
503
504impl NatsPerformanceConfig {
505    /// Validate batching, pool sizing, and reconnect tuning before use.
506    pub fn validate(&self) -> Result<()> {
507        if self.max_messages_per_second == 0 {
508            return Err(anyhow::anyhow!("Max messages per second must be > 0"));
509        }
510
511        if self.target_latency_ms > 1000 {
512            tracing::warn!("Target latency > 1000ms may impact system performance");
513        }
514
515        if self.max_message_size < 1024 {
516            return Err(anyhow::anyhow!("Max message size must be >= 1KB"));
517        }
518
519        if self.connection_pool_size == 0 {
520            return Err(anyhow::anyhow!("Connection pool size must be > 0"));
521        }
522
523        if self.batch_size == 0 {
524            return Err(anyhow::anyhow!("Batch size must be > 0"));
525        }
526
527        self.reconnect.validate()?;
528
529        Ok(())
530    }
531}
532
533impl ReconnectConfig {
534    /// Ensure reconnect backoff parameters are positive and ordered correctly.
535    pub fn validate(&self) -> Result<()> {
536        if self.initial_delay.as_millis() == 0 {
537            return Err(anyhow::anyhow!("Initial delay must be > 0"));
538        }
539
540        if self.max_delay < self.initial_delay {
541            return Err(anyhow::anyhow!("Max delay must be >= initial delay"));
542        }
543
544        if self.backoff_multiplier <= 1.0 {
545            return Err(anyhow::anyhow!("Backoff multiplier must be > 1.0"));
546        }
547
548        Ok(())
549    }
550}
551
552impl StreamConfig {
553    /// Validate stream metadata, retention, and nested consumer templates.
554    pub fn validate(&self) -> Result<()> {
555        if self.name.is_empty() {
556            return Err(anyhow::anyhow!("Stream name cannot be empty"));
557        }
558
559        if self.subjects.is_empty() {
560            return Err(anyhow::anyhow!("Stream must have at least one subject"));
561        }
562
563        // Validate storage type
564        if !["file", "memory"].contains(&self.storage.as_str()) {
565            return Err(anyhow::anyhow!(
566                "Invalid storage type: {}. Must be 'file' or 'memory'",
567                self.storage
568            ));
569        }
570
571        // Validate retention policy
572        if !["limits", "interest", "workqueue"].contains(&self.retention.as_str()) {
573            return Err(anyhow::anyhow!(
574                "Invalid retention policy: {}. Must be 'limits', 'interest', or 'workqueue'",
575                self.retention
576            ));
577        }
578
579        if self.replicas == 0 {
580            return Err(anyhow::anyhow!("Stream replicas must be > 0"));
581        }
582
583        // Validate consumers
584        for (name, consumer) in &self.consumers {
585            consumer
586                .validate()
587                .map_err(|e| anyhow::anyhow!("Consumer '{}' validation failed: {}", name, e))?;
588        }
589
590        Ok(())
591    }
592}
593
594impl ConsumerConfig {
595    /// Validate deliver/ack policies and replay configuration for a consumer.
596    pub fn validate(&self) -> Result<()> {
597        if self.name.is_empty() {
598            return Err(anyhow::anyhow!("Consumer name cannot be empty"));
599        }
600
601        // Validate deliver policy
602        let valid_policies = ["all", "last", "new", "by_start_sequence", "by_start_time"];
603        if !valid_policies.contains(&self.deliver_policy.as_str()) {
604            return Err(anyhow::anyhow!(
605                "Invalid deliver policy: {}. Must be one of: {}",
606                self.deliver_policy,
607                valid_policies.join(", ")
608            ));
609        }
610
611        // Validate ack policy
612        if !["none", "all", "explicit"].contains(&self.ack_policy.as_str()) {
613            return Err(anyhow::anyhow!(
614                "Invalid ack policy: {}. Must be 'none', 'all', or 'explicit'",
615                self.ack_policy
616            ));
617        }
618
619        // Validate replay policy
620        if !["instant", "original"].contains(&self.replay_policy.as_str()) {
621            return Err(anyhow::anyhow!(
622                "Invalid replay policy: {}. Must be 'instant' or 'original'",
623                self.replay_policy
624            ));
625        }
626
627        if self.max_deliver == 0 {
628            return Err(anyhow::anyhow!("Max deliver must be > 0"));
629        }
630
631        Ok(())
632    }
633}
634
635/// Helper module for serializing durations as millisecond integers.
636pub(crate) mod duration_serde {
637    use serde::{Deserialize, Deserializer, Serializer};
638    use std::time::Duration;
639
640    /// Serialize a `Duration` using millisecond precision for config files.
641    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
642    where
643        S: Serializer,
644    {
645        serializer.serialize_u64(duration.as_millis() as u64)
646    }
647
648    /// Deserialize a millisecond count into a `Duration`.
649    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
650    where
651        D: Deserializer<'de>,
652    {
653        let millis = u64::deserialize(deserializer)?;
654        Ok(Duration::from_millis(millis))
655    }
656}