Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(default)]
86pub struct DynamoDbSettings {
87    pub region: String,
88    pub table_name: String,
89    pub endpoint_url: Option<String>,
90    pub aws_access_key_id: Option<String>,
91    pub aws_secret_access_key: Option<String>,
92    pub aws_profile_name: Option<String>,
93}
94
95impl Default for DynamoDbSettings {
96    fn default() -> Self {
97        Self {
98            region: "us-east-1".to_string(),
99            table_name: "sockudo-applications".to_string(),
100            endpoint_url: None,
101            aws_access_key_id: None,
102            aws_secret_access_key: None,
103            aws_profile_name: None,
104        }
105    }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(default)]
110pub struct ScyllaDbSettings {
111    pub nodes: Vec<String>,
112    pub keyspace: String,
113    pub table_name: String,
114    pub username: Option<String>,
115    pub password: Option<String>,
116    pub replication_class: String,
117    pub replication_factor: u32,
118}
119
120impl Default for ScyllaDbSettings {
121    fn default() -> Self {
122        Self {
123            nodes: vec!["127.0.0.1:9042".to_string()],
124            keyspace: "sockudo".to_string(),
125            table_name: "applications".to_string(),
126            username: None,
127            password: None,
128            replication_class: "SimpleStrategy".to_string(),
129            replication_factor: 3,
130        }
131    }
132}
133
134impl FromStr for AdapterDriver {
135    type Err = String;
136    fn from_str(s: &str) -> Result<Self, Self::Err> {
137        match s.to_lowercase().as_str() {
138            "local" => Ok(AdapterDriver::Local),
139            "redis" => Ok(AdapterDriver::Redis),
140            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141            "nats" => Ok(AdapterDriver::Nats),
142            _ => Err(format!("Unknown adapter driver: {s}")),
143        }
144    }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
148#[serde(rename_all = "lowercase")]
149pub enum AppManagerDriver {
150    #[default]
151    Memory,
152    Mysql,
153    Dynamodb,
154    PgSql,
155    ScyllaDb,
156}
157impl FromStr for AppManagerDriver {
158    type Err = String;
159    fn from_str(s: &str) -> Result<Self, Self::Err> {
160        match s.to_lowercase().as_str() {
161            "memory" => Ok(AppManagerDriver::Memory),
162            "mysql" => Ok(AppManagerDriver::Mysql),
163            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166            _ => Err(format!("Unknown app manager driver: {s}")),
167        }
168    }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
172#[serde(rename_all = "lowercase")]
173pub enum CacheDriver {
174    #[default]
175    Memory,
176    Redis,
177    #[serde(rename = "redis-cluster")]
178    RedisCluster,
179    None,
180}
181
182impl FromStr for CacheDriver {
183    type Err = String;
184    fn from_str(s: &str) -> Result<Self, Self::Err> {
185        match s.to_lowercase().as_str() {
186            "memory" => Ok(CacheDriver::Memory),
187            "redis" => Ok(CacheDriver::Redis),
188            "redis-cluster" => Ok(CacheDriver::RedisCluster),
189            "none" => Ok(CacheDriver::None),
190            _ => Err(format!("Unknown cache driver: {s}")),
191        }
192    }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
196#[serde(rename_all = "lowercase")]
197pub enum QueueDriver {
198    Memory,
199    #[default]
200    Redis,
201    #[serde(rename = "redis-cluster")]
202    RedisCluster,
203    Sqs,
204    Sns,
205    None,
206}
207
208impl FromStr for QueueDriver {
209    type Err = String;
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        match s.to_lowercase().as_str() {
212            "memory" => Ok(QueueDriver::Memory),
213            "redis" => Ok(QueueDriver::Redis),
214            "redis-cluster" => Ok(QueueDriver::RedisCluster),
215            "sqs" => Ok(QueueDriver::Sqs),
216            "sns" => Ok(QueueDriver::Sns),
217            "none" => Ok(QueueDriver::None),
218            _ => Err(format!("Unknown queue driver: {s}")),
219        }
220    }
221}
222
223impl AsRef<str> for QueueDriver {
224    fn as_ref(&self) -> &str {
225        match self {
226            QueueDriver::Memory => "memory",
227            QueueDriver::Redis => "redis",
228            QueueDriver::RedisCluster => "redis-cluster",
229            QueueDriver::Sqs => "sqs",
230            QueueDriver::Sns => "sns",
231            QueueDriver::None => "none",
232        }
233    }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(default)]
238pub struct RedisClusterQueueConfig {
239    pub concurrency: u32,
240    pub prefix: Option<String>,
241    pub nodes: Vec<String>,
242    pub request_timeout_ms: u64,
243}
244
245impl Default for RedisClusterQueueConfig {
246    fn default() -> Self {
247        Self {
248            concurrency: 5,
249            prefix: Some("sockudo_queue:".to_string()),
250            nodes: vec!["redis://127.0.0.1:6379".to_string()],
251            request_timeout_ms: 5000,
252        }
253    }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
257#[serde(rename_all = "lowercase")]
258pub enum MetricsDriver {
259    #[default]
260    Prometheus,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264#[serde(rename_all = "lowercase")]
265pub enum LogOutputFormat {
266    #[default]
267    Human,
268    Json,
269}
270
271impl FromStr for LogOutputFormat {
272    type Err = String;
273    fn from_str(s: &str) -> Result<Self, Self::Err> {
274        match s.to_lowercase().as_str() {
275            "human" => Ok(LogOutputFormat::Human),
276            "json" => Ok(LogOutputFormat::Json),
277            _ => Err(format!("Unknown log output format: {s}")),
278        }
279    }
280}
281
282impl FromStr for MetricsDriver {
283    type Err = String;
284    fn from_str(s: &str) -> Result<Self, Self::Err> {
285        match s.to_lowercase().as_str() {
286            "prometheus" => Ok(MetricsDriver::Prometheus),
287            _ => Err(format!("Unknown metrics driver: {s}")),
288        }
289    }
290}
291
292impl AsRef<str> for MetricsDriver {
293    fn as_ref(&self) -> &str {
294        match self {
295            MetricsDriver::Prometheus => "prometheus",
296        }
297    }
298}
299
// --- Main Configuration Struct ---
301#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(default)]
303pub struct ServerOptions {
304    pub adapter: AdapterConfig,
305    pub app_manager: AppManagerConfig,
306    pub cache: CacheConfig,
307    pub channel_limits: ChannelLimits,
308    pub cors: CorsConfig,
309    pub database: DatabaseConfig,
310    pub database_pooling: DatabasePooling,
311    pub debug: bool,
312    pub event_limits: EventLimits,
313    pub host: String,
314    pub http_api: HttpApiConfig,
315    pub instance: InstanceConfig,
316    pub logging: Option<LoggingConfig>,
317    pub metrics: MetricsConfig,
318    pub mode: String,
319    pub port: u16,
320    pub path_prefix: String,
321    pub presence: PresenceConfig,
322    pub queue: QueueConfig,
323    pub rate_limiter: RateLimiterConfig,
324    pub shutdown_grace_period: u64,
325    pub ssl: SslConfig,
326    pub user_authentication_timeout: u64,
327    pub webhooks: WebhooksConfig,
328    pub websocket_max_payload_kb: u32,
329    pub cleanup: CleanupConfig,
330    pub activity_timeout: u64,
331    pub cluster_health: ClusterHealthConfig,
332    pub unix_socket: UnixSocketConfig,
333    pub delta_compression: DeltaCompressionOptionsConfig,
334    pub tag_filtering: TagFilteringConfig,
335    pub websocket: WebSocketConfig,
336}
337
// --- Configuration Sub-Structs ---
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct SqsQueueConfig {
343    pub region: String,
344    pub queue_url_prefix: Option<String>,
345    pub visibility_timeout: i32,
346    pub endpoint_url: Option<String>,
347    pub max_messages: i32,
348    pub wait_time_seconds: i32,
349    pub concurrency: u32,
350    pub fifo: bool,
351    pub message_group_id: Option<String>,
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
355#[serde(default)]
356pub struct SnsQueueConfig {
357    pub region: String,
358    pub topic_arn: String,
359    pub endpoint_url: Option<String>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
363#[serde(default)]
364pub struct AdapterConfig {
365    pub driver: AdapterDriver,
366    pub redis: RedisAdapterConfig,
367    pub cluster: RedisClusterAdapterConfig,
368    pub nats: NatsAdapterConfig,
369    #[serde(default = "default_buffer_multiplier_per_cpu")]
370    pub buffer_multiplier_per_cpu: usize,
371    pub cluster_health: ClusterHealthConfig,
372    #[serde(default = "default_enable_socket_counting")]
373    pub enable_socket_counting: bool,
374}
375
/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
379
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    const DEFAULT_MULTIPLIER: usize = 64;
    DEFAULT_MULTIPLIER
}
383
384impl Default for AdapterConfig {
385    fn default() -> Self {
386        Self {
387            driver: AdapterDriver::default(),
388            redis: RedisAdapterConfig::default(),
389            cluster: RedisClusterAdapterConfig::default(),
390            nats: NatsAdapterConfig::default(),
391            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392            cluster_health: ClusterHealthConfig::default(),
393            enable_socket_counting: default_enable_socket_counting(),
394        }
395    }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct RedisAdapterConfig {
401    pub requests_timeout: u64,
402    pub prefix: String,
403    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
404    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
405    pub cluster_mode: bool,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
409#[serde(default)]
410pub struct RedisClusterAdapterConfig {
411    pub nodes: Vec<String>,
412    pub prefix: String,
413    pub request_timeout_ms: u64,
414    pub use_connection_manager: bool,
415    #[serde(default)]
416    pub use_sharded_pubsub: bool,
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[serde(default)]
421pub struct NatsAdapterConfig {
422    pub servers: Vec<String>,
423    pub prefix: String,
424    pub request_timeout_ms: u64,
425    pub username: Option<String>,
426    pub password: Option<String>,
427    pub token: Option<String>,
428    pub connection_timeout_ms: u64,
429    pub nodes_number: Option<u32>,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, Default)]
433#[serde(default)]
434pub struct AppManagerConfig {
435    pub driver: AppManagerDriver,
436    pub array: ArrayConfig,
437    pub cache: CacheSettings,
438    pub scylladb: ScyllaDbSettings,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
442#[serde(default)]
443pub struct ArrayConfig {
444    pub apps: Vec<App>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(default)]
449pub struct CacheSettings {
450    pub enabled: bool,
451    pub ttl: u64,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
455#[serde(default)]
456pub struct MemoryCacheOptions {
457    pub ttl: u64,
458    pub cleanup_interval: u64,
459    pub max_capacity: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
463#[serde(default)]
464pub struct CacheConfig {
465    pub driver: CacheDriver,
466    pub redis: RedisConfig,
467    pub memory: MemoryCacheOptions,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize, Default)]
471#[serde(default)]
472pub struct RedisConfig {
473    pub prefix: Option<String>,
474    pub url_override: Option<String>,
475    pub cluster_mode: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct ChannelLimits {
481    pub max_name_length: u32,
482    pub cache_ttl: u64,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
486#[serde(default)]
487pub struct CorsConfig {
488    pub credentials: bool,
489    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
490    pub origin: Vec<String>,
491    pub methods: Vec<String>,
492    pub allowed_headers: Vec<String>,
493}
494
495fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
496where
497    D: serde::Deserializer<'de>,
498{
499    use serde::de::Error;
500    let origins = Vec::<String>::deserialize(deserializer)?;
501
502    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
503        return Err(D::Error::custom(format!(
504            "CORS origin pattern validation failed: {}",
505            e
506        )));
507    }
508
509    Ok(origins)
510}
511
512#[derive(Debug, Clone, Serialize, Deserialize, Default)]
513#[serde(default)]
514pub struct DatabaseConfig {
515    pub mysql: DatabaseConnection,
516    pub postgres: DatabaseConnection,
517    pub redis: RedisConnection,
518    pub dynamodb: DynamoDbSettings,
519    pub scylladb: ScyllaDbSettings,
520}
521
522#[derive(Debug, Clone, Serialize, Deserialize)]
523#[serde(default)]
524pub struct DatabaseConnection {
525    pub host: String,
526    pub port: u16,
527    pub username: String,
528    pub password: String,
529    pub database: String,
530    pub table_name: String,
531    pub connection_pool_size: u32,
532    pub pool_min: Option<u32>,
533    pub pool_max: Option<u32>,
534    pub cache_ttl: u64,
535    pub cache_cleanup_interval: u64,
536    pub cache_max_capacity: u64,
537}
538
539#[derive(Debug, Clone, Serialize, Deserialize)]
540#[serde(default)]
541pub struct RedisConnection {
542    pub host: String,
543    pub port: u16,
544    pub db: u32,
545    pub username: Option<String>,
546    pub password: Option<String>,
547    pub key_prefix: String,
548    pub sentinels: Vec<RedisSentinel>,
549    pub sentinel_password: Option<String>,
550    pub name: String,
551    pub cluster: RedisClusterConnection,
552    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
553    pub cluster_nodes: Vec<ClusterNode>,
554}
555
556#[derive(Debug, Clone, Serialize, Deserialize)]
557#[serde(default)]
558pub struct RedisSentinel {
559    pub host: String,
560    pub port: u16,
561}
562
563#[derive(Debug, Clone, Serialize, Deserialize, Default)]
564#[serde(default)]
565pub struct RedisClusterConnection {
566    pub nodes: Vec<ClusterNode>,
567    pub username: Option<String>,
568    pub password: Option<String>,
569    #[serde(alias = "useTLS")]
570    pub use_tls: bool,
571}
572
573impl RedisConnection {
574    /// Returns true if Redis Sentinel is configured.
575    pub fn is_sentinel_configured(&self) -> bool {
576        !self.sentinels.is_empty()
577    }
578
579    /// Builds a Redis connection URL based on the configuration.
580    pub fn to_url(&self) -> String {
581        if self.is_sentinel_configured() {
582            self.build_sentinel_url()
583        } else {
584            self.build_standard_url()
585        }
586    }
587
588    fn build_standard_url(&self) -> String {
589        // Extract scheme from host if present, otherwise default to redis://
590        let (scheme, host) = if self.host.starts_with("rediss://") {
591            ("rediss://", self.host.trim_start_matches("rediss://"))
592        } else if self.host.starts_with("redis://") {
593            ("redis://", self.host.trim_start_matches("redis://"))
594        } else {
595            ("redis://", self.host.as_str())
596        };
597
598        let mut url = String::from(scheme);
599
600        if let Some(ref username) = self.username {
601            url.push_str(username);
602            if let Some(ref password) = self.password {
603                url.push(':');
604                url.push_str(&urlencoding::encode(password));
605            }
606            url.push('@');
607        } else if let Some(ref password) = self.password {
608            url.push(':');
609            url.push_str(&urlencoding::encode(password));
610            url.push('@');
611        }
612
613        url.push_str(host);
614        url.push(':');
615        url.push_str(&self.port.to_string());
616        url.push('/');
617        url.push_str(&self.db.to_string());
618
619        url
620    }
621
622    fn build_sentinel_url(&self) -> String {
623        let mut url = String::from("redis+sentinel://");
624
625        if let Some(ref sentinel_password) = self.sentinel_password {
626            url.push(':');
627            url.push_str(&urlencoding::encode(sentinel_password));
628            url.push('@');
629        }
630
631        let sentinel_hosts: Vec<String> = self
632            .sentinels
633            .iter()
634            .map(|s| format!("{}:{}", s.host, s.port))
635            .collect();
636        url.push_str(&sentinel_hosts.join(","));
637
638        url.push('/');
639        url.push_str(&self.name);
640        url.push('/');
641        url.push_str(&self.db.to_string());
642
643        let mut params = Vec::new();
644        if let Some(ref password) = self.password {
645            params.push(format!("password={}", urlencoding::encode(password)));
646        }
647        if let Some(ref username) = self.username {
648            params.push(format!("username={}", urlencoding::encode(username)));
649        }
650
651        if !params.is_empty() {
652            url.push('?');
653            url.push_str(&params.join("&"));
654        }
655
656        url
657    }
658
659    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
660    /// or legacy (`cluster_nodes`) field.
661    pub fn has_cluster_nodes(&self) -> bool {
662        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
663    }
664
665    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
666    /// Falls back to legacy `cluster_nodes` for backward compatibility.
667    pub fn cluster_node_urls(&self) -> Vec<String> {
668        if !self.cluster.nodes.is_empty() {
669            return self.build_cluster_urls(&self.cluster.nodes);
670        }
671        self.build_cluster_urls(&self.cluster_nodes)
672    }
673
674    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
675    /// shared cluster auth/TLS options.
676    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
677        self.build_cluster_urls(
678            &seeds
679                .iter()
680                .filter_map(|seed| ClusterNode::from_seed(seed))
681                .collect::<Vec<ClusterNode>>(),
682        )
683    }
684
685    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
686        let username = self
687            .cluster
688            .username
689            .as_deref()
690            .or(self.username.as_deref());
691        let password = self
692            .cluster
693            .password
694            .as_deref()
695            .or(self.password.as_deref());
696        let use_tls = self.cluster.use_tls;
697
698        nodes
699            .iter()
700            .map(|node| node.to_url_with_options(use_tls, username, password))
701            .collect()
702    }
703}
704
705impl RedisSentinel {
706    pub fn to_host_port(&self) -> String {
707        format!("{}:{}", self.host, self.port)
708    }
709}
710
711#[derive(Debug, Clone, Serialize, Deserialize)]
712#[serde(default)]
713pub struct ClusterNode {
714    pub host: String,
715    pub port: u16,
716}
717
718impl ClusterNode {
719    pub fn to_url(&self) -> String {
720        self.to_url_with_options(false, None, None)
721    }
722
723    pub fn to_url_with_options(
724        &self,
725        use_tls: bool,
726        username: Option<&str>,
727        password: Option<&str>,
728    ) -> String {
729        let host = self.host.trim();
730
731        if host.starts_with("redis://") || host.starts_with("rediss://") {
732            if let Ok(parsed) = Url::parse(host)
733                && let Some(host_str) = parsed.host_str()
734            {
735                let scheme = parsed.scheme();
736                let port = parsed.port_or_known_default().unwrap_or(self.port);
737                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
738                let parsed_password = parsed.password();
739                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
740                let (effective_username, effective_password) = if has_embedded_auth {
741                    (parsed_username, parsed_password)
742                } else {
743                    (username, password)
744                };
745
746                return build_redis_url(
747                    scheme,
748                    host_str,
749                    port,
750                    effective_username,
751                    effective_password,
752                );
753            }
754
755            // Fallback for malformed URLs
756            let has_port = if let Some(bracket_pos) = host.rfind(']') {
757                host[bracket_pos..].contains(':')
758            } else {
759                host.split(':').count() >= 3
760            };
761            let base = if has_port {
762                host.to_string()
763            } else {
764                format!("{}:{}", host, self.port)
765            };
766
767            if let Ok(parsed) = Url::parse(&base) {
768                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
769                let parsed_password = parsed.password();
770                if let Some(host_str) = parsed.host_str() {
771                    let port = parsed.port_or_known_default().unwrap_or(self.port);
772                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
773                    let (effective_username, effective_password) = if has_embedded_auth {
774                        (parsed_username, parsed_password)
775                    } else {
776                        (username, password)
777                    };
778                    return build_redis_url(
779                        parsed.scheme(),
780                        host_str,
781                        port,
782                        effective_username,
783                        effective_password,
784                    );
785                }
786            }
787            return base;
788        }
789
790        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
791        let scheme = if use_tls { "rediss" } else { "redis" };
792        build_redis_url(
793            scheme,
794            &normalized_host,
795            normalized_port,
796            username,
797            password,
798        )
799    }
800
801    pub fn from_seed(seed: &str) -> Option<Self> {
802        let trimmed = seed.trim();
803        if trimmed.is_empty() {
804            return None;
805        }
806
807        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
808            let port = Url::parse(trimmed)
809                .ok()
810                .and_then(|parsed| parsed.port_or_known_default())
811                .unwrap_or(6379);
812            return Some(Self {
813                host: trimmed.to_string(),
814                port,
815            });
816        }
817
818        let (host, port) = split_plain_host_and_port(trimmed, 6379);
819        Some(Self { host, port })
820    }
821}
822
/// Splits a scheme-less host string into `(host, port)`.
///
/// * `[v6]:port` and `[v6]` yield the address without brackets; the port falls back to
///   `default_port` when missing or not a valid `u16`.
/// * A string with exactly one `:` and a valid `u16` after it is split at that colon.
/// * Everything else (including unbracketed IPv6 and an unterminated `[`) is returned
///   unchanged, apart from trimming, together with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    // Bracketed IPv6: [::1]:6379
    if let Some(after_open) = trimmed.strip_prefix('[') {
        return match after_open.split_once(']') {
            Some((address, remainder)) => {
                let port = remainder
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (address.to_string(), port)
            }
            None => (trimmed.to_string(), default_port),
        };
    }

    // Hostname/IP with port: host:6379 (exactly one colon)
    let mut pieces = trimmed.split(':');
    if let (Some(name), Some(port_text), None) = (pieces.next(), pieces.next(), pieces.next()) {
        if let Ok(port) = port_text.parse::<u16>() {
            return (name.to_string(), port);
        }
    }

    (trimmed.to_string(), default_port)
}
851
852fn build_redis_url(
853    scheme: &str,
854    host: &str,
855    port: u16,
856    username: Option<&str>,
857    password: Option<&str>,
858) -> String {
859    let mut url = format!("{scheme}://");
860
861    if let Some(user) = username {
862        url.push_str(&urlencoding::encode(user));
863        if let Some(pass) = password {
864            url.push(':');
865            url.push_str(&urlencoding::encode(pass));
866        }
867        url.push('@');
868    } else if let Some(pass) = password {
869        url.push(':');
870        url.push_str(&urlencoding::encode(pass));
871        url.push('@');
872    }
873
874    if host.contains(':') && !host.starts_with('[') {
875        url.push('[');
876        url.push_str(host);
877        url.push(']');
878    } else {
879        url.push_str(host);
880    }
881    url.push(':');
882    url.push_str(&port.to_string());
883    url
884}
885
886#[derive(Debug, Clone, Serialize, Deserialize)]
887#[serde(default)]
888pub struct DatabasePooling {
889    pub enabled: bool,
890    pub min: u32,
891    pub max: u32,
892}
893
894#[derive(Debug, Clone, Serialize, Deserialize)]
895#[serde(default)]
896pub struct EventLimits {
897    pub max_channels_at_once: u32,
898    pub max_name_length: u32,
899    pub max_payload_in_kb: u32,
900    pub max_batch_size: u32,
901}
902
903#[derive(Debug, Clone, Serialize, Deserialize)]
904#[serde(default)]
905pub struct HttpApiConfig {
906    pub request_limit_in_mb: u32,
907    pub accept_traffic: AcceptTraffic,
908    pub usage_enabled: bool,
909}
910
911#[derive(Debug, Clone, Serialize, Deserialize)]
912#[serde(default)]
913pub struct AcceptTraffic {
914    pub memory_threshold: f64,
915}
916
917#[derive(Debug, Clone, Serialize, Deserialize)]
918#[serde(default)]
919pub struct InstanceConfig {
920    pub process_id: String,
921}
922
923#[derive(Debug, Clone, Serialize, Deserialize)]
924#[serde(default)]
925pub struct MetricsConfig {
926    pub enabled: bool,
927    pub driver: MetricsDriver,
928    pub host: String,
929    pub prometheus: PrometheusConfig,
930    pub port: u16,
931}
932
933#[derive(Debug, Clone, Serialize, Deserialize)]
934#[serde(default)]
935pub struct PrometheusConfig {
936    pub prefix: String,
937}
938
939#[derive(Debug, Clone, Serialize, Deserialize)]
940#[serde(default)]
941pub struct LoggingConfig {
942    pub colors_enabled: bool,
943    pub include_target: bool,
944}
945
946#[derive(Debug, Clone, Serialize, Deserialize)]
947#[serde(default)]
948pub struct PresenceConfig {
949    pub max_members_per_channel: u32,
950    pub max_member_size_in_kb: u32,
951}
952
953/// WebSocket connection buffer configuration
954/// Controls backpressure handling for slow consumers
955#[derive(Debug, Clone, Serialize, Deserialize)]
956#[serde(default)]
957pub struct WebSocketConfig {
958    pub max_messages: Option<usize>,
959    pub max_bytes: Option<usize>,
960    pub disconnect_on_buffer_full: bool,
961    pub max_message_size: usize,
962    pub max_frame_size: usize,
963    pub write_buffer_size: usize,
964    pub max_backpressure: usize,
965    pub auto_ping: bool,
966    pub ping_interval: u32,
967    pub idle_timeout: u32,
968    pub compression: String,
969}
970
971impl Default for WebSocketConfig {
972    fn default() -> Self {
973        Self {
974            max_messages: Some(1000),
975            max_bytes: None,
976            disconnect_on_buffer_full: true,
977            max_message_size: 64 * 1024 * 1024,
978            max_frame_size: 16 * 1024 * 1024,
979            write_buffer_size: 16 * 1024,
980            max_backpressure: 1024 * 1024,
981            auto_ping: true,
982            ping_interval: 30,
983            idle_timeout: 120,
984            compression: "disabled".to_string(),
985        }
986    }
987}
988
989impl WebSocketConfig {
990    /// Convert to WebSocketBufferConfig for runtime use
991    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
992        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
993
994        let limit = match (self.max_messages, self.max_bytes) {
995            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
996            (Some(messages), None) => BufferLimit::Messages(messages),
997            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
998            (None, None) => BufferLimit::Messages(1000),
999        };
1000
1001        WebSocketBufferConfig {
1002            limit,
1003            disconnect_on_full: self.disconnect_on_buffer_full,
1004        }
1005    }
1006
1007    /// Convert to native sockudo-ws runtime configuration.
1008    pub fn to_sockudo_ws_config(
1009        &self,
1010        websocket_max_payload_kb: u32,
1011        activity_timeout: u64,
1012    ) -> sockudo_ws::Config {
1013        use sockudo_ws::Compression;
1014
1015        let compression = match self.compression.to_lowercase().as_str() {
1016            "dedicated" => Compression::Dedicated,
1017            "shared" => Compression::Shared,
1018            "window256b" => Compression::Window256B,
1019            "window1kb" => Compression::Window1KB,
1020            "window2kb" => Compression::Window2KB,
1021            "window4kb" => Compression::Window4KB,
1022            "window8kb" => Compression::Window8KB,
1023            "window16kb" => Compression::Window16KB,
1024            "window32kb" => Compression::Window32KB,
1025            _ => Compression::Disabled,
1026        };
1027
1028        sockudo_ws::Config::builder()
1029            .max_payload_length(
1030                self.max_bytes
1031                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1032            )
1033            .max_message_size(self.max_message_size)
1034            .max_frame_size(self.max_frame_size)
1035            .write_buffer_size(self.write_buffer_size)
1036            .max_backpressure(self.max_backpressure)
1037            .idle_timeout(self.idle_timeout)
1038            .auto_ping(self.auto_ping)
1039            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1040            .compression(compression)
1041            .build()
1042    }
1043}
1044
1045#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1046#[serde(default)]
1047pub struct QueueConfig {
1048    pub driver: QueueDriver,
1049    pub redis: RedisQueueConfig,
1050    pub redis_cluster: RedisClusterQueueConfig,
1051    pub sqs: SqsQueueConfig,
1052    pub sns: SnsQueueConfig,
1053}
1054
1055#[derive(Debug, Clone, Serialize, Deserialize)]
1056#[serde(default)]
1057pub struct RedisQueueConfig {
1058    pub concurrency: u32,
1059    pub prefix: Option<String>,
1060    pub url_override: Option<String>,
1061    pub cluster_mode: bool,
1062}
1063
1064#[derive(Clone, Debug, Serialize, Deserialize)]
1065#[serde(default)]
1066pub struct RateLimit {
1067    pub max_requests: u32,
1068    pub window_seconds: u64,
1069    pub identifier: Option<String>,
1070    pub trust_hops: Option<u32>,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct RateLimiterConfig {
1076    pub enabled: bool,
1077    pub driver: CacheDriver,
1078    pub api_rate_limit: RateLimit,
1079    pub websocket_rate_limit: RateLimit,
1080    pub redis: RedisConfig,
1081}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084#[serde(default)]
1085pub struct SslConfig {
1086    pub enabled: bool,
1087    pub cert_path: String,
1088    pub key_path: String,
1089    pub passphrase: Option<String>,
1090    pub ca_path: Option<String>,
1091    pub redirect_http: bool,
1092    pub http_port: Option<u16>,
1093}
1094
1095#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1096#[serde(default)]
1097pub struct WebhooksConfig {
1098    pub batching: BatchingConfig,
1099}
1100
1101#[derive(Debug, Clone, Serialize, Deserialize)]
1102#[serde(default)]
1103pub struct BatchingConfig {
1104    pub enabled: bool,
1105    pub duration: u64,
1106    pub size: usize,
1107}
1108
1109#[derive(Debug, Clone, Serialize, Deserialize)]
1110#[serde(default)]
1111pub struct ClusterHealthConfig {
1112    pub enabled: bool,
1113    pub heartbeat_interval_ms: u64,
1114    pub node_timeout_ms: u64,
1115    pub cleanup_interval_ms: u64,
1116}
1117
1118#[derive(Debug, Clone, Serialize, Deserialize)]
1119#[serde(default)]
1120pub struct UnixSocketConfig {
1121    pub enabled: bool,
1122    pub path: String,
1123    #[serde(deserialize_with = "deserialize_octal_permission")]
1124    pub permission_mode: u32,
1125}
1126
1127#[derive(Debug, Clone, Serialize, Deserialize)]
1128#[serde(default)]
1129pub struct DeltaCompressionOptionsConfig {
1130    pub enabled: bool,
1131    pub algorithm: String,
1132    pub full_message_interval: u32,
1133    pub min_message_size: usize,
1134    pub max_state_age_secs: u64,
1135    pub max_channel_states_per_socket: usize,
1136    pub max_conflation_states_per_channel: Option<usize>,
1137    pub conflation_key_path: Option<String>,
1138    pub cluster_coordination: bool,
1139    pub omit_delta_algorithm: bool,
1140}
1141
1142/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1143#[derive(Debug, Clone, Serialize, Deserialize)]
1144#[serde(default)]
1145pub struct CleanupConfig {
1146    pub queue_buffer_size: usize,
1147    pub batch_size: usize,
1148    pub batch_timeout_ms: u64,
1149    pub worker_threads: WorkerThreadsConfig,
1150    pub max_retry_attempts: u32,
1151    pub async_enabled: bool,
1152    pub fallback_to_sync: bool,
1153}
1154
/// How many worker threads the cleanup system uses.
///
/// Serialized as the string `"auto"` or as an unsigned integer by the
/// hand-written serde impls for this type.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// Written as `"auto"` in configuration.
    Auto,
    /// An explicit worker count.
    Fixed(usize),
}
1161
1162impl serde::Serialize for WorkerThreadsConfig {
1163    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1164    where
1165        S: serde::Serializer,
1166    {
1167        match self {
1168            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1169            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1170        }
1171    }
1172}
1173
1174impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1175    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1176    where
1177        D: serde::Deserializer<'de>,
1178    {
1179        use serde::de;
1180        struct WorkerThreadsVisitor;
1181        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1182            type Value = WorkerThreadsConfig;
1183
1184            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1185                formatter.write_str(r#""auto" or a positive integer"#)
1186            }
1187
1188            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1189                if value.eq_ignore_ascii_case("auto") {
1190                    Ok(WorkerThreadsConfig::Auto)
1191                } else if let Ok(n) = value.parse::<usize>() {
1192                    Ok(WorkerThreadsConfig::Fixed(n))
1193                } else {
1194                    Err(E::custom(format!(
1195                        "expected 'auto' or a number, got '{value}'"
1196                    )))
1197                }
1198            }
1199
1200            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1201                Ok(WorkerThreadsConfig::Fixed(value as usize))
1202            }
1203
1204            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1205                if value >= 0 {
1206                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1207                } else {
1208                    Err(E::custom("worker_threads must be non-negative"))
1209                }
1210            }
1211        }
1212        deserializer.deserialize_any(WorkerThreadsVisitor)
1213    }
1214}
1215
1216impl Default for CleanupConfig {
1217    fn default() -> Self {
1218        Self {
1219            queue_buffer_size: 1024,
1220            batch_size: 64,
1221            batch_timeout_ms: 100,
1222            worker_threads: WorkerThreadsConfig::Auto,
1223            max_retry_attempts: 3,
1224            async_enabled: true,
1225            fallback_to_sync: true,
1226        }
1227    }
1228}
1229
1230impl CleanupConfig {
1231    pub fn validate(&self) -> Result<(), String> {
1232        if self.queue_buffer_size == 0 {
1233            return Err("queue_buffer_size must be greater than 0".to_string());
1234        }
1235        if self.batch_size == 0 {
1236            return Err("batch_size must be greater than 0".to_string());
1237        }
1238        if self.batch_timeout_ms == 0 {
1239            return Err("batch_timeout_ms must be greater than 0".to_string());
1240        }
1241        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1242            && n == 0
1243        {
1244            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1245        }
1246        Ok(())
1247    }
1248}
1249
1250#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1251#[serde(default)]
1252pub struct TagFilteringConfig {
1253    #[serde(default)]
1254    pub enabled: bool,
1255    #[serde(default = "default_true")]
1256    pub enable_tags: bool,
1257}
1258
/// Serde default provider for boolean fields that should be on when the key
/// is absent; always returns `true`.
fn default_true() -> bool {
    true
}
1262
1263// --- Default Implementations ---
1264
1265impl Default for ServerOptions {
1266    fn default() -> Self {
1267        Self {
1268            adapter: AdapterConfig::default(),
1269            app_manager: AppManagerConfig::default(),
1270            cache: CacheConfig::default(),
1271            channel_limits: ChannelLimits::default(),
1272            cors: CorsConfig::default(),
1273            database: DatabaseConfig::default(),
1274            database_pooling: DatabasePooling::default(),
1275            debug: false,
1276            tag_filtering: TagFilteringConfig::default(),
1277            event_limits: EventLimits::default(),
1278            host: "0.0.0.0".to_string(),
1279            http_api: HttpApiConfig::default(),
1280            instance: InstanceConfig::default(),
1281            logging: None,
1282            metrics: MetricsConfig::default(),
1283            mode: "production".to_string(),
1284            port: 6001,
1285            path_prefix: "/".to_string(),
1286            presence: PresenceConfig::default(),
1287            queue: QueueConfig::default(),
1288            rate_limiter: RateLimiterConfig::default(),
1289            shutdown_grace_period: 10,
1290            ssl: SslConfig::default(),
1291            user_authentication_timeout: 3600,
1292            webhooks: WebhooksConfig::default(),
1293            websocket_max_payload_kb: 64,
1294            cleanup: CleanupConfig::default(),
1295            activity_timeout: 120,
1296            cluster_health: ClusterHealthConfig::default(),
1297            unix_socket: UnixSocketConfig::default(),
1298            delta_compression: DeltaCompressionOptionsConfig::default(),
1299            websocket: WebSocketConfig::default(),
1300        }
1301    }
1302}
1303
1304impl Default for SqsQueueConfig {
1305    fn default() -> Self {
1306        Self {
1307            region: "us-east-1".to_string(),
1308            queue_url_prefix: None,
1309            visibility_timeout: 30,
1310            endpoint_url: None,
1311            max_messages: 10,
1312            wait_time_seconds: 5,
1313            concurrency: 5,
1314            fifo: false,
1315            message_group_id: Some("default".to_string()),
1316        }
1317    }
1318}
1319
1320impl Default for SnsQueueConfig {
1321    fn default() -> Self {
1322        Self {
1323            region: "us-east-1".to_string(),
1324            topic_arn: String::new(),
1325            endpoint_url: None,
1326        }
1327    }
1328}
1329
1330impl Default for RedisAdapterConfig {
1331    fn default() -> Self {
1332        Self {
1333            requests_timeout: 5000,
1334            prefix: "sockudo_adapter:".to_string(),
1335            redis_pub_options: AHashMap::new(),
1336            redis_sub_options: AHashMap::new(),
1337            cluster_mode: false,
1338        }
1339    }
1340}
1341
1342impl Default for RedisClusterAdapterConfig {
1343    fn default() -> Self {
1344        Self {
1345            nodes: vec![],
1346            prefix: "sockudo_adapter:".to_string(),
1347            request_timeout_ms: 1000,
1348            use_connection_manager: true,
1349            use_sharded_pubsub: false,
1350        }
1351    }
1352}
1353
1354impl Default for NatsAdapterConfig {
1355    fn default() -> Self {
1356        Self {
1357            servers: vec!["nats://localhost:4222".to_string()],
1358            prefix: "sockudo_adapter:".to_string(),
1359            request_timeout_ms: 5000,
1360            username: None,
1361            password: None,
1362            token: None,
1363            connection_timeout_ms: 5000,
1364            nodes_number: None,
1365        }
1366    }
1367}
1368
1369impl Default for CacheSettings {
1370    fn default() -> Self {
1371        Self {
1372            enabled: true,
1373            ttl: 300,
1374        }
1375    }
1376}
1377
1378impl Default for MemoryCacheOptions {
1379    fn default() -> Self {
1380        Self {
1381            ttl: 300,
1382            cleanup_interval: 60,
1383            max_capacity: 10000,
1384        }
1385    }
1386}
1387
1388impl Default for CacheConfig {
1389    fn default() -> Self {
1390        Self {
1391            driver: CacheDriver::default(),
1392            redis: RedisConfig {
1393                prefix: Some("sockudo_cache:".to_string()),
1394                url_override: None,
1395                cluster_mode: false,
1396            },
1397            memory: MemoryCacheOptions::default(),
1398        }
1399    }
1400}
1401
1402impl Default for ChannelLimits {
1403    fn default() -> Self {
1404        Self {
1405            max_name_length: 200,
1406            cache_ttl: 3600,
1407        }
1408    }
1409}
1410impl Default for CorsConfig {
1411    fn default() -> Self {
1412        Self {
1413            credentials: true,
1414            origin: vec!["*".to_string()],
1415            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1416            allowed_headers: vec![
1417                "Authorization".to_string(),
1418                "Content-Type".to_string(),
1419                "X-Requested-With".to_string(),
1420                "Accept".to_string(),
1421            ],
1422        }
1423    }
1424}
1425
1426impl Default for DatabaseConnection {
1427    fn default() -> Self {
1428        Self {
1429            host: "localhost".to_string(),
1430            port: 3306,
1431            username: "root".to_string(),
1432            password: "".to_string(),
1433            database: "sockudo".to_string(),
1434            table_name: "applications".to_string(),
1435            connection_pool_size: 10,
1436            pool_min: None,
1437            pool_max: None,
1438            cache_ttl: 300,
1439            cache_cleanup_interval: 60,
1440            cache_max_capacity: 100,
1441        }
1442    }
1443}
1444
1445impl Default for RedisConnection {
1446    fn default() -> Self {
1447        Self {
1448            host: "127.0.0.1".to_string(),
1449            port: 6379,
1450            db: 0,
1451            username: None,
1452            password: None,
1453            key_prefix: "sockudo:".to_string(),
1454            sentinels: Vec::new(),
1455            sentinel_password: None,
1456            name: "mymaster".to_string(),
1457            cluster: RedisClusterConnection::default(),
1458            cluster_nodes: Vec::new(),
1459        }
1460    }
1461}
1462
1463impl Default for RedisSentinel {
1464    fn default() -> Self {
1465        Self {
1466            host: "localhost".to_string(),
1467            port: 26379,
1468        }
1469    }
1470}
1471
1472impl Default for ClusterNode {
1473    fn default() -> Self {
1474        Self {
1475            host: "127.0.0.1".to_string(),
1476            port: 7000,
1477        }
1478    }
1479}
1480
1481impl Default for DatabasePooling {
1482    fn default() -> Self {
1483        Self {
1484            enabled: true,
1485            min: 2,
1486            max: 10,
1487        }
1488    }
1489}
1490
1491impl Default for EventLimits {
1492    fn default() -> Self {
1493        Self {
1494            max_channels_at_once: 100,
1495            max_name_length: 200,
1496            max_payload_in_kb: 100,
1497            max_batch_size: 10,
1498        }
1499    }
1500}
1501
1502impl Default for HttpApiConfig {
1503    fn default() -> Self {
1504        Self {
1505            request_limit_in_mb: 10,
1506            accept_traffic: AcceptTraffic::default(),
1507            usage_enabled: true,
1508        }
1509    }
1510}
1511
1512impl Default for AcceptTraffic {
1513    fn default() -> Self {
1514        Self {
1515            memory_threshold: 0.90,
1516        }
1517    }
1518}
1519
1520impl Default for InstanceConfig {
1521    fn default() -> Self {
1522        Self {
1523            process_id: uuid::Uuid::new_v4().to_string(),
1524        }
1525    }
1526}
1527
1528impl Default for LoggingConfig {
1529    fn default() -> Self {
1530        Self {
1531            colors_enabled: true,
1532            include_target: true,
1533        }
1534    }
1535}
1536
1537impl Default for MetricsConfig {
1538    fn default() -> Self {
1539        Self {
1540            enabled: true,
1541            driver: MetricsDriver::default(),
1542            host: "0.0.0.0".to_string(),
1543            prometheus: PrometheusConfig::default(),
1544            port: 9601,
1545        }
1546    }
1547}
1548
1549impl Default for PrometheusConfig {
1550    fn default() -> Self {
1551        Self {
1552            prefix: "sockudo_".to_string(),
1553        }
1554    }
1555}
1556
1557impl Default for PresenceConfig {
1558    fn default() -> Self {
1559        Self {
1560            max_members_per_channel: 100,
1561            max_member_size_in_kb: 2,
1562        }
1563    }
1564}
1565
1566impl Default for RedisQueueConfig {
1567    fn default() -> Self {
1568        Self {
1569            concurrency: 5,
1570            prefix: Some("sockudo_queue:".to_string()),
1571            url_override: None,
1572            cluster_mode: false,
1573        }
1574    }
1575}
1576
1577impl Default for RateLimit {
1578    fn default() -> Self {
1579        Self {
1580            max_requests: 60,
1581            window_seconds: 60,
1582            identifier: Some("default".to_string()),
1583            trust_hops: Some(0),
1584        }
1585    }
1586}
1587
1588impl Default for RateLimiterConfig {
1589    fn default() -> Self {
1590        Self {
1591            enabled: true,
1592            driver: CacheDriver::Memory,
1593            api_rate_limit: RateLimit {
1594                max_requests: 100,
1595                window_seconds: 60,
1596                identifier: Some("api".to_string()),
1597                trust_hops: Some(0),
1598            },
1599            websocket_rate_limit: RateLimit {
1600                max_requests: 20,
1601                window_seconds: 60,
1602                identifier: Some("websocket_connect".to_string()),
1603                trust_hops: Some(0),
1604            },
1605            redis: RedisConfig {
1606                prefix: Some("sockudo_rl:".to_string()),
1607                url_override: None,
1608                cluster_mode: false,
1609            },
1610        }
1611    }
1612}
1613
1614impl Default for SslConfig {
1615    fn default() -> Self {
1616        Self {
1617            enabled: false,
1618            cert_path: "".to_string(),
1619            key_path: "".to_string(),
1620            passphrase: None,
1621            ca_path: None,
1622            redirect_http: false,
1623            http_port: Some(80),
1624        }
1625    }
1626}
1627
1628impl Default for BatchingConfig {
1629    fn default() -> Self {
1630        Self {
1631            enabled: true,
1632            duration: 50,
1633            size: 100,
1634        }
1635    }
1636}
1637
1638impl Default for ClusterHealthConfig {
1639    fn default() -> Self {
1640        Self {
1641            enabled: true,
1642            heartbeat_interval_ms: 10000,
1643            node_timeout_ms: 30000,
1644            cleanup_interval_ms: 10000,
1645        }
1646    }
1647}
1648
1649impl Default for UnixSocketConfig {
1650    fn default() -> Self {
1651        Self {
1652            enabled: false,
1653            path: "/var/run/sockudo/sockudo.sock".to_string(),
1654            permission_mode: 0o660,
1655        }
1656    }
1657}
1658
1659impl Default for DeltaCompressionOptionsConfig {
1660    fn default() -> Self {
1661        Self {
1662            enabled: true,
1663            algorithm: "fossil".to_string(),
1664            full_message_interval: 10,
1665            min_message_size: 100,
1666            max_state_age_secs: 300,
1667            max_channel_states_per_socket: 100,
1668            max_conflation_states_per_channel: Some(100),
1669            conflation_key_path: None,
1670            cluster_coordination: false,
1671            omit_delta_algorithm: false,
1672        }
1673    }
1674}
1675
1676impl ClusterHealthConfig {
1677    pub fn validate(&self) -> Result<(), String> {
1678        if self.heartbeat_interval_ms == 0 {
1679            return Err("heartbeat_interval_ms must be greater than 0".to_string());
1680        }
1681        if self.node_timeout_ms == 0 {
1682            return Err("node_timeout_ms must be greater than 0".to_string());
1683        }
1684        if self.cleanup_interval_ms == 0 {
1685            return Err("cleanup_interval_ms must be greater than 0".to_string());
1686        }
1687
1688        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1689            return Err(format!(
1690                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1691                self.heartbeat_interval_ms,
1692                self.node_timeout_ms,
1693                self.node_timeout_ms / 3
1694            ));
1695        }
1696
1697        if self.cleanup_interval_ms > self.node_timeout_ms {
1698            return Err(format!(
1699                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1700                self.cleanup_interval_ms, self.node_timeout_ms
1701            ));
1702        }
1703
1704        Ok(())
1705    }
1706}
1707
1708impl ServerOptions {
1709    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1710        let content = tokio::fs::read_to_string(path).await?;
1711        let options: Self = sonic_rs::from_str(&content)?;
1712        Ok(options)
1713    }
1714
1715    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1716        // --- General Settings ---
1717        if let Ok(mode) = std::env::var("ENVIRONMENT") {
1718            self.mode = mode;
1719        }
1720        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1721        if parse_bool_env("DEBUG", false) {
1722            self.debug = true;
1723            info!("DEBUG environment variable forces debug mode ON");
1724        }
1725
1726        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1727
1728        if let Ok(host) = std::env::var("HOST") {
1729            self.host = host;
1730        }
1731        self.port = parse_env::<u16>("PORT", self.port);
1732        self.shutdown_grace_period =
1733            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1734        self.user_authentication_timeout = parse_env::<u64>(
1735            "USER_AUTHENTICATION_TIMEOUT",
1736            self.user_authentication_timeout,
1737        );
1738        self.websocket_max_payload_kb =
1739            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1740        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1741            self.instance.process_id = id;
1742        }
1743
1744        // --- Driver Configuration ---
1745        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1746            self.adapter.driver =
1747                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1748        }
1749        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1750            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1751            self.adapter.buffer_multiplier_per_cpu,
1752        );
1753        self.adapter.enable_socket_counting = parse_env::<bool>(
1754            "ADAPTER_ENABLE_SOCKET_COUNTING",
1755            self.adapter.enable_socket_counting,
1756        );
1757        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1758            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1759        }
1760        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1761            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1762        }
1763        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1764            self.app_manager.driver =
1765                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1766        }
1767        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1768            self.rate_limiter.driver = parse_driver_enum(
1769                driver_str,
1770                self.rate_limiter.driver.clone(),
1771                "RateLimiter Backend",
1772            );
1773        }
1774
1775        // --- Database: Redis ---
1776        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1777            self.database.redis.host = host;
1778        }
1779        self.database.redis.port =
1780            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1781        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1782            self.database.redis.username = if username.is_empty() {
1783                None
1784            } else {
1785                Some(username)
1786            };
1787        }
1788        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1789            self.database.redis.password = Some(password);
1790        }
1791        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1792        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1793            self.database.redis.key_prefix = prefix;
1794        }
1795        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1796            self.database.redis.cluster.username = if cluster_username.is_empty() {
1797                None
1798            } else {
1799                Some(cluster_username)
1800            };
1801        }
1802        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1803            self.database.redis.cluster.password = Some(cluster_password);
1804        }
1805        self.database.redis.cluster.use_tls = parse_bool_env(
1806            "DATABASE_REDIS_CLUSTER_USE_TLS",
1807            self.database.redis.cluster.use_tls,
1808        );
1809
1810        // --- Database: MySQL ---
1811        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1812            self.database.mysql.host = host;
1813        }
1814        self.database.mysql.port =
1815            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1816        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1817            self.database.mysql.username = user;
1818        }
1819        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1820            self.database.mysql.password = pass;
1821        }
1822        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1823            self.database.mysql.database = db;
1824        }
1825        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1826            self.database.mysql.table_name = table;
1827        }
1828        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1829
1830        // --- Database: PostgreSQL ---
1831        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1832            self.database.postgres.host = host;
1833        }
1834        self.database.postgres.port =
1835            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1836        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1837            self.database.postgres.username = user;
1838        }
1839        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1840            self.database.postgres.password = pass;
1841        }
1842        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1843            self.database.postgres.database = db;
1844        }
1845        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1846
1847        // --- Database: DynamoDB ---
1848        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1849            self.database.dynamodb.region = region;
1850        }
1851        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1852            self.database.dynamodb.table_name = table;
1853        }
1854        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1855            self.database.dynamodb.endpoint_url = Some(endpoint);
1856        }
1857        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1858            self.database.dynamodb.aws_access_key_id = Some(key_id);
1859        }
1860        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1861            self.database.dynamodb.aws_secret_access_key = Some(secret);
1862        }
1863
1864        // --- Redis Cluster ---
1865        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1866            let node_list: Vec<String> = nodes
1867                .split(',')
1868                .map(|s| s.trim())
1869                .filter(|s| !s.is_empty())
1870                .map(ToString::to_string)
1871                .collect();
1872
1873            options.adapter.cluster.nodes = node_list.clone();
1874            options.queue.redis_cluster.nodes = node_list.clone();
1875
1876            let parsed_nodes: Vec<ClusterNode> = node_list
1877                .iter()
1878                .filter_map(|seed| ClusterNode::from_seed(seed))
1879                .collect();
1880            options.database.redis.cluster.nodes = parsed_nodes.clone();
1881            options.database.redis.cluster_nodes = parsed_nodes;
1882        };
1883
1884        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1885            apply_redis_cluster_nodes(self, &nodes);
1886        }
1887        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1888            apply_redis_cluster_nodes(self, &nodes);
1889        }
1890        self.queue.redis_cluster.concurrency = parse_env::<u32>(
1891            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1892            self.queue.redis_cluster.concurrency,
1893        );
1894        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1895            self.queue.redis_cluster.prefix = Some(prefix);
1896        }
1897
1898        // --- SSL Configuration ---
1899        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1900        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1901            self.ssl.cert_path = val;
1902        }
1903        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1904            self.ssl.key_path = val;
1905        }
1906        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1907        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1908            self.ssl.http_port = Some(port);
1909        }
1910
1911        // --- Unix Socket Configuration ---
1912        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1913        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1914            self.unix_socket.path = path;
1915        }
1916        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1917            if mode_str.chars().all(|c| c.is_digit(8)) {
1918                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1919                    if mode <= 0o777 {
1920                        self.unix_socket.permission_mode = mode;
1921                    } else {
1922                        warn!(
1923                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1924                            mode_str, self.unix_socket.permission_mode
1925                        );
1926                    }
1927                } else {
1928                    warn!(
1929                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1930                        mode_str, self.unix_socket.permission_mode
1931                    );
1932                }
1933            } else {
1934                warn!(
1935                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1936                    mode_str, self.unix_socket.permission_mode
1937                );
1938            }
1939        }
1940
1941        // --- Metrics ---
1942        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1943            self.metrics.driver =
1944                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1945        }
1946        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1947        if let Ok(val) = std::env::var("METRICS_HOST") {
1948            self.metrics.host = val;
1949        }
1950        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1951        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1952            self.metrics.prometheus.prefix = val;
1953        }
1954
1955        // --- HTTP API ---
1956        self.http_api.usage_enabled =
1957            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
1958
1959        // --- Rate Limiter ---
1960        self.rate_limiter.enabled =
1961            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1962        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1963            "RATE_LIMITER_API_MAX_REQUESTS",
1964            self.rate_limiter.api_rate_limit.max_requests,
1965        );
1966        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1967            "RATE_LIMITER_API_WINDOW_SECONDS",
1968            self.rate_limiter.api_rate_limit.window_seconds,
1969        );
1970        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1971            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1972        }
1973        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1974            "RATE_LIMITER_WS_MAX_REQUESTS",
1975            self.rate_limiter.websocket_rate_limit.max_requests,
1976        );
1977        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1978            "RATE_LIMITER_WS_WINDOW_SECONDS",
1979            self.rate_limiter.websocket_rate_limit.window_seconds,
1980        );
1981        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1982            self.rate_limiter.redis.prefix = Some(prefix);
1983        }
1984
1985        // --- Queue: Redis ---
1986        self.queue.redis.concurrency =
1987            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1988        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1989            self.queue.redis.prefix = Some(prefix);
1990        }
1991
1992        // --- Queue: SQS ---
1993        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1994            self.queue.sqs.region = region;
1995        }
1996        self.queue.sqs.visibility_timeout = parse_env::<i32>(
1997            "QUEUE_SQS_VISIBILITY_TIMEOUT",
1998            self.queue.sqs.visibility_timeout,
1999        );
2000        self.queue.sqs.max_messages =
2001            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2002        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2003            "QUEUE_SQS_WAIT_TIME_SECONDS",
2004            self.queue.sqs.wait_time_seconds,
2005        );
2006        self.queue.sqs.concurrency =
2007            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2008        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2009        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2010            self.queue.sqs.endpoint_url = Some(endpoint);
2011        }
2012
2013        // --- Queue: SNS ---
2014        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2015            self.queue.sns.region = region;
2016        }
2017        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2018            self.queue.sns.topic_arn = topic_arn;
2019        }
2020        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2021            self.queue.sns.endpoint_url = Some(endpoint);
2022        }
2023
2024        // --- Webhooks ---
2025        self.webhooks.batching.enabled =
2026            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2027        self.webhooks.batching.duration =
2028            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2029        self.webhooks.batching.size =
2030            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2031
2032        // --- NATS Adapter ---
2033        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2034            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2035        }
2036        if let Ok(user) = std::env::var("NATS_USERNAME") {
2037            self.adapter.nats.username = Some(user);
2038        }
2039        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2040            self.adapter.nats.password = Some(pass);
2041        }
2042        if let Ok(token) = std::env::var("NATS_TOKEN") {
2043            self.adapter.nats.token = Some(token);
2044        }
2045        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2046            "NATS_CONNECTION_TIMEOUT_MS",
2047            self.adapter.nats.connection_timeout_ms,
2048        );
2049        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2050            "NATS_REQUEST_TIMEOUT_MS",
2051            self.adapter.nats.request_timeout_ms,
2052        );
2053
2054        // --- CORS ---
2055        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2056            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2057            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2058                warn!(
2059                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2060                    e
2061                );
2062            } else {
2063                self.cors.origin = parsed;
2064            }
2065        }
2066        if let Ok(methods) = std::env::var("CORS_METHODS") {
2067            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2068        }
2069        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2070            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2071        }
2072        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2073
2074        // --- Performance Tuning ---
2075        self.database_pooling.enabled =
2076            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2077        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2078            self.database_pooling.min = min;
2079        }
2080        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2081            self.database_pooling.max = max;
2082        }
2083
2084        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2085            self.database.mysql.connection_pool_size = pool_size;
2086            self.database.postgres.connection_pool_size = pool_size;
2087        }
2088        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2089            self.app_manager.cache.ttl = cache_ttl;
2090            self.channel_limits.cache_ttl = cache_ttl;
2091            self.database.mysql.cache_ttl = cache_ttl;
2092            self.database.postgres.cache_ttl = cache_ttl;
2093            self.cache.memory.ttl = cache_ttl;
2094        }
2095        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2096            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2097            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2098            self.cache.memory.cleanup_interval = cleanup_interval;
2099        }
2100        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2101            self.database.mysql.cache_max_capacity = max_capacity;
2102            self.database.postgres.cache_max_capacity = max_capacity;
2103            self.cache.memory.max_capacity = max_capacity;
2104        }
2105
2106        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2107        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2108        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2109        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2110
2111        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2112            (default_app_id, default_app_key, default_app_secret)
2113            && default_app_enabled
2114        {
2115            let default_app = App {
2116                id: app_id,
2117                key: app_key,
2118                secret: app_secret,
2119                enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2120                enabled: default_app_enabled,
2121                max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2122                max_client_events_per_second: parse_env::<u32>(
2123                    "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2124                    100,
2125                ),
2126                max_read_requests_per_second: Some(parse_env::<u32>(
2127                    "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2128                    100,
2129                )),
2130                max_presence_members_per_channel: Some(parse_env::<u32>(
2131                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2132                    100,
2133                )),
2134                max_presence_member_size_in_kb: Some(parse_env::<u32>(
2135                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2136                    100,
2137                )),
2138                max_channel_name_length: Some(parse_env::<u32>(
2139                    "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2140                    100,
2141                )),
2142                max_event_channels_at_once: Some(parse_env::<u32>(
2143                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2144                    100,
2145                )),
2146                max_event_name_length: Some(parse_env::<u32>(
2147                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2148                    100,
2149                )),
2150                max_event_payload_in_kb: Some(parse_env::<u32>(
2151                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2152                    100,
2153                )),
2154                max_event_batch_size: Some(parse_env::<u32>(
2155                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2156                    100,
2157                )),
2158                enable_user_authentication: Some(parse_bool_env(
2159                    "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2160                    false,
2161                )),
2162                webhooks: None,
2163                max_backend_events_per_second: Some(parse_env::<u32>(
2164                    "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2165                    100,
2166                )),
2167                enable_watchlist_events: Some(parse_bool_env(
2168                    "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2169                    false,
2170                )),
2171                allowed_origins: {
2172                    if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2173                        if !origins_str.is_empty() {
2174                            Some(
2175                                origins_str
2176                                    .split(',')
2177                                    .map(|s| s.trim().to_string())
2178                                    .collect(),
2179                            )
2180                        } else {
2181                            None
2182                        }
2183                    } else {
2184                        None
2185                    }
2186                },
2187                channel_delta_compression: None,
2188            };
2189
2190            self.app_manager.array.apps.push(default_app);
2191            info!("Successfully registered default app from env");
2192        }
2193
2194        // Special handling for REDIS_URL
2195        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2196            info!("Applying REDIS_URL environment variable override");
2197
2198            let redis_url_json = sonic_rs::json!(redis_url_env);
2199
2200            self.adapter
2201                .redis
2202                .redis_pub_options
2203                .insert("url".to_string(), redis_url_json.clone());
2204            self.adapter
2205                .redis
2206                .redis_sub_options
2207                .insert("url".to_string(), redis_url_json);
2208
2209            self.cache.redis.url_override = Some(redis_url_env.clone());
2210            self.queue.redis.url_override = Some(redis_url_env.clone());
2211            self.rate_limiter.redis.url_override = Some(redis_url_env);
2212        }
2213
2214        // --- Logging Configuration ---
2215        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2216        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2217        if has_colors_env || has_target_env {
2218            let logging_config = self.logging.get_or_insert_with(Default::default);
2219            if has_colors_env {
2220                logging_config.colors_enabled =
2221                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2222            }
2223            if has_target_env {
2224                logging_config.include_target =
2225                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2226            }
2227        }
2228
2229        // --- Cleanup Configuration ---
2230        self.cleanup.async_enabled =
2231            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2232        self.cleanup.fallback_to_sync =
2233            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2234        self.cleanup.queue_buffer_size =
2235            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2236        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2237        self.cleanup.batch_timeout_ms =
2238            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2239        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2240            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2241                WorkerThreadsConfig::Auto
2242            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2243                WorkerThreadsConfig::Fixed(n)
2244            } else {
2245                warn!(
2246                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2247                    worker_threads_str
2248                );
2249                self.cleanup.worker_threads.clone()
2250            };
2251        }
2252        self.cleanup.max_retry_attempts = parse_env::<u32>(
2253            "CLEANUP_MAX_RETRY_ATTEMPTS",
2254            self.cleanup.max_retry_attempts,
2255        );
2256
2257        // Cluster health configuration
2258        self.cluster_health.enabled =
2259            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2260        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2261            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2262            self.cluster_health.heartbeat_interval_ms,
2263        );
2264        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2265            "CLUSTER_HEALTH_NODE_TIMEOUT",
2266            self.cluster_health.node_timeout_ms,
2267        );
2268        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2269            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2270            self.cluster_health.cleanup_interval_ms,
2271        );
2272
2273        // Tag filtering configuration
2274        self.tag_filtering.enabled =
2275            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2276
2277        // WebSocket buffer configuration
2278        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2279            if val.to_lowercase() == "none" || val == "0" {
2280                self.websocket.max_messages = None;
2281            } else if let Ok(n) = val.parse::<usize>() {
2282                self.websocket.max_messages = Some(n);
2283            }
2284        }
2285        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2286            if val.to_lowercase() == "none" || val == "0" {
2287                self.websocket.max_bytes = None;
2288            } else if let Ok(n) = val.parse::<usize>() {
2289                self.websocket.max_bytes = Some(n);
2290            }
2291        }
2292        self.websocket.disconnect_on_buffer_full = parse_bool_env(
2293            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2294            self.websocket.disconnect_on_buffer_full,
2295        );
2296        self.websocket.max_message_size = parse_env::<usize>(
2297            "WEBSOCKET_MAX_MESSAGE_SIZE",
2298            self.websocket.max_message_size,
2299        );
2300        self.websocket.max_frame_size =
2301            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2302        self.websocket.write_buffer_size = parse_env::<usize>(
2303            "WEBSOCKET_WRITE_BUFFER_SIZE",
2304            self.websocket.write_buffer_size,
2305        );
2306        self.websocket.max_backpressure = parse_env::<usize>(
2307            "WEBSOCKET_MAX_BACKPRESSURE",
2308            self.websocket.max_backpressure,
2309        );
2310        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2311        self.websocket.ping_interval =
2312            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2313        self.websocket.idle_timeout =
2314            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2315        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2316            self.websocket.compression = mode;
2317        }
2318
2319        Ok(())
2320    }
2321
2322    pub fn validate(&self) -> Result<(), String> {
2323        if self.unix_socket.enabled {
2324            if self.unix_socket.path.is_empty() {
2325                return Err(
2326                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2327                );
2328            }
2329
2330            self.validate_unix_socket_security()?;
2331
2332            if self.ssl.enabled {
2333                tracing::warn!(
2334                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2335                );
2336            }
2337
2338            if self.unix_socket.permission_mode > 0o777 {
2339                return Err(format!(
2340                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2341                    self.unix_socket.permission_mode
2342                ));
2343            }
2344        }
2345
2346        if let Err(e) = self.cleanup.validate() {
2347            return Err(format!("Invalid cleanup configuration: {}", e));
2348        }
2349
2350        Ok(())
2351    }
2352
2353    fn validate_unix_socket_security(&self) -> Result<(), String> {
2354        let path = &self.unix_socket.path;
2355
2356        if path.contains("../") || path.contains("..\\") {
2357            return Err(
2358                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2359            );
2360        }
2361
2362        if self.unix_socket.permission_mode & 0o002 != 0 {
2363            warn!(
2364                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2365                self.unix_socket.permission_mode
2366            );
2367        }
2368
2369        if self.unix_socket.permission_mode & 0o007 > 0o005 {
2370            warn!(
2371                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2372                self.unix_socket.permission_mode
2373            );
2374        }
2375
2376        if self.mode == "production" && path.starts_with("/tmp/") {
2377            warn!(
2378                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2379                path
2380            );
2381        }
2382
2383        if !path.starts_with('/') {
2384            return Err(
2385                "Unix socket path must be absolute (start with /) for security and reliability."
2386                    .to_string(),
2387            );
2388        }
2389
2390        Ok(())
2391    }
2392}
2393
#[cfg(test)]
mod redis_connection_tests {
    //! URL-building tests for `RedisConnection`, covering standalone,
    //! sentinel and cluster configurations.

    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Fully specified standalone connection to `127.0.0.1:6379`, database 0,
    /// with no credentials and no sentinels. Every field is set explicitly so
    /// the tests do not depend on `RedisConnection::default()`; individual
    /// tests override fields with struct-update syntax.
    fn local_conn() -> RedisConnection {
        RedisConnection {
            host: "127.0.0.1".to_string(),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: "sockudo:".to_string(),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: "mymaster".to_string(),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    /// Sentinel entry for `host` on port 26379.
    fn sentinel(host: &str) -> RedisSentinel {
        RedisSentinel {
            host: host.to_string(),
            port: 26379,
        }
    }

    #[test]
    fn test_standard_url_basic() {
        assert_eq!(local_conn().to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some("secret".to_string()),
            ..local_conn()
        };
        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: "redis.example.com".to_string(),
            port: 6380,
            db: 1,
            username: Some("admin".to_string()),
            password: Some("pass123".to_string()),
            ..local_conn()
        };
        let expected = "redis://admin:pass123@redis.example.com:6380/1";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // '@' and '#' must be percent-encoded in the resulting URL.
        let conn = RedisConnection {
            password: Some("pass@word#123".to_string()),
            ..local_conn()
        };
        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            ..Default::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1"), sentinel("sentinel2")],
            ..local_conn()
        };
        let expected = "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            sentinel_password: Some("sentinelpass".to_string()),
            ..local_conn()
        };
        let expected = "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let conn = RedisConnection {
            db: 1,
            password: Some("masterpass".to_string()),
            sentinels: vec![sentinel("sentinel1")],
            ..local_conn()
        };
        let expected = "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let second_sentinel = RedisSentinel {
            port: 26380,
            ..sentinel("sentinel2")
        };
        let conn = RedisConnection {
            db: 2,
            username: Some("redisuser".to_string()),
            password: Some("redispass".to_string()),
            sentinels: vec![sentinel("sentinel1"), second_sentinel],
            sentinel_password: Some("sentinelauth".to_string()),
            name: "production-master".to_string(),
            ..local_conn()
        };
        let expected = "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let entry = sentinel("sentinel.example.com");
        assert_eq!(entry.to_host_port(), "sentinel.example.com:26379");
    }

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // Mix of a bare host, a host with an explicit `redis://` scheme and
        // port, and a host with an explicit `rediss://` scheme.
        let nodes = vec![
            ClusterNode {
                host: "node1.secure-cluster.com".to_string(),
                port: 7000,
            },
            ClusterNode {
                host: "redis://node2.secure-cluster.com:7001".to_string(),
                port: 7001,
            },
            ClusterNode {
                host: "rediss://node3.secure-cluster.com".to_string(),
                port: 7002,
            },
        ];
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some("cluster-secret".to_string()),
                use_tls: true,
            },
            ..Default::default()
        };

        let expected = vec![
            "rediss://:cluster-secret@node1.secure-cluster.com:7000",
            "redis://:cluster-secret@node2.secure-cluster.com:7001",
            "rediss://:cluster-secret@node3.secure-cluster.com:7002",
        ];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        // Only the legacy `cluster_nodes` list and top-level password are set.
        let legacy_node = ClusterNode {
            host: "legacy-node.example.com".to_string(),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some("fallback-secret".to_string()),
            cluster_nodes: vec![legacy_node],
            ..Default::default()
        };

        let expected = vec!["redis://:fallback-secret@legacy-node.example.com:7000"];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: Vec::new(),
                username: Some("svc-user".to_string()),
                password: Some("svc-pass".to_string()),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let expected = vec![
            "rediss://svc-user:svc-pass@node1.example.com:7000",
            "redis://svc-user:svc-pass@node2.example.com:7001",
            "rediss://svc-user:svc-pass@node3.example.com:6379",
        ];
        assert_eq!(conn.normalize_cluster_seed_urls(&seeds), expected);
    }
}
2688
// Unit tests for `ClusterNode`: URL rendering through `to_url` and
// `to_url_with_options`, and seed parsing through `from_seed`.
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    // A bare hostname is expected to gain the `redis://` scheme and the `port` field.
    #[test]
    fn test_to_url_basic_host() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("localhost"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "redis://localhost:6379");
    }

    // An IPv4 literal is expected to be handled like any other scheme-less host.
    #[test]
    fn test_to_url_ip_address() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("127.0.0.1"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "redis://127.0.0.1:6379");
    }

    // A host that already starts with `redis://` should not get a second scheme; only
    // the port is expected to be appended.
    #[test]
    fn test_to_url_with_redis_protocol() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("redis://example.com"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "redis://example.com:6379");
    }

    // Same as above for the TLS scheme `rediss://`.
    #[test]
    fn test_to_url_with_rediss_protocol() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("rediss://secure.example.com"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    // A port embedded in a `rediss://` host is expected to win over the `port` field.
    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("rediss://secure.example.com:7000"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://secure.example.com:7000");
    }

    // A port embedded in a `redis://` host is expected to win over the `port` field.
    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("redis://example.com:7001"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "redis://example.com:7001");
    }

    // Whitespace around the host value is expected to be stripped before the URL is built.
    #[test]
    fn test_to_url_with_trailing_whitespace() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("  rediss://secure.example.com  "),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    // A non-default `port` field is expected to appear in the URL of a scheme-less host.
    #[test]
    fn test_to_url_custom_port() {
        let endpoint = ClusterNode {
            port: 7000,
            host: String::from("redis-cluster.example.com"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "redis://redis-cluster.example.com:7000");
    }

    // A `host:port` pair in the host field (no scheme) is expected to override `port`.
    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let endpoint = ClusterNode {
            port: 7000,
            host: String::from("redis-cluster.example.com:7010"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "redis://redis-cluster.example.com:7010");
    }

    // With TLS requested and credentials supplied, a plain host is expected to become
    // `rediss://user:password@host:port`.
    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let endpoint = ClusterNode {
            port: 7000,
            host: String::from("node.example.com"),
        };
        let url = endpoint.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    // Credentials already embedded in the host URL are expected to take precedence over
    // the ones passed to `to_url_with_options`.
    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        let endpoint = ClusterNode {
            port: 7000,
            host: String::from("rediss://:node-secret@node.example.com:7000"),
        };
        let url =
            endpoint.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    // A scheme-less `host:port` seed is expected to be split into its two fields.
    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    // A seed with a scheme is expected to keep the full URL in `host` while the port is
    // still extracted into `port`.
    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    // A multi-label hostname (ElastiCache-style) with a TLS scheme is expected to be
    // rendered unchanged with the port appended.
    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("rediss://my-cluster.use1.cache.amazonaws.com"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://my-cluster.use1.cache.amazonaws.com:6379");
    }

    // The colons inside a bracketed IPv6 literal must not be mistaken for a port
    // separator; the `port` field is expected to be appended.
    #[test]
    fn test_to_url_with_ipv6_no_port() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("rediss://[::1]"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://[::1]:6379");
    }

    // A port following the closing bracket of an IPv6 literal is expected to be kept.
    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("rediss://[::1]:7000"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://[::1]:7000");
    }

    // Same as the `[::1]` case, with a longer IPv6 address and no embedded port.
    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("rediss://[2001:db8::1]"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://[2001:db8::1]:6379");
    }

    // Same as the `[::1]:7000` case, with a longer IPv6 address and an embedded port.
    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("rediss://[2001:db8::1]:7000"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "rediss://[2001:db8::1]:7000");
    }

    // IPv6 handling is expected to work for the plain-text `redis://` scheme as well.
    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        let endpoint = ClusterNode {
            port: 6379,
            host: String::from("redis://[::1]"),
        };
        let url = endpoint.to_url();
        assert_eq!(url, "redis://[::1]:6379");
    }
}
2870
// Deserialization tests for `CorsConfig`, checking which `origin` entries are accepted
// and which are rejected.
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    // Parses a JSON document into a `CorsConfig` using `sonic_rs`.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str::<CorsConfig>(json)
    }

    // Two fully-qualified origins should both end up in `origin`.
    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    // Wildcard sub-domain patterns, with or without a scheme, should be accepted.
    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    // The catch-all markers `*`, `Any` and `any` should be accepted, alone or mixed with
    // a concrete origin.
    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted {
            // Include the input so a failure identifies the offending case.
            assert!(
                cors_from_json(json).is_ok(),
                "expected config to be accepted: {}",
                json
            );
        }
    }

    // Malformed wildcard patterns, an empty string and a scheme with no host should all
    // fail deserialization.
    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected {
            // Include the input so a failure identifies the offending case.
            assert!(
                cors_from_json(json).is_err(),
                "expected config to be rejected: {}",
                json
            );
        }
    }

    // One invalid entry should cause the whole list to be rejected, even next to a valid one.
    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        let json = r#"{"origin": ["https://good.com", "*.*bad"]}"#;
        let outcome = cors_from_json(json);
        assert!(outcome.is_err());
    }
}