Skip to main content

smith_config/
nats_adapter.rs

1//! NATS adapter configuration shared across services
2
3use anyhow::{anyhow, Context, Result};
4use regex::Regex;
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::net::IpAddr;
8use std::time::Duration;
9
10use crate::nats::duration_serde;
11
12/// Shared adapter configuration for NATS services
13#[derive(Debug, Clone, Serialize, Deserialize, Default)]
14pub struct AdapterConfig {
15    pub security: SecurityConfig,
16    pub topics: TopicConfig,
17    pub performance: PerformanceConfig,
18    pub queues: QueueConfig,
19}
20
21impl AdapterConfig {
22    /// Production-friendly defaults tuned for higher throughput and fan-out.
23    pub fn production() -> Self {
24        Self {
25            performance: PerformanceConfig {
26                max_messages_per_second: 2000,
27                target_latency_ms: 10,
28                max_message_size: 512 * 1024,
29                connection_pool_size: 8,
30                enable_compression: false,
31                batch_size: 25,
32                flush_interval: Duration::from_millis(5),
33                reconnect: ReconnectConfig {
34                    max_attempts: 0,
35                    initial_delay: Duration::from_millis(500),
36                    max_delay: Duration::from_secs(5),
37                    backoff_multiplier: 2.0,
38                },
39            },
40            queues: QueueConfig {
41                command_queue_size: 2_000,
42                event_queue_size: 10_000,
43                processing_queue_size: 5_000,
44                drain_strategy: DrainStrategy::DropOldest,
45            },
46            ..Self::default()
47        }
48    }
49
50    /// Development defaults emphasizing simplicity over performance.
51    pub fn development() -> Self {
52        Self::default()
53    }
54
55    /// Testing defaults that slow down throughput for easier assertions.
56    pub fn testing() -> Self {
57        let mut config = Self::default();
58        config.performance.max_messages_per_second = 100;
59        config.performance.batch_size = 5;
60        config.performance.flush_interval = Duration::from_millis(25);
61        config
62    }
63
64    /// Validate adapter configuration
65    pub fn validate(&self) -> Result<()> {
66        self.security
67            .validate()
68            .context("security configuration invalid")?;
69        self.topics
70            .validate()
71            .context("topic configuration invalid")?;
72        self.performance
73            .validate()
74            .context("performance configuration invalid")?;
75        self.queues
76            .validate()
77            .context("queue configuration invalid")?;
78        Ok(())
79    }
80}
81
82/// Security configuration for adapter connections
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SecurityConfig {
85    pub require_authentication: bool,
86    pub auth_token: Option<String>,
87    pub username: Option<String>,
88    pub password: Option<String>,
89    pub jwt_token: Option<String>,
90    pub nkey_seed: Option<String>,
91    pub tls: TlsConfig,
92    pub subject_permissions: SubjectPermissions,
93    pub allowed_ips: HashSet<String>,
94    pub rate_limits: RateLimits,
95}
96
97impl Default for SecurityConfig {
98    fn default() -> Self {
99        Self {
100            require_authentication: false,
101            auth_token: None,
102            username: None,
103            password: None,
104            jwt_token: None,
105            nkey_seed: None,
106            tls: TlsConfig::default(),
107            subject_permissions: SubjectPermissions::default(),
108            allowed_ips: HashSet::new(),
109            rate_limits: RateLimits::default(),
110        }
111    }
112}
113
114/// TLS configuration that guards adapter connections.
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct TlsConfig {
117    pub enabled: bool,
118    pub required: bool,
119    pub ca_file: Option<String>,
120    pub cert_file: Option<String>,
121    pub key_file: Option<String>,
122    pub server_name: Option<String>,
123    pub insecure_skip_verify: bool,
124}
125
126impl Default for TlsConfig {
127    fn default() -> Self {
128        Self {
129            enabled: true,
130            required: false,
131            ca_file: None,
132            cert_file: None,
133            key_file: None,
134            server_name: None,
135            insecure_skip_verify: false,
136        }
137    }
138}
139
140/// Fine-grained publish/subscribe allow/deny lists.
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct SubjectPermissions {
143    pub publish_allow: HashSet<String>,
144    pub publish_deny: HashSet<String>,
145    pub subscribe_allow: HashSet<String>,
146    pub subscribe_deny: HashSet<String>,
147}
148
149impl Default for SubjectPermissions {
150    fn default() -> Self {
151        let mut publish_allow = HashSet::new();
152        let mut subscribe_allow = HashSet::new();
153        publish_allow.insert("claude-code-rs.>".to_string());
154        subscribe_allow.insert("claude-code-rs.>".to_string());
155        Self {
156            publish_allow,
157            publish_deny: HashSet::new(),
158            subscribe_allow,
159            subscribe_deny: HashSet::new(),
160        }
161    }
162}
163
164impl SubjectPermissions {
165    /// Construct permissions that open both publish and subscribe for a prefix.
166    pub fn wildcard(prefix: &str) -> Self {
167        let mut publish_allow = HashSet::new();
168        let mut subscribe_allow = HashSet::new();
169        publish_allow.insert(format!("{}>", prefix));
170        subscribe_allow.insert(format!("{}>", prefix));
171        Self {
172            publish_allow,
173            publish_deny: HashSet::new(),
174            subscribe_allow,
175            subscribe_deny: HashSet::new(),
176        }
177    }
178
179    /// Ensure at least one publish or subscribe pattern is whitelisted.
180    fn validate(&self) -> Result<()> {
181        if self.publish_allow.is_empty() && self.subscribe_allow.is_empty() {
182            return Err(anyhow!(
183                "at least one publish or subscribe allow pattern required"
184            ));
185        }
186        Ok(())
187    }
188}
189
190/// Basic throughput and payload guardrails.
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct RateLimits {
193    pub messages_per_second: u64,
194    pub bytes_per_second: u64,
195    pub max_subscriptions: usize,
196    pub max_payload_size: usize,
197}
198
199impl Default for RateLimits {
200    fn default() -> Self {
201        Self {
202            messages_per_second: 1_000,
203            bytes_per_second: 1024 * 1024,
204            max_subscriptions: 100,
205            max_payload_size: 1024 * 1024,
206        }
207    }
208}
209
210impl RateLimits {
211    /// Ensure rate limits are non-zero and sized above minimum thresholds.
212    fn validate(&self) -> Result<()> {
213        if self.messages_per_second == 0 {
214            return Err(anyhow!("messages_per_second must be greater than zero"));
215        }
216        if self.bytes_per_second == 0 {
217            return Err(anyhow!("bytes_per_second must be greater than zero"));
218        }
219        if self.max_subscriptions == 0 {
220            return Err(anyhow!("max_subscriptions must be greater than zero"));
221        }
222        if self.max_payload_size < 1024 {
223            return Err(anyhow!("max_payload_size must be at least 1KB"));
224        }
225        Ok(())
226    }
227}
228
229/// Topic configuration
230/// Topic namespace and pattern configuration shared by adapters.
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct TopicConfig {
233    pub prefix: String,
234    pub command_subject: String,
235    pub event_subject: String,
236    pub max_topic_length: usize,
237    pub allowed_patterns: Vec<String>,
238}
239
240impl Default for TopicConfig {
241    fn default() -> Self {
242        Self {
243            prefix: "claude-code-rs".to_string(),
244            command_subject: "command".to_string(),
245            event_subject: "event".to_string(),
246            max_topic_length: 256,
247            allowed_patterns: vec![r"^claude-code-rs\.(command|event)\.[a-z_]+$".to_string()],
248        }
249    }
250}
251
252/// Performance configuration
253/// Adapter runtime performance tuning knobs.
254#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct PerformanceConfig {
256    pub max_messages_per_second: u64,
257    pub target_latency_ms: u64,
258    pub max_message_size: usize,
259    pub connection_pool_size: usize,
260    pub enable_compression: bool,
261    pub batch_size: usize,
262    #[serde(with = "duration_serde")]
263    pub flush_interval: Duration,
264    pub reconnect: ReconnectConfig,
265}
266
267impl Default for PerformanceConfig {
268    fn default() -> Self {
269        Self {
270            max_messages_per_second: 1_000,
271            target_latency_ms: 20,
272            max_message_size: 1024 * 1024,
273            connection_pool_size: 4,
274            enable_compression: false,
275            batch_size: 10,
276            flush_interval: Duration::from_millis(10),
277            reconnect: ReconnectConfig::default(),
278        }
279    }
280}
281
282/// Reconnect strategy for adapter-managed NATS connections.
283#[derive(Debug, Clone, Serialize, Deserialize)]
284pub struct ReconnectConfig {
285    pub max_attempts: u32,
286    #[serde(with = "duration_serde")]
287    pub initial_delay: Duration,
288    #[serde(with = "duration_serde")]
289    pub max_delay: Duration,
290    pub backoff_multiplier: f64,
291}
292
293impl Default for ReconnectConfig {
294    fn default() -> Self {
295        Self {
296            max_attempts: 5,
297            initial_delay: Duration::from_millis(250),
298            max_delay: Duration::from_secs(2),
299            backoff_multiplier: 2.0,
300        }
301    }
302}
303
304/// Queue configuration for adapter internals
305/// Queue sizing and overflow strategy for adapter worker pools.
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct QueueConfig {
308    pub command_queue_size: usize,
309    pub event_queue_size: usize,
310    pub processing_queue_size: usize,
311    pub drain_strategy: DrainStrategy,
312}
313
314impl Default for QueueConfig {
315    fn default() -> Self {
316        Self {
317            command_queue_size: 1_000,
318            event_queue_size: 5_000,
319            processing_queue_size: 2_000,
320            drain_strategy: DrainStrategy::DropOldest,
321        }
322    }
323}
324
325impl QueueConfig {
326    /// Ensure in-memory queue sizes are non-zero.
327    fn validate(&self) -> Result<()> {
328        if self.command_queue_size == 0 {
329            return Err(anyhow!("command_queue_size must be greater than zero"));
330        }
331        if self.event_queue_size == 0 {
332            return Err(anyhow!("event_queue_size must be greater than zero"));
333        }
334        if self.processing_queue_size == 0 {
335            return Err(anyhow!("processing_queue_size must be greater than zero"));
336        }
337        Ok(())
338    }
339}
340
341/// Determines how in-memory queues behave once capacity is hit.
342#[derive(Debug, Clone, Serialize, Deserialize, Default)]
343#[serde(rename_all = "snake_case")]
344pub enum DrainStrategy {
345    #[default]
346    DropOldest,
347    DropNewest,
348    Block,
349    Error,
350}
351
352impl SecurityConfig {
353    /// Verify authentication, TLS, and IP restrictions are coherent.
354    pub fn validate(&self) -> Result<()> {
355        if self.require_authentication
356            && self.auth_token.is_none()
357            && self.username.is_none()
358            && self.jwt_token.is_none()
359            && self.nkey_seed.is_none()
360        {
361            return Err(anyhow!(
362                "authentication required but no credentials were provided"
363            ));
364        }
365
366        if self.username.is_some() && self.password.is_none() {
367            return Err(anyhow!(
368                "username supplied but password missing for basic authentication"
369            ));
370        }
371
372        if self.tls.required && !self.tls.enabled {
373            return Err(anyhow!("TLS is marked as required but disabled"));
374        }
375
376        self.subject_permissions
377            .validate()
378            .context("subject permissions invalid")?;
379
380        for ip in &self.allowed_ips {
381            ip.parse::<IpAddr>()
382                .with_context(|| format!("invalid allowed IP address: {ip}"))?;
383        }
384
385        self.rate_limits.validate().context("rate limits invalid")?;
386
387        Ok(())
388    }
389}
390
391impl TopicConfig {
392    /// Validate prefix, pattern, and length constraints are sane.
393    pub fn validate(&self) -> Result<()> {
394        if self.prefix.trim().is_empty() {
395            return Err(anyhow!("topic prefix cannot be empty"));
396        }
397
398        if self.max_topic_length < 10 {
399            return Err(anyhow!("max_topic_length must be at least 10 characters"));
400        }
401
402        if self.allowed_patterns.is_empty() {
403            return Err(anyhow!("allowed_patterns must contain at least one entry"));
404        }
405
406        for pattern in &self.allowed_patterns {
407            Regex::new(pattern)
408                .with_context(|| format!("invalid topic pattern regex: {pattern}"))?;
409        }
410
411        Ok(())
412    }
413
414    /// Check if a topic matches the configured allow-list of regex patterns.
415    pub fn is_topic_allowed(&self, topic: &str) -> bool {
416        if topic.len() > self.max_topic_length {
417            return false;
418        }
419
420        self.allowed_patterns.iter().any(|pattern| {
421            Regex::new(pattern)
422                .map(|regex| regex.is_match(topic))
423                .unwrap_or(false)
424        })
425    }
426}
427
428impl PerformanceConfig {
429    /// Validate batching and connection pool limits for adapter throughput.
430    pub fn validate(&self) -> Result<()> {
431        if self.max_messages_per_second == 0 {
432            return Err(anyhow!("max_messages_per_second must be greater than zero"));
433        }
434        if self.connection_pool_size == 0 {
435            return Err(anyhow!("connection_pool_size must be greater than zero"));
436        }
437        if self.batch_size == 0 {
438            return Err(anyhow!("batch_size must be greater than zero"));
439        }
440        if self.flush_interval.is_zero() {
441            return Err(anyhow!("flush_interval must be greater than zero"));
442        }
443
444        self.reconnect
445            .validate()
446            .context("reconnect configuration invalid")?;
447
448        Ok(())
449    }
450}
451
452impl ReconnectConfig {
453    /// Ensure reconnect jitter/backoff parameters are internally consistent.
454    pub fn validate(&self) -> Result<()> {
455        if self.backoff_multiplier < 1.0 {
456            return Err(anyhow!("backoff_multiplier must be at least 1.0"));
457        }
458        if self.max_delay < self.initial_delay {
459            return Err(anyhow!(
460                "max_delay must be greater than or equal to initial_delay"
461            ));
462        }
463        Ok(())
464    }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// The stock configuration must pass its own validation.
    #[test]
    fn default_config_validates() {
        let outcome = AdapterConfig::default().validate();
        assert!(outcome.is_ok());
    }

    /// An empty topic prefix is rejected.
    #[test]
    fn invalid_topic_prefix_fails() {
        let config = AdapterConfig {
            topics: TopicConfig {
                prefix: String::new(),
                ..TopicConfig::default()
            },
            ..AdapterConfig::default()
        };
        assert!(config.validate().is_err());
    }

    /// Requiring authentication without credentials fails; adding a token
    /// makes the same configuration valid.
    #[test]
    fn requires_credentials_when_auth_enabled() {
        let mut config = AdapterConfig::default();
        config.security.require_authentication = true;
        config.security.auth_token = None;
        let without_credentials = config.validate();
        assert!(without_credentials.is_err());

        config.security.auth_token = Some("token".into());
        let with_token = config.validate();
        assert!(with_token.is_ok());
    }

    /// A zero-capacity queue is rejected.
    #[test]
    fn queue_sizes_must_be_positive() {
        let config = AdapterConfig {
            queues: QueueConfig {
                command_queue_size: 0,
                ..QueueConfig::default()
            },
            ..AdapterConfig::default()
        };
        assert!(config.validate().is_err());
    }
}
501}