1use anyhow::{anyhow, Context, Result};
4use regex::Regex;
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::net::IpAddr;
8use std::time::Duration;
9
10use crate::nats::duration_serde;
11
12#[derive(Debug, Clone, Serialize, Deserialize, Default)]
14pub struct AdapterConfig {
15 pub security: SecurityConfig,
16 pub topics: TopicConfig,
17 pub performance: PerformanceConfig,
18 pub queues: QueueConfig,
19}
20
21impl AdapterConfig {
22 pub fn production() -> Self {
24 Self {
25 performance: PerformanceConfig {
26 max_messages_per_second: 2000,
27 target_latency_ms: 10,
28 max_message_size: 512 * 1024,
29 connection_pool_size: 8,
30 enable_compression: false,
31 batch_size: 25,
32 flush_interval: Duration::from_millis(5),
33 reconnect: ReconnectConfig {
34 max_attempts: 0,
35 initial_delay: Duration::from_millis(500),
36 max_delay: Duration::from_secs(5),
37 backoff_multiplier: 2.0,
38 },
39 },
40 queues: QueueConfig {
41 command_queue_size: 2_000,
42 event_queue_size: 10_000,
43 processing_queue_size: 5_000,
44 drain_strategy: DrainStrategy::DropOldest,
45 },
46 ..Self::default()
47 }
48 }
49
50 pub fn development() -> Self {
52 Self::default()
53 }
54
55 pub fn testing() -> Self {
57 let mut config = Self::default();
58 config.performance.max_messages_per_second = 100;
59 config.performance.batch_size = 5;
60 config.performance.flush_interval = Duration::from_millis(25);
61 config
62 }
63
64 pub fn validate(&self) -> Result<()> {
66 self.security
67 .validate()
68 .context("security configuration invalid")?;
69 self.topics
70 .validate()
71 .context("topic configuration invalid")?;
72 self.performance
73 .validate()
74 .context("performance configuration invalid")?;
75 self.queues
76 .validate()
77 .context("queue configuration invalid")?;
78 Ok(())
79 }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SecurityConfig {
85 pub require_authentication: bool,
86 pub auth_token: Option<String>,
87 pub username: Option<String>,
88 pub password: Option<String>,
89 pub jwt_token: Option<String>,
90 pub nkey_seed: Option<String>,
91 pub tls: TlsConfig,
92 pub subject_permissions: SubjectPermissions,
93 pub allowed_ips: HashSet<String>,
94 pub rate_limits: RateLimits,
95}
96
97impl Default for SecurityConfig {
98 fn default() -> Self {
99 Self {
100 require_authentication: false,
101 auth_token: None,
102 username: None,
103 password: None,
104 jwt_token: None,
105 nkey_seed: None,
106 tls: TlsConfig::default(),
107 subject_permissions: SubjectPermissions::default(),
108 allowed_ips: HashSet::new(),
109 rate_limits: RateLimits::default(),
110 }
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct TlsConfig {
117 pub enabled: bool,
118 pub required: bool,
119 pub ca_file: Option<String>,
120 pub cert_file: Option<String>,
121 pub key_file: Option<String>,
122 pub server_name: Option<String>,
123 pub insecure_skip_verify: bool,
124}
125
126impl Default for TlsConfig {
127 fn default() -> Self {
128 Self {
129 enabled: true,
130 required: false,
131 ca_file: None,
132 cert_file: None,
133 key_file: None,
134 server_name: None,
135 insecure_skip_verify: false,
136 }
137 }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct SubjectPermissions {
143 pub publish_allow: HashSet<String>,
144 pub publish_deny: HashSet<String>,
145 pub subscribe_allow: HashSet<String>,
146 pub subscribe_deny: HashSet<String>,
147}
148
149impl Default for SubjectPermissions {
150 fn default() -> Self {
151 let mut publish_allow = HashSet::new();
152 let mut subscribe_allow = HashSet::new();
153 publish_allow.insert("claude-code-rs.>".to_string());
154 subscribe_allow.insert("claude-code-rs.>".to_string());
155 Self {
156 publish_allow,
157 publish_deny: HashSet::new(),
158 subscribe_allow,
159 subscribe_deny: HashSet::new(),
160 }
161 }
162}
163
164impl SubjectPermissions {
165 pub fn wildcard(prefix: &str) -> Self {
167 let mut publish_allow = HashSet::new();
168 let mut subscribe_allow = HashSet::new();
169 publish_allow.insert(format!("{}>", prefix));
170 subscribe_allow.insert(format!("{}>", prefix));
171 Self {
172 publish_allow,
173 publish_deny: HashSet::new(),
174 subscribe_allow,
175 subscribe_deny: HashSet::new(),
176 }
177 }
178
179 fn validate(&self) -> Result<()> {
181 if self.publish_allow.is_empty() && self.subscribe_allow.is_empty() {
182 return Err(anyhow!(
183 "at least one publish or subscribe allow pattern required"
184 ));
185 }
186 Ok(())
187 }
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct RateLimits {
193 pub messages_per_second: u64,
194 pub bytes_per_second: u64,
195 pub max_subscriptions: usize,
196 pub max_payload_size: usize,
197}
198
199impl Default for RateLimits {
200 fn default() -> Self {
201 Self {
202 messages_per_second: 1_000,
203 bytes_per_second: 1024 * 1024,
204 max_subscriptions: 100,
205 max_payload_size: 1024 * 1024,
206 }
207 }
208}
209
210impl RateLimits {
211 fn validate(&self) -> Result<()> {
213 if self.messages_per_second == 0 {
214 return Err(anyhow!("messages_per_second must be greater than zero"));
215 }
216 if self.bytes_per_second == 0 {
217 return Err(anyhow!("bytes_per_second must be greater than zero"));
218 }
219 if self.max_subscriptions == 0 {
220 return Err(anyhow!("max_subscriptions must be greater than zero"));
221 }
222 if self.max_payload_size < 1024 {
223 return Err(anyhow!("max_payload_size must be at least 1KB"));
224 }
225 Ok(())
226 }
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct TopicConfig {
233 pub prefix: String,
234 pub command_subject: String,
235 pub event_subject: String,
236 pub max_topic_length: usize,
237 pub allowed_patterns: Vec<String>,
238}
239
240impl Default for TopicConfig {
241 fn default() -> Self {
242 Self {
243 prefix: "claude-code-rs".to_string(),
244 command_subject: "command".to_string(),
245 event_subject: "event".to_string(),
246 max_topic_length: 256,
247 allowed_patterns: vec![r"^claude-code-rs\.(command|event)\.[a-z_]+$".to_string()],
248 }
249 }
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct PerformanceConfig {
256 pub max_messages_per_second: u64,
257 pub target_latency_ms: u64,
258 pub max_message_size: usize,
259 pub connection_pool_size: usize,
260 pub enable_compression: bool,
261 pub batch_size: usize,
262 #[serde(with = "duration_serde")]
263 pub flush_interval: Duration,
264 pub reconnect: ReconnectConfig,
265}
266
267impl Default for PerformanceConfig {
268 fn default() -> Self {
269 Self {
270 max_messages_per_second: 1_000,
271 target_latency_ms: 20,
272 max_message_size: 1024 * 1024,
273 connection_pool_size: 4,
274 enable_compression: false,
275 batch_size: 10,
276 flush_interval: Duration::from_millis(10),
277 reconnect: ReconnectConfig::default(),
278 }
279 }
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
284pub struct ReconnectConfig {
285 pub max_attempts: u32,
286 #[serde(with = "duration_serde")]
287 pub initial_delay: Duration,
288 #[serde(with = "duration_serde")]
289 pub max_delay: Duration,
290 pub backoff_multiplier: f64,
291}
292
293impl Default for ReconnectConfig {
294 fn default() -> Self {
295 Self {
296 max_attempts: 5,
297 initial_delay: Duration::from_millis(250),
298 max_delay: Duration::from_secs(2),
299 backoff_multiplier: 2.0,
300 }
301 }
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct QueueConfig {
308 pub command_queue_size: usize,
309 pub event_queue_size: usize,
310 pub processing_queue_size: usize,
311 pub drain_strategy: DrainStrategy,
312}
313
314impl Default for QueueConfig {
315 fn default() -> Self {
316 Self {
317 command_queue_size: 1_000,
318 event_queue_size: 5_000,
319 processing_queue_size: 2_000,
320 drain_strategy: DrainStrategy::DropOldest,
321 }
322 }
323}
324
325impl QueueConfig {
326 fn validate(&self) -> Result<()> {
328 if self.command_queue_size == 0 {
329 return Err(anyhow!("command_queue_size must be greater than zero"));
330 }
331 if self.event_queue_size == 0 {
332 return Err(anyhow!("event_queue_size must be greater than zero"));
333 }
334 if self.processing_queue_size == 0 {
335 return Err(anyhow!("processing_queue_size must be greater than zero"));
336 }
337 Ok(())
338 }
339}
340
/// Drain strategy selected in `QueueConfig`.
///
/// Variants are (de)serialized in `snake_case` (for example `drop_oldest`).
/// The default is [`DrainStrategy::DropOldest`].
// NOTE(review): the per-variant behavior is implemented outside this file;
// the descriptions below are inferred from the names — confirm against the
// queue implementation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum DrainStrategy {
    /// Presumably discards the oldest queued item to make room.
    #[default]
    DropOldest,
    /// Presumably discards the incoming item.
    DropNewest,
    /// Presumably waits until space is available.
    Block,
    /// Presumably reports an error to the producer.
    Error,
}
351
352impl SecurityConfig {
353 pub fn validate(&self) -> Result<()> {
355 if self.require_authentication
356 && self.auth_token.is_none()
357 && self.username.is_none()
358 && self.jwt_token.is_none()
359 && self.nkey_seed.is_none()
360 {
361 return Err(anyhow!(
362 "authentication required but no credentials were provided"
363 ));
364 }
365
366 if self.username.is_some() && self.password.is_none() {
367 return Err(anyhow!(
368 "username supplied but password missing for basic authentication"
369 ));
370 }
371
372 if self.tls.required && !self.tls.enabled {
373 return Err(anyhow!("TLS is marked as required but disabled"));
374 }
375
376 self.subject_permissions
377 .validate()
378 .context("subject permissions invalid")?;
379
380 for ip in &self.allowed_ips {
381 ip.parse::<IpAddr>()
382 .with_context(|| format!("invalid allowed IP address: {ip}"))?;
383 }
384
385 self.rate_limits.validate().context("rate limits invalid")?;
386
387 Ok(())
388 }
389}
390
391impl TopicConfig {
392 pub fn validate(&self) -> Result<()> {
394 if self.prefix.trim().is_empty() {
395 return Err(anyhow!("topic prefix cannot be empty"));
396 }
397
398 if self.max_topic_length < 10 {
399 return Err(anyhow!("max_topic_length must be at least 10 characters"));
400 }
401
402 if self.allowed_patterns.is_empty() {
403 return Err(anyhow!("allowed_patterns must contain at least one entry"));
404 }
405
406 for pattern in &self.allowed_patterns {
407 Regex::new(pattern)
408 .with_context(|| format!("invalid topic pattern regex: {pattern}"))?;
409 }
410
411 Ok(())
412 }
413
414 pub fn is_topic_allowed(&self, topic: &str) -> bool {
416 if topic.len() > self.max_topic_length {
417 return false;
418 }
419
420 self.allowed_patterns.iter().any(|pattern| {
421 Regex::new(pattern)
422 .map(|regex| regex.is_match(topic))
423 .unwrap_or(false)
424 })
425 }
426}
427
428impl PerformanceConfig {
429 pub fn validate(&self) -> Result<()> {
431 if self.max_messages_per_second == 0 {
432 return Err(anyhow!("max_messages_per_second must be greater than zero"));
433 }
434 if self.connection_pool_size == 0 {
435 return Err(anyhow!("connection_pool_size must be greater than zero"));
436 }
437 if self.batch_size == 0 {
438 return Err(anyhow!("batch_size must be greater than zero"));
439 }
440 if self.flush_interval.is_zero() {
441 return Err(anyhow!("flush_interval must be greater than zero"));
442 }
443
444 self.reconnect
445 .validate()
446 .context("reconnect configuration invalid")?;
447
448 Ok(())
449 }
450}
451
452impl ReconnectConfig {
453 pub fn validate(&self) -> Result<()> {
455 if self.backoff_multiplier < 1.0 {
456 return Err(anyhow!("backoff_multiplier must be at least 1.0"));
457 }
458 if self.max_delay < self.initial_delay {
459 return Err(anyhow!(
460 "max_delay must be greater than or equal to initial_delay"
461 ));
462 }
463 Ok(())
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration passes validation.
    #[test]
    fn default_config_validates() {
        let outcome = AdapterConfig::default().validate();
        assert!(outcome.is_ok());
    }

    /// An empty topic prefix is rejected.
    #[test]
    fn invalid_topic_prefix_fails() {
        let mut config = AdapterConfig::default();
        config.topics.prefix.clear();

        let outcome = config.validate();
        assert!(outcome.is_err());
    }

    /// Requiring authentication without credentials fails; supplying a
    /// token makes the same configuration valid.
    #[test]
    fn requires_credentials_when_auth_enabled() {
        let mut config = AdapterConfig::default();
        config.security.require_authentication = true;
        config.security.auth_token = None;
        let without_token = config.validate();
        assert!(without_token.is_err());

        config.security.auth_token = Some(String::from("token"));
        let with_token = config.validate();
        assert!(with_token.is_ok());
    }

    /// A zero-sized command queue is rejected.
    #[test]
    fn queue_sizes_must_be_positive() {
        let mut config = AdapterConfig::default();
        config.queues.command_queue_size = 0;

        let outcome = config.validate();
        assert!(outcome.is_err());
    }
}