Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
/// Adapter backend selection, stored in `AdapterConfig::driver`.
///
/// Variants are (de)serialized in lowercase; `RedisCluster` uses the explicit
/// name `redis-cluster`. `Local` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
}
83
/// DynamoDB settings, referenced from `DatabaseConfig::dynamodb`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    pub region: String,
    pub table_name: String,
    pub endpoint_url: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_profile_name: Option<String>,
}
94
95impl Default for DynamoDbSettings {
96    fn default() -> Self {
97        Self {
98            region: "us-east-1".to_string(),
99            table_name: "sockudo-applications".to_string(),
100            endpoint_url: None,
101            aws_access_key_id: None,
102            aws_secret_access_key: None,
103            aws_profile_name: None,
104        }
105    }
106}
107
/// ScyllaDB settings, referenced from both `AppManagerConfig::scylladb` and
/// `DatabaseConfig::scylladb`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    pub nodes: Vec<String>,
    pub keyspace: String,
    pub table_name: String,
    pub username: Option<String>,
    pub password: Option<String>,
    pub replication_class: String,
    pub replication_factor: u32,
}
119
120impl Default for ScyllaDbSettings {
121    fn default() -> Self {
122        Self {
123            nodes: vec!["127.0.0.1:9042".to_string()],
124            keyspace: "sockudo".to_string(),
125            table_name: "applications".to_string(),
126            username: None,
127            password: None,
128            replication_class: "SimpleStrategy".to_string(),
129            replication_factor: 3,
130        }
131    }
132}
133
134impl FromStr for AdapterDriver {
135    type Err = String;
136    fn from_str(s: &str) -> Result<Self, Self::Err> {
137        match s.to_lowercase().as_str() {
138            "local" => Ok(AdapterDriver::Local),
139            "redis" => Ok(AdapterDriver::Redis),
140            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141            "nats" => Ok(AdapterDriver::Nats),
142            _ => Err(format!("Unknown adapter driver: {s}")),
143        }
144    }
145}
146
/// App-manager backend selection, stored in `AppManagerConfig::driver`.
///
/// Variants are (de)serialized in lowercase (so `PgSql` becomes `pgsql` and
/// `ScyllaDb` becomes `scylladb`). `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    ScyllaDb,
}
157impl FromStr for AppManagerDriver {
158    type Err = String;
159    fn from_str(s: &str) -> Result<Self, Self::Err> {
160        match s.to_lowercase().as_str() {
161            "memory" => Ok(AppManagerDriver::Memory),
162            "mysql" => Ok(AppManagerDriver::Mysql),
163            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166            _ => Err(format!("Unknown app manager driver: {s}")),
167        }
168    }
169}
170
/// Cache backend selection, stored in `CacheConfig::driver`.
///
/// Variants are (de)serialized in lowercase; `RedisCluster` uses the explicit
/// name `redis-cluster`. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    None,
}
181
182impl FromStr for CacheDriver {
183    type Err = String;
184    fn from_str(s: &str) -> Result<Self, Self::Err> {
185        match s.to_lowercase().as_str() {
186            "memory" => Ok(CacheDriver::Memory),
187            "redis" => Ok(CacheDriver::Redis),
188            "redis-cluster" => Ok(CacheDriver::RedisCluster),
189            "none" => Ok(CacheDriver::None),
190            _ => Err(format!("Unknown cache driver: {s}")),
191        }
192    }
193}
194
/// Queue backend selection, stored in `QueueConfig::driver`.
///
/// Variants are (de)serialized in lowercase; `RedisCluster` uses the explicit
/// name `redis-cluster`. Note that `Redis` (not `Memory`) is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Sqs,
    Sns,
    None,
}
207
208impl FromStr for QueueDriver {
209    type Err = String;
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        match s.to_lowercase().as_str() {
212            "memory" => Ok(QueueDriver::Memory),
213            "redis" => Ok(QueueDriver::Redis),
214            "redis-cluster" => Ok(QueueDriver::RedisCluster),
215            "sqs" => Ok(QueueDriver::Sqs),
216            "sns" => Ok(QueueDriver::Sns),
217            "none" => Ok(QueueDriver::None),
218            _ => Err(format!("Unknown queue driver: {s}")),
219        }
220    }
221}
222
223impl AsRef<str> for QueueDriver {
224    fn as_ref(&self) -> &str {
225        match self {
226            QueueDriver::Memory => "memory",
227            QueueDriver::Redis => "redis",
228            QueueDriver::RedisCluster => "redis-cluster",
229            QueueDriver::Sqs => "sqs",
230            QueueDriver::Sns => "sns",
231            QueueDriver::None => "none",
232        }
233    }
234}
235
/// Settings for the Redis Cluster queue, stored in `QueueConfig::redis_cluster`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    pub concurrency: u32,
    pub prefix: Option<String>,
    pub nodes: Vec<String>,
    pub request_timeout_ms: u64,
}
244
245impl Default for RedisClusterQueueConfig {
246    fn default() -> Self {
247        Self {
248            concurrency: 5,
249            prefix: Some("sockudo_queue:".to_string()),
250            nodes: vec!["redis://127.0.0.1:6379".to_string()],
251            request_timeout_ms: 5000,
252        }
253    }
254}
255
/// Metrics backend selection, stored in `MetricsConfig::driver`.
///
/// `Prometheus` is the only variant and the default; it is (de)serialized as
/// `prometheus`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
262
/// Log output format selector.
///
/// Variants are (de)serialized in lowercase (`human`, `json`). `Human` is the
/// default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
270
271impl FromStr for LogOutputFormat {
272    type Err = String;
273    fn from_str(s: &str) -> Result<Self, Self::Err> {
274        match s.to_lowercase().as_str() {
275            "human" => Ok(LogOutputFormat::Human),
276            "json" => Ok(LogOutputFormat::Json),
277            _ => Err(format!("Unknown log output format: {s}")),
278        }
279    }
280}
281
282impl FromStr for MetricsDriver {
283    type Err = String;
284    fn from_str(s: &str) -> Result<Self, Self::Err> {
285        match s.to_lowercase().as_str() {
286            "prometheus" => Ok(MetricsDriver::Prometheus),
287            _ => Err(format!("Unknown metrics driver: {s}")),
288        }
289    }
290}
291
292impl AsRef<str> for MetricsDriver {
293    fn as_ref(&self) -> &str {
294        match self {
295            MetricsDriver::Prometheus => "prometheus",
296        }
297    }
298}
299
300// --- Main Configuration Struct ---
/// Top-level server configuration, aggregating every configuration
/// sub-struct plus a handful of scalar settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
/// `logging` is the only optional section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    // NOTE(review): name matches the `websocket_max_payload_kb` parameter of
    // `WebSocketConfig::to_sockudo_ws_config` — presumably passed there; confirm.
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    // NOTE(review): name matches the `activity_timeout` parameter of
    // `WebSocketConfig::to_sockudo_ws_config` — presumably passed there; confirm.
    pub activity_timeout: u64,
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
}
337
338// --- Configuration Sub-Structs ---
339
/// Settings for the SQS queue, stored in `QueueConfig::sqs`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    pub visibility_timeout: i32,
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    pub message_group_id: Option<String>,
}
353
/// Settings for the SNS queue, stored in `QueueConfig::sns`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
361
/// Adapter configuration: the selected driver plus per-driver settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation; the two fields with an explicit
/// `#[serde(default = "...")]` use the named function instead.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    // Missing values come from `default_buffer_multiplier_per_cpu` (64).
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    // Missing values come from `default_enable_socket_counting` (true).
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
}
375
/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
379
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    const PER_CPU_MULTIPLIER: usize = 64;
    PER_CPU_MULTIPLIER
}
383
384impl Default for AdapterConfig {
385    fn default() -> Self {
386        Self {
387            driver: AdapterDriver::default(),
388            redis: RedisAdapterConfig::default(),
389            cluster: RedisClusterAdapterConfig::default(),
390            nats: NatsAdapterConfig::default(),
391            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392            cluster_health: ClusterHealthConfig::default(),
393            enable_socket_counting: default_enable_socket_counting(),
394        }
395    }
396}
397
/// Settings for the Redis adapter, stored in `AdapterConfig::redis`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    pub requests_timeout: u64,
    pub prefix: String,
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
407
/// Settings for the Redis Cluster adapter, stored in `AdapterConfig::cluster`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    // Field-level `#[serde(default)]`: per serde's attribute semantics a
    // missing value should deserialize as `bool::default()` (false).
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
418
/// Settings for the NATS adapter, stored in `AdapterConfig::nats`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
431
/// App-manager configuration: the selected driver plus its related settings.
///
/// `Default` is derived, so every field starts from its own type's default;
/// `#[serde(default)]` applies those values to fields missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    pub scylladb: ScyllaDbSettings,
}
440
/// List of `App` definitions, stored in `AppManagerConfig::array`.
///
/// `Default` is derived, so the list is empty unless provided.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
446
/// Cache toggle and TTL, stored in `AppManagerConfig::cache`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    // NOTE(review): unit is not visible here — presumably seconds; confirm.
    pub ttl: u64,
}
453
/// Settings for the in-memory cache, stored in `CacheConfig::memory`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
461
/// Cache configuration: the selected driver plus Redis and in-memory settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
469
/// Redis settings for the cache, stored in `CacheConfig::redis`.
///
/// `Default` is derived: both options are `None` and `cluster_mode` is
/// `false` unless provided.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
477
/// Channel limits, stored in `ServerOptions::channel_limits`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    pub cache_ttl: u64,
}
484
/// CORS settings, stored in `ServerOptions::cors`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
493
/// Connection settings for every supported database backend.
///
/// `Default` is derived, so each section starts from its own type's default;
/// `#[serde(default)]` applies those values to sections missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub scylladb: ScyllaDbSettings,
}
503
/// Connection settings shared by the SQL backends (used for both
/// `DatabaseConfig::mysql` and `DatabaseConfig::postgres`).
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    // Optional pool bounds; `override_db_pool_settings` sets these from the
    // `<PREFIX>_POOL_MIN` / `<PREFIX>_POOL_MAX` environment variables.
    pub pool_min: Option<u32>,
    pub pool_max: Option<u32>,
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
520
/// Redis connection settings covering standalone, Sentinel and Cluster setups.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    // May carry a `redis://` or `rediss://` scheme prefix; `build_standard_url`
    // honours it and otherwise assumes `redis://`.
    pub host: String,
    pub port: u16,
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    // When non-empty, `to_url` produces a `redis+sentinel://` URL.
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    // Used as the path segment after the sentinel host list in the sentinel
    // URL — presumably the Sentinel master name; confirm.
    pub name: String,
    pub cluster: RedisClusterConnection,
    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
    pub cluster_nodes: Vec<ClusterNode>,
}
537
/// Address of one Redis Sentinel, listed in `RedisConnection::sentinels`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
544
/// Redis Cluster settings, stored in `RedisConnection::cluster`.
///
/// `Default` is derived: no nodes, no credentials, TLS off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    // When set, these take precedence over the connection-level credentials
    // in `RedisConnection::build_cluster_urls`.
    pub username: Option<String>,
    pub password: Option<String>,
    // Also accepted under the key `useTLS`. Selects the `rediss` scheme for
    // nodes given without an explicit scheme.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
554
555impl RedisConnection {
556    /// Returns true if Redis Sentinel is configured.
557    pub fn is_sentinel_configured(&self) -> bool {
558        !self.sentinels.is_empty()
559    }
560
561    /// Builds a Redis connection URL based on the configuration.
562    pub fn to_url(&self) -> String {
563        if self.is_sentinel_configured() {
564            self.build_sentinel_url()
565        } else {
566            self.build_standard_url()
567        }
568    }
569
570    fn build_standard_url(&self) -> String {
571        // Extract scheme from host if present, otherwise default to redis://
572        let (scheme, host) = if self.host.starts_with("rediss://") {
573            ("rediss://", self.host.trim_start_matches("rediss://"))
574        } else if self.host.starts_with("redis://") {
575            ("redis://", self.host.trim_start_matches("redis://"))
576        } else {
577            ("redis://", self.host.as_str())
578        };
579
580        let mut url = String::from(scheme);
581
582        if let Some(ref username) = self.username {
583            url.push_str(username);
584            if let Some(ref password) = self.password {
585                url.push(':');
586                url.push_str(&urlencoding::encode(password));
587            }
588            url.push('@');
589        } else if let Some(ref password) = self.password {
590            url.push(':');
591            url.push_str(&urlencoding::encode(password));
592            url.push('@');
593        }
594
595        url.push_str(host);
596        url.push(':');
597        url.push_str(&self.port.to_string());
598        url.push('/');
599        url.push_str(&self.db.to_string());
600
601        url
602    }
603
604    fn build_sentinel_url(&self) -> String {
605        let mut url = String::from("redis+sentinel://");
606
607        if let Some(ref sentinel_password) = self.sentinel_password {
608            url.push(':');
609            url.push_str(&urlencoding::encode(sentinel_password));
610            url.push('@');
611        }
612
613        let sentinel_hosts: Vec<String> = self
614            .sentinels
615            .iter()
616            .map(|s| format!("{}:{}", s.host, s.port))
617            .collect();
618        url.push_str(&sentinel_hosts.join(","));
619
620        url.push('/');
621        url.push_str(&self.name);
622        url.push('/');
623        url.push_str(&self.db.to_string());
624
625        let mut params = Vec::new();
626        if let Some(ref password) = self.password {
627            params.push(format!("password={}", urlencoding::encode(password)));
628        }
629        if let Some(ref username) = self.username {
630            params.push(format!("username={}", urlencoding::encode(username)));
631        }
632
633        if !params.is_empty() {
634            url.push('?');
635            url.push_str(&params.join("&"));
636        }
637
638        url
639    }
640
641    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
642    /// or legacy (`cluster_nodes`) field.
643    pub fn has_cluster_nodes(&self) -> bool {
644        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
645    }
646
647    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
648    /// Falls back to legacy `cluster_nodes` for backward compatibility.
649    pub fn cluster_node_urls(&self) -> Vec<String> {
650        if !self.cluster.nodes.is_empty() {
651            return self.build_cluster_urls(&self.cluster.nodes);
652        }
653        self.build_cluster_urls(&self.cluster_nodes)
654    }
655
656    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
657    /// shared cluster auth/TLS options.
658    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
659        self.build_cluster_urls(
660            &seeds
661                .iter()
662                .filter_map(|seed| ClusterNode::from_seed(seed))
663                .collect::<Vec<ClusterNode>>(),
664        )
665    }
666
667    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
668        let username = self
669            .cluster
670            .username
671            .as_deref()
672            .or(self.username.as_deref());
673        let password = self
674            .cluster
675            .password
676            .as_deref()
677            .or(self.password.as_deref());
678        let use_tls = self.cluster.use_tls;
679
680        nodes
681            .iter()
682            .map(|node| node.to_url_with_options(use_tls, username, password))
683            .collect()
684    }
685}
686
687impl RedisSentinel {
688    pub fn to_host_port(&self) -> String {
689        format!("{}:{}", self.host, self.port)
690    }
691}
692
/// One Redis Cluster seed node.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    // May be a bare host, `host:port`, a bracketed IPv6 literal, or a full
    // `redis://` / `rediss://` URL (see `to_url_with_options`).
    pub host: String,
    // Used when `host` does not itself carry a port.
    pub port: u16,
}
699
700impl ClusterNode {
701    pub fn to_url(&self) -> String {
702        self.to_url_with_options(false, None, None)
703    }
704
705    pub fn to_url_with_options(
706        &self,
707        use_tls: bool,
708        username: Option<&str>,
709        password: Option<&str>,
710    ) -> String {
711        let host = self.host.trim();
712
713        if host.starts_with("redis://") || host.starts_with("rediss://") {
714            if let Ok(parsed) = Url::parse(host)
715                && let Some(host_str) = parsed.host_str()
716            {
717                let scheme = parsed.scheme();
718                let port = parsed.port_or_known_default().unwrap_or(self.port);
719                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
720                let parsed_password = parsed.password();
721                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
722                let (effective_username, effective_password) = if has_embedded_auth {
723                    (parsed_username, parsed_password)
724                } else {
725                    (username, password)
726                };
727
728                return build_redis_url(
729                    scheme,
730                    host_str,
731                    port,
732                    effective_username,
733                    effective_password,
734                );
735            }
736
737            // Fallback for malformed URLs
738            let has_port = if let Some(bracket_pos) = host.rfind(']') {
739                host[bracket_pos..].contains(':')
740            } else {
741                host.split(':').count() >= 3
742            };
743            let base = if has_port {
744                host.to_string()
745            } else {
746                format!("{}:{}", host, self.port)
747            };
748
749            if let Ok(parsed) = Url::parse(&base) {
750                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
751                let parsed_password = parsed.password();
752                if let Some(host_str) = parsed.host_str() {
753                    let port = parsed.port_or_known_default().unwrap_or(self.port);
754                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
755                    let (effective_username, effective_password) = if has_embedded_auth {
756                        (parsed_username, parsed_password)
757                    } else {
758                        (username, password)
759                    };
760                    return build_redis_url(
761                        parsed.scheme(),
762                        host_str,
763                        port,
764                        effective_username,
765                        effective_password,
766                    );
767                }
768            }
769            return base;
770        }
771
772        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
773        let scheme = if use_tls { "rediss" } else { "redis" };
774        build_redis_url(
775            scheme,
776            &normalized_host,
777            normalized_port,
778            username,
779            password,
780        )
781    }
782
783    pub fn from_seed(seed: &str) -> Option<Self> {
784        let trimmed = seed.trim();
785        if trimmed.is_empty() {
786            return None;
787        }
788
789        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
790            let port = Url::parse(trimmed)
791                .ok()
792                .and_then(|parsed| parsed.port_or_known_default())
793                .unwrap_or(6379);
794            return Some(Self {
795                host: trimmed.to_string(),
796                port,
797            });
798        }
799
800        let (host, port) = split_plain_host_and_port(trimmed, 6379);
801        Some(Self { host, port })
802    }
803}
804
/// Splits a scheme-less host string into `(host, port)`.
///
/// Handles three shapes after trimming whitespace:
/// * bracketed IPv6 (`[::1]` or `[::1]:6379`) — the brackets are removed and
///   the port is used when it parses as `u16`, else `default_port`;
/// * `host:port` with exactly one colon and a port that parses as `u16`;
/// * anything else (bare host, unbracketed IPv6, unparsable port, unclosed
///   bracket) — returned unchanged together with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    // Handle bracketed IPv6: [::1]:6379
    if let Some(after_open) = trimmed.strip_prefix('[') {
        return match after_open.find(']') {
            Some(close) => {
                let address = after_open[..close].to_string();
                let port = after_open[close + 1..]
                    .strip_prefix(':')
                    .and_then(|digits| digits.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (address, port)
            }
            // No closing bracket: leave the input untouched.
            None => (trimmed.to_string(), default_port),
        };
    }

    // Handle hostname/IP with port: host:6379 (exactly one colon).
    let mut pieces = trimmed.split(':');
    if let (Some(name), Some(port_text), None) = (pieces.next(), pieces.next(), pieces.next()) {
        if let Ok(port) = port_text.parse::<u16>() {
            return (name.to_string(), port);
        }
    }

    (trimmed.to_string(), default_port)
}
833
834fn build_redis_url(
835    scheme: &str,
836    host: &str,
837    port: u16,
838    username: Option<&str>,
839    password: Option<&str>,
840) -> String {
841    let mut url = format!("{scheme}://");
842
843    if let Some(user) = username {
844        url.push_str(&urlencoding::encode(user));
845        if let Some(pass) = password {
846            url.push(':');
847            url.push_str(&urlencoding::encode(pass));
848        }
849        url.push('@');
850    } else if let Some(pass) = password {
851        url.push(':');
852        url.push_str(&urlencoding::encode(pass));
853        url.push('@');
854    }
855
856    if host.contains(':') && !host.starts_with('[') {
857        url.push('[');
858        url.push_str(host);
859        url.push(']');
860    } else {
861        url.push_str(host);
862    }
863    url.push(':');
864    url.push_str(&port.to_string());
865    url
866}
867
/// Global database pooling settings, stored in `ServerOptions::database_pooling`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    pub min: u32,
    pub max: u32,
}
875
/// Event limits, stored in `ServerOptions::event_limits`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    pub max_channels_at_once: u32,
    pub max_name_length: u32,
    pub max_payload_in_kb: u32,
    pub max_batch_size: u32,
}
884
/// HTTP API settings, stored in `ServerOptions::http_api`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    pub request_limit_in_mb: u32,
    pub accept_traffic: AcceptTraffic,
    pub usage_enabled: bool,
}
892
/// Traffic-acceptance settings, stored in `HttpApiConfig::accept_traffic`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    // NOTE(review): scale (fraction vs. percent) is not visible here; confirm.
    pub memory_threshold: f64,
}
898
/// Instance identity settings, stored in `ServerOptions::instance`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    pub process_id: String,
}
904
/// Metrics settings, stored in `ServerOptions::metrics`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    pub enabled: bool,
    pub driver: MetricsDriver,
    pub host: String,
    pub prometheus: PrometheusConfig,
    pub port: u16,
}
914
/// Prometheus-specific settings, stored in `MetricsConfig::prometheus`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    pub prefix: String,
}
920
/// Logging settings, stored in the optional `ServerOptions::logging`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    pub colors_enabled: bool,
    pub include_target: bool,
}
927
/// Presence-channel limits, stored in `ServerOptions::presence`.
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation (not shown in this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    pub max_members_per_channel: u32,
    pub max_member_size_in_kb: u32,
}
934
/// WebSocket connection buffer configuration
/// Controls backpressure handling for slow consumers
///
/// Because of `#[serde(default)]`, fields missing from the input are filled
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    // Buffer limits consumed by `to_buffer_config`; when both are `None` it
    // falls back to a 1000-message limit.
    pub max_messages: Option<usize>,
    // Also used by `to_sockudo_ws_config` as the maximum payload length when set.
    pub max_bytes: Option<usize>,
    pub disconnect_on_buffer_full: bool,
    pub max_message_size: usize,
    pub max_frame_size: usize,
    pub write_buffer_size: usize,
    pub max_backpressure: usize,
    pub auto_ping: bool,
    pub ping_interval: u32,
    pub idle_timeout: u32,
    // Matched case-insensitively in `to_sockudo_ws_config`; unrecognized
    // values select `Compression::Disabled`.
    pub compression: String,
}
952
953impl Default for WebSocketConfig {
954    fn default() -> Self {
955        Self {
956            max_messages: Some(1000),
957            max_bytes: None,
958            disconnect_on_buffer_full: true,
959            max_message_size: 64 * 1024 * 1024,
960            max_frame_size: 16 * 1024 * 1024,
961            write_buffer_size: 16 * 1024,
962            max_backpressure: 1024 * 1024,
963            auto_ping: true,
964            ping_interval: 30,
965            idle_timeout: 120,
966            compression: "disabled".to_string(),
967        }
968    }
969}
970
971impl WebSocketConfig {
972    /// Convert to WebSocketBufferConfig for runtime use
973    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
974        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
975
976        let limit = match (self.max_messages, self.max_bytes) {
977            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
978            (Some(messages), None) => BufferLimit::Messages(messages),
979            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
980            (None, None) => BufferLimit::Messages(1000),
981        };
982
983        WebSocketBufferConfig {
984            limit,
985            disconnect_on_full: self.disconnect_on_buffer_full,
986        }
987    }
988
989    /// Convert to native sockudo-ws runtime configuration.
990    pub fn to_sockudo_ws_config(
991        &self,
992        websocket_max_payload_kb: u32,
993        activity_timeout: u64,
994    ) -> sockudo_ws::Config {
995        use sockudo_ws::Compression;
996
997        let compression = match self.compression.to_lowercase().as_str() {
998            "dedicated" => Compression::Dedicated,
999            "shared" => Compression::Shared,
1000            "window256b" => Compression::Window256B,
1001            "window1kb" => Compression::Window1KB,
1002            "window2kb" => Compression::Window2KB,
1003            "window4kb" => Compression::Window4KB,
1004            "window8kb" => Compression::Window8KB,
1005            "window16kb" => Compression::Window16KB,
1006            "window32kb" => Compression::Window32KB,
1007            _ => Compression::Disabled,
1008        };
1009
1010        sockudo_ws::Config::builder()
1011            .max_payload_length(
1012                self.max_bytes
1013                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1014            )
1015            .max_message_size(self.max_message_size)
1016            .max_frame_size(self.max_frame_size)
1017            .write_buffer_size(self.write_buffer_size)
1018            .max_backpressure(self.max_backpressure)
1019            .idle_timeout(self.idle_timeout)
1020            .auto_ping(self.auto_ping)
1021            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1022            .compression(compression)
1023            .build()
1024    }
1025}
1026
1027#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1028#[serde(default)]
1029pub struct QueueConfig {
1030    pub driver: QueueDriver,
1031    pub redis: RedisQueueConfig,
1032    pub redis_cluster: RedisClusterQueueConfig,
1033    pub sqs: SqsQueueConfig,
1034    pub sns: SnsQueueConfig,
1035}
1036
1037#[derive(Debug, Clone, Serialize, Deserialize)]
1038#[serde(default)]
1039pub struct RedisQueueConfig {
1040    pub concurrency: u32,
1041    pub prefix: Option<String>,
1042    pub url_override: Option<String>,
1043    pub cluster_mode: bool,
1044}
1045
1046#[derive(Clone, Debug, Serialize, Deserialize)]
1047#[serde(default)]
1048pub struct RateLimit {
1049    pub max_requests: u32,
1050    pub window_seconds: u64,
1051    pub identifier: Option<String>,
1052    pub trust_hops: Option<u32>,
1053}
1054
1055#[derive(Debug, Clone, Serialize, Deserialize)]
1056#[serde(default)]
1057pub struct RateLimiterConfig {
1058    pub enabled: bool,
1059    pub driver: CacheDriver,
1060    pub api_rate_limit: RateLimit,
1061    pub websocket_rate_limit: RateLimit,
1062    pub redis: RedisConfig,
1063}
1064
1065#[derive(Debug, Clone, Serialize, Deserialize)]
1066#[serde(default)]
1067pub struct SslConfig {
1068    pub enabled: bool,
1069    pub cert_path: String,
1070    pub key_path: String,
1071    pub passphrase: Option<String>,
1072    pub ca_path: Option<String>,
1073    pub redirect_http: bool,
1074    pub http_port: Option<u16>,
1075}
1076
1077#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1078#[serde(default)]
1079pub struct WebhooksConfig {
1080    pub batching: BatchingConfig,
1081}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084#[serde(default)]
1085pub struct BatchingConfig {
1086    pub enabled: bool,
1087    pub duration: u64,
1088    pub size: usize,
1089}
1090
1091#[derive(Debug, Clone, Serialize, Deserialize)]
1092#[serde(default)]
1093pub struct ClusterHealthConfig {
1094    pub enabled: bool,
1095    pub heartbeat_interval_ms: u64,
1096    pub node_timeout_ms: u64,
1097    pub cleanup_interval_ms: u64,
1098}
1099
1100#[derive(Debug, Clone, Serialize, Deserialize)]
1101#[serde(default)]
1102pub struct UnixSocketConfig {
1103    pub enabled: bool,
1104    pub path: String,
1105    #[serde(deserialize_with = "deserialize_octal_permission")]
1106    pub permission_mode: u32,
1107}
1108
1109#[derive(Debug, Clone, Serialize, Deserialize)]
1110#[serde(default)]
1111pub struct DeltaCompressionOptionsConfig {
1112    pub enabled: bool,
1113    pub algorithm: String,
1114    pub full_message_interval: u32,
1115    pub min_message_size: usize,
1116    pub max_state_age_secs: u64,
1117    pub max_channel_states_per_socket: usize,
1118    pub max_conflation_states_per_channel: Option<usize>,
1119    pub conflation_key_path: Option<String>,
1120    pub cluster_coordination: bool,
1121    pub omit_delta_algorithm: bool,
1122}
1123
1124/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1125#[derive(Debug, Clone, Serialize, Deserialize)]
1126#[serde(default)]
1127pub struct CleanupConfig {
1128    pub queue_buffer_size: usize,
1129    pub batch_size: usize,
1130    pub batch_timeout_ms: u64,
1131    pub worker_threads: WorkerThreadsConfig,
1132    pub max_retry_attempts: u32,
1133    pub async_enabled: bool,
1134    pub fallback_to_sync: bool,
1135}
1136
/// Worker threads configuration for the cleanup system
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// Written as the string `"auto"` in configuration.
    Auto,
    /// An explicit thread count, written as an integer.
    Fixed(usize),
}
1143
1144impl serde::Serialize for WorkerThreadsConfig {
1145    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1146    where
1147        S: serde::Serializer,
1148    {
1149        match self {
1150            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1151            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1152        }
1153    }
1154}
1155
1156impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1157    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1158    where
1159        D: serde::Deserializer<'de>,
1160    {
1161        use serde::de;
1162        struct WorkerThreadsVisitor;
1163        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1164            type Value = WorkerThreadsConfig;
1165
1166            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1167                formatter.write_str(r#""auto" or a positive integer"#)
1168            }
1169
1170            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1171                if value.eq_ignore_ascii_case("auto") {
1172                    Ok(WorkerThreadsConfig::Auto)
1173                } else if let Ok(n) = value.parse::<usize>() {
1174                    Ok(WorkerThreadsConfig::Fixed(n))
1175                } else {
1176                    Err(E::custom(format!(
1177                        "expected 'auto' or a number, got '{value}'"
1178                    )))
1179                }
1180            }
1181
1182            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1183                Ok(WorkerThreadsConfig::Fixed(value as usize))
1184            }
1185
1186            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1187                if value >= 0 {
1188                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1189                } else {
1190                    Err(E::custom("worker_threads must be non-negative"))
1191                }
1192            }
1193        }
1194        deserializer.deserialize_any(WorkerThreadsVisitor)
1195    }
1196}
1197
1198impl Default for CleanupConfig {
1199    fn default() -> Self {
1200        Self {
1201            queue_buffer_size: 1024,
1202            batch_size: 64,
1203            batch_timeout_ms: 100,
1204            worker_threads: WorkerThreadsConfig::Auto,
1205            max_retry_attempts: 3,
1206            async_enabled: true,
1207            fallback_to_sync: true,
1208        }
1209    }
1210}
1211
1212impl CleanupConfig {
1213    pub fn validate(&self) -> Result<(), String> {
1214        if self.queue_buffer_size == 0 {
1215            return Err("queue_buffer_size must be greater than 0".to_string());
1216        }
1217        if self.batch_size == 0 {
1218            return Err("batch_size must be greater than 0".to_string());
1219        }
1220        if self.batch_timeout_ms == 0 {
1221            return Err("batch_timeout_ms must be greater than 0".to_string());
1222        }
1223        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1224            && n == 0
1225        {
1226            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1227        }
1228        Ok(())
1229    }
1230}
1231
1232#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1233#[serde(default)]
1234pub struct TagFilteringConfig {
1235    #[serde(default)]
1236    pub enabled: bool,
1237    #[serde(default = "default_true")]
1238    pub enable_tags: bool,
1239}
1240
/// Serde default helper for boolean fields that should be `true` when omitted.
fn default_true() -> bool {
    true
}
1244
1245// --- Default Implementations ---
1246
1247impl Default for ServerOptions {
1248    fn default() -> Self {
1249        Self {
1250            adapter: AdapterConfig::default(),
1251            app_manager: AppManagerConfig::default(),
1252            cache: CacheConfig::default(),
1253            channel_limits: ChannelLimits::default(),
1254            cors: CorsConfig::default(),
1255            database: DatabaseConfig::default(),
1256            database_pooling: DatabasePooling::default(),
1257            debug: false,
1258            tag_filtering: TagFilteringConfig::default(),
1259            event_limits: EventLimits::default(),
1260            host: "0.0.0.0".to_string(),
1261            http_api: HttpApiConfig::default(),
1262            instance: InstanceConfig::default(),
1263            logging: None,
1264            metrics: MetricsConfig::default(),
1265            mode: "production".to_string(),
1266            port: 6001,
1267            path_prefix: "/".to_string(),
1268            presence: PresenceConfig::default(),
1269            queue: QueueConfig::default(),
1270            rate_limiter: RateLimiterConfig::default(),
1271            shutdown_grace_period: 10,
1272            ssl: SslConfig::default(),
1273            user_authentication_timeout: 3600,
1274            webhooks: WebhooksConfig::default(),
1275            websocket_max_payload_kb: 64,
1276            cleanup: CleanupConfig::default(),
1277            activity_timeout: 120,
1278            cluster_health: ClusterHealthConfig::default(),
1279            unix_socket: UnixSocketConfig::default(),
1280            delta_compression: DeltaCompressionOptionsConfig::default(),
1281            websocket: WebSocketConfig::default(),
1282        }
1283    }
1284}
1285
1286impl Default for SqsQueueConfig {
1287    fn default() -> Self {
1288        Self {
1289            region: "us-east-1".to_string(),
1290            queue_url_prefix: None,
1291            visibility_timeout: 30,
1292            endpoint_url: None,
1293            max_messages: 10,
1294            wait_time_seconds: 5,
1295            concurrency: 5,
1296            fifo: false,
1297            message_group_id: Some("default".to_string()),
1298        }
1299    }
1300}
1301
1302impl Default for SnsQueueConfig {
1303    fn default() -> Self {
1304        Self {
1305            region: "us-east-1".to_string(),
1306            topic_arn: String::new(),
1307            endpoint_url: None,
1308        }
1309    }
1310}
1311
1312impl Default for RedisAdapterConfig {
1313    fn default() -> Self {
1314        Self {
1315            requests_timeout: 5000,
1316            prefix: "sockudo_adapter:".to_string(),
1317            redis_pub_options: AHashMap::new(),
1318            redis_sub_options: AHashMap::new(),
1319            cluster_mode: false,
1320        }
1321    }
1322}
1323
1324impl Default for RedisClusterAdapterConfig {
1325    fn default() -> Self {
1326        Self {
1327            nodes: vec![],
1328            prefix: "sockudo_adapter:".to_string(),
1329            request_timeout_ms: 1000,
1330            use_connection_manager: true,
1331            use_sharded_pubsub: false,
1332        }
1333    }
1334}
1335
1336impl Default for NatsAdapterConfig {
1337    fn default() -> Self {
1338        Self {
1339            servers: vec!["nats://localhost:4222".to_string()],
1340            prefix: "sockudo_adapter:".to_string(),
1341            request_timeout_ms: 5000,
1342            username: None,
1343            password: None,
1344            token: None,
1345            connection_timeout_ms: 5000,
1346            nodes_number: None,
1347        }
1348    }
1349}
1350
1351impl Default for CacheSettings {
1352    fn default() -> Self {
1353        Self {
1354            enabled: true,
1355            ttl: 300,
1356        }
1357    }
1358}
1359
1360impl Default for MemoryCacheOptions {
1361    fn default() -> Self {
1362        Self {
1363            ttl: 300,
1364            cleanup_interval: 60,
1365            max_capacity: 10000,
1366        }
1367    }
1368}
1369
1370impl Default for CacheConfig {
1371    fn default() -> Self {
1372        Self {
1373            driver: CacheDriver::default(),
1374            redis: RedisConfig {
1375                prefix: Some("sockudo_cache:".to_string()),
1376                url_override: None,
1377                cluster_mode: false,
1378            },
1379            memory: MemoryCacheOptions::default(),
1380        }
1381    }
1382}
1383
1384impl Default for ChannelLimits {
1385    fn default() -> Self {
1386        Self {
1387            max_name_length: 200,
1388            cache_ttl: 3600,
1389        }
1390    }
1391}
1392impl Default for CorsConfig {
1393    fn default() -> Self {
1394        Self {
1395            credentials: true,
1396            origin: vec!["*".to_string()],
1397            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1398            allowed_headers: vec![
1399                "Authorization".to_string(),
1400                "Content-Type".to_string(),
1401                "X-Requested-With".to_string(),
1402                "Accept".to_string(),
1403            ],
1404        }
1405    }
1406}
1407
1408impl Default for DatabaseConnection {
1409    fn default() -> Self {
1410        Self {
1411            host: "localhost".to_string(),
1412            port: 3306,
1413            username: "root".to_string(),
1414            password: "".to_string(),
1415            database: "sockudo".to_string(),
1416            table_name: "applications".to_string(),
1417            connection_pool_size: 10,
1418            pool_min: None,
1419            pool_max: None,
1420            cache_ttl: 300,
1421            cache_cleanup_interval: 60,
1422            cache_max_capacity: 100,
1423        }
1424    }
1425}
1426
1427impl Default for RedisConnection {
1428    fn default() -> Self {
1429        Self {
1430            host: "127.0.0.1".to_string(),
1431            port: 6379,
1432            db: 0,
1433            username: None,
1434            password: None,
1435            key_prefix: "sockudo:".to_string(),
1436            sentinels: Vec::new(),
1437            sentinel_password: None,
1438            name: "mymaster".to_string(),
1439            cluster: RedisClusterConnection::default(),
1440            cluster_nodes: Vec::new(),
1441        }
1442    }
1443}
1444
1445impl Default for RedisSentinel {
1446    fn default() -> Self {
1447        Self {
1448            host: "localhost".to_string(),
1449            port: 26379,
1450        }
1451    }
1452}
1453
1454impl Default for ClusterNode {
1455    fn default() -> Self {
1456        Self {
1457            host: "127.0.0.1".to_string(),
1458            port: 7000,
1459        }
1460    }
1461}
1462
1463impl Default for DatabasePooling {
1464    fn default() -> Self {
1465        Self {
1466            enabled: true,
1467            min: 2,
1468            max: 10,
1469        }
1470    }
1471}
1472
1473impl Default for EventLimits {
1474    fn default() -> Self {
1475        Self {
1476            max_channels_at_once: 100,
1477            max_name_length: 200,
1478            max_payload_in_kb: 100,
1479            max_batch_size: 10,
1480        }
1481    }
1482}
1483
1484impl Default for HttpApiConfig {
1485    fn default() -> Self {
1486        Self {
1487            request_limit_in_mb: 10,
1488            accept_traffic: AcceptTraffic::default(),
1489            usage_enabled: true,
1490        }
1491    }
1492}
1493
1494impl Default for AcceptTraffic {
1495    fn default() -> Self {
1496        Self {
1497            memory_threshold: 0.90,
1498        }
1499    }
1500}
1501
1502impl Default for InstanceConfig {
1503    fn default() -> Self {
1504        Self {
1505            process_id: uuid::Uuid::new_v4().to_string(),
1506        }
1507    }
1508}
1509
1510impl Default for LoggingConfig {
1511    fn default() -> Self {
1512        Self {
1513            colors_enabled: true,
1514            include_target: true,
1515        }
1516    }
1517}
1518
1519impl Default for MetricsConfig {
1520    fn default() -> Self {
1521        Self {
1522            enabled: true,
1523            driver: MetricsDriver::default(),
1524            host: "0.0.0.0".to_string(),
1525            prometheus: PrometheusConfig::default(),
1526            port: 9601,
1527        }
1528    }
1529}
1530
1531impl Default for PrometheusConfig {
1532    fn default() -> Self {
1533        Self {
1534            prefix: "sockudo_".to_string(),
1535        }
1536    }
1537}
1538
1539impl Default for PresenceConfig {
1540    fn default() -> Self {
1541        Self {
1542            max_members_per_channel: 100,
1543            max_member_size_in_kb: 2,
1544        }
1545    }
1546}
1547
1548impl Default for RedisQueueConfig {
1549    fn default() -> Self {
1550        Self {
1551            concurrency: 5,
1552            prefix: Some("sockudo_queue:".to_string()),
1553            url_override: None,
1554            cluster_mode: false,
1555        }
1556    }
1557}
1558
1559impl Default for RateLimit {
1560    fn default() -> Self {
1561        Self {
1562            max_requests: 60,
1563            window_seconds: 60,
1564            identifier: Some("default".to_string()),
1565            trust_hops: Some(0),
1566        }
1567    }
1568}
1569
1570impl Default for RateLimiterConfig {
1571    fn default() -> Self {
1572        Self {
1573            enabled: true,
1574            driver: CacheDriver::Memory,
1575            api_rate_limit: RateLimit {
1576                max_requests: 100,
1577                window_seconds: 60,
1578                identifier: Some("api".to_string()),
1579                trust_hops: Some(0),
1580            },
1581            websocket_rate_limit: RateLimit {
1582                max_requests: 20,
1583                window_seconds: 60,
1584                identifier: Some("websocket_connect".to_string()),
1585                trust_hops: Some(0),
1586            },
1587            redis: RedisConfig {
1588                prefix: Some("sockudo_rl:".to_string()),
1589                url_override: None,
1590                cluster_mode: false,
1591            },
1592        }
1593    }
1594}
1595
1596impl Default for SslConfig {
1597    fn default() -> Self {
1598        Self {
1599            enabled: false,
1600            cert_path: "".to_string(),
1601            key_path: "".to_string(),
1602            passphrase: None,
1603            ca_path: None,
1604            redirect_http: false,
1605            http_port: Some(80),
1606        }
1607    }
1608}
1609
1610impl Default for BatchingConfig {
1611    fn default() -> Self {
1612        Self {
1613            enabled: true,
1614            duration: 50,
1615            size: 100,
1616        }
1617    }
1618}
1619
1620impl Default for ClusterHealthConfig {
1621    fn default() -> Self {
1622        Self {
1623            enabled: true,
1624            heartbeat_interval_ms: 10000,
1625            node_timeout_ms: 30000,
1626            cleanup_interval_ms: 10000,
1627        }
1628    }
1629}
1630
1631impl Default for UnixSocketConfig {
1632    fn default() -> Self {
1633        Self {
1634            enabled: false,
1635            path: "/var/run/sockudo/sockudo.sock".to_string(),
1636            permission_mode: 0o660,
1637        }
1638    }
1639}
1640
1641impl Default for DeltaCompressionOptionsConfig {
1642    fn default() -> Self {
1643        Self {
1644            enabled: true,
1645            algorithm: "fossil".to_string(),
1646            full_message_interval: 10,
1647            min_message_size: 100,
1648            max_state_age_secs: 300,
1649            max_channel_states_per_socket: 100,
1650            max_conflation_states_per_channel: Some(100),
1651            conflation_key_path: None,
1652            cluster_coordination: false,
1653            omit_delta_algorithm: false,
1654        }
1655    }
1656}
1657
1658impl ClusterHealthConfig {
1659    pub fn validate(&self) -> Result<(), String> {
1660        if self.heartbeat_interval_ms == 0 {
1661            return Err("heartbeat_interval_ms must be greater than 0".to_string());
1662        }
1663        if self.node_timeout_ms == 0 {
1664            return Err("node_timeout_ms must be greater than 0".to_string());
1665        }
1666        if self.cleanup_interval_ms == 0 {
1667            return Err("cleanup_interval_ms must be greater than 0".to_string());
1668        }
1669
1670        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1671            return Err(format!(
1672                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1673                self.heartbeat_interval_ms,
1674                self.node_timeout_ms,
1675                self.node_timeout_ms / 3
1676            ));
1677        }
1678
1679        if self.cleanup_interval_ms > self.node_timeout_ms {
1680            return Err(format!(
1681                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1682                self.cleanup_interval_ms, self.node_timeout_ms
1683            ));
1684        }
1685
1686        Ok(())
1687    }
1688}
1689
1690impl ServerOptions {
1691    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1692        let content = tokio::fs::read_to_string(path).await?;
1693        let options: Self = sonic_rs::from_str(&content)?;
1694        Ok(options)
1695    }
1696
1697    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1698        // --- General Settings ---
1699        if let Ok(mode) = std::env::var("ENVIRONMENT") {
1700            self.mode = mode;
1701        }
1702        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1703        if parse_bool_env("DEBUG", false) {
1704            self.debug = true;
1705            info!("DEBUG environment variable forces debug mode ON");
1706        }
1707
1708        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1709
1710        if let Ok(host) = std::env::var("HOST") {
1711            self.host = host;
1712        }
1713        self.port = parse_env::<u16>("PORT", self.port);
1714        self.shutdown_grace_period =
1715            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1716        self.user_authentication_timeout = parse_env::<u64>(
1717            "USER_AUTHENTICATION_TIMEOUT",
1718            self.user_authentication_timeout,
1719        );
1720        self.websocket_max_payload_kb =
1721            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1722        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1723            self.instance.process_id = id;
1724        }
1725
1726        // --- Driver Configuration ---
1727        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1728            self.adapter.driver =
1729                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1730        }
1731        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1732            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1733            self.adapter.buffer_multiplier_per_cpu,
1734        );
1735        self.adapter.enable_socket_counting = parse_env::<bool>(
1736            "ADAPTER_ENABLE_SOCKET_COUNTING",
1737            self.adapter.enable_socket_counting,
1738        );
1739        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1740            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1741        }
1742        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1743            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1744        }
1745        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1746            self.app_manager.driver =
1747                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1748        }
1749        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1750            self.rate_limiter.driver = parse_driver_enum(
1751                driver_str,
1752                self.rate_limiter.driver.clone(),
1753                "RateLimiter Backend",
1754            );
1755        }
1756
1757        // --- Database: Redis ---
1758        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1759            self.database.redis.host = host;
1760        }
1761        self.database.redis.port =
1762            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1763        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1764            self.database.redis.username = if username.is_empty() {
1765                None
1766            } else {
1767                Some(username)
1768            };
1769        }
1770        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1771            self.database.redis.password = Some(password);
1772        }
1773        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1774        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1775            self.database.redis.key_prefix = prefix;
1776        }
1777        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1778            self.database.redis.cluster.username = if cluster_username.is_empty() {
1779                None
1780            } else {
1781                Some(cluster_username)
1782            };
1783        }
1784        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1785            self.database.redis.cluster.password = Some(cluster_password);
1786        }
1787        self.database.redis.cluster.use_tls = parse_bool_env(
1788            "DATABASE_REDIS_CLUSTER_USE_TLS",
1789            self.database.redis.cluster.use_tls,
1790        );
1791
1792        // --- Database: MySQL ---
1793        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1794            self.database.mysql.host = host;
1795        }
1796        self.database.mysql.port =
1797            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1798        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1799            self.database.mysql.username = user;
1800        }
1801        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1802            self.database.mysql.password = pass;
1803        }
1804        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1805            self.database.mysql.database = db;
1806        }
1807        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1808            self.database.mysql.table_name = table;
1809        }
1810        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1811
1812        // --- Database: PostgreSQL ---
1813        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1814            self.database.postgres.host = host;
1815        }
1816        self.database.postgres.port =
1817            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1818        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1819            self.database.postgres.username = user;
1820        }
1821        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1822            self.database.postgres.password = pass;
1823        }
1824        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1825            self.database.postgres.database = db;
1826        }
1827        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1828
1829        // --- Database: DynamoDB ---
1830        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1831            self.database.dynamodb.region = region;
1832        }
1833        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1834            self.database.dynamodb.table_name = table;
1835        }
1836        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1837            self.database.dynamodb.endpoint_url = Some(endpoint);
1838        }
1839        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1840            self.database.dynamodb.aws_access_key_id = Some(key_id);
1841        }
1842        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1843            self.database.dynamodb.aws_secret_access_key = Some(secret);
1844        }
1845
1846        // --- Redis Cluster ---
1847        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1848            let node_list: Vec<String> = nodes
1849                .split(',')
1850                .map(|s| s.trim())
1851                .filter(|s| !s.is_empty())
1852                .map(ToString::to_string)
1853                .collect();
1854
1855            options.adapter.cluster.nodes = node_list.clone();
1856            options.queue.redis_cluster.nodes = node_list.clone();
1857
1858            let parsed_nodes: Vec<ClusterNode> = node_list
1859                .iter()
1860                .filter_map(|seed| ClusterNode::from_seed(seed))
1861                .collect();
1862            options.database.redis.cluster.nodes = parsed_nodes.clone();
1863            options.database.redis.cluster_nodes = parsed_nodes;
1864        };
1865
1866        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1867            apply_redis_cluster_nodes(self, &nodes);
1868        }
1869        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1870            apply_redis_cluster_nodes(self, &nodes);
1871        }
1872        self.queue.redis_cluster.concurrency = parse_env::<u32>(
1873            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1874            self.queue.redis_cluster.concurrency,
1875        );
1876        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1877            self.queue.redis_cluster.prefix = Some(prefix);
1878        }
1879
1880        // --- SSL Configuration ---
1881        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1882        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1883            self.ssl.cert_path = val;
1884        }
1885        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1886            self.ssl.key_path = val;
1887        }
1888        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1889        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1890            self.ssl.http_port = Some(port);
1891        }
1892
1893        // --- Unix Socket Configuration ---
1894        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1895        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1896            self.unix_socket.path = path;
1897        }
1898        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1899            if mode_str.chars().all(|c| c.is_digit(8)) {
1900                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1901                    if mode <= 0o777 {
1902                        self.unix_socket.permission_mode = mode;
1903                    } else {
1904                        warn!(
1905                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1906                            mode_str, self.unix_socket.permission_mode
1907                        );
1908                    }
1909                } else {
1910                    warn!(
1911                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1912                        mode_str, self.unix_socket.permission_mode
1913                    );
1914                }
1915            } else {
1916                warn!(
1917                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1918                    mode_str, self.unix_socket.permission_mode
1919                );
1920            }
1921        }
1922
1923        // --- Metrics ---
1924        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1925            self.metrics.driver =
1926                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1927        }
1928        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1929        if let Ok(val) = std::env::var("METRICS_HOST") {
1930            self.metrics.host = val;
1931        }
1932        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1933        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1934            self.metrics.prometheus.prefix = val;
1935        }
1936
1937        // --- HTTP API ---
1938        self.http_api.usage_enabled =
1939            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
1940
1941        // --- Rate Limiter ---
1942        self.rate_limiter.enabled =
1943            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1944        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1945            "RATE_LIMITER_API_MAX_REQUESTS",
1946            self.rate_limiter.api_rate_limit.max_requests,
1947        );
1948        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1949            "RATE_LIMITER_API_WINDOW_SECONDS",
1950            self.rate_limiter.api_rate_limit.window_seconds,
1951        );
1952        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1953            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1954        }
1955        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1956            "RATE_LIMITER_WS_MAX_REQUESTS",
1957            self.rate_limiter.websocket_rate_limit.max_requests,
1958        );
1959        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1960            "RATE_LIMITER_WS_WINDOW_SECONDS",
1961            self.rate_limiter.websocket_rate_limit.window_seconds,
1962        );
1963        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1964            self.rate_limiter.redis.prefix = Some(prefix);
1965        }
1966
1967        // --- Queue: Redis ---
1968        self.queue.redis.concurrency =
1969            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1970        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1971            self.queue.redis.prefix = Some(prefix);
1972        }
1973
1974        // --- Queue: SQS ---
1975        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1976            self.queue.sqs.region = region;
1977        }
1978        self.queue.sqs.visibility_timeout = parse_env::<i32>(
1979            "QUEUE_SQS_VISIBILITY_TIMEOUT",
1980            self.queue.sqs.visibility_timeout,
1981        );
1982        self.queue.sqs.max_messages =
1983            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
1984        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
1985            "QUEUE_SQS_WAIT_TIME_SECONDS",
1986            self.queue.sqs.wait_time_seconds,
1987        );
1988        self.queue.sqs.concurrency =
1989            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
1990        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
1991        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
1992            self.queue.sqs.endpoint_url = Some(endpoint);
1993        }
1994
1995        // --- Queue: SNS ---
1996        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
1997            self.queue.sns.region = region;
1998        }
1999        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2000            self.queue.sns.topic_arn = topic_arn;
2001        }
2002        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2003            self.queue.sns.endpoint_url = Some(endpoint);
2004        }
2005
2006        // --- Webhooks ---
2007        self.webhooks.batching.enabled =
2008            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2009        self.webhooks.batching.duration =
2010            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2011        self.webhooks.batching.size =
2012            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2013
2014        // --- NATS Adapter ---
2015        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2016            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2017        }
2018        if let Ok(user) = std::env::var("NATS_USERNAME") {
2019            self.adapter.nats.username = Some(user);
2020        }
2021        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2022            self.adapter.nats.password = Some(pass);
2023        }
2024        if let Ok(token) = std::env::var("NATS_TOKEN") {
2025            self.adapter.nats.token = Some(token);
2026        }
2027        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2028            "NATS_CONNECTION_TIMEOUT_MS",
2029            self.adapter.nats.connection_timeout_ms,
2030        );
2031        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2032            "NATS_REQUEST_TIMEOUT_MS",
2033            self.adapter.nats.request_timeout_ms,
2034        );
2035
2036        // --- CORS ---
2037        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2038            self.cors.origin = origins.split(',').map(|s| s.trim().to_string()).collect();
2039        }
2040        if let Ok(methods) = std::env::var("CORS_METHODS") {
2041            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2042        }
2043        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2044            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2045        }
2046        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2047
2048        // --- Performance Tuning ---
2049        self.database_pooling.enabled =
2050            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2051        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2052            self.database_pooling.min = min;
2053        }
2054        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2055            self.database_pooling.max = max;
2056        }
2057
2058        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2059            self.database.mysql.connection_pool_size = pool_size;
2060            self.database.postgres.connection_pool_size = pool_size;
2061        }
2062        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2063            self.app_manager.cache.ttl = cache_ttl;
2064            self.channel_limits.cache_ttl = cache_ttl;
2065            self.database.mysql.cache_ttl = cache_ttl;
2066            self.database.postgres.cache_ttl = cache_ttl;
2067            self.cache.memory.ttl = cache_ttl;
2068        }
2069        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2070            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2071            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2072            self.cache.memory.cleanup_interval = cleanup_interval;
2073        }
2074        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2075            self.database.mysql.cache_max_capacity = max_capacity;
2076            self.database.postgres.cache_max_capacity = max_capacity;
2077            self.cache.memory.max_capacity = max_capacity;
2078        }
2079
2080        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2081        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2082        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2083        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2084
2085        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2086            (default_app_id, default_app_key, default_app_secret)
2087            && default_app_enabled
2088        {
2089            let default_app = App {
2090                id: app_id,
2091                key: app_key,
2092                secret: app_secret,
2093                enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2094                enabled: default_app_enabled,
2095                max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2096                max_client_events_per_second: parse_env::<u32>(
2097                    "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2098                    100,
2099                ),
2100                max_read_requests_per_second: Some(parse_env::<u32>(
2101                    "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2102                    100,
2103                )),
2104                max_presence_members_per_channel: Some(parse_env::<u32>(
2105                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2106                    100,
2107                )),
2108                max_presence_member_size_in_kb: Some(parse_env::<u32>(
2109                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2110                    100,
2111                )),
2112                max_channel_name_length: Some(parse_env::<u32>(
2113                    "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2114                    100,
2115                )),
2116                max_event_channels_at_once: Some(parse_env::<u32>(
2117                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2118                    100,
2119                )),
2120                max_event_name_length: Some(parse_env::<u32>(
2121                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2122                    100,
2123                )),
2124                max_event_payload_in_kb: Some(parse_env::<u32>(
2125                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2126                    100,
2127                )),
2128                max_event_batch_size: Some(parse_env::<u32>(
2129                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2130                    100,
2131                )),
2132                enable_user_authentication: Some(parse_bool_env(
2133                    "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2134                    false,
2135                )),
2136                webhooks: None,
2137                max_backend_events_per_second: Some(parse_env::<u32>(
2138                    "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2139                    100,
2140                )),
2141                enable_watchlist_events: Some(parse_bool_env(
2142                    "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2143                    false,
2144                )),
2145                allowed_origins: {
2146                    if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2147                        if !origins_str.is_empty() {
2148                            Some(
2149                                origins_str
2150                                    .split(',')
2151                                    .map(|s| s.trim().to_string())
2152                                    .collect(),
2153                            )
2154                        } else {
2155                            None
2156                        }
2157                    } else {
2158                        None
2159                    }
2160                },
2161                channel_delta_compression: None,
2162            };
2163
2164            self.app_manager.array.apps.push(default_app);
2165            info!("Successfully registered default app from env");
2166        }
2167
2168        // Special handling for REDIS_URL
2169        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2170            info!("Applying REDIS_URL environment variable override");
2171
2172            let redis_url_json = sonic_rs::json!(redis_url_env);
2173
2174            self.adapter
2175                .redis
2176                .redis_pub_options
2177                .insert("url".to_string(), redis_url_json.clone());
2178            self.adapter
2179                .redis
2180                .redis_sub_options
2181                .insert("url".to_string(), redis_url_json);
2182
2183            self.cache.redis.url_override = Some(redis_url_env.clone());
2184            self.queue.redis.url_override = Some(redis_url_env.clone());
2185            self.rate_limiter.redis.url_override = Some(redis_url_env);
2186        }
2187
2188        // --- Logging Configuration ---
2189        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2190        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2191        if has_colors_env || has_target_env {
2192            let logging_config = self.logging.get_or_insert_with(Default::default);
2193            if has_colors_env {
2194                logging_config.colors_enabled =
2195                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2196            }
2197            if has_target_env {
2198                logging_config.include_target =
2199                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2200            }
2201        }
2202
2203        // --- Cleanup Configuration ---
2204        self.cleanup.async_enabled =
2205            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2206        self.cleanup.fallback_to_sync =
2207            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2208        self.cleanup.queue_buffer_size =
2209            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2210        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2211        self.cleanup.batch_timeout_ms =
2212            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2213        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2214            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2215                WorkerThreadsConfig::Auto
2216            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2217                WorkerThreadsConfig::Fixed(n)
2218            } else {
2219                warn!(
2220                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2221                    worker_threads_str
2222                );
2223                self.cleanup.worker_threads.clone()
2224            };
2225        }
2226        self.cleanup.max_retry_attempts = parse_env::<u32>(
2227            "CLEANUP_MAX_RETRY_ATTEMPTS",
2228            self.cleanup.max_retry_attempts,
2229        );
2230
2231        // Cluster health configuration
2232        self.cluster_health.enabled =
2233            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2234        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2235            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2236            self.cluster_health.heartbeat_interval_ms,
2237        );
2238        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2239            "CLUSTER_HEALTH_NODE_TIMEOUT",
2240            self.cluster_health.node_timeout_ms,
2241        );
2242        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2243            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2244            self.cluster_health.cleanup_interval_ms,
2245        );
2246
2247        // Tag filtering configuration
2248        self.tag_filtering.enabled =
2249            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2250
2251        // WebSocket buffer configuration
2252        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2253            if val.to_lowercase() == "none" || val == "0" {
2254                self.websocket.max_messages = None;
2255            } else if let Ok(n) = val.parse::<usize>() {
2256                self.websocket.max_messages = Some(n);
2257            }
2258        }
2259        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2260            if val.to_lowercase() == "none" || val == "0" {
2261                self.websocket.max_bytes = None;
2262            } else if let Ok(n) = val.parse::<usize>() {
2263                self.websocket.max_bytes = Some(n);
2264            }
2265        }
2266        self.websocket.disconnect_on_buffer_full = parse_bool_env(
2267            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2268            self.websocket.disconnect_on_buffer_full,
2269        );
2270        self.websocket.max_message_size = parse_env::<usize>(
2271            "WEBSOCKET_MAX_MESSAGE_SIZE",
2272            self.websocket.max_message_size,
2273        );
2274        self.websocket.max_frame_size =
2275            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2276        self.websocket.write_buffer_size = parse_env::<usize>(
2277            "WEBSOCKET_WRITE_BUFFER_SIZE",
2278            self.websocket.write_buffer_size,
2279        );
2280        self.websocket.max_backpressure = parse_env::<usize>(
2281            "WEBSOCKET_MAX_BACKPRESSURE",
2282            self.websocket.max_backpressure,
2283        );
2284        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2285        self.websocket.ping_interval =
2286            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2287        self.websocket.idle_timeout =
2288            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2289        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2290            self.websocket.compression = mode;
2291        }
2292
2293        Ok(())
2294    }
2295
2296    pub fn validate(&self) -> Result<(), String> {
2297        if self.unix_socket.enabled {
2298            if self.unix_socket.path.is_empty() {
2299                return Err(
2300                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2301                );
2302            }
2303
2304            self.validate_unix_socket_security()?;
2305
2306            if self.ssl.enabled {
2307                tracing::warn!(
2308                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2309                );
2310            }
2311
2312            if self.unix_socket.permission_mode > 0o777 {
2313                return Err(format!(
2314                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2315                    self.unix_socket.permission_mode
2316                ));
2317            }
2318        }
2319
2320        if let Err(e) = self.cleanup.validate() {
2321            return Err(format!("Invalid cleanup configuration: {}", e));
2322        }
2323
2324        Ok(())
2325    }
2326
2327    fn validate_unix_socket_security(&self) -> Result<(), String> {
2328        let path = &self.unix_socket.path;
2329
2330        if path.contains("../") || path.contains("..\\") {
2331            return Err(
2332                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2333            );
2334        }
2335
2336        if self.unix_socket.permission_mode & 0o002 != 0 {
2337            warn!(
2338                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2339                self.unix_socket.permission_mode
2340            );
2341        }
2342
2343        if self.unix_socket.permission_mode & 0o007 > 0o005 {
2344            warn!(
2345                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2346                self.unix_socket.permission_mode
2347            );
2348        }
2349
2350        if self.mode == "production" && path.starts_with("/tmp/") {
2351            warn!(
2352                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2353                path
2354            );
2355        }
2356
2357        if !path.starts_with('/') {
2358            return Err(
2359                "Unix socket path must be absolute (start with /) for security and reliability."
2360                    .to_string(),
2361            );
2362        }
2363
2364        Ok(())
2365    }
2366}
2367
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Builds a connection with every field spelled out explicitly, so the URL
    /// tests do not depend on `RedisConnection::default()`. Each test overrides
    /// only the fields it exercises.
    fn base_connection() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    // --- Standalone URLs ---

    #[test]
    fn test_standard_url_basic() {
        let redis = base_connection();
        let expected = "redis://127.0.0.1:6379/0";
        assert_eq!(redis.to_url(), expected);
    }

    #[test]
    fn test_standard_url_with_password() {
        let redis = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..base_connection()
        };
        let expected = "redis://:secret@127.0.0.1:6379/2";
        assert_eq!(redis.to_url(), expected);
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let redis = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..base_connection()
        };
        let expected = "redis://admin:pass123@redis.example.com:6380/1";
        assert_eq!(redis.to_url(), expected);
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // `@` and `#` are expected to be percent-encoded in the URL.
        let redis = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..base_connection()
        };
        let expected = "redis://:pass%40word%23123@127.0.0.1:6379/0";
        assert_eq!(redis.to_url(), expected);
    }

    // --- Sentinel detection and URLs ---

    #[test]
    fn test_is_sentinel_configured_false() {
        let redis = RedisConnection::default();
        assert!(!redis.is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let redis = RedisConnection {
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            ..Default::default()
        };
        assert!(redis.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let redis = RedisConnection {
            sentinels: vec![
                RedisSentinel {
                    host: String::from("sentinel1"),
                    port: 26379,
                },
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26379,
                },
            ],
            ..base_connection()
        };
        let expected = "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0";
        assert_eq!(redis.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let redis = RedisConnection {
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            sentinel_password: Some(String::from("sentinelpass")),
            ..base_connection()
        };
        let expected = "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0";
        assert_eq!(redis.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let redis = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            ..base_connection()
        };
        let expected = "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass";
        assert_eq!(redis.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let redis = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![
                RedisSentinel {
                    host: String::from("sentinel1"),
                    port: 26379,
                },
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26380,
                },
            ],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..base_connection()
        };
        let expected = "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser";
        assert_eq!(redis.to_url(), expected);
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let endpoint = RedisSentinel {
            host: String::from("sentinel.example.com"),
            port: 26379,
        };
        assert_eq!(endpoint.to_host_port(), "sentinel.example.com:26379");
    }

    // --- Cluster node URLs ---

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // Three host spellings: bare host, `redis://` with a port, `rediss://` without one.
        let nodes = vec![
            ClusterNode {
                host: String::from("node1.secure-cluster.com"),
                port: 7000,
            },
            ClusterNode {
                host: String::from("redis://node2.secure-cluster.com:7001"),
                port: 7001,
            },
            ClusterNode {
                host: String::from("rediss://node3.secure-cluster.com"),
                port: 7002,
            },
        ];
        let redis = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some(String::from("cluster-secret")),
                use_tls: true,
            },
            ..Default::default()
        };

        let expected = vec![
            "rediss://:cluster-secret@node1.secure-cluster.com:7000",
            "redis://:cluster-secret@node2.secure-cluster.com:7001",
            "rediss://:cluster-secret@node3.secure-cluster.com:7002",
        ];
        assert_eq!(redis.cluster_node_urls(), expected);
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        // Only the top-level `cluster_nodes` and `password` are set here;
        // `cluster` keeps its default value.
        let redis = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![ClusterNode {
                host: String::from("legacy-node.example.com"),
                port: 7000,
            }],
            ..Default::default()
        };

        let expected = vec!["redis://:fallback-secret@legacy-node.example.com:7000"];
        assert_eq!(redis.cluster_node_urls(), expected);
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let redis = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: Vec::new(),
                username: Some(String::from("svc-user")),
                password: Some(String::from("svc-pass")),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds = vec![
            String::from("node1.example.com:7000"),
            String::from("redis://node2.example.com:7001"),
            String::from("rediss://node3.example.com"),
        ];

        let expected = vec![
            "rediss://svc-user:svc-pass@node1.example.com:7000",
            "redis://svc-user:svc-pass@node2.example.com:7001",
            "rediss://svc-user:svc-pass@node3.example.com:6379",
        ];
        assert_eq!(redis.normalize_cluster_seed_urls(&seeds), expected);
    }
}
2662
// Unit tests for `ClusterNode`: URL rendering via `to_url` /
// `to_url_with_options` and seed parsing via `from_seed`.
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    // ---- to_url: plain hosts and explicit schemes ----

    // A bare hostname is expected to be rendered with the `redis://` scheme.
    #[test]
    fn test_to_url_basic_host() {
        let cluster_node = ClusterNode {
            host: String::from("localhost"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "redis://localhost:6379");
    }

    // A bare IPv4 address is treated the same way as a hostname.
    #[test]
    fn test_to_url_ip_address() {
        let cluster_node = ClusterNode {
            host: String::from("127.0.0.1"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "redis://127.0.0.1:6379");
    }

    // An explicit `redis://` prefix must not be duplicated; the port field is appended.
    #[test]
    fn test_to_url_with_redis_protocol() {
        let cluster_node = ClusterNode {
            host: String::from("redis://example.com"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "redis://example.com:6379");
    }

    // An explicit `rediss://` prefix is kept; the port field is appended.
    #[test]
    fn test_to_url_with_rediss_protocol() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://secure.example.com"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    // A port embedded in a `rediss://` host (7000) is expected to win over
    // the `port` field (6379).
    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://secure.example.com:7000"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://secure.example.com:7000");
    }

    // A port embedded in a `redis://` host (7001) is expected to win over
    // the `port` field (6379).
    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        let cluster_node = ClusterNode {
            host: String::from("redis://example.com:7001"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "redis://example.com:7001");
    }

    // Whitespace surrounding the host value must not leak into the URL.
    #[test]
    fn test_to_url_with_trailing_whitespace() {
        let cluster_node = ClusterNode {
            host: String::from("  rediss://secure.example.com  "),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    // A non-default value in the `port` field shows up in the rendered URL.
    #[test]
    fn test_to_url_custom_port() {
        let cluster_node = ClusterNode {
            host: String::from("redis-cluster.example.com"),
            port: 7000,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "redis://redis-cluster.example.com:7000");
    }

    // A scheme-less host that already carries a port (7010) is expected to
    // keep it rather than use the `port` field (7000).
    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let cluster_node = ClusterNode {
            host: String::from("redis-cluster.example.com:7010"),
            port: 7000,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "redis://redis-cluster.example.com:7010");
    }

    // ---- to_url_with_options: TLS and credentials ----

    // With TLS requested and credentials supplied, a bare host is expected to
    // become a `rediss://user:pass@host:port` URL.
    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let cluster_node = ClusterNode {
            host: String::from("node.example.com"),
            port: 7000,
        };
        let url = cluster_node.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    // Credentials already embedded in the host URL are expected to be kept
    // instead of being replaced by the ones passed in.
    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://:node-secret@node.example.com:7000"),
            port: 7000,
        };
        let url =
            cluster_node.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    // ---- from_seed: parsing seed strings ----

    // A `host:port` seed is split into its host and port parts.
    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    // A seed with a scheme keeps the full URL in `host`, while `port` still
    // reflects the port found in the seed.
    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let parsed =
            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    // ---- to_url: long hostnames and IPv6 literals ----

    // A multi-label ElastiCache-style hostname with `rediss://` gets the port appended.
    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://my-cluster.use1.cache.amazonaws.com"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://my-cluster.use1.cache.amazonaws.com:6379");
    }

    // The colons inside a bracketed IPv6 literal must not be mistaken for a
    // port separator: the `port` field is still appended.
    #[test]
    fn test_to_url_with_ipv6_no_port() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://[::1]"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://[::1]:6379");
    }

    // A port following the closing bracket of an IPv6 literal is kept as-is.
    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://[::1]:7000"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://[::1]:7000");
    }

    // Same as the `[::1]` case, with a longer IPv6 address and no port.
    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://[2001:db8::1]"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://[2001:db8::1]:6379");
    }

    // Same as the `[::1]:7000` case, with a longer IPv6 address.
    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        let cluster_node = ClusterNode {
            host: String::from("rediss://[2001:db8::1]:7000"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "rediss://[2001:db8::1]:7000");
    }

    // IPv6 handling is expected to work for the non-TLS `redis://` scheme too.
    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        let cluster_node = ClusterNode {
            host: String::from("redis://[::1]"),
            port: 6379,
        };
        let url = cluster_node.to_url();
        assert_eq!(url, "redis://[::1]:6379");
    }
}