Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
/// Selects which adapter driver is used.
///
/// Serde names are the lowercased variant names, except for variants that carry an
/// explicit `rename` (`redis-cluster`, `google-pubsub`). `Local` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    // Serialized as `rabbitmq` through `rename_all = "lowercase"`.
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
}
87
/// DynamoDB connection settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from
/// `DynamoDbSettings::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    /// Defaults to `us-east-1`.
    pub region: String,
    /// Defaults to `sockudo-applications`.
    pub table_name: String,
    // The optional fields below all default to `None`.
    pub endpoint_url: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_profile_name: Option<String>,
}
98
99impl Default for DynamoDbSettings {
100    fn default() -> Self {
101        Self {
102            region: "us-east-1".to_string(),
103            table_name: "sockudo-applications".to_string(),
104            endpoint_url: None,
105            aws_access_key_id: None,
106            aws_secret_access_key: None,
107            aws_profile_name: None,
108        }
109    }
110}
111
/// ScyllaDB connection settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from
/// `ScyllaDbSettings::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    /// Node addresses; the default is the single entry `127.0.0.1:9042`.
    pub nodes: Vec<String>,
    /// Defaults to `sockudo`.
    pub keyspace: String,
    /// Defaults to `applications`.
    pub table_name: String,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `SimpleStrategy`.
    pub replication_class: String,
    /// Defaults to `3`.
    pub replication_factor: u32,
}
123
124impl Default for ScyllaDbSettings {
125    fn default() -> Self {
126        Self {
127            nodes: vec!["127.0.0.1:9042".to_string()],
128            keyspace: "sockudo".to_string(),
129            table_name: "applications".to_string(),
130            username: None,
131            password: None,
132            replication_class: "SimpleStrategy".to_string(),
133            replication_factor: 3,
134        }
135    }
136}
137
/// SurrealDB connection settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from
/// `SurrealDbSettings::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbSettings {
    /// Defaults to `ws://127.0.0.1:8000`.
    pub url: String,
    /// Defaults to `sockudo`.
    pub namespace: String,
    /// Defaults to `sockudo`.
    pub database: String,
    /// Defaults to `root`.
    pub username: String,
    /// Defaults to `root`.
    pub password: String,
    /// Defaults to `applications`.
    pub table_name: String,
    /// Defaults to `300` (unit not stated here; presumably seconds — TODO confirm).
    pub cache_ttl: u64,
    /// Defaults to `100`.
    pub cache_max_capacity: u64,
}
150
151impl Default for SurrealDbSettings {
152    fn default() -> Self {
153        Self {
154            url: "ws://127.0.0.1:8000".to_string(),
155            namespace: "sockudo".to_string(),
156            database: "sockudo".to_string(),
157            username: "root".to_string(),
158            password: "root".to_string(),
159            table_name: "applications".to_string(),
160            cache_ttl: 300,
161            cache_max_capacity: 100,
162        }
163    }
164}
165
166impl FromStr for AdapterDriver {
167    type Err = String;
168    fn from_str(s: &str) -> Result<Self, Self::Err> {
169        match s.to_lowercase().as_str() {
170            "local" => Ok(AdapterDriver::Local),
171            "redis" => Ok(AdapterDriver::Redis),
172            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
173            "nats" => Ok(AdapterDriver::Nats),
174            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
175            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
176            "kafka" => Ok(AdapterDriver::Kafka),
177            _ => Err(format!("Unknown adapter driver: {s}")),
178        }
179    }
180}
181
/// Selects which app manager driver is used.
///
/// Serde names are the lowercased variant names (`memory`, `mysql`, `dynamodb`,
/// `pgsql`, `surrealdb`, `scylladb`). `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    SurrealDb,
    ScyllaDb,
}
193impl FromStr for AppManagerDriver {
194    type Err = String;
195    fn from_str(s: &str) -> Result<Self, Self::Err> {
196        match s.to_lowercase().as_str() {
197            "memory" => Ok(AppManagerDriver::Memory),
198            "mysql" => Ok(AppManagerDriver::Mysql),
199            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
200            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
201            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
202            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
203            _ => Err(format!("Unknown app manager driver: {s}")),
204        }
205    }
206}
207
/// Selects which cache driver is used.
///
/// Serde names are the lowercased variant names, with `RedisCluster` renamed to
/// `redis-cluster`. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    None,
}
218
219impl FromStr for CacheDriver {
220    type Err = String;
221    fn from_str(s: &str) -> Result<Self, Self::Err> {
222        match s.to_lowercase().as_str() {
223            "memory" => Ok(CacheDriver::Memory),
224            "redis" => Ok(CacheDriver::Redis),
225            "redis-cluster" => Ok(CacheDriver::RedisCluster),
226            "none" => Ok(CacheDriver::None),
227            _ => Err(format!("Unknown cache driver: {s}")),
228        }
229    }
230}
231
/// Selects which queue driver is used.
///
/// Serde names are the lowercased variant names, with `RedisCluster` renamed to
/// `redis-cluster`. Note that the default is `Redis`, not `Memory`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Sqs,
    Sns,
    None,
}
244
245impl FromStr for QueueDriver {
246    type Err = String;
247    fn from_str(s: &str) -> Result<Self, Self::Err> {
248        match s.to_lowercase().as_str() {
249            "memory" => Ok(QueueDriver::Memory),
250            "redis" => Ok(QueueDriver::Redis),
251            "redis-cluster" => Ok(QueueDriver::RedisCluster),
252            "sqs" => Ok(QueueDriver::Sqs),
253            "sns" => Ok(QueueDriver::Sns),
254            "none" => Ok(QueueDriver::None),
255            _ => Err(format!("Unknown queue driver: {s}")),
256        }
257    }
258}
259
260impl AsRef<str> for QueueDriver {
261    fn as_ref(&self) -> &str {
262        match self {
263            QueueDriver::Memory => "memory",
264            QueueDriver::Redis => "redis",
265            QueueDriver::RedisCluster => "redis-cluster",
266            QueueDriver::Sqs => "sqs",
267            QueueDriver::Sns => "sns",
268            QueueDriver::None => "none",
269        }
270    }
271}
272
/// Redis Cluster queue settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from
/// `RedisClusterQueueConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    /// Defaults to `5`.
    pub concurrency: u32,
    /// Defaults to `Some("sockudo_queue:")`.
    pub prefix: Option<String>,
    /// Defaults to the single entry `redis://127.0.0.1:6379`.
    pub nodes: Vec<String>,
    /// Timeout in milliseconds (per the field name); defaults to `5000`.
    pub request_timeout_ms: u64,
}
281
282impl Default for RedisClusterQueueConfig {
283    fn default() -> Self {
284        Self {
285            concurrency: 5,
286            prefix: Some("sockudo_queue:".to_string()),
287            nodes: vec!["redis://127.0.0.1:6379".to_string()],
288            request_timeout_ms: 5000,
289        }
290    }
291}
292
/// Selects which metrics driver is used. `Prometheus` is the only variant and the default;
/// its serde name is `prometheus`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
299
/// Log output format. Serde names are `human` and `json`; `Human` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
307
308impl FromStr for LogOutputFormat {
309    type Err = String;
310    fn from_str(s: &str) -> Result<Self, Self::Err> {
311        match s.to_lowercase().as_str() {
312            "human" => Ok(LogOutputFormat::Human),
313            "json" => Ok(LogOutputFormat::Json),
314            _ => Err(format!("Unknown log output format: {s}")),
315        }
316    }
317}
318
319impl FromStr for MetricsDriver {
320    type Err = String;
321    fn from_str(s: &str) -> Result<Self, Self::Err> {
322        match s.to_lowercase().as_str() {
323            "prometheus" => Ok(MetricsDriver::Prometheus),
324            _ => Err(format!("Unknown metrics driver: {s}")),
325        }
326    }
327}
328
329impl AsRef<str> for MetricsDriver {
330    fn as_ref(&self) -> &str {
331        match self {
332            MetricsDriver::Prometheus => "prometheus",
333        }
334    }
335}
336
337// --- Main Configuration Struct ---
/// Top-level server configuration that aggregates every sub-configuration.
///
/// With `#[serde(default)]`, any field missing from the input is taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    // The only optional sub-configuration in this struct.
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    pub activity_timeout: u64,
    // NOTE(review): `AdapterConfig` also carries a `cluster_health` field of the same type;
    // which one takes effect is not visible here — confirm against the consumers.
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
}
379
380// --- Configuration Sub-Structs ---
381
/// SQS queue settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    pub visibility_timeout: i32,
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    /// Wait time in seconds (per the field name).
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    // NOTE(review): presumably only meaningful when `fifo` is true — confirm with the SQS driver.
    pub message_group_id: Option<String>,
}
395
/// SNS queue settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
403
/// Adapter configuration: the selected driver plus the settings of every supported driver.
///
/// With `#[serde(default)]`, fields missing from the input are taken from
/// `AdapterConfig::default()`, except where a field names its own default function.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    /// Redis Cluster adapter settings (note the field is named `cluster`, not `redis_cluster`).
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    /// Falls back to `default_buffer_multiplier_per_cpu()` when missing.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    /// Falls back to `default_enable_socket_counting()` when missing.
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
}
420
/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
424
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    const DEFAULT_MULTIPLIER: usize = 64;
    DEFAULT_MULTIPLIER
}
428
429impl Default for AdapterConfig {
430    fn default() -> Self {
431        Self {
432            driver: AdapterDriver::default(),
433            redis: RedisAdapterConfig::default(),
434            cluster: RedisClusterAdapterConfig::default(),
435            nats: NatsAdapterConfig::default(),
436            rabbitmq: RabbitMqAdapterConfig::default(),
437            google_pubsub: GooglePubSubAdapterConfig::default(),
438            kafka: KafkaAdapterConfig::default(),
439            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
440            cluster_health: ClusterHealthConfig::default(),
441            enable_socket_counting: default_enable_socket_counting(),
442        }
443    }
444}
445
/// Redis adapter settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    pub requests_timeout: u64,
    pub prefix: String,
    // Free-form option maps; the recognised keys are not defined in this part of the file.
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
455
/// Redis Cluster adapter settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    /// Timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    /// The field-level `#[serde(default)]` makes a missing key deserialize to `false`.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
466
/// NATS adapter settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    /// Timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    /// Timeout in milliseconds (per the field name).
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
479
/// RabbitMQ adapter settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    pub url: String,
    pub prefix: String,
    /// Timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    /// Timeout in milliseconds (per the field name).
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
489
/// Google Pub/Sub adapter settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    pub project_id: String,
    pub prefix: String,
    /// Timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    pub emulator_host: Option<String>,
    pub nodes_number: Option<u32>,
}
499
/// Kafka adapter settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    pub brokers: Vec<String>,
    pub prefix: String,
    /// Timeout in milliseconds (per the field name).
    pub request_timeout_ms: u64,
    // Optional security settings, kept as free-form strings.
    pub security_protocol: Option<String>,
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    pub nodes_number: Option<u32>,
}
512
/// App manager configuration: the selected driver plus its related settings.
///
/// `Default` is derived, so every field starts from its own type's default
/// (the driver defaults to `AppManagerDriver::Memory`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    // NOTE(review): `DatabaseConfig` also has a `scylladb: ScyllaDbSettings` field;
    // which one the ScyllaDB app manager reads is not visible here — confirm.
    pub scylladb: ScyllaDbSettings,
}
521
/// A list of `App` definitions supplied directly in the configuration.
/// `Default` is derived, so the list starts empty.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
527
/// Cache toggle and TTL used by `AppManagerConfig::cache`.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    // Unit not stated here; presumably seconds — TODO confirm.
    pub ttl: u64,
}
534
/// In-memory cache options used by `CacheConfig::memory`.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    // Units of `ttl` and `cleanup_interval` are not stated here; presumably seconds — TODO confirm.
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
542
/// Cache configuration: the selected driver plus Redis and in-memory options.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
550
/// Redis options used by `CacheConfig::redis`.
///
/// `Default` is derived: both options are `None` and `cluster_mode` is `false`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
558
/// Limits that apply to channels.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    // Unit not stated here; presumably seconds — TODO confirm.
    pub cache_ttl: u64,
}
565
/// CORS settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    /// Origin patterns; validated during deserialization by
    /// `deserialize_and_validate_cors_origins`, which rejects invalid patterns.
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
575
576fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
577where
578    D: serde::Deserializer<'de>,
579{
580    use serde::de::Error;
581    let origins = Vec::<String>::deserialize(deserializer)?;
582
583    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
584        return Err(D::Error::custom(format!(
585            "CORS origin pattern validation failed: {}",
586            e
587        )));
588    }
589
590    Ok(origins)
591}
592
/// Connection settings for every supported database backend.
///
/// `Default` is derived, so each section starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    // MySQL and PostgreSQL share the same `DatabaseConnection` shape.
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
603
/// Connection settings for a SQL database (used for both `mysql` and `postgres`
/// in `DatabaseConfig`).
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    /// Optional pool lower bound; `override_db_pool_settings` can set it from `<prefix>_POOL_MIN`.
    pub pool_min: Option<u32>,
    /// Optional pool upper bound; `override_db_pool_settings` can set it from `<prefix>_POOL_MAX`.
    pub pool_max: Option<u32>,
    // Units of the cache timings are not stated here; presumably seconds — TODO confirm.
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
620
/// Redis connection settings covering standalone, Sentinel and Cluster setups.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    /// Host name; may carry a `redis://` or `rediss://` prefix, which `to_url` keeps as the scheme.
    pub host: String,
    pub port: u16,
    /// Database index appended as the last path segment of the generated URL.
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    /// When non-empty, `to_url` produces a `redis+sentinel://` URL instead of a standard one.
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    /// Written as the first path segment of the Sentinel URL
    /// (presumably the Sentinel master name — TODO confirm).
    pub name: String,
    /// Canonical cluster settings; its credentials take precedence over the top-level
    /// `username`/`password` when cluster URLs are built.
    pub cluster: RedisClusterConnection,
    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
    pub cluster_nodes: Vec<ClusterNode>,
}
637
/// Address of a single Redis Sentinel instance.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
644
/// Redis Cluster seed nodes and the auth/TLS options shared by all of them.
///
/// `Default` is derived: no nodes, no credentials, TLS off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Also accepted under the key `useTLS`. When true, nodes given as plain hosts get the
    /// `rediss` scheme.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
654
655impl RedisConnection {
656    /// Returns true if Redis Sentinel is configured.
657    pub fn is_sentinel_configured(&self) -> bool {
658        !self.sentinels.is_empty()
659    }
660
661    /// Builds a Redis connection URL based on the configuration.
662    pub fn to_url(&self) -> String {
663        if self.is_sentinel_configured() {
664            self.build_sentinel_url()
665        } else {
666            self.build_standard_url()
667        }
668    }
669
670    fn build_standard_url(&self) -> String {
671        // Extract scheme from host if present, otherwise default to redis://
672        let (scheme, host) = if self.host.starts_with("rediss://") {
673            ("rediss://", self.host.trim_start_matches("rediss://"))
674        } else if self.host.starts_with("redis://") {
675            ("redis://", self.host.trim_start_matches("redis://"))
676        } else {
677            ("redis://", self.host.as_str())
678        };
679
680        let mut url = String::from(scheme);
681
682        if let Some(ref username) = self.username {
683            url.push_str(username);
684            if let Some(ref password) = self.password {
685                url.push(':');
686                url.push_str(&urlencoding::encode(password));
687            }
688            url.push('@');
689        } else if let Some(ref password) = self.password {
690            url.push(':');
691            url.push_str(&urlencoding::encode(password));
692            url.push('@');
693        }
694
695        url.push_str(host);
696        url.push(':');
697        url.push_str(&self.port.to_string());
698        url.push('/');
699        url.push_str(&self.db.to_string());
700
701        url
702    }
703
704    fn build_sentinel_url(&self) -> String {
705        let mut url = String::from("redis+sentinel://");
706
707        if let Some(ref sentinel_password) = self.sentinel_password {
708            url.push(':');
709            url.push_str(&urlencoding::encode(sentinel_password));
710            url.push('@');
711        }
712
713        let sentinel_hosts: Vec<String> = self
714            .sentinels
715            .iter()
716            .map(|s| format!("{}:{}", s.host, s.port))
717            .collect();
718        url.push_str(&sentinel_hosts.join(","));
719
720        url.push('/');
721        url.push_str(&self.name);
722        url.push('/');
723        url.push_str(&self.db.to_string());
724
725        let mut params = Vec::new();
726        if let Some(ref password) = self.password {
727            params.push(format!("password={}", urlencoding::encode(password)));
728        }
729        if let Some(ref username) = self.username {
730            params.push(format!("username={}", urlencoding::encode(username)));
731        }
732
733        if !params.is_empty() {
734            url.push('?');
735            url.push_str(&params.join("&"));
736        }
737
738        url
739    }
740
741    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
742    /// or legacy (`cluster_nodes`) field.
743    pub fn has_cluster_nodes(&self) -> bool {
744        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
745    }
746
747    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
748    /// Falls back to legacy `cluster_nodes` for backward compatibility.
749    pub fn cluster_node_urls(&self) -> Vec<String> {
750        if !self.cluster.nodes.is_empty() {
751            return self.build_cluster_urls(&self.cluster.nodes);
752        }
753        self.build_cluster_urls(&self.cluster_nodes)
754    }
755
756    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
757    /// shared cluster auth/TLS options.
758    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
759        self.build_cluster_urls(
760            &seeds
761                .iter()
762                .filter_map(|seed| ClusterNode::from_seed(seed))
763                .collect::<Vec<ClusterNode>>(),
764        )
765    }
766
767    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
768        let username = self
769            .cluster
770            .username
771            .as_deref()
772            .or(self.username.as_deref());
773        let password = self
774            .cluster
775            .password
776            .as_deref()
777            .or(self.password.as_deref());
778        let use_tls = self.cluster.use_tls;
779
780        nodes
781            .iter()
782            .map(|node| node.to_url_with_options(use_tls, username, password))
783            .collect()
784    }
785}
786
787impl RedisSentinel {
788    pub fn to_host_port(&self) -> String {
789        format!("{}:{}", self.host, self.port)
790    }
791}
792
/// A single Redis Cluster seed node.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    /// Either a plain host (optionally `host:port` or bracketed IPv6) or a full
    /// `redis://` / `rediss://` URL; `to_url_with_options` handles both forms.
    pub host: String,
    /// Port used when `host` does not carry one itself.
    pub port: u16,
}
799
800impl ClusterNode {
801    pub fn to_url(&self) -> String {
802        self.to_url_with_options(false, None, None)
803    }
804
805    pub fn to_url_with_options(
806        &self,
807        use_tls: bool,
808        username: Option<&str>,
809        password: Option<&str>,
810    ) -> String {
811        let host = self.host.trim();
812
813        if host.starts_with("redis://") || host.starts_with("rediss://") {
814            if let Ok(parsed) = Url::parse(host)
815                && let Some(host_str) = parsed.host_str()
816            {
817                let scheme = parsed.scheme();
818                let port = parsed.port_or_known_default().unwrap_or(self.port);
819                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
820                let parsed_password = parsed.password();
821                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
822                let (effective_username, effective_password) = if has_embedded_auth {
823                    (parsed_username, parsed_password)
824                } else {
825                    (username, password)
826                };
827
828                return build_redis_url(
829                    scheme,
830                    host_str,
831                    port,
832                    effective_username,
833                    effective_password,
834                );
835            }
836
837            // Fallback for malformed URLs
838            let has_port = if let Some(bracket_pos) = host.rfind(']') {
839                host[bracket_pos..].contains(':')
840            } else {
841                host.split(':').count() >= 3
842            };
843            let base = if has_port {
844                host.to_string()
845            } else {
846                format!("{}:{}", host, self.port)
847            };
848
849            if let Ok(parsed) = Url::parse(&base) {
850                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
851                let parsed_password = parsed.password();
852                if let Some(host_str) = parsed.host_str() {
853                    let port = parsed.port_or_known_default().unwrap_or(self.port);
854                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
855                    let (effective_username, effective_password) = if has_embedded_auth {
856                        (parsed_username, parsed_password)
857                    } else {
858                        (username, password)
859                    };
860                    return build_redis_url(
861                        parsed.scheme(),
862                        host_str,
863                        port,
864                        effective_username,
865                        effective_password,
866                    );
867                }
868            }
869            return base;
870        }
871
872        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
873        let scheme = if use_tls { "rediss" } else { "redis" };
874        build_redis_url(
875            scheme,
876            &normalized_host,
877            normalized_port,
878            username,
879            password,
880        )
881    }
882
883    pub fn from_seed(seed: &str) -> Option<Self> {
884        let trimmed = seed.trim();
885        if trimmed.is_empty() {
886            return None;
887        }
888
889        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
890            let port = Url::parse(trimmed)
891                .ok()
892                .and_then(|parsed| parsed.port_or_known_default())
893                .unwrap_or(6379);
894            return Some(Self {
895                host: trimmed.to_string(),
896                port,
897            });
898        }
899
900        let (host, port) = split_plain_host_and_port(trimmed, 6379);
901        Some(Self { host, port })
902    }
903}
904
/// Splits a scheme-less host string into `(host, port)`.
///
/// Recognised forms:
/// * `[v6addr]:port` / `[v6addr]` — the brackets are removed from the returned host;
/// * `host:port` with exactly one colon and a valid `u16` port.
///
/// Everything else (including bare IPv6 addresses and unparsable ports) is returned
/// as-is with `default_port`. Surrounding whitespace is trimmed first.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    // Bracketed IPv6: [::1]:6379
    if let Some(after_open) = trimmed.strip_prefix('[') {
        return match after_open.split_once(']') {
            Some((address, rest)) => {
                let port = rest
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (address.to_string(), port)
            }
            // No closing bracket: leave the input untouched.
            None => (trimmed.to_string(), default_port),
        };
    }

    // host:6379 — exactly one colon means exactly two pieces.
    let mut pieces = trimmed.split(':');
    if let (Some(name), Some(port_text), None) = (pieces.next(), pieces.next(), pieces.next()) {
        if let Ok(port) = port_text.parse::<u16>() {
            return (name.to_string(), port);
        }
    }

    (trimmed.to_string(), default_port)
}
933
934fn build_redis_url(
935    scheme: &str,
936    host: &str,
937    port: u16,
938    username: Option<&str>,
939    password: Option<&str>,
940) -> String {
941    let mut url = format!("{scheme}://");
942
943    if let Some(user) = username {
944        url.push_str(&urlencoding::encode(user));
945        if let Some(pass) = password {
946            url.push(':');
947            url.push_str(&urlencoding::encode(pass));
948        }
949        url.push('@');
950    } else if let Some(pass) = password {
951        url.push(':');
952        url.push_str(&urlencoding::encode(pass));
953        url.push('@');
954    }
955
956    if host.contains(':') && !host.starts_with('[') {
957        url.push('[');
958        url.push_str(host);
959        url.push(']');
960    } else {
961        url.push_str(host);
962    }
963    url.push(':');
964    url.push_str(&port.to_string());
965    url
966}
967
/// Database pooling settings: an on/off switch plus minimum and maximum values.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    pub min: u32,
    pub max: u32,
}
975
/// Limits that apply to events.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    pub max_channels_at_once: u32,
    pub max_name_length: u32,
    /// Payload limit in kilobytes (per the field name).
    pub max_payload_in_kb: u32,
    pub max_batch_size: u32,
}
984
/// Idempotency key settings.
///
/// With `#[serde(default)]`, fields missing from the input are taken from this type's
/// `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
    /// Whether idempotency key support is enabled
    pub enabled: bool,
    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
    pub ttl_seconds: u64,
    /// Maximum length of an idempotency key
    pub max_key_length: usize,
}
995
/// Ephemeral message settings.
///
/// Because of `#[serde(default)]`, a missing field is taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EphemeralConfig {
    /// Whether ephemeral message handling is enabled. Default: `true`.
    pub enabled: bool,
}
1002
/// Echo control settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EchoControlConfig {
    /// Whether connection-level and per-message echo control is enabled. Default: `true`.
    pub enabled: bool,
    /// Default echo behavior for new V2 connections when the query param is omitted.
    /// Default: `true`.
    pub default_echo_messages: bool,
}
1011
/// Per-subscription event name filtering settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventNameFilteringConfig {
    /// Whether per-subscription event name filtering is enabled. Default: `true`.
    pub enabled: bool,
    /// Maximum event names allowed in a single filter. Default: `50`.
    pub max_events_per_filter: usize,
    /// Maximum length for each event name in a filter. Default: `200`.
    pub max_event_name_length: usize,
}
1022
/// Connection recovery (resume) settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionRecoveryConfig {
    /// Whether connection recovery (resume) is enabled.
    /// When enabled, the server keeps a bounded replay buffer per channel so that
    /// reconnecting clients can receive missed messages. Disabled by default for
    /// Pusher protocol compatibility.
    pub enabled: bool,
    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
    pub buffer_ttl_seconds: u64,
    /// Maximum number of messages kept per channel. Default: 100.
    pub max_buffer_size: usize,
}
1036
/// HTTP API settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    /// Request size limit, in MB per the field name. Default: `10`.
    pub request_limit_in_mb: u32,
    /// Nested traffic-acceptance settings; see `AcceptTraffic`.
    pub accept_traffic: AcceptTraffic,
    /// Default: `true`.
    pub usage_enabled: bool,
}
1044
/// Traffic-acceptance settings nested inside `HttpApiConfig`.
///
/// Because of `#[serde(default)]`, a missing field is taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    /// Default: `0.90`.
    // NOTE(review): presumably a 0.0–1.0 fraction of memory use — confirm against the consumer.
    pub memory_threshold: f64,
}
1050
/// Per-instance settings.
///
/// Because of `#[serde(default)]`, a missing field is taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    /// Process identifier string. The `Default` implementation generates a
    /// fresh UUID v4; `INSTANCE_PROCESS_ID` overrides it in `override_from_env`.
    pub process_id: String,
}
1056
/// Metrics settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Metrics backend selector. Default: `MetricsDriver::default()`.
    pub driver: MetricsDriver,
    /// Default: `"0.0.0.0"`.
    pub host: String,
    /// Prometheus-specific settings; see `PrometheusConfig`.
    pub prometheus: PrometheusConfig,
    /// Default: `9601`.
    pub port: u16,
}
1066
/// Prometheus settings nested inside `MetricsConfig`.
///
/// Because of `#[serde(default)]`, a missing field is taken from this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    /// Default: `"sockudo_"`.
    pub prefix: String,
}
1072
/// Logging output settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    /// Default: `true`.
    pub colors_enabled: bool,
    /// Default: `true`.
    pub include_target: bool,
}
1079
/// Presence channel limits.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    /// Default: `100`.
    pub max_members_per_channel: u32,
    /// Member size limit, in KB per the field name. Default: `2`.
    pub max_member_size_in_kb: u32,
}
1086
/// WebSocket connection buffer configuration
/// Controls backpressure handling for slow consumers
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    /// Message-count buffer limit used by `to_buffer_config`. Default: `Some(1000)`.
    pub max_messages: Option<usize>,
    /// Byte buffer limit used by `to_buffer_config`; when set it is also used
    /// as the max payload length in `to_sockudo_ws_config`. Default: `None`.
    pub max_bytes: Option<usize>,
    /// Copied into `WebSocketBufferConfig::disconnect_on_full`. Default: `true`.
    pub disconnect_on_buffer_full: bool,
    /// Default: `64 * 1024 * 1024`.
    pub max_message_size: usize,
    /// Default: `16 * 1024 * 1024`.
    pub max_frame_size: usize,
    /// Default: `16 * 1024`.
    pub write_buffer_size: usize,
    /// Default: `1024 * 1024`.
    pub max_backpressure: usize,
    /// Default: `true`.
    pub auto_ping: bool,
    /// Default: `30`. `to_sockudo_ws_config` may raise the effective value.
    pub ping_interval: u32,
    /// Default: `120`.
    pub idle_timeout: u32,
    /// Compression mode name, matched case-insensitively in
    /// `to_sockudo_ws_config` ("dedicated", "shared", "window256b" …
    /// "window32kb"); any other value means disabled. Default: `"disabled"`.
    pub compression: String,
}
1104
1105impl Default for WebSocketConfig {
1106    fn default() -> Self {
1107        Self {
1108            max_messages: Some(1000),
1109            max_bytes: None,
1110            disconnect_on_buffer_full: true,
1111            max_message_size: 64 * 1024 * 1024,
1112            max_frame_size: 16 * 1024 * 1024,
1113            write_buffer_size: 16 * 1024,
1114            max_backpressure: 1024 * 1024,
1115            auto_ping: true,
1116            ping_interval: 30,
1117            idle_timeout: 120,
1118            compression: "disabled".to_string(),
1119        }
1120    }
1121}
1122
1123impl WebSocketConfig {
1124    /// Convert to WebSocketBufferConfig for runtime use
1125    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1126        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1127
1128        let limit = match (self.max_messages, self.max_bytes) {
1129            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1130            (Some(messages), None) => BufferLimit::Messages(messages),
1131            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1132            (None, None) => BufferLimit::Messages(1000),
1133        };
1134
1135        WebSocketBufferConfig {
1136            limit,
1137            disconnect_on_full: self.disconnect_on_buffer_full,
1138        }
1139    }
1140
1141    /// Convert to native sockudo-ws runtime configuration.
1142    pub fn to_sockudo_ws_config(
1143        &self,
1144        websocket_max_payload_kb: u32,
1145        activity_timeout: u64,
1146    ) -> sockudo_ws::Config {
1147        use sockudo_ws::Compression;
1148
1149        let compression = match self.compression.to_lowercase().as_str() {
1150            "dedicated" => Compression::Dedicated,
1151            "shared" => Compression::Shared,
1152            "window256b" => Compression::Window256B,
1153            "window1kb" => Compression::Window1KB,
1154            "window2kb" => Compression::Window2KB,
1155            "window4kb" => Compression::Window4KB,
1156            "window8kb" => Compression::Window8KB,
1157            "window16kb" => Compression::Window16KB,
1158            "window32kb" => Compression::Window32KB,
1159            _ => Compression::Disabled,
1160        };
1161
1162        sockudo_ws::Config::builder()
1163            .max_payload_length(
1164                self.max_bytes
1165                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1166            )
1167            .max_message_size(self.max_message_size)
1168            .max_frame_size(self.max_frame_size)
1169            .write_buffer_size(self.write_buffer_size)
1170            .max_backpressure(self.max_backpressure)
1171            .idle_timeout(self.idle_timeout)
1172            .auto_ping(self.auto_ping)
1173            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1174            .compression(compression)
1175            .build()
1176    }
1177}
1178
/// Queue settings: the selected driver plus one settings block per backend.
///
/// `Default` is derived, so every field starts from its own type's default.
/// Because of `#[serde(default)]`, fields missing from the input use those
/// defaults too.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    /// Selected queue backend (overridable via `QUEUE_DRIVER`).
    pub driver: QueueDriver,
    /// Redis queue settings.
    pub redis: RedisQueueConfig,
    /// Redis Cluster queue settings.
    pub redis_cluster: RedisClusterQueueConfig,
    /// SQS queue settings.
    pub sqs: SqsQueueConfig,
    /// SNS queue settings.
    pub sns: SnsQueueConfig,
}
1188
/// Redis queue settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    /// Default: `5`.
    pub concurrency: u32,
    /// Default: `Some("sockudo_queue:")`.
    pub prefix: Option<String>,
    /// Default: `None`.
    pub url_override: Option<String>,
    /// Default: `false`.
    pub cluster_mode: bool,
}
1197
/// A single rate limit rule.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimit {
    /// Default: `60`.
    pub max_requests: u32,
    /// Window length, in seconds per the field name. Default: `60`.
    pub window_seconds: u64,
    /// Default: `Some("default")`.
    pub identifier: Option<String>,
    /// Default: `Some(0)`.
    pub trust_hops: Option<u32>,
}
1206
/// Rate limiter settings: driver, the API and WebSocket rules, and Redis options.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimiterConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Backend selector. Default: `CacheDriver::Memory`.
    pub driver: CacheDriver,
    /// Default: 100 requests per 60 s, identifier `"api"`.
    pub api_rate_limit: RateLimit,
    /// Default: 20 requests per 60 s, identifier `"websocket_connect"`.
    pub websocket_rate_limit: RateLimit,
    /// Redis options; the default prefix is `"sockudo_rl:"`.
    pub redis: RedisConfig,
}
1216
/// SSL/TLS settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SslConfig {
    /// Default: `false`.
    pub enabled: bool,
    /// Default: empty string.
    pub cert_path: String,
    /// Default: empty string.
    pub key_path: String,
    /// Default: `None`.
    pub passphrase: Option<String>,
    /// Default: `None`.
    pub ca_path: Option<String>,
    /// Default: `false`.
    pub redirect_http: bool,
    /// Default: `Some(80)`.
    pub http_port: Option<u16>,
}
1228
/// Webhook delivery settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhooksConfig {
    /// Batching settings; see `BatchingConfig`.
    pub batching: BatchingConfig,
    /// Retry settings; see `WebhookRetryConfig`.
    pub retry: WebhookRetryConfig,
    /// Request timeout, in milliseconds per the field name. Default: `10_000`.
    pub request_timeout_ms: u64,
}
1236
/// Webhook batching settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BatchingConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Default: `50`.
    // NOTE(review): unit is not visible here (likely milliseconds) — confirm.
    pub duration: u64,
    /// Default: `100`.
    pub size: usize,
}
1244
/// Webhook retry settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhookRetryConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Default: `None`.
    pub max_attempts: Option<u32>,
    /// In milliseconds per the field name. Default: `300_000`.
    pub max_elapsed_time_ms: u64,
    /// In milliseconds per the field name. Default: `1_000`.
    pub initial_backoff_ms: u64,
    /// In milliseconds per the field name. Default: `60_000`.
    pub max_backoff_ms: u64,
}
1254
1255impl Default for WebhooksConfig {
1256    fn default() -> Self {
1257        Self {
1258            batching: BatchingConfig::default(),
1259            retry: WebhookRetryConfig::default(),
1260            request_timeout_ms: 10_000,
1261        }
1262    }
1263}
1264
1265impl Default for WebhookRetryConfig {
1266    fn default() -> Self {
1267        Self {
1268            enabled: true,
1269            max_attempts: None,
1270            max_elapsed_time_ms: 300_000,
1271            initial_backoff_ms: 1_000,
1272            max_backoff_ms: 60_000,
1273        }
1274    }
1275}
1276
/// Cluster health-check timing settings; constraints between the fields are
/// enforced by `ClusterHealthConfig::validate`.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterHealthConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Must be > 0 and at most `node_timeout_ms / 3`. Default: `10000`.
    pub heartbeat_interval_ms: u64,
    /// Must be > 0. Default: `30000`.
    pub node_timeout_ms: u64,
    /// Must be > 0 and at most `node_timeout_ms`. Default: `10000`.
    pub cleanup_interval_ms: u64,
}
1285
/// Unix domain socket settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UnixSocketConfig {
    /// Default: `false`.
    pub enabled: bool,
    /// Default: `"/var/run/sockudo/sockudo.sock"`.
    pub path: String,
    /// Permission bits. Deserialized from an octal *string* (digits 0-7,
    /// at most `777`) by `deserialize_octal_permission`. Default: `0o660`.
    // NOTE(review): the derived `Serialize` writes this as a plain integer,
    // which the custom deserializer (string only) would reject on round-trip.
    #[serde(deserialize_with = "deserialize_octal_permission")]
    pub permission_mode: u32,
}
1294
/// Delta compression settings.
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DeltaCompressionOptionsConfig {
    /// Default: `true`.
    pub enabled: bool,
    /// Algorithm name. Default: `"fossil"`.
    pub algorithm: String,
    /// Default: `10`.
    pub full_message_interval: u32,
    /// Default: `100`.
    pub min_message_size: usize,
    /// In seconds per the field name. Default: `300`.
    pub max_state_age_secs: u64,
    /// Default: `100`.
    pub max_channel_states_per_socket: usize,
    /// Default: `Some(100)`.
    pub max_conflation_states_per_channel: Option<usize>,
    /// Default: `None`.
    pub conflation_key_path: Option<String>,
    /// Default: `false`.
    pub cluster_coordination: bool,
    /// Default: `false`.
    pub omit_delta_algorithm: bool,
}
1309
/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
///
/// Because of `#[serde(default)]`, fields missing from the input are taken
/// from this type's `Default` implementation. `CleanupConfig::validate`
/// rejects zero values for the size, timeout and fixed worker-count fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CleanupConfig {
    /// Must be > 0. Default: `1024`.
    pub queue_buffer_size: usize,
    /// Must be > 0. Default: `64`.
    pub batch_size: usize,
    /// Must be > 0. Default: `100`.
    pub batch_timeout_ms: u64,
    /// `"auto"` or a fixed count (which must be > 0). Default: `Auto`.
    pub worker_threads: WorkerThreadsConfig,
    /// Default: `3`.
    pub max_retry_attempts: u32,
    /// Default: `true`.
    pub async_enabled: bool,
    /// Default: `true`.
    pub fallback_to_sync: bool,
}
1322
/// Worker threads configuration for the cleanup system
///
/// Serialized as the string `"auto"` or as an integer; deserialized from
/// `"auto"` (case-insensitive), a numeric string, or an integer.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// No explicit thread count configured.
    Auto,
    /// Explicit thread count.
    Fixed(usize),
}
1329
1330impl serde::Serialize for WorkerThreadsConfig {
1331    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1332    where
1333        S: serde::Serializer,
1334    {
1335        match self {
1336            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1337            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1338        }
1339    }
1340}
1341
1342impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1343    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1344    where
1345        D: serde::Deserializer<'de>,
1346    {
1347        use serde::de;
1348        struct WorkerThreadsVisitor;
1349        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1350            type Value = WorkerThreadsConfig;
1351
1352            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1353                formatter.write_str(r#""auto" or a positive integer"#)
1354            }
1355
1356            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1357                if value.eq_ignore_ascii_case("auto") {
1358                    Ok(WorkerThreadsConfig::Auto)
1359                } else if let Ok(n) = value.parse::<usize>() {
1360                    Ok(WorkerThreadsConfig::Fixed(n))
1361                } else {
1362                    Err(E::custom(format!(
1363                        "expected 'auto' or a number, got '{value}'"
1364                    )))
1365                }
1366            }
1367
1368            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1369                Ok(WorkerThreadsConfig::Fixed(value as usize))
1370            }
1371
1372            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1373                if value >= 0 {
1374                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1375                } else {
1376                    Err(E::custom("worker_threads must be non-negative"))
1377                }
1378            }
1379        }
1380        deserializer.deserialize_any(WorkerThreadsVisitor)
1381    }
1382}
1383
1384impl Default for CleanupConfig {
1385    fn default() -> Self {
1386        Self {
1387            queue_buffer_size: 1024,
1388            batch_size: 64,
1389            batch_timeout_ms: 100,
1390            worker_threads: WorkerThreadsConfig::Auto,
1391            max_retry_attempts: 3,
1392            async_enabled: true,
1393            fallback_to_sync: true,
1394        }
1395    }
1396}
1397
1398impl CleanupConfig {
1399    pub fn validate(&self) -> Result<(), String> {
1400        if self.queue_buffer_size == 0 {
1401            return Err("queue_buffer_size must be greater than 0".to_string());
1402        }
1403        if self.batch_size == 0 {
1404            return Err("batch_size must be greater than 0".to_string());
1405        }
1406        if self.batch_timeout_ms == 0 {
1407            return Err("batch_timeout_ms must be greater than 0".to_string());
1408        }
1409        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1410            && n == 0
1411        {
1412            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1413        }
1414        Ok(())
1415    }
1416}
1417
1418#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1419#[serde(default)]
1420pub struct TagFilteringConfig {
1421    #[serde(default)]
1422    pub enabled: bool,
1423    #[serde(default = "default_true")]
1424    pub enable_tags: bool,
1425}
1426
1427fn default_true() -> bool {
1428    true
1429}
1430
1431// --- Default Implementations ---
1432
1433impl Default for ServerOptions {
1434    fn default() -> Self {
1435        Self {
1436            adapter: AdapterConfig::default(),
1437            app_manager: AppManagerConfig::default(),
1438            cache: CacheConfig::default(),
1439            channel_limits: ChannelLimits::default(),
1440            cors: CorsConfig::default(),
1441            database: DatabaseConfig::default(),
1442            database_pooling: DatabasePooling::default(),
1443            debug: false,
1444            tag_filtering: TagFilteringConfig::default(),
1445            event_limits: EventLimits::default(),
1446            host: "0.0.0.0".to_string(),
1447            http_api: HttpApiConfig::default(),
1448            instance: InstanceConfig::default(),
1449            logging: None,
1450            metrics: MetricsConfig::default(),
1451            mode: "production".to_string(),
1452            port: 6001,
1453            path_prefix: "/".to_string(),
1454            presence: PresenceConfig::default(),
1455            queue: QueueConfig::default(),
1456            rate_limiter: RateLimiterConfig::default(),
1457            shutdown_grace_period: 10,
1458            ssl: SslConfig::default(),
1459            user_authentication_timeout: 3600,
1460            webhooks: WebhooksConfig::default(),
1461            websocket_max_payload_kb: 64,
1462            cleanup: CleanupConfig::default(),
1463            activity_timeout: 120,
1464            cluster_health: ClusterHealthConfig::default(),
1465            unix_socket: UnixSocketConfig::default(),
1466            delta_compression: DeltaCompressionOptionsConfig::default(),
1467            websocket: WebSocketConfig::default(),
1468            connection_recovery: ConnectionRecoveryConfig::default(),
1469            idempotency: IdempotencyConfig::default(),
1470            ephemeral: EphemeralConfig::default(),
1471            echo_control: EchoControlConfig::default(),
1472            event_name_filtering: EventNameFilteringConfig::default(),
1473        }
1474    }
1475}
1476
1477impl Default for SqsQueueConfig {
1478    fn default() -> Self {
1479        Self {
1480            region: "us-east-1".to_string(),
1481            queue_url_prefix: None,
1482            visibility_timeout: 30,
1483            endpoint_url: None,
1484            max_messages: 10,
1485            wait_time_seconds: 5,
1486            concurrency: 5,
1487            fifo: false,
1488            message_group_id: Some("default".to_string()),
1489        }
1490    }
1491}
1492
1493impl Default for SnsQueueConfig {
1494    fn default() -> Self {
1495        Self {
1496            region: "us-east-1".to_string(),
1497            topic_arn: String::new(),
1498            endpoint_url: None,
1499        }
1500    }
1501}
1502
1503impl Default for RedisAdapterConfig {
1504    fn default() -> Self {
1505        Self {
1506            requests_timeout: 5000,
1507            prefix: "sockudo_adapter:".to_string(),
1508            redis_pub_options: AHashMap::new(),
1509            redis_sub_options: AHashMap::new(),
1510            cluster_mode: false,
1511        }
1512    }
1513}
1514
1515impl Default for RedisClusterAdapterConfig {
1516    fn default() -> Self {
1517        Self {
1518            nodes: vec![],
1519            prefix: "sockudo_adapter:".to_string(),
1520            request_timeout_ms: 1000,
1521            use_connection_manager: true,
1522            use_sharded_pubsub: false,
1523        }
1524    }
1525}
1526
1527impl Default for NatsAdapterConfig {
1528    fn default() -> Self {
1529        Self {
1530            servers: vec!["nats://localhost:4222".to_string()],
1531            prefix: "sockudo_adapter:".to_string(),
1532            request_timeout_ms: 5000,
1533            username: None,
1534            password: None,
1535            token: None,
1536            connection_timeout_ms: 5000,
1537            nodes_number: None,
1538        }
1539    }
1540}
1541
1542impl Default for RabbitMqAdapterConfig {
1543    fn default() -> Self {
1544        Self {
1545            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1546            prefix: "sockudo_adapter".to_string(),
1547            request_timeout_ms: 5000,
1548            connection_timeout_ms: 5000,
1549            nodes_number: None,
1550        }
1551    }
1552}
1553
1554impl Default for GooglePubSubAdapterConfig {
1555    fn default() -> Self {
1556        Self {
1557            project_id: "".to_string(),
1558            prefix: "sockudo-adapter".to_string(),
1559            request_timeout_ms: 5000,
1560            emulator_host: None,
1561            nodes_number: None,
1562        }
1563    }
1564}
1565
1566impl Default for KafkaAdapterConfig {
1567    fn default() -> Self {
1568        Self {
1569            brokers: vec!["localhost:9092".to_string()],
1570            prefix: "sockudo_adapter".to_string(),
1571            request_timeout_ms: 5000,
1572            security_protocol: None,
1573            sasl_mechanism: None,
1574            sasl_username: None,
1575            sasl_password: None,
1576            nodes_number: None,
1577        }
1578    }
1579}
1580
1581impl Default for CacheSettings {
1582    fn default() -> Self {
1583        Self {
1584            enabled: true,
1585            ttl: 300,
1586        }
1587    }
1588}
1589
1590impl Default for MemoryCacheOptions {
1591    fn default() -> Self {
1592        Self {
1593            ttl: 300,
1594            cleanup_interval: 60,
1595            max_capacity: 10000,
1596        }
1597    }
1598}
1599
1600impl Default for CacheConfig {
1601    fn default() -> Self {
1602        Self {
1603            driver: CacheDriver::default(),
1604            redis: RedisConfig {
1605                prefix: Some("sockudo_cache:".to_string()),
1606                url_override: None,
1607                cluster_mode: false,
1608            },
1609            memory: MemoryCacheOptions::default(),
1610        }
1611    }
1612}
1613
1614impl Default for ChannelLimits {
1615    fn default() -> Self {
1616        Self {
1617            max_name_length: 200,
1618            cache_ttl: 3600,
1619        }
1620    }
1621}
1622impl Default for CorsConfig {
1623    fn default() -> Self {
1624        Self {
1625            credentials: true,
1626            origin: vec!["*".to_string()],
1627            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1628            allowed_headers: vec![
1629                "Authorization".to_string(),
1630                "Content-Type".to_string(),
1631                "X-Requested-With".to_string(),
1632                "Accept".to_string(),
1633            ],
1634        }
1635    }
1636}
1637
1638impl Default for DatabaseConnection {
1639    fn default() -> Self {
1640        Self {
1641            host: "localhost".to_string(),
1642            port: 3306,
1643            username: "root".to_string(),
1644            password: "".to_string(),
1645            database: "sockudo".to_string(),
1646            table_name: "applications".to_string(),
1647            connection_pool_size: 10,
1648            pool_min: None,
1649            pool_max: None,
1650            cache_ttl: 300,
1651            cache_cleanup_interval: 60,
1652            cache_max_capacity: 100,
1653        }
1654    }
1655}
1656
1657impl Default for RedisConnection {
1658    fn default() -> Self {
1659        Self {
1660            host: "127.0.0.1".to_string(),
1661            port: 6379,
1662            db: 0,
1663            username: None,
1664            password: None,
1665            key_prefix: "sockudo:".to_string(),
1666            sentinels: Vec::new(),
1667            sentinel_password: None,
1668            name: "mymaster".to_string(),
1669            cluster: RedisClusterConnection::default(),
1670            cluster_nodes: Vec::new(),
1671        }
1672    }
1673}
1674
1675impl Default for RedisSentinel {
1676    fn default() -> Self {
1677        Self {
1678            host: "localhost".to_string(),
1679            port: 26379,
1680        }
1681    }
1682}
1683
1684impl Default for ClusterNode {
1685    fn default() -> Self {
1686        Self {
1687            host: "127.0.0.1".to_string(),
1688            port: 7000,
1689        }
1690    }
1691}
1692
1693impl Default for DatabasePooling {
1694    fn default() -> Self {
1695        Self {
1696            enabled: true,
1697            min: 2,
1698            max: 10,
1699        }
1700    }
1701}
1702
1703impl Default for EventLimits {
1704    fn default() -> Self {
1705        Self {
1706            max_channels_at_once: 100,
1707            max_name_length: 200,
1708            max_payload_in_kb: 100,
1709            max_batch_size: 10,
1710        }
1711    }
1712}
1713
1714impl Default for IdempotencyConfig {
1715    fn default() -> Self {
1716        Self {
1717            enabled: true,
1718            ttl_seconds: 120,
1719            max_key_length: 128,
1720        }
1721    }
1722}
1723
1724impl Default for EphemeralConfig {
1725    fn default() -> Self {
1726        Self { enabled: true }
1727    }
1728}
1729
1730impl Default for EchoControlConfig {
1731    fn default() -> Self {
1732        Self {
1733            enabled: true,
1734            default_echo_messages: true,
1735        }
1736    }
1737}
1738
1739impl Default for EventNameFilteringConfig {
1740    fn default() -> Self {
1741        Self {
1742            enabled: true,
1743            max_events_per_filter: 50,
1744            max_event_name_length: 200,
1745        }
1746    }
1747}
1748
1749impl Default for ConnectionRecoveryConfig {
1750    fn default() -> Self {
1751        Self {
1752            enabled: false,
1753            buffer_ttl_seconds: 120,
1754            max_buffer_size: 100,
1755        }
1756    }
1757}
1758
1759impl Default for HttpApiConfig {
1760    fn default() -> Self {
1761        Self {
1762            request_limit_in_mb: 10,
1763            accept_traffic: AcceptTraffic::default(),
1764            usage_enabled: true,
1765        }
1766    }
1767}
1768
1769impl Default for AcceptTraffic {
1770    fn default() -> Self {
1771        Self {
1772            memory_threshold: 0.90,
1773        }
1774    }
1775}
1776
1777impl Default for InstanceConfig {
1778    fn default() -> Self {
1779        Self {
1780            process_id: uuid::Uuid::new_v4().to_string(),
1781        }
1782    }
1783}
1784
1785impl Default for LoggingConfig {
1786    fn default() -> Self {
1787        Self {
1788            colors_enabled: true,
1789            include_target: true,
1790        }
1791    }
1792}
1793
1794impl Default for MetricsConfig {
1795    fn default() -> Self {
1796        Self {
1797            enabled: true,
1798            driver: MetricsDriver::default(),
1799            host: "0.0.0.0".to_string(),
1800            prometheus: PrometheusConfig::default(),
1801            port: 9601,
1802        }
1803    }
1804}
1805
1806impl Default for PrometheusConfig {
1807    fn default() -> Self {
1808        Self {
1809            prefix: "sockudo_".to_string(),
1810        }
1811    }
1812}
1813
1814impl Default for PresenceConfig {
1815    fn default() -> Self {
1816        Self {
1817            max_members_per_channel: 100,
1818            max_member_size_in_kb: 2,
1819        }
1820    }
1821}
1822
1823impl Default for RedisQueueConfig {
1824    fn default() -> Self {
1825        Self {
1826            concurrency: 5,
1827            prefix: Some("sockudo_queue:".to_string()),
1828            url_override: None,
1829            cluster_mode: false,
1830        }
1831    }
1832}
1833
1834impl Default for RateLimit {
1835    fn default() -> Self {
1836        Self {
1837            max_requests: 60,
1838            window_seconds: 60,
1839            identifier: Some("default".to_string()),
1840            trust_hops: Some(0),
1841        }
1842    }
1843}
1844
1845impl Default for RateLimiterConfig {
1846    fn default() -> Self {
1847        Self {
1848            enabled: true,
1849            driver: CacheDriver::Memory,
1850            api_rate_limit: RateLimit {
1851                max_requests: 100,
1852                window_seconds: 60,
1853                identifier: Some("api".to_string()),
1854                trust_hops: Some(0),
1855            },
1856            websocket_rate_limit: RateLimit {
1857                max_requests: 20,
1858                window_seconds: 60,
1859                identifier: Some("websocket_connect".to_string()),
1860                trust_hops: Some(0),
1861            },
1862            redis: RedisConfig {
1863                prefix: Some("sockudo_rl:".to_string()),
1864                url_override: None,
1865                cluster_mode: false,
1866            },
1867        }
1868    }
1869}
1870
1871impl Default for SslConfig {
1872    fn default() -> Self {
1873        Self {
1874            enabled: false,
1875            cert_path: "".to_string(),
1876            key_path: "".to_string(),
1877            passphrase: None,
1878            ca_path: None,
1879            redirect_http: false,
1880            http_port: Some(80),
1881        }
1882    }
1883}
1884
1885impl Default for BatchingConfig {
1886    fn default() -> Self {
1887        Self {
1888            enabled: true,
1889            duration: 50,
1890            size: 100,
1891        }
1892    }
1893}
1894
1895impl Default for ClusterHealthConfig {
1896    fn default() -> Self {
1897        Self {
1898            enabled: true,
1899            heartbeat_interval_ms: 10000,
1900            node_timeout_ms: 30000,
1901            cleanup_interval_ms: 10000,
1902        }
1903    }
1904}
1905
1906impl Default for UnixSocketConfig {
1907    fn default() -> Self {
1908        Self {
1909            enabled: false,
1910            path: "/var/run/sockudo/sockudo.sock".to_string(),
1911            permission_mode: 0o660,
1912        }
1913    }
1914}
1915
1916impl Default for DeltaCompressionOptionsConfig {
1917    fn default() -> Self {
1918        Self {
1919            enabled: true,
1920            algorithm: "fossil".to_string(),
1921            full_message_interval: 10,
1922            min_message_size: 100,
1923            max_state_age_secs: 300,
1924            max_channel_states_per_socket: 100,
1925            max_conflation_states_per_channel: Some(100),
1926            conflation_key_path: None,
1927            cluster_coordination: false,
1928            omit_delta_algorithm: false,
1929        }
1930    }
1931}
1932
1933impl ClusterHealthConfig {
1934    pub fn validate(&self) -> Result<(), String> {
1935        if self.heartbeat_interval_ms == 0 {
1936            return Err("heartbeat_interval_ms must be greater than 0".to_string());
1937        }
1938        if self.node_timeout_ms == 0 {
1939            return Err("node_timeout_ms must be greater than 0".to_string());
1940        }
1941        if self.cleanup_interval_ms == 0 {
1942            return Err("cleanup_interval_ms must be greater than 0".to_string());
1943        }
1944
1945        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1946            return Err(format!(
1947                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1948                self.heartbeat_interval_ms,
1949                self.node_timeout_ms,
1950                self.node_timeout_ms / 3
1951            ));
1952        }
1953
1954        if self.cleanup_interval_ms > self.node_timeout_ms {
1955            return Err(format!(
1956                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1957                self.cleanup_interval_ms, self.node_timeout_ms
1958            ));
1959        }
1960
1961        Ok(())
1962    }
1963}
1964
1965impl ServerOptions {
1966    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1967        let content = tokio::fs::read_to_string(path).await?;
1968        let options: Self = if path.ends_with(".toml") {
1969            toml::from_str(&content)?
1970        } else {
1971            // Legacy JSON support
1972            sonic_rs::from_str(&content)?
1973        };
1974        Ok(options)
1975    }
1976
1977    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1978        // --- General Settings ---
1979        if let Ok(mode) = std::env::var("ENVIRONMENT") {
1980            self.mode = mode;
1981        }
1982        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1983        if parse_bool_env("DEBUG", false) {
1984            self.debug = true;
1985            info!("DEBUG environment variable forces debug mode ON");
1986        }
1987
1988        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1989
1990        if let Ok(host) = std::env::var("HOST") {
1991            self.host = host;
1992        }
1993        self.port = parse_env::<u16>("PORT", self.port);
1994        self.shutdown_grace_period =
1995            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1996        self.user_authentication_timeout = parse_env::<u64>(
1997            "USER_AUTHENTICATION_TIMEOUT",
1998            self.user_authentication_timeout,
1999        );
2000        self.websocket_max_payload_kb =
2001            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2002        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2003            self.instance.process_id = id;
2004        }
2005
2006        // --- Driver Configuration ---
2007        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2008            self.adapter.driver =
2009                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2010        }
2011        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2012            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2013            self.adapter.buffer_multiplier_per_cpu,
2014        );
2015        self.adapter.enable_socket_counting = parse_env::<bool>(
2016            "ADAPTER_ENABLE_SOCKET_COUNTING",
2017            self.adapter.enable_socket_counting,
2018        );
2019        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2020            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2021        }
2022        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2023            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2024        }
2025        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2026            self.app_manager.driver =
2027                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2028        }
2029        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2030            self.rate_limiter.driver = parse_driver_enum(
2031                driver_str,
2032                self.rate_limiter.driver.clone(),
2033                "RateLimiter Backend",
2034            );
2035        }
2036
2037        // --- Database: Redis ---
2038        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2039            self.database.redis.host = host;
2040        }
2041        self.database.redis.port =
2042            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2043        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2044            self.database.redis.username = if username.is_empty() {
2045                None
2046            } else {
2047                Some(username)
2048            };
2049        }
2050        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2051            self.database.redis.password = Some(password);
2052        }
2053        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2054        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2055            self.database.redis.key_prefix = prefix;
2056        }
2057        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2058            self.database.redis.cluster.username = if cluster_username.is_empty() {
2059                None
2060            } else {
2061                Some(cluster_username)
2062            };
2063        }
2064        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2065            self.database.redis.cluster.password = Some(cluster_password);
2066        }
2067        self.database.redis.cluster.use_tls = parse_bool_env(
2068            "DATABASE_REDIS_CLUSTER_USE_TLS",
2069            self.database.redis.cluster.use_tls,
2070        );
2071
2072        // --- Database: MySQL ---
2073        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2074            self.database.mysql.host = host;
2075        }
2076        self.database.mysql.port =
2077            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2078        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2079            self.database.mysql.username = user;
2080        }
2081        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2082            self.database.mysql.password = pass;
2083        }
2084        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2085            self.database.mysql.database = db;
2086        }
2087        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2088            self.database.mysql.table_name = table;
2089        }
2090        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2091
2092        // --- Database: PostgreSQL ---
2093        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2094            self.database.postgres.host = host;
2095        }
2096        self.database.postgres.port =
2097            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2098        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2099            self.database.postgres.username = user;
2100        }
2101        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2102            self.database.postgres.password = pass;
2103        }
2104        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2105            self.database.postgres.database = db;
2106        }
2107        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2108
2109        // --- Database: DynamoDB ---
2110        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2111            self.database.dynamodb.region = region;
2112        }
2113        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2114            self.database.dynamodb.table_name = table;
2115        }
2116        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2117            self.database.dynamodb.endpoint_url = Some(endpoint);
2118        }
2119        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2120            self.database.dynamodb.aws_access_key_id = Some(key_id);
2121        }
2122        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2123            self.database.dynamodb.aws_secret_access_key = Some(secret);
2124        }
2125
2126        // --- Database: SurrealDB ---
2127        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2128            self.database.surrealdb.url = url;
2129        }
2130        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2131            self.database.surrealdb.namespace = namespace;
2132        }
2133        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2134            self.database.surrealdb.database = database;
2135        }
2136        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2137            self.database.surrealdb.username = username;
2138        }
2139        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2140            self.database.surrealdb.password = password;
2141        }
2142        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2143            self.database.surrealdb.table_name = table;
2144        }
2145
2146        // --- Redis Cluster ---
2147        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2148            let node_list: Vec<String> = nodes
2149                .split(',')
2150                .map(|s| s.trim())
2151                .filter(|s| !s.is_empty())
2152                .map(ToString::to_string)
2153                .collect();
2154
2155            options.adapter.cluster.nodes = node_list.clone();
2156            options.queue.redis_cluster.nodes = node_list.clone();
2157
2158            let parsed_nodes: Vec<ClusterNode> = node_list
2159                .iter()
2160                .filter_map(|seed| ClusterNode::from_seed(seed))
2161                .collect();
2162            options.database.redis.cluster.nodes = parsed_nodes.clone();
2163            options.database.redis.cluster_nodes = parsed_nodes;
2164        };
2165
2166        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2167            apply_redis_cluster_nodes(self, &nodes);
2168        }
2169        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2170            apply_redis_cluster_nodes(self, &nodes);
2171        }
2172        self.queue.redis_cluster.concurrency = parse_env::<u32>(
2173            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2174            self.queue.redis_cluster.concurrency,
2175        );
2176        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2177            self.queue.redis_cluster.prefix = Some(prefix);
2178        }
2179
2180        // --- SSL Configuration ---
2181        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2182        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2183            self.ssl.cert_path = val;
2184        }
2185        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2186            self.ssl.key_path = val;
2187        }
2188        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2189        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2190            self.ssl.http_port = Some(port);
2191        }
2192
2193        // --- Unix Socket Configuration ---
2194        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2195        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2196            self.unix_socket.path = path;
2197        }
2198        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2199            if mode_str.chars().all(|c| c.is_digit(8)) {
2200                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2201                    if mode <= 0o777 {
2202                        self.unix_socket.permission_mode = mode;
2203                    } else {
2204                        warn!(
2205                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2206                            mode_str, self.unix_socket.permission_mode
2207                        );
2208                    }
2209                } else {
2210                    warn!(
2211                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2212                        mode_str, self.unix_socket.permission_mode
2213                    );
2214                }
2215            } else {
2216                warn!(
2217                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2218                    mode_str, self.unix_socket.permission_mode
2219                );
2220            }
2221        }
2222
2223        // --- Metrics ---
2224        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2225            self.metrics.driver =
2226                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2227        }
2228        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2229        if let Ok(val) = std::env::var("METRICS_HOST") {
2230            self.metrics.host = val;
2231        }
2232        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2233        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2234            self.metrics.prometheus.prefix = val;
2235        }
2236
2237        // --- HTTP API ---
2238        self.http_api.usage_enabled =
2239            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2240
2241        // --- Rate Limiter ---
2242        self.rate_limiter.enabled =
2243            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2244        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2245            "RATE_LIMITER_API_MAX_REQUESTS",
2246            self.rate_limiter.api_rate_limit.max_requests,
2247        );
2248        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2249            "RATE_LIMITER_API_WINDOW_SECONDS",
2250            self.rate_limiter.api_rate_limit.window_seconds,
2251        );
2252        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2253            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2254        }
2255        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2256            "RATE_LIMITER_WS_MAX_REQUESTS",
2257            self.rate_limiter.websocket_rate_limit.max_requests,
2258        );
2259        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2260            "RATE_LIMITER_WS_WINDOW_SECONDS",
2261            self.rate_limiter.websocket_rate_limit.window_seconds,
2262        );
2263        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2264            self.rate_limiter.redis.prefix = Some(prefix);
2265        }
2266
2267        // --- Queue: Redis ---
2268        self.queue.redis.concurrency =
2269            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2270        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2271            self.queue.redis.prefix = Some(prefix);
2272        }
2273
2274        // --- Queue: SQS ---
2275        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2276            self.queue.sqs.region = region;
2277        }
2278        self.queue.sqs.visibility_timeout = parse_env::<i32>(
2279            "QUEUE_SQS_VISIBILITY_TIMEOUT",
2280            self.queue.sqs.visibility_timeout,
2281        );
2282        self.queue.sqs.max_messages =
2283            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2284        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2285            "QUEUE_SQS_WAIT_TIME_SECONDS",
2286            self.queue.sqs.wait_time_seconds,
2287        );
2288        self.queue.sqs.concurrency =
2289            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2290        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2291        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2292            self.queue.sqs.endpoint_url = Some(endpoint);
2293        }
2294
2295        // --- Queue: SNS ---
2296        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2297            self.queue.sns.region = region;
2298        }
2299        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2300            self.queue.sns.topic_arn = topic_arn;
2301        }
2302        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2303            self.queue.sns.endpoint_url = Some(endpoint);
2304        }
2305
2306        // --- Webhooks ---
2307        self.webhooks.batching.enabled =
2308            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2309        self.webhooks.batching.duration =
2310            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2311        self.webhooks.batching.size =
2312            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2313
2314        // --- NATS Adapter ---
2315        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2316            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2317        }
2318        if let Ok(user) = std::env::var("NATS_USERNAME") {
2319            self.adapter.nats.username = Some(user);
2320        }
2321        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2322            self.adapter.nats.password = Some(pass);
2323        }
2324        if let Ok(token) = std::env::var("NATS_TOKEN") {
2325            self.adapter.nats.token = Some(token);
2326        }
2327        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2328            "NATS_CONNECTION_TIMEOUT_MS",
2329            self.adapter.nats.connection_timeout_ms,
2330        );
2331        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2332            "NATS_REQUEST_TIMEOUT_MS",
2333            self.adapter.nats.request_timeout_ms,
2334        );
2335
2336        // --- RabbitMQ Adapter ---
2337        if let Ok(url) = std::env::var("RABBITMQ_URL") {
2338            self.adapter.rabbitmq.url = url;
2339        }
2340        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2341            self.adapter.rabbitmq.prefix = prefix;
2342        }
2343        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2344            "RABBITMQ_CONNECTION_TIMEOUT_MS",
2345            self.adapter.rabbitmq.connection_timeout_ms,
2346        );
2347        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2348            "RABBITMQ_REQUEST_TIMEOUT_MS",
2349            self.adapter.rabbitmq.request_timeout_ms,
2350        );
2351        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2352            self.adapter.rabbitmq.nodes_number = Some(nodes);
2353        }
2354
2355        // --- Google Pub/Sub Adapter ---
2356        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2357            self.adapter.google_pubsub.project_id = project_id;
2358        }
2359        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2360            self.adapter.google_pubsub.prefix = prefix;
2361        }
2362        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2363            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2364        }
2365        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2366            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2367            self.adapter.google_pubsub.request_timeout_ms,
2368        );
2369        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2370            self.adapter.google_pubsub.nodes_number = Some(nodes);
2371        }
2372
2373        // --- Kafka Adapter ---
2374        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2375            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2376        }
2377        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2378            self.adapter.kafka.prefix = prefix;
2379        }
2380        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2381            self.adapter.kafka.security_protocol = Some(protocol);
2382        }
2383        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2384            self.adapter.kafka.sasl_mechanism = Some(mechanism);
2385        }
2386        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2387            self.adapter.kafka.sasl_username = Some(username);
2388        }
2389        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2390            self.adapter.kafka.sasl_password = Some(password);
2391        }
2392        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2393            "KAFKA_REQUEST_TIMEOUT_MS",
2394            self.adapter.kafka.request_timeout_ms,
2395        );
2396        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2397            self.adapter.kafka.nodes_number = Some(nodes);
2398        }
2399
2400        // --- CORS ---
2401        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2402            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2403            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2404                warn!(
2405                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2406                    e
2407                );
2408            } else {
2409                self.cors.origin = parsed;
2410            }
2411        }
2412        if let Ok(methods) = std::env::var("CORS_METHODS") {
2413            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2414        }
2415        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2416            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2417        }
2418        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2419
2420        // --- Performance Tuning ---
2421        self.database_pooling.enabled =
2422            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2423        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2424            self.database_pooling.min = min;
2425        }
2426        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2427            self.database_pooling.max = max;
2428        }
2429
2430        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2431            self.database.mysql.connection_pool_size = pool_size;
2432            self.database.postgres.connection_pool_size = pool_size;
2433        }
2434        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2435            self.app_manager.cache.ttl = cache_ttl;
2436            self.channel_limits.cache_ttl = cache_ttl;
2437            self.database.mysql.cache_ttl = cache_ttl;
2438            self.database.postgres.cache_ttl = cache_ttl;
2439            self.database.surrealdb.cache_ttl = cache_ttl;
2440            self.cache.memory.ttl = cache_ttl;
2441        }
2442        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2443            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2444            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2445            self.cache.memory.cleanup_interval = cleanup_interval;
2446        }
2447        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2448            self.database.mysql.cache_max_capacity = max_capacity;
2449            self.database.postgres.cache_max_capacity = max_capacity;
2450            self.database.surrealdb.cache_max_capacity = max_capacity;
2451            self.cache.memory.max_capacity = max_capacity;
2452        }
2453
2454        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2455        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2456        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2457        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2458
2459        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2460            (default_app_id, default_app_key, default_app_secret)
2461            && default_app_enabled
2462        {
2463            let default_app = App::from_policy(
2464                app_id,
2465                app_key,
2466                app_secret,
2467                default_app_enabled,
2468                crate::app::AppPolicy {
2469                    limits: crate::app::AppLimitsPolicy {
2470                        max_connections: parse_env::<u32>(
2471                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2472                            100,
2473                        ),
2474                        max_backend_events_per_second: Some(parse_env::<u32>(
2475                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2476                            100,
2477                        )),
2478                        max_client_events_per_second: parse_env::<u32>(
2479                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2480                            100,
2481                        ),
2482                        max_read_requests_per_second: Some(parse_env::<u32>(
2483                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2484                            100,
2485                        )),
2486                        max_presence_members_per_channel: Some(parse_env::<u32>(
2487                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2488                            100,
2489                        )),
2490                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
2491                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2492                            100,
2493                        )),
2494                        max_channel_name_length: Some(parse_env::<u32>(
2495                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2496                            100,
2497                        )),
2498                        max_event_channels_at_once: Some(parse_env::<u32>(
2499                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2500                            100,
2501                        )),
2502                        max_event_name_length: Some(parse_env::<u32>(
2503                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2504                            100,
2505                        )),
2506                        max_event_payload_in_kb: Some(parse_env::<u32>(
2507                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2508                            100,
2509                        )),
2510                        max_event_batch_size: Some(parse_env::<u32>(
2511                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2512                            100,
2513                        )),
2514                    },
2515                    features: crate::app::AppFeaturesPolicy {
2516                        enable_client_messages: std::env::var(
2517                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2518                        )
2519                        .ok()
2520                        .map(|value| {
2521                            matches!(
2522                                value.trim().to_ascii_lowercase().as_str(),
2523                                "true" | "1" | "yes" | "on"
2524                            )
2525                        })
2526                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2527                        enable_user_authentication: Some(parse_bool_env(
2528                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2529                            false,
2530                        )),
2531                        enable_watchlist_events: Some(parse_bool_env(
2532                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2533                            false,
2534                        )),
2535                    },
2536                    channels: crate::app::AppChannelsPolicy {
2537                        allowed_origins: {
2538                            if let Ok(origins_str) =
2539                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2540                            {
2541                                if !origins_str.is_empty() {
2542                                    Some(
2543                                        origins_str
2544                                            .split(',')
2545                                            .map(|s| s.trim().to_string())
2546                                            .collect(),
2547                                    )
2548                                } else {
2549                                    None
2550                                }
2551                            } else {
2552                                None
2553                            }
2554                        },
2555                        channel_delta_compression: None,
2556                        channel_namespaces: None,
2557                    },
2558                    webhooks: None,
2559                    idempotency: None,
2560                    connection_recovery: None,
2561                },
2562            );
2563
2564            self.app_manager.array.apps.push(default_app);
2565            info!("Successfully registered default app from env");
2566        }
2567
2568        // Special handling for REDIS_URL
2569        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2570            info!("Applying REDIS_URL environment variable override");
2571
2572            let redis_url_json = sonic_rs::json!(redis_url_env);
2573
2574            self.adapter
2575                .redis
2576                .redis_pub_options
2577                .insert("url".to_string(), redis_url_json.clone());
2578            self.adapter
2579                .redis
2580                .redis_sub_options
2581                .insert("url".to_string(), redis_url_json);
2582
2583            self.cache.redis.url_override = Some(redis_url_env.clone());
2584            self.queue.redis.url_override = Some(redis_url_env.clone());
2585            self.rate_limiter.redis.url_override = Some(redis_url_env);
2586        }
2587
2588        // --- Logging Configuration ---
2589        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2590        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2591        if has_colors_env || has_target_env {
2592            let logging_config = self.logging.get_or_insert_with(Default::default);
2593            if has_colors_env {
2594                logging_config.colors_enabled =
2595                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2596            }
2597            if has_target_env {
2598                logging_config.include_target =
2599                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2600            }
2601        }
2602
2603        // --- Cleanup Configuration ---
2604        self.cleanup.async_enabled =
2605            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2606        self.cleanup.fallback_to_sync =
2607            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2608        self.cleanup.queue_buffer_size =
2609            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2610        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2611        self.cleanup.batch_timeout_ms =
2612            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2613        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2614            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2615                WorkerThreadsConfig::Auto
2616            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2617                WorkerThreadsConfig::Fixed(n)
2618            } else {
2619                warn!(
2620                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2621                    worker_threads_str
2622                );
2623                self.cleanup.worker_threads.clone()
2624            };
2625        }
2626        self.cleanup.max_retry_attempts = parse_env::<u32>(
2627            "CLEANUP_MAX_RETRY_ATTEMPTS",
2628            self.cleanup.max_retry_attempts,
2629        );
2630
2631        // Cluster health configuration
2632        self.cluster_health.enabled =
2633            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2634        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2635            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2636            self.cluster_health.heartbeat_interval_ms,
2637        );
2638        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2639            "CLUSTER_HEALTH_NODE_TIMEOUT",
2640            self.cluster_health.node_timeout_ms,
2641        );
2642        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2643            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2644            self.cluster_health.cleanup_interval_ms,
2645        );
2646
2647        // Tag filtering configuration
2648        self.tag_filtering.enabled =
2649            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2650
2651        // WebSocket buffer configuration
2652        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2653            if val.to_lowercase() == "none" || val == "0" {
2654                self.websocket.max_messages = None;
2655            } else if let Ok(n) = val.parse::<usize>() {
2656                self.websocket.max_messages = Some(n);
2657            }
2658        }
2659        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2660            if val.to_lowercase() == "none" || val == "0" {
2661                self.websocket.max_bytes = None;
2662            } else if let Ok(n) = val.parse::<usize>() {
2663                self.websocket.max_bytes = Some(n);
2664            }
2665        }
2666        self.websocket.disconnect_on_buffer_full = parse_bool_env(
2667            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2668            self.websocket.disconnect_on_buffer_full,
2669        );
2670        self.websocket.max_message_size = parse_env::<usize>(
2671            "WEBSOCKET_MAX_MESSAGE_SIZE",
2672            self.websocket.max_message_size,
2673        );
2674        self.websocket.max_frame_size =
2675            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2676        self.websocket.write_buffer_size = parse_env::<usize>(
2677            "WEBSOCKET_WRITE_BUFFER_SIZE",
2678            self.websocket.write_buffer_size,
2679        );
2680        self.websocket.max_backpressure = parse_env::<usize>(
2681            "WEBSOCKET_MAX_BACKPRESSURE",
2682            self.websocket.max_backpressure,
2683        );
2684        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2685        self.websocket.ping_interval =
2686            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2687        self.websocket.idle_timeout =
2688            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2689        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2690            self.websocket.compression = mode;
2691        }
2692
2693        // Connection recovery (includes serial + message_id + replay buffer)
2694        self.connection_recovery.enabled = parse_bool_env(
2695            "CONNECTION_RECOVERY_ENABLED",
2696            self.connection_recovery.enabled,
2697        );
2698        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
2699            "CONNECTION_RECOVERY_BUFFER_TTL",
2700            self.connection_recovery.buffer_ttl_seconds,
2701        );
2702        self.connection_recovery.max_buffer_size = parse_env::<usize>(
2703            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
2704            self.connection_recovery.max_buffer_size,
2705        );
2706
2707        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
2708        self.idempotency.ttl_seconds =
2709            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
2710        self.idempotency.max_key_length = parse_env::<usize>(
2711            "IDEMPOTENCY_MAX_KEY_LENGTH",
2712            self.idempotency.max_key_length,
2713        );
2714
2715        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
2716
2717        self.echo_control.enabled =
2718            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
2719        self.echo_control.default_echo_messages = parse_bool_env(
2720            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
2721            self.echo_control.default_echo_messages,
2722        );
2723
2724        self.event_name_filtering.enabled = parse_bool_env(
2725            "EVENT_NAME_FILTERING_ENABLED",
2726            self.event_name_filtering.enabled,
2727        );
2728        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
2729            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
2730            self.event_name_filtering.max_events_per_filter,
2731        );
2732        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
2733            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
2734            self.event_name_filtering.max_event_name_length,
2735        );
2736
2737        Ok(())
2738    }
2739
2740    pub fn validate(&self) -> Result<(), String> {
2741        if self.unix_socket.enabled {
2742            if self.unix_socket.path.is_empty() {
2743                return Err(
2744                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2745                );
2746            }
2747
2748            self.validate_unix_socket_security()?;
2749
2750            if self.ssl.enabled {
2751                tracing::warn!(
2752                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2753                );
2754            }
2755
2756            if self.unix_socket.permission_mode > 0o777 {
2757                return Err(format!(
2758                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2759                    self.unix_socket.permission_mode
2760                ));
2761            }
2762        }
2763
2764        if let Err(e) = self.cleanup.validate() {
2765            return Err(format!("Invalid cleanup configuration: {}", e));
2766        }
2767
2768        Ok(())
2769    }
2770
2771    fn validate_unix_socket_security(&self) -> Result<(), String> {
2772        let path = &self.unix_socket.path;
2773
2774        if path.contains("../") || path.contains("..\\") {
2775            return Err(
2776                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2777            );
2778        }
2779
2780        if self.unix_socket.permission_mode & 0o002 != 0 {
2781            warn!(
2782                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2783                self.unix_socket.permission_mode
2784            );
2785        }
2786
2787        if self.unix_socket.permission_mode & 0o007 > 0o005 {
2788            warn!(
2789                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2790                self.unix_socket.permission_mode
2791            );
2792        }
2793
2794        if self.mode == "production" && path.starts_with("/tmp/") {
2795            warn!(
2796                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2797                path
2798            );
2799        }
2800
2801        if !path.starts_with('/') {
2802            return Err(
2803                "Unix socket path must be absolute (start with /) for security and reliability."
2804                    .to_string(),
2805            );
2806        }
2807
2808        Ok(())
2809    }
2810}
2811
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Fully spelled-out standalone connection used as the starting point for
    /// the URL tests; each test overrides only the fields it exercises.
    fn base_connection() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    /// The `sentinel1:26379` entry shared by the sentinel tests.
    fn sentinel1() -> RedisSentinel {
        RedisSentinel {
            host: String::from("sentinel1"),
            port: 26379,
        }
    }

    // --- Standalone URLs ---

    #[test]
    fn test_standard_url_basic() {
        assert_eq!(base_connection().to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some("secret".into()),
            ..base_connection()
        };
        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: "redis.example.com".into(),
            port: 6380,
            db: 1,
            username: Some("admin".into()),
            password: Some("pass123".into()),
            ..base_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis://admin:pass123@redis.example.com:6380/1"
        );
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // `@` and `#` are expected to come out percent-encoded.
        let conn = RedisConnection {
            password: Some("pass@word#123".into()),
            ..base_connection()
        };
        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    // --- Sentinel detection and URLs ---

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let conn = RedisConnection {
            sentinels: vec![sentinel1()],
            ..Default::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let conn = RedisConnection {
            sentinels: vec![
                sentinel1(),
                RedisSentinel {
                    host: "sentinel2".into(),
                    port: 26379,
                },
            ],
            ..base_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![sentinel1()],
            sentinel_password: Some("sentinelpass".into()),
            ..base_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let conn = RedisConnection {
            db: 1,
            password: Some("masterpass".into()),
            sentinels: vec![sentinel1()],
            ..base_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let conn = RedisConnection {
            db: 2,
            username: Some("redisuser".into()),
            password: Some("redispass".into()),
            sentinels: vec![
                sentinel1(),
                RedisSentinel {
                    host: "sentinel2".into(),
                    port: 26380,
                },
            ],
            sentinel_password: Some("sentinelauth".into()),
            name: "production-master".into(),
            ..base_connection()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let sentinel = RedisSentinel {
            host: "sentinel.example.com".into(),
            port: 26379,
        };
        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
    }

    // --- Cluster node URLs ---

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        let nodes = vec![
            ClusterNode {
                host: "node1.secure-cluster.com".into(),
                port: 7000,
            },
            ClusterNode {
                host: "redis://node2.secure-cluster.com:7001".into(),
                port: 7001,
            },
            ClusterNode {
                host: "rediss://node3.secure-cluster.com".into(),
                port: 7002,
            },
        ];
        let cluster = RedisClusterConnection {
            nodes,
            username: None,
            password: Some("cluster-secret".into()),
            use_tls: true,
        };
        let conn = RedisConnection {
            cluster,
            ..Default::default()
        };

        assert_eq!(
            conn.cluster_node_urls(),
            vec![
                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
                "redis://:cluster-secret@node2.secure-cluster.com:7001",
                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
            ]
        );
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        let legacy_node = ClusterNode {
            host: "legacy-node.example.com".into(),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some("fallback-secret".into()),
            cluster_nodes: vec![legacy_node],
            ..Default::default()
        };

        assert_eq!(
            conn.cluster_node_urls(),
            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
        );
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let cluster = RedisClusterConnection {
            nodes: Vec::new(),
            username: Some("svc-user".into()),
            password: Some("svc-pass".into()),
            use_tls: true,
        };
        let conn = RedisConnection {
            cluster,
            ..Default::default()
        };

        let seeds = vec![
            String::from("node1.example.com:7000"),
            String::from("redis://node2.example.com:7001"),
            String::from("rediss://node3.example.com"),
        ];

        assert_eq!(
            conn.normalize_cluster_seed_urls(&seeds),
            vec![
                "rediss://svc-user:svc-pass@node1.example.com:7000",
                "redis://svc-user:svc-pass@node2.example.com:7001",
                "rediss://svc-user:svc-pass@node3.example.com:6379",
            ]
        );
    }
}
3106
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Builds a node with the given `host` and the `port` field set to 6379,
    /// the combination most of the tests below start from.
    fn node_on_6379(host: &str) -> ClusterNode {
        ClusterNode {
            host: host.to_string(),
            port: 6379,
        }
    }

    // --- Plain hosts ---

    #[test]
    fn test_to_url_basic_host() {
        assert_eq!(node_on_6379("localhost").to_url(), "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_eq!(node_on_6379("127.0.0.1").to_url(), "redis://127.0.0.1:6379");
    }

    // --- Hosts that already carry a scheme ---

    #[test]
    fn test_to_url_with_redis_protocol() {
        let url = node_on_6379("redis://example.com").to_url();
        assert_eq!(url, "redis://example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        let url = node_on_6379("rediss://secure.example.com").to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        let url = node_on_6379("rediss://secure.example.com:7000").to_url();
        assert_eq!(url, "rediss://secure.example.com:7000");
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        let url = node_on_6379("redis://example.com:7001").to_url();
        assert_eq!(url, "redis://example.com:7001");
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        let url = node_on_6379("  rediss://secure.example.com  ").to_url();
        assert_eq!(url, "rediss://secure.example.com:6379");
    }

    // --- Non-default ports ---

    #[test]
    fn test_to_url_custom_port() {
        let node = ClusterNode {
            host: String::from("redis-cluster.example.com"),
            port: 7000,
        };
        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let node = ClusterNode {
            host: String::from("redis-cluster.example.com:7010"),
            port: 7000,
        };
        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
    }

    // --- Shared auth / TLS options ---

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let node = ClusterNode {
            host: String::from("node.example.com"),
            port: 7000,
        };
        assert_eq!(
            node.to_url_with_options(true, Some("svc-user"), Some("secret")),
            "rediss://svc-user:secret@node.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        let node = ClusterNode {
            host: String::from("rediss://:node-secret@node.example.com:7000"),
            port: 7000,
        };
        assert_eq!(
            node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
            "rediss://:node-secret@node.example.com:7000"
        );
    }

    // --- Seed parsing ---

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let parsed =
            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        let url = node_on_6379("rediss://my-cluster.use1.cache.amazonaws.com").to_url();
        assert_eq!(url, "rediss://my-cluster.use1.cache.amazonaws.com:6379");
    }

    // --- IPv6 literals ---

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_eq!(
            node_on_6379("rediss://[::1]").to_url(),
            "rediss://[::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_eq!(
            node_on_6379("rediss://[::1]:7000").to_url(),
            "rediss://[::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_eq!(
            node_on_6379("rediss://[2001:db8::1]").to_url(),
            "rediss://[2001:db8::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_eq!(
            node_on_6379("rediss://[2001:db8::1]:7000").to_url(),
            "rediss://[2001:db8::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_eq!(node_on_6379("redis://[::1]").to_url(), "redis://[::1]:6379");
    }
}
3288
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Deserializes a `CorsConfig` from a JSON document via `sonic_rs`.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let parsed =
            cors_from_json(r#"{"origin": ["https://example.com", "https://other.com"]}"#).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let parsed =
            cors_from_json(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
                .unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted {
            assert!(cors_from_json(json).is_ok(), "expected Ok for {}", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected {
            assert!(cors_from_json(json).is_err(), "expected Err for {}", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // A single bad entry must fail the whole list, even next to a valid one.
        let outcome = cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#);
        assert!(outcome.is_err());
    }
}
3332}