Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
/// Adapter backend selector.
///
/// Serde names are the lowercased variant names (e.g. `rabbitmq`), except for the two
/// explicitly renamed variants `redis-cluster` and `google-pubsub`. `Local` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
}
88
/// DynamoDB connection settings.
///
/// `#[serde(default)]` fills any field missing from the input with the value from this
/// type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DynamoDbSettings {
    pub region: String,
    pub table_name: String,
    pub endpoint_url: Option<String>,
    pub aws_access_key_id: Option<String>,
    pub aws_secret_access_key: Option<String>,
    pub aws_profile_name: Option<String>,
}
99
100impl Default for DynamoDbSettings {
101    fn default() -> Self {
102        Self {
103            region: "us-east-1".to_string(),
104            table_name: "sockudo-applications".to_string(),
105            endpoint_url: None,
106            aws_access_key_id: None,
107            aws_secret_access_key: None,
108            aws_profile_name: None,
109        }
110    }
111}
112
/// ScyllaDB connection and keyspace settings.
///
/// `#[serde(default)]` fills any field missing from the input with the value from this
/// type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ScyllaDbSettings {
    pub nodes: Vec<String>,
    pub keyspace: String,
    pub table_name: String,
    pub username: Option<String>,
    pub password: Option<String>,
    pub replication_class: String,
    pub replication_factor: u32,
}
124
125impl Default for ScyllaDbSettings {
126    fn default() -> Self {
127        Self {
128            nodes: vec!["127.0.0.1:9042".to_string()],
129            keyspace: "sockudo".to_string(),
130            table_name: "applications".to_string(),
131            username: None,
132            password: None,
133            replication_class: "SimpleStrategy".to_string(),
134            replication_factor: 3,
135        }
136    }
137}
138
/// SurrealDB connection settings plus cache tuning values.
///
/// `#[serde(default)]` fills any field missing from the input with the value from this
/// type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SurrealDbSettings {
    pub url: String,
    pub namespace: String,
    pub database: String,
    pub username: String,
    pub password: String,
    pub table_name: String,
    // NOTE(review): unit of `cache_ttl` is not visible here — presumably seconds; confirm.
    pub cache_ttl: u64,
    pub cache_max_capacity: u64,
}
151
152impl Default for SurrealDbSettings {
153    fn default() -> Self {
154        Self {
155            url: "ws://127.0.0.1:8000".to_string(),
156            namespace: "sockudo".to_string(),
157            database: "sockudo".to_string(),
158            username: "root".to_string(),
159            password: "root".to_string(),
160            table_name: "applications".to_string(),
161            cache_ttl: 300,
162            cache_max_capacity: 100,
163        }
164    }
165}
166
167impl FromStr for AdapterDriver {
168    type Err = String;
169    fn from_str(s: &str) -> Result<Self, Self::Err> {
170        match s.to_lowercase().as_str() {
171            "local" => Ok(AdapterDriver::Local),
172            "redis" => Ok(AdapterDriver::Redis),
173            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174            "nats" => Ok(AdapterDriver::Nats),
175            "pulsar" => Ok(AdapterDriver::Pulsar),
176            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178            "kafka" => Ok(AdapterDriver::Kafka),
179            _ => Err(format!("Unknown adapter driver: {s}")),
180        }
181    }
182}
183
/// App-manager backend selector.
///
/// Serde names are the lowercased variant names (`memory`, `mysql`, `dynamodb`, `pgsql`,
/// `surrealdb`, `scylladb`). `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AppManagerDriver {
    #[default]
    Memory,
    Mysql,
    Dynamodb,
    PgSql,
    SurrealDb,
    ScyllaDb,
}
195impl FromStr for AppManagerDriver {
196    type Err = String;
197    fn from_str(s: &str) -> Result<Self, Self::Err> {
198        match s.to_lowercase().as_str() {
199            "memory" => Ok(AppManagerDriver::Memory),
200            "mysql" => Ok(AppManagerDriver::Mysql),
201            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205            _ => Err(format!("Unknown app manager driver: {s}")),
206        }
207    }
208}
209
/// Cache backend selector.
///
/// Serde names are the lowercased variant names, with `RedisCluster` renamed to
/// `redis-cluster`. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum CacheDriver {
    #[default]
    Memory,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    None,
}
220
221impl FromStr for CacheDriver {
222    type Err = String;
223    fn from_str(s: &str) -> Result<Self, Self::Err> {
224        match s.to_lowercase().as_str() {
225            "memory" => Ok(CacheDriver::Memory),
226            "redis" => Ok(CacheDriver::Redis),
227            "redis-cluster" => Ok(CacheDriver::RedisCluster),
228            "none" => Ok(CacheDriver::None),
229            _ => Err(format!("Unknown cache driver: {s}")),
230        }
231    }
232}
233
/// Queue backend selector.
///
/// Serde names are the lowercased variant names, except for the explicitly renamed
/// `redis-cluster` and `google-pubsub`. Note that the default is `Redis`, not `Memory`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum QueueDriver {
    Memory,
    #[default]
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
    Sqs,
    Sns,
    None,
}
252
253impl FromStr for QueueDriver {
254    type Err = String;
255    fn from_str(s: &str) -> Result<Self, Self::Err> {
256        match s.to_lowercase().as_str() {
257            "memory" => Ok(QueueDriver::Memory),
258            "redis" => Ok(QueueDriver::Redis),
259            "redis-cluster" => Ok(QueueDriver::RedisCluster),
260            "nats" => Ok(QueueDriver::Nats),
261            "pulsar" => Ok(QueueDriver::Pulsar),
262            "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
263            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
264            "kafka" => Ok(QueueDriver::Kafka),
265            "sqs" => Ok(QueueDriver::Sqs),
266            "sns" => Ok(QueueDriver::Sns),
267            "none" => Ok(QueueDriver::None),
268            _ => Err(format!("Unknown queue driver: {s}")),
269        }
270    }
271}
272
/// Delta coordination backend selector.
///
/// Serde names are snake_case (so `RedisCluster` is `redis_cluster`, unlike the
/// hyphenated `redis-cluster` used by the driver enums). `Auto` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum DeltaCoordinationBackend {
    #[default]
    Auto,
    None,
    Redis,
    RedisCluster,
    Nats,
}
283
284impl FromStr for DeltaCoordinationBackend {
285    type Err = String;
286
287    fn from_str(s: &str) -> Result<Self, Self::Err> {
288        match s.trim().to_ascii_lowercase().as_str() {
289            "auto" => Ok(Self::Auto),
290            "none" => Ok(Self::None),
291            "redis" => Ok(Self::Redis),
292            "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
293            "nats" => Ok(Self::Nats),
294            _ => Err(format!("Unknown delta coordination backend: {s}")),
295        }
296    }
297}
298
299impl AsRef<str> for QueueDriver {
300    fn as_ref(&self) -> &str {
301        match self {
302            QueueDriver::Memory => "memory",
303            QueueDriver::Redis => "redis",
304            QueueDriver::RedisCluster => "redis-cluster",
305            QueueDriver::Nats => "nats",
306            QueueDriver::Pulsar => "pulsar",
307            QueueDriver::RabbitMq => "rabbitmq",
308            QueueDriver::GooglePubSub => "google-pubsub",
309            QueueDriver::Kafka => "kafka",
310            QueueDriver::Sqs => "sqs",
311            QueueDriver::Sns => "sns",
312            QueueDriver::None => "none",
313        }
314    }
315}
316
/// Queue settings for the Redis Cluster driver.
///
/// `#[serde(default)]` fills any field missing from the input with the value from this
/// type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterQueueConfig {
    pub concurrency: u32,
    pub prefix: Option<String>,
    pub nodes: Vec<String>,
    pub request_timeout_ms: u64,
}
325
326impl Default for RedisClusterQueueConfig {
327    fn default() -> Self {
328        Self {
329            concurrency: 5,
330            prefix: Some("sockudo_queue:".to_string()),
331            nodes: vec!["redis://127.0.0.1:6379".to_string()],
332            request_timeout_ms: 5000,
333        }
334    }
335}
336
/// Metrics backend selector; `Prometheus` (serialized as `prometheus`) is the only variant
/// and the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
343
/// Log output format selector, serialized as `human` or `json`. `Human` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LogOutputFormat {
    #[default]
    Human,
    Json,
}
351
352impl FromStr for LogOutputFormat {
353    type Err = String;
354    fn from_str(s: &str) -> Result<Self, Self::Err> {
355        match s.to_lowercase().as_str() {
356            "human" => Ok(LogOutputFormat::Human),
357            "json" => Ok(LogOutputFormat::Json),
358            _ => Err(format!("Unknown log output format: {s}")),
359        }
360    }
361}
362
363impl FromStr for MetricsDriver {
364    type Err = String;
365    fn from_str(s: &str) -> Result<Self, Self::Err> {
366        match s.to_lowercase().as_str() {
367            "prometheus" => Ok(MetricsDriver::Prometheus),
368            _ => Err(format!("Unknown metrics driver: {s}")),
369        }
370    }
371}
372
373impl AsRef<str> for MetricsDriver {
374    fn as_ref(&self) -> &str {
375        match self {
376            MetricsDriver::Prometheus => "prometheus",
377        }
378    }
379}
380
381// --- Main Configuration Struct ---
/// Top-level server configuration, grouping the settings of every subsystem.
///
/// `#[serde(default)]` fills any field missing from the input with the value from this
/// type's `Default` implementation (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    // Optional: the whole logging section may be absent.
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    pub activity_timeout: u64,
    // NOTE(review): `AdapterConfig` also carries a `cluster_health` field of the same
    // type — confirm which one consumers read.
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub history: HistoryConfig,
    pub presence_history: PresenceHistoryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
}
425
426// --- Configuration Sub-Structs ---
427
/// Queue settings for the SQS driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    pub visibility_timeout: i32,
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    pub message_group_id: Option<String>,
}
441
/// Queue settings for the SNS driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
449
/// Adapter configuration: the selected driver plus one settings section per backend.
///
/// Missing fields come from the `Default` implementation, except for the three fields
/// with their own `#[serde(default = "...")]` function.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    // Missing value falls back to `default_buffer_multiplier_per_cpu()`.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    // Missing value falls back to `default_enable_socket_counting()`.
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
    // Missing value falls back to `default_fallback_to_local()`.
    #[serde(default = "default_fallback_to_local")]
    pub fallback_to_local: bool,
}
469
/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    true
}
473
/// Serde default for `AdapterConfig::fallback_to_local`: enabled.
fn default_fallback_to_local() -> bool {
    true
}
477
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`.
fn default_buffer_multiplier_per_cpu() -> usize {
    const DEFAULT_MULTIPLIER: usize = 64;
    DEFAULT_MULTIPLIER
}
481
482impl Default for AdapterConfig {
483    fn default() -> Self {
484        Self {
485            driver: AdapterDriver::default(),
486            redis: RedisAdapterConfig::default(),
487            cluster: RedisClusterAdapterConfig::default(),
488            nats: NatsAdapterConfig::default(),
489            pulsar: PulsarAdapterConfig::default(),
490            rabbitmq: RabbitMqAdapterConfig::default(),
491            google_pubsub: GooglePubSubAdapterConfig::default(),
492            kafka: KafkaAdapterConfig::default(),
493            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
494            cluster_health: ClusterHealthConfig::default(),
495            enable_socket_counting: default_enable_socket_counting(),
496            fallback_to_local: default_fallback_to_local(),
497        }
498    }
499}
500
/// Adapter settings for the Redis driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    pub requests_timeout: u64,
    pub prefix: String,
    // Free-form option maps: string keys to arbitrary JSON values.
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
510
/// Adapter settings for the Redis Cluster driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    // Field-level `#[serde(default)]`: a missing value deserializes to `bool::default()`
    // (false) rather than to the struct-level default.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
521
/// Adapter settings for the NATS driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    // Optional credentials: username/password and/or a token.
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
    pub discovery_max_wait_ms: u64,
    pub discovery_idle_wait_ms: u64,
}
536
/// Adapter settings for the Pulsar driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PulsarAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub token: Option<String>,
    pub nodes_number: Option<u32>,
}
546
/// Adapter settings for the RabbitMQ driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
556
/// Adapter settings for the Google Pub/Sub driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    pub project_id: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub emulator_host: Option<String>,
    pub nodes_number: Option<u32>,
}
566
/// Adapter settings for the Kafka driver.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    pub brokers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    // Optional security / SASL settings, kept as free-form strings.
    pub security_protocol: Option<String>,
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    pub nodes_number: Option<u32>,
}
579
/// App-manager configuration: the selected driver plus its settings sections.
///
/// `Default` is derived, so every field starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    // NOTE(review): `DatabaseConfig` also has a `scylladb` section of the same type —
    // confirm which one the ScyllaDB app manager reads.
    pub scylladb: ScyllaDbSettings,
}
588
/// Statically configured list of apps; the derived default is an empty list.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
594
/// Enable flag and TTL for a cache.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    // NOTE(review): unit not visible here — presumably seconds; confirm.
    pub ttl: u64,
}
601
/// Tuning values for the in-memory cache.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    // NOTE(review): units of `ttl` / `cleanup_interval` are not visible here — confirm.
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
609
/// Cache configuration: the selected driver plus the Redis and memory settings sections.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
617
/// Redis options for the cache section.
///
/// `Default` is derived: no prefix, no URL override, `cluster_mode` false.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
625
/// Limits applied to channels.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    pub cache_ttl: u64,
}
632
/// CORS configuration.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    // Origin patterns are validated during deserialization; invalid patterns make the
    // whole config fail to load.
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
642
643fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
644where
645    D: serde::Deserializer<'de>,
646{
647    use serde::de::Error;
648    let origins = Vec::<String>::deserialize(deserializer)?;
649
650    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
651        return Err(D::Error::custom(format!(
652            "CORS origin pattern validation failed: {}",
653            e
654        )));
655    }
656
657    Ok(origins)
658}
659
/// Connection settings for every supported database backend.
///
/// `Default` is derived, so each section starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
670
/// SQL database connection settings (used for both the `mysql` and `postgres` sections).
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    // Optional per-connection pool bounds; `override_db_pool_settings` sets these from
    // `<PREFIX>_POOL_MIN` / `<PREFIX>_POOL_MAX` when those variables parse.
    pub pool_min: Option<u32>,
    pub pool_max: Option<u32>,
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
687
/// Redis connection settings covering standalone, Sentinel and Cluster setups.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    // May carry a `redis://` or `rediss://` scheme prefix; `to_url` honors it.
    pub host: String,
    pub port: u16,
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    // A non-empty list switches `to_url` to the `redis+sentinel://` form.
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    // Written into the sentinel URL path, before the database index.
    pub name: String,
    pub cluster: RedisClusterConnection,
    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
    pub cluster_nodes: Vec<ClusterNode>,
}
704
/// Address of a single Redis Sentinel.
///
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
711
/// Redis Cluster seed nodes plus shared auth/TLS options.
///
/// `Default` is derived: no nodes, no credentials, TLS off.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    // When set, these take precedence over the parent connection's credentials.
    pub username: Option<String>,
    pub password: Option<String>,
    // Also accepted under the key `useTLS` when deserializing.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
721
722impl RedisConnection {
723    /// Returns true if Redis Sentinel is configured.
724    pub fn is_sentinel_configured(&self) -> bool {
725        !self.sentinels.is_empty()
726    }
727
728    /// Builds a Redis connection URL based on the configuration.
729    pub fn to_url(&self) -> String {
730        if self.is_sentinel_configured() {
731            self.build_sentinel_url()
732        } else {
733            self.build_standard_url()
734        }
735    }
736
737    fn build_standard_url(&self) -> String {
738        // Extract scheme from host if present, otherwise default to redis://
739        let (scheme, host) = if self.host.starts_with("rediss://") {
740            ("rediss://", self.host.trim_start_matches("rediss://"))
741        } else if self.host.starts_with("redis://") {
742            ("redis://", self.host.trim_start_matches("redis://"))
743        } else {
744            ("redis://", self.host.as_str())
745        };
746
747        let mut url = String::from(scheme);
748
749        if let Some(ref username) = self.username {
750            url.push_str(username);
751            if let Some(ref password) = self.password {
752                url.push(':');
753                url.push_str(&urlencoding::encode(password));
754            }
755            url.push('@');
756        } else if let Some(ref password) = self.password {
757            url.push(':');
758            url.push_str(&urlencoding::encode(password));
759            url.push('@');
760        }
761
762        url.push_str(host);
763        url.push(':');
764        url.push_str(&self.port.to_string());
765        url.push('/');
766        url.push_str(&self.db.to_string());
767
768        url
769    }
770
771    fn build_sentinel_url(&self) -> String {
772        let mut url = String::from("redis+sentinel://");
773
774        if let Some(ref sentinel_password) = self.sentinel_password {
775            url.push(':');
776            url.push_str(&urlencoding::encode(sentinel_password));
777            url.push('@');
778        }
779
780        let sentinel_hosts: Vec<String> = self
781            .sentinels
782            .iter()
783            .map(|s| format!("{}:{}", s.host, s.port))
784            .collect();
785        url.push_str(&sentinel_hosts.join(","));
786
787        url.push('/');
788        url.push_str(&self.name);
789        url.push('/');
790        url.push_str(&self.db.to_string());
791
792        let mut params = Vec::new();
793        if let Some(ref password) = self.password {
794            params.push(format!("password={}", urlencoding::encode(password)));
795        }
796        if let Some(ref username) = self.username {
797            params.push(format!("username={}", urlencoding::encode(username)));
798        }
799
800        if !params.is_empty() {
801            url.push('?');
802            url.push_str(&params.join("&"));
803        }
804
805        url
806    }
807
808    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
809    /// or legacy (`cluster_nodes`) field.
810    pub fn has_cluster_nodes(&self) -> bool {
811        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
812    }
813
814    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
815    /// Falls back to legacy `cluster_nodes` for backward compatibility.
816    pub fn cluster_node_urls(&self) -> Vec<String> {
817        if !self.cluster.nodes.is_empty() {
818            return self.build_cluster_urls(&self.cluster.nodes);
819        }
820        self.build_cluster_urls(&self.cluster_nodes)
821    }
822
823    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
824    /// shared cluster auth/TLS options.
825    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
826        self.build_cluster_urls(
827            &seeds
828                .iter()
829                .filter_map(|seed| ClusterNode::from_seed(seed))
830                .collect::<Vec<ClusterNode>>(),
831        )
832    }
833
834    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
835        let username = self
836            .cluster
837            .username
838            .as_deref()
839            .or(self.username.as_deref());
840        let password = self
841            .cluster
842            .password
843            .as_deref()
844            .or(self.password.as_deref());
845        let use_tls = self.cluster.use_tls;
846
847        nodes
848            .iter()
849            .map(|node| node.to_url_with_options(use_tls, username, password))
850            .collect()
851    }
852}
853
854impl RedisSentinel {
855    pub fn to_host_port(&self) -> String {
856        format!("{}:{}", self.host, self.port)
857    }
858}
859
/// A Redis Cluster seed node.
///
/// `host` may be a bare host, a `host:port` pair, a bracketed IPv6 literal, or a full
/// `redis://` / `rediss://` URL; `port` is used when `host` does not carry its own port.
/// `#[serde(default)]` fills missing fields from this type's `Default` implementation
/// (not visible in this part of the file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    pub host: String,
    pub port: u16,
}
866
867impl ClusterNode {
868    pub fn to_url(&self) -> String {
869        self.to_url_with_options(false, None, None)
870    }
871
872    pub fn to_url_with_options(
873        &self,
874        use_tls: bool,
875        username: Option<&str>,
876        password: Option<&str>,
877    ) -> String {
878        let host = self.host.trim();
879
880        if host.starts_with("redis://") || host.starts_with("rediss://") {
881            if let Ok(parsed) = Url::parse(host)
882                && let Some(host_str) = parsed.host_str()
883            {
884                let scheme = parsed.scheme();
885                let port = parsed.port_or_known_default().unwrap_or(self.port);
886                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
887                let parsed_password = parsed.password();
888                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
889                let (effective_username, effective_password) = if has_embedded_auth {
890                    (parsed_username, parsed_password)
891                } else {
892                    (username, password)
893                };
894
895                return build_redis_url(
896                    scheme,
897                    host_str,
898                    port,
899                    effective_username,
900                    effective_password,
901                );
902            }
903
904            // Fallback for malformed URLs
905            let has_port = if let Some(bracket_pos) = host.rfind(']') {
906                host[bracket_pos..].contains(':')
907            } else {
908                host.split(':').count() >= 3
909            };
910            let base = if has_port {
911                host.to_string()
912            } else {
913                format!("{}:{}", host, self.port)
914            };
915
916            if let Ok(parsed) = Url::parse(&base) {
917                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
918                let parsed_password = parsed.password();
919                if let Some(host_str) = parsed.host_str() {
920                    let port = parsed.port_or_known_default().unwrap_or(self.port);
921                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
922                    let (effective_username, effective_password) = if has_embedded_auth {
923                        (parsed_username, parsed_password)
924                    } else {
925                        (username, password)
926                    };
927                    return build_redis_url(
928                        parsed.scheme(),
929                        host_str,
930                        port,
931                        effective_username,
932                        effective_password,
933                    );
934                }
935            }
936            return base;
937        }
938
939        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
940        let scheme = if use_tls { "rediss" } else { "redis" };
941        build_redis_url(
942            scheme,
943            &normalized_host,
944            normalized_port,
945            username,
946            password,
947        )
948    }
949
950    pub fn from_seed(seed: &str) -> Option<Self> {
951        let trimmed = seed.trim();
952        if trimmed.is_empty() {
953            return None;
954        }
955
956        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
957            let port = Url::parse(trimmed)
958                .ok()
959                .and_then(|parsed| parsed.port_or_known_default())
960                .unwrap_or(6379);
961            return Some(Self {
962                host: trimmed.to_string(),
963                port,
964            });
965        }
966
967        let (host, port) = split_plain_host_and_port(trimmed, 6379);
968        Some(Self { host, port })
969    }
970}
971
/// Splits a scheme-less host string into `(host, port)`.
///
/// * `[v6]:port` / `[v6]` — brackets are removed; a missing or unparsable port becomes
///   `default_port`. An unterminated `[` returns the input unchanged with `default_port`.
/// * `host:port` (exactly one colon, valid `u16` port) — split at the colon.
/// * Anything else (no colon, several colons, bad port) — returned whole with
///   `default_port`.
///
/// Surrounding whitespace is trimmed first.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let host = raw_host.trim();

    // Bracketed IPv6 literal, e.g. [::1]:6379
    if let Some(after_open) = host.strip_prefix('[') {
        return match after_open.split_once(']') {
            Some((inner, tail)) => {
                let port = tail
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (inner.to_string(), port)
            }
            None => (host.to_string(), default_port),
        };
    }

    // Exactly one colon means exactly two pieces: host and candidate port.
    let mut pieces = host.split(':');
    match (pieces.next(), pieces.next(), pieces.next()) {
        (Some(name), Some(port_text), None) => match port_text.parse::<u16>() {
            Ok(port) => (name.to_string(), port),
            Err(_) => (host.to_string(), default_port),
        },
        _ => (host.to_string(), default_port),
    }
}
1000
1001fn build_redis_url(
1002    scheme: &str,
1003    host: &str,
1004    port: u16,
1005    username: Option<&str>,
1006    password: Option<&str>,
1007) -> String {
1008    let mut url = format!("{scheme}://");
1009
1010    if let Some(user) = username {
1011        url.push_str(&urlencoding::encode(user));
1012        if let Some(pass) = password {
1013            url.push(':');
1014            url.push_str(&urlencoding::encode(pass));
1015        }
1016        url.push('@');
1017    } else if let Some(pass) = password {
1018        url.push(':');
1019        url.push_str(&urlencoding::encode(pass));
1020        url.push('@');
1021    }
1022
1023    if host.contains(':') && !host.starts_with('[') {
1024        url.push('[');
1025        url.push_str(host);
1026        url.push(']');
1027    } else {
1028        url.push_str(host);
1029    }
1030    url.push(':');
1031    url.push_str(&port.to_string());
1032    url
1033}
1034
1035#[derive(Debug, Clone, Serialize, Deserialize)]
1036#[serde(default)]
1037pub struct DatabasePooling {
1038    pub enabled: bool,
1039    pub min: u32,
1040    pub max: u32,
1041}
1042
1043#[derive(Debug, Clone, Serialize, Deserialize)]
1044#[serde(default)]
1045pub struct EventLimits {
1046    pub max_channels_at_once: u32,
1047    pub max_name_length: u32,
1048    pub max_payload_in_kb: u32,
1049    pub max_batch_size: u32,
1050}
1051
1052#[derive(Debug, Clone, Serialize, Deserialize)]
1053#[serde(default)]
1054pub struct IdempotencyConfig {
1055    /// Whether idempotency key support is enabled
1056    pub enabled: bool,
1057    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
1058    pub ttl_seconds: u64,
1059    /// Maximum length of an idempotency key
1060    pub max_key_length: usize,
1061}
1062
1063#[derive(Debug, Clone, Serialize, Deserialize)]
1064#[serde(default)]
1065pub struct EphemeralConfig {
1066    /// Whether ephemeral message handling is enabled.
1067    pub enabled: bool,
1068}
1069
1070#[derive(Debug, Clone, Serialize, Deserialize)]
1071#[serde(default)]
1072pub struct EchoControlConfig {
1073    /// Whether connection-level and per-message echo control is enabled.
1074    pub enabled: bool,
1075    /// Default echo behavior for new V2 connections when the query param is omitted.
1076    pub default_echo_messages: bool,
1077}
1078
1079#[derive(Debug, Clone, Serialize, Deserialize)]
1080#[serde(default)]
1081pub struct EventNameFilteringConfig {
1082    /// Whether per-subscription event name filtering is enabled.
1083    pub enabled: bool,
1084    /// Maximum event names allowed in a single filter.
1085    pub max_events_per_filter: usize,
1086    /// Maximum length for each event name in a filter.
1087    pub max_event_name_length: usize,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1091#[serde(default)]
1092pub struct ConnectionRecoveryConfig {
1093    /// Whether connection recovery (resume) is enabled.
1094    /// When enabled, the server keeps a bounded replay buffer per channel so that
1095    /// reconnecting clients can receive missed messages. Disabled by default for
1096    /// Pusher protocol compatibility.
1097    pub enabled: bool,
1098    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
1099    pub buffer_ttl_seconds: u64,
1100    /// Maximum number of messages kept per channel. Default: 100.
1101    pub max_buffer_size: usize,
1102}
1103
1104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1105#[serde(rename_all = "lowercase")]
1106pub enum HistoryBackend {
1107    #[default]
1108    Postgres,
1109    Mysql,
1110    DynamoDb,
1111    SurrealDb,
1112    ScyllaDb,
1113    Memory,
1114}
1115
1116impl FromStr for HistoryBackend {
1117    type Err = String;
1118    fn from_str(s: &str) -> Result<Self, Self::Err> {
1119        match s.to_lowercase().as_str() {
1120            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1121            "mysql" => Ok(Self::Mysql),
1122            "dynamodb" => Ok(Self::DynamoDb),
1123            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1124            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1125            "memory" => Ok(Self::Memory),
1126            _ => Err(format!("Unknown history backend: {s}")),
1127        }
1128    }
1129}
1130
1131#[derive(Debug, Clone, Serialize, Deserialize)]
1132#[serde(default)]
1133pub struct PostgresHistoryConfig {
1134    pub table_prefix: String,
1135    pub write_timeout_ms: u64,
1136}
1137
1138impl Default for PostgresHistoryConfig {
1139    fn default() -> Self {
1140        Self {
1141            table_prefix: "sockudo_history".to_string(),
1142            write_timeout_ms: 5000,
1143        }
1144    }
1145}
1146
1147#[derive(Debug, Clone, Serialize, Deserialize)]
1148#[serde(default)]
1149pub struct MySqlHistoryConfig {
1150    pub table_prefix: String,
1151    pub write_timeout_ms: u64,
1152}
1153
1154impl Default for MySqlHistoryConfig {
1155    fn default() -> Self {
1156        Self {
1157            table_prefix: "sockudo_history".to_string(),
1158            write_timeout_ms: 5000,
1159        }
1160    }
1161}
1162
1163#[derive(Debug, Clone, Serialize, Deserialize)]
1164#[serde(default)]
1165pub struct DynamoDbHistoryConfig {
1166    pub table_prefix: String,
1167    pub write_timeout_ms: u64,
1168}
1169
1170impl Default for DynamoDbHistoryConfig {
1171    fn default() -> Self {
1172        Self {
1173            table_prefix: "sockudo_history".to_string(),
1174            write_timeout_ms: 5000,
1175        }
1176    }
1177}
1178
1179#[derive(Debug, Clone, Serialize, Deserialize)]
1180#[serde(default)]
1181pub struct SurrealDbHistoryConfig {
1182    pub table_prefix: String,
1183    pub write_timeout_ms: u64,
1184}
1185
1186impl Default for SurrealDbHistoryConfig {
1187    fn default() -> Self {
1188        Self {
1189            table_prefix: "sockudo_history".to_string(),
1190            write_timeout_ms: 5000,
1191        }
1192    }
1193}
1194
1195#[derive(Debug, Clone, Serialize, Deserialize)]
1196#[serde(default)]
1197pub struct ScyllaDbHistoryConfig {
1198    pub table_prefix: String,
1199    pub write_timeout_ms: u64,
1200}
1201
1202impl Default for ScyllaDbHistoryConfig {
1203    fn default() -> Self {
1204        Self {
1205            table_prefix: "sockudo_history".to_string(),
1206            write_timeout_ms: 5000,
1207        }
1208    }
1209}
1210
1211#[derive(Debug, Clone, Serialize, Deserialize)]
1212#[serde(default)]
1213pub struct HistoryConfig {
1214    pub enabled: bool,
1215    pub rewind_enabled: bool,
1216    pub backend: HistoryBackend,
1217    pub retention_window_seconds: u64,
1218    pub max_page_size: usize,
1219    pub max_messages_per_channel: Option<usize>,
1220    pub max_bytes_per_channel: Option<u64>,
1221    pub writer_shards: usize,
1222    pub writer_queue_capacity: usize,
1223    pub postgres: PostgresHistoryConfig,
1224    pub mysql: MySqlHistoryConfig,
1225    pub dynamodb: DynamoDbHistoryConfig,
1226    pub surrealdb: SurrealDbHistoryConfig,
1227    pub scylladb: ScyllaDbHistoryConfig,
1228}
1229
1230impl Default for HistoryConfig {
1231    fn default() -> Self {
1232        Self {
1233            enabled: false,
1234            rewind_enabled: true,
1235            backend: HistoryBackend::Postgres,
1236            retention_window_seconds: 86400,
1237            max_page_size: 100,
1238            max_messages_per_channel: None,
1239            max_bytes_per_channel: None,
1240            writer_shards: 16,
1241            writer_queue_capacity: 4096,
1242            postgres: PostgresHistoryConfig::default(),
1243            mysql: MySqlHistoryConfig::default(),
1244            dynamodb: DynamoDbHistoryConfig::default(),
1245            surrealdb: SurrealDbHistoryConfig::default(),
1246            scylladb: ScyllaDbHistoryConfig::default(),
1247        }
1248    }
1249}
1250
1251#[derive(Debug, Clone, Serialize, Deserialize)]
1252#[serde(default)]
1253pub struct PresenceHistoryConfig {
1254    pub enabled: bool,
1255    pub retention_window_seconds: u64,
1256    pub max_page_size: usize,
1257    pub max_events_per_channel: Option<usize>,
1258    pub max_bytes_per_channel: Option<u64>,
1259}
1260
1261impl Default for PresenceHistoryConfig {
1262    fn default() -> Self {
1263        Self {
1264            enabled: false,
1265            retention_window_seconds: 86400,
1266            max_page_size: 100,
1267            max_events_per_channel: None,
1268            max_bytes_per_channel: None,
1269        }
1270    }
1271}
1272
1273#[derive(Debug, Clone, Serialize, Deserialize)]
1274#[serde(default)]
1275pub struct HttpApiConfig {
1276    pub request_limit_in_mb: u32,
1277    pub accept_traffic: AcceptTraffic,
1278    pub usage_enabled: bool,
1279}
1280
1281#[derive(Debug, Clone, Serialize, Deserialize)]
1282#[serde(default)]
1283pub struct AcceptTraffic {
1284    pub memory_threshold: f64,
1285}
1286
1287#[derive(Debug, Clone, Serialize, Deserialize)]
1288#[serde(default)]
1289pub struct InstanceConfig {
1290    pub process_id: String,
1291}
1292
1293#[derive(Debug, Clone, Serialize, Deserialize)]
1294#[serde(default)]
1295pub struct MetricsConfig {
1296    pub enabled: bool,
1297    pub driver: MetricsDriver,
1298    pub host: String,
1299    pub prometheus: PrometheusConfig,
1300    pub port: u16,
1301}
1302
1303#[derive(Debug, Clone, Serialize, Deserialize)]
1304#[serde(default)]
1305pub struct PrometheusConfig {
1306    pub prefix: String,
1307}
1308
1309#[derive(Debug, Clone, Serialize, Deserialize)]
1310#[serde(default)]
1311pub struct LoggingConfig {
1312    pub colors_enabled: bool,
1313    pub include_target: bool,
1314}
1315
1316#[derive(Debug, Clone, Serialize, Deserialize)]
1317#[serde(default)]
1318pub struct PresenceConfig {
1319    pub max_members_per_channel: u32,
1320    pub max_member_size_in_kb: u32,
1321}
1322
1323/// WebSocket connection buffer configuration
1324/// Controls backpressure handling for slow consumers
1325#[derive(Debug, Clone, Serialize, Deserialize)]
1326#[serde(default)]
1327pub struct WebSocketConfig {
1328    pub max_messages: Option<usize>,
1329    pub max_bytes: Option<usize>,
1330    pub disconnect_on_buffer_full: bool,
1331    pub max_message_size: usize,
1332    pub max_frame_size: usize,
1333    pub write_buffer_size: usize,
1334    pub max_backpressure: usize,
1335    pub auto_ping: bool,
1336    pub ping_interval: u32,
1337    pub idle_timeout: u32,
1338    pub compression: String,
1339}
1340
1341impl Default for WebSocketConfig {
1342    fn default() -> Self {
1343        Self {
1344            max_messages: Some(1000),
1345            max_bytes: None,
1346            disconnect_on_buffer_full: true,
1347            max_message_size: 64 * 1024 * 1024,
1348            max_frame_size: 16 * 1024 * 1024,
1349            write_buffer_size: 16 * 1024,
1350            max_backpressure: 1024 * 1024,
1351            auto_ping: true,
1352            ping_interval: 30,
1353            idle_timeout: 120,
1354            compression: "disabled".to_string(),
1355        }
1356    }
1357}
1358
1359impl WebSocketConfig {
1360    /// Convert to WebSocketBufferConfig for runtime use
1361    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1362        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1363
1364        let limit = match (self.max_messages, self.max_bytes) {
1365            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1366            (Some(messages), None) => BufferLimit::Messages(messages),
1367            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1368            (None, None) => BufferLimit::Messages(1000),
1369        };
1370
1371        WebSocketBufferConfig {
1372            limit,
1373            disconnect_on_full: self.disconnect_on_buffer_full,
1374        }
1375    }
1376
1377    /// Convert to native sockudo-ws runtime configuration.
1378    pub fn to_sockudo_ws_config(
1379        &self,
1380        websocket_max_payload_kb: u32,
1381        activity_timeout: u64,
1382    ) -> sockudo_ws::Config {
1383        use sockudo_ws::Compression;
1384
1385        let compression = match self.compression.to_lowercase().as_str() {
1386            "dedicated" => Compression::Dedicated,
1387            "shared" => Compression::Shared,
1388            "window256b" => Compression::Window256B,
1389            "window1kb" => Compression::Window1KB,
1390            "window2kb" => Compression::Window2KB,
1391            "window4kb" => Compression::Window4KB,
1392            "window8kb" => Compression::Window8KB,
1393            "window16kb" => Compression::Window16KB,
1394            "window32kb" => Compression::Window32KB,
1395            _ => Compression::Disabled,
1396        };
1397
1398        sockudo_ws::Config::builder()
1399            .max_payload_length(
1400                self.max_bytes
1401                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1402            )
1403            .max_message_size(self.max_message_size)
1404            .max_frame_size(self.max_frame_size)
1405            .write_buffer_size(self.write_buffer_size)
1406            .max_backpressure(self.max_backpressure)
1407            .idle_timeout(self.idle_timeout)
1408            .auto_ping(self.auto_ping)
1409            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1410            .compression(compression)
1411            .build()
1412    }
1413}
1414
1415#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1416#[serde(default)]
1417pub struct QueueConfig {
1418    pub driver: QueueDriver,
1419    pub redis: RedisQueueConfig,
1420    pub redis_cluster: RedisClusterQueueConfig,
1421    pub nats: NatsAdapterConfig,
1422    pub pulsar: PulsarAdapterConfig,
1423    pub rabbitmq: RabbitMqAdapterConfig,
1424    pub google_pubsub: GooglePubSubAdapterConfig,
1425    pub kafka: KafkaAdapterConfig,
1426    pub sqs: SqsQueueConfig,
1427    pub sns: SnsQueueConfig,
1428}
1429
1430#[derive(Debug, Clone, Serialize, Deserialize)]
1431#[serde(default)]
1432pub struct RedisQueueConfig {
1433    pub concurrency: u32,
1434    pub prefix: Option<String>,
1435    pub url_override: Option<String>,
1436    pub cluster_mode: bool,
1437}
1438
1439#[derive(Clone, Debug, Serialize, Deserialize)]
1440#[serde(default)]
1441pub struct RateLimit {
1442    pub max_requests: u32,
1443    pub window_seconds: u64,
1444    pub identifier: Option<String>,
1445    pub trust_hops: Option<u32>,
1446}
1447
1448#[derive(Debug, Clone, Serialize, Deserialize)]
1449#[serde(default)]
1450pub struct RateLimiterConfig {
1451    pub enabled: bool,
1452    pub driver: CacheDriver,
1453    pub api_rate_limit: RateLimit,
1454    pub websocket_rate_limit: RateLimit,
1455    pub redis: RedisConfig,
1456}
1457
1458#[derive(Debug, Clone, Serialize, Deserialize)]
1459#[serde(default)]
1460pub struct SslConfig {
1461    pub enabled: bool,
1462    pub cert_path: String,
1463    pub key_path: String,
1464    pub passphrase: Option<String>,
1465    pub ca_path: Option<String>,
1466    pub redirect_http: bool,
1467    pub http_port: Option<u16>,
1468}
1469
1470#[derive(Debug, Clone, Serialize, Deserialize)]
1471#[serde(default)]
1472pub struct WebhooksConfig {
1473    pub batching: BatchingConfig,
1474    pub retry: WebhookRetryConfig,
1475    pub request_timeout_ms: u64,
1476}
1477
1478#[derive(Debug, Clone, Serialize, Deserialize)]
1479#[serde(default)]
1480pub struct BatchingConfig {
1481    pub enabled: bool,
1482    pub duration: u64,
1483    pub size: usize,
1484}
1485
1486#[derive(Debug, Clone, Serialize, Deserialize)]
1487#[serde(default)]
1488pub struct WebhookRetryConfig {
1489    pub enabled: bool,
1490    pub max_attempts: Option<u32>,
1491    pub max_elapsed_time_ms: u64,
1492    pub initial_backoff_ms: u64,
1493    pub max_backoff_ms: u64,
1494}
1495
1496impl Default for WebhooksConfig {
1497    fn default() -> Self {
1498        Self {
1499            batching: BatchingConfig::default(),
1500            retry: WebhookRetryConfig::default(),
1501            request_timeout_ms: 10_000,
1502        }
1503    }
1504}
1505
1506impl Default for WebhookRetryConfig {
1507    fn default() -> Self {
1508        Self {
1509            enabled: true,
1510            max_attempts: None,
1511            max_elapsed_time_ms: 300_000,
1512            initial_backoff_ms: 1_000,
1513            max_backoff_ms: 60_000,
1514        }
1515    }
1516}
1517
1518#[derive(Debug, Clone, Serialize, Deserialize)]
1519#[serde(default)]
1520pub struct ClusterHealthConfig {
1521    pub enabled: bool,
1522    pub heartbeat_interval_ms: u64,
1523    pub node_timeout_ms: u64,
1524    pub cleanup_interval_ms: u64,
1525}
1526
1527#[derive(Debug, Clone, Serialize, Deserialize)]
1528#[serde(default)]
1529pub struct UnixSocketConfig {
1530    pub enabled: bool,
1531    pub path: String,
1532    #[serde(deserialize_with = "deserialize_octal_permission")]
1533    pub permission_mode: u32,
1534}
1535
1536#[derive(Debug, Clone, Serialize, Deserialize)]
1537#[serde(default)]
1538pub struct DeltaCompressionOptionsConfig {
1539    pub enabled: bool,
1540    pub algorithm: String,
1541    pub full_message_interval: u32,
1542    pub min_message_size: usize,
1543    pub max_state_age_secs: u64,
1544    pub max_channel_states_per_socket: usize,
1545    pub max_conflation_states_per_channel: Option<usize>,
1546    pub conflation_key_path: Option<String>,
1547    pub cluster_coordination: bool,
1548    pub coordination_backend: DeltaCoordinationBackend,
1549    pub omit_delta_algorithm: bool,
1550}
1551
1552/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1553#[derive(Debug, Clone, Serialize, Deserialize)]
1554#[serde(default)]
1555pub struct CleanupConfig {
1556    pub queue_buffer_size: usize,
1557    pub batch_size: usize,
1558    pub batch_timeout_ms: u64,
1559    pub worker_threads: WorkerThreadsConfig,
1560    pub max_retry_attempts: u32,
1561    pub async_enabled: bool,
1562    pub fallback_to_sync: bool,
1563}
1564
/// Worker thread count for the cleanup system: either `Auto` or an explicit
/// `Fixed` number.
///
/// Serde support is written by hand: `Auto` is the string `"auto"` and
/// `Fixed(n)` is the number `n`.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// No explicit count given.
    Auto,
    /// Explicit thread count.
    Fixed(usize),
}
1571
1572impl serde::Serialize for WorkerThreadsConfig {
1573    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1574    where
1575        S: serde::Serializer,
1576    {
1577        match self {
1578            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1579            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1580        }
1581    }
1582}
1583
1584impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1585    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1586    where
1587        D: serde::Deserializer<'de>,
1588    {
1589        use serde::de;
1590        struct WorkerThreadsVisitor;
1591        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1592            type Value = WorkerThreadsConfig;
1593
1594            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1595                formatter.write_str(r#""auto" or a positive integer"#)
1596            }
1597
1598            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1599                if value.eq_ignore_ascii_case("auto") {
1600                    Ok(WorkerThreadsConfig::Auto)
1601                } else if let Ok(n) = value.parse::<usize>() {
1602                    Ok(WorkerThreadsConfig::Fixed(n))
1603                } else {
1604                    Err(E::custom(format!(
1605                        "expected 'auto' or a number, got '{value}'"
1606                    )))
1607                }
1608            }
1609
1610            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1611                Ok(WorkerThreadsConfig::Fixed(value as usize))
1612            }
1613
1614            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1615                if value >= 0 {
1616                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1617                } else {
1618                    Err(E::custom("worker_threads must be non-negative"))
1619                }
1620            }
1621        }
1622        deserializer.deserialize_any(WorkerThreadsVisitor)
1623    }
1624}
1625
1626impl Default for CleanupConfig {
1627    fn default() -> Self {
1628        Self {
1629            queue_buffer_size: 1024,
1630            batch_size: 64,
1631            batch_timeout_ms: 100,
1632            worker_threads: WorkerThreadsConfig::Auto,
1633            max_retry_attempts: 3,
1634            async_enabled: true,
1635            fallback_to_sync: true,
1636        }
1637    }
1638}
1639
1640impl CleanupConfig {
1641    pub fn validate(&self) -> Result<(), String> {
1642        if self.queue_buffer_size == 0 {
1643            return Err("queue_buffer_size must be greater than 0".to_string());
1644        }
1645        if self.batch_size == 0 {
1646            return Err("batch_size must be greater than 0".to_string());
1647        }
1648        if self.batch_timeout_ms == 0 {
1649            return Err("batch_timeout_ms must be greater than 0".to_string());
1650        }
1651        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1652            && n == 0
1653        {
1654            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1655        }
1656        Ok(())
1657    }
1658}
1659
1660#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1661#[serde(default)]
1662pub struct TagFilteringConfig {
1663    #[serde(default)]
1664    pub enabled: bool,
1665    #[serde(default = "default_true")]
1666    pub enable_tags: bool,
1667}
1668
/// Serde default helper that always yields `true`.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
1672
1673// --- Default Implementations ---
1674
1675impl Default for ServerOptions {
1676    fn default() -> Self {
1677        Self {
1678            adapter: AdapterConfig::default(),
1679            app_manager: AppManagerConfig::default(),
1680            cache: CacheConfig::default(),
1681            channel_limits: ChannelLimits::default(),
1682            cors: CorsConfig::default(),
1683            database: DatabaseConfig::default(),
1684            database_pooling: DatabasePooling::default(),
1685            debug: false,
1686            tag_filtering: TagFilteringConfig::default(),
1687            event_limits: EventLimits::default(),
1688            host: "0.0.0.0".to_string(),
1689            http_api: HttpApiConfig::default(),
1690            instance: InstanceConfig::default(),
1691            logging: None,
1692            metrics: MetricsConfig::default(),
1693            mode: "production".to_string(),
1694            port: 6001,
1695            path_prefix: "/".to_string(),
1696            presence: PresenceConfig::default(),
1697            queue: QueueConfig::default(),
1698            rate_limiter: RateLimiterConfig::default(),
1699            shutdown_grace_period: 10,
1700            ssl: SslConfig::default(),
1701            user_authentication_timeout: 3600,
1702            webhooks: WebhooksConfig::default(),
1703            websocket_max_payload_kb: 64,
1704            cleanup: CleanupConfig::default(),
1705            activity_timeout: 120,
1706            cluster_health: ClusterHealthConfig::default(),
1707            unix_socket: UnixSocketConfig::default(),
1708            delta_compression: DeltaCompressionOptionsConfig::default(),
1709            websocket: WebSocketConfig::default(),
1710            connection_recovery: ConnectionRecoveryConfig::default(),
1711            history: HistoryConfig::default(),
1712            presence_history: PresenceHistoryConfig::default(),
1713            idempotency: IdempotencyConfig::default(),
1714            ephemeral: EphemeralConfig::default(),
1715            echo_control: EchoControlConfig::default(),
1716            event_name_filtering: EventNameFilteringConfig::default(),
1717        }
1718    }
1719}
1720
1721impl Default for SqsQueueConfig {
1722    fn default() -> Self {
1723        Self {
1724            region: "us-east-1".to_string(),
1725            queue_url_prefix: None,
1726            visibility_timeout: 30,
1727            endpoint_url: None,
1728            max_messages: 10,
1729            wait_time_seconds: 5,
1730            concurrency: 5,
1731            fifo: false,
1732            message_group_id: Some("default".to_string()),
1733        }
1734    }
1735}
1736
1737impl Default for SnsQueueConfig {
1738    fn default() -> Self {
1739        Self {
1740            region: "us-east-1".to_string(),
1741            topic_arn: String::new(),
1742            endpoint_url: None,
1743        }
1744    }
1745}
1746
1747impl Default for RedisAdapterConfig {
1748    fn default() -> Self {
1749        Self {
1750            requests_timeout: 5000,
1751            prefix: "sockudo_adapter:".to_string(),
1752            redis_pub_options: AHashMap::new(),
1753            redis_sub_options: AHashMap::new(),
1754            cluster_mode: false,
1755        }
1756    }
1757}
1758
1759impl Default for RedisClusterAdapterConfig {
1760    fn default() -> Self {
1761        Self {
1762            nodes: vec![],
1763            prefix: "sockudo_adapter:".to_string(),
1764            request_timeout_ms: 1000,
1765            use_connection_manager: true,
1766            use_sharded_pubsub: false,
1767        }
1768    }
1769}
1770
1771impl Default for NatsAdapterConfig {
1772    fn default() -> Self {
1773        Self {
1774            servers: vec!["nats://localhost:4222".to_string()],
1775            prefix: "sockudo_adapter:".to_string(),
1776            request_timeout_ms: 5000,
1777            username: None,
1778            password: None,
1779            token: None,
1780            connection_timeout_ms: 5000,
1781            nodes_number: None,
1782            discovery_max_wait_ms: 1000,
1783            discovery_idle_wait_ms: 150,
1784        }
1785    }
1786}
1787
1788impl Default for PulsarAdapterConfig {
1789    fn default() -> Self {
1790        Self {
1791            url: "pulsar://127.0.0.1:6650".to_string(),
1792            prefix: "sockudo-adapter".to_string(),
1793            request_timeout_ms: 5000,
1794            token: None,
1795            nodes_number: None,
1796        }
1797    }
1798}
1799
1800impl Default for RabbitMqAdapterConfig {
1801    fn default() -> Self {
1802        Self {
1803            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1804            prefix: "sockudo_adapter".to_string(),
1805            request_timeout_ms: 5000,
1806            connection_timeout_ms: 5000,
1807            nodes_number: None,
1808        }
1809    }
1810}
1811
1812impl Default for GooglePubSubAdapterConfig {
1813    fn default() -> Self {
1814        Self {
1815            project_id: "".to_string(),
1816            prefix: "sockudo-adapter".to_string(),
1817            request_timeout_ms: 5000,
1818            emulator_host: None,
1819            nodes_number: None,
1820        }
1821    }
1822}
1823
1824impl Default for KafkaAdapterConfig {
1825    fn default() -> Self {
1826        Self {
1827            brokers: vec!["localhost:9092".to_string()],
1828            prefix: "sockudo_adapter".to_string(),
1829            request_timeout_ms: 5000,
1830            security_protocol: None,
1831            sasl_mechanism: None,
1832            sasl_username: None,
1833            sasl_password: None,
1834            nodes_number: None,
1835        }
1836    }
1837}
1838
1839impl Default for CacheSettings {
1840    fn default() -> Self {
1841        Self {
1842            enabled: true,
1843            ttl: 300,
1844        }
1845    }
1846}
1847
1848impl Default for MemoryCacheOptions {
1849    fn default() -> Self {
1850        Self {
1851            ttl: 300,
1852            cleanup_interval: 60,
1853            max_capacity: 10000,
1854        }
1855    }
1856}
1857
1858impl Default for CacheConfig {
1859    fn default() -> Self {
1860        Self {
1861            driver: CacheDriver::default(),
1862            redis: RedisConfig {
1863                prefix: Some("sockudo_cache:".to_string()),
1864                url_override: None,
1865                cluster_mode: false,
1866            },
1867            memory: MemoryCacheOptions::default(),
1868        }
1869    }
1870}
1871
1872impl Default for ChannelLimits {
1873    fn default() -> Self {
1874        Self {
1875            max_name_length: 200,
1876            cache_ttl: 3600,
1877        }
1878    }
1879}
1880impl Default for CorsConfig {
1881    fn default() -> Self {
1882        Self {
1883            credentials: true,
1884            origin: vec!["*".to_string()],
1885            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1886            allowed_headers: vec![
1887                "Authorization".to_string(),
1888                "Content-Type".to_string(),
1889                "X-Requested-With".to_string(),
1890                "Accept".to_string(),
1891            ],
1892        }
1893    }
1894}
1895
1896impl Default for DatabaseConnection {
1897    fn default() -> Self {
1898        Self {
1899            host: "localhost".to_string(),
1900            port: 3306,
1901            username: "root".to_string(),
1902            password: "".to_string(),
1903            database: "sockudo".to_string(),
1904            table_name: "applications".to_string(),
1905            connection_pool_size: 10,
1906            pool_min: None,
1907            pool_max: None,
1908            cache_ttl: 300,
1909            cache_cleanup_interval: 60,
1910            cache_max_capacity: 100,
1911        }
1912    }
1913}
1914
1915impl Default for RedisConnection {
1916    fn default() -> Self {
1917        Self {
1918            host: "127.0.0.1".to_string(),
1919            port: 6379,
1920            db: 0,
1921            username: None,
1922            password: None,
1923            key_prefix: "sockudo:".to_string(),
1924            sentinels: Vec::new(),
1925            sentinel_password: None,
1926            name: "mymaster".to_string(),
1927            cluster: RedisClusterConnection::default(),
1928            cluster_nodes: Vec::new(),
1929        }
1930    }
1931}
1932
1933impl Default for RedisSentinel {
1934    fn default() -> Self {
1935        Self {
1936            host: "localhost".to_string(),
1937            port: 26379,
1938        }
1939    }
1940}
1941
1942impl Default for ClusterNode {
1943    fn default() -> Self {
1944        Self {
1945            host: "127.0.0.1".to_string(),
1946            port: 7000,
1947        }
1948    }
1949}
1950
1951impl Default for DatabasePooling {
1952    fn default() -> Self {
1953        Self {
1954            enabled: true,
1955            min: 2,
1956            max: 10,
1957        }
1958    }
1959}
1960
1961impl Default for EventLimits {
1962    fn default() -> Self {
1963        Self {
1964            max_channels_at_once: 100,
1965            max_name_length: 200,
1966            max_payload_in_kb: 100,
1967            max_batch_size: 10,
1968        }
1969    }
1970}
1971
1972impl Default for IdempotencyConfig {
1973    fn default() -> Self {
1974        Self {
1975            enabled: true,
1976            ttl_seconds: 120,
1977            max_key_length: 128,
1978        }
1979    }
1980}
1981
1982impl Default for EphemeralConfig {
1983    fn default() -> Self {
1984        Self { enabled: true }
1985    }
1986}
1987
1988impl Default for EchoControlConfig {
1989    fn default() -> Self {
1990        Self {
1991            enabled: true,
1992            default_echo_messages: true,
1993        }
1994    }
1995}
1996
1997impl Default for EventNameFilteringConfig {
1998    fn default() -> Self {
1999        Self {
2000            enabled: true,
2001            max_events_per_filter: 50,
2002            max_event_name_length: 200,
2003        }
2004    }
2005}
2006
2007impl Default for ConnectionRecoveryConfig {
2008    fn default() -> Self {
2009        Self {
2010            enabled: false,
2011            buffer_ttl_seconds: 120,
2012            max_buffer_size: 100,
2013        }
2014    }
2015}
2016
2017impl Default for HttpApiConfig {
2018    fn default() -> Self {
2019        Self {
2020            request_limit_in_mb: 10,
2021            accept_traffic: AcceptTraffic::default(),
2022            usage_enabled: true,
2023        }
2024    }
2025}
2026
2027impl Default for AcceptTraffic {
2028    fn default() -> Self {
2029        Self {
2030            memory_threshold: 0.90,
2031        }
2032    }
2033}
2034
2035impl Default for InstanceConfig {
2036    fn default() -> Self {
2037        Self {
2038            process_id: uuid::Uuid::new_v4().to_string(),
2039        }
2040    }
2041}
2042
2043impl Default for LoggingConfig {
2044    fn default() -> Self {
2045        Self {
2046            colors_enabled: true,
2047            include_target: true,
2048        }
2049    }
2050}
2051
2052impl Default for MetricsConfig {
2053    fn default() -> Self {
2054        Self {
2055            enabled: true,
2056            driver: MetricsDriver::default(),
2057            host: "0.0.0.0".to_string(),
2058            prometheus: PrometheusConfig::default(),
2059            port: 9601,
2060        }
2061    }
2062}
2063
2064impl Default for PrometheusConfig {
2065    fn default() -> Self {
2066        Self {
2067            prefix: "sockudo_".to_string(),
2068        }
2069    }
2070}
2071
2072impl Default for PresenceConfig {
2073    fn default() -> Self {
2074        Self {
2075            max_members_per_channel: 100,
2076            max_member_size_in_kb: 2,
2077        }
2078    }
2079}
2080
2081impl Default for RedisQueueConfig {
2082    fn default() -> Self {
2083        Self {
2084            concurrency: 5,
2085            prefix: Some("sockudo_queue:".to_string()),
2086            url_override: None,
2087            cluster_mode: false,
2088        }
2089    }
2090}
2091
2092impl Default for RateLimit {
2093    fn default() -> Self {
2094        Self {
2095            max_requests: 60,
2096            window_seconds: 60,
2097            identifier: Some("default".to_string()),
2098            trust_hops: Some(0),
2099        }
2100    }
2101}
2102
2103impl Default for RateLimiterConfig {
2104    fn default() -> Self {
2105        Self {
2106            enabled: true,
2107            driver: CacheDriver::Memory,
2108            api_rate_limit: RateLimit {
2109                max_requests: 100,
2110                window_seconds: 60,
2111                identifier: Some("api".to_string()),
2112                trust_hops: Some(0),
2113            },
2114            websocket_rate_limit: RateLimit {
2115                max_requests: 20,
2116                window_seconds: 60,
2117                identifier: Some("websocket_connect".to_string()),
2118                trust_hops: Some(0),
2119            },
2120            redis: RedisConfig {
2121                prefix: Some("sockudo_rl:".to_string()),
2122                url_override: None,
2123                cluster_mode: false,
2124            },
2125        }
2126    }
2127}
2128
2129impl Default for SslConfig {
2130    fn default() -> Self {
2131        Self {
2132            enabled: false,
2133            cert_path: "".to_string(),
2134            key_path: "".to_string(),
2135            passphrase: None,
2136            ca_path: None,
2137            redirect_http: false,
2138            http_port: Some(80),
2139        }
2140    }
2141}
2142
2143impl Default for BatchingConfig {
2144    fn default() -> Self {
2145        Self {
2146            enabled: true,
2147            duration: 50,
2148            size: 100,
2149        }
2150    }
2151}
2152
2153impl Default for ClusterHealthConfig {
2154    fn default() -> Self {
2155        Self {
2156            enabled: true,
2157            heartbeat_interval_ms: 10000,
2158            node_timeout_ms: 30000,
2159            cleanup_interval_ms: 10000,
2160        }
2161    }
2162}
2163
2164impl Default for UnixSocketConfig {
2165    fn default() -> Self {
2166        Self {
2167            enabled: false,
2168            path: "/var/run/sockudo/sockudo.sock".to_string(),
2169            permission_mode: 0o660,
2170        }
2171    }
2172}
2173
2174impl Default for DeltaCompressionOptionsConfig {
2175    fn default() -> Self {
2176        Self {
2177            enabled: true,
2178            algorithm: "fossil".to_string(),
2179            full_message_interval: 10,
2180            min_message_size: 100,
2181            max_state_age_secs: 300,
2182            max_channel_states_per_socket: 100,
2183            max_conflation_states_per_channel: Some(100),
2184            conflation_key_path: None,
2185            cluster_coordination: false,
2186            coordination_backend: DeltaCoordinationBackend::Auto,
2187            omit_delta_algorithm: false,
2188        }
2189    }
2190}
2191
#[cfg(test)]
mod tests {
    use super::{DeltaCoordinationBackend, QueueDriver};
    use std::str::FromStr;

    /// Every broker-backed queue driver name must parse to its enum variant.
    #[test]
    fn queue_driver_parses_broker_backends() {
        let cases = [
            ("rabbitmq", QueueDriver::RabbitMq),
            ("kafka", QueueDriver::Kafka),
            ("pulsar", QueueDriver::Pulsar),
            ("google-pubsub", QueueDriver::GooglePubSub),
        ];

        for (raw, expected) in cases {
            assert_eq!(QueueDriver::from_str(raw).unwrap(), expected);
        }
    }

    /// The delta coordination backend names must parse to their enum variants.
    #[test]
    fn delta_coordination_backend_parses_expected_values() {
        let cases = [
            ("auto", DeltaCoordinationBackend::Auto),
            ("redis-cluster", DeltaCoordinationBackend::RedisCluster),
            ("nats", DeltaCoordinationBackend::Nats),
        ];

        for (raw, expected) in cases {
            assert_eq!(DeltaCoordinationBackend::from_str(raw).unwrap(), expected);
        }
    }
}
2230
2231impl ClusterHealthConfig {
2232    pub fn validate(&self) -> Result<(), String> {
2233        if self.heartbeat_interval_ms == 0 {
2234            return Err("heartbeat_interval_ms must be greater than 0".to_string());
2235        }
2236        if self.node_timeout_ms == 0 {
2237            return Err("node_timeout_ms must be greater than 0".to_string());
2238        }
2239        if self.cleanup_interval_ms == 0 {
2240            return Err("cleanup_interval_ms must be greater than 0".to_string());
2241        }
2242
2243        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2244            return Err(format!(
2245                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2246                self.heartbeat_interval_ms,
2247                self.node_timeout_ms,
2248                self.node_timeout_ms / 3
2249            ));
2250        }
2251
2252        if self.cleanup_interval_ms > self.node_timeout_ms {
2253            return Err(format!(
2254                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2255                self.cleanup_interval_ms, self.node_timeout_ms
2256            ));
2257        }
2258
2259        Ok(())
2260    }
2261}
2262
2263impl ServerOptions {
2264    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2265        let content = tokio::fs::read_to_string(path).await?;
2266        let options: Self = if path.ends_with(".toml") {
2267            toml::from_str(&content)?
2268        } else {
2269            // Legacy JSON support
2270            sonic_rs::from_str(&content)?
2271        };
2272        Ok(options)
2273    }
2274
2275    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2276        // --- General Settings ---
2277        if let Ok(mode) = std::env::var("ENVIRONMENT") {
2278            self.mode = mode;
2279        }
2280        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2281        if parse_bool_env("DEBUG", false) {
2282            self.debug = true;
2283            info!("DEBUG environment variable forces debug mode ON");
2284        }
2285
2286        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2287
2288        if let Ok(host) = std::env::var("HOST") {
2289            self.host = host;
2290        }
2291        self.port = parse_env::<u16>("PORT", self.port);
2292        self.shutdown_grace_period =
2293            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2294        self.user_authentication_timeout = parse_env::<u64>(
2295            "USER_AUTHENTICATION_TIMEOUT",
2296            self.user_authentication_timeout,
2297        );
2298        self.websocket_max_payload_kb =
2299            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2300        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2301            self.instance.process_id = id;
2302        }
2303
2304        // --- Driver Configuration ---
2305        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2306            self.adapter.driver =
2307                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2308        }
2309        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2310            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2311            self.adapter.buffer_multiplier_per_cpu,
2312        );
2313        self.adapter.enable_socket_counting = parse_env::<bool>(
2314            "ADAPTER_ENABLE_SOCKET_COUNTING",
2315            self.adapter.enable_socket_counting,
2316        );
2317        self.adapter.fallback_to_local =
2318            parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2319        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2320            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2321        }
2322        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2323            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2324        }
2325        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2326            self.app_manager.driver =
2327                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2328        }
2329        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2330            self.rate_limiter.driver = parse_driver_enum(
2331                driver_str,
2332                self.rate_limiter.driver.clone(),
2333                "RateLimiter Backend",
2334            );
2335        }
2336
2337        // --- Database: Redis ---
2338        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2339            self.database.redis.host = host;
2340        }
2341        self.database.redis.port =
2342            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2343        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2344            self.database.redis.username = if username.is_empty() {
2345                None
2346            } else {
2347                Some(username)
2348            };
2349        }
2350        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2351            self.database.redis.password = Some(password);
2352        }
2353        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2354        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2355            self.database.redis.key_prefix = prefix;
2356        }
2357        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2358            self.database.redis.cluster.username = if cluster_username.is_empty() {
2359                None
2360            } else {
2361                Some(cluster_username)
2362            };
2363        }
2364        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2365            self.database.redis.cluster.password = Some(cluster_password);
2366        }
2367        self.database.redis.cluster.use_tls = parse_bool_env(
2368            "DATABASE_REDIS_CLUSTER_USE_TLS",
2369            self.database.redis.cluster.use_tls,
2370        );
2371
2372        // --- Database: MySQL ---
2373        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2374            self.database.mysql.host = host;
2375        }
2376        self.database.mysql.port =
2377            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2378        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2379            self.database.mysql.username = user;
2380        }
2381        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2382            self.database.mysql.password = pass;
2383        }
2384        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2385            self.database.mysql.database = db;
2386        }
2387        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2388            self.database.mysql.table_name = table;
2389        }
2390        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2391
2392        // --- Database: PostgreSQL ---
2393        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2394            self.database.postgres.host = host;
2395        }
2396        self.database.postgres.port =
2397            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2398        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2399            self.database.postgres.username = user;
2400        }
2401        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2402            self.database.postgres.password = pass;
2403        }
2404        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2405            self.database.postgres.database = db;
2406        }
2407        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2408
2409        // --- Database: DynamoDB ---
2410        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2411            self.database.dynamodb.region = region;
2412        }
2413        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2414            self.database.dynamodb.table_name = table;
2415        }
2416        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2417            self.database.dynamodb.endpoint_url = Some(endpoint);
2418        }
2419        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2420            self.database.dynamodb.aws_access_key_id = Some(key_id);
2421        }
2422        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2423            self.database.dynamodb.aws_secret_access_key = Some(secret);
2424        }
2425
2426        // --- Database: SurrealDB ---
2427        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2428            self.database.surrealdb.url = url;
2429        }
2430        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2431            self.database.surrealdb.namespace = namespace;
2432        }
2433        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2434            self.database.surrealdb.database = database;
2435        }
2436        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2437            self.database.surrealdb.username = username;
2438        }
2439        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2440            self.database.surrealdb.password = password;
2441        }
2442        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2443            self.database.surrealdb.table_name = table;
2444        }
2445
2446        // --- Redis Cluster ---
2447        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2448            let node_list: Vec<String> = nodes
2449                .split(',')
2450                .map(|s| s.trim())
2451                .filter(|s| !s.is_empty())
2452                .map(ToString::to_string)
2453                .collect();
2454
2455            options.adapter.cluster.nodes = node_list.clone();
2456            options.queue.redis_cluster.nodes = node_list.clone();
2457
2458            let parsed_nodes: Vec<ClusterNode> = node_list
2459                .iter()
2460                .filter_map(|seed| ClusterNode::from_seed(seed))
2461                .collect();
2462            options.database.redis.cluster.nodes = parsed_nodes.clone();
2463            options.database.redis.cluster_nodes = parsed_nodes;
2464        };
2465
2466        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2467            apply_redis_cluster_nodes(self, &nodes);
2468        }
2469        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2470            apply_redis_cluster_nodes(self, &nodes);
2471        }
2472        self.queue.redis_cluster.concurrency = parse_env::<u32>(
2473            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2474            self.queue.redis_cluster.concurrency,
2475        );
2476        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2477            self.queue.redis_cluster.prefix = Some(prefix);
2478        }
2479
2480        // --- SSL Configuration ---
2481        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2482        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2483            self.ssl.cert_path = val;
2484        }
2485        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2486            self.ssl.key_path = val;
2487        }
2488        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2489        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2490            self.ssl.http_port = Some(port);
2491        }
2492
2493        // --- Unix Socket Configuration ---
2494        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2495        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2496            self.unix_socket.path = path;
2497        }
2498        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2499            if mode_str.chars().all(|c| c.is_digit(8)) {
2500                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2501                    if mode <= 0o777 {
2502                        self.unix_socket.permission_mode = mode;
2503                    } else {
2504                        warn!(
2505                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2506                            mode_str, self.unix_socket.permission_mode
2507                        );
2508                    }
2509                } else {
2510                    warn!(
2511                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2512                        mode_str, self.unix_socket.permission_mode
2513                    );
2514                }
2515            } else {
2516                warn!(
2517                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2518                    mode_str, self.unix_socket.permission_mode
2519                );
2520            }
2521        }
2522
2523        // --- Metrics ---
2524        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2525            self.metrics.driver =
2526                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2527        }
2528        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2529        if let Ok(val) = std::env::var("METRICS_HOST") {
2530            self.metrics.host = val;
2531        }
2532        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2533        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2534            self.metrics.prometheus.prefix = val;
2535        }
2536
2537        // --- HTTP API ---
2538        self.http_api.usage_enabled =
2539            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2540
2541        // --- Rate Limiter ---
2542        self.rate_limiter.enabled =
2543            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2544        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2545            "RATE_LIMITER_API_MAX_REQUESTS",
2546            self.rate_limiter.api_rate_limit.max_requests,
2547        );
2548        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2549            "RATE_LIMITER_API_WINDOW_SECONDS",
2550            self.rate_limiter.api_rate_limit.window_seconds,
2551        );
2552        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2553            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2554        }
2555        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2556            "RATE_LIMITER_WS_MAX_REQUESTS",
2557            self.rate_limiter.websocket_rate_limit.max_requests,
2558        );
2559        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2560            "RATE_LIMITER_WS_WINDOW_SECONDS",
2561            self.rate_limiter.websocket_rate_limit.window_seconds,
2562        );
2563        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2564            self.rate_limiter.redis.prefix = Some(prefix);
2565        }
2566
2567        // --- Queue: Redis ---
2568        self.queue.redis.concurrency =
2569            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2570        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2571            self.queue.redis.prefix = Some(prefix);
2572        }
2573
2574        // --- Queue: SQS ---
2575        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2576            self.queue.sqs.region = region;
2577        }
2578        self.queue.sqs.visibility_timeout = parse_env::<i32>(
2579            "QUEUE_SQS_VISIBILITY_TIMEOUT",
2580            self.queue.sqs.visibility_timeout,
2581        );
2582        self.queue.sqs.max_messages =
2583            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2584        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2585            "QUEUE_SQS_WAIT_TIME_SECONDS",
2586            self.queue.sqs.wait_time_seconds,
2587        );
2588        self.queue.sqs.concurrency =
2589            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2590        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2591        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2592            self.queue.sqs.endpoint_url = Some(endpoint);
2593        }
2594
2595        // --- Queue: SNS ---
2596        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2597            self.queue.sns.region = region;
2598        }
2599        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2600            self.queue.sns.topic_arn = topic_arn;
2601        }
2602        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2603            self.queue.sns.endpoint_url = Some(endpoint);
2604        }
2605
2606        // --- Webhooks ---
2607        self.webhooks.batching.enabled =
2608            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2609        self.webhooks.batching.duration =
2610            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2611        self.webhooks.batching.size =
2612            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2613
2614        // --- NATS Adapter ---
2615        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2616            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2617        }
2618        if let Ok(user) = std::env::var("NATS_USERNAME") {
2619            self.adapter.nats.username = Some(user);
2620        }
2621        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2622            self.adapter.nats.password = Some(pass);
2623        }
2624        if let Ok(token) = std::env::var("NATS_TOKEN") {
2625            self.adapter.nats.token = Some(token);
2626        }
2627        if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2628            self.adapter.nats.prefix = prefix;
2629        }
2630        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2631            "NATS_CONNECTION_TIMEOUT_MS",
2632            self.adapter.nats.connection_timeout_ms,
2633        );
2634        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2635            "NATS_REQUEST_TIMEOUT_MS",
2636            self.adapter.nats.request_timeout_ms,
2637        );
2638        self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2639            "NATS_DISCOVERY_MAX_WAIT_MS",
2640            self.adapter.nats.discovery_max_wait_ms,
2641        );
2642        self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2643            "NATS_DISCOVERY_IDLE_WAIT_MS",
2644            self.adapter.nats.discovery_idle_wait_ms,
2645        );
2646        if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2647            self.adapter.nats.nodes_number = Some(nodes);
2648        }
2649
2650        // --- Pulsar Adapter ---
2651        if let Ok(url) = std::env::var("PULSAR_URL") {
2652            self.adapter.pulsar.url = url;
2653        }
2654        if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2655            self.adapter.pulsar.prefix = prefix;
2656        }
2657        if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2658            self.adapter.pulsar.token = Some(token);
2659        }
2660        self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2661            "PULSAR_REQUEST_TIMEOUT_MS",
2662            self.adapter.pulsar.request_timeout_ms,
2663        );
2664        if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2665            self.adapter.pulsar.nodes_number = Some(nodes);
2666        }
2667
2668        // --- RabbitMQ Adapter ---
2669        if let Ok(url) = std::env::var("RABBITMQ_URL") {
2670            self.adapter.rabbitmq.url = url;
2671        }
2672        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2673            self.adapter.rabbitmq.prefix = prefix;
2674        }
2675        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2676            "RABBITMQ_CONNECTION_TIMEOUT_MS",
2677            self.adapter.rabbitmq.connection_timeout_ms,
2678        );
2679        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2680            "RABBITMQ_REQUEST_TIMEOUT_MS",
2681            self.adapter.rabbitmq.request_timeout_ms,
2682        );
2683        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2684            self.adapter.rabbitmq.nodes_number = Some(nodes);
2685        }
2686
2687        // --- Google Pub/Sub Adapter ---
2688        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2689            self.adapter.google_pubsub.project_id = project_id;
2690        }
2691        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2692            self.adapter.google_pubsub.prefix = prefix;
2693        }
2694        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2695            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2696        }
2697        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2698            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2699            self.adapter.google_pubsub.request_timeout_ms,
2700        );
2701        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2702            self.adapter.google_pubsub.nodes_number = Some(nodes);
2703        }
2704
2705        // --- Kafka Adapter ---
2706        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2707            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2708        }
2709        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2710            self.adapter.kafka.prefix = prefix;
2711        }
2712        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2713            self.adapter.kafka.security_protocol = Some(protocol);
2714        }
2715        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2716            self.adapter.kafka.sasl_mechanism = Some(mechanism);
2717        }
2718        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2719            self.adapter.kafka.sasl_username = Some(username);
2720        }
2721        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2722            self.adapter.kafka.sasl_password = Some(password);
2723        }
2724        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2725            "KAFKA_REQUEST_TIMEOUT_MS",
2726            self.adapter.kafka.request_timeout_ms,
2727        );
2728        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2729            self.adapter.kafka.nodes_number = Some(nodes);
2730        }
2731
2732        // --- CORS ---
2733        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2734            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2735            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2736                warn!(
2737                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2738                    e
2739                );
2740            } else {
2741                self.cors.origin = parsed;
2742            }
2743        }
2744        if let Ok(methods) = std::env::var("CORS_METHODS") {
2745            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2746        }
2747        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2748            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2749        }
2750        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2751
2752        // --- Performance Tuning ---
2753        self.database_pooling.enabled =
2754            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2755        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2756            self.database_pooling.min = min;
2757        }
2758        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2759            self.database_pooling.max = max;
2760        }
2761
2762        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2763            self.database.mysql.connection_pool_size = pool_size;
2764            self.database.postgres.connection_pool_size = pool_size;
2765        }
2766        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2767            self.app_manager.cache.ttl = cache_ttl;
2768            self.channel_limits.cache_ttl = cache_ttl;
2769            self.database.mysql.cache_ttl = cache_ttl;
2770            self.database.postgres.cache_ttl = cache_ttl;
2771            self.database.surrealdb.cache_ttl = cache_ttl;
2772            self.cache.memory.ttl = cache_ttl;
2773        }
2774        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2775            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2776            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2777            self.cache.memory.cleanup_interval = cleanup_interval;
2778        }
2779        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2780            self.database.mysql.cache_max_capacity = max_capacity;
2781            self.database.postgres.cache_max_capacity = max_capacity;
2782            self.database.surrealdb.cache_max_capacity = max_capacity;
2783            self.cache.memory.max_capacity = max_capacity;
2784        }
2785
2786        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2787        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2788        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2789        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2790
2791        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2792            (default_app_id, default_app_key, default_app_secret)
2793            && default_app_enabled
2794        {
2795            let default_app = App::from_policy(
2796                app_id,
2797                app_key,
2798                app_secret,
2799                default_app_enabled,
2800                crate::app::AppPolicy {
2801                    limits: crate::app::AppLimitsPolicy {
2802                        max_connections: parse_env::<u32>(
2803                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2804                            100,
2805                        ),
2806                        max_backend_events_per_second: Some(parse_env::<u32>(
2807                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2808                            100,
2809                        )),
2810                        max_client_events_per_second: parse_env::<u32>(
2811                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2812                            100,
2813                        ),
2814                        max_read_requests_per_second: Some(parse_env::<u32>(
2815                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2816                            100,
2817                        )),
2818                        max_presence_members_per_channel: Some(parse_env::<u32>(
2819                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2820                            100,
2821                        )),
2822                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
2823                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2824                            100,
2825                        )),
2826                        max_channel_name_length: Some(parse_env::<u32>(
2827                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2828                            100,
2829                        )),
2830                        max_event_channels_at_once: Some(parse_env::<u32>(
2831                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2832                            100,
2833                        )),
2834                        max_event_name_length: Some(parse_env::<u32>(
2835                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2836                            100,
2837                        )),
2838                        max_event_payload_in_kb: Some(parse_env::<u32>(
2839                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2840                            100,
2841                        )),
2842                        max_event_batch_size: Some(parse_env::<u32>(
2843                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2844                            100,
2845                        )),
2846                        decay_seconds: None,
2847                        terminate_on_limit: false,
2848                        message_rate_limit: None,
2849                    },
2850                    features: crate::app::AppFeaturesPolicy {
2851                        enable_client_messages: std::env::var(
2852                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2853                        )
2854                        .ok()
2855                        .map(|value| {
2856                            matches!(
2857                                value.trim().to_ascii_lowercase().as_str(),
2858                                "true" | "1" | "yes" | "on"
2859                            )
2860                        })
2861                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2862                        enable_user_authentication: Some(parse_bool_env(
2863                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2864                            false,
2865                        )),
2866                        enable_watchlist_events: Some(parse_bool_env(
2867                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2868                            false,
2869                        )),
2870                    },
2871                    channels: crate::app::AppChannelsPolicy {
2872                        allowed_origins: {
2873                            if let Ok(origins_str) =
2874                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2875                            {
2876                                if !origins_str.is_empty() {
2877                                    Some(
2878                                        origins_str
2879                                            .split(',')
2880                                            .map(|s| s.trim().to_string())
2881                                            .collect(),
2882                                    )
2883                                } else {
2884                                    None
2885                                }
2886                            } else {
2887                                None
2888                            }
2889                        },
2890                        channel_delta_compression: None,
2891                        channel_namespaces: None,
2892                    },
2893                    webhooks: None,
2894                    idempotency: None,
2895                    connection_recovery: None,
2896                    history: None,
2897                    presence_history: None,
2898                },
2899            );
2900
2901            self.app_manager.array.apps.push(default_app);
2902            info!("Successfully registered default app from env");
2903        }
2904
2905        // Special handling for REDIS_URL
2906        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2907            info!("Applying REDIS_URL environment variable override");
2908
2909            let redis_url_json = sonic_rs::json!(redis_url_env);
2910
2911            self.adapter
2912                .redis
2913                .redis_pub_options
2914                .insert("url".to_string(), redis_url_json.clone());
2915            self.adapter
2916                .redis
2917                .redis_sub_options
2918                .insert("url".to_string(), redis_url_json);
2919
2920            self.cache.redis.url_override = Some(redis_url_env.clone());
2921            self.queue.redis.url_override = Some(redis_url_env.clone());
2922            self.rate_limiter.redis.url_override = Some(redis_url_env);
2923        }
2924
2925        // --- Logging Configuration ---
2926        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2927        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2928        if has_colors_env || has_target_env {
2929            let logging_config = self.logging.get_or_insert_with(Default::default);
2930            if has_colors_env {
2931                logging_config.colors_enabled =
2932                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2933            }
2934            if has_target_env {
2935                logging_config.include_target =
2936                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2937            }
2938        }
2939
2940        // --- Cleanup Configuration ---
2941        self.cleanup.async_enabled =
2942            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2943        self.cleanup.fallback_to_sync =
2944            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2945        self.cleanup.queue_buffer_size =
2946            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2947        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2948        self.cleanup.batch_timeout_ms =
2949            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2950        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2951            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2952                WorkerThreadsConfig::Auto
2953            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2954                WorkerThreadsConfig::Fixed(n)
2955            } else {
2956                warn!(
2957                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2958                    worker_threads_str
2959                );
2960                self.cleanup.worker_threads.clone()
2961            };
2962        }
2963        self.cleanup.max_retry_attempts = parse_env::<u32>(
2964            "CLEANUP_MAX_RETRY_ATTEMPTS",
2965            self.cleanup.max_retry_attempts,
2966        );
2967
2968        // Cluster health configuration
2969        self.cluster_health.enabled =
2970            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2971        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2972            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2973            self.cluster_health.heartbeat_interval_ms,
2974        );
2975        self.cluster_health.node_timeout_ms = parse_env::<u64>(
2976            "CLUSTER_HEALTH_NODE_TIMEOUT",
2977            self.cluster_health.node_timeout_ms,
2978        );
2979        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2980            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2981            self.cluster_health.cleanup_interval_ms,
2982        );
2983
2984        // Tag filtering configuration
2985        self.tag_filtering.enabled =
2986            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2987
2988        // WebSocket buffer configuration
2989        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2990            if val.to_lowercase() == "none" || val == "0" {
2991                self.websocket.max_messages = None;
2992            } else if let Ok(n) = val.parse::<usize>() {
2993                self.websocket.max_messages = Some(n);
2994            }
2995        }
2996        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2997            if val.to_lowercase() == "none" || val == "0" {
2998                self.websocket.max_bytes = None;
2999            } else if let Ok(n) = val.parse::<usize>() {
3000                self.websocket.max_bytes = Some(n);
3001            }
3002        }
3003        self.websocket.disconnect_on_buffer_full = parse_bool_env(
3004            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3005            self.websocket.disconnect_on_buffer_full,
3006        );
3007        self.websocket.max_message_size = parse_env::<usize>(
3008            "WEBSOCKET_MAX_MESSAGE_SIZE",
3009            self.websocket.max_message_size,
3010        );
3011        self.websocket.max_frame_size =
3012            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3013        self.websocket.write_buffer_size = parse_env::<usize>(
3014            "WEBSOCKET_WRITE_BUFFER_SIZE",
3015            self.websocket.write_buffer_size,
3016        );
3017        self.websocket.max_backpressure = parse_env::<usize>(
3018            "WEBSOCKET_MAX_BACKPRESSURE",
3019            self.websocket.max_backpressure,
3020        );
3021        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3022        self.websocket.ping_interval =
3023            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3024        self.websocket.idle_timeout =
3025            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3026        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3027            self.websocket.compression = mode;
3028        }
3029
3030        // Connection recovery (includes serial + message_id + replay buffer)
3031        self.connection_recovery.enabled = parse_bool_env(
3032            "CONNECTION_RECOVERY_ENABLED",
3033            self.connection_recovery.enabled,
3034        );
3035        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3036            "CONNECTION_RECOVERY_BUFFER_TTL",
3037            self.connection_recovery.buffer_ttl_seconds,
3038        );
3039        self.connection_recovery.max_buffer_size = parse_env::<usize>(
3040            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3041            self.connection_recovery.max_buffer_size,
3042        );
3043
3044        self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3045        self.history.rewind_enabled =
3046            parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3047        self.history.retention_window_seconds = parse_env::<u64>(
3048            "HISTORY_RETENTION_WINDOW_SECONDS",
3049            self.history.retention_window_seconds,
3050        );
3051        self.history.max_page_size =
3052            parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3053        self.history.writer_shards =
3054            parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3055        self.history.writer_queue_capacity = parse_env::<usize>(
3056            "HISTORY_WRITER_QUEUE_CAPACITY",
3057            self.history.writer_queue_capacity,
3058        );
3059        if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3060            self.history.backend = HistoryBackend::from_str(&backend)?;
3061        }
3062        if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3063            self.history.max_messages_per_channel = Some(
3064                max_messages
3065                    .parse::<usize>()
3066                    .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3067            );
3068        }
3069        if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3070            self.history.max_bytes_per_channel = Some(
3071                max_bytes
3072                    .parse::<u64>()
3073                    .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3074            );
3075        }
3076        if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3077            self.history.postgres.table_prefix = table_prefix;
3078        }
3079        self.history.postgres.write_timeout_ms = parse_env::<u64>(
3080            "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3081            self.history.postgres.write_timeout_ms,
3082        );
3083
3084        self.presence_history.enabled =
3085            parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3086        self.presence_history.retention_window_seconds = parse_env::<u64>(
3087            "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3088            self.presence_history.retention_window_seconds,
3089        );
3090        self.presence_history.max_page_size = parse_env::<usize>(
3091            "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3092            self.presence_history.max_page_size,
3093        );
3094        if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3095            self.presence_history.max_events_per_channel =
3096                Some(max_events.parse::<usize>().map_err(|e| {
3097                    format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3098                })?);
3099        }
3100        if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3101            self.presence_history.max_bytes_per_channel = Some(
3102                max_bytes
3103                    .parse::<u64>()
3104                    .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3105            );
3106        }
3107        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3108        self.idempotency.ttl_seconds =
3109            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
3110        self.idempotency.max_key_length = parse_env::<usize>(
3111            "IDEMPOTENCY_MAX_KEY_LENGTH",
3112            self.idempotency.max_key_length,
3113        );
3114
3115        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
3116
3117        self.echo_control.enabled =
3118            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
3119        self.echo_control.default_echo_messages = parse_bool_env(
3120            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
3121            self.echo_control.default_echo_messages,
3122        );
3123
3124        self.event_name_filtering.enabled = parse_bool_env(
3125            "EVENT_NAME_FILTERING_ENABLED",
3126            self.event_name_filtering.enabled,
3127        );
3128        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
3129            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
3130            self.event_name_filtering.max_events_per_filter,
3131        );
3132        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
3133            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
3134            self.event_name_filtering.max_event_name_length,
3135        );
3136
3137        Ok(())
3138    }
3139
3140    pub fn validate(&self) -> Result<(), String> {
3141        if self.unix_socket.enabled {
3142            if self.unix_socket.path.is_empty() {
3143                return Err(
3144                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
3145                );
3146            }
3147
3148            self.validate_unix_socket_security()?;
3149
3150            if self.ssl.enabled {
3151                tracing::warn!(
3152                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
3153                );
3154            }
3155
3156            if self.unix_socket.permission_mode > 0o777 {
3157                return Err(format!(
3158                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
3159                    self.unix_socket.permission_mode
3160                ));
3161            }
3162        }
3163
3164        if let Err(e) = self.cleanup.validate() {
3165            return Err(format!("Invalid cleanup configuration: {}", e));
3166        }
3167
3168        if self.history.enabled {
3169            if self.history.max_page_size == 0 {
3170                return Err("history.max_page_size must be greater than 0".to_string());
3171            }
3172            if self.history.writer_shards == 0 {
3173                return Err("history.writer_shards must be greater than 0".to_string());
3174            }
3175            if self.history.writer_queue_capacity == 0 {
3176                return Err("history.writer_queue_capacity must be greater than 0".to_string());
3177            }
3178            if self.history.retention_window_seconds == 0 {
3179                return Err("history.retention_window_seconds must be greater than 0".to_string());
3180            }
3181            if self.history.postgres.table_prefix.trim().is_empty() {
3182                return Err("history.postgres.table_prefix must not be empty".to_string());
3183            }
3184        }
3185
3186        if self.presence_history.enabled {
3187            if self.presence_history.max_page_size == 0 {
3188                return Err("presence_history.max_page_size must be greater than 0".to_string());
3189            }
3190            if self.presence_history.retention_window_seconds == 0 {
3191                return Err(
3192                    "presence_history.retention_window_seconds must be greater than 0".to_string(),
3193                );
3194            }
3195        }
3196
3197        Ok(())
3198    }
3199
3200    fn validate_unix_socket_security(&self) -> Result<(), String> {
3201        let path = &self.unix_socket.path;
3202
3203        if path.contains("../") || path.contains("..\\") {
3204            return Err(
3205                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
3206            );
3207        }
3208
3209        if self.unix_socket.permission_mode & 0o002 != 0 {
3210            warn!(
3211                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
3212                self.unix_socket.permission_mode
3213            );
3214        }
3215
3216        if self.unix_socket.permission_mode & 0o007 > 0o005 {
3217            warn!(
3218                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
3219                self.unix_socket.permission_mode
3220            );
3221        }
3222
3223        if self.mode == "production" && path.starts_with("/tmp/") {
3224            warn!(
3225                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
3226                path
3227            );
3228        }
3229
3230        if !path.starts_with('/') {
3231            return Err(
3232                "Unix socket path must be absolute (start with /) for security and reliability."
3233                    .to_string(),
3234            );
3235        }
3236
3237        Ok(())
3238    }
3239}
3240
3241#[cfg(test)]
3242mod redis_connection_tests {
3243    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
3244
/// A connection without credentials or sentinels renders as a plain `redis://` URL.
#[test]
fn test_standard_url_basic() {
    let expected = "redis://127.0.0.1:6379/0";

    let connection = RedisConnection {
        host: String::from("127.0.0.1"),
        port: 6379,
        db: 0,
        username: None,
        password: None,
        key_prefix: String::from("sockudo:"),
        sentinels: vec![],
        sentinel_password: None,
        name: String::from("mymaster"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3262
/// A password without a username is emitted as `:password@` in the URL authority.
#[test]
fn test_standard_url_with_password() {
    let expected = "redis://:secret@127.0.0.1:6379/2";

    let connection = RedisConnection {
        host: String::from("127.0.0.1"),
        port: 6379,
        db: 2,
        username: None,
        password: Some(String::from("secret")),
        key_prefix: String::from("sockudo:"),
        sentinels: vec![],
        sentinel_password: None,
        name: String::from("mymaster"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3280
/// Username and password together are emitted as `user:password@` before the host.
#[test]
fn test_standard_url_with_username_and_password() {
    let expected = "redis://admin:pass123@redis.example.com:6380/1";

    let connection = RedisConnection {
        host: String::from("redis.example.com"),
        port: 6380,
        db: 1,
        username: Some(String::from("admin")),
        password: Some(String::from("pass123")),
        key_prefix: String::from("sockudo:"),
        sentinels: vec![],
        sentinel_password: None,
        name: String::from("mymaster"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3301
/// Reserved characters in the password (`@`, `#`) must be percent-encoded in the URL.
#[test]
fn test_standard_url_with_special_chars_in_password() {
    let expected = "redis://:pass%40word%23123@127.0.0.1:6379/0";

    let connection = RedisConnection {
        host: String::from("127.0.0.1"),
        port: 6379,
        db: 0,
        username: None,
        password: Some(String::from("pass@word#123")),
        key_prefix: String::from("sockudo:"),
        sentinels: vec![],
        sentinel_password: None,
        name: String::from("mymaster"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3319
/// The default connection must not report sentinel mode.
#[test]
fn test_is_sentinel_configured_false() {
    let default_connection: RedisConnection = Default::default();
    let sentinel_mode = default_connection.is_sentinel_configured();
    assert!(!sentinel_mode);
}
3325
/// Listing at least one sentinel switches the connection into sentinel mode.
#[test]
fn test_is_sentinel_configured_true() {
    let only_sentinel = RedisSentinel {
        host: String::from("sentinel1"),
        port: 26379,
    };
    let connection = RedisConnection {
        sentinels: vec![only_sentinel],
        ..Default::default()
    };

    let sentinel_mode = connection.is_sentinel_configured();
    assert!(sentinel_mode);
}
3337
/// With sentinels configured and no credentials, the URL uses the
/// `redis+sentinel://` scheme, a comma-separated sentinel list, then
/// `/<master name>/<db>`.
#[test]
fn test_sentinel_url_basic() {
    let expected = "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0";

    // Both sentinels listen on the same port; only the host differs.
    let sentinels: Vec<RedisSentinel> = ["sentinel1", "sentinel2"]
        .iter()
        .map(|host| RedisSentinel {
            host: host.to_string(),
            port: 26379,
        })
        .collect();

    let connection = RedisConnection {
        host: String::from("127.0.0.1"),
        port: 6379,
        db: 0,
        username: None,
        password: None,
        key_prefix: String::from("sockudo:"),
        sentinels,
        sentinel_password: None,
        name: String::from("mymaster"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3367
/// The sentinel password is placed in the URL authority (`:password@`),
/// ahead of the sentinel host list.
#[test]
fn test_sentinel_url_with_sentinel_password() {
    let expected = "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0";

    let sentinel = RedisSentinel {
        host: String::from("sentinel1"),
        port: 26379,
    };
    let connection = RedisConnection {
        host: String::from("127.0.0.1"),
        port: 6379,
        db: 0,
        username: None,
        password: None,
        key_prefix: String::from("sockudo:"),
        sentinels: vec![sentinel],
        sentinel_password: Some(String::from("sentinelpass")),
        name: String::from("mymaster"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3391
/// In sentinel mode the master's password travels as a `password` query
/// parameter rather than in the URL authority.
#[test]
fn test_sentinel_url_with_master_password() {
    let expected = "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass";

    let sentinel = RedisSentinel {
        host: String::from("sentinel1"),
        port: 26379,
    };
    let connection = RedisConnection {
        host: String::from("127.0.0.1"),
        port: 6379,
        db: 1,
        username: None,
        password: Some(String::from("masterpass")),
        key_prefix: String::from("sockudo:"),
        sentinels: vec![sentinel],
        sentinel_password: None,
        name: String::from("mymaster"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3415
/// Sentinel password, master username and master password combined: the
/// sentinel password goes in the authority, the master credentials go in
/// the query string (`password` first, then `username`).
#[test]
fn test_sentinel_url_with_all_auth() {
    let expected = "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser";

    let sentinels: Vec<RedisSentinel> = [("sentinel1", 26379), ("sentinel2", 26380)]
        .iter()
        .map(|(host, port)| RedisSentinel {
            host: host.to_string(),
            port: *port,
        })
        .collect();

    let connection = RedisConnection {
        host: String::from("127.0.0.1"),
        port: 6379,
        db: 2,
        username: Some(String::from("redisuser")),
        password: Some(String::from("redispass")),
        key_prefix: String::from("sockudo:"),
        sentinels,
        sentinel_password: Some(String::from("sentinelauth")),
        name: String::from("production-master"),
        cluster: Default::default(),
        cluster_nodes: vec![],
    };

    assert_eq!(connection.to_url(), expected);
}
3445
/// A sentinel formats itself as `host:port`.
#[test]
fn test_sentinel_to_host_port() {
    let expected = "sentinel.example.com:26379";

    let host_port = RedisSentinel {
        host: String::from("sentinel.example.com"),
        port: 26379,
    }
    .to_host_port();

    assert_eq!(host_port, expected);
}
3454
/// Cluster nodes share the cluster-level password. A bare host picks up the
/// TLS scheme from `use_tls`, while a host that already carries an explicit
/// `redis://` / `rediss://` scheme keeps it.
#[test]
fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
    let expected = vec![
        "rediss://:cluster-secret@node1.secure-cluster.com:7000",
        "redis://:cluster-secret@node2.secure-cluster.com:7001",
        "rediss://:cluster-secret@node3.secure-cluster.com:7002",
    ];

    // One bare host, one with an explicit plain scheme (and port), one with an explicit TLS scheme.
    let bare_node = ClusterNode {
        host: String::from("node1.secure-cluster.com"),
        port: 7000,
    };
    let plain_scheme_node = ClusterNode {
        host: String::from("redis://node2.secure-cluster.com:7001"),
        port: 7001,
    };
    let tls_scheme_node = ClusterNode {
        host: String::from("rediss://node3.secure-cluster.com"),
        port: 7002,
    };

    let cluster = RedisClusterConnection {
        nodes: vec![bare_node, plain_scheme_node, tls_scheme_node],
        username: None,
        password: Some(String::from("cluster-secret")),
        use_tls: true,
    };
    let connection = RedisConnection {
        cluster,
        ..Default::default()
    };

    assert_eq!(connection.cluster_node_urls(), expected);
}
3489
3490    #[test]
3491    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
3492        let conn = RedisConnection {
3493            password: Some("fallback-secret".to_string()),
3494            cluster_nodes: vec![ClusterNode {
3495                host: "legacy-node.example.com".to_string(),
3496                port: 7000,
3497            }],
3498            ..Default::default()
3499        };
3500
3501        assert_eq!(
3502            conn.cluster_node_urls(),
3503            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
3504        );
3505    }
3506
3507    #[test]
3508    fn test_normalize_cluster_seed_urls() {
3509        let conn = RedisConnection {
3510            cluster: RedisClusterConnection {
3511                nodes: Vec::new(),
3512                username: Some("svc-user".to_string()),
3513                password: Some("svc-pass".to_string()),
3514                use_tls: true,
3515            },
3516            ..Default::default()
3517        };
3518
3519        let seeds = vec![
3520            "node1.example.com:7000".to_string(),
3521            "redis://node2.example.com:7001".to_string(),
3522            "rediss://node3.example.com".to_string(),
3523        ];
3524
3525        assert_eq!(
3526            conn.normalize_cluster_seed_urls(&seeds),
3527            vec![
3528                "rediss://svc-user:svc-pass@node1.example.com:7000",
3529                "redis://svc-user:svc-pass@node2.example.com:7001",
3530                "rediss://svc-user:svc-pass@node3.example.com:6379",
3531            ]
3532        );
3533    }
3534}
3535
/// Tests for `ClusterNode` URL rendering (`to_url`, `to_url_with_options`)
/// and seed parsing (`from_seed`).
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Plain hostname plus port field.
    #[test]
    fn test_to_url_basic_host() {
        let endpoint = ClusterNode {
            host: String::from("localhost"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "redis://localhost:6379");
    }

    /// IPv4 literal plus port field.
    #[test]
    fn test_to_url_ip_address() {
        let endpoint = ClusterNode {
            host: String::from("127.0.0.1"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "redis://127.0.0.1:6379");
    }

    /// Host already prefixed with `redis://`, port taken from the port field.
    #[test]
    fn test_to_url_with_redis_protocol() {
        let endpoint = ClusterNode {
            host: String::from("redis://example.com"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "redis://example.com:6379");
    }

    /// Host already prefixed with `rediss://`, port taken from the port field.
    #[test]
    fn test_to_url_with_rediss_protocol() {
        let endpoint = ClusterNode {
            host: String::from("rediss://secure.example.com"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "rediss://secure.example.com:6379");
    }

    /// A port embedded in a `rediss://` host wins over the port field.
    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        let endpoint = ClusterNode {
            host: String::from("rediss://secure.example.com:7000"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "rediss://secure.example.com:7000");
    }

    /// A port embedded in a `redis://` host wins over the port field.
    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        let endpoint = ClusterNode {
            host: String::from("redis://example.com:7001"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "redis://example.com:7001");
    }

    /// Surrounding whitespace in the host field does not appear in the URL.
    #[test]
    fn test_to_url_with_trailing_whitespace() {
        let endpoint = ClusterNode {
            host: String::from("  rediss://secure.example.com  "),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "rediss://secure.example.com:6379");
    }

    /// Non-default port field on a plain hostname.
    #[test]
    fn test_to_url_custom_port() {
        let endpoint = ClusterNode {
            host: String::from("redis-cluster.example.com"),
            port: 7000,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "redis://redis-cluster.example.com:7000");
    }

    /// A `host:port` string in the host field wins over the port field.
    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let endpoint = ClusterNode {
            host: String::from("redis-cluster.example.com:7010"),
            port: 7000,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "redis://redis-cluster.example.com:7010");
    }

    /// TLS flag plus username/password on a plain hostname.
    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let endpoint = ClusterNode {
            host: String::from("node.example.com"),
            port: 7000,
        };
        let rendered = endpoint.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(rendered, "rediss://svc-user:secret@node.example.com:7000");
    }

    /// Credentials already present in the host URL are left untouched even
    /// when different ones are passed as options.
    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        let endpoint = ClusterNode {
            host: String::from("rediss://:node-secret@node.example.com:7000"),
            port: 7000,
        };
        let rendered =
            endpoint.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(rendered, "rediss://:node-secret@node.example.com:7000");
    }

    /// A plain `host:port` seed is split into its host and port parts.
    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    /// A seed carrying a scheme keeps the full URL in `host`; the port is
    /// still extracted into `port`.
    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    /// ElastiCache-style `rediss://` hostname without an embedded port.
    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        let endpoint = ClusterNode {
            host: String::from("rediss://my-cluster.use1.cache.amazonaws.com"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(
            rendered,
            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
        );
    }

    /// Bracketed IPv6 loopback without a port: the port field is appended.
    #[test]
    fn test_to_url_with_ipv6_no_port() {
        let endpoint = ClusterNode {
            host: String::from("rediss://[::1]"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "rediss://[::1]:6379");
    }

    /// Bracketed IPv6 loopback with a port: the embedded port is kept.
    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        let endpoint = ClusterNode {
            host: String::from("rediss://[::1]:7000"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "rediss://[::1]:7000");
    }

    /// Longer bracketed IPv6 address without a port.
    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        let endpoint = ClusterNode {
            host: String::from("rediss://[2001:db8::1]"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "rediss://[2001:db8::1]:6379");
    }

    /// Longer bracketed IPv6 address with an embedded port.
    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        let endpoint = ClusterNode {
            host: String::from("rediss://[2001:db8::1]:7000"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "rediss://[2001:db8::1]:7000");
    }

    /// Non-TLS scheme combined with a bracketed IPv6 address.
    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        let endpoint = ClusterNode {
            host: String::from("redis://[::1]"),
            port: 6379,
        };
        let rendered = endpoint.to_url();
        assert_eq!(rendered, "redis://[::1]:6379");
    }
}
3717
/// Tests for deserializing `CorsConfig` from JSON, focused on which `origin`
/// entries are accepted and which are rejected.
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Parses a JSON document into a `CorsConfig`.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str::<CorsConfig>(json)
    }

    /// Returns `true` when the JSON document deserializes successfully.
    fn accepts(json: &str) -> bool {
        cors_from_json(json).is_ok()
    }

    /// Two exact origins deserialize into two entries.
    #[test]
    fn test_deserialize_valid_exact_origins() {
        let payload = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let parsed = cors_from_json(payload).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    /// Wildcard patterns, with and without a scheme, deserialize into two
    /// entries.
    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let payload = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let parsed = cors_from_json(payload).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    /// The markers `*`, `Any` and `any` are accepted, alone or next to an
    /// exact origin.
    #[test]
    fn test_deserialize_allows_special_markers() {
        assert!(accepts(r#"{"origin": ["*"]}"#));
        assert!(accepts(r#"{"origin": ["Any"]}"#));
        assert!(accepts(r#"{"origin": ["any"]}"#));
        assert!(accepts(r#"{"origin": ["*", "https://example.com"]}"#));
    }

    /// Malformed patterns make deserialization fail.
    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        assert!(!accepts(r#"{"origin": ["*.*bad"]}"#));
        assert!(!accepts(r#"{"origin": ["*."]}"#));
        assert!(!accepts(r#"{"origin": [""]}"#));
        assert!(!accepts(r#"{"origin": ["https://"]}"#));
    }

    /// One invalid entry fails the whole list, even next to a valid origin.
    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        assert!(!accepts(r#"{"origin": ["https://good.com", "*.*bad"]}"#));
    }
}
3761}