Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(default)]
86pub struct DynamoDbSettings {
87    pub region: String,
88    pub table_name: String,
89    pub endpoint_url: Option<String>,
90    pub aws_access_key_id: Option<String>,
91    pub aws_secret_access_key: Option<String>,
92    pub aws_profile_name: Option<String>,
93}
94
95impl Default for DynamoDbSettings {
96    fn default() -> Self {
97        Self {
98            region: "us-east-1".to_string(),
99            table_name: "sockudo-applications".to_string(),
100            endpoint_url: None,
101            aws_access_key_id: None,
102            aws_secret_access_key: None,
103            aws_profile_name: None,
104        }
105    }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(default)]
110pub struct ScyllaDbSettings {
111    pub nodes: Vec<String>,
112    pub keyspace: String,
113    pub table_name: String,
114    pub username: Option<String>,
115    pub password: Option<String>,
116    pub replication_class: String,
117    pub replication_factor: u32,
118}
119
120impl Default for ScyllaDbSettings {
121    fn default() -> Self {
122        Self {
123            nodes: vec!["127.0.0.1:9042".to_string()],
124            keyspace: "sockudo".to_string(),
125            table_name: "applications".to_string(),
126            username: None,
127            password: None,
128            replication_class: "SimpleStrategy".to_string(),
129            replication_factor: 3,
130        }
131    }
132}
133
134impl FromStr for AdapterDriver {
135    type Err = String;
136    fn from_str(s: &str) -> Result<Self, Self::Err> {
137        match s.to_lowercase().as_str() {
138            "local" => Ok(AdapterDriver::Local),
139            "redis" => Ok(AdapterDriver::Redis),
140            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141            "nats" => Ok(AdapterDriver::Nats),
142            _ => Err(format!("Unknown adapter driver: {s}")),
143        }
144    }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
148#[serde(rename_all = "lowercase")]
149pub enum AppManagerDriver {
150    #[default]
151    Memory,
152    Mysql,
153    Dynamodb,
154    PgSql,
155    ScyllaDb,
156}
157impl FromStr for AppManagerDriver {
158    type Err = String;
159    fn from_str(s: &str) -> Result<Self, Self::Err> {
160        match s.to_lowercase().as_str() {
161            "memory" => Ok(AppManagerDriver::Memory),
162            "mysql" => Ok(AppManagerDriver::Mysql),
163            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166            _ => Err(format!("Unknown app manager driver: {s}")),
167        }
168    }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
172#[serde(rename_all = "lowercase")]
173pub enum CacheDriver {
174    #[default]
175    Memory,
176    Redis,
177    #[serde(rename = "redis-cluster")]
178    RedisCluster,
179    None,
180}
181
182impl FromStr for CacheDriver {
183    type Err = String;
184    fn from_str(s: &str) -> Result<Self, Self::Err> {
185        match s.to_lowercase().as_str() {
186            "memory" => Ok(CacheDriver::Memory),
187            "redis" => Ok(CacheDriver::Redis),
188            "redis-cluster" => Ok(CacheDriver::RedisCluster),
189            "none" => Ok(CacheDriver::None),
190            _ => Err(format!("Unknown cache driver: {s}")),
191        }
192    }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
196#[serde(rename_all = "lowercase")]
197pub enum QueueDriver {
198    Memory,
199    #[default]
200    Redis,
201    #[serde(rename = "redis-cluster")]
202    RedisCluster,
203    Sqs,
204    Sns,
205    None,
206}
207
208impl FromStr for QueueDriver {
209    type Err = String;
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        match s.to_lowercase().as_str() {
212            "memory" => Ok(QueueDriver::Memory),
213            "redis" => Ok(QueueDriver::Redis),
214            "redis-cluster" => Ok(QueueDriver::RedisCluster),
215            "sqs" => Ok(QueueDriver::Sqs),
216            "sns" => Ok(QueueDriver::Sns),
217            "none" => Ok(QueueDriver::None),
218            _ => Err(format!("Unknown queue driver: {s}")),
219        }
220    }
221}
222
223impl AsRef<str> for QueueDriver {
224    fn as_ref(&self) -> &str {
225        match self {
226            QueueDriver::Memory => "memory",
227            QueueDriver::Redis => "redis",
228            QueueDriver::RedisCluster => "redis-cluster",
229            QueueDriver::Sqs => "sqs",
230            QueueDriver::Sns => "sns",
231            QueueDriver::None => "none",
232        }
233    }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(default)]
238pub struct RedisClusterQueueConfig {
239    pub concurrency: u32,
240    pub prefix: Option<String>,
241    pub nodes: Vec<String>,
242    pub request_timeout_ms: u64,
243}
244
245impl Default for RedisClusterQueueConfig {
246    fn default() -> Self {
247        Self {
248            concurrency: 5,
249            prefix: Some("sockudo_queue:".to_string()),
250            nodes: vec!["redis://127.0.0.1:6379".to_string()],
251            request_timeout_ms: 5000,
252        }
253    }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
257#[serde(rename_all = "lowercase")]
258pub enum MetricsDriver {
259    #[default]
260    Prometheus,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264#[serde(rename_all = "lowercase")]
265pub enum LogOutputFormat {
266    #[default]
267    Human,
268    Json,
269}
270
271impl FromStr for LogOutputFormat {
272    type Err = String;
273    fn from_str(s: &str) -> Result<Self, Self::Err> {
274        match s.to_lowercase().as_str() {
275            "human" => Ok(LogOutputFormat::Human),
276            "json" => Ok(LogOutputFormat::Json),
277            _ => Err(format!("Unknown log output format: {s}")),
278        }
279    }
280}
281
282impl FromStr for MetricsDriver {
283    type Err = String;
284    fn from_str(s: &str) -> Result<Self, Self::Err> {
285        match s.to_lowercase().as_str() {
286            "prometheus" => Ok(MetricsDriver::Prometheus),
287            _ => Err(format!("Unknown metrics driver: {s}")),
288        }
289    }
290}
291
292impl AsRef<str> for MetricsDriver {
293    fn as_ref(&self) -> &str {
294        match self {
295            MetricsDriver::Prometheus => "prometheus",
296        }
297    }
298}
299
300// --- Main Configuration Struct ---
301#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(default)]
303pub struct ServerOptions {
304    pub adapter: AdapterConfig,
305    pub app_manager: AppManagerConfig,
306    pub cache: CacheConfig,
307    pub channel_limits: ChannelLimits,
308    pub cors: CorsConfig,
309    pub database: DatabaseConfig,
310    pub database_pooling: DatabasePooling,
311    pub debug: bool,
312    pub event_limits: EventLimits,
313    pub host: String,
314    pub http_api: HttpApiConfig,
315    pub instance: InstanceConfig,
316    pub logging: Option<LoggingConfig>,
317    pub metrics: MetricsConfig,
318    pub mode: String,
319    pub port: u16,
320    pub path_prefix: String,
321    pub presence: PresenceConfig,
322    pub queue: QueueConfig,
323    pub rate_limiter: RateLimiterConfig,
324    pub shutdown_grace_period: u64,
325    pub ssl: SslConfig,
326    pub user_authentication_timeout: u64,
327    pub webhooks: WebhooksConfig,
328    pub websocket_max_payload_kb: u32,
329    pub cleanup: CleanupConfig,
330    pub activity_timeout: u64,
331    pub cluster_health: ClusterHealthConfig,
332    pub unix_socket: UnixSocketConfig,
333    pub delta_compression: DeltaCompressionOptionsConfig,
334    pub tag_filtering: TagFilteringConfig,
335    pub websocket: WebSocketConfig,
336}
337
338// --- Configuration Sub-Structs ---
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct SqsQueueConfig {
343    pub region: String,
344    pub queue_url_prefix: Option<String>,
345    pub visibility_timeout: i32,
346    pub endpoint_url: Option<String>,
347    pub max_messages: i32,
348    pub wait_time_seconds: i32,
349    pub concurrency: u32,
350    pub fifo: bool,
351    pub message_group_id: Option<String>,
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
355#[serde(default)]
356pub struct SnsQueueConfig {
357    pub region: String,
358    pub topic_arn: String,
359    pub endpoint_url: Option<String>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
363#[serde(default)]
364pub struct AdapterConfig {
365    pub driver: AdapterDriver,
366    pub redis: RedisAdapterConfig,
367    pub cluster: RedisClusterAdapterConfig,
368    pub nats: NatsAdapterConfig,
369    #[serde(default = "default_buffer_multiplier_per_cpu")]
370    pub buffer_multiplier_per_cpu: usize,
371    pub cluster_health: ClusterHealthConfig,
372    #[serde(default = "default_enable_socket_counting")]
373    pub enable_socket_counting: bool,
374}
375
/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    true
}
379
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    64
}
383
384impl Default for AdapterConfig {
385    fn default() -> Self {
386        Self {
387            driver: AdapterDriver::default(),
388            redis: RedisAdapterConfig::default(),
389            cluster: RedisClusterAdapterConfig::default(),
390            nats: NatsAdapterConfig::default(),
391            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392            cluster_health: ClusterHealthConfig::default(),
393            enable_socket_counting: default_enable_socket_counting(),
394        }
395    }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct RedisAdapterConfig {
401    pub requests_timeout: u64,
402    pub prefix: String,
403    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
404    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
405    pub cluster_mode: bool,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
409#[serde(default)]
410pub struct RedisClusterAdapterConfig {
411    pub nodes: Vec<String>,
412    pub prefix: String,
413    pub request_timeout_ms: u64,
414    pub use_connection_manager: bool,
415    #[serde(default)]
416    pub use_sharded_pubsub: bool,
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[serde(default)]
421pub struct NatsAdapterConfig {
422    pub servers: Vec<String>,
423    pub prefix: String,
424    pub request_timeout_ms: u64,
425    pub username: Option<String>,
426    pub password: Option<String>,
427    pub token: Option<String>,
428    pub connection_timeout_ms: u64,
429    pub nodes_number: Option<u32>,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, Default)]
433#[serde(default)]
434pub struct AppManagerConfig {
435    pub driver: AppManagerDriver,
436    pub array: ArrayConfig,
437    pub cache: CacheSettings,
438    pub scylladb: ScyllaDbSettings,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
442#[serde(default)]
443pub struct ArrayConfig {
444    pub apps: Vec<App>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(default)]
449pub struct CacheSettings {
450    pub enabled: bool,
451    pub ttl: u64,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
455#[serde(default)]
456pub struct MemoryCacheOptions {
457    pub ttl: u64,
458    pub cleanup_interval: u64,
459    pub max_capacity: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
463#[serde(default)]
464pub struct CacheConfig {
465    pub driver: CacheDriver,
466    pub redis: RedisConfig,
467    pub memory: MemoryCacheOptions,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize, Default)]
471#[serde(default)]
472pub struct RedisConfig {
473    pub prefix: Option<String>,
474    pub url_override: Option<String>,
475    pub cluster_mode: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct ChannelLimits {
481    pub max_name_length: u32,
482    pub cache_ttl: u64,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
486#[serde(default)]
487pub struct CorsConfig {
488    pub credentials: bool,
489    pub origin: Vec<String>,
490    pub methods: Vec<String>,
491    pub allowed_headers: Vec<String>,
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize, Default)]
495#[serde(default)]
496pub struct DatabaseConfig {
497    pub mysql: DatabaseConnection,
498    pub postgres: DatabaseConnection,
499    pub redis: RedisConnection,
500    pub dynamodb: DynamoDbSettings,
501    pub scylladb: ScyllaDbSettings,
502}
503
504#[derive(Debug, Clone, Serialize, Deserialize)]
505#[serde(default)]
506pub struct DatabaseConnection {
507    pub host: String,
508    pub port: u16,
509    pub username: String,
510    pub password: String,
511    pub database: String,
512    pub table_name: String,
513    pub connection_pool_size: u32,
514    pub pool_min: Option<u32>,
515    pub pool_max: Option<u32>,
516    pub cache_ttl: u64,
517    pub cache_cleanup_interval: u64,
518    pub cache_max_capacity: u64,
519}
520
521#[derive(Debug, Clone, Serialize, Deserialize)]
522#[serde(default)]
523pub struct RedisConnection {
524    pub host: String,
525    pub port: u16,
526    pub db: u32,
527    pub username: Option<String>,
528    pub password: Option<String>,
529    pub key_prefix: String,
530    pub sentinels: Vec<RedisSentinel>,
531    pub sentinel_password: Option<String>,
532    pub name: String,
533    pub cluster: RedisClusterConnection,
534    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
535    pub cluster_nodes: Vec<ClusterNode>,
536}
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
539#[serde(default)]
540pub struct RedisSentinel {
541    pub host: String,
542    pub port: u16,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize, Default)]
546#[serde(default)]
547pub struct RedisClusterConnection {
548    pub nodes: Vec<ClusterNode>,
549    pub username: Option<String>,
550    pub password: Option<String>,
551    #[serde(alias = "useTLS")]
552    pub use_tls: bool,
553}
554
555impl RedisConnection {
556    /// Returns true if Redis Sentinel is configured.
557    pub fn is_sentinel_configured(&self) -> bool {
558        !self.sentinels.is_empty()
559    }
560
561    /// Builds a Redis connection URL based on the configuration.
562    pub fn to_url(&self) -> String {
563        if self.is_sentinel_configured() {
564            self.build_sentinel_url()
565        } else {
566            self.build_standard_url()
567        }
568    }
569
570    fn build_standard_url(&self) -> String {
571        let mut url = String::from("redis://");
572
573        if let Some(ref username) = self.username {
574            url.push_str(username);
575            if let Some(ref password) = self.password {
576                url.push(':');
577                url.push_str(&urlencoding::encode(password));
578            }
579            url.push('@');
580        } else if let Some(ref password) = self.password {
581            url.push(':');
582            url.push_str(&urlencoding::encode(password));
583            url.push('@');
584        }
585
586        url.push_str(&self.host);
587        url.push(':');
588        url.push_str(&self.port.to_string());
589        url.push('/');
590        url.push_str(&self.db.to_string());
591
592        url
593    }
594
595    fn build_sentinel_url(&self) -> String {
596        let mut url = String::from("redis+sentinel://");
597
598        if let Some(ref sentinel_password) = self.sentinel_password {
599            url.push(':');
600            url.push_str(&urlencoding::encode(sentinel_password));
601            url.push('@');
602        }
603
604        let sentinel_hosts: Vec<String> = self
605            .sentinels
606            .iter()
607            .map(|s| format!("{}:{}", s.host, s.port))
608            .collect();
609        url.push_str(&sentinel_hosts.join(","));
610
611        url.push('/');
612        url.push_str(&self.name);
613        url.push('/');
614        url.push_str(&self.db.to_string());
615
616        let mut params = Vec::new();
617        if let Some(ref password) = self.password {
618            params.push(format!("password={}", urlencoding::encode(password)));
619        }
620        if let Some(ref username) = self.username {
621            params.push(format!("username={}", urlencoding::encode(username)));
622        }
623
624        if !params.is_empty() {
625            url.push('?');
626            url.push_str(&params.join("&"));
627        }
628
629        url
630    }
631
632    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
633    /// or legacy (`cluster_nodes`) field.
634    pub fn has_cluster_nodes(&self) -> bool {
635        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
636    }
637
638    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
639    /// Falls back to legacy `cluster_nodes` for backward compatibility.
640    pub fn cluster_node_urls(&self) -> Vec<String> {
641        if !self.cluster.nodes.is_empty() {
642            return self.build_cluster_urls(&self.cluster.nodes);
643        }
644        self.build_cluster_urls(&self.cluster_nodes)
645    }
646
647    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
648    /// shared cluster auth/TLS options.
649    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
650        self.build_cluster_urls(
651            &seeds
652                .iter()
653                .filter_map(|seed| ClusterNode::from_seed(seed))
654                .collect::<Vec<ClusterNode>>(),
655        )
656    }
657
658    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
659        let username = self
660            .cluster
661            .username
662            .as_deref()
663            .or(self.username.as_deref());
664        let password = self
665            .cluster
666            .password
667            .as_deref()
668            .or(self.password.as_deref());
669        let use_tls = self.cluster.use_tls;
670
671        nodes
672            .iter()
673            .map(|node| node.to_url_with_options(use_tls, username, password))
674            .collect()
675    }
676}
677
678impl RedisSentinel {
679    pub fn to_host_port(&self) -> String {
680        format!("{}:{}", self.host, self.port)
681    }
682}
683
684#[derive(Debug, Clone, Serialize, Deserialize)]
685#[serde(default)]
686pub struct ClusterNode {
687    pub host: String,
688    pub port: u16,
689}
690
691impl ClusterNode {
692    pub fn to_url(&self) -> String {
693        self.to_url_with_options(false, None, None)
694    }
695
696    pub fn to_url_with_options(
697        &self,
698        use_tls: bool,
699        username: Option<&str>,
700        password: Option<&str>,
701    ) -> String {
702        let host = self.host.trim();
703
704        if host.starts_with("redis://") || host.starts_with("rediss://") {
705            if let Ok(parsed) = Url::parse(host)
706                && let Some(host_str) = parsed.host_str()
707            {
708                let scheme = parsed.scheme();
709                let port = parsed.port_or_known_default().unwrap_or(self.port);
710                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
711                let parsed_password = parsed.password();
712                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
713                let (effective_username, effective_password) = if has_embedded_auth {
714                    (parsed_username, parsed_password)
715                } else {
716                    (username, password)
717                };
718
719                return build_redis_url(
720                    scheme,
721                    host_str,
722                    port,
723                    effective_username,
724                    effective_password,
725                );
726            }
727
728            // Fallback for malformed URLs
729            let has_port = if let Some(bracket_pos) = host.rfind(']') {
730                host[bracket_pos..].contains(':')
731            } else {
732                host.split(':').count() >= 3
733            };
734            let base = if has_port {
735                host.to_string()
736            } else {
737                format!("{}:{}", host, self.port)
738            };
739
740            if let Ok(parsed) = Url::parse(&base) {
741                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
742                let parsed_password = parsed.password();
743                if let Some(host_str) = parsed.host_str() {
744                    let port = parsed.port_or_known_default().unwrap_or(self.port);
745                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
746                    let (effective_username, effective_password) = if has_embedded_auth {
747                        (parsed_username, parsed_password)
748                    } else {
749                        (username, password)
750                    };
751                    return build_redis_url(
752                        parsed.scheme(),
753                        host_str,
754                        port,
755                        effective_username,
756                        effective_password,
757                    );
758                }
759            }
760            return base;
761        }
762
763        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
764        let scheme = if use_tls { "rediss" } else { "redis" };
765        build_redis_url(
766            scheme,
767            &normalized_host,
768            normalized_port,
769            username,
770            password,
771        )
772    }
773
774    pub fn from_seed(seed: &str) -> Option<Self> {
775        let trimmed = seed.trim();
776        if trimmed.is_empty() {
777            return None;
778        }
779
780        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
781            let port = Url::parse(trimmed)
782                .ok()
783                .and_then(|parsed| parsed.port_or_known_default())
784                .unwrap_or(6379);
785            return Some(Self {
786                host: trimmed.to_string(),
787                port,
788            });
789        }
790
791        let (host, port) = split_plain_host_and_port(trimmed, 6379);
792        Some(Self { host, port })
793    }
794}
795
/// Splits a scheme-less host string into `(host, port)`.
///
/// Recognised shapes (after trimming whitespace):
/// * `[v6]:port` / `[v6]` — brackets are stripped; a missing or unparsable port falls back to
///   `default_port`. An opening bracket with no closing one is returned unchanged.
/// * `host:port` with exactly one colon and a valid `u16` port.
/// * anything else (bare host, bare IPv6, bad port) — returned whole with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let host = raw_host.trim();

    // Handle bracketed IPv6: [::1]:6379
    if let Some(after_bracket) = host.strip_prefix('[') {
        return match after_bracket.split_once(']') {
            Some((address, remainder)) => {
                let port = remainder
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (address.to_string(), port)
            }
            None => (host.to_string(), default_port),
        };
    }

    // Handle hostname/IP with port: host:6379. More than one colon means bare IPv6.
    if host.matches(':').count() == 1 {
        let split = host.split_once(':').and_then(|(name, port_text)| {
            port_text
                .parse::<u16>()
                .ok()
                .map(|port| (name.to_string(), port))
        });
        if let Some(pair) = split {
            return pair;
        }
    }

    (host.to_string(), default_port)
}
824
825fn build_redis_url(
826    scheme: &str,
827    host: &str,
828    port: u16,
829    username: Option<&str>,
830    password: Option<&str>,
831) -> String {
832    let mut url = format!("{scheme}://");
833
834    if let Some(user) = username {
835        url.push_str(&urlencoding::encode(user));
836        if let Some(pass) = password {
837            url.push(':');
838            url.push_str(&urlencoding::encode(pass));
839        }
840        url.push('@');
841    } else if let Some(pass) = password {
842        url.push(':');
843        url.push_str(&urlencoding::encode(pass));
844        url.push('@');
845    }
846
847    if host.contains(':') && !host.starts_with('[') {
848        url.push('[');
849        url.push_str(host);
850        url.push(']');
851    } else {
852        url.push_str(host);
853    }
854    url.push(':');
855    url.push_str(&port.to_string());
856    url
857}
858
859#[derive(Debug, Clone, Serialize, Deserialize)]
860#[serde(default)]
861pub struct DatabasePooling {
862    pub enabled: bool,
863    pub min: u32,
864    pub max: u32,
865}
866
867#[derive(Debug, Clone, Serialize, Deserialize)]
868#[serde(default)]
869pub struct EventLimits {
870    pub max_channels_at_once: u32,
871    pub max_name_length: u32,
872    pub max_payload_in_kb: u32,
873    pub max_batch_size: u32,
874}
875
876#[derive(Debug, Clone, Serialize, Deserialize)]
877#[serde(default)]
878pub struct HttpApiConfig {
879    pub request_limit_in_mb: u32,
880    pub accept_traffic: AcceptTraffic,
881}
882
883#[derive(Debug, Clone, Serialize, Deserialize)]
884#[serde(default)]
885pub struct AcceptTraffic {
886    pub memory_threshold: f64,
887}
888
889#[derive(Debug, Clone, Serialize, Deserialize)]
890#[serde(default)]
891pub struct InstanceConfig {
892    pub process_id: String,
893}
894
895#[derive(Debug, Clone, Serialize, Deserialize)]
896#[serde(default)]
897pub struct MetricsConfig {
898    pub enabled: bool,
899    pub driver: MetricsDriver,
900    pub host: String,
901    pub prometheus: PrometheusConfig,
902    pub port: u16,
903}
904
905#[derive(Debug, Clone, Serialize, Deserialize)]
906#[serde(default)]
907pub struct PrometheusConfig {
908    pub prefix: String,
909}
910
911#[derive(Debug, Clone, Serialize, Deserialize)]
912#[serde(default)]
913pub struct LoggingConfig {
914    pub colors_enabled: bool,
915    pub include_target: bool,
916}
917
918#[derive(Debug, Clone, Serialize, Deserialize)]
919#[serde(default)]
920pub struct PresenceConfig {
921    pub max_members_per_channel: u32,
922    pub max_member_size_in_kb: u32,
923}
924
925/// WebSocket connection buffer configuration
926/// Controls backpressure handling for slow consumers
927#[derive(Debug, Clone, Serialize, Deserialize)]
928#[serde(default)]
929pub struct WebSocketConfig {
930    pub max_messages: Option<usize>,
931    pub max_bytes: Option<usize>,
932    pub disconnect_on_buffer_full: bool,
933    pub max_message_size: usize,
934    pub max_frame_size: usize,
935    pub write_buffer_size: usize,
936    pub max_backpressure: usize,
937    pub auto_ping: bool,
938    pub ping_interval: u32,
939    pub idle_timeout: u32,
940    pub compression: String,
941}
942
943impl Default for WebSocketConfig {
944    fn default() -> Self {
945        Self {
946            max_messages: Some(1000),
947            max_bytes: None,
948            disconnect_on_buffer_full: true,
949            max_message_size: 64 * 1024 * 1024,
950            max_frame_size: 16 * 1024 * 1024,
951            write_buffer_size: 16 * 1024,
952            max_backpressure: 1024 * 1024,
953            auto_ping: true,
954            ping_interval: 30,
955            idle_timeout: 120,
956            compression: "disabled".to_string(),
957        }
958    }
959}
960
961impl WebSocketConfig {
962    /// Convert to WebSocketBufferConfig for runtime use
963    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
964        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
965
966        let limit = match (self.max_messages, self.max_bytes) {
967            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
968            (Some(messages), None) => BufferLimit::Messages(messages),
969            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
970            (None, None) => BufferLimit::Messages(1000),
971        };
972
973        WebSocketBufferConfig {
974            limit,
975            disconnect_on_full: self.disconnect_on_buffer_full,
976        }
977    }
978
979    /// Convert to native sockudo-ws runtime configuration.
980    pub fn to_sockudo_ws_config(
981        &self,
982        websocket_max_payload_kb: u32,
983        activity_timeout: u64,
984    ) -> sockudo_ws::Config {
985        use sockudo_ws::Compression;
986
987        let compression = match self.compression.to_lowercase().as_str() {
988            "dedicated" => Compression::Dedicated,
989            "shared" => Compression::Shared,
990            "window256b" => Compression::Window256B,
991            "window1kb" => Compression::Window1KB,
992            "window2kb" => Compression::Window2KB,
993            "window4kb" => Compression::Window4KB,
994            "window8kb" => Compression::Window8KB,
995            "window16kb" => Compression::Window16KB,
996            "window32kb" => Compression::Window32KB,
997            _ => Compression::Disabled,
998        };
999
1000        sockudo_ws::Config::builder()
1001            .max_payload_length(
1002                self.max_bytes
1003                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1004            )
1005            .max_message_size(self.max_message_size)
1006            .max_frame_size(self.max_frame_size)
1007            .write_buffer_size(self.write_buffer_size)
1008            .max_backpressure(self.max_backpressure)
1009            .idle_timeout(self.idle_timeout)
1010            .auto_ping(self.auto_ping)
1011            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1012            .compression(compression)
1013            .build()
1014    }
1015}
1016
1017#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1018#[serde(default)]
1019pub struct QueueConfig {
1020    pub driver: QueueDriver,
1021    pub redis: RedisQueueConfig,
1022    pub redis_cluster: RedisClusterQueueConfig,
1023    pub sqs: SqsQueueConfig,
1024    pub sns: SnsQueueConfig,
1025}
1026
1027#[derive(Debug, Clone, Serialize, Deserialize)]
1028#[serde(default)]
1029pub struct RedisQueueConfig {
1030    pub concurrency: u32,
1031    pub prefix: Option<String>,
1032    pub url_override: Option<String>,
1033    pub cluster_mode: bool,
1034}
1035
1036#[derive(Clone, Debug, Serialize, Deserialize)]
1037#[serde(default)]
1038pub struct RateLimit {
1039    pub max_requests: u32,
1040    pub window_seconds: u64,
1041    pub identifier: Option<String>,
1042    pub trust_hops: Option<u32>,
1043}
1044
1045#[derive(Debug, Clone, Serialize, Deserialize)]
1046#[serde(default)]
1047pub struct RateLimiterConfig {
1048    pub enabled: bool,
1049    pub driver: CacheDriver,
1050    pub api_rate_limit: RateLimit,
1051    pub websocket_rate_limit: RateLimit,
1052    pub redis: RedisConfig,
1053}
1054
1055#[derive(Debug, Clone, Serialize, Deserialize)]
1056#[serde(default)]
1057pub struct SslConfig {
1058    pub enabled: bool,
1059    pub cert_path: String,
1060    pub key_path: String,
1061    pub passphrase: Option<String>,
1062    pub ca_path: Option<String>,
1063    pub redirect_http: bool,
1064    pub http_port: Option<u16>,
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1068#[serde(default)]
1069pub struct WebhooksConfig {
1070    pub batching: BatchingConfig,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct BatchingConfig {
1076    pub enabled: bool,
1077    pub duration: u64,
1078    pub size: usize,
1079}
1080
1081#[derive(Debug, Clone, Serialize, Deserialize)]
1082#[serde(default)]
1083pub struct ClusterHealthConfig {
1084    pub enabled: bool,
1085    pub heartbeat_interval_ms: u64,
1086    pub node_timeout_ms: u64,
1087    pub cleanup_interval_ms: u64,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1091#[serde(default)]
1092pub struct UnixSocketConfig {
1093    pub enabled: bool,
1094    pub path: String,
1095    #[serde(deserialize_with = "deserialize_octal_permission")]
1096    pub permission_mode: u32,
1097}
1098
1099#[derive(Debug, Clone, Serialize, Deserialize)]
1100#[serde(default)]
1101pub struct DeltaCompressionOptionsConfig {
1102    pub enabled: bool,
1103    pub algorithm: String,
1104    pub full_message_interval: u32,
1105    pub min_message_size: usize,
1106    pub max_state_age_secs: u64,
1107    pub max_channel_states_per_socket: usize,
1108    pub max_conflation_states_per_channel: Option<usize>,
1109    pub conflation_key_path: Option<String>,
1110    pub cluster_coordination: bool,
1111    pub omit_delta_algorithm: bool,
1112}
1113
1114/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1115#[derive(Debug, Clone, Serialize, Deserialize)]
1116#[serde(default)]
1117pub struct CleanupConfig {
1118    pub queue_buffer_size: usize,
1119    pub batch_size: usize,
1120    pub batch_timeout_ms: u64,
1121    pub worker_threads: WorkerThreadsConfig,
1122    pub max_retry_attempts: u32,
1123    pub async_enabled: bool,
1124    pub fallback_to_sync: bool,
1125}
1126
/// Worker threads configuration for the cleanup system.
///
/// Serialized as the string `"auto"` or as an integer; see the hand-written
/// `Serialize` / `Deserialize` impls.
#[derive(Clone, Debug)]
pub enum WorkerThreadsConfig {
    /// Written as `"auto"` in configuration.
    Auto,
    /// An explicit thread count.
    Fixed(usize),
}
1133
1134impl serde::Serialize for WorkerThreadsConfig {
1135    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1136    where
1137        S: serde::Serializer,
1138    {
1139        match self {
1140            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1141            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1142        }
1143    }
1144}
1145
1146impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1147    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1148    where
1149        D: serde::Deserializer<'de>,
1150    {
1151        use serde::de;
1152        struct WorkerThreadsVisitor;
1153        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1154            type Value = WorkerThreadsConfig;
1155
1156            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1157                formatter.write_str(r#""auto" or a positive integer"#)
1158            }
1159
1160            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1161                if value.eq_ignore_ascii_case("auto") {
1162                    Ok(WorkerThreadsConfig::Auto)
1163                } else if let Ok(n) = value.parse::<usize>() {
1164                    Ok(WorkerThreadsConfig::Fixed(n))
1165                } else {
1166                    Err(E::custom(format!(
1167                        "expected 'auto' or a number, got '{value}'"
1168                    )))
1169                }
1170            }
1171
1172            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1173                Ok(WorkerThreadsConfig::Fixed(value as usize))
1174            }
1175
1176            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1177                if value >= 0 {
1178                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1179                } else {
1180                    Err(E::custom("worker_threads must be non-negative"))
1181                }
1182            }
1183        }
1184        deserializer.deserialize_any(WorkerThreadsVisitor)
1185    }
1186}
1187
1188impl Default for CleanupConfig {
1189    fn default() -> Self {
1190        Self {
1191            queue_buffer_size: 1024,
1192            batch_size: 64,
1193            batch_timeout_ms: 100,
1194            worker_threads: WorkerThreadsConfig::Auto,
1195            max_retry_attempts: 3,
1196            async_enabled: true,
1197            fallback_to_sync: true,
1198        }
1199    }
1200}
1201
1202impl CleanupConfig {
1203    pub fn validate(&self) -> Result<(), String> {
1204        if self.queue_buffer_size == 0 {
1205            return Err("queue_buffer_size must be greater than 0".to_string());
1206        }
1207        if self.batch_size == 0 {
1208            return Err("batch_size must be greater than 0".to_string());
1209        }
1210        if self.batch_timeout_ms == 0 {
1211            return Err("batch_timeout_ms must be greater than 0".to_string());
1212        }
1213        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1214            && n == 0
1215        {
1216            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1217        }
1218        Ok(())
1219    }
1220}
1221
1222#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1223#[serde(default)]
1224pub struct TagFilteringConfig {
1225    #[serde(default)]
1226    pub enabled: bool,
1227    #[serde(default = "default_true")]
1228    pub enable_tags: bool,
1229}
1230
/// Always returns `true`; used as a serde `default = "..."` provider for
/// boolean fields whose default is `true`.
fn default_true() -> bool {
    true
}
1234
1235// --- Default Implementations ---
1236
1237impl Default for ServerOptions {
1238    fn default() -> Self {
1239        Self {
1240            adapter: AdapterConfig::default(),
1241            app_manager: AppManagerConfig::default(),
1242            cache: CacheConfig::default(),
1243            channel_limits: ChannelLimits::default(),
1244            cors: CorsConfig::default(),
1245            database: DatabaseConfig::default(),
1246            database_pooling: DatabasePooling::default(),
1247            debug: false,
1248            tag_filtering: TagFilteringConfig::default(),
1249            event_limits: EventLimits::default(),
1250            host: "0.0.0.0".to_string(),
1251            http_api: HttpApiConfig::default(),
1252            instance: InstanceConfig::default(),
1253            logging: None,
1254            metrics: MetricsConfig::default(),
1255            mode: "production".to_string(),
1256            port: 6001,
1257            path_prefix: "/".to_string(),
1258            presence: PresenceConfig::default(),
1259            queue: QueueConfig::default(),
1260            rate_limiter: RateLimiterConfig::default(),
1261            shutdown_grace_period: 10,
1262            ssl: SslConfig::default(),
1263            user_authentication_timeout: 3600,
1264            webhooks: WebhooksConfig::default(),
1265            websocket_max_payload_kb: 64,
1266            cleanup: CleanupConfig::default(),
1267            activity_timeout: 120,
1268            cluster_health: ClusterHealthConfig::default(),
1269            unix_socket: UnixSocketConfig::default(),
1270            delta_compression: DeltaCompressionOptionsConfig::default(),
1271            websocket: WebSocketConfig::default(),
1272        }
1273    }
1274}
1275
1276impl Default for SqsQueueConfig {
1277    fn default() -> Self {
1278        Self {
1279            region: "us-east-1".to_string(),
1280            queue_url_prefix: None,
1281            visibility_timeout: 30,
1282            endpoint_url: None,
1283            max_messages: 10,
1284            wait_time_seconds: 5,
1285            concurrency: 5,
1286            fifo: false,
1287            message_group_id: Some("default".to_string()),
1288        }
1289    }
1290}
1291
1292impl Default for SnsQueueConfig {
1293    fn default() -> Self {
1294        Self {
1295            region: "us-east-1".to_string(),
1296            topic_arn: String::new(),
1297            endpoint_url: None,
1298        }
1299    }
1300}
1301
1302impl Default for RedisAdapterConfig {
1303    fn default() -> Self {
1304        Self {
1305            requests_timeout: 5000,
1306            prefix: "sockudo_adapter:".to_string(),
1307            redis_pub_options: AHashMap::new(),
1308            redis_sub_options: AHashMap::new(),
1309            cluster_mode: false,
1310        }
1311    }
1312}
1313
1314impl Default for RedisClusterAdapterConfig {
1315    fn default() -> Self {
1316        Self {
1317            nodes: vec![],
1318            prefix: "sockudo_adapter:".to_string(),
1319            request_timeout_ms: 1000,
1320            use_connection_manager: true,
1321            use_sharded_pubsub: false,
1322        }
1323    }
1324}
1325
1326impl Default for NatsAdapterConfig {
1327    fn default() -> Self {
1328        Self {
1329            servers: vec!["nats://localhost:4222".to_string()],
1330            prefix: "sockudo_adapter:".to_string(),
1331            request_timeout_ms: 5000,
1332            username: None,
1333            password: None,
1334            token: None,
1335            connection_timeout_ms: 5000,
1336            nodes_number: None,
1337        }
1338    }
1339}
1340
1341impl Default for CacheSettings {
1342    fn default() -> Self {
1343        Self {
1344            enabled: true,
1345            ttl: 300,
1346        }
1347    }
1348}
1349
1350impl Default for MemoryCacheOptions {
1351    fn default() -> Self {
1352        Self {
1353            ttl: 300,
1354            cleanup_interval: 60,
1355            max_capacity: 10000,
1356        }
1357    }
1358}
1359
1360impl Default for CacheConfig {
1361    fn default() -> Self {
1362        Self {
1363            driver: CacheDriver::default(),
1364            redis: RedisConfig {
1365                prefix: Some("sockudo_cache:".to_string()),
1366                url_override: None,
1367                cluster_mode: false,
1368            },
1369            memory: MemoryCacheOptions::default(),
1370        }
1371    }
1372}
1373
1374impl Default for ChannelLimits {
1375    fn default() -> Self {
1376        Self {
1377            max_name_length: 200,
1378            cache_ttl: 3600,
1379        }
1380    }
1381}
1382impl Default for CorsConfig {
1383    fn default() -> Self {
1384        Self {
1385            credentials: true,
1386            origin: vec!["*".to_string()],
1387            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1388            allowed_headers: vec![
1389                "Authorization".to_string(),
1390                "Content-Type".to_string(),
1391                "X-Requested-With".to_string(),
1392                "Accept".to_string(),
1393            ],
1394        }
1395    }
1396}
1397
1398impl Default for DatabaseConnection {
1399    fn default() -> Self {
1400        Self {
1401            host: "localhost".to_string(),
1402            port: 3306,
1403            username: "root".to_string(),
1404            password: "".to_string(),
1405            database: "sockudo".to_string(),
1406            table_name: "applications".to_string(),
1407            connection_pool_size: 10,
1408            pool_min: None,
1409            pool_max: None,
1410            cache_ttl: 300,
1411            cache_cleanup_interval: 60,
1412            cache_max_capacity: 100,
1413        }
1414    }
1415}
1416
1417impl Default for RedisConnection {
1418    fn default() -> Self {
1419        Self {
1420            host: "127.0.0.1".to_string(),
1421            port: 6379,
1422            db: 0,
1423            username: None,
1424            password: None,
1425            key_prefix: "sockudo:".to_string(),
1426            sentinels: Vec::new(),
1427            sentinel_password: None,
1428            name: "mymaster".to_string(),
1429            cluster: RedisClusterConnection::default(),
1430            cluster_nodes: Vec::new(),
1431        }
1432    }
1433}
1434
1435impl Default for RedisSentinel {
1436    fn default() -> Self {
1437        Self {
1438            host: "localhost".to_string(),
1439            port: 26379,
1440        }
1441    }
1442}
1443
1444impl Default for ClusterNode {
1445    fn default() -> Self {
1446        Self {
1447            host: "127.0.0.1".to_string(),
1448            port: 7000,
1449        }
1450    }
1451}
1452
1453impl Default for DatabasePooling {
1454    fn default() -> Self {
1455        Self {
1456            enabled: true,
1457            min: 2,
1458            max: 10,
1459        }
1460    }
1461}
1462
1463impl Default for EventLimits {
1464    fn default() -> Self {
1465        Self {
1466            max_channels_at_once: 100,
1467            max_name_length: 200,
1468            max_payload_in_kb: 100,
1469            max_batch_size: 10,
1470        }
1471    }
1472}
1473
1474impl Default for HttpApiConfig {
1475    fn default() -> Self {
1476        Self {
1477            request_limit_in_mb: 10,
1478            accept_traffic: AcceptTraffic::default(),
1479        }
1480    }
1481}
1482
1483impl Default for AcceptTraffic {
1484    fn default() -> Self {
1485        Self {
1486            memory_threshold: 0.90,
1487        }
1488    }
1489}
1490
1491impl Default for InstanceConfig {
1492    fn default() -> Self {
1493        Self {
1494            process_id: uuid::Uuid::new_v4().to_string(),
1495        }
1496    }
1497}
1498
1499impl Default for LoggingConfig {
1500    fn default() -> Self {
1501        Self {
1502            colors_enabled: true,
1503            include_target: true,
1504        }
1505    }
1506}
1507
1508impl Default for MetricsConfig {
1509    fn default() -> Self {
1510        Self {
1511            enabled: true,
1512            driver: MetricsDriver::default(),
1513            host: "0.0.0.0".to_string(),
1514            prometheus: PrometheusConfig::default(),
1515            port: 9601,
1516        }
1517    }
1518}
1519
1520impl Default for PrometheusConfig {
1521    fn default() -> Self {
1522        Self {
1523            prefix: "sockudo_".to_string(),
1524        }
1525    }
1526}
1527
1528impl Default for PresenceConfig {
1529    fn default() -> Self {
1530        Self {
1531            max_members_per_channel: 100,
1532            max_member_size_in_kb: 2,
1533        }
1534    }
1535}
1536
1537impl Default for RedisQueueConfig {
1538    fn default() -> Self {
1539        Self {
1540            concurrency: 5,
1541            prefix: Some("sockudo_queue:".to_string()),
1542            url_override: None,
1543            cluster_mode: false,
1544        }
1545    }
1546}
1547
1548impl Default for RateLimit {
1549    fn default() -> Self {
1550        Self {
1551            max_requests: 60,
1552            window_seconds: 60,
1553            identifier: Some("default".to_string()),
1554            trust_hops: Some(0),
1555        }
1556    }
1557}
1558
1559impl Default for RateLimiterConfig {
1560    fn default() -> Self {
1561        Self {
1562            enabled: true,
1563            driver: CacheDriver::Memory,
1564            api_rate_limit: RateLimit {
1565                max_requests: 100,
1566                window_seconds: 60,
1567                identifier: Some("api".to_string()),
1568                trust_hops: Some(0),
1569            },
1570            websocket_rate_limit: RateLimit {
1571                max_requests: 20,
1572                window_seconds: 60,
1573                identifier: Some("websocket_connect".to_string()),
1574                trust_hops: Some(0),
1575            },
1576            redis: RedisConfig {
1577                prefix: Some("sockudo_rl:".to_string()),
1578                url_override: None,
1579                cluster_mode: false,
1580            },
1581        }
1582    }
1583}
1584
1585impl Default for SslConfig {
1586    fn default() -> Self {
1587        Self {
1588            enabled: false,
1589            cert_path: "".to_string(),
1590            key_path: "".to_string(),
1591            passphrase: None,
1592            ca_path: None,
1593            redirect_http: false,
1594            http_port: Some(80),
1595        }
1596    }
1597}
1598
1599impl Default for BatchingConfig {
1600    fn default() -> Self {
1601        Self {
1602            enabled: true,
1603            duration: 50,
1604            size: 100,
1605        }
1606    }
1607}
1608
1609impl Default for ClusterHealthConfig {
1610    fn default() -> Self {
1611        Self {
1612            enabled: true,
1613            heartbeat_interval_ms: 10000,
1614            node_timeout_ms: 30000,
1615            cleanup_interval_ms: 10000,
1616        }
1617    }
1618}
1619
1620impl Default for UnixSocketConfig {
1621    fn default() -> Self {
1622        Self {
1623            enabled: false,
1624            path: "/var/run/sockudo/sockudo.sock".to_string(),
1625            permission_mode: 0o660,
1626        }
1627    }
1628}
1629
1630impl Default for DeltaCompressionOptionsConfig {
1631    fn default() -> Self {
1632        Self {
1633            enabled: true,
1634            algorithm: "fossil".to_string(),
1635            full_message_interval: 10,
1636            min_message_size: 100,
1637            max_state_age_secs: 300,
1638            max_channel_states_per_socket: 100,
1639            max_conflation_states_per_channel: Some(100),
1640            conflation_key_path: None,
1641            cluster_coordination: false,
1642            omit_delta_algorithm: false,
1643        }
1644    }
1645}
1646
1647impl ClusterHealthConfig {
1648    pub fn validate(&self) -> Result<(), String> {
1649        if self.heartbeat_interval_ms == 0 {
1650            return Err("heartbeat_interval_ms must be greater than 0".to_string());
1651        }
1652        if self.node_timeout_ms == 0 {
1653            return Err("node_timeout_ms must be greater than 0".to_string());
1654        }
1655        if self.cleanup_interval_ms == 0 {
1656            return Err("cleanup_interval_ms must be greater than 0".to_string());
1657        }
1658
1659        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1660            return Err(format!(
1661                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1662                self.heartbeat_interval_ms,
1663                self.node_timeout_ms,
1664                self.node_timeout_ms / 3
1665            ));
1666        }
1667
1668        if self.cleanup_interval_ms > self.node_timeout_ms {
1669            return Err(format!(
1670                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1671                self.cleanup_interval_ms, self.node_timeout_ms
1672            ));
1673        }
1674
1675        Ok(())
1676    }
1677}
1678
1679impl ServerOptions {
1680    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1681        let content = tokio::fs::read_to_string(path).await?;
1682        let options: Self = sonic_rs::from_str(&content)?;
1683        Ok(options)
1684    }
1685
1686    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1687        // --- General Settings ---
1688        if let Ok(mode) = std::env::var("ENVIRONMENT") {
1689            self.mode = mode;
1690        }
1691        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1692        if parse_bool_env("DEBUG", false) {
1693            self.debug = true;
1694            info!("DEBUG environment variable forces debug mode ON");
1695        }
1696
1697        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1698
1699        if let Ok(host) = std::env::var("HOST") {
1700            self.host = host;
1701        }
1702        self.port = parse_env::<u16>("PORT", self.port);
1703        self.shutdown_grace_period =
1704            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1705        self.user_authentication_timeout = parse_env::<u64>(
1706            "USER_AUTHENTICATION_TIMEOUT",
1707            self.user_authentication_timeout,
1708        );
1709        self.websocket_max_payload_kb =
1710            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1711        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1712            self.instance.process_id = id;
1713        }
1714
1715        // --- Driver Configuration ---
1716        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1717            self.adapter.driver =
1718                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1719        }
1720        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1721            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1722            self.adapter.buffer_multiplier_per_cpu,
1723        );
1724        self.adapter.enable_socket_counting = parse_env::<bool>(
1725            "ADAPTER_ENABLE_SOCKET_COUNTING",
1726            self.adapter.enable_socket_counting,
1727        );
1728        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1729            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1730        }
1731        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1732            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1733        }
1734        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1735            self.app_manager.driver =
1736                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1737        }
1738        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1739            self.rate_limiter.driver = parse_driver_enum(
1740                driver_str,
1741                self.rate_limiter.driver.clone(),
1742                "RateLimiter Backend",
1743            );
1744        }
1745
1746        // --- Database: Redis ---
1747        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1748            self.database.redis.host = host;
1749        }
1750        self.database.redis.port =
1751            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1752        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1753            self.database.redis.username = if username.is_empty() {
1754                None
1755            } else {
1756                Some(username)
1757            };
1758        }
1759        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1760            self.database.redis.password = Some(password);
1761        }
1762        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1763        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1764            self.database.redis.key_prefix = prefix;
1765        }
1766        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1767            self.database.redis.cluster.username = if cluster_username.is_empty() {
1768                None
1769            } else {
1770                Some(cluster_username)
1771            };
1772        }
1773        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1774            self.database.redis.cluster.password = Some(cluster_password);
1775        }
1776        self.database.redis.cluster.use_tls = parse_bool_env(
1777            "DATABASE_REDIS_CLUSTER_USE_TLS",
1778            self.database.redis.cluster.use_tls,
1779        );
1780
1781        // --- Database: MySQL ---
1782        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1783            self.database.mysql.host = host;
1784        }
1785        self.database.mysql.port =
1786            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1787        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1788            self.database.mysql.username = user;
1789        }
1790        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1791            self.database.mysql.password = pass;
1792        }
1793        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1794            self.database.mysql.database = db;
1795        }
1796        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1797            self.database.mysql.table_name = table;
1798        }
1799        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1800
1801        // --- Database: PostgreSQL ---
1802        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1803            self.database.postgres.host = host;
1804        }
1805        self.database.postgres.port =
1806            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1807        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1808            self.database.postgres.username = user;
1809        }
1810        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1811            self.database.postgres.password = pass;
1812        }
1813        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1814            self.database.postgres.database = db;
1815        }
1816        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1817
1818        // --- Database: DynamoDB ---
1819        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1820            self.database.dynamodb.region = region;
1821        }
1822        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1823            self.database.dynamodb.table_name = table;
1824        }
1825        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1826            self.database.dynamodb.endpoint_url = Some(endpoint);
1827        }
1828        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1829            self.database.dynamodb.aws_access_key_id = Some(key_id);
1830        }
1831        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1832            self.database.dynamodb.aws_secret_access_key = Some(secret);
1833        }
1834
1835        // --- Redis Cluster ---
1836        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1837            let node_list: Vec<String> = nodes
1838                .split(',')
1839                .map(|s| s.trim())
1840                .filter(|s| !s.is_empty())
1841                .map(ToString::to_string)
1842                .collect();
1843
1844            options.adapter.cluster.nodes = node_list.clone();
1845            options.queue.redis_cluster.nodes = node_list.clone();
1846
1847            let parsed_nodes: Vec<ClusterNode> = node_list
1848                .iter()
1849                .filter_map(|seed| ClusterNode::from_seed(seed))
1850                .collect();
1851            options.database.redis.cluster.nodes = parsed_nodes.clone();
1852            options.database.redis.cluster_nodes = parsed_nodes;
1853        };
1854
1855        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1856            apply_redis_cluster_nodes(self, &nodes);
1857        }
1858        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1859            apply_redis_cluster_nodes(self, &nodes);
1860        }
1861        self.queue.redis_cluster.concurrency = parse_env::<u32>(
1862            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1863            self.queue.redis_cluster.concurrency,
1864        );
1865        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1866            self.queue.redis_cluster.prefix = Some(prefix);
1867        }
1868
1869        // --- SSL Configuration ---
1870        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1871        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1872            self.ssl.cert_path = val;
1873        }
1874        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1875            self.ssl.key_path = val;
1876        }
1877        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1878        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1879            self.ssl.http_port = Some(port);
1880        }
1881
1882        // --- Unix Socket Configuration ---
1883        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1884        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1885            self.unix_socket.path = path;
1886        }
1887        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1888            if mode_str.chars().all(|c| c.is_digit(8)) {
1889                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1890                    if mode <= 0o777 {
1891                        self.unix_socket.permission_mode = mode;
1892                    } else {
1893                        warn!(
1894                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1895                            mode_str, self.unix_socket.permission_mode
1896                        );
1897                    }
1898                } else {
1899                    warn!(
1900                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1901                        mode_str, self.unix_socket.permission_mode
1902                    );
1903                }
1904            } else {
1905                warn!(
1906                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1907                    mode_str, self.unix_socket.permission_mode
1908                );
1909            }
1910        }
1911
1912        // --- Metrics ---
1913        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1914            self.metrics.driver =
1915                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1916        }
1917        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1918        if let Ok(val) = std::env::var("METRICS_HOST") {
1919            self.metrics.host = val;
1920        }
1921        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1922        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1923            self.metrics.prometheus.prefix = val;
1924        }
1925
1926        // --- Rate Limiter ---
1927        self.rate_limiter.enabled =
1928            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1929        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1930            "RATE_LIMITER_API_MAX_REQUESTS",
1931            self.rate_limiter.api_rate_limit.max_requests,
1932        );
1933        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1934            "RATE_LIMITER_API_WINDOW_SECONDS",
1935            self.rate_limiter.api_rate_limit.window_seconds,
1936        );
1937        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1938            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1939        }
1940        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1941            "RATE_LIMITER_WS_MAX_REQUESTS",
1942            self.rate_limiter.websocket_rate_limit.max_requests,
1943        );
1944        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1945            "RATE_LIMITER_WS_WINDOW_SECONDS",
1946            self.rate_limiter.websocket_rate_limit.window_seconds,
1947        );
1948        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1949            self.rate_limiter.redis.prefix = Some(prefix);
1950        }
1951
1952        // --- Queue: Redis ---
1953        self.queue.redis.concurrency =
1954            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1955        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1956            self.queue.redis.prefix = Some(prefix);
1957        }
1958
1959        // --- Queue: SQS ---
1960        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1961            self.queue.sqs.region = region;
1962        }
1963        self.queue.sqs.visibility_timeout = parse_env::<i32>(
1964            "QUEUE_SQS_VISIBILITY_TIMEOUT",
1965            self.queue.sqs.visibility_timeout,
1966        );
1967        self.queue.sqs.max_messages =
1968            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
1969        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
1970            "QUEUE_SQS_WAIT_TIME_SECONDS",
1971            self.queue.sqs.wait_time_seconds,
1972        );
1973        self.queue.sqs.concurrency =
1974            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
1975        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
1976        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
1977            self.queue.sqs.endpoint_url = Some(endpoint);
1978        }
1979
1980        // --- Queue: SNS ---
1981        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
1982            self.queue.sns.region = region;
1983        }
1984        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
1985            self.queue.sns.topic_arn = topic_arn;
1986        }
1987        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
1988            self.queue.sns.endpoint_url = Some(endpoint);
1989        }
1990
1991        // --- Webhooks ---
1992        self.webhooks.batching.enabled =
1993            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
1994        self.webhooks.batching.duration =
1995            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
1996        self.webhooks.batching.size =
1997            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
1998
1999        // --- NATS Adapter ---
2000        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2001            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2002        }
2003        if let Ok(user) = std::env::var("NATS_USERNAME") {
2004            self.adapter.nats.username = Some(user);
2005        }
2006        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2007            self.adapter.nats.password = Some(pass);
2008        }
2009        if let Ok(token) = std::env::var("NATS_TOKEN") {
2010            self.adapter.nats.token = Some(token);
2011        }
2012        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2013            "NATS_CONNECTION_TIMEOUT_MS",
2014            self.adapter.nats.connection_timeout_ms,
2015        );
2016        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2017            "NATS_REQUEST_TIMEOUT_MS",
2018            self.adapter.nats.request_timeout_ms,
2019        );
2020
2021        // --- CORS ---
2022        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2023            self.cors.origin = origins.split(',').map(|s| s.trim().to_string()).collect();
2024        }
2025        if let Ok(methods) = std::env::var("CORS_METHODS") {
2026            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2027        }
2028        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2029            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2030        }
2031        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2032
2033        // --- Performance Tuning ---
2034        self.database_pooling.enabled =
2035            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2036        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2037            self.database_pooling.min = min;
2038        }
2039        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2040            self.database_pooling.max = max;
2041        }
2042
2043        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2044            self.database.mysql.connection_pool_size = pool_size;
2045            self.database.postgres.connection_pool_size = pool_size;
2046        }
2047        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2048            self.app_manager.cache.ttl = cache_ttl;
2049            self.channel_limits.cache_ttl = cache_ttl;
2050            self.database.mysql.cache_ttl = cache_ttl;
2051            self.database.postgres.cache_ttl = cache_ttl;
2052            self.cache.memory.ttl = cache_ttl;
2053        }
2054        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2055            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2056            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2057            self.cache.memory.cleanup_interval = cleanup_interval;
2058        }
2059        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2060            self.database.mysql.cache_max_capacity = max_capacity;
2061            self.database.postgres.cache_max_capacity = max_capacity;
2062            self.cache.memory.max_capacity = max_capacity;
2063        }
2064
2065        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2066        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2067        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2068        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2069
2070        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2071            (default_app_id, default_app_key, default_app_secret)
2072            && default_app_enabled
2073        {
2074            let default_app = App {
2075                id: app_id,
2076                key: app_key,
2077                secret: app_secret,
2078                enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2079                enabled: default_app_enabled,
2080                max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2081                max_client_events_per_second: parse_env::<u32>(
2082                    "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2083                    100,
2084                ),
2085                max_read_requests_per_second: Some(parse_env::<u32>(
2086                    "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2087                    100,
2088                )),
2089                max_presence_members_per_channel: Some(parse_env::<u32>(
2090                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2091                    100,
2092                )),
2093                max_presence_member_size_in_kb: Some(parse_env::<u32>(
2094                    "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2095                    100,
2096                )),
2097                max_channel_name_length: Some(parse_env::<u32>(
2098                    "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2099                    100,
2100                )),
2101                max_event_channels_at_once: Some(parse_env::<u32>(
2102                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2103                    100,
2104                )),
2105                max_event_name_length: Some(parse_env::<u32>(
2106                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2107                    100,
2108                )),
2109                max_event_payload_in_kb: Some(parse_env::<u32>(
2110                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2111                    100,
2112                )),
2113                max_event_batch_size: Some(parse_env::<u32>(
2114                    "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2115                    100,
2116                )),
2117                enable_user_authentication: Some(parse_bool_env(
2118                    "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2119                    false,
2120                )),
2121                webhooks: None,
2122                max_backend_events_per_second: Some(parse_env::<u32>(
2123                    "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2124                    100,
2125                )),
2126                enable_watchlist_events: Some(parse_bool_env(
2127                    "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2128                    false,
2129                )),
2130                allowed_origins: {
2131                    if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2132                        if !origins_str.is_empty() {
2133                            Some(
2134                                origins_str
2135                                    .split(',')
2136                                    .map(|s| s.trim().to_string())
2137                                    .collect(),
2138                            )
2139                        } else {
2140                            None
2141                        }
2142                    } else {
2143                        None
2144                    }
2145                },
2146                channel_delta_compression: None,
2147            };
2148
2149            self.app_manager.array.apps.push(default_app);
2150            info!("Successfully registered default app from env");
2151        }
2152
2153        // Special handling for REDIS_URL
2154        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2155            info!("Applying REDIS_URL environment variable override");
2156
2157            let redis_url_json = sonic_rs::json!(redis_url_env);
2158
2159            self.adapter
2160                .redis
2161                .redis_pub_options
2162                .insert("url".to_string(), redis_url_json.clone());
2163            self.adapter
2164                .redis
2165                .redis_sub_options
2166                .insert("url".to_string(), redis_url_json);
2167
2168            self.cache.redis.url_override = Some(redis_url_env.clone());
2169            self.queue.redis.url_override = Some(redis_url_env.clone());
2170            self.rate_limiter.redis.url_override = Some(redis_url_env);
2171        }
2172
2173        // --- Logging Configuration ---
2174        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2175        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2176        if has_colors_env || has_target_env {
2177            let logging_config = self.logging.get_or_insert_with(Default::default);
2178            if has_colors_env {
2179                logging_config.colors_enabled =
2180                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2181            }
2182            if has_target_env {
2183                logging_config.include_target =
2184                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2185            }
2186        }
2187
2188        // --- Cleanup Configuration ---
2189        self.cleanup.async_enabled =
2190            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2191        self.cleanup.fallback_to_sync =
2192            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2193        self.cleanup.queue_buffer_size =
2194            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2195        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2196        self.cleanup.batch_timeout_ms =
2197            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2198        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2199            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2200                WorkerThreadsConfig::Auto
2201            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2202                WorkerThreadsConfig::Fixed(n)
2203            } else {
2204                warn!(
2205                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2206                    worker_threads_str
2207                );
2208                self.cleanup.worker_threads.clone()
2209            };
2210        }
2211        self.cleanup.max_retry_attempts = parse_env::<u32>(
2212            "CLEANUP_MAX_RETRY_ATTEMPTS",
2213            self.cleanup.max_retry_attempts,
2214        );
2215
2216        // Cluster health configuration
2217        self.cluster_health.enabled =
2218            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2219        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2220            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2221            self.cluster_health.heartbeat_interval_ms,
2222        );
2223        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2224            "CLUSTER_HEALTH_NODE_TIMEOUT",
2225            self.cluster_health.node_timeout_ms,
2226        );
2227        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2228            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2229            self.cluster_health.cleanup_interval_ms,
2230        );
2231
2232        // Tag filtering configuration
2233        self.tag_filtering.enabled =
2234            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2235
2236        // WebSocket buffer configuration
2237        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2238            if val.to_lowercase() == "none" || val == "0" {
2239                self.websocket.max_messages = None;
2240            } else if let Ok(n) = val.parse::<usize>() {
2241                self.websocket.max_messages = Some(n);
2242            }
2243        }
2244        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2245            if val.to_lowercase() == "none" || val == "0" {
2246                self.websocket.max_bytes = None;
2247            } else if let Ok(n) = val.parse::<usize>() {
2248                self.websocket.max_bytes = Some(n);
2249            }
2250        }
2251        self.websocket.disconnect_on_buffer_full = parse_bool_env(
2252            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2253            self.websocket.disconnect_on_buffer_full,
2254        );
2255        self.websocket.max_message_size = parse_env::<usize>(
2256            "WEBSOCKET_MAX_MESSAGE_SIZE",
2257            self.websocket.max_message_size,
2258        );
2259        self.websocket.max_frame_size =
2260            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2261        self.websocket.write_buffer_size = parse_env::<usize>(
2262            "WEBSOCKET_WRITE_BUFFER_SIZE",
2263            self.websocket.write_buffer_size,
2264        );
2265        self.websocket.max_backpressure = parse_env::<usize>(
2266            "WEBSOCKET_MAX_BACKPRESSURE",
2267            self.websocket.max_backpressure,
2268        );
2269        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2270        self.websocket.ping_interval =
2271            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2272        self.websocket.idle_timeout =
2273            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2274        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2275            self.websocket.compression = mode;
2276        }
2277
2278        Ok(())
2279    }
2280
2281    pub fn validate(&self) -> Result<(), String> {
2282        if self.unix_socket.enabled {
2283            if self.unix_socket.path.is_empty() {
2284                return Err(
2285                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2286                );
2287            }
2288
2289            self.validate_unix_socket_security()?;
2290
2291            if self.ssl.enabled {
2292                tracing::warn!(
2293                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2294                );
2295            }
2296
2297            if self.unix_socket.permission_mode > 0o777 {
2298                return Err(format!(
2299                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2300                    self.unix_socket.permission_mode
2301                ));
2302            }
2303        }
2304
2305        if let Err(e) = self.cleanup.validate() {
2306            return Err(format!("Invalid cleanup configuration: {}", e));
2307        }
2308
2309        Ok(())
2310    }
2311
2312    fn validate_unix_socket_security(&self) -> Result<(), String> {
2313        let path = &self.unix_socket.path;
2314
2315        if path.contains("../") || path.contains("..\\") {
2316            return Err(
2317                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2318            );
2319        }
2320
2321        if self.unix_socket.permission_mode & 0o002 != 0 {
2322            warn!(
2323                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2324                self.unix_socket.permission_mode
2325            );
2326        }
2327
2328        if self.unix_socket.permission_mode & 0o007 > 0o005 {
2329            warn!(
2330                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2331                self.unix_socket.permission_mode
2332            );
2333        }
2334
2335        if self.mode == "production" && path.starts_with("/tmp/") {
2336            warn!(
2337                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2338                path
2339            );
2340        }
2341
2342        if !path.starts_with('/') {
2343            return Err(
2344                "Unix socket path must be absolute (start with /) for security and reliability."
2345                    .to_string(),
2346            );
2347        }
2348
2349        Ok(())
2350    }
2351}