Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82    Pulsar,
83    RabbitMq,
84    #[serde(rename = "google-pubsub")]
85    GooglePubSub,
86    Kafka,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(default)]
91pub struct DynamoDbSettings {
92    pub region: String,
93    pub table_name: String,
94    pub endpoint_url: Option<String>,
95    pub aws_access_key_id: Option<String>,
96    pub aws_secret_access_key: Option<String>,
97    pub aws_profile_name: Option<String>,
98}
99
100impl Default for DynamoDbSettings {
101    fn default() -> Self {
102        Self {
103            region: "us-east-1".to_string(),
104            table_name: "sockudo-applications".to_string(),
105            endpoint_url: None,
106            aws_access_key_id: None,
107            aws_secret_access_key: None,
108            aws_profile_name: None,
109        }
110    }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(default)]
115pub struct ScyllaDbSettings {
116    pub nodes: Vec<String>,
117    pub keyspace: String,
118    pub table_name: String,
119    pub username: Option<String>,
120    pub password: Option<String>,
121    pub replication_class: String,
122    pub replication_factor: u32,
123}
124
125impl Default for ScyllaDbSettings {
126    fn default() -> Self {
127        Self {
128            nodes: vec!["127.0.0.1:9042".to_string()],
129            keyspace: "sockudo".to_string(),
130            table_name: "applications".to_string(),
131            username: None,
132            password: None,
133            replication_class: "SimpleStrategy".to_string(),
134            replication_factor: 3,
135        }
136    }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140#[serde(default)]
141pub struct SurrealDbSettings {
142    pub url: String,
143    pub namespace: String,
144    pub database: String,
145    pub username: String,
146    pub password: String,
147    pub table_name: String,
148    pub cache_ttl: u64,
149    pub cache_max_capacity: u64,
150}
151
152impl Default for SurrealDbSettings {
153    fn default() -> Self {
154        Self {
155            url: "ws://127.0.0.1:8000".to_string(),
156            namespace: "sockudo".to_string(),
157            database: "sockudo".to_string(),
158            username: "root".to_string(),
159            password: "root".to_string(),
160            table_name: "applications".to_string(),
161            cache_ttl: 300,
162            cache_max_capacity: 100,
163        }
164    }
165}
166
167impl FromStr for AdapterDriver {
168    type Err = String;
169    fn from_str(s: &str) -> Result<Self, Self::Err> {
170        match s.to_lowercase().as_str() {
171            "local" => Ok(AdapterDriver::Local),
172            "redis" => Ok(AdapterDriver::Redis),
173            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174            "nats" => Ok(AdapterDriver::Nats),
175            "pulsar" => Ok(AdapterDriver::Pulsar),
176            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178            "kafka" => Ok(AdapterDriver::Kafka),
179            _ => Err(format!("Unknown adapter driver: {s}")),
180        }
181    }
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
185#[serde(rename_all = "lowercase")]
186pub enum AppManagerDriver {
187    #[default]
188    Memory,
189    Mysql,
190    Dynamodb,
191    PgSql,
192    SurrealDb,
193    ScyllaDb,
194}
195impl FromStr for AppManagerDriver {
196    type Err = String;
197    fn from_str(s: &str) -> Result<Self, Self::Err> {
198        match s.to_lowercase().as_str() {
199            "memory" => Ok(AppManagerDriver::Memory),
200            "mysql" => Ok(AppManagerDriver::Mysql),
201            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205            _ => Err(format!("Unknown app manager driver: {s}")),
206        }
207    }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
211#[serde(rename_all = "lowercase")]
212pub enum CacheDriver {
213    #[default]
214    Memory,
215    Redis,
216    #[serde(rename = "redis-cluster")]
217    RedisCluster,
218    None,
219}
220
221impl FromStr for CacheDriver {
222    type Err = String;
223    fn from_str(s: &str) -> Result<Self, Self::Err> {
224        match s.to_lowercase().as_str() {
225            "memory" => Ok(CacheDriver::Memory),
226            "redis" => Ok(CacheDriver::Redis),
227            "redis-cluster" => Ok(CacheDriver::RedisCluster),
228            "none" => Ok(CacheDriver::None),
229            _ => Err(format!("Unknown cache driver: {s}")),
230        }
231    }
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
235#[serde(rename_all = "lowercase")]
236pub enum QueueDriver {
237    Memory,
238    #[default]
239    Redis,
240    #[serde(rename = "redis-cluster")]
241    RedisCluster,
242    Sqs,
243    Sns,
244    None,
245}
246
247impl FromStr for QueueDriver {
248    type Err = String;
249    fn from_str(s: &str) -> Result<Self, Self::Err> {
250        match s.to_lowercase().as_str() {
251            "memory" => Ok(QueueDriver::Memory),
252            "redis" => Ok(QueueDriver::Redis),
253            "redis-cluster" => Ok(QueueDriver::RedisCluster),
254            "sqs" => Ok(QueueDriver::Sqs),
255            "sns" => Ok(QueueDriver::Sns),
256            "none" => Ok(QueueDriver::None),
257            _ => Err(format!("Unknown queue driver: {s}")),
258        }
259    }
260}
261
262impl AsRef<str> for QueueDriver {
263    fn as_ref(&self) -> &str {
264        match self {
265            QueueDriver::Memory => "memory",
266            QueueDriver::Redis => "redis",
267            QueueDriver::RedisCluster => "redis-cluster",
268            QueueDriver::Sqs => "sqs",
269            QueueDriver::Sns => "sns",
270            QueueDriver::None => "none",
271        }
272    }
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
276#[serde(default)]
277pub struct RedisClusterQueueConfig {
278    pub concurrency: u32,
279    pub prefix: Option<String>,
280    pub nodes: Vec<String>,
281    pub request_timeout_ms: u64,
282}
283
284impl Default for RedisClusterQueueConfig {
285    fn default() -> Self {
286        Self {
287            concurrency: 5,
288            prefix: Some("sockudo_queue:".to_string()),
289            nodes: vec!["redis://127.0.0.1:6379".to_string()],
290            request_timeout_ms: 5000,
291        }
292    }
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
296#[serde(rename_all = "lowercase")]
297pub enum MetricsDriver {
298    #[default]
299    Prometheus,
300}
301
302#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
303#[serde(rename_all = "lowercase")]
304pub enum LogOutputFormat {
305    #[default]
306    Human,
307    Json,
308}
309
310impl FromStr for LogOutputFormat {
311    type Err = String;
312    fn from_str(s: &str) -> Result<Self, Self::Err> {
313        match s.to_lowercase().as_str() {
314            "human" => Ok(LogOutputFormat::Human),
315            "json" => Ok(LogOutputFormat::Json),
316            _ => Err(format!("Unknown log output format: {s}")),
317        }
318    }
319}
320
321impl FromStr for MetricsDriver {
322    type Err = String;
323    fn from_str(s: &str) -> Result<Self, Self::Err> {
324        match s.to_lowercase().as_str() {
325            "prometheus" => Ok(MetricsDriver::Prometheus),
326            _ => Err(format!("Unknown metrics driver: {s}")),
327        }
328    }
329}
330
331impl AsRef<str> for MetricsDriver {
332    fn as_ref(&self) -> &str {
333        match self {
334            MetricsDriver::Prometheus => "prometheus",
335        }
336    }
337}
338
339// --- Main Configuration Struct ---
340#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct ServerOptions {
343    pub adapter: AdapterConfig,
344    pub app_manager: AppManagerConfig,
345    pub cache: CacheConfig,
346    pub channel_limits: ChannelLimits,
347    pub cors: CorsConfig,
348    pub database: DatabaseConfig,
349    pub database_pooling: DatabasePooling,
350    pub debug: bool,
351    pub event_limits: EventLimits,
352    pub host: String,
353    pub http_api: HttpApiConfig,
354    pub instance: InstanceConfig,
355    pub logging: Option<LoggingConfig>,
356    pub metrics: MetricsConfig,
357    pub mode: String,
358    pub port: u16,
359    pub path_prefix: String,
360    pub presence: PresenceConfig,
361    pub queue: QueueConfig,
362    pub rate_limiter: RateLimiterConfig,
363    pub shutdown_grace_period: u64,
364    pub ssl: SslConfig,
365    pub user_authentication_timeout: u64,
366    pub webhooks: WebhooksConfig,
367    pub websocket_max_payload_kb: u32,
368    pub cleanup: CleanupConfig,
369    pub activity_timeout: u64,
370    pub cluster_health: ClusterHealthConfig,
371    pub unix_socket: UnixSocketConfig,
372    pub delta_compression: DeltaCompressionOptionsConfig,
373    pub tag_filtering: TagFilteringConfig,
374    pub websocket: WebSocketConfig,
375    pub connection_recovery: ConnectionRecoveryConfig,
376    pub idempotency: IdempotencyConfig,
377    pub ephemeral: EphemeralConfig,
378    pub echo_control: EchoControlConfig,
379    pub event_name_filtering: EventNameFilteringConfig,
380}
381
382// --- Configuration Sub-Structs ---
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
385#[serde(default)]
386pub struct SqsQueueConfig {
387    pub region: String,
388    pub queue_url_prefix: Option<String>,
389    pub visibility_timeout: i32,
390    pub endpoint_url: Option<String>,
391    pub max_messages: i32,
392    pub wait_time_seconds: i32,
393    pub concurrency: u32,
394    pub fifo: bool,
395    pub message_group_id: Option<String>,
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct SnsQueueConfig {
401    pub region: String,
402    pub topic_arn: String,
403    pub endpoint_url: Option<String>,
404}
405
406#[derive(Debug, Clone, Serialize, Deserialize)]
407#[serde(default)]
408pub struct AdapterConfig {
409    pub driver: AdapterDriver,
410    pub redis: RedisAdapterConfig,
411    pub cluster: RedisClusterAdapterConfig,
412    pub nats: NatsAdapterConfig,
413    pub pulsar: PulsarAdapterConfig,
414    pub rabbitmq: RabbitMqAdapterConfig,
415    pub google_pubsub: GooglePubSubAdapterConfig,
416    pub kafka: KafkaAdapterConfig,
417    #[serde(default = "default_buffer_multiplier_per_cpu")]
418    pub buffer_multiplier_per_cpu: usize,
419    pub cluster_health: ClusterHealthConfig,
420    #[serde(default = "default_enable_socket_counting")]
421    pub enable_socket_counting: bool,
422    #[serde(default = "default_fallback_to_local")]
423    pub fallback_to_local: bool,
424}
425
426fn default_enable_socket_counting() -> bool {
427    true
428}
429
430fn default_fallback_to_local() -> bool {
431    true
432}
433
434fn default_buffer_multiplier_per_cpu() -> usize {
435    64
436}
437
438impl Default for AdapterConfig {
439    fn default() -> Self {
440        Self {
441            driver: AdapterDriver::default(),
442            redis: RedisAdapterConfig::default(),
443            cluster: RedisClusterAdapterConfig::default(),
444            nats: NatsAdapterConfig::default(),
445            pulsar: PulsarAdapterConfig::default(),
446            rabbitmq: RabbitMqAdapterConfig::default(),
447            google_pubsub: GooglePubSubAdapterConfig::default(),
448            kafka: KafkaAdapterConfig::default(),
449            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
450            cluster_health: ClusterHealthConfig::default(),
451            enable_socket_counting: default_enable_socket_counting(),
452            fallback_to_local: default_fallback_to_local(),
453        }
454    }
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize)]
458#[serde(default)]
459pub struct RedisAdapterConfig {
460    pub requests_timeout: u64,
461    pub prefix: String,
462    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
463    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
464    pub cluster_mode: bool,
465}
466
467#[derive(Debug, Clone, Serialize, Deserialize)]
468#[serde(default)]
469pub struct RedisClusterAdapterConfig {
470    pub nodes: Vec<String>,
471    pub prefix: String,
472    pub request_timeout_ms: u64,
473    pub use_connection_manager: bool,
474    #[serde(default)]
475    pub use_sharded_pubsub: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct NatsAdapterConfig {
481    pub servers: Vec<String>,
482    pub prefix: String,
483    pub request_timeout_ms: u64,
484    pub username: Option<String>,
485    pub password: Option<String>,
486    pub token: Option<String>,
487    pub connection_timeout_ms: u64,
488    pub nodes_number: Option<u32>,
489    pub discovery_max_wait_ms: u64,
490    pub discovery_idle_wait_ms: u64,
491}
492
493#[derive(Debug, Clone, Serialize, Deserialize)]
494#[serde(default)]
495pub struct PulsarAdapterConfig {
496    pub url: String,
497    pub prefix: String,
498    pub request_timeout_ms: u64,
499    pub token: Option<String>,
500    pub nodes_number: Option<u32>,
501}
502
503#[derive(Debug, Clone, Serialize, Deserialize)]
504#[serde(default)]
505pub struct RabbitMqAdapterConfig {
506    pub url: String,
507    pub prefix: String,
508    pub request_timeout_ms: u64,
509    pub connection_timeout_ms: u64,
510    pub nodes_number: Option<u32>,
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize)]
514#[serde(default)]
515pub struct GooglePubSubAdapterConfig {
516    pub project_id: String,
517    pub prefix: String,
518    pub request_timeout_ms: u64,
519    pub emulator_host: Option<String>,
520    pub nodes_number: Option<u32>,
521}
522
523#[derive(Debug, Clone, Serialize, Deserialize)]
524#[serde(default)]
525pub struct KafkaAdapterConfig {
526    pub brokers: Vec<String>,
527    pub prefix: String,
528    pub request_timeout_ms: u64,
529    pub security_protocol: Option<String>,
530    pub sasl_mechanism: Option<String>,
531    pub sasl_username: Option<String>,
532    pub sasl_password: Option<String>,
533    pub nodes_number: Option<u32>,
534}
535
536#[derive(Debug, Clone, Serialize, Deserialize, Default)]
537#[serde(default)]
538pub struct AppManagerConfig {
539    pub driver: AppManagerDriver,
540    pub array: ArrayConfig,
541    pub cache: CacheSettings,
542    pub scylladb: ScyllaDbSettings,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize, Default)]
546#[serde(default)]
547pub struct ArrayConfig {
548    pub apps: Vec<App>,
549}
550
551#[derive(Debug, Clone, Serialize, Deserialize)]
552#[serde(default)]
553pub struct CacheSettings {
554    pub enabled: bool,
555    pub ttl: u64,
556}
557
558#[derive(Debug, Clone, Serialize, Deserialize)]
559#[serde(default)]
560pub struct MemoryCacheOptions {
561    pub ttl: u64,
562    pub cleanup_interval: u64,
563    pub max_capacity: u64,
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
567#[serde(default)]
568pub struct CacheConfig {
569    pub driver: CacheDriver,
570    pub redis: RedisConfig,
571    pub memory: MemoryCacheOptions,
572}
573
574#[derive(Debug, Clone, Serialize, Deserialize, Default)]
575#[serde(default)]
576pub struct RedisConfig {
577    pub prefix: Option<String>,
578    pub url_override: Option<String>,
579    pub cluster_mode: bool,
580}
581
582#[derive(Debug, Clone, Serialize, Deserialize)]
583#[serde(default)]
584pub struct ChannelLimits {
585    pub max_name_length: u32,
586    pub cache_ttl: u64,
587}
588
589#[derive(Debug, Clone, Serialize, Deserialize)]
590#[serde(default)]
591pub struct CorsConfig {
592    pub credentials: bool,
593    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
594    pub origin: Vec<String>,
595    pub methods: Vec<String>,
596    pub allowed_headers: Vec<String>,
597}
598
599fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
600where
601    D: serde::Deserializer<'de>,
602{
603    use serde::de::Error;
604    let origins = Vec::<String>::deserialize(deserializer)?;
605
606    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
607        return Err(D::Error::custom(format!(
608            "CORS origin pattern validation failed: {}",
609            e
610        )));
611    }
612
613    Ok(origins)
614}
615
616#[derive(Debug, Clone, Serialize, Deserialize, Default)]
617#[serde(default)]
618pub struct DatabaseConfig {
619    pub mysql: DatabaseConnection,
620    pub postgres: DatabaseConnection,
621    pub redis: RedisConnection,
622    pub dynamodb: DynamoDbSettings,
623    pub surrealdb: SurrealDbSettings,
624    pub scylladb: ScyllaDbSettings,
625}
626
627#[derive(Debug, Clone, Serialize, Deserialize)]
628#[serde(default)]
629pub struct DatabaseConnection {
630    pub host: String,
631    pub port: u16,
632    pub username: String,
633    pub password: String,
634    pub database: String,
635    pub table_name: String,
636    pub connection_pool_size: u32,
637    pub pool_min: Option<u32>,
638    pub pool_max: Option<u32>,
639    pub cache_ttl: u64,
640    pub cache_cleanup_interval: u64,
641    pub cache_max_capacity: u64,
642}
643
644#[derive(Debug, Clone, Serialize, Deserialize)]
645#[serde(default)]
646pub struct RedisConnection {
647    pub host: String,
648    pub port: u16,
649    pub db: u32,
650    pub username: Option<String>,
651    pub password: Option<String>,
652    pub key_prefix: String,
653    pub sentinels: Vec<RedisSentinel>,
654    pub sentinel_password: Option<String>,
655    pub name: String,
656    pub cluster: RedisClusterConnection,
657    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
658    pub cluster_nodes: Vec<ClusterNode>,
659}
660
661#[derive(Debug, Clone, Serialize, Deserialize)]
662#[serde(default)]
663pub struct RedisSentinel {
664    pub host: String,
665    pub port: u16,
666}
667
668#[derive(Debug, Clone, Serialize, Deserialize, Default)]
669#[serde(default)]
670pub struct RedisClusterConnection {
671    pub nodes: Vec<ClusterNode>,
672    pub username: Option<String>,
673    pub password: Option<String>,
674    #[serde(alias = "useTLS")]
675    pub use_tls: bool,
676}
677
678impl RedisConnection {
679    /// Returns true if Redis Sentinel is configured.
680    pub fn is_sentinel_configured(&self) -> bool {
681        !self.sentinels.is_empty()
682    }
683
684    /// Builds a Redis connection URL based on the configuration.
685    pub fn to_url(&self) -> String {
686        if self.is_sentinel_configured() {
687            self.build_sentinel_url()
688        } else {
689            self.build_standard_url()
690        }
691    }
692
693    fn build_standard_url(&self) -> String {
694        // Extract scheme from host if present, otherwise default to redis://
695        let (scheme, host) = if self.host.starts_with("rediss://") {
696            ("rediss://", self.host.trim_start_matches("rediss://"))
697        } else if self.host.starts_with("redis://") {
698            ("redis://", self.host.trim_start_matches("redis://"))
699        } else {
700            ("redis://", self.host.as_str())
701        };
702
703        let mut url = String::from(scheme);
704
705        if let Some(ref username) = self.username {
706            url.push_str(username);
707            if let Some(ref password) = self.password {
708                url.push(':');
709                url.push_str(&urlencoding::encode(password));
710            }
711            url.push('@');
712        } else if let Some(ref password) = self.password {
713            url.push(':');
714            url.push_str(&urlencoding::encode(password));
715            url.push('@');
716        }
717
718        url.push_str(host);
719        url.push(':');
720        url.push_str(&self.port.to_string());
721        url.push('/');
722        url.push_str(&self.db.to_string());
723
724        url
725    }
726
727    fn build_sentinel_url(&self) -> String {
728        let mut url = String::from("redis+sentinel://");
729
730        if let Some(ref sentinel_password) = self.sentinel_password {
731            url.push(':');
732            url.push_str(&urlencoding::encode(sentinel_password));
733            url.push('@');
734        }
735
736        let sentinel_hosts: Vec<String> = self
737            .sentinels
738            .iter()
739            .map(|s| format!("{}:{}", s.host, s.port))
740            .collect();
741        url.push_str(&sentinel_hosts.join(","));
742
743        url.push('/');
744        url.push_str(&self.name);
745        url.push('/');
746        url.push_str(&self.db.to_string());
747
748        let mut params = Vec::new();
749        if let Some(ref password) = self.password {
750            params.push(format!("password={}", urlencoding::encode(password)));
751        }
752        if let Some(ref username) = self.username {
753            params.push(format!("username={}", urlencoding::encode(username)));
754        }
755
756        if !params.is_empty() {
757            url.push('?');
758            url.push_str(&params.join("&"));
759        }
760
761        url
762    }
763
764    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
765    /// or legacy (`cluster_nodes`) field.
766    pub fn has_cluster_nodes(&self) -> bool {
767        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
768    }
769
770    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
771    /// Falls back to legacy `cluster_nodes` for backward compatibility.
772    pub fn cluster_node_urls(&self) -> Vec<String> {
773        if !self.cluster.nodes.is_empty() {
774            return self.build_cluster_urls(&self.cluster.nodes);
775        }
776        self.build_cluster_urls(&self.cluster_nodes)
777    }
778
779    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
780    /// shared cluster auth/TLS options.
781    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
782        self.build_cluster_urls(
783            &seeds
784                .iter()
785                .filter_map(|seed| ClusterNode::from_seed(seed))
786                .collect::<Vec<ClusterNode>>(),
787        )
788    }
789
790    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
791        let username = self
792            .cluster
793            .username
794            .as_deref()
795            .or(self.username.as_deref());
796        let password = self
797            .cluster
798            .password
799            .as_deref()
800            .or(self.password.as_deref());
801        let use_tls = self.cluster.use_tls;
802
803        nodes
804            .iter()
805            .map(|node| node.to_url_with_options(use_tls, username, password))
806            .collect()
807    }
808}
809
810impl RedisSentinel {
811    pub fn to_host_port(&self) -> String {
812        format!("{}:{}", self.host, self.port)
813    }
814}
815
816#[derive(Debug, Clone, Serialize, Deserialize)]
817#[serde(default)]
818pub struct ClusterNode {
819    pub host: String,
820    pub port: u16,
821}
822
823impl ClusterNode {
824    pub fn to_url(&self) -> String {
825        self.to_url_with_options(false, None, None)
826    }
827
828    pub fn to_url_with_options(
829        &self,
830        use_tls: bool,
831        username: Option<&str>,
832        password: Option<&str>,
833    ) -> String {
834        let host = self.host.trim();
835
836        if host.starts_with("redis://") || host.starts_with("rediss://") {
837            if let Ok(parsed) = Url::parse(host)
838                && let Some(host_str) = parsed.host_str()
839            {
840                let scheme = parsed.scheme();
841                let port = parsed.port_or_known_default().unwrap_or(self.port);
842                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
843                let parsed_password = parsed.password();
844                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
845                let (effective_username, effective_password) = if has_embedded_auth {
846                    (parsed_username, parsed_password)
847                } else {
848                    (username, password)
849                };
850
851                return build_redis_url(
852                    scheme,
853                    host_str,
854                    port,
855                    effective_username,
856                    effective_password,
857                );
858            }
859
860            // Fallback for malformed URLs
861            let has_port = if let Some(bracket_pos) = host.rfind(']') {
862                host[bracket_pos..].contains(':')
863            } else {
864                host.split(':').count() >= 3
865            };
866            let base = if has_port {
867                host.to_string()
868            } else {
869                format!("{}:{}", host, self.port)
870            };
871
872            if let Ok(parsed) = Url::parse(&base) {
873                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
874                let parsed_password = parsed.password();
875                if let Some(host_str) = parsed.host_str() {
876                    let port = parsed.port_or_known_default().unwrap_or(self.port);
877                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
878                    let (effective_username, effective_password) = if has_embedded_auth {
879                        (parsed_username, parsed_password)
880                    } else {
881                        (username, password)
882                    };
883                    return build_redis_url(
884                        parsed.scheme(),
885                        host_str,
886                        port,
887                        effective_username,
888                        effective_password,
889                    );
890                }
891            }
892            return base;
893        }
894
895        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
896        let scheme = if use_tls { "rediss" } else { "redis" };
897        build_redis_url(
898            scheme,
899            &normalized_host,
900            normalized_port,
901            username,
902            password,
903        )
904    }
905
906    pub fn from_seed(seed: &str) -> Option<Self> {
907        let trimmed = seed.trim();
908        if trimmed.is_empty() {
909            return None;
910        }
911
912        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
913            let port = Url::parse(trimmed)
914                .ok()
915                .and_then(|parsed| parsed.port_or_known_default())
916                .unwrap_or(6379);
917            return Some(Self {
918                host: trimmed.to_string(),
919                port,
920            });
921        }
922
923        let (host, port) = split_plain_host_and_port(trimmed, 6379);
924        Some(Self { host, port })
925    }
926}
927
928fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
929    let host = raw_host.trim();
930
931    // Handle bracketed IPv6: [::1]:6379
932    if host.starts_with('[') {
933        if let Some(end_bracket) = host.find(']') {
934            let host_part = host[1..end_bracket].to_string();
935            let remainder = &host[end_bracket + 1..];
936            if let Some(port_str) = remainder.strip_prefix(':')
937                && let Ok(port) = port_str.parse::<u16>()
938            {
939                return (host_part, port);
940            }
941            return (host_part, default_port);
942        }
943        return (host.to_string(), default_port);
944    }
945
946    // Handle hostname/IP with port: host:6379
947    if host.matches(':').count() == 1
948        && let Some((host_part, port_part)) = host.rsplit_once(':')
949        && let Ok(port) = port_part.parse::<u16>()
950    {
951        return (host_part.to_string(), port);
952    }
953
954    (host.to_string(), default_port)
955}
956
957fn build_redis_url(
958    scheme: &str,
959    host: &str,
960    port: u16,
961    username: Option<&str>,
962    password: Option<&str>,
963) -> String {
964    let mut url = format!("{scheme}://");
965
966    if let Some(user) = username {
967        url.push_str(&urlencoding::encode(user));
968        if let Some(pass) = password {
969            url.push(':');
970            url.push_str(&urlencoding::encode(pass));
971        }
972        url.push('@');
973    } else if let Some(pass) = password {
974        url.push(':');
975        url.push_str(&urlencoding::encode(pass));
976        url.push('@');
977    }
978
979    if host.contains(':') && !host.starts_with('[') {
980        url.push('[');
981        url.push_str(host);
982        url.push(']');
983    } else {
984        url.push_str(host);
985    }
986    url.push(':');
987    url.push_str(&port.to_string());
988    url
989}
990
991#[derive(Debug, Clone, Serialize, Deserialize)]
992#[serde(default)]
993pub struct DatabasePooling {
994    pub enabled: bool,
995    pub min: u32,
996    pub max: u32,
997}
998
999#[derive(Debug, Clone, Serialize, Deserialize)]
1000#[serde(default)]
1001pub struct EventLimits {
1002    pub max_channels_at_once: u32,
1003    pub max_name_length: u32,
1004    pub max_payload_in_kb: u32,
1005    pub max_batch_size: u32,
1006}
1007
1008#[derive(Debug, Clone, Serialize, Deserialize)]
1009#[serde(default)]
1010pub struct IdempotencyConfig {
1011    /// Whether idempotency key support is enabled
1012    pub enabled: bool,
1013    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
1014    pub ttl_seconds: u64,
1015    /// Maximum length of an idempotency key
1016    pub max_key_length: usize,
1017}
1018
1019#[derive(Debug, Clone, Serialize, Deserialize)]
1020#[serde(default)]
1021pub struct EphemeralConfig {
1022    /// Whether ephemeral message handling is enabled.
1023    pub enabled: bool,
1024}
1025
1026#[derive(Debug, Clone, Serialize, Deserialize)]
1027#[serde(default)]
1028pub struct EchoControlConfig {
1029    /// Whether connection-level and per-message echo control is enabled.
1030    pub enabled: bool,
1031    /// Default echo behavior for new V2 connections when the query param is omitted.
1032    pub default_echo_messages: bool,
1033}
1034
1035#[derive(Debug, Clone, Serialize, Deserialize)]
1036#[serde(default)]
1037pub struct EventNameFilteringConfig {
1038    /// Whether per-subscription event name filtering is enabled.
1039    pub enabled: bool,
1040    /// Maximum event names allowed in a single filter.
1041    pub max_events_per_filter: usize,
1042    /// Maximum length for each event name in a filter.
1043    pub max_event_name_length: usize,
1044}
1045
1046#[derive(Debug, Clone, Serialize, Deserialize)]
1047#[serde(default)]
1048pub struct ConnectionRecoveryConfig {
1049    /// Whether connection recovery (resume) is enabled.
1050    /// When enabled, the server keeps a bounded replay buffer per channel so that
1051    /// reconnecting clients can receive missed messages. Disabled by default for
1052    /// Pusher protocol compatibility.
1053    pub enabled: bool,
1054    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
1055    pub buffer_ttl_seconds: u64,
1056    /// Maximum number of messages kept per channel. Default: 100.
1057    pub max_buffer_size: usize,
1058}
1059
1060#[derive(Debug, Clone, Serialize, Deserialize)]
1061#[serde(default)]
1062pub struct HttpApiConfig {
1063    pub request_limit_in_mb: u32,
1064    pub accept_traffic: AcceptTraffic,
1065    pub usage_enabled: bool,
1066}
1067
1068#[derive(Debug, Clone, Serialize, Deserialize)]
1069#[serde(default)]
1070pub struct AcceptTraffic {
1071    pub memory_threshold: f64,
1072}
1073
1074#[derive(Debug, Clone, Serialize, Deserialize)]
1075#[serde(default)]
1076pub struct InstanceConfig {
1077    pub process_id: String,
1078}
1079
1080#[derive(Debug, Clone, Serialize, Deserialize)]
1081#[serde(default)]
1082pub struct MetricsConfig {
1083    pub enabled: bool,
1084    pub driver: MetricsDriver,
1085    pub host: String,
1086    pub prometheus: PrometheusConfig,
1087    pub port: u16,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1091#[serde(default)]
1092pub struct PrometheusConfig {
1093    pub prefix: String,
1094}
1095
1096#[derive(Debug, Clone, Serialize, Deserialize)]
1097#[serde(default)]
1098pub struct LoggingConfig {
1099    pub colors_enabled: bool,
1100    pub include_target: bool,
1101}
1102
1103#[derive(Debug, Clone, Serialize, Deserialize)]
1104#[serde(default)]
1105pub struct PresenceConfig {
1106    pub max_members_per_channel: u32,
1107    pub max_member_size_in_kb: u32,
1108}
1109
1110/// WebSocket connection buffer configuration
1111/// Controls backpressure handling for slow consumers
1112#[derive(Debug, Clone, Serialize, Deserialize)]
1113#[serde(default)]
1114pub struct WebSocketConfig {
1115    pub max_messages: Option<usize>,
1116    pub max_bytes: Option<usize>,
1117    pub disconnect_on_buffer_full: bool,
1118    pub max_message_size: usize,
1119    pub max_frame_size: usize,
1120    pub write_buffer_size: usize,
1121    pub max_backpressure: usize,
1122    pub auto_ping: bool,
1123    pub ping_interval: u32,
1124    pub idle_timeout: u32,
1125    pub compression: String,
1126}
1127
1128impl Default for WebSocketConfig {
1129    fn default() -> Self {
1130        Self {
1131            max_messages: Some(1000),
1132            max_bytes: None,
1133            disconnect_on_buffer_full: true,
1134            max_message_size: 64 * 1024 * 1024,
1135            max_frame_size: 16 * 1024 * 1024,
1136            write_buffer_size: 16 * 1024,
1137            max_backpressure: 1024 * 1024,
1138            auto_ping: true,
1139            ping_interval: 30,
1140            idle_timeout: 120,
1141            compression: "disabled".to_string(),
1142        }
1143    }
1144}
1145
1146impl WebSocketConfig {
1147    /// Convert to WebSocketBufferConfig for runtime use
1148    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1149        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1150
1151        let limit = match (self.max_messages, self.max_bytes) {
1152            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1153            (Some(messages), None) => BufferLimit::Messages(messages),
1154            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1155            (None, None) => BufferLimit::Messages(1000),
1156        };
1157
1158        WebSocketBufferConfig {
1159            limit,
1160            disconnect_on_full: self.disconnect_on_buffer_full,
1161        }
1162    }
1163
1164    /// Convert to native sockudo-ws runtime configuration.
1165    pub fn to_sockudo_ws_config(
1166        &self,
1167        websocket_max_payload_kb: u32,
1168        activity_timeout: u64,
1169    ) -> sockudo_ws::Config {
1170        use sockudo_ws::Compression;
1171
1172        let compression = match self.compression.to_lowercase().as_str() {
1173            "dedicated" => Compression::Dedicated,
1174            "shared" => Compression::Shared,
1175            "window256b" => Compression::Window256B,
1176            "window1kb" => Compression::Window1KB,
1177            "window2kb" => Compression::Window2KB,
1178            "window4kb" => Compression::Window4KB,
1179            "window8kb" => Compression::Window8KB,
1180            "window16kb" => Compression::Window16KB,
1181            "window32kb" => Compression::Window32KB,
1182            _ => Compression::Disabled,
1183        };
1184
1185        sockudo_ws::Config::builder()
1186            .max_payload_length(
1187                self.max_bytes
1188                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1189            )
1190            .max_message_size(self.max_message_size)
1191            .max_frame_size(self.max_frame_size)
1192            .write_buffer_size(self.write_buffer_size)
1193            .max_backpressure(self.max_backpressure)
1194            .idle_timeout(self.idle_timeout)
1195            .auto_ping(self.auto_ping)
1196            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1197            .compression(compression)
1198            .build()
1199    }
1200}
1201
1202#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1203#[serde(default)]
1204pub struct QueueConfig {
1205    pub driver: QueueDriver,
1206    pub redis: RedisQueueConfig,
1207    pub redis_cluster: RedisClusterQueueConfig,
1208    pub sqs: SqsQueueConfig,
1209    pub sns: SnsQueueConfig,
1210}
1211
1212#[derive(Debug, Clone, Serialize, Deserialize)]
1213#[serde(default)]
1214pub struct RedisQueueConfig {
1215    pub concurrency: u32,
1216    pub prefix: Option<String>,
1217    pub url_override: Option<String>,
1218    pub cluster_mode: bool,
1219}
1220
1221#[derive(Clone, Debug, Serialize, Deserialize)]
1222#[serde(default)]
1223pub struct RateLimit {
1224    pub max_requests: u32,
1225    pub window_seconds: u64,
1226    pub identifier: Option<String>,
1227    pub trust_hops: Option<u32>,
1228}
1229
1230#[derive(Debug, Clone, Serialize, Deserialize)]
1231#[serde(default)]
1232pub struct RateLimiterConfig {
1233    pub enabled: bool,
1234    pub driver: CacheDriver,
1235    pub api_rate_limit: RateLimit,
1236    pub websocket_rate_limit: RateLimit,
1237    pub redis: RedisConfig,
1238}
1239
1240#[derive(Debug, Clone, Serialize, Deserialize)]
1241#[serde(default)]
1242pub struct SslConfig {
1243    pub enabled: bool,
1244    pub cert_path: String,
1245    pub key_path: String,
1246    pub passphrase: Option<String>,
1247    pub ca_path: Option<String>,
1248    pub redirect_http: bool,
1249    pub http_port: Option<u16>,
1250}
1251
1252#[derive(Debug, Clone, Serialize, Deserialize)]
1253#[serde(default)]
1254pub struct WebhooksConfig {
1255    pub batching: BatchingConfig,
1256    pub retry: WebhookRetryConfig,
1257    pub request_timeout_ms: u64,
1258}
1259
1260#[derive(Debug, Clone, Serialize, Deserialize)]
1261#[serde(default)]
1262pub struct BatchingConfig {
1263    pub enabled: bool,
1264    pub duration: u64,
1265    pub size: usize,
1266}
1267
1268#[derive(Debug, Clone, Serialize, Deserialize)]
1269#[serde(default)]
1270pub struct WebhookRetryConfig {
1271    pub enabled: bool,
1272    pub max_attempts: Option<u32>,
1273    pub max_elapsed_time_ms: u64,
1274    pub initial_backoff_ms: u64,
1275    pub max_backoff_ms: u64,
1276}
1277
1278impl Default for WebhooksConfig {
1279    fn default() -> Self {
1280        Self {
1281            batching: BatchingConfig::default(),
1282            retry: WebhookRetryConfig::default(),
1283            request_timeout_ms: 10_000,
1284        }
1285    }
1286}
1287
1288impl Default for WebhookRetryConfig {
1289    fn default() -> Self {
1290        Self {
1291            enabled: true,
1292            max_attempts: None,
1293            max_elapsed_time_ms: 300_000,
1294            initial_backoff_ms: 1_000,
1295            max_backoff_ms: 60_000,
1296        }
1297    }
1298}
1299
1300#[derive(Debug, Clone, Serialize, Deserialize)]
1301#[serde(default)]
1302pub struct ClusterHealthConfig {
1303    pub enabled: bool,
1304    pub heartbeat_interval_ms: u64,
1305    pub node_timeout_ms: u64,
1306    pub cleanup_interval_ms: u64,
1307}
1308
1309#[derive(Debug, Clone, Serialize, Deserialize)]
1310#[serde(default)]
1311pub struct UnixSocketConfig {
1312    pub enabled: bool,
1313    pub path: String,
1314    #[serde(deserialize_with = "deserialize_octal_permission")]
1315    pub permission_mode: u32,
1316}
1317
1318#[derive(Debug, Clone, Serialize, Deserialize)]
1319#[serde(default)]
1320pub struct DeltaCompressionOptionsConfig {
1321    pub enabled: bool,
1322    pub algorithm: String,
1323    pub full_message_interval: u32,
1324    pub min_message_size: usize,
1325    pub max_state_age_secs: u64,
1326    pub max_channel_states_per_socket: usize,
1327    pub max_conflation_states_per_channel: Option<usize>,
1328    pub conflation_key_path: Option<String>,
1329    pub cluster_coordination: bool,
1330    pub omit_delta_algorithm: bool,
1331}
1332
1333/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1334#[derive(Debug, Clone, Serialize, Deserialize)]
1335#[serde(default)]
1336pub struct CleanupConfig {
1337    pub queue_buffer_size: usize,
1338    pub batch_size: usize,
1339    pub batch_timeout_ms: u64,
1340    pub worker_threads: WorkerThreadsConfig,
1341    pub max_retry_attempts: u32,
1342    pub async_enabled: bool,
1343    pub fallback_to_sync: bool,
1344}
1345
1346/// Worker threads configuration for the cleanup system
1347#[derive(Debug, Clone)]
1348pub enum WorkerThreadsConfig {
1349    Auto,
1350    Fixed(usize),
1351}
1352
1353impl serde::Serialize for WorkerThreadsConfig {
1354    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1355    where
1356        S: serde::Serializer,
1357    {
1358        match self {
1359            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1360            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1361        }
1362    }
1363}
1364
1365impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1366    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1367    where
1368        D: serde::Deserializer<'de>,
1369    {
1370        use serde::de;
1371        struct WorkerThreadsVisitor;
1372        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1373            type Value = WorkerThreadsConfig;
1374
1375            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1376                formatter.write_str(r#""auto" or a positive integer"#)
1377            }
1378
1379            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1380                if value.eq_ignore_ascii_case("auto") {
1381                    Ok(WorkerThreadsConfig::Auto)
1382                } else if let Ok(n) = value.parse::<usize>() {
1383                    Ok(WorkerThreadsConfig::Fixed(n))
1384                } else {
1385                    Err(E::custom(format!(
1386                        "expected 'auto' or a number, got '{value}'"
1387                    )))
1388                }
1389            }
1390
1391            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1392                Ok(WorkerThreadsConfig::Fixed(value as usize))
1393            }
1394
1395            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1396                if value >= 0 {
1397                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1398                } else {
1399                    Err(E::custom("worker_threads must be non-negative"))
1400                }
1401            }
1402        }
1403        deserializer.deserialize_any(WorkerThreadsVisitor)
1404    }
1405}
1406
1407impl Default for CleanupConfig {
1408    fn default() -> Self {
1409        Self {
1410            queue_buffer_size: 1024,
1411            batch_size: 64,
1412            batch_timeout_ms: 100,
1413            worker_threads: WorkerThreadsConfig::Auto,
1414            max_retry_attempts: 3,
1415            async_enabled: true,
1416            fallback_to_sync: true,
1417        }
1418    }
1419}
1420
1421impl CleanupConfig {
1422    pub fn validate(&self) -> Result<(), String> {
1423        if self.queue_buffer_size == 0 {
1424            return Err("queue_buffer_size must be greater than 0".to_string());
1425        }
1426        if self.batch_size == 0 {
1427            return Err("batch_size must be greater than 0".to_string());
1428        }
1429        if self.batch_timeout_ms == 0 {
1430            return Err("batch_timeout_ms must be greater than 0".to_string());
1431        }
1432        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1433            && n == 0
1434        {
1435            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1436        }
1437        Ok(())
1438    }
1439}
1440
1441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1442#[serde(default)]
1443pub struct TagFilteringConfig {
1444    #[serde(default)]
1445    pub enabled: bool,
1446    #[serde(default = "default_true")]
1447    pub enable_tags: bool,
1448}
1449
1450fn default_true() -> bool {
1451    true
1452}
1453
1454// --- Default Implementations ---
1455
1456impl Default for ServerOptions {
1457    fn default() -> Self {
1458        Self {
1459            adapter: AdapterConfig::default(),
1460            app_manager: AppManagerConfig::default(),
1461            cache: CacheConfig::default(),
1462            channel_limits: ChannelLimits::default(),
1463            cors: CorsConfig::default(),
1464            database: DatabaseConfig::default(),
1465            database_pooling: DatabasePooling::default(),
1466            debug: false,
1467            tag_filtering: TagFilteringConfig::default(),
1468            event_limits: EventLimits::default(),
1469            host: "0.0.0.0".to_string(),
1470            http_api: HttpApiConfig::default(),
1471            instance: InstanceConfig::default(),
1472            logging: None,
1473            metrics: MetricsConfig::default(),
1474            mode: "production".to_string(),
1475            port: 6001,
1476            path_prefix: "/".to_string(),
1477            presence: PresenceConfig::default(),
1478            queue: QueueConfig::default(),
1479            rate_limiter: RateLimiterConfig::default(),
1480            shutdown_grace_period: 10,
1481            ssl: SslConfig::default(),
1482            user_authentication_timeout: 3600,
1483            webhooks: WebhooksConfig::default(),
1484            websocket_max_payload_kb: 64,
1485            cleanup: CleanupConfig::default(),
1486            activity_timeout: 120,
1487            cluster_health: ClusterHealthConfig::default(),
1488            unix_socket: UnixSocketConfig::default(),
1489            delta_compression: DeltaCompressionOptionsConfig::default(),
1490            websocket: WebSocketConfig::default(),
1491            connection_recovery: ConnectionRecoveryConfig::default(),
1492            idempotency: IdempotencyConfig::default(),
1493            ephemeral: EphemeralConfig::default(),
1494            echo_control: EchoControlConfig::default(),
1495            event_name_filtering: EventNameFilteringConfig::default(),
1496        }
1497    }
1498}
1499
1500impl Default for SqsQueueConfig {
1501    fn default() -> Self {
1502        Self {
1503            region: "us-east-1".to_string(),
1504            queue_url_prefix: None,
1505            visibility_timeout: 30,
1506            endpoint_url: None,
1507            max_messages: 10,
1508            wait_time_seconds: 5,
1509            concurrency: 5,
1510            fifo: false,
1511            message_group_id: Some("default".to_string()),
1512        }
1513    }
1514}
1515
1516impl Default for SnsQueueConfig {
1517    fn default() -> Self {
1518        Self {
1519            region: "us-east-1".to_string(),
1520            topic_arn: String::new(),
1521            endpoint_url: None,
1522        }
1523    }
1524}
1525
1526impl Default for RedisAdapterConfig {
1527    fn default() -> Self {
1528        Self {
1529            requests_timeout: 5000,
1530            prefix: "sockudo_adapter:".to_string(),
1531            redis_pub_options: AHashMap::new(),
1532            redis_sub_options: AHashMap::new(),
1533            cluster_mode: false,
1534        }
1535    }
1536}
1537
1538impl Default for RedisClusterAdapterConfig {
1539    fn default() -> Self {
1540        Self {
1541            nodes: vec![],
1542            prefix: "sockudo_adapter:".to_string(),
1543            request_timeout_ms: 1000,
1544            use_connection_manager: true,
1545            use_sharded_pubsub: false,
1546        }
1547    }
1548}
1549
1550impl Default for NatsAdapterConfig {
1551    fn default() -> Self {
1552        Self {
1553            servers: vec!["nats://localhost:4222".to_string()],
1554            prefix: "sockudo_adapter:".to_string(),
1555            request_timeout_ms: 5000,
1556            username: None,
1557            password: None,
1558            token: None,
1559            connection_timeout_ms: 5000,
1560            nodes_number: None,
1561            discovery_max_wait_ms: 1000,
1562            discovery_idle_wait_ms: 150,
1563        }
1564    }
1565}
1566
1567impl Default for PulsarAdapterConfig {
1568    fn default() -> Self {
1569        Self {
1570            url: "pulsar://127.0.0.1:6650".to_string(),
1571            prefix: "sockudo-adapter".to_string(),
1572            request_timeout_ms: 5000,
1573            token: None,
1574            nodes_number: None,
1575        }
1576    }
1577}
1578
1579impl Default for RabbitMqAdapterConfig {
1580    fn default() -> Self {
1581        Self {
1582            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1583            prefix: "sockudo_adapter".to_string(),
1584            request_timeout_ms: 5000,
1585            connection_timeout_ms: 5000,
1586            nodes_number: None,
1587        }
1588    }
1589}
1590
1591impl Default for GooglePubSubAdapterConfig {
1592    fn default() -> Self {
1593        Self {
1594            project_id: "".to_string(),
1595            prefix: "sockudo-adapter".to_string(),
1596            request_timeout_ms: 5000,
1597            emulator_host: None,
1598            nodes_number: None,
1599        }
1600    }
1601}
1602
1603impl Default for KafkaAdapterConfig {
1604    fn default() -> Self {
1605        Self {
1606            brokers: vec!["localhost:9092".to_string()],
1607            prefix: "sockudo_adapter".to_string(),
1608            request_timeout_ms: 5000,
1609            security_protocol: None,
1610            sasl_mechanism: None,
1611            sasl_username: None,
1612            sasl_password: None,
1613            nodes_number: None,
1614        }
1615    }
1616}
1617
1618impl Default for CacheSettings {
1619    fn default() -> Self {
1620        Self {
1621            enabled: true,
1622            ttl: 300,
1623        }
1624    }
1625}
1626
1627impl Default for MemoryCacheOptions {
1628    fn default() -> Self {
1629        Self {
1630            ttl: 300,
1631            cleanup_interval: 60,
1632            max_capacity: 10000,
1633        }
1634    }
1635}
1636
1637impl Default for CacheConfig {
1638    fn default() -> Self {
1639        Self {
1640            driver: CacheDriver::default(),
1641            redis: RedisConfig {
1642                prefix: Some("sockudo_cache:".to_string()),
1643                url_override: None,
1644                cluster_mode: false,
1645            },
1646            memory: MemoryCacheOptions::default(),
1647        }
1648    }
1649}
1650
1651impl Default for ChannelLimits {
1652    fn default() -> Self {
1653        Self {
1654            max_name_length: 200,
1655            cache_ttl: 3600,
1656        }
1657    }
1658}
1659impl Default for CorsConfig {
1660    fn default() -> Self {
1661        Self {
1662            credentials: true,
1663            origin: vec!["*".to_string()],
1664            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1665            allowed_headers: vec![
1666                "Authorization".to_string(),
1667                "Content-Type".to_string(),
1668                "X-Requested-With".to_string(),
1669                "Accept".to_string(),
1670            ],
1671        }
1672    }
1673}
1674
1675impl Default for DatabaseConnection {
1676    fn default() -> Self {
1677        Self {
1678            host: "localhost".to_string(),
1679            port: 3306,
1680            username: "root".to_string(),
1681            password: "".to_string(),
1682            database: "sockudo".to_string(),
1683            table_name: "applications".to_string(),
1684            connection_pool_size: 10,
1685            pool_min: None,
1686            pool_max: None,
1687            cache_ttl: 300,
1688            cache_cleanup_interval: 60,
1689            cache_max_capacity: 100,
1690        }
1691    }
1692}
1693
1694impl Default for RedisConnection {
1695    fn default() -> Self {
1696        Self {
1697            host: "127.0.0.1".to_string(),
1698            port: 6379,
1699            db: 0,
1700            username: None,
1701            password: None,
1702            key_prefix: "sockudo:".to_string(),
1703            sentinels: Vec::new(),
1704            sentinel_password: None,
1705            name: "mymaster".to_string(),
1706            cluster: RedisClusterConnection::default(),
1707            cluster_nodes: Vec::new(),
1708        }
1709    }
1710}
1711
1712impl Default for RedisSentinel {
1713    fn default() -> Self {
1714        Self {
1715            host: "localhost".to_string(),
1716            port: 26379,
1717        }
1718    }
1719}
1720
1721impl Default for ClusterNode {
1722    fn default() -> Self {
1723        Self {
1724            host: "127.0.0.1".to_string(),
1725            port: 7000,
1726        }
1727    }
1728}
1729
1730impl Default for DatabasePooling {
1731    fn default() -> Self {
1732        Self {
1733            enabled: true,
1734            min: 2,
1735            max: 10,
1736        }
1737    }
1738}
1739
1740impl Default for EventLimits {
1741    fn default() -> Self {
1742        Self {
1743            max_channels_at_once: 100,
1744            max_name_length: 200,
1745            max_payload_in_kb: 100,
1746            max_batch_size: 10,
1747        }
1748    }
1749}
1750
1751impl Default for IdempotencyConfig {
1752    fn default() -> Self {
1753        Self {
1754            enabled: true,
1755            ttl_seconds: 120,
1756            max_key_length: 128,
1757        }
1758    }
1759}
1760
1761impl Default for EphemeralConfig {
1762    fn default() -> Self {
1763        Self { enabled: true }
1764    }
1765}
1766
1767impl Default for EchoControlConfig {
1768    fn default() -> Self {
1769        Self {
1770            enabled: true,
1771            default_echo_messages: true,
1772        }
1773    }
1774}
1775
1776impl Default for EventNameFilteringConfig {
1777    fn default() -> Self {
1778        Self {
1779            enabled: true,
1780            max_events_per_filter: 50,
1781            max_event_name_length: 200,
1782        }
1783    }
1784}
1785
1786impl Default for ConnectionRecoveryConfig {
1787    fn default() -> Self {
1788        Self {
1789            enabled: false,
1790            buffer_ttl_seconds: 120,
1791            max_buffer_size: 100,
1792        }
1793    }
1794}
1795
1796impl Default for HttpApiConfig {
1797    fn default() -> Self {
1798        Self {
1799            request_limit_in_mb: 10,
1800            accept_traffic: AcceptTraffic::default(),
1801            usage_enabled: true,
1802        }
1803    }
1804}
1805
1806impl Default for AcceptTraffic {
1807    fn default() -> Self {
1808        Self {
1809            memory_threshold: 0.90,
1810        }
1811    }
1812}
1813
1814impl Default for InstanceConfig {
1815    fn default() -> Self {
1816        Self {
1817            process_id: uuid::Uuid::new_v4().to_string(),
1818        }
1819    }
1820}
1821
1822impl Default for LoggingConfig {
1823    fn default() -> Self {
1824        Self {
1825            colors_enabled: true,
1826            include_target: true,
1827        }
1828    }
1829}
1830
1831impl Default for MetricsConfig {
1832    fn default() -> Self {
1833        Self {
1834            enabled: true,
1835            driver: MetricsDriver::default(),
1836            host: "0.0.0.0".to_string(),
1837            prometheus: PrometheusConfig::default(),
1838            port: 9601,
1839        }
1840    }
1841}
1842
1843impl Default for PrometheusConfig {
1844    fn default() -> Self {
1845        Self {
1846            prefix: "sockudo_".to_string(),
1847        }
1848    }
1849}
1850
1851impl Default for PresenceConfig {
1852    fn default() -> Self {
1853        Self {
1854            max_members_per_channel: 100,
1855            max_member_size_in_kb: 2,
1856        }
1857    }
1858}
1859
1860impl Default for RedisQueueConfig {
1861    fn default() -> Self {
1862        Self {
1863            concurrency: 5,
1864            prefix: Some("sockudo_queue:".to_string()),
1865            url_override: None,
1866            cluster_mode: false,
1867        }
1868    }
1869}
1870
1871impl Default for RateLimit {
1872    fn default() -> Self {
1873        Self {
1874            max_requests: 60,
1875            window_seconds: 60,
1876            identifier: Some("default".to_string()),
1877            trust_hops: Some(0),
1878        }
1879    }
1880}
1881
1882impl Default for RateLimiterConfig {
1883    fn default() -> Self {
1884        Self {
1885            enabled: true,
1886            driver: CacheDriver::Memory,
1887            api_rate_limit: RateLimit {
1888                max_requests: 100,
1889                window_seconds: 60,
1890                identifier: Some("api".to_string()),
1891                trust_hops: Some(0),
1892            },
1893            websocket_rate_limit: RateLimit {
1894                max_requests: 20,
1895                window_seconds: 60,
1896                identifier: Some("websocket_connect".to_string()),
1897                trust_hops: Some(0),
1898            },
1899            redis: RedisConfig {
1900                prefix: Some("sockudo_rl:".to_string()),
1901                url_override: None,
1902                cluster_mode: false,
1903            },
1904        }
1905    }
1906}
1907
1908impl Default for SslConfig {
1909    fn default() -> Self {
1910        Self {
1911            enabled: false,
1912            cert_path: "".to_string(),
1913            key_path: "".to_string(),
1914            passphrase: None,
1915            ca_path: None,
1916            redirect_http: false,
1917            http_port: Some(80),
1918        }
1919    }
1920}
1921
1922impl Default for BatchingConfig {
1923    fn default() -> Self {
1924        Self {
1925            enabled: true,
1926            duration: 50,
1927            size: 100,
1928        }
1929    }
1930}
1931
1932impl Default for ClusterHealthConfig {
1933    fn default() -> Self {
1934        Self {
1935            enabled: true,
1936            heartbeat_interval_ms: 10000,
1937            node_timeout_ms: 30000,
1938            cleanup_interval_ms: 10000,
1939        }
1940    }
1941}
1942
1943impl Default for UnixSocketConfig {
1944    fn default() -> Self {
1945        Self {
1946            enabled: false,
1947            path: "/var/run/sockudo/sockudo.sock".to_string(),
1948            permission_mode: 0o660,
1949        }
1950    }
1951}
1952
1953impl Default for DeltaCompressionOptionsConfig {
1954    fn default() -> Self {
1955        Self {
1956            enabled: true,
1957            algorithm: "fossil".to_string(),
1958            full_message_interval: 10,
1959            min_message_size: 100,
1960            max_state_age_secs: 300,
1961            max_channel_states_per_socket: 100,
1962            max_conflation_states_per_channel: Some(100),
1963            conflation_key_path: None,
1964            cluster_coordination: false,
1965            omit_delta_algorithm: false,
1966        }
1967    }
1968}
1969
1970impl ClusterHealthConfig {
1971    pub fn validate(&self) -> Result<(), String> {
1972        if self.heartbeat_interval_ms == 0 {
1973            return Err("heartbeat_interval_ms must be greater than 0".to_string());
1974        }
1975        if self.node_timeout_ms == 0 {
1976            return Err("node_timeout_ms must be greater than 0".to_string());
1977        }
1978        if self.cleanup_interval_ms == 0 {
1979            return Err("cleanup_interval_ms must be greater than 0".to_string());
1980        }
1981
1982        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1983            return Err(format!(
1984                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1985                self.heartbeat_interval_ms,
1986                self.node_timeout_ms,
1987                self.node_timeout_ms / 3
1988            ));
1989        }
1990
1991        if self.cleanup_interval_ms > self.node_timeout_ms {
1992            return Err(format!(
1993                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1994                self.cleanup_interval_ms, self.node_timeout_ms
1995            ));
1996        }
1997
1998        Ok(())
1999    }
2000}
2001
2002impl ServerOptions {
2003    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2004        let content = tokio::fs::read_to_string(path).await?;
2005        let options: Self = if path.ends_with(".toml") {
2006            toml::from_str(&content)?
2007        } else {
2008            // Legacy JSON support
2009            sonic_rs::from_str(&content)?
2010        };
2011        Ok(options)
2012    }
2013
2014    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2015        // --- General Settings ---
2016        if let Ok(mode) = std::env::var("ENVIRONMENT") {
2017            self.mode = mode;
2018        }
2019        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2020        if parse_bool_env("DEBUG", false) {
2021            self.debug = true;
2022            info!("DEBUG environment variable forces debug mode ON");
2023        }
2024
2025        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2026
2027        if let Ok(host) = std::env::var("HOST") {
2028            self.host = host;
2029        }
2030        self.port = parse_env::<u16>("PORT", self.port);
2031        self.shutdown_grace_period =
2032            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2033        self.user_authentication_timeout = parse_env::<u64>(
2034            "USER_AUTHENTICATION_TIMEOUT",
2035            self.user_authentication_timeout,
2036        );
2037        self.websocket_max_payload_kb =
2038            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2039        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2040            self.instance.process_id = id;
2041        }
2042
2043        // --- Driver Configuration ---
2044        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2045            self.adapter.driver =
2046                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2047        }
2048        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2049            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2050            self.adapter.buffer_multiplier_per_cpu,
2051        );
2052        self.adapter.enable_socket_counting = parse_env::<bool>(
2053            "ADAPTER_ENABLE_SOCKET_COUNTING",
2054            self.adapter.enable_socket_counting,
2055        );
2056        self.adapter.fallback_to_local =
2057            parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2058        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2059            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2060        }
2061        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2062            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2063        }
2064        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2065            self.app_manager.driver =
2066                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2067        }
2068        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2069            self.rate_limiter.driver = parse_driver_enum(
2070                driver_str,
2071                self.rate_limiter.driver.clone(),
2072                "RateLimiter Backend",
2073            );
2074        }
2075
2076        // --- Database: Redis ---
2077        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2078            self.database.redis.host = host;
2079        }
2080        self.database.redis.port =
2081            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2082        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2083            self.database.redis.username = if username.is_empty() {
2084                None
2085            } else {
2086                Some(username)
2087            };
2088        }
2089        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2090            self.database.redis.password = Some(password);
2091        }
2092        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2093        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2094            self.database.redis.key_prefix = prefix;
2095        }
2096        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2097            self.database.redis.cluster.username = if cluster_username.is_empty() {
2098                None
2099            } else {
2100                Some(cluster_username)
2101            };
2102        }
2103        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2104            self.database.redis.cluster.password = Some(cluster_password);
2105        }
2106        self.database.redis.cluster.use_tls = parse_bool_env(
2107            "DATABASE_REDIS_CLUSTER_USE_TLS",
2108            self.database.redis.cluster.use_tls,
2109        );
2110
2111        // --- Database: MySQL ---
2112        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2113            self.database.mysql.host = host;
2114        }
2115        self.database.mysql.port =
2116            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2117        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2118            self.database.mysql.username = user;
2119        }
2120        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2121            self.database.mysql.password = pass;
2122        }
2123        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2124            self.database.mysql.database = db;
2125        }
2126        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2127            self.database.mysql.table_name = table;
2128        }
2129        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2130
2131        // --- Database: PostgreSQL ---
2132        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2133            self.database.postgres.host = host;
2134        }
2135        self.database.postgres.port =
2136            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2137        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2138            self.database.postgres.username = user;
2139        }
2140        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2141            self.database.postgres.password = pass;
2142        }
2143        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2144            self.database.postgres.database = db;
2145        }
2146        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2147
2148        // --- Database: DynamoDB ---
2149        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2150            self.database.dynamodb.region = region;
2151        }
2152        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2153            self.database.dynamodb.table_name = table;
2154        }
2155        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2156            self.database.dynamodb.endpoint_url = Some(endpoint);
2157        }
2158        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2159            self.database.dynamodb.aws_access_key_id = Some(key_id);
2160        }
2161        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2162            self.database.dynamodb.aws_secret_access_key = Some(secret);
2163        }
2164
2165        // --- Database: SurrealDB ---
2166        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2167            self.database.surrealdb.url = url;
2168        }
2169        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2170            self.database.surrealdb.namespace = namespace;
2171        }
2172        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2173            self.database.surrealdb.database = database;
2174        }
2175        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2176            self.database.surrealdb.username = username;
2177        }
2178        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2179            self.database.surrealdb.password = password;
2180        }
2181        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2182            self.database.surrealdb.table_name = table;
2183        }
2184
2185        // --- Redis Cluster ---
2186        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2187            let node_list: Vec<String> = nodes
2188                .split(',')
2189                .map(|s| s.trim())
2190                .filter(|s| !s.is_empty())
2191                .map(ToString::to_string)
2192                .collect();
2193
2194            options.adapter.cluster.nodes = node_list.clone();
2195            options.queue.redis_cluster.nodes = node_list.clone();
2196
2197            let parsed_nodes: Vec<ClusterNode> = node_list
2198                .iter()
2199                .filter_map(|seed| ClusterNode::from_seed(seed))
2200                .collect();
2201            options.database.redis.cluster.nodes = parsed_nodes.clone();
2202            options.database.redis.cluster_nodes = parsed_nodes;
2203        };
2204
2205        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2206            apply_redis_cluster_nodes(self, &nodes);
2207        }
2208        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2209            apply_redis_cluster_nodes(self, &nodes);
2210        }
2211        self.queue.redis_cluster.concurrency = parse_env::<u32>(
2212            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2213            self.queue.redis_cluster.concurrency,
2214        );
2215        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2216            self.queue.redis_cluster.prefix = Some(prefix);
2217        }
2218
2219        // --- SSL Configuration ---
2220        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2221        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2222            self.ssl.cert_path = val;
2223        }
2224        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2225            self.ssl.key_path = val;
2226        }
2227        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2228        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2229            self.ssl.http_port = Some(port);
2230        }
2231
2232        // --- Unix Socket Configuration ---
2233        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2234        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2235            self.unix_socket.path = path;
2236        }
2237        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2238            if mode_str.chars().all(|c| c.is_digit(8)) {
2239                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2240                    if mode <= 0o777 {
2241                        self.unix_socket.permission_mode = mode;
2242                    } else {
2243                        warn!(
2244                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2245                            mode_str, self.unix_socket.permission_mode
2246                        );
2247                    }
2248                } else {
2249                    warn!(
2250                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2251                        mode_str, self.unix_socket.permission_mode
2252                    );
2253                }
2254            } else {
2255                warn!(
2256                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2257                    mode_str, self.unix_socket.permission_mode
2258                );
2259            }
2260        }
2261
2262        // --- Metrics ---
2263        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2264            self.metrics.driver =
2265                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2266        }
2267        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2268        if let Ok(val) = std::env::var("METRICS_HOST") {
2269            self.metrics.host = val;
2270        }
2271        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2272        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2273            self.metrics.prometheus.prefix = val;
2274        }
2275
2276        // --- HTTP API ---
2277        self.http_api.usage_enabled =
2278            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2279
2280        // --- Rate Limiter ---
2281        self.rate_limiter.enabled =
2282            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2283        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2284            "RATE_LIMITER_API_MAX_REQUESTS",
2285            self.rate_limiter.api_rate_limit.max_requests,
2286        );
2287        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2288            "RATE_LIMITER_API_WINDOW_SECONDS",
2289            self.rate_limiter.api_rate_limit.window_seconds,
2290        );
2291        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2292            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2293        }
2294        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2295            "RATE_LIMITER_WS_MAX_REQUESTS",
2296            self.rate_limiter.websocket_rate_limit.max_requests,
2297        );
2298        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2299            "RATE_LIMITER_WS_WINDOW_SECONDS",
2300            self.rate_limiter.websocket_rate_limit.window_seconds,
2301        );
2302        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2303            self.rate_limiter.redis.prefix = Some(prefix);
2304        }
2305
2306        // --- Queue: Redis ---
2307        self.queue.redis.concurrency =
2308            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2309        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2310            self.queue.redis.prefix = Some(prefix);
2311        }
2312
2313        // --- Queue: SQS ---
2314        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2315            self.queue.sqs.region = region;
2316        }
2317        self.queue.sqs.visibility_timeout = parse_env::<i32>(
2318            "QUEUE_SQS_VISIBILITY_TIMEOUT",
2319            self.queue.sqs.visibility_timeout,
2320        );
2321        self.queue.sqs.max_messages =
2322            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2323        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2324            "QUEUE_SQS_WAIT_TIME_SECONDS",
2325            self.queue.sqs.wait_time_seconds,
2326        );
2327        self.queue.sqs.concurrency =
2328            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2329        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2330        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2331            self.queue.sqs.endpoint_url = Some(endpoint);
2332        }
2333
2334        // --- Queue: SNS ---
2335        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2336            self.queue.sns.region = region;
2337        }
2338        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2339            self.queue.sns.topic_arn = topic_arn;
2340        }
2341        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2342            self.queue.sns.endpoint_url = Some(endpoint);
2343        }
2344
2345        // --- Webhooks ---
2346        self.webhooks.batching.enabled =
2347            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2348        self.webhooks.batching.duration =
2349            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2350        self.webhooks.batching.size =
2351            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2352
2353        // --- NATS Adapter ---
2354        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2355            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2356        }
2357        if let Ok(user) = std::env::var("NATS_USERNAME") {
2358            self.adapter.nats.username = Some(user);
2359        }
2360        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2361            self.adapter.nats.password = Some(pass);
2362        }
2363        if let Ok(token) = std::env::var("NATS_TOKEN") {
2364            self.adapter.nats.token = Some(token);
2365        }
2366        if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2367            self.adapter.nats.prefix = prefix;
2368        }
2369        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2370            "NATS_CONNECTION_TIMEOUT_MS",
2371            self.adapter.nats.connection_timeout_ms,
2372        );
2373        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2374            "NATS_REQUEST_TIMEOUT_MS",
2375            self.adapter.nats.request_timeout_ms,
2376        );
2377        self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2378            "NATS_DISCOVERY_MAX_WAIT_MS",
2379            self.adapter.nats.discovery_max_wait_ms,
2380        );
2381        self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2382            "NATS_DISCOVERY_IDLE_WAIT_MS",
2383            self.adapter.nats.discovery_idle_wait_ms,
2384        );
2385        if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2386            self.adapter.nats.nodes_number = Some(nodes);
2387        }
2388
2389        // --- Pulsar Adapter ---
2390        if let Ok(url) = std::env::var("PULSAR_URL") {
2391            self.adapter.pulsar.url = url;
2392        }
2393        if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2394            self.adapter.pulsar.prefix = prefix;
2395        }
2396        if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2397            self.adapter.pulsar.token = Some(token);
2398        }
2399        self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2400            "PULSAR_REQUEST_TIMEOUT_MS",
2401            self.adapter.pulsar.request_timeout_ms,
2402        );
2403        if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2404            self.adapter.pulsar.nodes_number = Some(nodes);
2405        }
2406
2407        // --- RabbitMQ Adapter ---
2408        if let Ok(url) = std::env::var("RABBITMQ_URL") {
2409            self.adapter.rabbitmq.url = url;
2410        }
2411        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2412            self.adapter.rabbitmq.prefix = prefix;
2413        }
2414        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2415            "RABBITMQ_CONNECTION_TIMEOUT_MS",
2416            self.adapter.rabbitmq.connection_timeout_ms,
2417        );
2418        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2419            "RABBITMQ_REQUEST_TIMEOUT_MS",
2420            self.adapter.rabbitmq.request_timeout_ms,
2421        );
2422        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2423            self.adapter.rabbitmq.nodes_number = Some(nodes);
2424        }
2425
2426        // --- Google Pub/Sub Adapter ---
2427        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2428            self.adapter.google_pubsub.project_id = project_id;
2429        }
2430        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2431            self.adapter.google_pubsub.prefix = prefix;
2432        }
2433        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2434            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2435        }
2436        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2437            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2438            self.adapter.google_pubsub.request_timeout_ms,
2439        );
2440        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2441            self.adapter.google_pubsub.nodes_number = Some(nodes);
2442        }
2443
2444        // --- Kafka Adapter ---
2445        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2446            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2447        }
2448        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2449            self.adapter.kafka.prefix = prefix;
2450        }
2451        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2452            self.adapter.kafka.security_protocol = Some(protocol);
2453        }
2454        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2455            self.adapter.kafka.sasl_mechanism = Some(mechanism);
2456        }
2457        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2458            self.adapter.kafka.sasl_username = Some(username);
2459        }
2460        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2461            self.adapter.kafka.sasl_password = Some(password);
2462        }
2463        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2464            "KAFKA_REQUEST_TIMEOUT_MS",
2465            self.adapter.kafka.request_timeout_ms,
2466        );
2467        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2468            self.adapter.kafka.nodes_number = Some(nodes);
2469        }
2470
2471        // --- CORS ---
2472        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2473            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2474            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2475                warn!(
2476                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2477                    e
2478                );
2479            } else {
2480                self.cors.origin = parsed;
2481            }
2482        }
2483        if let Ok(methods) = std::env::var("CORS_METHODS") {
2484            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2485        }
2486        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2487            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2488        }
2489        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2490
2491        // --- Performance Tuning ---
2492        self.database_pooling.enabled =
2493            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2494        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2495            self.database_pooling.min = min;
2496        }
2497        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2498            self.database_pooling.max = max;
2499        }
2500
2501        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2502            self.database.mysql.connection_pool_size = pool_size;
2503            self.database.postgres.connection_pool_size = pool_size;
2504        }
2505        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2506            self.app_manager.cache.ttl = cache_ttl;
2507            self.channel_limits.cache_ttl = cache_ttl;
2508            self.database.mysql.cache_ttl = cache_ttl;
2509            self.database.postgres.cache_ttl = cache_ttl;
2510            self.database.surrealdb.cache_ttl = cache_ttl;
2511            self.cache.memory.ttl = cache_ttl;
2512        }
2513        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2514            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2515            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2516            self.cache.memory.cleanup_interval = cleanup_interval;
2517        }
2518        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2519            self.database.mysql.cache_max_capacity = max_capacity;
2520            self.database.postgres.cache_max_capacity = max_capacity;
2521            self.database.surrealdb.cache_max_capacity = max_capacity;
2522            self.cache.memory.max_capacity = max_capacity;
2523        }
2524
2525        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2526        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2527        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2528        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2529
2530        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2531            (default_app_id, default_app_key, default_app_secret)
2532            && default_app_enabled
2533        {
2534            let default_app = App::from_policy(
2535                app_id,
2536                app_key,
2537                app_secret,
2538                default_app_enabled,
2539                crate::app::AppPolicy {
2540                    limits: crate::app::AppLimitsPolicy {
2541                        max_connections: parse_env::<u32>(
2542                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2543                            100,
2544                        ),
2545                        max_backend_events_per_second: Some(parse_env::<u32>(
2546                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2547                            100,
2548                        )),
2549                        max_client_events_per_second: parse_env::<u32>(
2550                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2551                            100,
2552                        ),
2553                        max_read_requests_per_second: Some(parse_env::<u32>(
2554                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2555                            100,
2556                        )),
2557                        max_presence_members_per_channel: Some(parse_env::<u32>(
2558                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2559                            100,
2560                        )),
2561                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
2562                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2563                            100,
2564                        )),
2565                        max_channel_name_length: Some(parse_env::<u32>(
2566                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2567                            100,
2568                        )),
2569                        max_event_channels_at_once: Some(parse_env::<u32>(
2570                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2571                            100,
2572                        )),
2573                        max_event_name_length: Some(parse_env::<u32>(
2574                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2575                            100,
2576                        )),
2577                        max_event_payload_in_kb: Some(parse_env::<u32>(
2578                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2579                            100,
2580                        )),
2581                        max_event_batch_size: Some(parse_env::<u32>(
2582                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2583                            100,
2584                        )),
2585                        decay_seconds: None,
2586                        terminate_on_limit: false,
2587                        message_rate_limit: None,
2588                    },
2589                    features: crate::app::AppFeaturesPolicy {
2590                        enable_client_messages: std::env::var(
2591                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2592                        )
2593                        .ok()
2594                        .map(|value| {
2595                            matches!(
2596                                value.trim().to_ascii_lowercase().as_str(),
2597                                "true" | "1" | "yes" | "on"
2598                            )
2599                        })
2600                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2601                        enable_user_authentication: Some(parse_bool_env(
2602                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2603                            false,
2604                        )),
2605                        enable_watchlist_events: Some(parse_bool_env(
2606                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2607                            false,
2608                        )),
2609                    },
2610                    channels: crate::app::AppChannelsPolicy {
2611                        allowed_origins: {
2612                            if let Ok(origins_str) =
2613                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2614                            {
2615                                if !origins_str.is_empty() {
2616                                    Some(
2617                                        origins_str
2618                                            .split(',')
2619                                            .map(|s| s.trim().to_string())
2620                                            .collect(),
2621                                    )
2622                                } else {
2623                                    None
2624                                }
2625                            } else {
2626                                None
2627                            }
2628                        },
2629                        channel_delta_compression: None,
2630                        channel_namespaces: None,
2631                    },
2632                    webhooks: None,
2633                    idempotency: None,
2634                    connection_recovery: None,
2635                },
2636            );
2637
2638            self.app_manager.array.apps.push(default_app);
2639            info!("Successfully registered default app from env");
2640        }
2641
2642        // Special handling for REDIS_URL
2643        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2644            info!("Applying REDIS_URL environment variable override");
2645
2646            let redis_url_json = sonic_rs::json!(redis_url_env);
2647
2648            self.adapter
2649                .redis
2650                .redis_pub_options
2651                .insert("url".to_string(), redis_url_json.clone());
2652            self.adapter
2653                .redis
2654                .redis_sub_options
2655                .insert("url".to_string(), redis_url_json);
2656
2657            self.cache.redis.url_override = Some(redis_url_env.clone());
2658            self.queue.redis.url_override = Some(redis_url_env.clone());
2659            self.rate_limiter.redis.url_override = Some(redis_url_env);
2660        }
2661
2662        // --- Logging Configuration ---
2663        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2664        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2665        if has_colors_env || has_target_env {
2666            let logging_config = self.logging.get_or_insert_with(Default::default);
2667            if has_colors_env {
2668                logging_config.colors_enabled =
2669                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2670            }
2671            if has_target_env {
2672                logging_config.include_target =
2673                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2674            }
2675        }
2676
2677        // --- Cleanup Configuration ---
2678        self.cleanup.async_enabled =
2679            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2680        self.cleanup.fallback_to_sync =
2681            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2682        self.cleanup.queue_buffer_size =
2683            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2684        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2685        self.cleanup.batch_timeout_ms =
2686            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2687        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2688            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2689                WorkerThreadsConfig::Auto
2690            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2691                WorkerThreadsConfig::Fixed(n)
2692            } else {
2693                warn!(
2694                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2695                    worker_threads_str
2696                );
2697                self.cleanup.worker_threads.clone()
2698            };
2699        }
2700        self.cleanup.max_retry_attempts = parse_env::<u32>(
2701            "CLEANUP_MAX_RETRY_ATTEMPTS",
2702            self.cleanup.max_retry_attempts,
2703        );
2704
2705        // Cluster health configuration
2706        self.cluster_health.enabled =
2707            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2708        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2709            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2710            self.cluster_health.heartbeat_interval_ms,
2711        );
2712        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2713            "CLUSTER_HEALTH_NODE_TIMEOUT",
2714            self.cluster_health.node_timeout_ms,
2715        );
2716        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2717            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2718            self.cluster_health.cleanup_interval_ms,
2719        );
2720
2721        // Tag filtering configuration
2722        self.tag_filtering.enabled =
2723            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2724
2725        // WebSocket buffer configuration
2726        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2727            if val.to_lowercase() == "none" || val == "0" {
2728                self.websocket.max_messages = None;
2729            } else if let Ok(n) = val.parse::<usize>() {
2730                self.websocket.max_messages = Some(n);
2731            }
2732        }
2733        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2734            if val.to_lowercase() == "none" || val == "0" {
2735                self.websocket.max_bytes = None;
2736            } else if let Ok(n) = val.parse::<usize>() {
2737                self.websocket.max_bytes = Some(n);
2738            }
2739        }
2740        self.websocket.disconnect_on_buffer_full = parse_bool_env(
2741            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2742            self.websocket.disconnect_on_buffer_full,
2743        );
2744        self.websocket.max_message_size = parse_env::<usize>(
2745            "WEBSOCKET_MAX_MESSAGE_SIZE",
2746            self.websocket.max_message_size,
2747        );
2748        self.websocket.max_frame_size =
2749            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2750        self.websocket.write_buffer_size = parse_env::<usize>(
2751            "WEBSOCKET_WRITE_BUFFER_SIZE",
2752            self.websocket.write_buffer_size,
2753        );
2754        self.websocket.max_backpressure = parse_env::<usize>(
2755            "WEBSOCKET_MAX_BACKPRESSURE",
2756            self.websocket.max_backpressure,
2757        );
2758        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2759        self.websocket.ping_interval =
2760            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2761        self.websocket.idle_timeout =
2762            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2763        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2764            self.websocket.compression = mode;
2765        }
2766
2767        // Connection recovery (includes serial + message_id + replay buffer)
2768        self.connection_recovery.enabled = parse_bool_env(
2769            "CONNECTION_RECOVERY_ENABLED",
2770            self.connection_recovery.enabled,
2771        );
2772        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
2773            "CONNECTION_RECOVERY_BUFFER_TTL",
2774            self.connection_recovery.buffer_ttl_seconds,
2775        );
2776        self.connection_recovery.max_buffer_size = parse_env::<usize>(
2777            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
2778            self.connection_recovery.max_buffer_size,
2779        );
2780
2781        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
2782        self.idempotency.ttl_seconds =
2783            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
2784        self.idempotency.max_key_length = parse_env::<usize>(
2785            "IDEMPOTENCY_MAX_KEY_LENGTH",
2786            self.idempotency.max_key_length,
2787        );
2788
2789        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
2790
2791        self.echo_control.enabled =
2792            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
2793        self.echo_control.default_echo_messages = parse_bool_env(
2794            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
2795            self.echo_control.default_echo_messages,
2796        );
2797
2798        self.event_name_filtering.enabled = parse_bool_env(
2799            "EVENT_NAME_FILTERING_ENABLED",
2800            self.event_name_filtering.enabled,
2801        );
2802        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
2803            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
2804            self.event_name_filtering.max_events_per_filter,
2805        );
2806        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
2807            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
2808            self.event_name_filtering.max_event_name_length,
2809        );
2810
2811        Ok(())
2812    }
2813
2814    pub fn validate(&self) -> Result<(), String> {
2815        if self.unix_socket.enabled {
2816            if self.unix_socket.path.is_empty() {
2817                return Err(
2818                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2819                );
2820            }
2821
2822            self.validate_unix_socket_security()?;
2823
2824            if self.ssl.enabled {
2825                tracing::warn!(
2826                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2827                );
2828            }
2829
2830            if self.unix_socket.permission_mode > 0o777 {
2831                return Err(format!(
2832                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2833                    self.unix_socket.permission_mode
2834                ));
2835            }
2836        }
2837
2838        if let Err(e) = self.cleanup.validate() {
2839            return Err(format!("Invalid cleanup configuration: {}", e));
2840        }
2841
2842        Ok(())
2843    }
2844
2845    fn validate_unix_socket_security(&self) -> Result<(), String> {
2846        let path = &self.unix_socket.path;
2847
2848        if path.contains("../") || path.contains("..\\") {
2849            return Err(
2850                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2851            );
2852        }
2853
2854        if self.unix_socket.permission_mode & 0o002 != 0 {
2855            warn!(
2856                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2857                self.unix_socket.permission_mode
2858            );
2859        }
2860
2861        if self.unix_socket.permission_mode & 0o007 > 0o005 {
2862            warn!(
2863                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2864                self.unix_socket.permission_mode
2865            );
2866        }
2867
2868        if self.mode == "production" && path.starts_with("/tmp/") {
2869            warn!(
2870                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2871                path
2872            );
2873        }
2874
2875        if !path.starts_with('/') {
2876            return Err(
2877                "Unix socket path must be absolute (start with /) for security and reliability."
2878                    .to_string(),
2879            );
2880        }
2881
2882        Ok(())
2883    }
2884}
2885
2886#[cfg(test)]
2887mod redis_connection_tests {
2888    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
2889
2890    #[test]
2891    fn test_standard_url_basic() {
2892        let conn = RedisConnection {
2893            host: "127.0.0.1".to_string(),
2894            port: 6379,
2895            db: 0,
2896            username: None,
2897            password: None,
2898            key_prefix: "sockudo:".to_string(),
2899            sentinels: Vec::new(),
2900            sentinel_password: None,
2901            name: "mymaster".to_string(),
2902            cluster: RedisClusterConnection::default(),
2903            cluster_nodes: Vec::new(),
2904        };
2905        assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
2906    }
2907
2908    #[test]
2909    fn test_standard_url_with_password() {
2910        let conn = RedisConnection {
2911            host: "127.0.0.1".to_string(),
2912            port: 6379,
2913            db: 2,
2914            username: None,
2915            password: Some("secret".to_string()),
2916            key_prefix: "sockudo:".to_string(),
2917            sentinels: Vec::new(),
2918            sentinel_password: None,
2919            name: "mymaster".to_string(),
2920            cluster: RedisClusterConnection::default(),
2921            cluster_nodes: Vec::new(),
2922        };
2923        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
2924    }
2925
2926    #[test]
2927    fn test_standard_url_with_username_and_password() {
2928        let conn = RedisConnection {
2929            host: "redis.example.com".to_string(),
2930            port: 6380,
2931            db: 1,
2932            username: Some("admin".to_string()),
2933            password: Some("pass123".to_string()),
2934            key_prefix: "sockudo:".to_string(),
2935            sentinels: Vec::new(),
2936            sentinel_password: None,
2937            name: "mymaster".to_string(),
2938            cluster: RedisClusterConnection::default(),
2939            cluster_nodes: Vec::new(),
2940        };
2941        assert_eq!(
2942            conn.to_url(),
2943            "redis://admin:pass123@redis.example.com:6380/1"
2944        );
2945    }
2946
2947    #[test]
2948    fn test_standard_url_with_special_chars_in_password() {
2949        let conn = RedisConnection {
2950            host: "127.0.0.1".to_string(),
2951            port: 6379,
2952            db: 0,
2953            username: None,
2954            password: Some("pass@word#123".to_string()),
2955            key_prefix: "sockudo:".to_string(),
2956            sentinels: Vec::new(),
2957            sentinel_password: None,
2958            name: "mymaster".to_string(),
2959            cluster: RedisClusterConnection::default(),
2960            cluster_nodes: Vec::new(),
2961        };
2962        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
2963    }
2964
2965    #[test]
2966    fn test_is_sentinel_configured_false() {
2967        let conn = RedisConnection::default();
2968        assert!(!conn.is_sentinel_configured());
2969    }
2970
2971    #[test]
2972    fn test_is_sentinel_configured_true() {
2973        let conn = RedisConnection {
2974            sentinels: vec![RedisSentinel {
2975                host: "sentinel1".to_string(),
2976                port: 26379,
2977            }],
2978            ..Default::default()
2979        };
2980        assert!(conn.is_sentinel_configured());
2981    }
2982
2983    #[test]
2984    fn test_sentinel_url_basic() {
2985        let conn = RedisConnection {
2986            host: "127.0.0.1".to_string(),
2987            port: 6379,
2988            db: 0,
2989            username: None,
2990            password: None,
2991            key_prefix: "sockudo:".to_string(),
2992            sentinels: vec![
2993                RedisSentinel {
2994                    host: "sentinel1".to_string(),
2995                    port: 26379,
2996                },
2997                RedisSentinel {
2998                    host: "sentinel2".to_string(),
2999                    port: 26379,
3000                },
3001            ],
3002            sentinel_password: None,
3003            name: "mymaster".to_string(),
3004            cluster: RedisClusterConnection::default(),
3005            cluster_nodes: Vec::new(),
3006        };
3007        assert_eq!(
3008            conn.to_url(),
3009            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
3010        );
3011    }
3012
3013    #[test]
3014    fn test_sentinel_url_with_sentinel_password() {
3015        let conn = RedisConnection {
3016            host: "127.0.0.1".to_string(),
3017            port: 6379,
3018            db: 0,
3019            username: None,
3020            password: None,
3021            key_prefix: "sockudo:".to_string(),
3022            sentinels: vec![RedisSentinel {
3023                host: "sentinel1".to_string(),
3024                port: 26379,
3025            }],
3026            sentinel_password: Some("sentinelpass".to_string()),
3027            name: "mymaster".to_string(),
3028            cluster: RedisClusterConnection::default(),
3029            cluster_nodes: Vec::new(),
3030        };
3031        assert_eq!(
3032            conn.to_url(),
3033            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
3034        );
3035    }
3036
3037    #[test]
3038    fn test_sentinel_url_with_master_password() {
3039        let conn = RedisConnection {
3040            host: "127.0.0.1".to_string(),
3041            port: 6379,
3042            db: 1,
3043            username: None,
3044            password: Some("masterpass".to_string()),
3045            key_prefix: "sockudo:".to_string(),
3046            sentinels: vec![RedisSentinel {
3047                host: "sentinel1".to_string(),
3048                port: 26379,
3049            }],
3050            sentinel_password: None,
3051            name: "mymaster".to_string(),
3052            cluster: RedisClusterConnection::default(),
3053            cluster_nodes: Vec::new(),
3054        };
3055        assert_eq!(
3056            conn.to_url(),
3057            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
3058        );
3059    }
3060
3061    #[test]
3062    fn test_sentinel_url_with_all_auth() {
3063        let conn = RedisConnection {
3064            host: "127.0.0.1".to_string(),
3065            port: 6379,
3066            db: 2,
3067            username: Some("redisuser".to_string()),
3068            password: Some("redispass".to_string()),
3069            key_prefix: "sockudo:".to_string(),
3070            sentinels: vec![
3071                RedisSentinel {
3072                    host: "sentinel1".to_string(),
3073                    port: 26379,
3074                },
3075                RedisSentinel {
3076                    host: "sentinel2".to_string(),
3077                    port: 26380,
3078                },
3079            ],
3080            sentinel_password: Some("sentinelauth".to_string()),
3081            name: "production-master".to_string(),
3082            cluster: RedisClusterConnection::default(),
3083            cluster_nodes: Vec::new(),
3084        };
3085        assert_eq!(
3086            conn.to_url(),
3087            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
3088        );
3089    }
3090
3091    #[test]
3092    fn test_sentinel_to_host_port() {
3093        let sentinel = RedisSentinel {
3094            host: "sentinel.example.com".to_string(),
3095            port: 26379,
3096        };
3097        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
3098    }
3099
3100    #[test]
3101    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
3102        let conn = RedisConnection {
3103            cluster: RedisClusterConnection {
3104                nodes: vec![
3105                    ClusterNode {
3106                        host: "node1.secure-cluster.com".to_string(),
3107                        port: 7000,
3108                    },
3109                    ClusterNode {
3110                        host: "redis://node2.secure-cluster.com:7001".to_string(),
3111                        port: 7001,
3112                    },
3113                    ClusterNode {
3114                        host: "rediss://node3.secure-cluster.com".to_string(),
3115                        port: 7002,
3116                    },
3117                ],
3118                username: None,
3119                password: Some("cluster-secret".to_string()),
3120                use_tls: true,
3121            },
3122            ..Default::default()
3123        };
3124
3125        assert_eq!(
3126            conn.cluster_node_urls(),
3127            vec![
3128                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
3129                "redis://:cluster-secret@node2.secure-cluster.com:7001",
3130                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
3131            ]
3132        );
3133    }
3134
3135    #[test]
3136    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
3137        let conn = RedisConnection {
3138            password: Some("fallback-secret".to_string()),
3139            cluster_nodes: vec![ClusterNode {
3140                host: "legacy-node.example.com".to_string(),
3141                port: 7000,
3142            }],
3143            ..Default::default()
3144        };
3145
3146        assert_eq!(
3147            conn.cluster_node_urls(),
3148            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
3149        );
3150    }
3151
3152    #[test]
3153    fn test_normalize_cluster_seed_urls() {
3154        let conn = RedisConnection {
3155            cluster: RedisClusterConnection {
3156                nodes: Vec::new(),
3157                username: Some("svc-user".to_string()),
3158                password: Some("svc-pass".to_string()),
3159                use_tls: true,
3160            },
3161            ..Default::default()
3162        };
3163
3164        let seeds = vec![
3165            "node1.example.com:7000".to_string(),
3166            "redis://node2.example.com:7001".to_string(),
3167            "rediss://node3.example.com".to_string(),
3168        ];
3169
3170        assert_eq!(
3171            conn.normalize_cluster_seed_urls(&seeds),
3172            vec![
3173                "rediss://svc-user:svc-pass@node1.example.com:7000",
3174                "redis://svc-user:svc-pass@node2.example.com:7001",
3175                "rediss://svc-user:svc-pass@node3.example.com:6379",
3176            ]
3177        );
3178    }
3179}
3180
3181#[cfg(test)]
3182mod cluster_node_tests {
3183    use super::ClusterNode;
3184
3185    #[test]
3186    fn test_to_url_basic_host() {
3187        let node = ClusterNode {
3188            host: "localhost".to_string(),
3189            port: 6379,
3190        };
3191        assert_eq!(node.to_url(), "redis://localhost:6379");
3192    }
3193
3194    #[test]
3195    fn test_to_url_ip_address() {
3196        let node = ClusterNode {
3197            host: "127.0.0.1".to_string(),
3198            port: 6379,
3199        };
3200        assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
3201    }
3202
3203    #[test]
3204    fn test_to_url_with_redis_protocol() {
3205        let node = ClusterNode {
3206            host: "redis://example.com".to_string(),
3207            port: 6379,
3208        };
3209        assert_eq!(node.to_url(), "redis://example.com:6379");
3210    }
3211
3212    #[test]
3213    fn test_to_url_with_rediss_protocol() {
3214        let node = ClusterNode {
3215            host: "rediss://secure.example.com".to_string(),
3216            port: 6379,
3217        };
3218        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3219    }
3220
3221    #[test]
3222    fn test_to_url_with_rediss_protocol_and_port_in_url() {
3223        let node = ClusterNode {
3224            host: "rediss://secure.example.com:7000".to_string(),
3225            port: 6379,
3226        };
3227        assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
3228    }
3229
3230    #[test]
3231    fn test_to_url_with_redis_protocol_and_port_in_url() {
3232        let node = ClusterNode {
3233            host: "redis://example.com:7001".to_string(),
3234            port: 6379,
3235        };
3236        assert_eq!(node.to_url(), "redis://example.com:7001");
3237    }
3238
3239    #[test]
3240    fn test_to_url_with_trailing_whitespace() {
3241        let node = ClusterNode {
3242            host: "  rediss://secure.example.com  ".to_string(),
3243            port: 6379,
3244        };
3245        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3246    }
3247
3248    #[test]
3249    fn test_to_url_custom_port() {
3250        let node = ClusterNode {
3251            host: "redis-cluster.example.com".to_string(),
3252            port: 7000,
3253        };
3254        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
3255    }
3256
3257    #[test]
3258    fn test_to_url_plain_host_with_port_in_host_field() {
3259        let node = ClusterNode {
3260            host: "redis-cluster.example.com:7010".to_string(),
3261            port: 7000,
3262        };
3263        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
3264    }
3265
3266    #[test]
3267    fn test_to_url_with_options_adds_auth_and_tls() {
3268        let node = ClusterNode {
3269            host: "node.example.com".to_string(),
3270            port: 7000,
3271        };
3272        assert_eq!(
3273            node.to_url_with_options(true, Some("svc-user"), Some("secret")),
3274            "rediss://svc-user:secret@node.example.com:7000"
3275        );
3276    }
3277
3278    #[test]
3279    fn test_to_url_with_options_keeps_embedded_auth() {
3280        let node = ClusterNode {
3281            host: "rediss://:node-secret@node.example.com:7000".to_string(),
3282            port: 7000,
3283        };
3284        assert_eq!(
3285            node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
3286            "rediss://:node-secret@node.example.com:7000"
3287        );
3288    }
3289
3290    #[test]
3291    fn test_from_seed_parses_plain_host_port() {
3292        let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
3293        assert_eq!(node.host, "cluster-node-1");
3294        assert_eq!(node.port, 7005);
3295    }
3296
3297    #[test]
3298    fn test_from_seed_keeps_scheme_urls() {
3299        let node =
3300            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
3301        assert_eq!(node.host, "rediss://secure.example.com:7005");
3302        assert_eq!(node.port, 7005);
3303    }
3304
3305    #[test]
3306    fn test_to_url_aws_elasticache_hostname() {
3307        let node = ClusterNode {
3308            host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
3309            port: 6379,
3310        };
3311        assert_eq!(
3312            node.to_url(),
3313            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
3314        );
3315    }
3316
3317    #[test]
3318    fn test_to_url_with_ipv6_no_port() {
3319        let node = ClusterNode {
3320            host: "rediss://[::1]".to_string(),
3321            port: 6379,
3322        };
3323        assert_eq!(node.to_url(), "rediss://[::1]:6379");
3324    }
3325
3326    #[test]
3327    fn test_to_url_with_ipv6_and_port_in_url() {
3328        let node = ClusterNode {
3329            host: "rediss://[::1]:7000".to_string(),
3330            port: 6379,
3331        };
3332        assert_eq!(node.to_url(), "rediss://[::1]:7000");
3333    }
3334
3335    #[test]
3336    fn test_to_url_with_ipv6_full_address_no_port() {
3337        let node = ClusterNode {
3338            host: "rediss://[2001:db8::1]".to_string(),
3339            port: 6379,
3340        };
3341        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
3342    }
3343
3344    #[test]
3345    fn test_to_url_with_ipv6_full_address_with_port() {
3346        let node = ClusterNode {
3347            host: "rediss://[2001:db8::1]:7000".to_string(),
3348            port: 6379,
3349        };
3350        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
3351    }
3352
3353    #[test]
3354    fn test_to_url_with_redis_protocol_ipv6() {
3355        let node = ClusterNode {
3356            host: "redis://[::1]".to_string(),
3357            port: 6379,
3358        };
3359        assert_eq!(node.to_url(), "redis://[::1]:6379");
3360    }
3361}
3362
3363#[cfg(test)]
3364mod cors_config_tests {
3365    use super::CorsConfig;
3366
3367    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
3368        sonic_rs::from_str(json)
3369    }
3370
3371    #[test]
3372    fn test_deserialize_valid_exact_origins() {
3373        let config =
3374            cors_from_json(r#"{"origin": ["https://example.com", "https://other.com"]}"#).unwrap();
3375        assert_eq!(config.origin.len(), 2);
3376    }
3377
3378    #[test]
3379    fn test_deserialize_valid_wildcard_patterns() {
3380        let config =
3381            cors_from_json(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
3382                .unwrap();
3383        assert_eq!(config.origin.len(), 2);
3384    }
3385
3386    #[test]
3387    fn test_deserialize_allows_special_markers() {
3388        assert!(cors_from_json(r#"{"origin": ["*"]}"#).is_ok());
3389        assert!(cors_from_json(r#"{"origin": ["Any"]}"#).is_ok());
3390        assert!(cors_from_json(r#"{"origin": ["any"]}"#).is_ok());
3391        assert!(cors_from_json(r#"{"origin": ["*", "https://example.com"]}"#).is_ok());
3392    }
3393
3394    #[test]
3395    fn test_deserialize_rejects_invalid_patterns() {
3396        assert!(cors_from_json(r#"{"origin": ["*.*bad"]}"#).is_err());
3397        assert!(cors_from_json(r#"{"origin": ["*."]}"#).is_err());
3398        assert!(cors_from_json(r#"{"origin": [""]}"#).is_err());
3399        assert!(cors_from_json(r#"{"origin": ["https://"]}"#).is_err());
3400    }
3401
3402    #[test]
3403    fn test_deserialize_rejects_mixed_valid_and_invalid() {
3404        assert!(cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#).is_err());
3405    }
3406}