Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(default)]
86pub struct DynamoDbSettings {
87    pub region: String,
88    pub table_name: String,
89    pub endpoint_url: Option<String>,
90    pub aws_access_key_id: Option<String>,
91    pub aws_secret_access_key: Option<String>,
92    pub aws_profile_name: Option<String>,
93}
94
95impl Default for DynamoDbSettings {
96    fn default() -> Self {
97        Self {
98            region: "us-east-1".to_string(),
99            table_name: "sockudo-applications".to_string(),
100            endpoint_url: None,
101            aws_access_key_id: None,
102            aws_secret_access_key: None,
103            aws_profile_name: None,
104        }
105    }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(default)]
110pub struct ScyllaDbSettings {
111    pub nodes: Vec<String>,
112    pub keyspace: String,
113    pub table_name: String,
114    pub username: Option<String>,
115    pub password: Option<String>,
116    pub replication_class: String,
117    pub replication_factor: u32,
118}
119
120impl Default for ScyllaDbSettings {
121    fn default() -> Self {
122        Self {
123            nodes: vec!["127.0.0.1:9042".to_string()],
124            keyspace: "sockudo".to_string(),
125            table_name: "applications".to_string(),
126            username: None,
127            password: None,
128            replication_class: "SimpleStrategy".to_string(),
129            replication_factor: 3,
130        }
131    }
132}
133
134impl FromStr for AdapterDriver {
135    type Err = String;
136    fn from_str(s: &str) -> Result<Self, Self::Err> {
137        match s.to_lowercase().as_str() {
138            "local" => Ok(AdapterDriver::Local),
139            "redis" => Ok(AdapterDriver::Redis),
140            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141            "nats" => Ok(AdapterDriver::Nats),
142            _ => Err(format!("Unknown adapter driver: {s}")),
143        }
144    }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
148#[serde(rename_all = "lowercase")]
149pub enum AppManagerDriver {
150    #[default]
151    Memory,
152    Mysql,
153    Dynamodb,
154    PgSql,
155    ScyllaDb,
156}
157impl FromStr for AppManagerDriver {
158    type Err = String;
159    fn from_str(s: &str) -> Result<Self, Self::Err> {
160        match s.to_lowercase().as_str() {
161            "memory" => Ok(AppManagerDriver::Memory),
162            "mysql" => Ok(AppManagerDriver::Mysql),
163            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166            _ => Err(format!("Unknown app manager driver: {s}")),
167        }
168    }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
172#[serde(rename_all = "lowercase")]
173pub enum CacheDriver {
174    #[default]
175    Memory,
176    Redis,
177    #[serde(rename = "redis-cluster")]
178    RedisCluster,
179    None,
180}
181
182impl FromStr for CacheDriver {
183    type Err = String;
184    fn from_str(s: &str) -> Result<Self, Self::Err> {
185        match s.to_lowercase().as_str() {
186            "memory" => Ok(CacheDriver::Memory),
187            "redis" => Ok(CacheDriver::Redis),
188            "redis-cluster" => Ok(CacheDriver::RedisCluster),
189            "none" => Ok(CacheDriver::None),
190            _ => Err(format!("Unknown cache driver: {s}")),
191        }
192    }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
196#[serde(rename_all = "lowercase")]
197pub enum QueueDriver {
198    Memory,
199    #[default]
200    Redis,
201    #[serde(rename = "redis-cluster")]
202    RedisCluster,
203    Sqs,
204    Sns,
205    None,
206}
207
208impl FromStr for QueueDriver {
209    type Err = String;
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        match s.to_lowercase().as_str() {
212            "memory" => Ok(QueueDriver::Memory),
213            "redis" => Ok(QueueDriver::Redis),
214            "redis-cluster" => Ok(QueueDriver::RedisCluster),
215            "sqs" => Ok(QueueDriver::Sqs),
216            "sns" => Ok(QueueDriver::Sns),
217            "none" => Ok(QueueDriver::None),
218            _ => Err(format!("Unknown queue driver: {s}")),
219        }
220    }
221}
222
223impl AsRef<str> for QueueDriver {
224    fn as_ref(&self) -> &str {
225        match self {
226            QueueDriver::Memory => "memory",
227            QueueDriver::Redis => "redis",
228            QueueDriver::RedisCluster => "redis-cluster",
229            QueueDriver::Sqs => "sqs",
230            QueueDriver::Sns => "sns",
231            QueueDriver::None => "none",
232        }
233    }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(default)]
238pub struct RedisClusterQueueConfig {
239    pub concurrency: u32,
240    pub prefix: Option<String>,
241    pub nodes: Vec<String>,
242    pub request_timeout_ms: u64,
243}
244
245impl Default for RedisClusterQueueConfig {
246    fn default() -> Self {
247        Self {
248            concurrency: 5,
249            prefix: Some("sockudo_queue:".to_string()),
250            nodes: vec!["redis://127.0.0.1:6379".to_string()],
251            request_timeout_ms: 5000,
252        }
253    }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
257#[serde(rename_all = "lowercase")]
258pub enum MetricsDriver {
259    #[default]
260    Prometheus,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264#[serde(rename_all = "lowercase")]
265pub enum LogOutputFormat {
266    #[default]
267    Human,
268    Json,
269}
270
271impl FromStr for LogOutputFormat {
272    type Err = String;
273    fn from_str(s: &str) -> Result<Self, Self::Err> {
274        match s.to_lowercase().as_str() {
275            "human" => Ok(LogOutputFormat::Human),
276            "json" => Ok(LogOutputFormat::Json),
277            _ => Err(format!("Unknown log output format: {s}")),
278        }
279    }
280}
281
282impl FromStr for MetricsDriver {
283    type Err = String;
284    fn from_str(s: &str) -> Result<Self, Self::Err> {
285        match s.to_lowercase().as_str() {
286            "prometheus" => Ok(MetricsDriver::Prometheus),
287            _ => Err(format!("Unknown metrics driver: {s}")),
288        }
289    }
290}
291
292impl AsRef<str> for MetricsDriver {
293    fn as_ref(&self) -> &str {
294        match self {
295            MetricsDriver::Prometheus => "prometheus",
296        }
297    }
298}
299
300// --- Main Configuration Struct ---
301#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(default)]
303pub struct ServerOptions {
304    pub adapter: AdapterConfig,
305    pub app_manager: AppManagerConfig,
306    pub cache: CacheConfig,
307    pub channel_limits: ChannelLimits,
308    pub cors: CorsConfig,
309    pub database: DatabaseConfig,
310    pub database_pooling: DatabasePooling,
311    pub debug: bool,
312    pub event_limits: EventLimits,
313    pub host: String,
314    pub http_api: HttpApiConfig,
315    pub instance: InstanceConfig,
316    pub logging: Option<LoggingConfig>,
317    pub metrics: MetricsConfig,
318    pub mode: String,
319    pub port: u16,
320    pub path_prefix: String,
321    pub presence: PresenceConfig,
322    pub queue: QueueConfig,
323    pub rate_limiter: RateLimiterConfig,
324    pub shutdown_grace_period: u64,
325    pub ssl: SslConfig,
326    pub user_authentication_timeout: u64,
327    pub webhooks: WebhooksConfig,
328    pub websocket_max_payload_kb: u32,
329    pub cleanup: CleanupConfig,
330    pub activity_timeout: u64,
331    pub cluster_health: ClusterHealthConfig,
332    pub unix_socket: UnixSocketConfig,
333    pub delta_compression: DeltaCompressionOptionsConfig,
334    pub tag_filtering: TagFilteringConfig,
335    pub websocket: WebSocketConfig,
336}
337
338// --- Configuration Sub-Structs ---
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct SqsQueueConfig {
343    pub region: String,
344    pub queue_url_prefix: Option<String>,
345    pub visibility_timeout: i32,
346    pub endpoint_url: Option<String>,
347    pub max_messages: i32,
348    pub wait_time_seconds: i32,
349    pub concurrency: u32,
350    pub fifo: bool,
351    pub message_group_id: Option<String>,
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
355#[serde(default)]
356pub struct SnsQueueConfig {
357    pub region: String,
358    pub topic_arn: String,
359    pub endpoint_url: Option<String>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
363#[serde(default)]
364pub struct AdapterConfig {
365    pub driver: AdapterDriver,
366    pub redis: RedisAdapterConfig,
367    pub cluster: RedisClusterAdapterConfig,
368    pub nats: NatsAdapterConfig,
369    #[serde(default = "default_buffer_multiplier_per_cpu")]
370    pub buffer_multiplier_per_cpu: usize,
371    pub cluster_health: ClusterHealthConfig,
372    #[serde(default = "default_enable_socket_counting")]
373    pub enable_socket_counting: bool,
374}
375
376fn default_enable_socket_counting() -> bool {
377    true
378}
379
380fn default_buffer_multiplier_per_cpu() -> usize {
381    64
382}
383
384impl Default for AdapterConfig {
385    fn default() -> Self {
386        Self {
387            driver: AdapterDriver::default(),
388            redis: RedisAdapterConfig::default(),
389            cluster: RedisClusterAdapterConfig::default(),
390            nats: NatsAdapterConfig::default(),
391            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392            cluster_health: ClusterHealthConfig::default(),
393            enable_socket_counting: default_enable_socket_counting(),
394        }
395    }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct RedisAdapterConfig {
401    pub requests_timeout: u64,
402    pub prefix: String,
403    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
404    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
405    pub cluster_mode: bool,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
409#[serde(default)]
410pub struct RedisClusterAdapterConfig {
411    pub nodes: Vec<String>,
412    pub prefix: String,
413    pub request_timeout_ms: u64,
414    pub use_connection_manager: bool,
415    #[serde(default)]
416    pub use_sharded_pubsub: bool,
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[serde(default)]
421pub struct NatsAdapterConfig {
422    pub servers: Vec<String>,
423    pub prefix: String,
424    pub request_timeout_ms: u64,
425    pub username: Option<String>,
426    pub password: Option<String>,
427    pub token: Option<String>,
428    pub connection_timeout_ms: u64,
429    pub nodes_number: Option<u32>,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, Default)]
433#[serde(default)]
434pub struct AppManagerConfig {
435    pub driver: AppManagerDriver,
436    pub array: ArrayConfig,
437    pub cache: CacheSettings,
438    pub scylladb: ScyllaDbSettings,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
442#[serde(default)]
443pub struct ArrayConfig {
444    pub apps: Vec<App>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(default)]
449pub struct CacheSettings {
450    pub enabled: bool,
451    pub ttl: u64,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
455#[serde(default)]
456pub struct MemoryCacheOptions {
457    pub ttl: u64,
458    pub cleanup_interval: u64,
459    pub max_capacity: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
463#[serde(default)]
464pub struct CacheConfig {
465    pub driver: CacheDriver,
466    pub redis: RedisConfig,
467    pub memory: MemoryCacheOptions,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize, Default)]
471#[serde(default)]
472pub struct RedisConfig {
473    pub prefix: Option<String>,
474    pub url_override: Option<String>,
475    pub cluster_mode: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct ChannelLimits {
481    pub max_name_length: u32,
482    pub cache_ttl: u64,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
486#[serde(default)]
487pub struct CorsConfig {
488    pub credentials: bool,
489    pub origin: Vec<String>,
490    pub methods: Vec<String>,
491    pub allowed_headers: Vec<String>,
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize, Default)]
495#[serde(default)]
496pub struct DatabaseConfig {
497    pub mysql: DatabaseConnection,
498    pub postgres: DatabaseConnection,
499    pub redis: RedisConnection,
500    pub dynamodb: DynamoDbSettings,
501    pub scylladb: ScyllaDbSettings,
502}
503
504#[derive(Debug, Clone, Serialize, Deserialize)]
505#[serde(default)]
506pub struct DatabaseConnection {
507    pub host: String,
508    pub port: u16,
509    pub username: String,
510    pub password: String,
511    pub database: String,
512    pub table_name: String,
513    pub connection_pool_size: u32,
514    pub pool_min: Option<u32>,
515    pub pool_max: Option<u32>,
516    pub cache_ttl: u64,
517    pub cache_cleanup_interval: u64,
518    pub cache_max_capacity: u64,
519}
520
521#[derive(Debug, Clone, Serialize, Deserialize)]
522#[serde(default)]
523pub struct RedisConnection {
524    pub host: String,
525    pub port: u16,
526    pub db: u32,
527    pub username: Option<String>,
528    pub password: Option<String>,
529    pub key_prefix: String,
530    pub sentinels: Vec<RedisSentinel>,
531    pub sentinel_password: Option<String>,
532    pub name: String,
533    pub cluster: RedisClusterConnection,
534    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
535    pub cluster_nodes: Vec<ClusterNode>,
536}
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
539#[serde(default)]
540pub struct RedisSentinel {
541    pub host: String,
542    pub port: u16,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize, Default)]
546#[serde(default)]
547pub struct RedisClusterConnection {
548    pub nodes: Vec<ClusterNode>,
549    pub username: Option<String>,
550    pub password: Option<String>,
551    #[serde(alias = "useTLS")]
552    pub use_tls: bool,
553}
554
555impl RedisConnection {
556    /// Returns true if Redis Sentinel is configured.
557    pub fn is_sentinel_configured(&self) -> bool {
558        !self.sentinels.is_empty()
559    }
560
561    /// Builds a Redis connection URL based on the configuration.
562    pub fn to_url(&self) -> String {
563        if self.is_sentinel_configured() {
564            self.build_sentinel_url()
565        } else {
566            self.build_standard_url()
567        }
568    }
569
570    fn build_standard_url(&self) -> String {
571        // Extract scheme from host if present, otherwise default to redis://
572        let (scheme, host) = if self.host.starts_with("rediss://") {
573            ("rediss://", self.host.trim_start_matches("rediss://"))
574        } else if self.host.starts_with("redis://") {
575            ("redis://", self.host.trim_start_matches("redis://"))
576        } else {
577            ("redis://", self.host.as_str())
578        };
579
580        let mut url = String::from(scheme);
581
582        if let Some(ref username) = self.username {
583            url.push_str(username);
584            if let Some(ref password) = self.password {
585                url.push(':');
586                url.push_str(&urlencoding::encode(password));
587            }
588            url.push('@');
589        } else if let Some(ref password) = self.password {
590            url.push(':');
591            url.push_str(&urlencoding::encode(password));
592            url.push('@');
593        }
594
595        url.push_str(host);
596        url.push(':');
597        url.push_str(&self.port.to_string());
598        url.push('/');
599        url.push_str(&self.db.to_string());
600
601        url
602    }
603
604    fn build_sentinel_url(&self) -> String {
605        let mut url = String::from("redis+sentinel://");
606
607        if let Some(ref sentinel_password) = self.sentinel_password {
608            url.push(':');
609            url.push_str(&urlencoding::encode(sentinel_password));
610            url.push('@');
611        }
612
613        let sentinel_hosts: Vec<String> = self
614            .sentinels
615            .iter()
616            .map(|s| format!("{}:{}", s.host, s.port))
617            .collect();
618        url.push_str(&sentinel_hosts.join(","));
619
620        url.push('/');
621        url.push_str(&self.name);
622        url.push('/');
623        url.push_str(&self.db.to_string());
624
625        let mut params = Vec::new();
626        if let Some(ref password) = self.password {
627            params.push(format!("password={}", urlencoding::encode(password)));
628        }
629        if let Some(ref username) = self.username {
630            params.push(format!("username={}", urlencoding::encode(username)));
631        }
632
633        if !params.is_empty() {
634            url.push('?');
635            url.push_str(&params.join("&"));
636        }
637
638        url
639    }
640
641    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
642    /// or legacy (`cluster_nodes`) field.
643    pub fn has_cluster_nodes(&self) -> bool {
644        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
645    }
646
647    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
648    /// Falls back to legacy `cluster_nodes` for backward compatibility.
649    pub fn cluster_node_urls(&self) -> Vec<String> {
650        if !self.cluster.nodes.is_empty() {
651            return self.build_cluster_urls(&self.cluster.nodes);
652        }
653        self.build_cluster_urls(&self.cluster_nodes)
654    }
655
656    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
657    /// shared cluster auth/TLS options.
658    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
659        self.build_cluster_urls(
660            &seeds
661                .iter()
662                .filter_map(|seed| ClusterNode::from_seed(seed))
663                .collect::<Vec<ClusterNode>>(),
664        )
665    }
666
667    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
668        let username = self
669            .cluster
670            .username
671            .as_deref()
672            .or(self.username.as_deref());
673        let password = self
674            .cluster
675            .password
676            .as_deref()
677            .or(self.password.as_deref());
678        let use_tls = self.cluster.use_tls;
679
680        nodes
681            .iter()
682            .map(|node| node.to_url_with_options(use_tls, username, password))
683            .collect()
684    }
685}
686
687impl RedisSentinel {
688    pub fn to_host_port(&self) -> String {
689        format!("{}:{}", self.host, self.port)
690    }
691}
692
693#[derive(Debug, Clone, Serialize, Deserialize)]
694#[serde(default)]
695pub struct ClusterNode {
696    pub host: String,
697    pub port: u16,
698}
699
700impl ClusterNode {
701    pub fn to_url(&self) -> String {
702        self.to_url_with_options(false, None, None)
703    }
704
705    pub fn to_url_with_options(
706        &self,
707        use_tls: bool,
708        username: Option<&str>,
709        password: Option<&str>,
710    ) -> String {
711        let host = self.host.trim();
712
713        if host.starts_with("redis://") || host.starts_with("rediss://") {
714            if let Ok(parsed) = Url::parse(host)
715                && let Some(host_str) = parsed.host_str()
716            {
717                let scheme = parsed.scheme();
718                let port = parsed.port_or_known_default().unwrap_or(self.port);
719                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
720                let parsed_password = parsed.password();
721                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
722                let (effective_username, effective_password) = if has_embedded_auth {
723                    (parsed_username, parsed_password)
724                } else {
725                    (username, password)
726                };
727
728                return build_redis_url(
729                    scheme,
730                    host_str,
731                    port,
732                    effective_username,
733                    effective_password,
734                );
735            }
736
737            // Fallback for malformed URLs
738            let has_port = if let Some(bracket_pos) = host.rfind(']') {
739                host[bracket_pos..].contains(':')
740            } else {
741                host.split(':').count() >= 3
742            };
743            let base = if has_port {
744                host.to_string()
745            } else {
746                format!("{}:{}", host, self.port)
747            };
748
749            if let Ok(parsed) = Url::parse(&base) {
750                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
751                let parsed_password = parsed.password();
752                if let Some(host_str) = parsed.host_str() {
753                    let port = parsed.port_or_known_default().unwrap_or(self.port);
754                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
755                    let (effective_username, effective_password) = if has_embedded_auth {
756                        (parsed_username, parsed_password)
757                    } else {
758                        (username, password)
759                    };
760                    return build_redis_url(
761                        parsed.scheme(),
762                        host_str,
763                        port,
764                        effective_username,
765                        effective_password,
766                    );
767                }
768            }
769            return base;
770        }
771
772        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
773        let scheme = if use_tls { "rediss" } else { "redis" };
774        build_redis_url(
775            scheme,
776            &normalized_host,
777            normalized_port,
778            username,
779            password,
780        )
781    }
782
783    pub fn from_seed(seed: &str) -> Option<Self> {
784        let trimmed = seed.trim();
785        if trimmed.is_empty() {
786            return None;
787        }
788
789        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
790            let port = Url::parse(trimmed)
791                .ok()
792                .and_then(|parsed| parsed.port_or_known_default())
793                .unwrap_or(6379);
794            return Some(Self {
795                host: trimmed.to_string(),
796                port,
797            });
798        }
799
800        let (host, port) = split_plain_host_and_port(trimmed, 6379);
801        Some(Self { host, port })
802    }
803}
804
805fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
806    let host = raw_host.trim();
807
808    // Handle bracketed IPv6: [::1]:6379
809    if host.starts_with('[') {
810        if let Some(end_bracket) = host.find(']') {
811            let host_part = host[1..end_bracket].to_string();
812            let remainder = &host[end_bracket + 1..];
813            if let Some(port_str) = remainder.strip_prefix(':')
814                && let Ok(port) = port_str.parse::<u16>()
815            {
816                return (host_part, port);
817            }
818            return (host_part, default_port);
819        }
820        return (host.to_string(), default_port);
821    }
822
823    // Handle hostname/IP with port: host:6379
824    if host.matches(':').count() == 1
825        && let Some((host_part, port_part)) = host.rsplit_once(':')
826        && let Ok(port) = port_part.parse::<u16>()
827    {
828        return (host_part.to_string(), port);
829    }
830
831    (host.to_string(), default_port)
832}
833
834fn build_redis_url(
835    scheme: &str,
836    host: &str,
837    port: u16,
838    username: Option<&str>,
839    password: Option<&str>,
840) -> String {
841    let mut url = format!("{scheme}://");
842
843    if let Some(user) = username {
844        url.push_str(&urlencoding::encode(user));
845        if let Some(pass) = password {
846            url.push(':');
847            url.push_str(&urlencoding::encode(pass));
848        }
849        url.push('@');
850    } else if let Some(pass) = password {
851        url.push(':');
852        url.push_str(&urlencoding::encode(pass));
853        url.push('@');
854    }
855
856    if host.contains(':') && !host.starts_with('[') {
857        url.push('[');
858        url.push_str(host);
859        url.push(']');
860    } else {
861        url.push_str(host);
862    }
863    url.push(':');
864    url.push_str(&port.to_string());
865    url
866}
867
868#[derive(Debug, Clone, Serialize, Deserialize)]
869#[serde(default)]
870pub struct DatabasePooling {
871    pub enabled: bool,
872    pub min: u32,
873    pub max: u32,
874}
875
876#[derive(Debug, Clone, Serialize, Deserialize)]
877#[serde(default)]
878pub struct EventLimits {
879    pub max_channels_at_once: u32,
880    pub max_name_length: u32,
881    pub max_payload_in_kb: u32,
882    pub max_batch_size: u32,
883}
884
885#[derive(Debug, Clone, Serialize, Deserialize)]
886#[serde(default)]
887pub struct HttpApiConfig {
888    pub request_limit_in_mb: u32,
889    pub accept_traffic: AcceptTraffic,
890}
891
892#[derive(Debug, Clone, Serialize, Deserialize)]
893#[serde(default)]
894pub struct AcceptTraffic {
895    pub memory_threshold: f64,
896}
897
898#[derive(Debug, Clone, Serialize, Deserialize)]
899#[serde(default)]
900pub struct InstanceConfig {
901    pub process_id: String,
902}
903
904#[derive(Debug, Clone, Serialize, Deserialize)]
905#[serde(default)]
906pub struct MetricsConfig {
907    pub enabled: bool,
908    pub driver: MetricsDriver,
909    pub host: String,
910    pub prometheus: PrometheusConfig,
911    pub port: u16,
912}
913
914#[derive(Debug, Clone, Serialize, Deserialize)]
915#[serde(default)]
916pub struct PrometheusConfig {
917    pub prefix: String,
918}
919
920#[derive(Debug, Clone, Serialize, Deserialize)]
921#[serde(default)]
922pub struct LoggingConfig {
923    pub colors_enabled: bool,
924    pub include_target: bool,
925}
926
927#[derive(Debug, Clone, Serialize, Deserialize)]
928#[serde(default)]
929pub struct PresenceConfig {
930    pub max_members_per_channel: u32,
931    pub max_member_size_in_kb: u32,
932}
933
934/// WebSocket connection buffer configuration
935/// Controls backpressure handling for slow consumers
936#[derive(Debug, Clone, Serialize, Deserialize)]
937#[serde(default)]
938pub struct WebSocketConfig {
939    pub max_messages: Option<usize>,
940    pub max_bytes: Option<usize>,
941    pub disconnect_on_buffer_full: bool,
942    pub max_message_size: usize,
943    pub max_frame_size: usize,
944    pub write_buffer_size: usize,
945    pub max_backpressure: usize,
946    pub auto_ping: bool,
947    pub ping_interval: u32,
948    pub idle_timeout: u32,
949    pub compression: String,
950}
951
952impl Default for WebSocketConfig {
953    fn default() -> Self {
954        Self {
955            max_messages: Some(1000),
956            max_bytes: None,
957            disconnect_on_buffer_full: true,
958            max_message_size: 64 * 1024 * 1024,
959            max_frame_size: 16 * 1024 * 1024,
960            write_buffer_size: 16 * 1024,
961            max_backpressure: 1024 * 1024,
962            auto_ping: true,
963            ping_interval: 30,
964            idle_timeout: 120,
965            compression: "disabled".to_string(),
966        }
967    }
968}
969
970impl WebSocketConfig {
971    /// Convert to WebSocketBufferConfig for runtime use
972    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
973        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
974
975        let limit = match (self.max_messages, self.max_bytes) {
976            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
977            (Some(messages), None) => BufferLimit::Messages(messages),
978            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
979            (None, None) => BufferLimit::Messages(1000),
980        };
981
982        WebSocketBufferConfig {
983            limit,
984            disconnect_on_full: self.disconnect_on_buffer_full,
985        }
986    }
987
988    /// Convert to native sockudo-ws runtime configuration.
989    pub fn to_sockudo_ws_config(
990        &self,
991        websocket_max_payload_kb: u32,
992        activity_timeout: u64,
993    ) -> sockudo_ws::Config {
994        use sockudo_ws::Compression;
995
996        let compression = match self.compression.to_lowercase().as_str() {
997            "dedicated" => Compression::Dedicated,
998            "shared" => Compression::Shared,
999            "window256b" => Compression::Window256B,
1000            "window1kb" => Compression::Window1KB,
1001            "window2kb" => Compression::Window2KB,
1002            "window4kb" => Compression::Window4KB,
1003            "window8kb" => Compression::Window8KB,
1004            "window16kb" => Compression::Window16KB,
1005            "window32kb" => Compression::Window32KB,
1006            _ => Compression::Disabled,
1007        };
1008
1009        sockudo_ws::Config::builder()
1010            .max_payload_length(
1011                self.max_bytes
1012                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1013            )
1014            .max_message_size(self.max_message_size)
1015            .max_frame_size(self.max_frame_size)
1016            .write_buffer_size(self.write_buffer_size)
1017            .max_backpressure(self.max_backpressure)
1018            .idle_timeout(self.idle_timeout)
1019            .auto_ping(self.auto_ping)
1020            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1021            .compression(compression)
1022            .build()
1023    }
1024}
1025
1026#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1027#[serde(default)]
1028pub struct QueueConfig {
1029    pub driver: QueueDriver,
1030    pub redis: RedisQueueConfig,
1031    pub redis_cluster: RedisClusterQueueConfig,
1032    pub sqs: SqsQueueConfig,
1033    pub sns: SnsQueueConfig,
1034}
1035
1036#[derive(Debug, Clone, Serialize, Deserialize)]
1037#[serde(default)]
1038pub struct RedisQueueConfig {
1039    pub concurrency: u32,
1040    pub prefix: Option<String>,
1041    pub url_override: Option<String>,
1042    pub cluster_mode: bool,
1043}
1044
1045#[derive(Clone, Debug, Serialize, Deserialize)]
1046#[serde(default)]
1047pub struct RateLimit {
1048    pub max_requests: u32,
1049    pub window_seconds: u64,
1050    pub identifier: Option<String>,
1051    pub trust_hops: Option<u32>,
1052}
1053
1054#[derive(Debug, Clone, Serialize, Deserialize)]
1055#[serde(default)]
1056pub struct RateLimiterConfig {
1057    pub enabled: bool,
1058    pub driver: CacheDriver,
1059    pub api_rate_limit: RateLimit,
1060    pub websocket_rate_limit: RateLimit,
1061    pub redis: RedisConfig,
1062}
1063
1064#[derive(Debug, Clone, Serialize, Deserialize)]
1065#[serde(default)]
1066pub struct SslConfig {
1067    pub enabled: bool,
1068    pub cert_path: String,
1069    pub key_path: String,
1070    pub passphrase: Option<String>,
1071    pub ca_path: Option<String>,
1072    pub redirect_http: bool,
1073    pub http_port: Option<u16>,
1074}
1075
1076#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1077#[serde(default)]
1078pub struct WebhooksConfig {
1079    pub batching: BatchingConfig,
1080}
1081
1082#[derive(Debug, Clone, Serialize, Deserialize)]
1083#[serde(default)]
1084pub struct BatchingConfig {
1085    pub enabled: bool,
1086    pub duration: u64,
1087    pub size: usize,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1091#[serde(default)]
1092pub struct ClusterHealthConfig {
1093    pub enabled: bool,
1094    pub heartbeat_interval_ms: u64,
1095    pub node_timeout_ms: u64,
1096    pub cleanup_interval_ms: u64,
1097}
1098
1099#[derive(Debug, Clone, Serialize, Deserialize)]
1100#[serde(default)]
1101pub struct UnixSocketConfig {
1102    pub enabled: bool,
1103    pub path: String,
1104    #[serde(deserialize_with = "deserialize_octal_permission")]
1105    pub permission_mode: u32,
1106}
1107
1108#[derive(Debug, Clone, Serialize, Deserialize)]
1109#[serde(default)]
1110pub struct DeltaCompressionOptionsConfig {
1111    pub enabled: bool,
1112    pub algorithm: String,
1113    pub full_message_interval: u32,
1114    pub min_message_size: usize,
1115    pub max_state_age_secs: u64,
1116    pub max_channel_states_per_socket: usize,
1117    pub max_conflation_states_per_channel: Option<usize>,
1118    pub conflation_key_path: Option<String>,
1119    pub cluster_coordination: bool,
1120    pub omit_delta_algorithm: bool,
1121}
1122
1123/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1124#[derive(Debug, Clone, Serialize, Deserialize)]
1125#[serde(default)]
1126pub struct CleanupConfig {
1127    pub queue_buffer_size: usize,
1128    pub batch_size: usize,
1129    pub batch_timeout_ms: u64,
1130    pub worker_threads: WorkerThreadsConfig,
1131    pub max_retry_attempts: u32,
1132    pub async_enabled: bool,
1133    pub fallback_to_sync: bool,
1134}
1135
1136/// Worker threads configuration for the cleanup system
1137#[derive(Debug, Clone)]
1138pub enum WorkerThreadsConfig {
1139    Auto,
1140    Fixed(usize),
1141}
1142
1143impl serde::Serialize for WorkerThreadsConfig {
1144    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1145    where
1146        S: serde::Serializer,
1147    {
1148        match self {
1149            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1150            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1151        }
1152    }
1153}
1154
1155impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1156    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1157    where
1158        D: serde::Deserializer<'de>,
1159    {
1160        use serde::de;
1161        struct WorkerThreadsVisitor;
1162        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1163            type Value = WorkerThreadsConfig;
1164
1165            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1166                formatter.write_str(r#""auto" or a positive integer"#)
1167            }
1168
1169            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1170                if value.eq_ignore_ascii_case("auto") {
1171                    Ok(WorkerThreadsConfig::Auto)
1172                } else if let Ok(n) = value.parse::<usize>() {
1173                    Ok(WorkerThreadsConfig::Fixed(n))
1174                } else {
1175                    Err(E::custom(format!(
1176                        "expected 'auto' or a number, got '{value}'"
1177                    )))
1178                }
1179            }
1180
1181            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1182                Ok(WorkerThreadsConfig::Fixed(value as usize))
1183            }
1184
1185            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1186                if value >= 0 {
1187                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1188                } else {
1189                    Err(E::custom("worker_threads must be non-negative"))
1190                }
1191            }
1192        }
1193        deserializer.deserialize_any(WorkerThreadsVisitor)
1194    }
1195}
1196
1197impl Default for CleanupConfig {
1198    fn default() -> Self {
1199        Self {
1200            queue_buffer_size: 1024,
1201            batch_size: 64,
1202            batch_timeout_ms: 100,
1203            worker_threads: WorkerThreadsConfig::Auto,
1204            max_retry_attempts: 3,
1205            async_enabled: true,
1206            fallback_to_sync: true,
1207        }
1208    }
1209}
1210
1211impl CleanupConfig {
1212    pub fn validate(&self) -> Result<(), String> {
1213        if self.queue_buffer_size == 0 {
1214            return Err("queue_buffer_size must be greater than 0".to_string());
1215        }
1216        if self.batch_size == 0 {
1217            return Err("batch_size must be greater than 0".to_string());
1218        }
1219        if self.batch_timeout_ms == 0 {
1220            return Err("batch_timeout_ms must be greater than 0".to_string());
1221        }
1222        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1223            && n == 0
1224        {
1225            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1226        }
1227        Ok(())
1228    }
1229}
1230
1231#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1232#[serde(default)]
1233pub struct TagFilteringConfig {
1234    #[serde(default)]
1235    pub enabled: bool,
1236    #[serde(default = "default_true")]
1237    pub enable_tags: bool,
1238}
1239
1240fn default_true() -> bool {
1241    true
1242}
1243
1244// --- Default Implementations ---
1245
1246impl Default for ServerOptions {
1247    fn default() -> Self {
1248        Self {
1249            adapter: AdapterConfig::default(),
1250            app_manager: AppManagerConfig::default(),
1251            cache: CacheConfig::default(),
1252            channel_limits: ChannelLimits::default(),
1253            cors: CorsConfig::default(),
1254            database: DatabaseConfig::default(),
1255            database_pooling: DatabasePooling::default(),
1256            debug: false,
1257            tag_filtering: TagFilteringConfig::default(),
1258            event_limits: EventLimits::default(),
1259            host: "0.0.0.0".to_string(),
1260            http_api: HttpApiConfig::default(),
1261            instance: InstanceConfig::default(),
1262            logging: None,
1263            metrics: MetricsConfig::default(),
1264            mode: "production".to_string(),
1265            port: 6001,
1266            path_prefix: "/".to_string(),
1267            presence: PresenceConfig::default(),
1268            queue: QueueConfig::default(),
1269            rate_limiter: RateLimiterConfig::default(),
1270            shutdown_grace_period: 10,
1271            ssl: SslConfig::default(),
1272            user_authentication_timeout: 3600,
1273            webhooks: WebhooksConfig::default(),
1274            websocket_max_payload_kb: 64,
1275            cleanup: CleanupConfig::default(),
1276            activity_timeout: 120,
1277            cluster_health: ClusterHealthConfig::default(),
1278            unix_socket: UnixSocketConfig::default(),
1279            delta_compression: DeltaCompressionOptionsConfig::default(),
1280            websocket: WebSocketConfig::default(),
1281        }
1282    }
1283}
1284
1285impl Default for SqsQueueConfig {
1286    fn default() -> Self {
1287        Self {
1288            region: "us-east-1".to_string(),
1289            queue_url_prefix: None,
1290            visibility_timeout: 30,
1291            endpoint_url: None,
1292            max_messages: 10,
1293            wait_time_seconds: 5,
1294            concurrency: 5,
1295            fifo: false,
1296            message_group_id: Some("default".to_string()),
1297        }
1298    }
1299}
1300
1301impl Default for SnsQueueConfig {
1302    fn default() -> Self {
1303        Self {
1304            region: "us-east-1".to_string(),
1305            topic_arn: String::new(),
1306            endpoint_url: None,
1307        }
1308    }
1309}
1310
1311impl Default for RedisAdapterConfig {
1312    fn default() -> Self {
1313        Self {
1314            requests_timeout: 5000,
1315            prefix: "sockudo_adapter:".to_string(),
1316            redis_pub_options: AHashMap::new(),
1317            redis_sub_options: AHashMap::new(),
1318            cluster_mode: false,
1319        }
1320    }
1321}
1322
1323impl Default for RedisClusterAdapterConfig {
1324    fn default() -> Self {
1325        Self {
1326            nodes: vec![],
1327            prefix: "sockudo_adapter:".to_string(),
1328            request_timeout_ms: 1000,
1329            use_connection_manager: true,
1330            use_sharded_pubsub: false,
1331        }
1332    }
1333}
1334
1335impl Default for NatsAdapterConfig {
1336    fn default() -> Self {
1337        Self {
1338            servers: vec!["nats://localhost:4222".to_string()],
1339            prefix: "sockudo_adapter:".to_string(),
1340            request_timeout_ms: 5000,
1341            username: None,
1342            password: None,
1343            token: None,
1344            connection_timeout_ms: 5000,
1345            nodes_number: None,
1346        }
1347    }
1348}
1349
1350impl Default for CacheSettings {
1351    fn default() -> Self {
1352        Self {
1353            enabled: true,
1354            ttl: 300,
1355        }
1356    }
1357}
1358
1359impl Default for MemoryCacheOptions {
1360    fn default() -> Self {
1361        Self {
1362            ttl: 300,
1363            cleanup_interval: 60,
1364            max_capacity: 10000,
1365        }
1366    }
1367}
1368
1369impl Default for CacheConfig {
1370    fn default() -> Self {
1371        Self {
1372            driver: CacheDriver::default(),
1373            redis: RedisConfig {
1374                prefix: Some("sockudo_cache:".to_string()),
1375                url_override: None,
1376                cluster_mode: false,
1377            },
1378            memory: MemoryCacheOptions::default(),
1379        }
1380    }
1381}
1382
1383impl Default for ChannelLimits {
1384    fn default() -> Self {
1385        Self {
1386            max_name_length: 200,
1387            cache_ttl: 3600,
1388        }
1389    }
1390}
1391impl Default for CorsConfig {
1392    fn default() -> Self {
1393        Self {
1394            credentials: true,
1395            origin: vec!["*".to_string()],
1396            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1397            allowed_headers: vec![
1398                "Authorization".to_string(),
1399                "Content-Type".to_string(),
1400                "X-Requested-With".to_string(),
1401                "Accept".to_string(),
1402            ],
1403        }
1404    }
1405}
1406
1407impl Default for DatabaseConnection {
1408    fn default() -> Self {
1409        Self {
1410            host: "localhost".to_string(),
1411            port: 3306,
1412            username: "root".to_string(),
1413            password: "".to_string(),
1414            database: "sockudo".to_string(),
1415            table_name: "applications".to_string(),
1416            connection_pool_size: 10,
1417            pool_min: None,
1418            pool_max: None,
1419            cache_ttl: 300,
1420            cache_cleanup_interval: 60,
1421            cache_max_capacity: 100,
1422        }
1423    }
1424}
1425
1426impl Default for RedisConnection {
1427    fn default() -> Self {
1428        Self {
1429            host: "127.0.0.1".to_string(),
1430            port: 6379,
1431            db: 0,
1432            username: None,
1433            password: None,
1434            key_prefix: "sockudo:".to_string(),
1435            sentinels: Vec::new(),
1436            sentinel_password: None,
1437            name: "mymaster".to_string(),
1438            cluster: RedisClusterConnection::default(),
1439            cluster_nodes: Vec::new(),
1440        }
1441    }
1442}
1443
1444impl Default for RedisSentinel {
1445    fn default() -> Self {
1446        Self {
1447            host: "localhost".to_string(),
1448            port: 26379,
1449        }
1450    }
1451}
1452
1453impl Default for ClusterNode {
1454    fn default() -> Self {
1455        Self {
1456            host: "127.0.0.1".to_string(),
1457            port: 7000,
1458        }
1459    }
1460}
1461
1462impl Default for DatabasePooling {
1463    fn default() -> Self {
1464        Self {
1465            enabled: true,
1466            min: 2,
1467            max: 10,
1468        }
1469    }
1470}
1471
1472impl Default for EventLimits {
1473    fn default() -> Self {
1474        Self {
1475            max_channels_at_once: 100,
1476            max_name_length: 200,
1477            max_payload_in_kb: 100,
1478            max_batch_size: 10,
1479        }
1480    }
1481}
1482
1483impl Default for HttpApiConfig {
1484    fn default() -> Self {
1485        Self {
1486            request_limit_in_mb: 10,
1487            accept_traffic: AcceptTraffic::default(),
1488        }
1489    }
1490}
1491
1492impl Default for AcceptTraffic {
1493    fn default() -> Self {
1494        Self {
1495            memory_threshold: 0.90,
1496        }
1497    }
1498}
1499
1500impl Default for InstanceConfig {
1501    fn default() -> Self {
1502        Self {
1503            process_id: uuid::Uuid::new_v4().to_string(),
1504        }
1505    }
1506}
1507
1508impl Default for LoggingConfig {
1509    fn default() -> Self {
1510        Self {
1511            colors_enabled: true,
1512            include_target: true,
1513        }
1514    }
1515}
1516
1517impl Default for MetricsConfig {
1518    fn default() -> Self {
1519        Self {
1520            enabled: true,
1521            driver: MetricsDriver::default(),
1522            host: "0.0.0.0".to_string(),
1523            prometheus: PrometheusConfig::default(),
1524            port: 9601,
1525        }
1526    }
1527}
1528
1529impl Default for PrometheusConfig {
1530    fn default() -> Self {
1531        Self {
1532            prefix: "sockudo_".to_string(),
1533        }
1534    }
1535}
1536
1537impl Default for PresenceConfig {
1538    fn default() -> Self {
1539        Self {
1540            max_members_per_channel: 100,
1541            max_member_size_in_kb: 2,
1542        }
1543    }
1544}
1545
1546impl Default for RedisQueueConfig {
1547    fn default() -> Self {
1548        Self {
1549            concurrency: 5,
1550            prefix: Some("sockudo_queue:".to_string()),
1551            url_override: None,
1552            cluster_mode: false,
1553        }
1554    }
1555}
1556
1557impl Default for RateLimit {
1558    fn default() -> Self {
1559        Self {
1560            max_requests: 60,
1561            window_seconds: 60,
1562            identifier: Some("default".to_string()),
1563            trust_hops: Some(0),
1564        }
1565    }
1566}
1567
1568impl Default for RateLimiterConfig {
1569    fn default() -> Self {
1570        Self {
1571            enabled: true,
1572            driver: CacheDriver::Memory,
1573            api_rate_limit: RateLimit {
1574                max_requests: 100,
1575                window_seconds: 60,
1576                identifier: Some("api".to_string()),
1577                trust_hops: Some(0),
1578            },
1579            websocket_rate_limit: RateLimit {
1580                max_requests: 20,
1581                window_seconds: 60,
1582                identifier: Some("websocket_connect".to_string()),
1583                trust_hops: Some(0),
1584            },
1585            redis: RedisConfig {
1586                prefix: Some("sockudo_rl:".to_string()),
1587                url_override: None,
1588                cluster_mode: false,
1589            },
1590        }
1591    }
1592}
1593
1594impl Default for SslConfig {
1595    fn default() -> Self {
1596        Self {
1597            enabled: false,
1598            cert_path: "".to_string(),
1599            key_path: "".to_string(),
1600            passphrase: None,
1601            ca_path: None,
1602            redirect_http: false,
1603            http_port: Some(80),
1604        }
1605    }
1606}
1607
1608impl Default for BatchingConfig {
1609    fn default() -> Self {
1610        Self {
1611            enabled: true,
1612            duration: 50,
1613            size: 100,
1614        }
1615    }
1616}
1617
1618impl Default for ClusterHealthConfig {
1619    fn default() -> Self {
1620        Self {
1621            enabled: true,
1622            heartbeat_interval_ms: 10000,
1623            node_timeout_ms: 30000,
1624            cleanup_interval_ms: 10000,
1625        }
1626    }
1627}
1628
1629impl Default for UnixSocketConfig {
1630    fn default() -> Self {
1631        Self {
1632            enabled: false,
1633            path: "/var/run/sockudo/sockudo.sock".to_string(),
1634            permission_mode: 0o660,
1635        }
1636    }
1637}
1638
1639impl Default for DeltaCompressionOptionsConfig {
1640    fn default() -> Self {
1641        Self {
1642            enabled: true,
1643            algorithm: "fossil".to_string(),
1644            full_message_interval: 10,
1645            min_message_size: 100,
1646            max_state_age_secs: 300,
1647            max_channel_states_per_socket: 100,
1648            max_conflation_states_per_channel: Some(100),
1649            conflation_key_path: None,
1650            cluster_coordination: false,
1651            omit_delta_algorithm: false,
1652        }
1653    }
1654}
1655
1656impl ClusterHealthConfig {
1657    pub fn validate(&self) -> Result<(), String> {
1658        if self.heartbeat_interval_ms == 0 {
1659            return Err("heartbeat_interval_ms must be greater than 0".to_string());
1660        }
1661        if self.node_timeout_ms == 0 {
1662            return Err("node_timeout_ms must be greater than 0".to_string());
1663        }
1664        if self.cleanup_interval_ms == 0 {
1665            return Err("cleanup_interval_ms must be greater than 0".to_string());
1666        }
1667
1668        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1669            return Err(format!(
1670                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1671                self.heartbeat_interval_ms,
1672                self.node_timeout_ms,
1673                self.node_timeout_ms / 3
1674            ));
1675        }
1676
1677        if self.cleanup_interval_ms > self.node_timeout_ms {
1678            return Err(format!(
1679                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1680                self.cleanup_interval_ms, self.node_timeout_ms
1681            ));
1682        }
1683
1684        Ok(())
1685    }
1686}
1687
1688impl ServerOptions {
1689    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1690        let content = tokio::fs::read_to_string(path).await?;
1691        let options: Self = sonic_rs::from_str(&content)?;
1692        Ok(options)
1693    }
1694
1695    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1696        // --- General Settings ---
1697        if let Ok(mode) = std::env::var("ENVIRONMENT") {
1698            self.mode = mode;
1699        }
1700        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1701        if parse_bool_env("DEBUG", false) {
1702            self.debug = true;
1703            info!("DEBUG environment variable forces debug mode ON");
1704        }
1705
1706        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1707
1708        if let Ok(host) = std::env::var("HOST") {
1709            self.host = host;
1710        }
1711        self.port = parse_env::<u16>("PORT", self.port);
1712        self.shutdown_grace_period =
1713            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1714        self.user_authentication_timeout = parse_env::<u64>(
1715            "USER_AUTHENTICATION_TIMEOUT",
1716            self.user_authentication_timeout,
1717        );
1718        self.websocket_max_payload_kb =
1719            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1720        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1721            self.instance.process_id = id;
1722        }
1723
1724        // --- Driver Configuration ---
1725        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1726            self.adapter.driver =
1727                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1728        }
1729        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1730            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1731            self.adapter.buffer_multiplier_per_cpu,
1732        );
1733        self.adapter.enable_socket_counting = parse_env::<bool>(
1734            "ADAPTER_ENABLE_SOCKET_COUNTING",
1735            self.adapter.enable_socket_counting,
1736        );
1737        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1738            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1739        }
1740        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1741            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1742        }
1743        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1744            self.app_manager.driver =
1745                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1746        }
1747        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1748            self.rate_limiter.driver = parse_driver_enum(
1749                driver_str,
1750                self.rate_limiter.driver.clone(),
1751                "RateLimiter Backend",
1752            );
1753        }
1754
1755        // --- Database: Redis ---
1756        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1757            self.database.redis.host = host;
1758        }
1759        self.database.redis.port =
1760            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1761        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1762            self.database.redis.username = if username.is_empty() {
1763                None
1764            } else {
1765                Some(username)
1766            };
1767        }
1768        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1769            self.database.redis.password = Some(password);
1770        }
1771        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1772        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1773            self.database.redis.key_prefix = prefix;
1774        }
1775        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1776            self.database.redis.cluster.username = if cluster_username.is_empty() {
1777                None
1778            } else {
1779                Some(cluster_username)
1780            };
1781        }
1782        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1783            self.database.redis.cluster.password = Some(cluster_password);
1784        }
1785        self.database.redis.cluster.use_tls = parse_bool_env(
1786            "DATABASE_REDIS_CLUSTER_USE_TLS",
1787            self.database.redis.cluster.use_tls,
1788        );
1789
1790        // --- Database: MySQL ---
1791        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1792            self.database.mysql.host = host;
1793        }
1794        self.database.mysql.port =
1795            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1796        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1797            self.database.mysql.username = user;
1798        }
1799        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1800            self.database.mysql.password = pass;
1801        }
1802        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1803            self.database.mysql.database = db;
1804        }
1805        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1806            self.database.mysql.table_name = table;
1807        }
1808        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1809
1810        // --- Database: PostgreSQL ---
1811        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1812            self.database.postgres.host = host;
1813        }
1814        self.database.postgres.port =
1815            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1816        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1817            self.database.postgres.username = user;
1818        }
1819        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1820            self.database.postgres.password = pass;
1821        }
1822        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1823            self.database.postgres.database = db;
1824        }
1825        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1826
1827        // --- Database: DynamoDB ---
1828        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1829            self.database.dynamodb.region = region;
1830        }
1831        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1832            self.database.dynamodb.table_name = table;
1833        }
1834        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1835            self.database.dynamodb.endpoint_url = Some(endpoint);
1836        }
1837        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1838            self.database.dynamodb.aws_access_key_id = Some(key_id);
1839        }
1840        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1841            self.database.dynamodb.aws_secret_access_key = Some(secret);
1842        }
1843
1844        // --- Redis Cluster ---
1845        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1846            let node_list: Vec<String> = nodes
1847                .split(',')
1848                .map(|s| s.trim())
1849                .filter(|s| !s.is_empty())
1850                .map(ToString::to_string)
1851                .collect();
1852
1853            options.adapter.cluster.nodes = node_list.clone();
1854            options.queue.redis_cluster.nodes = node_list.clone();
1855
1856            let parsed_nodes: Vec<ClusterNode> = node_list
1857                .iter()
1858                .filter_map(|seed| ClusterNode::from_seed(seed))
1859                .collect();
1860            options.database.redis.cluster.nodes = parsed_nodes.clone();
1861            options.database.redis.cluster_nodes = parsed_nodes;
1862        };
1863
1864        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1865            apply_redis_cluster_nodes(self, &nodes);
1866        }
1867        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1868            apply_redis_cluster_nodes(self, &nodes);
1869        }
1870        self.queue.redis_cluster.concurrency = parse_env::<u32>(
1871            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1872            self.queue.redis_cluster.concurrency,
1873        );
1874        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1875            self.queue.redis_cluster.prefix = Some(prefix);
1876        }
1877
1878        // --- SSL Configuration ---
1879        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1880        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1881            self.ssl.cert_path = val;
1882        }
1883        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1884            self.ssl.key_path = val;
1885        }
1886        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1887        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1888            self.ssl.http_port = Some(port);
1889        }
1890
1891        // --- Unix Socket Configuration ---
1892        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1893        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1894            self.unix_socket.path = path;
1895        }
1896        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1897            if mode_str.chars().all(|c| c.is_digit(8)) {
1898                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1899                    if mode <= 0o777 {
1900                        self.unix_socket.permission_mode = mode;
1901                    } else {
1902                        warn!(
1903                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1904                            mode_str, self.unix_socket.permission_mode
1905                        );
1906                    }
1907                } else {
1908                    warn!(
1909                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1910                        mode_str, self.unix_socket.permission_mode
1911                    );
1912                }
1913            } else {
1914                warn!(
1915                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1916                    mode_str, self.unix_socket.permission_mode
1917                );
1918            }
1919        }
1920
1921        // --- Metrics ---
1922        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1923            self.metrics.driver =
1924                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1925        }
1926        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1927        if let Ok(val) = std::env::var("METRICS_HOST") {
1928            self.metrics.host = val;
1929        }
1930        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1931        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1932            self.metrics.prometheus.prefix = val;
1933        }
1934
1935        // --- Rate Limiter ---
1936        self.rate_limiter.enabled =
1937            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1938        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1939            "RATE_LIMITER_API_MAX_REQUESTS",
1940            self.rate_limiter.api_rate_limit.max_requests,
1941        );
1942        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1943            "RATE_LIMITER_API_WINDOW_SECONDS",
1944            self.rate_limiter.api_rate_limit.window_seconds,
1945        );
1946        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1947            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1948        }
1949        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1950            "RATE_LIMITER_WS_MAX_REQUESTS",
1951            self.rate_limiter.websocket_rate_limit.max_requests,
1952        );
1953        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1954            "RATE_LIMITER_WS_WINDOW_SECONDS",
1955            self.rate_limiter.websocket_rate_limit.window_seconds,
1956        );
1957        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1958            self.rate_limiter.redis.prefix = Some(prefix);
1959        }
1960
1961        // --- Queue: Redis ---
1962        self.queue.redis.concurrency =
1963            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1964        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1965            self.queue.redis.prefix = Some(prefix);
1966        }
1967
1968        // --- Queue: SQS ---
1969        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1970            self.queue.sqs.region = region;
1971        }
1972        self.queue.sqs.visibility_timeout = parse_env::<i32>(
1973            "QUEUE_SQS_VISIBILITY_TIMEOUT",
1974            self.queue.sqs.visibility_timeout,
1975        );
1976        self.queue.sqs.max_messages =
1977            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
1978        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
1979            "QUEUE_SQS_WAIT_TIME_SECONDS",
1980            self.queue.sqs.wait_time_seconds,
1981        );
1982        self.queue.sqs.concurrency =
1983            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
1984        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
1985        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
1986            self.queue.sqs.endpoint_url = Some(endpoint);
1987        }
1988
1989        // --- Queue: SNS ---
1990        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
1991            self.queue.sns.region = region;
1992        }
1993        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
1994            self.queue.sns.topic_arn = topic_arn;
1995        }
1996        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
1997            self.queue.sns.endpoint_url = Some(endpoint);
1998        }
1999
2000        // --- Webhooks ---
2001        self.webhooks.batching.enabled =
2002            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2003        self.webhooks.batching.duration =
2004            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2005        self.webhooks.batching.size =
2006            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2007
2008        // --- NATS Adapter ---
2009        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2010            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2011        }
2012        if let Ok(user) = std::env::var("NATS_USERNAME") {
2013            self.adapter.nats.username = Some(user);
2014        }
2015        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2016            self.adapter.nats.password = Some(pass);
2017        }
2018        if let Ok(token) = std::env::var("NATS_TOKEN") {
2019            self.adapter.nats.token = Some(token);
2020        }
2021        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2022            "NATS_CONNECTION_TIMEOUT_MS",
2023            self.adapter.nats.connection_timeout_ms,
2024        );
2025        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2026            "NATS_REQUEST_TIMEOUT_MS",
2027            self.adapter.nats.request_timeout_ms,
2028        );
2029
2030        // --- CORS ---
2031        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2032            self.cors.origin = origins.split(',').map(|s| s.trim().to_string()).collect();
2033        }
2034        if let Ok(methods) = std::env::var("CORS_METHODS") {
2035            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2036        }
2037        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2038            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2039        }
2040        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2041
2042        // --- Performance Tuning ---
2043        self.database_pooling.enabled =
2044            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2045        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2046            self.database_pooling.min = min;
2047        }
2048        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2049            self.database_pooling.max = max;
2050        }
2051
2052        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2053            self.database.mysql.connection_pool_size = pool_size;
2054            self.database.postgres.connection_pool_size = pool_size;
2055        }
2056        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2057            self.app_manager.cache.ttl = cache_ttl;
2058            self.channel_limits.cache_ttl = cache_ttl;
2059            self.database.mysql.cache_ttl = cache_ttl;
2060            self.database.postgres.cache_ttl = cache_ttl;
2061            self.cache.memory.ttl = cache_ttl;
2062        }
2063        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2064            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2065            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2066            self.cache.memory.cleanup_interval = cleanup_interval;
2067        }
2068        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2069            self.database.mysql.cache_max_capacity = max_capacity;
2070            self.database.postgres.cache_max_capacity = max_capacity;
2071            self.cache.memory.max_capacity = max_capacity;
2072        }
2073
2074        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2075        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2076        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2077        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2078
2079        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2080            (default_app_id, default_app_key, default_app_secret)
2081            && default_app_enabled
2082        {
2083            let default_app = App {
2084                id: app_id,
2085                key: app_key,
2086                secret: app_secret,
2087                enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2088                enabled: default_app_enabled,
2089                max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2090                max_client_events_per_second: parse_env::<u32>(
2091                    "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2092                    100,
2093                ),
2094                max_read_requests_per_second: Some(parse_env::<u32>(
2095                    "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2096                    100,
2097                )),
2098                max_presence_members_per_channel: Some(parse_env::<u32>(
2099                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2100                    100,
2101                )),
2102                max_presence_member_size_in_kb: Some(parse_env::<u32>(
2103                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2104                    100,
2105                )),
2106                max_channel_name_length: Some(parse_env::<u32>(
2107                    "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2108                    100,
2109                )),
2110                max_event_channels_at_once: Some(parse_env::<u32>(
2111                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2112                    100,
2113                )),
2114                max_event_name_length: Some(parse_env::<u32>(
2115                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2116                    100,
2117                )),
2118                max_event_payload_in_kb: Some(parse_env::<u32>(
2119                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2120                    100,
2121                )),
2122                max_event_batch_size: Some(parse_env::<u32>(
2123                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2124                    100,
2125                )),
2126                enable_user_authentication: Some(parse_bool_env(
2127                    "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2128                    false,
2129                )),
2130                webhooks: None,
2131                max_backend_events_per_second: Some(parse_env::<u32>(
2132                    "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2133                    100,
2134                )),
2135                enable_watchlist_events: Some(parse_bool_env(
2136                    "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2137                    false,
2138                )),
2139                allowed_origins: {
2140                    if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2141                        if !origins_str.is_empty() {
2142                            Some(
2143                                origins_str
2144                                    .split(',')
2145                                    .map(|s| s.trim().to_string())
2146                                    .collect(),
2147                            )
2148                        } else {
2149                            None
2150                        }
2151                    } else {
2152                        None
2153                    }
2154                },
2155                channel_delta_compression: None,
2156            };
2157
2158            self.app_manager.array.apps.push(default_app);
2159            info!("Successfully registered default app from env");
2160        }
2161
2162        // Special handling for REDIS_URL
2163        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2164            info!("Applying REDIS_URL environment variable override");
2165
2166            let redis_url_json = sonic_rs::json!(redis_url_env);
2167
2168            self.adapter
2169                .redis
2170                .redis_pub_options
2171                .insert("url".to_string(), redis_url_json.clone());
2172            self.adapter
2173                .redis
2174                .redis_sub_options
2175                .insert("url".to_string(), redis_url_json);
2176
2177            self.cache.redis.url_override = Some(redis_url_env.clone());
2178            self.queue.redis.url_override = Some(redis_url_env.clone());
2179            self.rate_limiter.redis.url_override = Some(redis_url_env);
2180        }
2181
2182        // --- Logging Configuration ---
2183        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2184        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2185        if has_colors_env || has_target_env {
2186            let logging_config = self.logging.get_or_insert_with(Default::default);
2187            if has_colors_env {
2188                logging_config.colors_enabled =
2189                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2190            }
2191            if has_target_env {
2192                logging_config.include_target =
2193                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2194            }
2195        }
2196
2197        // --- Cleanup Configuration ---
2198        self.cleanup.async_enabled =
2199            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2200        self.cleanup.fallback_to_sync =
2201            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2202        self.cleanup.queue_buffer_size =
2203            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2204        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2205        self.cleanup.batch_timeout_ms =
2206            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2207        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2208            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2209                WorkerThreadsConfig::Auto
2210            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2211                WorkerThreadsConfig::Fixed(n)
2212            } else {
2213                warn!(
2214                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2215                    worker_threads_str
2216                );
2217                self.cleanup.worker_threads.clone()
2218            };
2219        }
2220        self.cleanup.max_retry_attempts = parse_env::<u32>(
2221            "CLEANUP_MAX_RETRY_ATTEMPTS",
2222            self.cleanup.max_retry_attempts,
2223        );
2224
2225        // Cluster health configuration
2226        self.cluster_health.enabled =
2227            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2228        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2229            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2230            self.cluster_health.heartbeat_interval_ms,
2231        );
2232        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2233            "CLUSTER_HEALTH_NODE_TIMEOUT",
2234            self.cluster_health.node_timeout_ms,
2235        );
2236        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2237            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2238            self.cluster_health.cleanup_interval_ms,
2239        );
2240
2241        // Tag filtering configuration
2242        self.tag_filtering.enabled =
2243            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2244
2245        // WebSocket buffer configuration
2246        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2247            if val.to_lowercase() == "none" || val == "0" {
2248                self.websocket.max_messages = None;
2249            } else if let Ok(n) = val.parse::<usize>() {
2250                self.websocket.max_messages = Some(n);
2251            }
2252        }
2253        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2254            if val.to_lowercase() == "none" || val == "0" {
2255                self.websocket.max_bytes = None;
2256            } else if let Ok(n) = val.parse::<usize>() {
2257                self.websocket.max_bytes = Some(n);
2258            }
2259        }
2260        self.websocket.disconnect_on_buffer_full = parse_bool_env(
2261            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2262            self.websocket.disconnect_on_buffer_full,
2263        );
2264        self.websocket.max_message_size = parse_env::<usize>(
2265            "WEBSOCKET_MAX_MESSAGE_SIZE",
2266            self.websocket.max_message_size,
2267        );
2268        self.websocket.max_frame_size =
2269            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2270        self.websocket.write_buffer_size = parse_env::<usize>(
2271            "WEBSOCKET_WRITE_BUFFER_SIZE",
2272            self.websocket.write_buffer_size,
2273        );
2274        self.websocket.max_backpressure = parse_env::<usize>(
2275            "WEBSOCKET_MAX_BACKPRESSURE",
2276            self.websocket.max_backpressure,
2277        );
2278        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2279        self.websocket.ping_interval =
2280            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2281        self.websocket.idle_timeout =
2282            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2283        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2284            self.websocket.compression = mode;
2285        }
2286
2287        Ok(())
2288    }
2289
2290    pub fn validate(&self) -> Result<(), String> {
2291        if self.unix_socket.enabled {
2292            if self.unix_socket.path.is_empty() {
2293                return Err(
2294                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2295                );
2296            }
2297
2298            self.validate_unix_socket_security()?;
2299
2300            if self.ssl.enabled {
2301                tracing::warn!(
2302                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2303                );
2304            }
2305
2306            if self.unix_socket.permission_mode > 0o777 {
2307                return Err(format!(
2308                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2309                    self.unix_socket.permission_mode
2310                ));
2311            }
2312        }
2313
2314        if let Err(e) = self.cleanup.validate() {
2315            return Err(format!("Invalid cleanup configuration: {}", e));
2316        }
2317
2318        Ok(())
2319    }
2320
2321    fn validate_unix_socket_security(&self) -> Result<(), String> {
2322        let path = &self.unix_socket.path;
2323
2324        if path.contains("../") || path.contains("..\\") {
2325            return Err(
2326                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2327            );
2328        }
2329
2330        if self.unix_socket.permission_mode & 0o002 != 0 {
2331            warn!(
2332                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2333                self.unix_socket.permission_mode
2334            );
2335        }
2336
2337        if self.unix_socket.permission_mode & 0o007 > 0o005 {
2338            warn!(
2339                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2340                self.unix_socket.permission_mode
2341            );
2342        }
2343
2344        if self.mode == "production" && path.starts_with("/tmp/") {
2345            warn!(
2346                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2347                path
2348            );
2349        }
2350
2351        if !path.starts_with('/') {
2352            return Err(
2353                "Unix socket path must be absolute (start with /) for security and reliability."
2354                    .to_string(),
2355            );
2356        }
2357
2358        Ok(())
2359    }
2360}
2361
2362#[cfg(test)]
2363mod redis_connection_tests {
2364    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
2365
2366    #[test]
2367    fn test_standard_url_basic() {
2368        let conn = RedisConnection {
2369            host: "127.0.0.1".to_string(),
2370            port: 6379,
2371            db: 0,
2372            username: None,
2373            password: None,
2374            key_prefix: "sockudo:".to_string(),
2375            sentinels: Vec::new(),
2376            sentinel_password: None,
2377            name: "mymaster".to_string(),
2378            cluster: RedisClusterConnection::default(),
2379            cluster_nodes: Vec::new(),
2380        };
2381        assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
2382    }
2383
2384    #[test]
2385    fn test_standard_url_with_password() {
2386        let conn = RedisConnection {
2387            host: "127.0.0.1".to_string(),
2388            port: 6379,
2389            db: 2,
2390            username: None,
2391            password: Some("secret".to_string()),
2392            key_prefix: "sockudo:".to_string(),
2393            sentinels: Vec::new(),
2394            sentinel_password: None,
2395            name: "mymaster".to_string(),
2396            cluster: RedisClusterConnection::default(),
2397            cluster_nodes: Vec::new(),
2398        };
2399        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
2400    }
2401
2402    #[test]
2403    fn test_standard_url_with_username_and_password() {
2404        let conn = RedisConnection {
2405            host: "redis.example.com".to_string(),
2406            port: 6380,
2407            db: 1,
2408            username: Some("admin".to_string()),
2409            password: Some("pass123".to_string()),
2410            key_prefix: "sockudo:".to_string(),
2411            sentinels: Vec::new(),
2412            sentinel_password: None,
2413            name: "mymaster".to_string(),
2414            cluster: RedisClusterConnection::default(),
2415            cluster_nodes: Vec::new(),
2416        };
2417        assert_eq!(
2418            conn.to_url(),
2419            "redis://admin:pass123@redis.example.com:6380/1"
2420        );
2421    }
2422
2423    #[test]
2424    fn test_standard_url_with_special_chars_in_password() {
2425        let conn = RedisConnection {
2426            host: "127.0.0.1".to_string(),
2427            port: 6379,
2428            db: 0,
2429            username: None,
2430            password: Some("pass@word#123".to_string()),
2431            key_prefix: "sockudo:".to_string(),
2432            sentinels: Vec::new(),
2433            sentinel_password: None,
2434            name: "mymaster".to_string(),
2435            cluster: RedisClusterConnection::default(),
2436            cluster_nodes: Vec::new(),
2437        };
2438        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
2439    }
2440
2441    #[test]
2442    fn test_is_sentinel_configured_false() {
2443        let conn = RedisConnection::default();
2444        assert!(!conn.is_sentinel_configured());
2445    }
2446
2447    #[test]
2448    fn test_is_sentinel_configured_true() {
2449        let conn = RedisConnection {
2450            sentinels: vec![RedisSentinel {
2451                host: "sentinel1".to_string(),
2452                port: 26379,
2453            }],
2454            ..Default::default()
2455        };
2456        assert!(conn.is_sentinel_configured());
2457    }
2458
2459    #[test]
2460    fn test_sentinel_url_basic() {
2461        let conn = RedisConnection {
2462            host: "127.0.0.1".to_string(),
2463            port: 6379,
2464            db: 0,
2465            username: None,
2466            password: None,
2467            key_prefix: "sockudo:".to_string(),
2468            sentinels: vec![
2469                RedisSentinel {
2470                    host: "sentinel1".to_string(),
2471                    port: 26379,
2472                },
2473                RedisSentinel {
2474                    host: "sentinel2".to_string(),
2475                    port: 26379,
2476                },
2477            ],
2478            sentinel_password: None,
2479            name: "mymaster".to_string(),
2480            cluster: RedisClusterConnection::default(),
2481            cluster_nodes: Vec::new(),
2482        };
2483        assert_eq!(
2484            conn.to_url(),
2485            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
2486        );
2487    }
2488
2489    #[test]
2490    fn test_sentinel_url_with_sentinel_password() {
2491        let conn = RedisConnection {
2492            host: "127.0.0.1".to_string(),
2493            port: 6379,
2494            db: 0,
2495            username: None,
2496            password: None,
2497            key_prefix: "sockudo:".to_string(),
2498            sentinels: vec![RedisSentinel {
2499                host: "sentinel1".to_string(),
2500                port: 26379,
2501            }],
2502            sentinel_password: Some("sentinelpass".to_string()),
2503            name: "mymaster".to_string(),
2504            cluster: RedisClusterConnection::default(),
2505            cluster_nodes: Vec::new(),
2506        };
2507        assert_eq!(
2508            conn.to_url(),
2509            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
2510        );
2511    }
2512
2513    #[test]
2514    fn test_sentinel_url_with_master_password() {
2515        let conn = RedisConnection {
2516            host: "127.0.0.1".to_string(),
2517            port: 6379,
2518            db: 1,
2519            username: None,
2520            password: Some("masterpass".to_string()),
2521            key_prefix: "sockudo:".to_string(),
2522            sentinels: vec![RedisSentinel {
2523                host: "sentinel1".to_string(),
2524                port: 26379,
2525            }],
2526            sentinel_password: None,
2527            name: "mymaster".to_string(),
2528            cluster: RedisClusterConnection::default(),
2529            cluster_nodes: Vec::new(),
2530        };
2531        assert_eq!(
2532            conn.to_url(),
2533            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
2534        );
2535    }
2536
2537    #[test]
2538    fn test_sentinel_url_with_all_auth() {
2539        let conn = RedisConnection {
2540            host: "127.0.0.1".to_string(),
2541            port: 6379,
2542            db: 2,
2543            username: Some("redisuser".to_string()),
2544            password: Some("redispass".to_string()),
2545            key_prefix: "sockudo:".to_string(),
2546            sentinels: vec![
2547                RedisSentinel {
2548                    host: "sentinel1".to_string(),
2549                    port: 26379,
2550                },
2551                RedisSentinel {
2552                    host: "sentinel2".to_string(),
2553                    port: 26380,
2554                },
2555            ],
2556            sentinel_password: Some("sentinelauth".to_string()),
2557            name: "production-master".to_string(),
2558            cluster: RedisClusterConnection::default(),
2559            cluster_nodes: Vec::new(),
2560        };
2561        assert_eq!(
2562            conn.to_url(),
2563            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
2564        );
2565    }
2566
2567    #[test]
2568    fn test_sentinel_to_host_port() {
2569        let sentinel = RedisSentinel {
2570            host: "sentinel.example.com".to_string(),
2571            port: 26379,
2572        };
2573        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
2574    }
2575
2576    #[test]
2577    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
2578        let conn = RedisConnection {
2579            cluster: RedisClusterConnection {
2580                nodes: vec![
2581                    ClusterNode {
2582                        host: "node1.secure-cluster.com".to_string(),
2583                        port: 7000,
2584                    },
2585                    ClusterNode {
2586                        host: "redis://node2.secure-cluster.com:7001".to_string(),
2587                        port: 7001,
2588                    },
2589                    ClusterNode {
2590                        host: "rediss://node3.secure-cluster.com".to_string(),
2591                        port: 7002,
2592                    },
2593                ],
2594                username: None,
2595                password: Some("cluster-secret".to_string()),
2596                use_tls: true,
2597            },
2598            ..Default::default()
2599        };
2600
2601        assert_eq!(
2602            conn.cluster_node_urls(),
2603            vec![
2604                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
2605                "redis://:cluster-secret@node2.secure-cluster.com:7001",
2606                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
2607            ]
2608        );
2609    }
2610
2611    #[test]
2612    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
2613        let conn = RedisConnection {
2614            password: Some("fallback-secret".to_string()),
2615            cluster_nodes: vec![ClusterNode {
2616                host: "legacy-node.example.com".to_string(),
2617                port: 7000,
2618            }],
2619            ..Default::default()
2620        };
2621
2622        assert_eq!(
2623            conn.cluster_node_urls(),
2624            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
2625        );
2626    }
2627
2628    #[test]
2629    fn test_normalize_cluster_seed_urls() {
2630        let conn = RedisConnection {
2631            cluster: RedisClusterConnection {
2632                nodes: Vec::new(),
2633                username: Some("svc-user".to_string()),
2634                password: Some("svc-pass".to_string()),
2635                use_tls: true,
2636            },
2637            ..Default::default()
2638        };
2639
2640        let seeds = vec![
2641            "node1.example.com:7000".to_string(),
2642            "redis://node2.example.com:7001".to_string(),
2643            "rediss://node3.example.com".to_string(),
2644        ];
2645
2646        assert_eq!(
2647            conn.normalize_cluster_seed_urls(&seeds),
2648            vec![
2649                "rediss://svc-user:svc-pass@node1.example.com:7000",
2650                "redis://svc-user:svc-pass@node2.example.com:7001",
2651                "rediss://svc-user:svc-pass@node3.example.com:6379",
2652            ]
2653        );
2654    }
2655}
2656
2657#[cfg(test)]
2658mod cluster_node_tests {
2659    use super::ClusterNode;
2660
2661    #[test]
2662    fn test_to_url_basic_host() {
2663        let node = ClusterNode {
2664            host: "localhost".to_string(),
2665            port: 6379,
2666        };
2667        assert_eq!(node.to_url(), "redis://localhost:6379");
2668    }
2669
2670    #[test]
2671    fn test_to_url_ip_address() {
2672        let node = ClusterNode {
2673            host: "127.0.0.1".to_string(),
2674            port: 6379,
2675        };
2676        assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
2677    }
2678
2679    #[test]
2680    fn test_to_url_with_redis_protocol() {
2681        let node = ClusterNode {
2682            host: "redis://example.com".to_string(),
2683            port: 6379,
2684        };
2685        assert_eq!(node.to_url(), "redis://example.com:6379");
2686    }
2687
2688    #[test]
2689    fn test_to_url_with_rediss_protocol() {
2690        let node = ClusterNode {
2691            host: "rediss://secure.example.com".to_string(),
2692            port: 6379,
2693        };
2694        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
2695    }
2696
2697    #[test]
2698    fn test_to_url_with_rediss_protocol_and_port_in_url() {
2699        let node = ClusterNode {
2700            host: "rediss://secure.example.com:7000".to_string(),
2701            port: 6379,
2702        };
2703        assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
2704    }
2705
2706    #[test]
2707    fn test_to_url_with_redis_protocol_and_port_in_url() {
2708        let node = ClusterNode {
2709            host: "redis://example.com:7001".to_string(),
2710            port: 6379,
2711        };
2712        assert_eq!(node.to_url(), "redis://example.com:7001");
2713    }
2714
2715    #[test]
2716    fn test_to_url_with_trailing_whitespace() {
2717        let node = ClusterNode {
2718            host: "  rediss://secure.example.com  ".to_string(),
2719            port: 6379,
2720        };
2721        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
2722    }
2723
2724    #[test]
2725    fn test_to_url_custom_port() {
2726        let node = ClusterNode {
2727            host: "redis-cluster.example.com".to_string(),
2728            port: 7000,
2729        };
2730        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
2731    }
2732
2733    #[test]
2734    fn test_to_url_plain_host_with_port_in_host_field() {
2735        let node = ClusterNode {
2736            host: "redis-cluster.example.com:7010".to_string(),
2737            port: 7000,
2738        };
2739        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
2740    }
2741
2742    #[test]
2743    fn test_to_url_with_options_adds_auth_and_tls() {
2744        let node = ClusterNode {
2745            host: "node.example.com".to_string(),
2746            port: 7000,
2747        };
2748        assert_eq!(
2749            node.to_url_with_options(true, Some("svc-user"), Some("secret")),
2750            "rediss://svc-user:secret@node.example.com:7000"
2751        );
2752    }
2753
2754    #[test]
2755    fn test_to_url_with_options_keeps_embedded_auth() {
2756        let node = ClusterNode {
2757            host: "rediss://:node-secret@node.example.com:7000".to_string(),
2758            port: 7000,
2759        };
2760        assert_eq!(
2761            node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
2762            "rediss://:node-secret@node.example.com:7000"
2763        );
2764    }
2765
2766    #[test]
2767    fn test_from_seed_parses_plain_host_port() {
2768        let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
2769        assert_eq!(node.host, "cluster-node-1");
2770        assert_eq!(node.port, 7005);
2771    }
2772
2773    #[test]
2774    fn test_from_seed_keeps_scheme_urls() {
2775        let node =
2776            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
2777        assert_eq!(node.host, "rediss://secure.example.com:7005");
2778        assert_eq!(node.port, 7005);
2779    }
2780
2781    #[test]
2782    fn test_to_url_aws_elasticache_hostname() {
2783        let node = ClusterNode {
2784            host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
2785            port: 6379,
2786        };
2787        assert_eq!(
2788            node.to_url(),
2789            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
2790        );
2791    }
2792
2793    #[test]
2794    fn test_to_url_with_ipv6_no_port() {
2795        let node = ClusterNode {
2796            host: "rediss://[::1]".to_string(),
2797            port: 6379,
2798        };
2799        assert_eq!(node.to_url(), "rediss://[::1]:6379");
2800    }
2801
2802    #[test]
2803    fn test_to_url_with_ipv6_and_port_in_url() {
2804        let node = ClusterNode {
2805            host: "rediss://[::1]:7000".to_string(),
2806            port: 6379,
2807        };
2808        assert_eq!(node.to_url(), "rediss://[::1]:7000");
2809    }
2810
2811    #[test]
2812    fn test_to_url_with_ipv6_full_address_no_port() {
2813        let node = ClusterNode {
2814            host: "rediss://[2001:db8::1]".to_string(),
2815            port: 6379,
2816        };
2817        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
2818    }
2819
2820    #[test]
2821    fn test_to_url_with_ipv6_full_address_with_port() {
2822        let node = ClusterNode {
2823            host: "rediss://[2001:db8::1]:7000".to_string(),
2824            port: 6379,
2825        };
2826        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
2827    }
2828
2829    #[test]
2830    fn test_to_url_with_redis_protocol_ipv6() {
2831        let node = ClusterNode {
2832            host: "redis://[::1]".to_string(),
2833            port: 6379,
2834        };
2835        assert_eq!(node.to_url(), "redis://[::1]:6379");
2836    }
2837}