// sockudo_core/options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
/// Identifies the adapter backend selected in [`AdapterConfig::driver`].
///
/// Serde names are the lowercased variant names (`rename_all = "lowercase"`), except for the
/// variants that carry an explicit hyphenated `rename`. `Local` is the default variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    // Serialized as `redis-cluster` rather than the lowercased `rediscluster`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    // Serialized as `google-pubsub` rather than the lowercased `googlepubsub`.
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
}
88
/// DynamoDB settings, exposed as `DatabaseConfig::dynamodb`.
///
/// Because of `#[serde(default)]`, keys missing from the input are filled from the `Default`
/// impl (region `us-east-1`, table `sockudo-applications`, every optional field `None`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    pub region: String,
    pub table_name: String,
    // Optional endpoint override; presumably for local/emulated DynamoDB — confirm with the consumer.
    pub endpoint_url: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_profile_name: Option<String>,
}
99
100impl Default for DynamoDbSettings {
101    fn default() -> Self {
102        Self {
103            region: "us-east-1".to_string(),
104            table_name: "sockudo-applications".to_string(),
105            endpoint_url: None,
106            aws_access_key_id: None,
107            aws_secret_access_key: None,
108            aws_profile_name: None,
109        }
110    }
111}
112
/// ScyllaDB settings, used by both `AppManagerConfig::scylladb` and `DatabaseConfig::scylladb`.
///
/// Because of `#[serde(default)]`, keys missing from the input are filled from the `Default`
/// impl (one node at `127.0.0.1:9042`, keyspace `sockudo`, table `applications`,
/// `SimpleStrategy` replication with factor 3, no credentials).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    // Node addresses; the default entry uses the `host:port` form.
    pub nodes: Vec<String>,
    pub keyspace: String,
    pub table_name: String,
    pub username: Option<String>,
    pub password: Option<String>,
    pub replication_class: String,
    pub replication_factor: u32,
}
124
125impl Default for ScyllaDbSettings {
126    fn default() -> Self {
127        Self {
128            nodes: vec!["127.0.0.1:9042".to_string()],
129            keyspace: "sockudo".to_string(),
130            table_name: "applications".to_string(),
131            username: None,
132            password: None,
133            replication_class: "SimpleStrategy".to_string(),
134            replication_factor: 3,
135        }
136    }
137}
138
/// SurrealDB settings, exposed as `DatabaseConfig::surrealdb`.
///
/// Because of `#[serde(default)]`, keys missing from the input are filled from the `Default`
/// impl (`ws://127.0.0.1:8000`, namespace/database `sockudo`, `root`/`root` credentials,
/// table `applications`, `cache_ttl` 300, `cache_max_capacity` 100).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbSettings {
    pub url: String,
    pub namespace: String,
    pub database: String,
    pub username: String,
    pub password: String,
    pub table_name: String,
    // NOTE(review): unit is not visible here; presumably seconds — confirm with the consumer.
    pub cache_ttl: u64,
    pub cache_max_capacity: u64,
}
151
152impl Default for SurrealDbSettings {
153    fn default() -> Self {
154        Self {
155            url: "ws://127.0.0.1:8000".to_string(),
156            namespace: "sockudo".to_string(),
157            database: "sockudo".to_string(),
158            username: "root".to_string(),
159            password: "root".to_string(),
160            table_name: "applications".to_string(),
161            cache_ttl: 300,
162            cache_max_capacity: 100,
163        }
164    }
165}
166
167impl FromStr for AdapterDriver {
168    type Err = String;
169    fn from_str(s: &str) -> Result<Self, Self::Err> {
170        match s.to_lowercase().as_str() {
171            "local" => Ok(AdapterDriver::Local),
172            "redis" => Ok(AdapterDriver::Redis),
173            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174            "nats" => Ok(AdapterDriver::Nats),
175            "pulsar" => Ok(AdapterDriver::Pulsar),
176            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178            "kafka" => Ok(AdapterDriver::Kafka),
179            _ => Err(format!("Unknown adapter driver: {s}")),
180        }
181    }
182}
183
/// Identifies the backend selected in [`AppManagerConfig::driver`].
///
/// Serde names are the lowercased variant names (`rename_all = "lowercase"`), e.g. `pgsql`,
/// `surrealdb`, `scylladb`. `Memory` is the default variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    SurrealDb,
    ScyllaDb,
}
195impl FromStr for AppManagerDriver {
196    type Err = String;
197    fn from_str(s: &str) -> Result<Self, Self::Err> {
198        match s.to_lowercase().as_str() {
199            "memory" => Ok(AppManagerDriver::Memory),
200            "mysql" => Ok(AppManagerDriver::Mysql),
201            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205            _ => Err(format!("Unknown app manager driver: {s}")),
206        }
207    }
208}
209
/// Identifies the cache backend selected in [`CacheConfig::driver`].
///
/// Serde names are the lowercased variant names, except `RedisCluster`, which is explicitly
/// renamed. `Memory` is the default variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    // Serialized as `redis-cluster` rather than the lowercased `rediscluster`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    None,
}
220
221impl FromStr for CacheDriver {
222    type Err = String;
223    fn from_str(s: &str) -> Result<Self, Self::Err> {
224        match s.to_lowercase().as_str() {
225            "memory" => Ok(CacheDriver::Memory),
226            "redis" => Ok(CacheDriver::Redis),
227            "redis-cluster" => Ok(CacheDriver::RedisCluster),
228            "none" => Ok(CacheDriver::None),
229            _ => Err(format!("Unknown cache driver: {s}")),
230        }
231    }
232}
233
/// Identifies a queue backend.
///
/// Serde names are the lowercased variant names, except `RedisCluster`, which is explicitly
/// renamed. Note that the default variant is `Redis`, not `Memory`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    // Serialized as `redis-cluster` rather than the lowercased `rediscluster`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Sqs,
    Sns,
    None,
}
246
247impl FromStr for QueueDriver {
248    type Err = String;
249    fn from_str(s: &str) -> Result<Self, Self::Err> {
250        match s.to_lowercase().as_str() {
251            "memory" => Ok(QueueDriver::Memory),
252            "redis" => Ok(QueueDriver::Redis),
253            "redis-cluster" => Ok(QueueDriver::RedisCluster),
254            "sqs" => Ok(QueueDriver::Sqs),
255            "sns" => Ok(QueueDriver::Sns),
256            "none" => Ok(QueueDriver::None),
257            _ => Err(format!("Unknown queue driver: {s}")),
258        }
259    }
260}
261
262impl AsRef<str> for QueueDriver {
263    fn as_ref(&self) -> &str {
264        match self {
265            QueueDriver::Memory => "memory",
266            QueueDriver::Redis => "redis",
267            QueueDriver::RedisCluster => "redis-cluster",
268            QueueDriver::Sqs => "sqs",
269            QueueDriver::Sns => "sns",
270            QueueDriver::None => "none",
271        }
272    }
273}
274
/// Settings for a Redis Cluster backed queue.
///
/// Because of `#[serde(default)]`, keys missing from the input are filled from the `Default`
/// impl (concurrency 5, prefix `sockudo_queue:`, one node at `redis://127.0.0.1:6379`,
/// request timeout 5000).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    pub concurrency: u32,
    pub prefix: Option<String>,
    // Seed node addresses; the default entry is a full `redis://` URL.
    pub nodes: Vec<String>,
    // Milliseconds, going by the `_ms` suffix.
    pub request_timeout_ms: u64,
}
283
284impl Default for RedisClusterQueueConfig {
285    fn default() -> Self {
286        Self {
287            concurrency: 5,
288            prefix: Some("sockudo_queue:".to_string()),
289            nodes: vec!["redis://127.0.0.1:6379".to_string()],
290            request_timeout_ms: 5000,
291        }
292    }
293}
294
/// Identifies the metrics backend; `Prometheus` is the only variant and the default.
///
/// Serialized with the lowercased variant name (`prometheus`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
301
/// Log output format choice; `Human` is the default.
///
/// Serialized with the lowercased variant names (`human`, `json`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
309
310impl FromStr for LogOutputFormat {
311    type Err = String;
312    fn from_str(s: &str) -> Result<Self, Self::Err> {
313        match s.to_lowercase().as_str() {
314            "human" => Ok(LogOutputFormat::Human),
315            "json" => Ok(LogOutputFormat::Json),
316            _ => Err(format!("Unknown log output format: {s}")),
317        }
318    }
319}
320
321impl FromStr for MetricsDriver {
322    type Err = String;
323    fn from_str(s: &str) -> Result<Self, Self::Err> {
324        match s.to_lowercase().as_str() {
325            "prometheus" => Ok(MetricsDriver::Prometheus),
326            _ => Err(format!("Unknown metrics driver: {s}")),
327        }
328    }
329}
330
331impl AsRef<str> for MetricsDriver {
332    fn as_ref(&self) -> &str {
333        match self {
334            MetricsDriver::Prometheus => "prometheus",
335        }
336    }
337}
338
339// --- Main Configuration Struct ---
/// Top-level server configuration, aggregating every configuration section as a field.
///
/// `#[serde(default)]` means any key missing from the deserialized input is filled from this
/// struct's `Default` impl, which is not part of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    // The only optional section: `None` when no logging configuration is supplied.
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    // NOTE(review): unit not visible here; presumably seconds — confirm with the consumer.
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    // NOTE(review): unit not visible here — confirm with the consumer.
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    // Kilobytes, going by the `_kb` suffix.
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    // NOTE(review): unit not visible here — confirm with the consumer.
    pub activity_timeout: u64,
    // Same type as `AdapterConfig::cluster_health`; which one takes precedence is not visible here.
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
}
381
382// --- Configuration Sub-Structs ---
383
/// Settings for an SQS-backed queue.
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    // NOTE(review): presumably seconds, as in the SQS API — confirm with the consumer.
    pub visibility_timeout: i32,
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    // Seconds, going by the field name.
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    // NOTE(review): presumably only meaningful when `fifo` is true — confirm.
    pub message_group_id: Option<String>,
}
397
/// Settings for an SNS-backed queue.
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
405
/// Adapter configuration: the selected [`AdapterDriver`] plus one settings block per backend.
///
/// Missing keys are filled from the `Default` impl; the two fields with an explicit
/// `default = "..."` attribute use the named function instead.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    // Settings for the Redis Cluster adapter (note the field is named `cluster`).
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    // Falls back to `default_buffer_multiplier_per_cpu()` (64) when missing.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    // Falls back to `default_enable_socket_counting()` (true) when missing.
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
}
423
/// Serde default for `AdapterConfig::enable_socket_counting`: socket counting is on.
fn default_enable_socket_counting() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
427
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`.
fn default_buffer_multiplier_per_cpu() -> usize {
    const DEFAULT_MULTIPLIER: usize = 64;
    DEFAULT_MULTIPLIER
}
431
432impl Default for AdapterConfig {
433    fn default() -> Self {
434        Self {
435            driver: AdapterDriver::default(),
436            redis: RedisAdapterConfig::default(),
437            cluster: RedisClusterAdapterConfig::default(),
438            nats: NatsAdapterConfig::default(),
439            pulsar: PulsarAdapterConfig::default(),
440            rabbitmq: RabbitMqAdapterConfig::default(),
441            google_pubsub: GooglePubSubAdapterConfig::default(),
442            kafka: KafkaAdapterConfig::default(),
443            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
444            cluster_health: ClusterHealthConfig::default(),
445            enable_socket_counting: default_enable_socket_counting(),
446        }
447    }
448}
449
/// Settings for the Redis adapter (`AdapterConfig::redis`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    // NOTE(review): unit not visible here — confirm with the consumer.
    pub requests_timeout: u64,
    pub prefix: String,
    // Free-form option maps; presumably for the publish and subscribe connections
    // respectively — confirm against the adapter implementation.
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
459
/// Settings for the Redis Cluster adapter (`AdapterConfig::cluster`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    // Milliseconds, going by the `_ms` suffix.
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    // NOTE(review): the field-level `#[serde(default)]` should make a missing key fall back to
    // `bool::default()` (false) instead of the container default — confirm this is intended.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
470
/// Settings for the NATS adapter (`AdapterConfig::nats`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here. Fields ending in `_ms` are milliseconds, going by the suffix.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    pub connection_timeout_ms: u64,
    // NOTE(review): presumably the expected number of cluster nodes — confirm with the adapter.
    pub nodes_number: Option<u32>,
    pub discovery_max_wait_ms: u64,
    pub discovery_idle_wait_ms: u64,
}
485
/// Settings for the Pulsar adapter (`AdapterConfig::pulsar`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PulsarAdapterConfig {
    pub url: String,
    pub prefix: String,
    // Milliseconds, going by the `_ms` suffix.
    pub request_timeout_ms: u64,
    pub token: Option<String>,
    // NOTE(review): presumably the expected number of cluster nodes — confirm with the adapter.
    pub nodes_number: Option<u32>,
}
495
/// Settings for the RabbitMQ adapter (`AdapterConfig::rabbitmq`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here. Fields ending in `_ms` are milliseconds, going by the suffix.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub connection_timeout_ms: u64,
    // NOTE(review): presumably the expected number of cluster nodes — confirm with the adapter.
    pub nodes_number: Option<u32>,
}
505
/// Settings for the Google Pub/Sub adapter (`AdapterConfig::google_pubsub`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    pub project_id: String,
    pub prefix: String,
    // Milliseconds, going by the `_ms` suffix.
    pub request_timeout_ms: u64,
    // Optional emulator address; presumably for local development — confirm with the adapter.
    pub emulator_host: Option<String>,
    // NOTE(review): presumably the expected number of cluster nodes — confirm with the adapter.
    pub nodes_number: Option<u32>,
}
515
/// Settings for the Kafka adapter (`AdapterConfig::kafka`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    pub brokers: Vec<String>,
    pub prefix: String,
    // Milliseconds, going by the `_ms` suffix.
    pub request_timeout_ms: u64,
    // Optional security/SASL options kept as free-form strings; accepted values are defined by
    // the consumer, not here.
    pub security_protocol: Option<String>,
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    // NOTE(review): presumably the expected number of cluster nodes — confirm with the adapter.
    pub nodes_number: Option<u32>,
}
528
/// App manager configuration: the selected [`AppManagerDriver`] and its related settings.
///
/// `Default` is derived, so every field starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    // Statically configured apps.
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    // NOTE(review): `DatabaseConfig` also carries a `scylladb` block; which one is authoritative
    // is not visible here.
    pub scylladb: ScyllaDbSettings,
}
537
/// A list of [`App`] definitions supplied directly in the configuration; empty by default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
543
/// Cache toggle and TTL used by `AppManagerConfig::cache`.
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    // NOTE(review): unit not visible here; presumably seconds — confirm with the consumer.
    pub ttl: u64,
}
550
/// Options for the in-memory cache (`CacheConfig::memory`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    // NOTE(review): units of `ttl` and `cleanup_interval` are not visible here — confirm.
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
558
/// Cache configuration: the selected [`CacheDriver`] plus Redis and in-memory settings.
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
566
/// Redis options for the cache (`CacheConfig::redis`).
///
/// `Default` is derived: no prefix, no URL override, `cluster_mode` false.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    // NOTE(review): presumably replaces the URL derived from `database.redis` — confirm.
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
574
/// Channel limits (`ServerOptions::channel_limits`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    // NOTE(review): unit not visible here; presumably seconds — confirm with the consumer.
    pub cache_ttl: u64,
}
581
/// CORS configuration (`ServerOptions::cors`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    // Origin patterns are validated while deserializing; an invalid pattern fails the whole
    // deserialization (see `deserialize_and_validate_cors_origins`).
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
591
592fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
593where
594    D: serde::Deserializer<'de>,
595{
596    use serde::de::Error;
597    let origins = Vec::<String>::deserialize(deserializer)?;
598
599    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
600        return Err(D::Error::custom(format!(
601            "CORS origin pattern validation failed: {}",
602            e
603        )));
604    }
605
606    Ok(origins)
607}
608
/// Connection settings for every supported database backend (`ServerOptions::database`).
///
/// `Default` is derived, so each section starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    // MySQL and Postgres share the same connection shape.
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
619
/// SQL database connection settings, used for both `DatabaseConfig::mysql` and
/// `DatabaseConfig::postgres`.
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    // Optional per-connection pool bounds; `override_db_pool_settings` can set them from the
    // `{prefix}_POOL_MIN` / `{prefix}_POOL_MAX` environment variables.
    pub pool_min: Option<u32>,
    pub pool_max: Option<u32>,
    // NOTE(review): units of `cache_ttl` and `cache_cleanup_interval` are not visible here.
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
636
/// Redis connection settings (`DatabaseConfig::redis`), covering standalone, Sentinel and
/// Cluster setups.
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    // May carry a `redis://` or `rediss://` scheme prefix; `to_url` honours it.
    pub host: String,
    pub port: u16,
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    // A non-empty list switches `to_url` to the `redis+sentinel://` form.
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    // Used as the path segment after the sentinel hosts in the sentinel URL.
    pub name: String,
    pub cluster: RedisClusterConnection,
    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
    pub cluster_nodes: Vec<ClusterNode>,
}
653
/// Address of a single Redis Sentinel instance (an entry of `RedisConnection::sentinels`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
660
/// Redis Cluster settings (`RedisConnection::cluster`).
///
/// `Default` is derived: no nodes, no credentials, TLS off. When `username` / `password` are
/// unset, `RedisConnection` falls back to its own top-level credentials when building cluster
/// URLs.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    pub username: Option<String>,
    pub password: Option<String>,
    // Also accepted under the key `useTLS`.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
670
671impl RedisConnection {
672    /// Returns true if Redis Sentinel is configured.
673    pub fn is_sentinel_configured(&self) -> bool {
674        !self.sentinels.is_empty()
675    }
676
677    /// Builds a Redis connection URL based on the configuration.
678    pub fn to_url(&self) -> String {
679        if self.is_sentinel_configured() {
680            self.build_sentinel_url()
681        } else {
682            self.build_standard_url()
683        }
684    }
685
686    fn build_standard_url(&self) -> String {
687        // Extract scheme from host if present, otherwise default to redis://
688        let (scheme, host) = if self.host.starts_with("rediss://") {
689            ("rediss://", self.host.trim_start_matches("rediss://"))
690        } else if self.host.starts_with("redis://") {
691            ("redis://", self.host.trim_start_matches("redis://"))
692        } else {
693            ("redis://", self.host.as_str())
694        };
695
696        let mut url = String::from(scheme);
697
698        if let Some(ref username) = self.username {
699            url.push_str(username);
700            if let Some(ref password) = self.password {
701                url.push(':');
702                url.push_str(&urlencoding::encode(password));
703            }
704            url.push('@');
705        } else if let Some(ref password) = self.password {
706            url.push(':');
707            url.push_str(&urlencoding::encode(password));
708            url.push('@');
709        }
710
711        url.push_str(host);
712        url.push(':');
713        url.push_str(&self.port.to_string());
714        url.push('/');
715        url.push_str(&self.db.to_string());
716
717        url
718    }
719
720    fn build_sentinel_url(&self) -> String {
721        let mut url = String::from("redis+sentinel://");
722
723        if let Some(ref sentinel_password) = self.sentinel_password {
724            url.push(':');
725            url.push_str(&urlencoding::encode(sentinel_password));
726            url.push('@');
727        }
728
729        let sentinel_hosts: Vec<String> = self
730            .sentinels
731            .iter()
732            .map(|s| format!("{}:{}", s.host, s.port))
733            .collect();
734        url.push_str(&sentinel_hosts.join(","));
735
736        url.push('/');
737        url.push_str(&self.name);
738        url.push('/');
739        url.push_str(&self.db.to_string());
740
741        let mut params = Vec::new();
742        if let Some(ref password) = self.password {
743            params.push(format!("password={}", urlencoding::encode(password)));
744        }
745        if let Some(ref username) = self.username {
746            params.push(format!("username={}", urlencoding::encode(username)));
747        }
748
749        if !params.is_empty() {
750            url.push('?');
751            url.push_str(&params.join("&"));
752        }
753
754        url
755    }
756
757    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
758    /// or legacy (`cluster_nodes`) field.
759    pub fn has_cluster_nodes(&self) -> bool {
760        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
761    }
762
763    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
764    /// Falls back to legacy `cluster_nodes` for backward compatibility.
765    pub fn cluster_node_urls(&self) -> Vec<String> {
766        if !self.cluster.nodes.is_empty() {
767            return self.build_cluster_urls(&self.cluster.nodes);
768        }
769        self.build_cluster_urls(&self.cluster_nodes)
770    }
771
772    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
773    /// shared cluster auth/TLS options.
774    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
775        self.build_cluster_urls(
776            &seeds
777                .iter()
778                .filter_map(|seed| ClusterNode::from_seed(seed))
779                .collect::<Vec<ClusterNode>>(),
780        )
781    }
782
783    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
784        let username = self
785            .cluster
786            .username
787            .as_deref()
788            .or(self.username.as_deref());
789        let password = self
790            .cluster
791            .password
792            .as_deref()
793            .or(self.password.as_deref());
794        let use_tls = self.cluster.use_tls;
795
796        nodes
797            .iter()
798            .map(|node| node.to_url_with_options(use_tls, username, password))
799            .collect()
800    }
801}
802
803impl RedisSentinel {
804    pub fn to_host_port(&self) -> String {
805        format!("{}:{}", self.host, self.port)
806    }
807}
808
/// A Redis Cluster seed node.
///
/// `host` may be a bare host or IP, a `host:port` pair, a bracketed IPv6 address, or a full
/// `redis://` / `rediss://` URL; `ClusterNode::to_url_with_options` normalizes all of these.
/// `port` is used when `host` does not carry its own port.
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    pub host: String,
    pub port: u16,
}
815
816impl ClusterNode {
817    pub fn to_url(&self) -> String {
818        self.to_url_with_options(false, None, None)
819    }
820
821    pub fn to_url_with_options(
822        &self,
823        use_tls: bool,
824        username: Option<&str>,
825        password: Option<&str>,
826    ) -> String {
827        let host = self.host.trim();
828
829        if host.starts_with("redis://") || host.starts_with("rediss://") {
830            if let Ok(parsed) = Url::parse(host)
831                && let Some(host_str) = parsed.host_str()
832            {
833                let scheme = parsed.scheme();
834                let port = parsed.port_or_known_default().unwrap_or(self.port);
835                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
836                let parsed_password = parsed.password();
837                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
838                let (effective_username, effective_password) = if has_embedded_auth {
839                    (parsed_username, parsed_password)
840                } else {
841                    (username, password)
842                };
843
844                return build_redis_url(
845                    scheme,
846                    host_str,
847                    port,
848                    effective_username,
849                    effective_password,
850                );
851            }
852
853            // Fallback for malformed URLs
854            let has_port = if let Some(bracket_pos) = host.rfind(']') {
855                host[bracket_pos..].contains(':')
856            } else {
857                host.split(':').count() >= 3
858            };
859            let base = if has_port {
860                host.to_string()
861            } else {
862                format!("{}:{}", host, self.port)
863            };
864
865            if let Ok(parsed) = Url::parse(&base) {
866                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
867                let parsed_password = parsed.password();
868                if let Some(host_str) = parsed.host_str() {
869                    let port = parsed.port_or_known_default().unwrap_or(self.port);
870                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
871                    let (effective_username, effective_password) = if has_embedded_auth {
872                        (parsed_username, parsed_password)
873                    } else {
874                        (username, password)
875                    };
876                    return build_redis_url(
877                        parsed.scheme(),
878                        host_str,
879                        port,
880                        effective_username,
881                        effective_password,
882                    );
883                }
884            }
885            return base;
886        }
887
888        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
889        let scheme = if use_tls { "rediss" } else { "redis" };
890        build_redis_url(
891            scheme,
892            &normalized_host,
893            normalized_port,
894            username,
895            password,
896        )
897    }
898
899    pub fn from_seed(seed: &str) -> Option<Self> {
900        let trimmed = seed.trim();
901        if trimmed.is_empty() {
902            return None;
903        }
904
905        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
906            let port = Url::parse(trimmed)
907                .ok()
908                .and_then(|parsed| parsed.port_or_known_default())
909                .unwrap_or(6379);
910            return Some(Self {
911                host: trimmed.to_string(),
912                port,
913            });
914        }
915
916        let (host, port) = split_plain_host_and_port(trimmed, 6379);
917        Some(Self { host, port })
918    }
919}
920
/// Splits a scheme-less host string into `(host, port)`, using `default_port` when the input
/// carries no parsable port.
///
/// Recognized shapes (after trimming surrounding whitespace):
/// * `[v6addr]:port` or `[v6addr]` - the brackets are removed from the returned host;
/// * `host:port` with exactly one colon and a valid `u16` port;
/// * anything else (bare host, unbracketed IPv6, unparsable port, unterminated bracket) is
///   returned unchanged together with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    // Handle bracketed IPv6: [::1]:6379
    if let Some(after_open) = trimmed.strip_prefix('[') {
        return match after_open.split_once(']') {
            Some((address, tail)) => {
                let port = tail
                    .strip_prefix(':')
                    .and_then(|digits| digits.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (address.to_owned(), port)
            }
            // No closing bracket: leave the input untouched.
            None => (trimmed.to_owned(), default_port),
        };
    }

    // Handle hostname/IP with port: host:6379 (exactly one colon, so exactly two pieces).
    let mut pieces = trimmed.split(':');
    if let (Some(name), Some(digits), None) = (pieces.next(), pieces.next(), pieces.next()) {
        if let Ok(port) = digits.parse::<u16>() {
            return (name.to_owned(), port);
        }
    }

    (trimmed.to_owned(), default_port)
}
949
950fn build_redis_url(
951    scheme: &str,
952    host: &str,
953    port: u16,
954    username: Option<&str>,
955    password: Option<&str>,
956) -> String {
957    let mut url = format!("{scheme}://");
958
959    if let Some(user) = username {
960        url.push_str(&urlencoding::encode(user));
961        if let Some(pass) = password {
962            url.push(':');
963            url.push_str(&urlencoding::encode(pass));
964        }
965        url.push('@');
966    } else if let Some(pass) = password {
967        url.push(':');
968        url.push_str(&urlencoding::encode(pass));
969        url.push('@');
970    }
971
972    if host.contains(':') && !host.starts_with('[') {
973        url.push('[');
974        url.push_str(host);
975        url.push(']');
976    } else {
977        url.push_str(host);
978    }
979    url.push(':');
980    url.push_str(&port.to_string());
981    url
982}
983
/// Global database pooling settings (`ServerOptions::database_pooling`).
///
/// `#[serde(default)]` fills missing keys from this struct's `Default` impl, which is not part
/// of the code visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    // NOTE(review): how these interact with `DatabaseConnection::pool_min` / `pool_max` is not
    // visible here.
    pub min: u32,
    pub max: u32,
}
991
992#[derive(Debug, Clone, Serialize, Deserialize)]
993#[serde(default)]
994pub struct EventLimits {
995    pub max_channels_at_once: u32,
996    pub max_name_length: u32,
997    pub max_payload_in_kb: u32,
998    pub max_batch_size: u32,
999}
1000
1001#[derive(Debug, Clone, Serialize, Deserialize)]
1002#[serde(default)]
1003pub struct IdempotencyConfig {
1004    /// Whether idempotency key support is enabled
1005    pub enabled: bool,
1006    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
1007    pub ttl_seconds: u64,
1008    /// Maximum length of an idempotency key
1009    pub max_key_length: usize,
1010}
1011
1012#[derive(Debug, Clone, Serialize, Deserialize)]
1013#[serde(default)]
1014pub struct EphemeralConfig {
1015    /// Whether ephemeral message handling is enabled.
1016    pub enabled: bool,
1017}
1018
1019#[derive(Debug, Clone, Serialize, Deserialize)]
1020#[serde(default)]
1021pub struct EchoControlConfig {
1022    /// Whether connection-level and per-message echo control is enabled.
1023    pub enabled: bool,
1024    /// Default echo behavior for new V2 connections when the query param is omitted.
1025    pub default_echo_messages: bool,
1026}
1027
1028#[derive(Debug, Clone, Serialize, Deserialize)]
1029#[serde(default)]
1030pub struct EventNameFilteringConfig {
1031    /// Whether per-subscription event name filtering is enabled.
1032    pub enabled: bool,
1033    /// Maximum event names allowed in a single filter.
1034    pub max_events_per_filter: usize,
1035    /// Maximum length for each event name in a filter.
1036    pub max_event_name_length: usize,
1037}
1038
1039#[derive(Debug, Clone, Serialize, Deserialize)]
1040#[serde(default)]
1041pub struct ConnectionRecoveryConfig {
1042    /// Whether connection recovery (resume) is enabled.
1043    /// When enabled, the server keeps a bounded replay buffer per channel so that
1044    /// reconnecting clients can receive missed messages. Disabled by default for
1045    /// Pusher protocol compatibility.
1046    pub enabled: bool,
1047    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
1048    pub buffer_ttl_seconds: u64,
1049    /// Maximum number of messages kept per channel. Default: 100.
1050    pub max_buffer_size: usize,
1051}
1052
1053#[derive(Debug, Clone, Serialize, Deserialize)]
1054#[serde(default)]
1055pub struct HttpApiConfig {
1056    pub request_limit_in_mb: u32,
1057    pub accept_traffic: AcceptTraffic,
1058    pub usage_enabled: bool,
1059}
1060
1061#[derive(Debug, Clone, Serialize, Deserialize)]
1062#[serde(default)]
1063pub struct AcceptTraffic {
1064    pub memory_threshold: f64,
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize)]
1068#[serde(default)]
1069pub struct InstanceConfig {
1070    pub process_id: String,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct MetricsConfig {
1076    pub enabled: bool,
1077    pub driver: MetricsDriver,
1078    pub host: String,
1079    pub prometheus: PrometheusConfig,
1080    pub port: u16,
1081}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084#[serde(default)]
1085pub struct PrometheusConfig {
1086    pub prefix: String,
1087}
1088
1089#[derive(Debug, Clone, Serialize, Deserialize)]
1090#[serde(default)]
1091pub struct LoggingConfig {
1092    pub colors_enabled: bool,
1093    pub include_target: bool,
1094}
1095
1096#[derive(Debug, Clone, Serialize, Deserialize)]
1097#[serde(default)]
1098pub struct PresenceConfig {
1099    pub max_members_per_channel: u32,
1100    pub max_member_size_in_kb: u32,
1101}
1102
1103/// WebSocket connection buffer configuration
1104/// Controls backpressure handling for slow consumers
1105#[derive(Debug, Clone, Serialize, Deserialize)]
1106#[serde(default)]
1107pub struct WebSocketConfig {
1108    pub max_messages: Option<usize>,
1109    pub max_bytes: Option<usize>,
1110    pub disconnect_on_buffer_full: bool,
1111    pub max_message_size: usize,
1112    pub max_frame_size: usize,
1113    pub write_buffer_size: usize,
1114    pub max_backpressure: usize,
1115    pub auto_ping: bool,
1116    pub ping_interval: u32,
1117    pub idle_timeout: u32,
1118    pub compression: String,
1119}
1120
1121impl Default for WebSocketConfig {
1122    fn default() -> Self {
1123        Self {
1124            max_messages: Some(1000),
1125            max_bytes: None,
1126            disconnect_on_buffer_full: true,
1127            max_message_size: 64 * 1024 * 1024,
1128            max_frame_size: 16 * 1024 * 1024,
1129            write_buffer_size: 16 * 1024,
1130            max_backpressure: 1024 * 1024,
1131            auto_ping: true,
1132            ping_interval: 30,
1133            idle_timeout: 120,
1134            compression: "disabled".to_string(),
1135        }
1136    }
1137}
1138
1139impl WebSocketConfig {
1140    /// Convert to WebSocketBufferConfig for runtime use
1141    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1142        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1143
1144        let limit = match (self.max_messages, self.max_bytes) {
1145            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1146            (Some(messages), None) => BufferLimit::Messages(messages),
1147            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1148            (None, None) => BufferLimit::Messages(1000),
1149        };
1150
1151        WebSocketBufferConfig {
1152            limit,
1153            disconnect_on_full: self.disconnect_on_buffer_full,
1154        }
1155    }
1156
1157    /// Convert to native sockudo-ws runtime configuration.
1158    pub fn to_sockudo_ws_config(
1159        &self,
1160        websocket_max_payload_kb: u32,
1161        activity_timeout: u64,
1162    ) -> sockudo_ws::Config {
1163        use sockudo_ws::Compression;
1164
1165        let compression = match self.compression.to_lowercase().as_str() {
1166            "dedicated" => Compression::Dedicated,
1167            "shared" => Compression::Shared,
1168            "window256b" => Compression::Window256B,
1169            "window1kb" => Compression::Window1KB,
1170            "window2kb" => Compression::Window2KB,
1171            "window4kb" => Compression::Window4KB,
1172            "window8kb" => Compression::Window8KB,
1173            "window16kb" => Compression::Window16KB,
1174            "window32kb" => Compression::Window32KB,
1175            _ => Compression::Disabled,
1176        };
1177
1178        sockudo_ws::Config::builder()
1179            .max_payload_length(
1180                self.max_bytes
1181                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1182            )
1183            .max_message_size(self.max_message_size)
1184            .max_frame_size(self.max_frame_size)
1185            .write_buffer_size(self.write_buffer_size)
1186            .max_backpressure(self.max_backpressure)
1187            .idle_timeout(self.idle_timeout)
1188            .auto_ping(self.auto_ping)
1189            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1190            .compression(compression)
1191            .build()
1192    }
1193}
1194
1195#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1196#[serde(default)]
1197pub struct QueueConfig {
1198    pub driver: QueueDriver,
1199    pub redis: RedisQueueConfig,
1200    pub redis_cluster: RedisClusterQueueConfig,
1201    pub sqs: SqsQueueConfig,
1202    pub sns: SnsQueueConfig,
1203}
1204
1205#[derive(Debug, Clone, Serialize, Deserialize)]
1206#[serde(default)]
1207pub struct RedisQueueConfig {
1208    pub concurrency: u32,
1209    pub prefix: Option<String>,
1210    pub url_override: Option<String>,
1211    pub cluster_mode: bool,
1212}
1213
1214#[derive(Clone, Debug, Serialize, Deserialize)]
1215#[serde(default)]
1216pub struct RateLimit {
1217    pub max_requests: u32,
1218    pub window_seconds: u64,
1219    pub identifier: Option<String>,
1220    pub trust_hops: Option<u32>,
1221}
1222
1223#[derive(Debug, Clone, Serialize, Deserialize)]
1224#[serde(default)]
1225pub struct RateLimiterConfig {
1226    pub enabled: bool,
1227    pub driver: CacheDriver,
1228    pub api_rate_limit: RateLimit,
1229    pub websocket_rate_limit: RateLimit,
1230    pub redis: RedisConfig,
1231}
1232
1233#[derive(Debug, Clone, Serialize, Deserialize)]
1234#[serde(default)]
1235pub struct SslConfig {
1236    pub enabled: bool,
1237    pub cert_path: String,
1238    pub key_path: String,
1239    pub passphrase: Option<String>,
1240    pub ca_path: Option<String>,
1241    pub redirect_http: bool,
1242    pub http_port: Option<u16>,
1243}
1244
1245#[derive(Debug, Clone, Serialize, Deserialize)]
1246#[serde(default)]
1247pub struct WebhooksConfig {
1248    pub batching: BatchingConfig,
1249    pub retry: WebhookRetryConfig,
1250    pub request_timeout_ms: u64,
1251}
1252
1253#[derive(Debug, Clone, Serialize, Deserialize)]
1254#[serde(default)]
1255pub struct BatchingConfig {
1256    pub enabled: bool,
1257    pub duration: u64,
1258    pub size: usize,
1259}
1260
1261#[derive(Debug, Clone, Serialize, Deserialize)]
1262#[serde(default)]
1263pub struct WebhookRetryConfig {
1264    pub enabled: bool,
1265    pub max_attempts: Option<u32>,
1266    pub max_elapsed_time_ms: u64,
1267    pub initial_backoff_ms: u64,
1268    pub max_backoff_ms: u64,
1269}
1270
1271impl Default for WebhooksConfig {
1272    fn default() -> Self {
1273        Self {
1274            batching: BatchingConfig::default(),
1275            retry: WebhookRetryConfig::default(),
1276            request_timeout_ms: 10_000,
1277        }
1278    }
1279}
1280
1281impl Default for WebhookRetryConfig {
1282    fn default() -> Self {
1283        Self {
1284            enabled: true,
1285            max_attempts: None,
1286            max_elapsed_time_ms: 300_000,
1287            initial_backoff_ms: 1_000,
1288            max_backoff_ms: 60_000,
1289        }
1290    }
1291}
1292
1293#[derive(Debug, Clone, Serialize, Deserialize)]
1294#[serde(default)]
1295pub struct ClusterHealthConfig {
1296    pub enabled: bool,
1297    pub heartbeat_interval_ms: u64,
1298    pub node_timeout_ms: u64,
1299    pub cleanup_interval_ms: u64,
1300}
1301
1302#[derive(Debug, Clone, Serialize, Deserialize)]
1303#[serde(default)]
1304pub struct UnixSocketConfig {
1305    pub enabled: bool,
1306    pub path: String,
1307    #[serde(deserialize_with = "deserialize_octal_permission")]
1308    pub permission_mode: u32,
1309}
1310
1311#[derive(Debug, Clone, Serialize, Deserialize)]
1312#[serde(default)]
1313pub struct DeltaCompressionOptionsConfig {
1314    pub enabled: bool,
1315    pub algorithm: String,
1316    pub full_message_interval: u32,
1317    pub min_message_size: usize,
1318    pub max_state_age_secs: u64,
1319    pub max_channel_states_per_socket: usize,
1320    pub max_conflation_states_per_channel: Option<usize>,
1321    pub conflation_key_path: Option<String>,
1322    pub cluster_coordination: bool,
1323    pub omit_delta_algorithm: bool,
1324}
1325
1326/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1327#[derive(Debug, Clone, Serialize, Deserialize)]
1328#[serde(default)]
1329pub struct CleanupConfig {
1330    pub queue_buffer_size: usize,
1331    pub batch_size: usize,
1332    pub batch_timeout_ms: u64,
1333    pub worker_threads: WorkerThreadsConfig,
1334    pub max_retry_attempts: u32,
1335    pub async_enabled: bool,
1336    pub fallback_to_sync: bool,
1337}
1338
/// How many worker threads the cleanup system uses.
///
/// Serialized as the string `"auto"` or as an integer.
#[derive(Clone, Debug)]
pub enum WorkerThreadsConfig {
    /// No explicit count is configured.
    Auto,
    /// An explicit thread count.
    Fixed(usize),
}
1345
1346impl serde::Serialize for WorkerThreadsConfig {
1347    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1348    where
1349        S: serde::Serializer,
1350    {
1351        match self {
1352            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1353            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1354        }
1355    }
1356}
1357
1358impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1359    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1360    where
1361        D: serde::Deserializer<'de>,
1362    {
1363        use serde::de;
1364        struct WorkerThreadsVisitor;
1365        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1366            type Value = WorkerThreadsConfig;
1367
1368            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1369                formatter.write_str(r#""auto" or a positive integer"#)
1370            }
1371
1372            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1373                if value.eq_ignore_ascii_case("auto") {
1374                    Ok(WorkerThreadsConfig::Auto)
1375                } else if let Ok(n) = value.parse::<usize>() {
1376                    Ok(WorkerThreadsConfig::Fixed(n))
1377                } else {
1378                    Err(E::custom(format!(
1379                        "expected 'auto' or a number, got '{value}'"
1380                    )))
1381                }
1382            }
1383
1384            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1385                Ok(WorkerThreadsConfig::Fixed(value as usize))
1386            }
1387
1388            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1389                if value >= 0 {
1390                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1391                } else {
1392                    Err(E::custom("worker_threads must be non-negative"))
1393                }
1394            }
1395        }
1396        deserializer.deserialize_any(WorkerThreadsVisitor)
1397    }
1398}
1399
1400impl Default for CleanupConfig {
1401    fn default() -> Self {
1402        Self {
1403            queue_buffer_size: 1024,
1404            batch_size: 64,
1405            batch_timeout_ms: 100,
1406            worker_threads: WorkerThreadsConfig::Auto,
1407            max_retry_attempts: 3,
1408            async_enabled: true,
1409            fallback_to_sync: true,
1410        }
1411    }
1412}
1413
1414impl CleanupConfig {
1415    pub fn validate(&self) -> Result<(), String> {
1416        if self.queue_buffer_size == 0 {
1417            return Err("queue_buffer_size must be greater than 0".to_string());
1418        }
1419        if self.batch_size == 0 {
1420            return Err("batch_size must be greater than 0".to_string());
1421        }
1422        if self.batch_timeout_ms == 0 {
1423            return Err("batch_timeout_ms must be greater than 0".to_string());
1424        }
1425        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1426            && n == 0
1427        {
1428            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1429        }
1430        Ok(())
1431    }
1432}
1433
1434#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1435#[serde(default)]
1436pub struct TagFilteringConfig {
1437    #[serde(default)]
1438    pub enabled: bool,
1439    #[serde(default = "default_true")]
1440    pub enable_tags: bool,
1441}
1442
/// Serde default helper for boolean fields that start out as `true`.
fn default_true() -> bool {
    let value = true;
    value
}
1446
1447// --- Default Implementations ---
1448
1449impl Default for ServerOptions {
1450    fn default() -> Self {
1451        Self {
1452            adapter: AdapterConfig::default(),
1453            app_manager: AppManagerConfig::default(),
1454            cache: CacheConfig::default(),
1455            channel_limits: ChannelLimits::default(),
1456            cors: CorsConfig::default(),
1457            database: DatabaseConfig::default(),
1458            database_pooling: DatabasePooling::default(),
1459            debug: false,
1460            tag_filtering: TagFilteringConfig::default(),
1461            event_limits: EventLimits::default(),
1462            host: "0.0.0.0".to_string(),
1463            http_api: HttpApiConfig::default(),
1464            instance: InstanceConfig::default(),
1465            logging: None,
1466            metrics: MetricsConfig::default(),
1467            mode: "production".to_string(),
1468            port: 6001,
1469            path_prefix: "/".to_string(),
1470            presence: PresenceConfig::default(),
1471            queue: QueueConfig::default(),
1472            rate_limiter: RateLimiterConfig::default(),
1473            shutdown_grace_period: 10,
1474            ssl: SslConfig::default(),
1475            user_authentication_timeout: 3600,
1476            webhooks: WebhooksConfig::default(),
1477            websocket_max_payload_kb: 64,
1478            cleanup: CleanupConfig::default(),
1479            activity_timeout: 120,
1480            cluster_health: ClusterHealthConfig::default(),
1481            unix_socket: UnixSocketConfig::default(),
1482            delta_compression: DeltaCompressionOptionsConfig::default(),
1483            websocket: WebSocketConfig::default(),
1484            connection_recovery: ConnectionRecoveryConfig::default(),
1485            idempotency: IdempotencyConfig::default(),
1486            ephemeral: EphemeralConfig::default(),
1487            echo_control: EchoControlConfig::default(),
1488            event_name_filtering: EventNameFilteringConfig::default(),
1489        }
1490    }
1491}
1492
1493impl Default for SqsQueueConfig {
1494    fn default() -> Self {
1495        Self {
1496            region: "us-east-1".to_string(),
1497            queue_url_prefix: None,
1498            visibility_timeout: 30,
1499            endpoint_url: None,
1500            max_messages: 10,
1501            wait_time_seconds: 5,
1502            concurrency: 5,
1503            fifo: false,
1504            message_group_id: Some("default".to_string()),
1505        }
1506    }
1507}
1508
1509impl Default for SnsQueueConfig {
1510    fn default() -> Self {
1511        Self {
1512            region: "us-east-1".to_string(),
1513            topic_arn: String::new(),
1514            endpoint_url: None,
1515        }
1516    }
1517}
1518
1519impl Default for RedisAdapterConfig {
1520    fn default() -> Self {
1521        Self {
1522            requests_timeout: 5000,
1523            prefix: "sockudo_adapter:".to_string(),
1524            redis_pub_options: AHashMap::new(),
1525            redis_sub_options: AHashMap::new(),
1526            cluster_mode: false,
1527        }
1528    }
1529}
1530
1531impl Default for RedisClusterAdapterConfig {
1532    fn default() -> Self {
1533        Self {
1534            nodes: vec![],
1535            prefix: "sockudo_adapter:".to_string(),
1536            request_timeout_ms: 1000,
1537            use_connection_manager: true,
1538            use_sharded_pubsub: false,
1539        }
1540    }
1541}
1542
1543impl Default for NatsAdapterConfig {
1544    fn default() -> Self {
1545        Self {
1546            servers: vec!["nats://localhost:4222".to_string()],
1547            prefix: "sockudo_adapter:".to_string(),
1548            request_timeout_ms: 5000,
1549            username: None,
1550            password: None,
1551            token: None,
1552            connection_timeout_ms: 5000,
1553            nodes_number: None,
1554            discovery_max_wait_ms: 1000,
1555            discovery_idle_wait_ms: 150,
1556        }
1557    }
1558}
1559
1560impl Default for PulsarAdapterConfig {
1561    fn default() -> Self {
1562        Self {
1563            url: "pulsar://127.0.0.1:6650".to_string(),
1564            prefix: "sockudo-adapter".to_string(),
1565            request_timeout_ms: 5000,
1566            token: None,
1567            nodes_number: None,
1568        }
1569    }
1570}
1571
1572impl Default for RabbitMqAdapterConfig {
1573    fn default() -> Self {
1574        Self {
1575            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1576            prefix: "sockudo_adapter".to_string(),
1577            request_timeout_ms: 5000,
1578            connection_timeout_ms: 5000,
1579            nodes_number: None,
1580        }
1581    }
1582}
1583
1584impl Default for GooglePubSubAdapterConfig {
1585    fn default() -> Self {
1586        Self {
1587            project_id: "".to_string(),
1588            prefix: "sockudo-adapter".to_string(),
1589            request_timeout_ms: 5000,
1590            emulator_host: None,
1591            nodes_number: None,
1592        }
1593    }
1594}
1595
1596impl Default for KafkaAdapterConfig {
1597    fn default() -> Self {
1598        Self {
1599            brokers: vec!["localhost:9092".to_string()],
1600            prefix: "sockudo_adapter".to_string(),
1601            request_timeout_ms: 5000,
1602            security_protocol: None,
1603            sasl_mechanism: None,
1604            sasl_username: None,
1605            sasl_password: None,
1606            nodes_number: None,
1607        }
1608    }
1609}
1610
1611impl Default for CacheSettings {
1612    fn default() -> Self {
1613        Self {
1614            enabled: true,
1615            ttl: 300,
1616        }
1617    }
1618}
1619
1620impl Default for MemoryCacheOptions {
1621    fn default() -> Self {
1622        Self {
1623            ttl: 300,
1624            cleanup_interval: 60,
1625            max_capacity: 10000,
1626        }
1627    }
1628}
1629
1630impl Default for CacheConfig {
1631    fn default() -> Self {
1632        Self {
1633            driver: CacheDriver::default(),
1634            redis: RedisConfig {
1635                prefix: Some("sockudo_cache:".to_string()),
1636                url_override: None,
1637                cluster_mode: false,
1638            },
1639            memory: MemoryCacheOptions::default(),
1640        }
1641    }
1642}
1643
1644impl Default for ChannelLimits {
1645    fn default() -> Self {
1646        Self {
1647            max_name_length: 200,
1648            cache_ttl: 3600,
1649        }
1650    }
1651}
1652impl Default for CorsConfig {
1653    fn default() -> Self {
1654        Self {
1655            credentials: true,
1656            origin: vec!["*".to_string()],
1657            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1658            allowed_headers: vec![
1659                "Authorization".to_string(),
1660                "Content-Type".to_string(),
1661                "X-Requested-With".to_string(),
1662                "Accept".to_string(),
1663            ],
1664        }
1665    }
1666}
1667
1668impl Default for DatabaseConnection {
1669    fn default() -> Self {
1670        Self {
1671            host: "localhost".to_string(),
1672            port: 3306,
1673            username: "root".to_string(),
1674            password: "".to_string(),
1675            database: "sockudo".to_string(),
1676            table_name: "applications".to_string(),
1677            connection_pool_size: 10,
1678            pool_min: None,
1679            pool_max: None,
1680            cache_ttl: 300,
1681            cache_cleanup_interval: 60,
1682            cache_max_capacity: 100,
1683        }
1684    }
1685}
1686
1687impl Default for RedisConnection {
1688    fn default() -> Self {
1689        Self {
1690            host: "127.0.0.1".to_string(),
1691            port: 6379,
1692            db: 0,
1693            username: None,
1694            password: None,
1695            key_prefix: "sockudo:".to_string(),
1696            sentinels: Vec::new(),
1697            sentinel_password: None,
1698            name: "mymaster".to_string(),
1699            cluster: RedisClusterConnection::default(),
1700            cluster_nodes: Vec::new(),
1701        }
1702    }
1703}
1704
1705impl Default for RedisSentinel {
1706    fn default() -> Self {
1707        Self {
1708            host: "localhost".to_string(),
1709            port: 26379,
1710        }
1711    }
1712}
1713
1714impl Default for ClusterNode {
1715    fn default() -> Self {
1716        Self {
1717            host: "127.0.0.1".to_string(),
1718            port: 7000,
1719        }
1720    }
1721}
1722
1723impl Default for DatabasePooling {
1724    fn default() -> Self {
1725        Self {
1726            enabled: true,
1727            min: 2,
1728            max: 10,
1729        }
1730    }
1731}
1732
1733impl Default for EventLimits {
1734    fn default() -> Self {
1735        Self {
1736            max_channels_at_once: 100,
1737            max_name_length: 200,
1738            max_payload_in_kb: 100,
1739            max_batch_size: 10,
1740        }
1741    }
1742}
1743
1744impl Default for IdempotencyConfig {
1745    fn default() -> Self {
1746        Self {
1747            enabled: true,
1748            ttl_seconds: 120,
1749            max_key_length: 128,
1750        }
1751    }
1752}
1753
1754impl Default for EphemeralConfig {
1755    fn default() -> Self {
1756        Self { enabled: true }
1757    }
1758}
1759
1760impl Default for EchoControlConfig {
1761    fn default() -> Self {
1762        Self {
1763            enabled: true,
1764            default_echo_messages: true,
1765        }
1766    }
1767}
1768
1769impl Default for EventNameFilteringConfig {
1770    fn default() -> Self {
1771        Self {
1772            enabled: true,
1773            max_events_per_filter: 50,
1774            max_event_name_length: 200,
1775        }
1776    }
1777}
1778
1779impl Default for ConnectionRecoveryConfig {
1780    fn default() -> Self {
1781        Self {
1782            enabled: false,
1783            buffer_ttl_seconds: 120,
1784            max_buffer_size: 100,
1785        }
1786    }
1787}
1788
1789impl Default for HttpApiConfig {
1790    fn default() -> Self {
1791        Self {
1792            request_limit_in_mb: 10,
1793            accept_traffic: AcceptTraffic::default(),
1794            usage_enabled: true,
1795        }
1796    }
1797}
1798
1799impl Default for AcceptTraffic {
1800    fn default() -> Self {
1801        Self {
1802            memory_threshold: 0.90,
1803        }
1804    }
1805}
1806
1807impl Default for InstanceConfig {
1808    fn default() -> Self {
1809        Self {
1810            process_id: uuid::Uuid::new_v4().to_string(),
1811        }
1812    }
1813}
1814
1815impl Default for LoggingConfig {
1816    fn default() -> Self {
1817        Self {
1818            colors_enabled: true,
1819            include_target: true,
1820        }
1821    }
1822}
1823
1824impl Default for MetricsConfig {
1825    fn default() -> Self {
1826        Self {
1827            enabled: true,
1828            driver: MetricsDriver::default(),
1829            host: "0.0.0.0".to_string(),
1830            prometheus: PrometheusConfig::default(),
1831            port: 9601,
1832        }
1833    }
1834}
1835
1836impl Default for PrometheusConfig {
1837    fn default() -> Self {
1838        Self {
1839            prefix: "sockudo_".to_string(),
1840        }
1841    }
1842}
1843
1844impl Default for PresenceConfig {
1845    fn default() -> Self {
1846        Self {
1847            max_members_per_channel: 100,
1848            max_member_size_in_kb: 2,
1849        }
1850    }
1851}
1852
1853impl Default for RedisQueueConfig {
1854    fn default() -> Self {
1855        Self {
1856            concurrency: 5,
1857            prefix: Some("sockudo_queue:".to_string()),
1858            url_override: None,
1859            cluster_mode: false,
1860        }
1861    }
1862}
1863
1864impl Default for RateLimit {
1865    fn default() -> Self {
1866        Self {
1867            max_requests: 60,
1868            window_seconds: 60,
1869            identifier: Some("default".to_string()),
1870            trust_hops: Some(0),
1871        }
1872    }
1873}
1874
1875impl Default for RateLimiterConfig {
1876    fn default() -> Self {
1877        Self {
1878            enabled: true,
1879            driver: CacheDriver::Memory,
1880            api_rate_limit: RateLimit {
1881                max_requests: 100,
1882                window_seconds: 60,
1883                identifier: Some("api".to_string()),
1884                trust_hops: Some(0),
1885            },
1886            websocket_rate_limit: RateLimit {
1887                max_requests: 20,
1888                window_seconds: 60,
1889                identifier: Some("websocket_connect".to_string()),
1890                trust_hops: Some(0),
1891            },
1892            redis: RedisConfig {
1893                prefix: Some("sockudo_rl:".to_string()),
1894                url_override: None,
1895                cluster_mode: false,
1896            },
1897        }
1898    }
1899}
1900
1901impl Default for SslConfig {
1902    fn default() -> Self {
1903        Self {
1904            enabled: false,
1905            cert_path: "".to_string(),
1906            key_path: "".to_string(),
1907            passphrase: None,
1908            ca_path: None,
1909            redirect_http: false,
1910            http_port: Some(80),
1911        }
1912    }
1913}
1914
1915impl Default for BatchingConfig {
1916    fn default() -> Self {
1917        Self {
1918            enabled: true,
1919            duration: 50,
1920            size: 100,
1921        }
1922    }
1923}
1924
1925impl Default for ClusterHealthConfig {
1926    fn default() -> Self {
1927        Self {
1928            enabled: true,
1929            heartbeat_interval_ms: 10000,
1930            node_timeout_ms: 30000,
1931            cleanup_interval_ms: 10000,
1932        }
1933    }
1934}
1935
1936impl Default for UnixSocketConfig {
1937    fn default() -> Self {
1938        Self {
1939            enabled: false,
1940            path: "/var/run/sockudo/sockudo.sock".to_string(),
1941            permission_mode: 0o660,
1942        }
1943    }
1944}
1945
1946impl Default for DeltaCompressionOptionsConfig {
1947    fn default() -> Self {
1948        Self {
1949            enabled: true,
1950            algorithm: "fossil".to_string(),
1951            full_message_interval: 10,
1952            min_message_size: 100,
1953            max_state_age_secs: 300,
1954            max_channel_states_per_socket: 100,
1955            max_conflation_states_per_channel: Some(100),
1956            conflation_key_path: None,
1957            cluster_coordination: false,
1958            omit_delta_algorithm: false,
1959        }
1960    }
1961}
1962
1963impl ClusterHealthConfig {
1964    pub fn validate(&self) -> Result<(), String> {
1965        if self.heartbeat_interval_ms == 0 {
1966            return Err("heartbeat_interval_ms must be greater than 0".to_string());
1967        }
1968        if self.node_timeout_ms == 0 {
1969            return Err("node_timeout_ms must be greater than 0".to_string());
1970        }
1971        if self.cleanup_interval_ms == 0 {
1972            return Err("cleanup_interval_ms must be greater than 0".to_string());
1973        }
1974
1975        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1976            return Err(format!(
1977                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1978                self.heartbeat_interval_ms,
1979                self.node_timeout_ms,
1980                self.node_timeout_ms / 3
1981            ));
1982        }
1983
1984        if self.cleanup_interval_ms > self.node_timeout_ms {
1985            return Err(format!(
1986                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1987                self.cleanup_interval_ms, self.node_timeout_ms
1988            ));
1989        }
1990
1991        Ok(())
1992    }
1993}
1994
1995impl ServerOptions {
1996    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1997        let content = tokio::fs::read_to_string(path).await?;
1998        let options: Self = if path.ends_with(".toml") {
1999            toml::from_str(&content)?
2000        } else {
2001            // Legacy JSON support
2002            sonic_rs::from_str(&content)?
2003        };
2004        Ok(options)
2005    }
2006
2007    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2008        // --- General Settings ---
2009        if let Ok(mode) = std::env::var("ENVIRONMENT") {
2010            self.mode = mode;
2011        }
2012        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2013        if parse_bool_env("DEBUG", false) {
2014            self.debug = true;
2015            info!("DEBUG environment variable forces debug mode ON");
2016        }
2017
2018        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2019
2020        if let Ok(host) = std::env::var("HOST") {
2021            self.host = host;
2022        }
2023        self.port = parse_env::<u16>("PORT", self.port);
2024        self.shutdown_grace_period =
2025            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2026        self.user_authentication_timeout = parse_env::<u64>(
2027            "USER_AUTHENTICATION_TIMEOUT",
2028            self.user_authentication_timeout,
2029        );
2030        self.websocket_max_payload_kb =
2031            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2032        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2033            self.instance.process_id = id;
2034        }
2035
2036        // --- Driver Configuration ---
2037        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2038            self.adapter.driver =
2039                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2040        }
2041        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2042            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2043            self.adapter.buffer_multiplier_per_cpu,
2044        );
2045        self.adapter.enable_socket_counting = parse_env::<bool>(
2046            "ADAPTER_ENABLE_SOCKET_COUNTING",
2047            self.adapter.enable_socket_counting,
2048        );
2049        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2050            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2051        }
2052        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2053            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2054        }
2055        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2056            self.app_manager.driver =
2057                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2058        }
2059        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2060            self.rate_limiter.driver = parse_driver_enum(
2061                driver_str,
2062                self.rate_limiter.driver.clone(),
2063                "RateLimiter Backend",
2064            );
2065        }
2066
2067        // --- Database: Redis ---
2068        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2069            self.database.redis.host = host;
2070        }
2071        self.database.redis.port =
2072            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2073        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2074            self.database.redis.username = if username.is_empty() {
2075                None
2076            } else {
2077                Some(username)
2078            };
2079        }
2080        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2081            self.database.redis.password = Some(password);
2082        }
2083        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2084        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2085            self.database.redis.key_prefix = prefix;
2086        }
2087        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2088            self.database.redis.cluster.username = if cluster_username.is_empty() {
2089                None
2090            } else {
2091                Some(cluster_username)
2092            };
2093        }
2094        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2095            self.database.redis.cluster.password = Some(cluster_password);
2096        }
2097        self.database.redis.cluster.use_tls = parse_bool_env(
2098            "DATABASE_REDIS_CLUSTER_USE_TLS",
2099            self.database.redis.cluster.use_tls,
2100        );
2101
2102        // --- Database: MySQL ---
2103        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2104            self.database.mysql.host = host;
2105        }
2106        self.database.mysql.port =
2107            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2108        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2109            self.database.mysql.username = user;
2110        }
2111        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2112            self.database.mysql.password = pass;
2113        }
2114        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2115            self.database.mysql.database = db;
2116        }
2117        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2118            self.database.mysql.table_name = table;
2119        }
2120        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2121
2122        // --- Database: PostgreSQL ---
2123        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2124            self.database.postgres.host = host;
2125        }
2126        self.database.postgres.port =
2127            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2128        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2129            self.database.postgres.username = user;
2130        }
2131        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2132            self.database.postgres.password = pass;
2133        }
2134        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2135            self.database.postgres.database = db;
2136        }
2137        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2138
2139        // --- Database: DynamoDB ---
2140        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2141            self.database.dynamodb.region = region;
2142        }
2143        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2144            self.database.dynamodb.table_name = table;
2145        }
2146        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2147            self.database.dynamodb.endpoint_url = Some(endpoint);
2148        }
2149        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2150            self.database.dynamodb.aws_access_key_id = Some(key_id);
2151        }
2152        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2153            self.database.dynamodb.aws_secret_access_key = Some(secret);
2154        }
2155
2156        // --- Database: SurrealDB ---
2157        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2158            self.database.surrealdb.url = url;
2159        }
2160        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2161            self.database.surrealdb.namespace = namespace;
2162        }
2163        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2164            self.database.surrealdb.database = database;
2165        }
2166        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2167            self.database.surrealdb.username = username;
2168        }
2169        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2170            self.database.surrealdb.password = password;
2171        }
2172        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2173            self.database.surrealdb.table_name = table;
2174        }
2175
2176        // --- Redis Cluster ---
2177        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2178            let node_list: Vec<String> = nodes
2179                .split(',')
2180                .map(|s| s.trim())
2181                .filter(|s| !s.is_empty())
2182                .map(ToString::to_string)
2183                .collect();
2184
2185            options.adapter.cluster.nodes = node_list.clone();
2186            options.queue.redis_cluster.nodes = node_list.clone();
2187
2188            let parsed_nodes: Vec<ClusterNode> = node_list
2189                .iter()
2190                .filter_map(|seed| ClusterNode::from_seed(seed))
2191                .collect();
2192            options.database.redis.cluster.nodes = parsed_nodes.clone();
2193            options.database.redis.cluster_nodes = parsed_nodes;
2194        };
2195
2196        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2197            apply_redis_cluster_nodes(self, &nodes);
2198        }
2199        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2200            apply_redis_cluster_nodes(self, &nodes);
2201        }
2202        self.queue.redis_cluster.concurrency = parse_env::<u32>(
2203            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2204            self.queue.redis_cluster.concurrency,
2205        );
2206        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2207            self.queue.redis_cluster.prefix = Some(prefix);
2208        }
2209
2210        // --- SSL Configuration ---
2211        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2212        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2213            self.ssl.cert_path = val;
2214        }
2215        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2216            self.ssl.key_path = val;
2217        }
2218        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2219        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2220            self.ssl.http_port = Some(port);
2221        }
2222
2223        // --- Unix Socket Configuration ---
2224        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2225        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2226            self.unix_socket.path = path;
2227        }
2228        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2229            if mode_str.chars().all(|c| c.is_digit(8)) {
2230                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2231                    if mode <= 0o777 {
2232                        self.unix_socket.permission_mode = mode;
2233                    } else {
2234                        warn!(
2235                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2236                            mode_str, self.unix_socket.permission_mode
2237                        );
2238                    }
2239                } else {
2240                    warn!(
2241                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2242                        mode_str, self.unix_socket.permission_mode
2243                    );
2244                }
2245            } else {
2246                warn!(
2247                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2248                    mode_str, self.unix_socket.permission_mode
2249                );
2250            }
2251        }
2252
2253        // --- Metrics ---
2254        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2255            self.metrics.driver =
2256                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2257        }
2258        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2259        if let Ok(val) = std::env::var("METRICS_HOST") {
2260            self.metrics.host = val;
2261        }
2262        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2263        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2264            self.metrics.prometheus.prefix = val;
2265        }
2266
2267        // --- HTTP API ---
2268        self.http_api.usage_enabled =
2269            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2270
2271        // --- Rate Limiter ---
2272        self.rate_limiter.enabled =
2273            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2274        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2275            "RATE_LIMITER_API_MAX_REQUESTS",
2276            self.rate_limiter.api_rate_limit.max_requests,
2277        );
2278        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2279            "RATE_LIMITER_API_WINDOW_SECONDS",
2280            self.rate_limiter.api_rate_limit.window_seconds,
2281        );
2282        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2283            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2284        }
2285        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2286            "RATE_LIMITER_WS_MAX_REQUESTS",
2287            self.rate_limiter.websocket_rate_limit.max_requests,
2288        );
2289        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2290            "RATE_LIMITER_WS_WINDOW_SECONDS",
2291            self.rate_limiter.websocket_rate_limit.window_seconds,
2292        );
2293        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2294            self.rate_limiter.redis.prefix = Some(prefix);
2295        }
2296
2297        // --- Queue: Redis ---
2298        self.queue.redis.concurrency =
2299            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2300        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2301            self.queue.redis.prefix = Some(prefix);
2302        }
2303
2304        // --- Queue: SQS ---
2305        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2306            self.queue.sqs.region = region;
2307        }
2308        self.queue.sqs.visibility_timeout = parse_env::<i32>(
2309            "QUEUE_SQS_VISIBILITY_TIMEOUT",
2310            self.queue.sqs.visibility_timeout,
2311        );
2312        self.queue.sqs.max_messages =
2313            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2314        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2315            "QUEUE_SQS_WAIT_TIME_SECONDS",
2316            self.queue.sqs.wait_time_seconds,
2317        );
2318        self.queue.sqs.concurrency =
2319            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2320        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2321        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2322            self.queue.sqs.endpoint_url = Some(endpoint);
2323        }
2324
2325        // --- Queue: SNS ---
2326        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2327            self.queue.sns.region = region;
2328        }
2329        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2330            self.queue.sns.topic_arn = topic_arn;
2331        }
2332        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2333            self.queue.sns.endpoint_url = Some(endpoint);
2334        }
2335
2336        // --- Webhooks ---
2337        self.webhooks.batching.enabled =
2338            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2339        self.webhooks.batching.duration =
2340            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2341        self.webhooks.batching.size =
2342            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2343
2344        // --- NATS Adapter ---
2345        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2346            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2347        }
2348        if let Ok(user) = std::env::var("NATS_USERNAME") {
2349            self.adapter.nats.username = Some(user);
2350        }
2351        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2352            self.adapter.nats.password = Some(pass);
2353        }
2354        if let Ok(token) = std::env::var("NATS_TOKEN") {
2355            self.adapter.nats.token = Some(token);
2356        }
2357        if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2358            self.adapter.nats.prefix = prefix;
2359        }
2360        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2361            "NATS_CONNECTION_TIMEOUT_MS",
2362            self.adapter.nats.connection_timeout_ms,
2363        );
2364        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2365            "NATS_REQUEST_TIMEOUT_MS",
2366            self.adapter.nats.request_timeout_ms,
2367        );
2368        self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2369            "NATS_DISCOVERY_MAX_WAIT_MS",
2370            self.adapter.nats.discovery_max_wait_ms,
2371        );
2372        self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2373            "NATS_DISCOVERY_IDLE_WAIT_MS",
2374            self.adapter.nats.discovery_idle_wait_ms,
2375        );
2376        if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2377            self.adapter.nats.nodes_number = Some(nodes);
2378        }
2379
2380        // --- Pulsar Adapter ---
2381        if let Ok(url) = std::env::var("PULSAR_URL") {
2382            self.adapter.pulsar.url = url;
2383        }
2384        if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2385            self.adapter.pulsar.prefix = prefix;
2386        }
2387        if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2388            self.adapter.pulsar.token = Some(token);
2389        }
2390        self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2391            "PULSAR_REQUEST_TIMEOUT_MS",
2392            self.adapter.pulsar.request_timeout_ms,
2393        );
2394        if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2395            self.adapter.pulsar.nodes_number = Some(nodes);
2396        }
2397
2398        // --- RabbitMQ Adapter ---
2399        if let Ok(url) = std::env::var("RABBITMQ_URL") {
2400            self.adapter.rabbitmq.url = url;
2401        }
2402        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2403            self.adapter.rabbitmq.prefix = prefix;
2404        }
2405        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2406            "RABBITMQ_CONNECTION_TIMEOUT_MS",
2407            self.adapter.rabbitmq.connection_timeout_ms,
2408        );
2409        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2410            "RABBITMQ_REQUEST_TIMEOUT_MS",
2411            self.adapter.rabbitmq.request_timeout_ms,
2412        );
2413        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2414            self.adapter.rabbitmq.nodes_number = Some(nodes);
2415        }
2416
2417        // --- Google Pub/Sub Adapter ---
2418        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2419            self.adapter.google_pubsub.project_id = project_id;
2420        }
2421        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2422            self.adapter.google_pubsub.prefix = prefix;
2423        }
2424        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2425            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2426        }
2427        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2428            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2429            self.adapter.google_pubsub.request_timeout_ms,
2430        );
2431        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2432            self.adapter.google_pubsub.nodes_number = Some(nodes);
2433        }
2434
2435        // --- Kafka Adapter ---
2436        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2437            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2438        }
2439        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2440            self.adapter.kafka.prefix = prefix;
2441        }
2442        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2443            self.adapter.kafka.security_protocol = Some(protocol);
2444        }
2445        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2446            self.adapter.kafka.sasl_mechanism = Some(mechanism);
2447        }
2448        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2449            self.adapter.kafka.sasl_username = Some(username);
2450        }
2451        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2452            self.adapter.kafka.sasl_password = Some(password);
2453        }
2454        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2455            "KAFKA_REQUEST_TIMEOUT_MS",
2456            self.adapter.kafka.request_timeout_ms,
2457        );
2458        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2459            self.adapter.kafka.nodes_number = Some(nodes);
2460        }
2461
2462        // --- CORS ---
2463        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2464            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2465            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2466                warn!(
2467                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2468                    e
2469                );
2470            } else {
2471                self.cors.origin = parsed;
2472            }
2473        }
2474        if let Ok(methods) = std::env::var("CORS_METHODS") {
2475            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2476        }
2477        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2478            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2479        }
2480        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2481
2482        // --- Performance Tuning ---
2483        self.database_pooling.enabled =
2484            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2485        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2486            self.database_pooling.min = min;
2487        }
2488        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2489            self.database_pooling.max = max;
2490        }
2491
2492        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2493            self.database.mysql.connection_pool_size = pool_size;
2494            self.database.postgres.connection_pool_size = pool_size;
2495        }
2496        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2497            self.app_manager.cache.ttl = cache_ttl;
2498            self.channel_limits.cache_ttl = cache_ttl;
2499            self.database.mysql.cache_ttl = cache_ttl;
2500            self.database.postgres.cache_ttl = cache_ttl;
2501            self.database.surrealdb.cache_ttl = cache_ttl;
2502            self.cache.memory.ttl = cache_ttl;
2503        }
2504        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2505            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2506            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2507            self.cache.memory.cleanup_interval = cleanup_interval;
2508        }
2509        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2510            self.database.mysql.cache_max_capacity = max_capacity;
2511            self.database.postgres.cache_max_capacity = max_capacity;
2512            self.database.surrealdb.cache_max_capacity = max_capacity;
2513            self.cache.memory.max_capacity = max_capacity;
2514        }
2515
2516        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2517        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2518        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2519        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2520
2521        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2522            (default_app_id, default_app_key, default_app_secret)
2523            && default_app_enabled
2524        {
2525            let default_app = App::from_policy(
2526                app_id,
2527                app_key,
2528                app_secret,
2529                default_app_enabled,
2530                crate::app::AppPolicy {
2531                    limits: crate::app::AppLimitsPolicy {
2532                        max_connections: parse_env::<u32>(
2533                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2534                            100,
2535                        ),
2536                        max_backend_events_per_second: Some(parse_env::<u32>(
2537                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2538                            100,
2539                        )),
2540                        max_client_events_per_second: parse_env::<u32>(
2541                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2542                            100,
2543                        ),
2544                        max_read_requests_per_second: Some(parse_env::<u32>(
2545                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2546                            100,
2547                        )),
2548                        max_presence_members_per_channel: Some(parse_env::<u32>(
2549                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2550                            100,
2551                        )),
2552                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
2553                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2554                            100,
2555                        )),
2556                        max_channel_name_length: Some(parse_env::<u32>(
2557                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2558                            100,
2559                        )),
2560                        max_event_channels_at_once: Some(parse_env::<u32>(
2561                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2562                            100,
2563                        )),
2564                        max_event_name_length: Some(parse_env::<u32>(
2565                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2566                            100,
2567                        )),
2568                        max_event_payload_in_kb: Some(parse_env::<u32>(
2569                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2570                            100,
2571                        )),
2572                        max_event_batch_size: Some(parse_env::<u32>(
2573                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2574                            100,
2575                        )),
2576                        decay_seconds: None,
2577                        terminate_on_limit: false,
2578                        message_rate_limit: None,
2579                    },
2580                    features: crate::app::AppFeaturesPolicy {
2581                        enable_client_messages: std::env::var(
2582                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2583                        )
2584                        .ok()
2585                        .map(|value| {
2586                            matches!(
2587                                value.trim().to_ascii_lowercase().as_str(),
2588                                "true" | "1" | "yes" | "on"
2589                            )
2590                        })
2591                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2592                        enable_user_authentication: Some(parse_bool_env(
2593                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2594                            false,
2595                        )),
2596                        enable_watchlist_events: Some(parse_bool_env(
2597                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2598                            false,
2599                        )),
2600                    },
2601                    channels: crate::app::AppChannelsPolicy {
2602                        allowed_origins: {
2603                            if let Ok(origins_str) =
2604                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2605                            {
2606                                if !origins_str.is_empty() {
2607                                    Some(
2608                                        origins_str
2609                                            .split(',')
2610                                            .map(|s| s.trim().to_string())
2611                                            .collect(),
2612                                    )
2613                                } else {
2614                                    None
2615                                }
2616                            } else {
2617                                None
2618                            }
2619                        },
2620                        channel_delta_compression: None,
2621                        channel_namespaces: None,
2622                    },
2623                    webhooks: None,
2624                    idempotency: None,
2625                    connection_recovery: None,
2626                },
2627            );
2628
2629            self.app_manager.array.apps.push(default_app);
2630            info!("Successfully registered default app from env");
2631        }
2632
2633        // Special handling for REDIS_URL
2634        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2635            info!("Applying REDIS_URL environment variable override");
2636
2637            let redis_url_json = sonic_rs::json!(redis_url_env);
2638
2639            self.adapter
2640                .redis
2641                .redis_pub_options
2642                .insert("url".to_string(), redis_url_json.clone());
2643            self.adapter
2644                .redis
2645                .redis_sub_options
2646                .insert("url".to_string(), redis_url_json);
2647
2648            self.cache.redis.url_override = Some(redis_url_env.clone());
2649            self.queue.redis.url_override = Some(redis_url_env.clone());
2650            self.rate_limiter.redis.url_override = Some(redis_url_env);
2651        }
2652
2653        // --- Logging Configuration ---
2654        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2655        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2656        if has_colors_env || has_target_env {
2657            let logging_config = self.logging.get_or_insert_with(Default::default);
2658            if has_colors_env {
2659                logging_config.colors_enabled =
2660                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2661            }
2662            if has_target_env {
2663                logging_config.include_target =
2664                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2665            }
2666        }
2667
2668        // --- Cleanup Configuration ---
2669        self.cleanup.async_enabled =
2670            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2671        self.cleanup.fallback_to_sync =
2672            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2673        self.cleanup.queue_buffer_size =
2674            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2675        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2676        self.cleanup.batch_timeout_ms =
2677            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2678        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2679            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2680                WorkerThreadsConfig::Auto
2681            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2682                WorkerThreadsConfig::Fixed(n)
2683            } else {
2684                warn!(
2685                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2686                    worker_threads_str
2687                );
2688                self.cleanup.worker_threads.clone()
2689            };
2690        }
2691        self.cleanup.max_retry_attempts = parse_env::<u32>(
2692            "CLEANUP_MAX_RETRY_ATTEMPTS",
2693            self.cleanup.max_retry_attempts,
2694        );
2695
2696        // Cluster health configuration
2697        self.cluster_health.enabled =
2698            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2699        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2700            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2701            self.cluster_health.heartbeat_interval_ms,
2702        );
2703        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2704            "CLUSTER_HEALTH_NODE_TIMEOUT",
2705            self.cluster_health.node_timeout_ms,
2706        );
2707        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2708            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2709            self.cluster_health.cleanup_interval_ms,
2710        );
2711
2712        // Tag filtering configuration
2713        self.tag_filtering.enabled =
2714            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2715
2716        // WebSocket buffer configuration
2717        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2718            if val.to_lowercase() == "none" || val == "0" {
2719                self.websocket.max_messages = None;
2720            } else if let Ok(n) = val.parse::<usize>() {
2721                self.websocket.max_messages = Some(n);
2722            }
2723        }
2724        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2725            if val.to_lowercase() == "none" || val == "0" {
2726                self.websocket.max_bytes = None;
2727            } else if let Ok(n) = val.parse::<usize>() {
2728                self.websocket.max_bytes = Some(n);
2729            }
2730        }
2731        self.websocket.disconnect_on_buffer_full = parse_bool_env(
2732            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2733            self.websocket.disconnect_on_buffer_full,
2734        );
2735        self.websocket.max_message_size = parse_env::<usize>(
2736            "WEBSOCKET_MAX_MESSAGE_SIZE",
2737            self.websocket.max_message_size,
2738        );
2739        self.websocket.max_frame_size =
2740            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2741        self.websocket.write_buffer_size = parse_env::<usize>(
2742            "WEBSOCKET_WRITE_BUFFER_SIZE",
2743            self.websocket.write_buffer_size,
2744        );
2745        self.websocket.max_backpressure = parse_env::<usize>(
2746            "WEBSOCKET_MAX_BACKPRESSURE",
2747            self.websocket.max_backpressure,
2748        );
2749        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2750        self.websocket.ping_interval =
2751            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2752        self.websocket.idle_timeout =
2753            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2754        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2755            self.websocket.compression = mode;
2756        }
2757
2758        // Connection recovery (includes serial + message_id + replay buffer)
2759        self.connection_recovery.enabled = parse_bool_env(
2760            "CONNECTION_RECOVERY_ENABLED",
2761            self.connection_recovery.enabled,
2762        );
2763        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
2764            "CONNECTION_RECOVERY_BUFFER_TTL",
2765            self.connection_recovery.buffer_ttl_seconds,
2766        );
2767        self.connection_recovery.max_buffer_size = parse_env::<usize>(
2768            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
2769            self.connection_recovery.max_buffer_size,
2770        );
2771
2772        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
2773        self.idempotency.ttl_seconds =
2774            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
2775        self.idempotency.max_key_length = parse_env::<usize>(
2776            "IDEMPOTENCY_MAX_KEY_LENGTH",
2777            self.idempotency.max_key_length,
2778        );
2779
2780        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
2781
2782        self.echo_control.enabled =
2783            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
2784        self.echo_control.default_echo_messages = parse_bool_env(
2785            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
2786            self.echo_control.default_echo_messages,
2787        );
2788
2789        self.event_name_filtering.enabled = parse_bool_env(
2790            "EVENT_NAME_FILTERING_ENABLED",
2791            self.event_name_filtering.enabled,
2792        );
2793        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
2794            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
2795            self.event_name_filtering.max_events_per_filter,
2796        );
2797        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
2798            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
2799            self.event_name_filtering.max_event_name_length,
2800        );
2801
2802        Ok(())
2803    }
2804
2805    pub fn validate(&self) -> Result<(), String> {
2806        if self.unix_socket.enabled {
2807            if self.unix_socket.path.is_empty() {
2808                return Err(
2809                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2810                );
2811            }
2812
2813            self.validate_unix_socket_security()?;
2814
2815            if self.ssl.enabled {
2816                tracing::warn!(
2817                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2818                );
2819            }
2820
2821            if self.unix_socket.permission_mode > 0o777 {
2822                return Err(format!(
2823                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2824                    self.unix_socket.permission_mode
2825                ));
2826            }
2827        }
2828
2829        if let Err(e) = self.cleanup.validate() {
2830            return Err(format!("Invalid cleanup configuration: {}", e));
2831        }
2832
2833        Ok(())
2834    }
2835
2836    fn validate_unix_socket_security(&self) -> Result<(), String> {
2837        let path = &self.unix_socket.path;
2838
2839        if path.contains("../") || path.contains("..\\") {
2840            return Err(
2841                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2842            );
2843        }
2844
2845        if self.unix_socket.permission_mode & 0o002 != 0 {
2846            warn!(
2847                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2848                self.unix_socket.permission_mode
2849            );
2850        }
2851
2852        if self.unix_socket.permission_mode & 0o007 > 0o005 {
2853            warn!(
2854                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2855                self.unix_socket.permission_mode
2856            );
2857        }
2858
2859        if self.mode == "production" && path.starts_with("/tmp/") {
2860            warn!(
2861                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2862                path
2863            );
2864        }
2865
2866        if !path.starts_with('/') {
2867            return Err(
2868                "Unix socket path must be absolute (start with /) for security and reliability."
2869                    .to_string(),
2870            );
2871        }
2872
2873        Ok(())
2874    }
2875}
2876
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    // Expands to a `RedisSentinel` literal. A macro is used so the port literal
    // takes on whatever integer type the struct field declares.
    macro_rules! sentinel {
        ($host:expr, $port:expr) => {
            RedisSentinel {
                host: String::from($host),
                port: $port,
            }
        };
    }

    // Baseline standalone connection with every field spelled out, so the URL
    // tests never depend on what `RedisConnection::default()` happens to contain.
    fn standalone() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    #[test]
    fn test_standard_url_basic() {
        assert_eq!(standalone().to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let redis = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..standalone()
        };
        assert_eq!(redis.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let redis = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..standalone()
        };
        assert_eq!(
            redis.to_url(),
            "redis://admin:pass123@redis.example.com:6380/1"
        );
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // `@` and `#` must come out percent-encoded in the userinfo part.
        let redis = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..standalone()
        };
        assert_eq!(
            redis.to_url(),
            "redis://:pass%40word%23123@127.0.0.1:6379/0"
        );
    }

    #[test]
    fn test_is_sentinel_configured_false() {
        let redis = RedisConnection::default();
        assert!(!redis.is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let redis = RedisConnection {
            sentinels: vec![sentinel!("sentinel1", 26379)],
            ..Default::default()
        };
        assert!(redis.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let redis = RedisConnection {
            sentinels: vec![
                sentinel!("sentinel1", 26379),
                sentinel!("sentinel2", 26379),
            ],
            ..standalone()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let redis = RedisConnection {
            sentinels: vec![sentinel!("sentinel1", 26379)],
            sentinel_password: Some(String::from("sentinelpass")),
            ..standalone()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let redis = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![sentinel!("sentinel1", 26379)],
            ..standalone()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let redis = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![
                sentinel!("sentinel1", 26379),
                sentinel!("sentinel2", 26380),
            ],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..standalone()
        };
        assert_eq!(
            redis.to_url(),
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let endpoint = sentinel!("sentinel.example.com", 26379);
        assert_eq!(endpoint.to_host_port(), "sentinel.example.com:26379");
    }

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // One bare host, one explicit `redis://` URL with its own port, and one
        // `rediss://` URL without a port, all sharing the cluster-level password.
        let nodes = vec![
            ClusterNode {
                host: String::from("node1.secure-cluster.com"),
                port: 7000,
            },
            ClusterNode {
                host: String::from("redis://node2.secure-cluster.com:7001"),
                port: 7001,
            },
            ClusterNode {
                host: String::from("rediss://node3.secure-cluster.com"),
                port: 7002,
            },
        ];
        let redis = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some(String::from("cluster-secret")),
                use_tls: true,
            },
            ..Default::default()
        };

        assert_eq!(
            redis.cluster_node_urls(),
            vec![
                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
                "redis://:cluster-secret@node2.secure-cluster.com:7001",
                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
            ]
        );
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        // Only the legacy top-level `cluster_nodes` and `password` fields are set here.
        let redis = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![ClusterNode {
                host: String::from("legacy-node.example.com"),
                port: 7000,
            }],
            ..Default::default()
        };

        assert_eq!(
            redis.cluster_node_urls(),
            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
        );
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let redis = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: Vec::new(),
                username: Some(String::from("svc-user")),
                password: Some(String::from("svc-pass")),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        assert_eq!(
            redis.normalize_cluster_seed_urls(&seeds),
            vec![
                "rediss://svc-user:svc-pass@node1.example.com:7000",
                "redis://svc-user:svc-pass@node2.example.com:7001",
                "rediss://svc-user:svc-pass@node3.example.com:6379",
            ]
        );
    }
}
3171
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    // Expands to a `ClusterNode` literal. A macro is used so the port literal
    // takes on whatever integer type the struct field declares.
    macro_rules! node {
        ($host:expr, $port:expr) => {
            ClusterNode {
                host: String::from($host),
                port: $port,
            }
        };
    }

    #[test]
    fn test_to_url_basic_host() {
        assert_eq!(node!("localhost", 6379).to_url(), "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_eq!(node!("127.0.0.1", 6379).to_url(), "redis://127.0.0.1:6379");
    }

    #[test]
    fn test_to_url_with_redis_protocol() {
        assert_eq!(
            node!("redis://example.com", 6379).to_url(),
            "redis://example.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        assert_eq!(
            node!("rediss://secure.example.com", 6379).to_url(),
            "rediss://secure.example.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // The port embedded in the URL is expected to win over the `port` field.
        assert_eq!(
            node!("rediss://secure.example.com:7000", 6379).to_url(),
            "rediss://secure.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        assert_eq!(
            node!("redis://example.com:7001", 6379).to_url(),
            "redis://example.com:7001"
        );
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        assert_eq!(
            node!("  rediss://secure.example.com  ", 6379).to_url(),
            "rediss://secure.example.com:6379"
        );
    }

    #[test]
    fn test_to_url_custom_port() {
        assert_eq!(
            node!("redis-cluster.example.com", 7000).to_url(),
            "redis://redis-cluster.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        // A `host:port` pair in the host field is expected to win over the `port` field.
        assert_eq!(
            node!("redis-cluster.example.com:7010", 7000).to_url(),
            "redis://redis-cluster.example.com:7010"
        );
    }

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let url = node!("node.example.com", 7000).to_url_with_options(
            true,
            Some("svc-user"),
            Some("secret"),
        );
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        // Credentials already present in the host URL must not be replaced by the shared ones.
        let url = node!("rediss://:node-secret@node.example.com:7000", 7000).to_url_with_options(
            true,
            Some("global-user"),
            Some("global-secret"),
        );
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        assert_eq!(
            node!("rediss://my-cluster.use1.cache.amazonaws.com", 6379).to_url(),
            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_eq!(
            node!("rediss://[::1]", 6379).to_url(),
            "rediss://[::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_eq!(
            node!("rediss://[::1]:7000", 6379).to_url(),
            "rediss://[::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_eq!(
            node!("rediss://[2001:db8::1]", 6379).to_url(),
            "rediss://[2001:db8::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_eq!(
            node!("rediss://[2001:db8::1]:7000", 6379).to_url(),
            "rediss://[2001:db8::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_eq!(node!("redis://[::1]", 6379).to_url(), "redis://[::1]:6379");
    }
}
3353
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    // Deserializes a `CorsConfig` from a JSON document, returning the parser's result as-is.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted {
            assert!(cors_from_json(json).is_ok(), "expected Ok for {}", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected {
            assert!(cors_from_json(json).is_err(), "expected Err for {}", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // One bad pattern is enough to reject the whole list.
        let outcome = cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#);
        assert!(outcome.is_err());
    }
}
3397}