// sockudo_core/options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82    Pulsar,
83    RabbitMq,
84    #[serde(rename = "google-pubsub")]
85    GooglePubSub,
86    Kafka,
87    Iggy,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(default)]
92pub struct DynamoDbSettings {
93    pub region: String,
94    pub table_name: String,
95    pub endpoint_url: Option<String>,
96    pub aws_access_key_id: Option<String>,
97    pub aws_secret_access_key: Option<String>,
98    pub aws_profile_name: Option<String>,
99}
100
101impl Default for DynamoDbSettings {
102    fn default() -> Self {
103        Self {
104            region: "us-east-1".to_string(),
105            table_name: "sockudo-applications".to_string(),
106            endpoint_url: None,
107            aws_access_key_id: None,
108            aws_secret_access_key: None,
109            aws_profile_name: None,
110        }
111    }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(default)]
116pub struct ScyllaDbSettings {
117    pub nodes: Vec<String>,
118    pub keyspace: String,
119    pub table_name: String,
120    pub username: Option<String>,
121    pub password: Option<String>,
122    pub replication_class: String,
123    pub replication_factor: u32,
124}
125
126impl Default for ScyllaDbSettings {
127    fn default() -> Self {
128        Self {
129            nodes: vec!["127.0.0.1:9042".to_string()],
130            keyspace: "sockudo".to_string(),
131            table_name: "applications".to_string(),
132            username: None,
133            password: None,
134            replication_class: "SimpleStrategy".to_string(),
135            replication_factor: 3,
136        }
137    }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(default)]
142pub struct SurrealDbSettings {
143    pub url: String,
144    pub namespace: String,
145    pub database: String,
146    pub username: String,
147    pub password: String,
148    pub table_name: String,
149    pub cache_ttl: u64,
150    pub cache_max_capacity: u64,
151}
152
153impl Default for SurrealDbSettings {
154    fn default() -> Self {
155        Self {
156            url: "ws://127.0.0.1:8000".to_string(),
157            namespace: "sockudo".to_string(),
158            database: "sockudo".to_string(),
159            username: "root".to_string(),
160            password: "root".to_string(),
161            table_name: "applications".to_string(),
162            cache_ttl: 300,
163            cache_max_capacity: 100,
164        }
165    }
166}
167
168impl FromStr for AdapterDriver {
169    type Err = String;
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        match s.to_lowercase().as_str() {
172            "local" => Ok(AdapterDriver::Local),
173            "redis" => Ok(AdapterDriver::Redis),
174            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
175            "nats" => Ok(AdapterDriver::Nats),
176            "pulsar" => Ok(AdapterDriver::Pulsar),
177            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
178            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
179            "kafka" => Ok(AdapterDriver::Kafka),
180            "iggy" | "apache-iggy" | "apache_iggy" => Ok(AdapterDriver::Iggy),
181            _ => Err(format!("Unknown adapter driver: {s}")),
182        }
183    }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
187#[serde(rename_all = "lowercase")]
188pub enum AppManagerDriver {
189    #[default]
190    Memory,
191    Mysql,
192    Dynamodb,
193    PgSql,
194    SurrealDb,
195    ScyllaDb,
196}
197impl FromStr for AppManagerDriver {
198    type Err = String;
199    fn from_str(s: &str) -> Result<Self, Self::Err> {
200        match s.to_lowercase().as_str() {
201            "memory" => Ok(AppManagerDriver::Memory),
202            "mysql" => Ok(AppManagerDriver::Mysql),
203            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
204            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
205            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
206            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
207            _ => Err(format!("Unknown app manager driver: {s}")),
208        }
209    }
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
213#[serde(rename_all = "lowercase")]
214pub enum CacheDriver {
215    #[default]
216    Memory,
217    Redis,
218    #[serde(rename = "redis-cluster")]
219    RedisCluster,
220    None,
221}
222
223impl FromStr for CacheDriver {
224    type Err = String;
225    fn from_str(s: &str) -> Result<Self, Self::Err> {
226        match s.to_lowercase().as_str() {
227            "memory" => Ok(CacheDriver::Memory),
228            "redis" => Ok(CacheDriver::Redis),
229            "redis-cluster" => Ok(CacheDriver::RedisCluster),
230            "none" => Ok(CacheDriver::None),
231            _ => Err(format!("Unknown cache driver: {s}")),
232        }
233    }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237#[serde(rename_all = "lowercase")]
238pub enum QueueDriver {
239    Memory,
240    #[default]
241    Redis,
242    #[serde(rename = "redis-cluster")]
243    RedisCluster,
244    Nats,
245    Pulsar,
246    RabbitMq,
247    #[serde(rename = "google-pubsub")]
248    GooglePubSub,
249    Kafka,
250    Iggy,
251    Sqs,
252    Sns,
253    None,
254}
255
256impl FromStr for QueueDriver {
257    type Err = String;
258    fn from_str(s: &str) -> Result<Self, Self::Err> {
259        match s.to_lowercase().as_str() {
260            "memory" => Ok(QueueDriver::Memory),
261            "redis" => Ok(QueueDriver::Redis),
262            "redis-cluster" => Ok(QueueDriver::RedisCluster),
263            "nats" => Ok(QueueDriver::Nats),
264            "pulsar" => Ok(QueueDriver::Pulsar),
265            "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
266            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
267            "kafka" => Ok(QueueDriver::Kafka),
268            "iggy" | "apache-iggy" | "apache_iggy" => Ok(QueueDriver::Iggy),
269            "sqs" => Ok(QueueDriver::Sqs),
270            "sns" => Ok(QueueDriver::Sns),
271            "none" => Ok(QueueDriver::None),
272            _ => Err(format!("Unknown queue driver: {s}")),
273        }
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
278#[serde(rename_all = "snake_case")]
279pub enum DeltaCoordinationBackend {
280    #[default]
281    Auto,
282    None,
283    Redis,
284    RedisCluster,
285    Nats,
286}
287
288impl FromStr for DeltaCoordinationBackend {
289    type Err = String;
290
291    fn from_str(s: &str) -> Result<Self, Self::Err> {
292        match s.trim().to_ascii_lowercase().as_str() {
293            "auto" => Ok(Self::Auto),
294            "none" => Ok(Self::None),
295            "redis" => Ok(Self::Redis),
296            "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
297            "nats" => Ok(Self::Nats),
298            _ => Err(format!("Unknown delta coordination backend: {s}")),
299        }
300    }
301}
302
303impl AsRef<str> for QueueDriver {
304    fn as_ref(&self) -> &str {
305        match self {
306            QueueDriver::Memory => "memory",
307            QueueDriver::Redis => "redis",
308            QueueDriver::RedisCluster => "redis-cluster",
309            QueueDriver::Nats => "nats",
310            QueueDriver::Pulsar => "pulsar",
311            QueueDriver::RabbitMq => "rabbitmq",
312            QueueDriver::GooglePubSub => "google-pubsub",
313            QueueDriver::Kafka => "kafka",
314            QueueDriver::Iggy => "iggy",
315            QueueDriver::Sqs => "sqs",
316            QueueDriver::Sns => "sns",
317            QueueDriver::None => "none",
318        }
319    }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
323#[serde(default)]
324pub struct RedisClusterQueueConfig {
325    pub concurrency: u32,
326    pub prefix: Option<String>,
327    pub nodes: Vec<String>,
328    pub request_timeout_ms: u64,
329}
330
331impl Default for RedisClusterQueueConfig {
332    fn default() -> Self {
333        Self {
334            concurrency: 5,
335            prefix: Some("sockudo_queue:".to_string()),
336            nodes: vec!["redis://127.0.0.1:6379".to_string()],
337            request_timeout_ms: 5000,
338        }
339    }
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
343#[serde(rename_all = "lowercase")]
344pub enum MetricsDriver {
345    #[default]
346    Prometheus,
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
350#[serde(rename_all = "lowercase")]
351pub enum LogOutputFormat {
352    #[default]
353    Human,
354    Json,
355}
356
357impl FromStr for LogOutputFormat {
358    type Err = String;
359    fn from_str(s: &str) -> Result<Self, Self::Err> {
360        match s.to_lowercase().as_str() {
361            "human" => Ok(LogOutputFormat::Human),
362            "json" => Ok(LogOutputFormat::Json),
363            _ => Err(format!("Unknown log output format: {s}")),
364        }
365    }
366}
367
368impl FromStr for MetricsDriver {
369    type Err = String;
370    fn from_str(s: &str) -> Result<Self, Self::Err> {
371        match s.to_lowercase().as_str() {
372            "prometheus" => Ok(MetricsDriver::Prometheus),
373            _ => Err(format!("Unknown metrics driver: {s}")),
374        }
375    }
376}
377
378impl AsRef<str> for MetricsDriver {
379    fn as_ref(&self) -> &str {
380        match self {
381            MetricsDriver::Prometheus => "prometheus",
382        }
383    }
384}
385
386// --- Main Configuration Struct ---
387#[derive(Debug, Clone, Serialize, Deserialize)]
388#[serde(default)]
389pub struct ServerOptions {
390    pub adapter: AdapterConfig,
391    pub app_manager: AppManagerConfig,
392    pub cache: CacheConfig,
393    pub channel_limits: ChannelLimits,
394    pub cors: CorsConfig,
395    pub database: DatabaseConfig,
396    pub database_pooling: DatabasePooling,
397    pub debug: bool,
398    pub event_limits: EventLimits,
399    pub host: String,
400    pub http_api: HttpApiConfig,
401    pub instance: InstanceConfig,
402    pub logging: Option<LoggingConfig>,
403    pub metrics: MetricsConfig,
404    pub mode: String,
405    pub port: u16,
406    pub path_prefix: String,
407    pub presence: PresenceConfig,
408    pub queue: QueueConfig,
409    pub rate_limiter: RateLimiterConfig,
410    pub shutdown_grace_period: u64,
411    pub ssl: SslConfig,
412    pub user_authentication_timeout: u64,
413    pub webhooks: WebhooksConfig,
414    pub websocket_max_payload_kb: u32,
415    pub cleanup: CleanupConfig,
416    pub activity_timeout: u64,
417    pub cluster_health: ClusterHealthConfig,
418    pub unix_socket: UnixSocketConfig,
419    pub delta_compression: DeltaCompressionOptionsConfig,
420    pub tag_filtering: TagFilteringConfig,
421    pub websocket: WebSocketConfig,
422    pub connection_recovery: ConnectionRecoveryConfig,
423    pub history: HistoryConfig,
424    pub presence_history: PresenceHistoryConfig,
425    pub idempotency: IdempotencyConfig,
426    pub ephemeral: EphemeralConfig,
427    pub echo_control: EchoControlConfig,
428    pub event_name_filtering: EventNameFilteringConfig,
429    pub versioned_messages: VersionedMessagesConfig,
430    pub annotations: AnnotationsConfig,
431}
432
433// --- Configuration Sub-Structs ---
434
435#[derive(Debug, Clone, Serialize, Deserialize)]
436#[serde(default)]
437pub struct SqsQueueConfig {
438    pub region: String,
439    pub queue_url_prefix: Option<String>,
440    pub visibility_timeout: i32,
441    pub endpoint_url: Option<String>,
442    pub max_messages: i32,
443    pub wait_time_seconds: i32,
444    pub concurrency: u32,
445    pub fifo: bool,
446    pub message_group_id: Option<String>,
447}
448
449#[derive(Debug, Clone, Serialize, Deserialize)]
450#[serde(default)]
451pub struct SnsQueueConfig {
452    pub region: String,
453    pub topic_arn: String,
454    pub endpoint_url: Option<String>,
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize)]
458#[serde(default)]
459pub struct AdapterConfig {
460    pub driver: AdapterDriver,
461    pub redis: RedisAdapterConfig,
462    pub cluster: RedisClusterAdapterConfig,
463    pub nats: NatsAdapterConfig,
464    pub pulsar: PulsarAdapterConfig,
465    pub rabbitmq: RabbitMqAdapterConfig,
466    pub google_pubsub: GooglePubSubAdapterConfig,
467    pub kafka: KafkaAdapterConfig,
468    pub iggy: IggyConfig,
469    #[serde(default = "default_buffer_multiplier_per_cpu")]
470    pub buffer_multiplier_per_cpu: usize,
471    pub cluster_health: ClusterHealthConfig,
472    #[serde(default = "default_enable_socket_counting")]
473    pub enable_socket_counting: bool,
474    #[serde(default = "default_fallback_to_local")]
475    pub fallback_to_local: bool,
476}
477
/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    true
}
481
/// Serde default for `AdapterConfig::fallback_to_local`: enabled.
fn default_fallback_to_local() -> bool {
    true
}
485
/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    64
}
489
490impl Default for AdapterConfig {
491    fn default() -> Self {
492        Self {
493            driver: AdapterDriver::default(),
494            redis: RedisAdapterConfig::default(),
495            cluster: RedisClusterAdapterConfig::default(),
496            nats: NatsAdapterConfig::default(),
497            pulsar: PulsarAdapterConfig::default(),
498            rabbitmq: RabbitMqAdapterConfig::default(),
499            google_pubsub: GooglePubSubAdapterConfig::default(),
500            kafka: KafkaAdapterConfig::default(),
501            iggy: IggyConfig::default(),
502            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
503            cluster_health: ClusterHealthConfig::default(),
504            enable_socket_counting: default_enable_socket_counting(),
505            fallback_to_local: default_fallback_to_local(),
506        }
507    }
508}
509
510#[derive(Debug, Clone, Serialize, Deserialize)]
511#[serde(default)]
512pub struct RedisAdapterConfig {
513    pub requests_timeout: u64,
514    pub prefix: String,
515    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
516    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
517    pub cluster_mode: bool,
518}
519
520#[derive(Debug, Clone, Serialize, Deserialize)]
521#[serde(default)]
522pub struct RedisClusterAdapterConfig {
523    pub nodes: Vec<String>,
524    pub prefix: String,
525    pub request_timeout_ms: u64,
526    pub use_connection_manager: bool,
527    #[serde(default)]
528    pub use_sharded_pubsub: bool,
529}
530
531#[derive(Debug, Clone, Serialize, Deserialize)]
532#[serde(default)]
533pub struct NatsAdapterConfig {
534    pub servers: Vec<String>,
535    pub prefix: String,
536    pub request_timeout_ms: u64,
537    pub username: Option<String>,
538    pub password: Option<String>,
539    pub token: Option<String>,
540    pub connection_timeout_ms: u64,
541    pub nodes_number: Option<u32>,
542    pub discovery_max_wait_ms: u64,
543    pub discovery_idle_wait_ms: u64,
544}
545
546#[derive(Debug, Clone, Serialize, Deserialize)]
547#[serde(default)]
548pub struct PulsarAdapterConfig {
549    pub url: String,
550    pub prefix: String,
551    pub request_timeout_ms: u64,
552    pub token: Option<String>,
553    pub nodes_number: Option<u32>,
554}
555
556#[derive(Debug, Clone, Serialize, Deserialize)]
557#[serde(default)]
558pub struct RabbitMqAdapterConfig {
559    pub url: String,
560    pub prefix: String,
561    pub request_timeout_ms: u64,
562    pub connection_timeout_ms: u64,
563    pub nodes_number: Option<u32>,
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
567#[serde(default)]
568pub struct GooglePubSubAdapterConfig {
569    pub project_id: String,
570    pub prefix: String,
571    pub request_timeout_ms: u64,
572    pub emulator_host: Option<String>,
573    pub nodes_number: Option<u32>,
574}
575
576#[derive(Debug, Clone, Serialize, Deserialize)]
577#[serde(default)]
578pub struct KafkaAdapterConfig {
579    pub brokers: Vec<String>,
580    pub prefix: String,
581    pub request_timeout_ms: u64,
582    pub security_protocol: Option<String>,
583    pub sasl_mechanism: Option<String>,
584    pub sasl_username: Option<String>,
585    pub sasl_password: Option<String>,
586    pub nodes_number: Option<u32>,
587}
588
589#[derive(Debug, Clone, Serialize, Deserialize)]
590#[serde(default)]
591pub struct IggyConfig {
592    pub connection_string: String,
593    pub username: Option<String>,
594    pub password: Option<String>,
595    pub consumer_name: Option<String>,
596    pub stream: String,
597    pub topic_prefix: String,
598    pub queue_topic_prefix: String,
599    pub consumer_group_prefix: String,
600    pub request_timeout_ms: u64,
601    pub poll_interval_ms: u64,
602    pub poll_batch_size: u32,
603    pub partitions_count: u32,
604    pub partition_id: u32,
605    pub auto_create: bool,
606    pub start_from_latest: bool,
607    pub nodes_number: Option<u32>,
608}
609
610#[derive(Debug, Clone, Serialize, Deserialize, Default)]
611#[serde(default)]
612pub struct AppManagerConfig {
613    pub driver: AppManagerDriver,
614    pub array: ArrayConfig,
615    pub cache: CacheSettings,
616    pub scylladb: ScyllaDbSettings,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize, Default)]
620#[serde(default)]
621pub struct ArrayConfig {
622    pub apps: Vec<App>,
623}
624
625#[derive(Debug, Clone, Serialize, Deserialize)]
626#[serde(default)]
627pub struct CacheSettings {
628    pub enabled: bool,
629    pub ttl: u64,
630}
631
632#[derive(Debug, Clone, Serialize, Deserialize)]
633#[serde(default)]
634pub struct MemoryCacheOptions {
635    pub ttl: u64,
636    pub cleanup_interval: u64,
637    pub max_capacity: u64,
638}
639
640#[derive(Debug, Clone, Serialize, Deserialize)]
641#[serde(default)]
642pub struct CacheConfig {
643    pub driver: CacheDriver,
644    pub redis: RedisConfig,
645    pub memory: MemoryCacheOptions,
646}
647
648#[derive(Debug, Clone, Serialize, Deserialize, Default)]
649#[serde(default)]
650pub struct RedisConfig {
651    pub prefix: Option<String>,
652    pub url_override: Option<String>,
653    pub cluster_mode: bool,
654}
655
656#[derive(Debug, Clone, Serialize, Deserialize)]
657#[serde(default)]
658pub struct ChannelLimits {
659    pub max_name_length: u32,
660    pub cache_ttl: u64,
661}
662
663#[derive(Debug, Clone, Serialize, Deserialize)]
664#[serde(default)]
665pub struct CorsConfig {
666    pub credentials: bool,
667    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
668    pub origin: Vec<String>,
669    pub methods: Vec<String>,
670    pub allowed_headers: Vec<String>,
671}
672
673fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
674where
675    D: serde::Deserializer<'de>,
676{
677    use serde::de::Error;
678    let origins = Vec::<String>::deserialize(deserializer)?;
679
680    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
681        return Err(D::Error::custom(format!(
682            "CORS origin pattern validation failed: {}",
683            e
684        )));
685    }
686
687    Ok(origins)
688}
689
690#[derive(Debug, Clone, Serialize, Deserialize, Default)]
691#[serde(default)]
692pub struct DatabaseConfig {
693    pub mysql: DatabaseConnection,
694    pub postgres: DatabaseConnection,
695    pub redis: RedisConnection,
696    pub dynamodb: DynamoDbSettings,
697    pub surrealdb: SurrealDbSettings,
698    pub scylladb: ScyllaDbSettings,
699}
700
701#[derive(Debug, Clone, Serialize, Deserialize)]
702#[serde(default)]
703pub struct DatabaseConnection {
704    pub host: String,
705    pub port: u16,
706    pub username: String,
707    pub password: String,
708    pub database: String,
709    pub table_name: String,
710    pub connection_pool_size: u32,
711    pub pool_min: Option<u32>,
712    pub pool_max: Option<u32>,
713    pub cache_ttl: u64,
714    pub cache_cleanup_interval: u64,
715    pub cache_max_capacity: u64,
716}
717
718#[derive(Debug, Clone, Serialize, Deserialize)]
719#[serde(default)]
720pub struct RedisConnection {
721    pub host: String,
722    pub port: u16,
723    pub db: u32,
724    pub username: Option<String>,
725    pub password: Option<String>,
726    pub key_prefix: String,
727    pub sentinels: Vec<RedisSentinel>,
728    pub sentinel_password: Option<String>,
729    pub name: String,
730    pub cluster: RedisClusterConnection,
731    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
732    pub cluster_nodes: Vec<ClusterNode>,
733}
734
735#[derive(Debug, Clone, Serialize, Deserialize)]
736#[serde(default)]
737pub struct RedisSentinel {
738    pub host: String,
739    pub port: u16,
740}
741
742#[derive(Debug, Clone, Serialize, Deserialize, Default)]
743#[serde(default)]
744pub struct RedisClusterConnection {
745    pub nodes: Vec<ClusterNode>,
746    pub username: Option<String>,
747    pub password: Option<String>,
748    #[serde(alias = "useTLS")]
749    pub use_tls: bool,
750}
751
752impl RedisConnection {
753    /// Returns true if Redis Sentinel is configured.
754    pub fn is_sentinel_configured(&self) -> bool {
755        !self.sentinels.is_empty()
756    }
757
758    /// Builds a Redis connection URL based on the configuration.
759    pub fn to_url(&self) -> String {
760        if self.is_sentinel_configured() {
761            self.build_sentinel_url()
762        } else {
763            self.build_standard_url()
764        }
765    }
766
767    fn build_standard_url(&self) -> String {
768        // Extract scheme from host if present, otherwise default to redis://
769        let (scheme, host) = if self.host.starts_with("rediss://") {
770            ("rediss://", self.host.trim_start_matches("rediss://"))
771        } else if self.host.starts_with("redis://") {
772            ("redis://", self.host.trim_start_matches("redis://"))
773        } else {
774            ("redis://", self.host.as_str())
775        };
776
777        let mut url = String::from(scheme);
778
779        if let Some(ref username) = self.username {
780            url.push_str(username);
781            if let Some(ref password) = self.password {
782                url.push(':');
783                url.push_str(&urlencoding::encode(password));
784            }
785            url.push('@');
786        } else if let Some(ref password) = self.password {
787            url.push(':');
788            url.push_str(&urlencoding::encode(password));
789            url.push('@');
790        }
791
792        url.push_str(host);
793        url.push(':');
794        url.push_str(&self.port.to_string());
795        url.push('/');
796        url.push_str(&self.db.to_string());
797
798        url
799    }
800
801    fn build_sentinel_url(&self) -> String {
802        let mut url = String::from("redis+sentinel://");
803
804        if let Some(ref sentinel_password) = self.sentinel_password {
805            url.push(':');
806            url.push_str(&urlencoding::encode(sentinel_password));
807            url.push('@');
808        }
809
810        let sentinel_hosts: Vec<String> = self
811            .sentinels
812            .iter()
813            .map(|s| format!("{}:{}", s.host, s.port))
814            .collect();
815        url.push_str(&sentinel_hosts.join(","));
816
817        url.push('/');
818        url.push_str(&self.name);
819        url.push('/');
820        url.push_str(&self.db.to_string());
821
822        let mut params = Vec::new();
823        if let Some(ref password) = self.password {
824            params.push(format!("password={}", urlencoding::encode(password)));
825        }
826        if let Some(ref username) = self.username {
827            params.push(format!("username={}", urlencoding::encode(username)));
828        }
829
830        if !params.is_empty() {
831            url.push('?');
832            url.push_str(&params.join("&"));
833        }
834
835        url
836    }
837
838    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
839    /// or legacy (`cluster_nodes`) field.
840    pub fn has_cluster_nodes(&self) -> bool {
841        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
842    }
843
844    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
845    /// Falls back to legacy `cluster_nodes` for backward compatibility.
846    pub fn cluster_node_urls(&self) -> Vec<String> {
847        if !self.cluster.nodes.is_empty() {
848            return self.build_cluster_urls(&self.cluster.nodes);
849        }
850        self.build_cluster_urls(&self.cluster_nodes)
851    }
852
853    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
854    /// shared cluster auth/TLS options.
855    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
856        self.build_cluster_urls(
857            &seeds
858                .iter()
859                .filter_map(|seed| ClusterNode::from_seed(seed))
860                .collect::<Vec<ClusterNode>>(),
861        )
862    }
863
864    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
865        let username = self
866            .cluster
867            .username
868            .as_deref()
869            .or(self.username.as_deref());
870        let password = self
871            .cluster
872            .password
873            .as_deref()
874            .or(self.password.as_deref());
875        let use_tls = self.cluster.use_tls;
876
877        nodes
878            .iter()
879            .map(|node| node.to_url_with_options(use_tls, username, password))
880            .collect()
881    }
882}
883
884impl RedisSentinel {
885    pub fn to_host_port(&self) -> String {
886        format!("{}:{}", self.host, self.port)
887    }
888}
889
890#[derive(Debug, Clone, Serialize, Deserialize)]
891#[serde(default)]
892pub struct ClusterNode {
893    pub host: String,
894    pub port: u16,
895}
896
897impl ClusterNode {
898    pub fn to_url(&self) -> String {
899        self.to_url_with_options(false, None, None)
900    }
901
902    pub fn to_url_with_options(
903        &self,
904        use_tls: bool,
905        username: Option<&str>,
906        password: Option<&str>,
907    ) -> String {
908        let host = self.host.trim();
909
910        if host.starts_with("redis://") || host.starts_with("rediss://") {
911            if let Ok(parsed) = Url::parse(host)
912                && let Some(host_str) = parsed.host_str()
913            {
914                let scheme = parsed.scheme();
915                let port = parsed.port_or_known_default().unwrap_or(self.port);
916                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
917                let parsed_password = parsed.password();
918                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
919                let (effective_username, effective_password) = if has_embedded_auth {
920                    (parsed_username, parsed_password)
921                } else {
922                    (username, password)
923                };
924
925                return build_redis_url(
926                    scheme,
927                    host_str,
928                    port,
929                    effective_username,
930                    effective_password,
931                );
932            }
933
934            // Fallback for malformed URLs
935            let has_port = if let Some(bracket_pos) = host.rfind(']') {
936                host[bracket_pos..].contains(':')
937            } else {
938                host.split(':').count() >= 3
939            };
940            let base = if has_port {
941                host.to_string()
942            } else {
943                format!("{}:{}", host, self.port)
944            };
945
946            if let Ok(parsed) = Url::parse(&base) {
947                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
948                let parsed_password = parsed.password();
949                if let Some(host_str) = parsed.host_str() {
950                    let port = parsed.port_or_known_default().unwrap_or(self.port);
951                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
952                    let (effective_username, effective_password) = if has_embedded_auth {
953                        (parsed_username, parsed_password)
954                    } else {
955                        (username, password)
956                    };
957                    return build_redis_url(
958                        parsed.scheme(),
959                        host_str,
960                        port,
961                        effective_username,
962                        effective_password,
963                    );
964                }
965            }
966            return base;
967        }
968
969        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
970        let scheme = if use_tls { "rediss" } else { "redis" };
971        build_redis_url(
972            scheme,
973            &normalized_host,
974            normalized_port,
975            username,
976            password,
977        )
978    }
979
980    pub fn from_seed(seed: &str) -> Option<Self> {
981        let trimmed = seed.trim();
982        if trimmed.is_empty() {
983            return None;
984        }
985
986        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
987            let port = Url::parse(trimmed)
988                .ok()
989                .and_then(|parsed| parsed.port_or_known_default())
990                .unwrap_or(6379);
991            return Some(Self {
992                host: trimmed.to_string(),
993                port,
994            });
995        }
996
997        let (host, port) = split_plain_host_and_port(trimmed, 6379);
998        Some(Self { host, port })
999    }
1000}
1001
/// Splits a scheme-less host specification into `(host, port)`.
///
/// Accepted shapes (surrounding whitespace is ignored):
/// * `[v6-address]` / `[v6-address]:port` — the brackets are removed from the
///   returned host; a missing or unparsable port yields `default_port`.
/// * `host:port` with exactly one colon — split when the port parses as `u16`.
/// * `host:` — the dangling colon is dropped and `default_port` is used, which
///   mirrors how the bracketed form treats an empty port.
/// * anything else (bare host, unbracketed IPv6, unparsable port, unmatched
///   `[`) — returned unchanged together with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let host = raw_host.trim();

    // Bracketed IPv6 literal: `[::1]` or `[::1]:6379`.
    if let Some(after_open) = host.strip_prefix('[') {
        let Some((address, remainder)) = after_open.split_once(']') else {
            // No closing bracket: hand the text back untouched.
            return (host.to_string(), default_port);
        };
        let port = remainder
            .strip_prefix(':')
            .and_then(|port_str| port_str.parse::<u16>().ok())
            .unwrap_or(default_port);
        return (address.to_string(), port);
    }

    // `host:port`. Requiring exactly one colon keeps bare IPv6 addresses
    // (which contain several) from being split.
    if host.matches(':').count() == 1 {
        if let Some((name, port_str)) = host.rsplit_once(':') {
            if port_str.is_empty() {
                // Without this the colon would stay in the host and later be
                // mistaken for an IPv6 literal when the URL is assembled.
                return (name.to_string(), default_port);
            }
            if let Ok(port) = port_str.parse::<u16>() {
                return (name.to_string(), port);
            }
        }
    }

    (host.to_string(), default_port)
}
1030
1031fn build_redis_url(
1032    scheme: &str,
1033    host: &str,
1034    port: u16,
1035    username: Option<&str>,
1036    password: Option<&str>,
1037) -> String {
1038    let mut url = format!("{scheme}://");
1039
1040    if let Some(user) = username {
1041        url.push_str(&urlencoding::encode(user));
1042        if let Some(pass) = password {
1043            url.push(':');
1044            url.push_str(&urlencoding::encode(pass));
1045        }
1046        url.push('@');
1047    } else if let Some(pass) = password {
1048        url.push(':');
1049        url.push_str(&urlencoding::encode(pass));
1050        url.push('@');
1051    }
1052
1053    if host.contains(':') && !host.starts_with('[') {
1054        url.push('[');
1055        url.push_str(host);
1056        url.push(']');
1057    } else {
1058        url.push_str(host);
1059    }
1060    url.push(':');
1061    url.push_str(&port.to_string());
1062    url
1063}
1064
/// Pooling settings: an on/off switch plus a `min`/`max` pair.
///
/// `#[serde(default)]` lets a config omit any field; omitted fields take the
/// value from this type's `Default` impl (defined outside this section).
/// NOTE(review): `min`/`max` presumably bound the number of pooled database
/// connections — confirm against the code that builds the pool.
1065#[derive(Debug, Clone, Serialize, Deserialize)]
1066#[serde(default)]
1067pub struct DatabasePooling {
    /// Switch for the pooling settings below.
1068    pub enabled: bool,
    /// Lower value of the pool range.
1069    pub min: u32,
    /// Upper value of the pool range.
1070    pub max: u32,
1071}
1072
/// Numeric limits for events.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
/// NOTE(review): where each limit is enforced is not visible here; the
/// per-field notes below only restate what the names say.
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct EventLimits {
    /// Presumably the number of channels one publish may target — confirm.
1076    pub max_channels_at_once: u32,
    /// Presumably the maximum event-name length — confirm the unit (bytes vs chars).
1077    pub max_name_length: u32,
    /// Payload cap, expressed in kilobytes per the field name.
1078    pub max_payload_in_kb: u32,
    /// Presumably the maximum number of events in one batch request — confirm.
1079    pub max_batch_size: u32,
1080}
1081
/// Settings for idempotency-key support.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1082#[derive(Debug, Clone, Serialize, Deserialize)]
1083#[serde(default)]
1084pub struct IdempotencyConfig {
1085    /// Whether idempotency key support is enabled
1086    pub enabled: bool,
1087    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
1088    pub ttl_seconds: u64,
1089    /// Maximum length of an idempotency key
1090    pub max_key_length: usize,
1091}
1092
/// Feature switch for ephemeral message handling.
///
/// `#[serde(default)]`: a missing `enabled` falls back to this type's
/// `Default` impl (defined outside this section).
1093#[derive(Debug, Clone, Serialize, Deserialize)]
1094#[serde(default)]
1095pub struct EphemeralConfig {
1096    /// Whether ephemeral message handling is enabled.
1097    pub enabled: bool,
1098}
1099
/// Settings for echo control (whether publishers receive their own messages).
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1100#[derive(Debug, Clone, Serialize, Deserialize)]
1101#[serde(default)]
1102pub struct EchoControlConfig {
1103    /// Whether connection-level and per-message echo control is enabled.
1104    pub enabled: bool,
1105    /// Default echo behavior for new V2 connections when the query param is omitted.
1106    pub default_echo_messages: bool,
1107}
1108
/// Settings for per-subscription event-name filters.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1109#[derive(Debug, Clone, Serialize, Deserialize)]
1110#[serde(default)]
1111pub struct EventNameFilteringConfig {
1112    /// Whether per-subscription event name filtering is enabled.
1113    pub enabled: bool,
1114    /// Maximum event names allowed in a single filter.
1115    pub max_events_per_filter: usize,
1116    /// Maximum length for each event name in a filter.
1117    pub max_event_name_length: usize,
1118}
1119
/// Settings for connection recovery (resuming after a reconnect).
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1120#[derive(Debug, Clone, Serialize, Deserialize)]
1121#[serde(default)]
1122pub struct ConnectionRecoveryConfig {
1123    /// Whether connection recovery (resume) is enabled.
1124    /// When enabled, the server keeps a bounded replay buffer per channel so that
1125    /// reconnecting clients can receive missed messages. Disabled by default for
1126    /// Pusher protocol compatibility.
1127    pub enabled: bool,
1128    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
1129    pub buffer_ttl_seconds: u64,
1130    /// Maximum number of messages kept per channel. Default: 100.
1131    pub max_buffer_size: usize,
1132}
1133
/// Storage backend selector for versioned messages.
///
/// With `rename_all = "lowercase"` the serde names are the variant names in
/// lowercase (`"memory"`, `"postgres"`, `"mysql"`, `"dynamodb"`, `"scylladb"`,
/// `"surrealdb"`). The extra aliases understood by the `FromStr` impl
/// (`postgresql`, `pgsql`, `scylla`, `surreal`) are NOT accepted when the value
/// is deserialized through serde, because no `alias` attributes are declared.
1134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1135#[serde(rename_all = "lowercase")]
1136pub enum VersionStoreDriver {
    /// The default variant (`#[default]`).
1137    #[default]
1138    Memory,
1139    Postgres,
1140    Mysql,
1141    DynamoDb,
1142    ScyllaDb,
1143    SurrealDb,
1144}
1145
1146impl FromStr for VersionStoreDriver {
1147    type Err = String;
1148    fn from_str(s: &str) -> Result<Self, Self::Err> {
1149        match s.to_lowercase().as_str() {
1150            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1151            "mysql" => Ok(Self::Mysql),
1152            "dynamodb" => Ok(Self::DynamoDb),
1153            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1154            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1155            "memory" => Ok(Self::Memory),
1156            _ => Err(format!("Unknown version store driver: {s}")),
1157        }
1158    }
1159}
1160
/// Settings for versioned (mutable) messages: feature switch, storage driver,
/// paging, retention and purge tuning.
///
/// `#[serde(default)]`: fields missing from the input fall back to the values
/// in `impl Default for VersionedMessagesConfig` (disabled, memory driver,
/// page size 100, no expiry, purge every 300 s in batches of 1000, at most
/// 100 000 rows per tick).
1161#[derive(Debug, Clone, Serialize, Deserialize)]
1162#[serde(default)]
1163pub struct VersionedMessagesConfig {
1164    /// Whether V2 mutable message HTTP/retrieval surfaces are enabled.
1165    pub enabled: bool,
1166    /// Storage driver for versioned messages. Defaults to memory.
1167    pub driver: VersionStoreDriver,
1168    /// Maximum page size for version-history retrieval.
1169    pub max_page_size: usize,
1170    /// Retention window for version entries, in seconds. `0` disables expiry
1171    /// (entries are kept forever).
1172    ///
1173    /// Backends with native row TTL (ScyllaDB `USING TTL`, DynamoDB TTL
1174    /// attribute) apply this at the storage layer. Other backends (MySQL,
1175    /// PostgreSQL, SurrealDB, Memory) enforce it via a periodic background
1176    /// purge worker controlled by `purge_interval_seconds`.
1177    pub retention_window_seconds: u64,
1178    /// Interval between purge worker runs, in seconds. Only consulted for
1179    /// backends without native TTL. Clamped to a minimum of 10 seconds.
1180    pub purge_interval_seconds: u64,
1181    /// Maximum rows deleted per purge query. Bounds lock/transaction sizes
1182    /// for SQL backends. The worker loops until no more expired rows remain
1183    /// or `max_purge_per_tick` is reached.
1184    pub purge_batch_size: usize,
1185    /// Hard cap on rows deleted per purge tick across all loop iterations.
1186    /// Prevents a backlog of expired rows from monopolising a worker run.
1187    pub max_purge_per_tick: usize,
1188}
1189
/// Feature switch for annotations.
///
/// `Default` is derived, so `enabled` defaults to `false`; `#[serde(default)]`
/// applies the same value when the field is missing from the input.
1190#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1191#[serde(default)]
1192pub struct AnnotationsConfig {
1193    /// Whether Sockudo-native annotation APIs and realtime annotation protocol
1194    /// surfaces are enabled. Disabled by default while the feature is opt-in.
1195    pub enabled: bool,
1196}
1197
/// Storage backend selector for message history.
///
/// With `rename_all = "lowercase"` the serde names are the variant names in
/// lowercase (`"postgres"`, `"mysql"`, `"dynamodb"`, `"surrealdb"`,
/// `"scylladb"`, `"memory"`). The extra aliases understood by the `FromStr`
/// impl (`postgresql`, `pgsql`, `surreal`, `scylla`) are NOT accepted when the
/// value is deserialized through serde, because no `alias` attributes are
/// declared.
1198#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1199#[serde(rename_all = "lowercase")]
1200pub enum HistoryBackend {
    /// The default variant (`#[default]`).
1201    #[default]
1202    Postgres,
1203    Mysql,
1204    DynamoDb,
1205    SurrealDb,
1206    ScyllaDb,
1207    Memory,
1208}
1209
1210impl FromStr for HistoryBackend {
1211    type Err = String;
1212    fn from_str(s: &str) -> Result<Self, Self::Err> {
1213        match s.to_lowercase().as_str() {
1214            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1215            "mysql" => Ok(Self::Mysql),
1216            "dynamodb" => Ok(Self::DynamoDb),
1217            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1218            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1219            "memory" => Ok(Self::Memory),
1220            _ => Err(format!("Unknown history backend: {s}")),
1221        }
1222    }
1223}
1224
/// PostgreSQL-specific history settings.
///
/// `#[serde(default)]`: missing fields take the values from this type's
/// `Default` impl (`"sockudo_history"` prefix, 5000 ms write timeout).
1225#[derive(Debug, Clone, Serialize, Deserialize)]
1226#[serde(default)]
1227pub struct PostgresHistoryConfig {
    /// Prefix for history table names.
1228    pub table_prefix: String,
    /// Write timeout, in milliseconds per the field name.
1229    pub write_timeout_ms: u64,
1230}
1231
1232impl Default for PostgresHistoryConfig {
1233    fn default() -> Self {
1234        Self {
1235            table_prefix: "sockudo_history".to_string(),
1236            write_timeout_ms: 5000,
1237        }
1238    }
1239}
1240
/// MySQL-specific history settings.
///
/// `#[serde(default)]`: missing fields take the values from this type's
/// `Default` impl (`"sockudo_history"` prefix, 5000 ms write timeout).
1241#[derive(Debug, Clone, Serialize, Deserialize)]
1242#[serde(default)]
1243pub struct MySqlHistoryConfig {
    /// Prefix for history table names.
1244    pub table_prefix: String,
    /// Write timeout, in milliseconds per the field name.
1245    pub write_timeout_ms: u64,
1246}
1247
1248impl Default for MySqlHistoryConfig {
1249    fn default() -> Self {
1250        Self {
1251            table_prefix: "sockudo_history".to_string(),
1252            write_timeout_ms: 5000,
1253        }
1254    }
1255}
1256
/// DynamoDB-specific history settings.
///
/// `#[serde(default)]`: missing fields take the values from this type's
/// `Default` impl (`"sockudo_history"` prefix, 5000 ms write timeout).
1257#[derive(Debug, Clone, Serialize, Deserialize)]
1258#[serde(default)]
1259pub struct DynamoDbHistoryConfig {
    /// Prefix for history table names.
1260    pub table_prefix: String,
    /// Write timeout, in milliseconds per the field name.
1261    pub write_timeout_ms: u64,
1262}
1263
1264impl Default for DynamoDbHistoryConfig {
1265    fn default() -> Self {
1266        Self {
1267            table_prefix: "sockudo_history".to_string(),
1268            write_timeout_ms: 5000,
1269        }
1270    }
1271}
1272
/// SurrealDB-specific history settings.
///
/// `#[serde(default)]`: missing fields take the values from this type's
/// `Default` impl (`"sockudo_history"` prefix, 5000 ms write timeout).
1273#[derive(Debug, Clone, Serialize, Deserialize)]
1274#[serde(default)]
1275pub struct SurrealDbHistoryConfig {
    /// Prefix for history table names.
1276    pub table_prefix: String,
    /// Write timeout, in milliseconds per the field name.
1277    pub write_timeout_ms: u64,
1278}
1279
1280impl Default for SurrealDbHistoryConfig {
1281    fn default() -> Self {
1282        Self {
1283            table_prefix: "sockudo_history".to_string(),
1284            write_timeout_ms: 5000,
1285        }
1286    }
1287}
1288
/// ScyllaDB-specific history settings.
///
/// `#[serde(default)]`: missing fields take the values from this type's
/// `Default` impl (`"sockudo_history"` prefix, 5000 ms write timeout).
1289#[derive(Debug, Clone, Serialize, Deserialize)]
1290#[serde(default)]
1291pub struct ScyllaDbHistoryConfig {
    /// Prefix for history table names.
1292    pub table_prefix: String,
    /// Write timeout, in milliseconds per the field name.
1293    pub write_timeout_ms: u64,
1294}
1295
1296impl Default for ScyllaDbHistoryConfig {
1297    fn default() -> Self {
1298        Self {
1299            table_prefix: "sockudo_history".to_string(),
1300            write_timeout_ms: 5000,
1301        }
1302    }
1303}
1304
/// Message-history settings: feature switches, backend selection, retention,
/// paging, writer and purge tuning, plus one sub-section per backend.
///
/// `#[serde(default)]`: fields missing from the input fall back to the values
/// in `impl Default for HistoryConfig`.
/// NOTE(review): the per-field notes restate names and defaults only; how each
/// value is consumed is not visible in this section.
1305#[derive(Debug, Clone, Serialize, Deserialize)]
1306#[serde(default)]
1307pub struct HistoryConfig {
    /// Master switch. Default: `false`.
1308    pub enabled: bool,
    /// Switch for the "rewind" feature. Default: `true`.
1309    pub rewind_enabled: bool,
    /// Selected storage backend. Default: Postgres.
1310    pub backend: HistoryBackend,
    /// Retention window in seconds. Default: 86400 (one day).
1311    pub retention_window_seconds: u64,
    /// Maximum page size. Default: 100.
1312    pub max_page_size: usize,
    /// Optional per-channel message cap. Default: `None`.
1313    pub max_messages_per_channel: Option<usize>,
    /// Optional per-channel byte cap. Default: `None`.
1314    pub max_bytes_per_channel: Option<u64>,
    /// Number of writer shards. Default: 16.
1315    pub writer_shards: usize,
    /// Writer queue capacity. Default: 4096.
1316    pub writer_queue_capacity: usize,
    /// Purge interval in seconds. Default: 300.
1317    pub purge_interval_seconds: u64,
    /// Purge batch size. Default: 1000.
1318    pub purge_batch_size: usize,
    /// Cap on purged items per tick. Default: 100 000.
1319    pub max_purge_per_tick: usize,
    /// PostgreSQL-specific settings.
1320    pub postgres: PostgresHistoryConfig,
    /// MySQL-specific settings.
1321    pub mysql: MySqlHistoryConfig,
    /// DynamoDB-specific settings.
1322    pub dynamodb: DynamoDbHistoryConfig,
    /// SurrealDB-specific settings.
1323    pub surrealdb: SurrealDbHistoryConfig,
    /// ScyllaDB-specific settings.
1324    pub scylladb: ScyllaDbHistoryConfig,
1325}
1326
1327impl Default for HistoryConfig {
1328    fn default() -> Self {
1329        Self {
1330            enabled: false,
1331            rewind_enabled: true,
1332            backend: HistoryBackend::Postgres,
1333            retention_window_seconds: 86400,
1334            max_page_size: 100,
1335            max_messages_per_channel: None,
1336            max_bytes_per_channel: None,
1337            writer_shards: 16,
1338            writer_queue_capacity: 4096,
1339            purge_interval_seconds: 300,
1340            purge_batch_size: 1000,
1341            max_purge_per_tick: 100_000,
1342            postgres: PostgresHistoryConfig::default(),
1343            mysql: MySqlHistoryConfig::default(),
1344            dynamodb: DynamoDbHistoryConfig::default(),
1345            surrealdb: SurrealDbHistoryConfig::default(),
1346            scylladb: ScyllaDbHistoryConfig::default(),
1347        }
1348    }
1349}
1350
1351impl Default for VersionedMessagesConfig {
1352    fn default() -> Self {
1353        Self {
1354            enabled: false,
1355            driver: VersionStoreDriver::Memory,
1356            max_page_size: 100,
1357            retention_window_seconds: 0,
1358            purge_interval_seconds: 300,
1359            purge_batch_size: 1000,
1360            max_purge_per_tick: 100_000,
1361        }
1362    }
1363}
1364
/// Presence-history settings: feature switch, retention, paging and optional
/// per-channel caps.
///
/// `#[serde(default)]`: fields missing from the input fall back to the values
/// in `impl Default for PresenceHistoryConfig`.
1365#[derive(Debug, Clone, Serialize, Deserialize)]
1366#[serde(default)]
1367pub struct PresenceHistoryConfig {
    /// Master switch. Default: `false`.
1368    pub enabled: bool,
    /// Retention window in seconds. Default: 86400 (one day).
1369    pub retention_window_seconds: u64,
    /// Maximum page size. Default: 100.
1370    pub max_page_size: usize,
    /// Optional per-channel event cap. Default: `None`.
1371    pub max_events_per_channel: Option<usize>,
    /// Optional per-channel byte cap. Default: `None`.
1372    pub max_bytes_per_channel: Option<u64>,
1373}
1374
1375impl Default for PresenceHistoryConfig {
1376    fn default() -> Self {
1377        Self {
1378            enabled: false,
1379            retention_window_seconds: 86400,
1380            max_page_size: 100,
1381            max_events_per_channel: None,
1382            max_bytes_per_channel: None,
1383        }
1384    }
1385}
1386
/// HTTP API settings.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1387#[derive(Debug, Clone, Serialize, Deserialize)]
1388#[serde(default)]
1389pub struct HttpApiConfig {
    /// Request size limit, in megabytes per the field name.
1390    pub request_limit_in_mb: u32,
    /// Traffic-acceptance thresholds (see `AcceptTraffic`).
1391    pub accept_traffic: AcceptTraffic,
    /// NOTE(review): presumably toggles a usage/statistics endpoint — confirm.
1392    pub usage_enabled: bool,
1393}
1394
/// Threshold used when deciding whether to accept traffic.
///
/// `#[serde(default)]`: a missing field falls back to this type's `Default`
/// impl (defined outside this section).
1395#[derive(Debug, Clone, Serialize, Deserialize)]
1396#[serde(default)]
1397pub struct AcceptTraffic {
    /// NOTE(review): unit/scale (fraction vs percent) is not visible here — confirm.
1398    pub memory_threshold: f64,
1399}
1400
/// Identity of this server instance.
///
/// `#[serde(default)]`: a missing field falls back to this type's `Default`
/// impl (defined outside this section).
1401#[derive(Debug, Clone, Serialize, Deserialize)]
1402#[serde(default)]
1403pub struct InstanceConfig {
    /// Identifier string for this process; how the default is generated is not
    /// visible in this section.
1404    pub process_id: String,
1405}
1406
/// Metrics settings: switch, driver, listen address and Prometheus options.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1407#[derive(Debug, Clone, Serialize, Deserialize)]
1408#[serde(default)]
1409pub struct MetricsConfig {
    /// Master switch for metrics.
1410    pub enabled: bool,
    /// Selected metrics driver.
1411    pub driver: MetricsDriver,
    /// Host part of the metrics address (presumably the bind address — confirm).
1412    pub host: String,
    /// Prometheus-specific options.
1413    pub prometheus: PrometheusConfig,
    /// Port part of the metrics address.
1414    pub port: u16,
1415}
1416
/// Prometheus-specific metrics options.
///
/// `#[serde(default)]`: a missing field falls back to this type's `Default`
/// impl (defined outside this section).
1417#[derive(Debug, Clone, Serialize, Deserialize)]
1418#[serde(default)]
1419pub struct PrometheusConfig {
    /// Prefix string (presumably prepended to metric names — confirm).
1420    pub prefix: String,
1421}
1422
/// Log output formatting switches.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1423#[derive(Debug, Clone, Serialize, Deserialize)]
1424#[serde(default)]
1425pub struct LoggingConfig {
    /// Whether log output uses colors.
1426    pub colors_enabled: bool,
    /// Whether log lines include the target (presumably the `tracing` target — confirm).
1427    pub include_target: bool,
1428}
1429
/// Limits for presence channels.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1430#[derive(Debug, Clone, Serialize, Deserialize)]
1431#[serde(default)]
1432pub struct PresenceConfig {
    /// Maximum number of members in one presence channel.
1433    pub max_members_per_channel: u32,
    /// Maximum size of one member's data, in kilobytes per the field name.
1434    pub max_member_size_in_kb: u32,
1435}
1436
1437/// WebSocket connection buffer configuration
1438/// Controls backpressure handling for slow consumers
///
/// `#[serde(default)]`: fields missing from the input fall back to the values
/// in `impl Default for WebSocketConfig`.
1439#[derive(Debug, Clone, Serialize, Deserialize)]
1440#[serde(default)]
1441pub struct WebSocketConfig {
    /// Buffer limit in messages; combined with `max_bytes` by
    /// `to_buffer_config`. Default: `Some(1000)`.
1442    pub max_messages: Option<usize>,
    /// Buffer limit in bytes. Also used by `to_sockudo_ws_config` as the
    /// maximum payload length when set. Default: `None`.
1443    pub max_bytes: Option<usize>,
    /// Forwarded as `disconnect_on_full` by `to_buffer_config`. Default: `true`.
1444    pub disconnect_on_buffer_full: bool,
    /// Passed to the runtime's `max_message_size`. Default: 64 MiB.
1445    pub max_message_size: usize,
    /// Passed to the runtime's `max_frame_size`. Default: 16 MiB.
1446    pub max_frame_size: usize,
    /// Passed to the runtime's `write_buffer_size`. Default: 16 KiB.
1447    pub write_buffer_size: usize,
    /// Passed to the runtime's `max_backpressure`. Default: 1 MiB.
1448    pub max_backpressure: usize,
    /// Passed to the runtime's `auto_ping`. Default: `true`.
1449    pub auto_ping: bool,
    /// Lower bound for the runtime ping interval (see `to_sockudo_ws_config`).
    /// Default: 30.
1450    pub ping_interval: u32,
    /// Passed to the runtime's `idle_timeout`. Default: 120.
1451    pub idle_timeout: u32,
    /// Compression mode name, matched case-insensitively in
    /// `to_sockudo_ws_config` (`dedicated`, `shared`, `window256b` …
    /// `window32kb`); any other value disables compression. Default: `"disabled"`.
1452    pub compression: String,
1453}
1454
1455impl Default for WebSocketConfig {
1456    fn default() -> Self {
1457        Self {
1458            max_messages: Some(1000),
1459            max_bytes: None,
1460            disconnect_on_buffer_full: true,
1461            max_message_size: 64 * 1024 * 1024,
1462            max_frame_size: 16 * 1024 * 1024,
1463            write_buffer_size: 16 * 1024,
1464            max_backpressure: 1024 * 1024,
1465            auto_ping: true,
1466            ping_interval: 30,
1467            idle_timeout: 120,
1468            compression: "disabled".to_string(),
1469        }
1470    }
1471}
1472
1473impl WebSocketConfig {
1474    /// Convert to WebSocketBufferConfig for runtime use
1475    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1476        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1477
1478        let limit = match (self.max_messages, self.max_bytes) {
1479            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1480            (Some(messages), None) => BufferLimit::Messages(messages),
1481            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1482            (None, None) => BufferLimit::Messages(1000),
1483        };
1484
1485        WebSocketBufferConfig {
1486            limit,
1487            disconnect_on_full: self.disconnect_on_buffer_full,
1488        }
1489    }
1490
1491    /// Convert to native sockudo-ws runtime configuration.
1492    pub fn to_sockudo_ws_config(
1493        &self,
1494        websocket_max_payload_kb: u32,
1495        activity_timeout: u64,
1496    ) -> sockudo_ws::Config {
1497        use sockudo_ws::Compression;
1498
1499        let compression = match self.compression.to_lowercase().as_str() {
1500            "dedicated" => Compression::Dedicated,
1501            "shared" => Compression::Shared,
1502            "window256b" => Compression::Window256B,
1503            "window1kb" => Compression::Window1KB,
1504            "window2kb" => Compression::Window2KB,
1505            "window4kb" => Compression::Window4KB,
1506            "window8kb" => Compression::Window8KB,
1507            "window16kb" => Compression::Window16KB,
1508            "window32kb" => Compression::Window32KB,
1509            _ => Compression::Disabled,
1510        };
1511
1512        sockudo_ws::Config::builder()
1513            .max_payload_length(
1514                self.max_bytes
1515                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1516            )
1517            .max_message_size(self.max_message_size)
1518            .max_frame_size(self.max_frame_size)
1519            .write_buffer_size(self.write_buffer_size)
1520            .max_backpressure(self.max_backpressure)
1521            .idle_timeout(self.idle_timeout)
1522            .auto_ping(self.auto_ping)
1523            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1524            .compression(compression)
1525            .build()
1526    }
1527}
1528
/// Queue settings: the selected driver plus one configuration section per
/// supported backend.
///
/// `Default` is derived, so every field starts from its own type's default;
/// `#[serde(default)]` applies the same values to fields missing from the
/// input.
1529#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1530#[serde(default)]
1531pub struct QueueConfig {
    /// Selected queue driver.
1532    pub driver: QueueDriver,
1533    pub redis: RedisQueueConfig,
1534    pub redis_cluster: RedisClusterQueueConfig,
1535    pub nats: NatsAdapterConfig,
1536    pub pulsar: PulsarAdapterConfig,
1537    pub rabbitmq: RabbitMqAdapterConfig,
1538    pub google_pubsub: GooglePubSubAdapterConfig,
1539    pub kafka: KafkaAdapterConfig,
1540    pub iggy: IggyConfig,
1541    pub sqs: SqsQueueConfig,
1542    pub sns: SnsQueueConfig,
1543}
1544
/// Redis queue settings.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1545#[derive(Debug, Clone, Serialize, Deserialize)]
1546#[serde(default)]
1547pub struct RedisQueueConfig {
    /// Concurrency level (presumably the number of concurrent workers — confirm).
1548    pub concurrency: u32,
    /// Optional key prefix.
1549    pub prefix: Option<String>,
    /// Optional URL that overrides the otherwise-derived Redis URL
    /// (presumably; verify against the queue driver).
1550    pub url_override: Option<String>,
    /// Whether cluster mode is used.
1551    pub cluster_mode: bool,
1552}
1553
/// One rate-limit rule: `max_requests` per `window_seconds`.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1554#[derive(Clone, Debug, Serialize, Deserialize)]
1555#[serde(default)]
1556pub struct RateLimit {
    /// Number of requests allowed per window.
1557    pub max_requests: u32,
    /// Window length in seconds.
1558    pub window_seconds: u64,
    /// Optional identifier for this rule; its meaning is not visible here.
1559    pub identifier: Option<String>,
    /// NOTE(review): presumably the number of trusted proxy hops used when
    /// resolving the client address — confirm.
1560    pub trust_hops: Option<u32>,
1561}
1562
/// Rate-limiter settings: switch, backing driver, and separate rules for the
/// HTTP API and for WebSocket traffic.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1563#[derive(Debug, Clone, Serialize, Deserialize)]
1564#[serde(default)]
1565pub struct RateLimiterConfig {
    /// Master switch for rate limiting.
1566    pub enabled: bool,
    /// Backing store, selected with the cache driver enum.
1567    pub driver: CacheDriver,
    /// Rule applied to the API.
1568    pub api_rate_limit: RateLimit,
    /// Rule applied to WebSocket traffic.
1569    pub websocket_rate_limit: RateLimit,
    /// Redis settings for the limiter.
1570    pub redis: RedisConfig,
1571}
1572
/// TLS settings.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1573#[derive(Debug, Clone, Serialize, Deserialize)]
1574#[serde(default)]
1575pub struct SslConfig {
    /// Master switch for TLS.
1576    pub enabled: bool,
    /// Path to the certificate file.
1577    pub cert_path: String,
    /// Path to the private-key file.
1578    pub key_path: String,
    /// Optional passphrase (presumably for the private key — confirm).
1579    pub passphrase: Option<String>,
    /// Optional path to a CA file.
1580    pub ca_path: Option<String>,
    /// Whether plain HTTP is redirected (presumably to HTTPS — confirm).
1581    pub redirect_http: bool,
    /// Optional HTTP port (presumably the listener used for the redirect — confirm).
1582    pub http_port: Option<u16>,
1583}
1584
/// Webhook delivery settings: batching, retry policy and request timeout.
///
/// `#[serde(default)]`: fields missing from the input fall back to the values
/// in `impl Default for WebhooksConfig` (10 000 ms request timeout, default
/// batching and retry sections).
1585#[derive(Debug, Clone, Serialize, Deserialize)]
1586#[serde(default)]
1587pub struct WebhooksConfig {
    /// Batching options.
1588    pub batching: BatchingConfig,
    /// Retry policy.
1589    pub retry: WebhookRetryConfig,
    /// Request timeout in milliseconds. Default: 10 000.
1590    pub request_timeout_ms: u64,
1591}
1592
/// Webhook batching options.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1593#[derive(Debug, Clone, Serialize, Deserialize)]
1594#[serde(default)]
1595pub struct BatchingConfig {
    /// Master switch for batching.
1596    pub enabled: bool,
    /// Batch duration. NOTE(review): the unit is not visible here (presumably
    /// milliseconds) — confirm.
1597    pub duration: u64,
    /// Batch size (presumably the maximum number of events per batch — confirm).
1598    pub size: usize,
1599}
1600
/// Webhook retry policy.
///
/// `#[serde(default)]`: fields missing from the input fall back to the values
/// in `impl Default for WebhookRetryConfig`.
1601#[derive(Debug, Clone, Serialize, Deserialize)]
1602#[serde(default)]
1603pub struct WebhookRetryConfig {
    /// Master switch for retries. Default: `true`.
1604    pub enabled: bool,
    /// Optional cap on attempts. Default: `None`.
1605    pub max_attempts: Option<u32>,
    /// Total retry budget in milliseconds. Default: 300 000.
1606    pub max_elapsed_time_ms: u64,
    /// First backoff delay in milliseconds. Default: 1 000.
1607    pub initial_backoff_ms: u64,
    /// Largest backoff delay in milliseconds. Default: 60 000.
1608    pub max_backoff_ms: u64,
1609}
1610
1611impl Default for WebhooksConfig {
1612    fn default() -> Self {
1613        Self {
1614            batching: BatchingConfig::default(),
1615            retry: WebhookRetryConfig::default(),
1616            request_timeout_ms: 10_000,
1617        }
1618    }
1619}
1620
1621impl Default for WebhookRetryConfig {
1622    fn default() -> Self {
1623        Self {
1624            enabled: true,
1625            max_attempts: None,
1626            max_elapsed_time_ms: 300_000,
1627            initial_backoff_ms: 1_000,
1628            max_backoff_ms: 60_000,
1629        }
1630    }
1631}
1632
/// Cluster health-check settings; all intervals are in milliseconds per the
/// field names.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1633#[derive(Debug, Clone, Serialize, Deserialize)]
1634#[serde(default)]
1635pub struct ClusterHealthConfig {
    /// Master switch for cluster health tracking.
1636    pub enabled: bool,
    /// Heartbeat interval.
1637    pub heartbeat_interval_ms: u64,
    /// Node timeout (presumably the silence after which a node counts as dead — confirm).
1638    pub node_timeout_ms: u64,
    /// Cleanup interval.
1639    pub cleanup_interval_ms: u64,
1640}
1641
/// Unix domain socket settings.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
1642#[derive(Debug, Clone, Serialize, Deserialize)]
1643#[serde(default)]
1644pub struct UnixSocketConfig {
    /// Master switch for the Unix socket.
1645    pub enabled: bool,
    /// Filesystem path of the socket.
1646    pub path: String,
    /// Permission bits for the socket. Deserialized by
    /// `deserialize_octal_permission`, which expects a *string* of octal
    /// digits (chmod style, e.g. `"755"`) and rejects values above `777`.
    /// NOTE(review): the derived `Serialize` writes this `u32` as a plain
    /// number, so a serialized config will not deserialize back through the
    /// string-only deserializer — confirm whether round-tripping matters.
1647    #[serde(deserialize_with = "deserialize_octal_permission")]
1648    pub permission_mode: u32,
1649}
1650
/// Delta-compression settings.
///
/// `#[serde(default)]`: fields missing from the input fall back to this
/// type's `Default` impl (defined outside this section).
/// NOTE(review): none of the consumers are visible in this section; the field
/// notes below restate the names and should be checked against the delta
/// compression implementation.
1651#[derive(Debug, Clone, Serialize, Deserialize)]
1652#[serde(default)]
1653pub struct DeltaCompressionOptionsConfig {
    /// Master switch for delta compression.
1654    pub enabled: bool,
    /// Name of the delta algorithm (accepted values are not visible here).
1655    pub algorithm: String,
    /// Presumably: a full message is sent every N messages — confirm.
1656    pub full_message_interval: u32,
    /// Presumably: messages smaller than this are not delta-compressed — confirm the unit.
1657    pub min_message_size: usize,
    /// Maximum state age, in seconds per the field name.
1658    pub max_state_age_secs: u64,
    /// Cap on channel states kept per socket.
1659    pub max_channel_states_per_socket: usize,
    /// Optional cap on conflation states kept per channel.
1660    pub max_conflation_states_per_channel: Option<usize>,
    /// Optional path to the conflation key (path syntax not visible here).
1661    pub conflation_key_path: Option<String>,
    /// Whether cluster coordination is used.
1662    pub cluster_coordination: bool,
    /// Backend used for that coordination.
1663    pub coordination_backend: DeltaCoordinationBackend,
    /// Presumably: leave the algorithm name out of delta messages — confirm.
1664    pub omit_delta_algorithm: bool,
1665}
1666
1667/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
///
/// `#[serde(default)]`: fields missing from the input fall back to the values
/// in `impl Default for CleanupConfig`. `validate` rejects zero for
/// `queue_buffer_size`, `batch_size`, `batch_timeout_ms` and a fixed worker
/// count of zero.
1668#[derive(Debug, Clone, Serialize, Deserialize)]
1669#[serde(default)]
1670pub struct CleanupConfig {
    /// Queue buffer size; must be greater than 0. Default: 1024.
1671    pub queue_buffer_size: usize,
    /// Batch size; must be greater than 0. Default: 64.
1672    pub batch_size: usize,
    /// Batch timeout in milliseconds; must be greater than 0. Default: 100.
1673    pub batch_timeout_ms: u64,
    /// Worker thread selection: automatic or a fixed count. Default: automatic.
1674    pub worker_threads: WorkerThreadsConfig,
    /// Maximum retry attempts. Default: 3.
1675    pub max_retry_attempts: u32,
    /// Whether asynchronous cleanup is enabled. Default: `true`.
1676    pub async_enabled: bool,
    /// Whether to fall back to synchronous cleanup. Default: `true`.
1677    pub fallback_to_sync: bool,
1678}
1679
1680/// Worker threads configuration for the cleanup system
///
/// Has hand-written serde impls: serialized as the string `"auto"` or as an
/// unsigned integer; deserialized from `"auto"` (any ASCII case), a numeric
/// string, or an integer.
1681#[derive(Debug, Clone)]
1682pub enum WorkerThreadsConfig {
    /// Let the runtime pick the thread count.
1683    Auto,
    /// Use exactly this many threads (`CleanupConfig::validate` rejects 0).
1684    Fixed(usize),
1685}
1686
1687impl serde::Serialize for WorkerThreadsConfig {
1688    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1689    where
1690        S: serde::Serializer,
1691    {
1692        match self {
1693            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1694            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1695        }
1696    }
1697}
1698
1699impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1700    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1701    where
1702        D: serde::Deserializer<'de>,
1703    {
1704        use serde::de;
1705        struct WorkerThreadsVisitor;
1706        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1707            type Value = WorkerThreadsConfig;
1708
1709            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1710                formatter.write_str(r#""auto" or a positive integer"#)
1711            }
1712
1713            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1714                if value.eq_ignore_ascii_case("auto") {
1715                    Ok(WorkerThreadsConfig::Auto)
1716                } else if let Ok(n) = value.parse::<usize>() {
1717                    Ok(WorkerThreadsConfig::Fixed(n))
1718                } else {
1719                    Err(E::custom(format!(
1720                        "expected 'auto' or a number, got '{value}'"
1721                    )))
1722                }
1723            }
1724
1725            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1726                Ok(WorkerThreadsConfig::Fixed(value as usize))
1727            }
1728
1729            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1730                if value >= 0 {
1731                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1732                } else {
1733                    Err(E::custom("worker_threads must be non-negative"))
1734                }
1735            }
1736        }
1737        deserializer.deserialize_any(WorkerThreadsVisitor)
1738    }
1739}
1740
1741impl Default for CleanupConfig {
1742    fn default() -> Self {
1743        Self {
1744            queue_buffer_size: 1024,
1745            batch_size: 64,
1746            batch_timeout_ms: 100,
1747            worker_threads: WorkerThreadsConfig::Auto,
1748            max_retry_attempts: 3,
1749            async_enabled: true,
1750            fallback_to_sync: true,
1751        }
1752    }
1753}
1754
1755impl CleanupConfig {
1756    pub fn validate(&self) -> Result<(), String> {
1757        if self.queue_buffer_size == 0 {
1758            return Err("queue_buffer_size must be greater than 0".to_string());
1759        }
1760        if self.batch_size == 0 {
1761            return Err("batch_size must be greater than 0".to_string());
1762        }
1763        if self.batch_timeout_ms == 0 {
1764            return Err("batch_timeout_ms must be greater than 0".to_string());
1765        }
1766        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1767            && n == 0
1768        {
1769            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1770        }
1771        Ok(())
1772    }
1773}
1774
/// Tag-filtering switches.
///
/// NOTE(review): the two sources of defaults disagree for `enable_tags`. The
/// derived `Default` yields `enable_tags == false`, while the field-level
/// `#[serde(default = "default_true")]` yields `true` when the field is
/// missing from deserialized input. So `TagFilteringConfig::default()` and a
/// config deserialized from an empty section differ — confirm which value is
/// intended.
1775#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1776#[serde(default)]
1777pub struct TagFilteringConfig {
    /// Master switch; `false` when absent.
1778    #[serde(default)]
1779    pub enabled: bool,
    /// `true` when absent from deserialized input (via `default_true`).
1780    #[serde(default = "default_true")]
1781    pub enable_tags: bool,
1782}
1783
/// Serde default provider for boolean fields that should be `true` when
/// absent; referenced by path from `#[serde(default = "default_true")]`.
fn default_true() -> bool {
    true
}
1787
1788// --- Default Implementations ---
1789
1790impl Default for ServerOptions {
1791    fn default() -> Self {
1792        Self {
1793            adapter: AdapterConfig::default(),
1794            app_manager: AppManagerConfig::default(),
1795            cache: CacheConfig::default(),
1796            channel_limits: ChannelLimits::default(),
1797            cors: CorsConfig::default(),
1798            database: DatabaseConfig::default(),
1799            database_pooling: DatabasePooling::default(),
1800            debug: false,
1801            tag_filtering: TagFilteringConfig::default(),
1802            event_limits: EventLimits::default(),
1803            host: "0.0.0.0".to_string(),
1804            http_api: HttpApiConfig::default(),
1805            instance: InstanceConfig::default(),
1806            logging: None,
1807            metrics: MetricsConfig::default(),
1808            mode: "production".to_string(),
1809            port: 6001,
1810            path_prefix: "/".to_string(),
1811            presence: PresenceConfig::default(),
1812            queue: QueueConfig::default(),
1813            rate_limiter: RateLimiterConfig::default(),
1814            shutdown_grace_period: 10,
1815            ssl: SslConfig::default(),
1816            user_authentication_timeout: 3600,
1817            webhooks: WebhooksConfig::default(),
1818            websocket_max_payload_kb: 64,
1819            cleanup: CleanupConfig::default(),
1820            activity_timeout: 120,
1821            cluster_health: ClusterHealthConfig::default(),
1822            unix_socket: UnixSocketConfig::default(),
1823            delta_compression: DeltaCompressionOptionsConfig::default(),
1824            websocket: WebSocketConfig::default(),
1825            connection_recovery: ConnectionRecoveryConfig::default(),
1826            history: HistoryConfig::default(),
1827            presence_history: PresenceHistoryConfig::default(),
1828            idempotency: IdempotencyConfig::default(),
1829            ephemeral: EphemeralConfig::default(),
1830            echo_control: EchoControlConfig::default(),
1831            event_name_filtering: EventNameFilteringConfig::default(),
1832            versioned_messages: VersionedMessagesConfig::default(),
1833            annotations: AnnotationsConfig::default(),
1834        }
1835    }
1836}
1837
1838impl Default for SqsQueueConfig {
1839    fn default() -> Self {
1840        Self {
1841            region: "us-east-1".to_string(),
1842            queue_url_prefix: None,
1843            visibility_timeout: 30,
1844            endpoint_url: None,
1845            max_messages: 10,
1846            wait_time_seconds: 5,
1847            concurrency: 5,
1848            fifo: false,
1849            message_group_id: Some("default".to_string()),
1850        }
1851    }
1852}
1853
1854impl Default for SnsQueueConfig {
1855    fn default() -> Self {
1856        Self {
1857            region: "us-east-1".to_string(),
1858            topic_arn: String::new(),
1859            endpoint_url: None,
1860        }
1861    }
1862}
1863
1864impl Default for RedisAdapterConfig {
1865    fn default() -> Self {
1866        Self {
1867            requests_timeout: 5000,
1868            prefix: "sockudo_adapter:".to_string(),
1869            redis_pub_options: AHashMap::new(),
1870            redis_sub_options: AHashMap::new(),
1871            cluster_mode: false,
1872        }
1873    }
1874}
1875
1876impl Default for RedisClusterAdapterConfig {
1877    fn default() -> Self {
1878        Self {
1879            nodes: vec![],
1880            prefix: "sockudo_adapter:".to_string(),
1881            request_timeout_ms: 1000,
1882            use_connection_manager: true,
1883            use_sharded_pubsub: false,
1884        }
1885    }
1886}
1887
1888impl Default for NatsAdapterConfig {
1889    fn default() -> Self {
1890        Self {
1891            servers: vec!["nats://localhost:4222".to_string()],
1892            prefix: "sockudo_adapter:".to_string(),
1893            request_timeout_ms: 5000,
1894            username: None,
1895            password: None,
1896            token: None,
1897            connection_timeout_ms: 5000,
1898            nodes_number: None,
1899            discovery_max_wait_ms: 1000,
1900            discovery_idle_wait_ms: 150,
1901        }
1902    }
1903}
1904
1905impl Default for PulsarAdapterConfig {
1906    fn default() -> Self {
1907        Self {
1908            url: "pulsar://127.0.0.1:6650".to_string(),
1909            prefix: "sockudo-adapter".to_string(),
1910            request_timeout_ms: 5000,
1911            token: None,
1912            nodes_number: None,
1913        }
1914    }
1915}
1916
1917impl Default for RabbitMqAdapterConfig {
1918    fn default() -> Self {
1919        Self {
1920            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1921            prefix: "sockudo_adapter".to_string(),
1922            request_timeout_ms: 5000,
1923            connection_timeout_ms: 5000,
1924            nodes_number: None,
1925        }
1926    }
1927}
1928
1929impl Default for GooglePubSubAdapterConfig {
1930    fn default() -> Self {
1931        Self {
1932            project_id: "".to_string(),
1933            prefix: "sockudo-adapter".to_string(),
1934            request_timeout_ms: 5000,
1935            emulator_host: None,
1936            nodes_number: None,
1937        }
1938    }
1939}
1940
1941impl Default for KafkaAdapterConfig {
1942    fn default() -> Self {
1943        Self {
1944            brokers: vec!["localhost:9092".to_string()],
1945            prefix: "sockudo_adapter".to_string(),
1946            request_timeout_ms: 5000,
1947            security_protocol: None,
1948            sasl_mechanism: None,
1949            sasl_username: None,
1950            sasl_password: None,
1951            nodes_number: None,
1952        }
1953    }
1954}
1955
1956impl Default for IggyConfig {
1957    fn default() -> Self {
1958        Self {
1959            connection_string: "iggy://iggy:iggy@127.0.0.1:8090".to_string(),
1960            username: None,
1961            password: None,
1962            consumer_name: None,
1963            stream: "sockudo".to_string(),
1964            topic_prefix: "sockudo-adapter".to_string(),
1965            queue_topic_prefix: "sockudo-queue".to_string(),
1966            consumer_group_prefix: "sockudo-workers".to_string(),
1967            request_timeout_ms: 5000,
1968            poll_interval_ms: 50,
1969            poll_batch_size: 100,
1970            partitions_count: 1,
1971            partition_id: 0,
1972            auto_create: true,
1973            start_from_latest: true,
1974            nodes_number: None,
1975        }
1976    }
1977}
1978
1979impl Default for CacheSettings {
1980    fn default() -> Self {
1981        Self {
1982            enabled: true,
1983            ttl: 300,
1984        }
1985    }
1986}
1987
1988impl Default for MemoryCacheOptions {
1989    fn default() -> Self {
1990        Self {
1991            ttl: 300,
1992            cleanup_interval: 60,
1993            max_capacity: 10000,
1994        }
1995    }
1996}
1997
1998impl Default for CacheConfig {
1999    fn default() -> Self {
2000        Self {
2001            driver: CacheDriver::default(),
2002            redis: RedisConfig {
2003                prefix: Some("sockudo_cache:".to_string()),
2004                url_override: None,
2005                cluster_mode: false,
2006            },
2007            memory: MemoryCacheOptions::default(),
2008        }
2009    }
2010}
2011
2012impl Default for ChannelLimits {
2013    fn default() -> Self {
2014        Self {
2015            max_name_length: 200,
2016            cache_ttl: 3600,
2017        }
2018    }
2019}
2020impl Default for CorsConfig {
2021    fn default() -> Self {
2022        Self {
2023            credentials: true,
2024            origin: vec!["*".to_string()],
2025            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
2026            allowed_headers: vec![
2027                "Authorization".to_string(),
2028                "Content-Type".to_string(),
2029                "X-Requested-With".to_string(),
2030                "Accept".to_string(),
2031            ],
2032        }
2033    }
2034}
2035
2036impl Default for DatabaseConnection {
2037    fn default() -> Self {
2038        Self {
2039            host: "localhost".to_string(),
2040            port: 3306,
2041            username: "root".to_string(),
2042            password: "".to_string(),
2043            database: "sockudo".to_string(),
2044            table_name: "applications".to_string(),
2045            connection_pool_size: 10,
2046            pool_min: None,
2047            pool_max: None,
2048            cache_ttl: 300,
2049            cache_cleanup_interval: 60,
2050            cache_max_capacity: 100,
2051        }
2052    }
2053}
2054
2055impl Default for RedisConnection {
2056    fn default() -> Self {
2057        Self {
2058            host: "127.0.0.1".to_string(),
2059            port: 6379,
2060            db: 0,
2061            username: None,
2062            password: None,
2063            key_prefix: "sockudo:".to_string(),
2064            sentinels: Vec::new(),
2065            sentinel_password: None,
2066            name: "mymaster".to_string(),
2067            cluster: RedisClusterConnection::default(),
2068            cluster_nodes: Vec::new(),
2069        }
2070    }
2071}
2072
2073impl Default for RedisSentinel {
2074    fn default() -> Self {
2075        Self {
2076            host: "localhost".to_string(),
2077            port: 26379,
2078        }
2079    }
2080}
2081
2082impl Default for ClusterNode {
2083    fn default() -> Self {
2084        Self {
2085            host: "127.0.0.1".to_string(),
2086            port: 7000,
2087        }
2088    }
2089}
2090
2091impl Default for DatabasePooling {
2092    fn default() -> Self {
2093        Self {
2094            enabled: true,
2095            min: 2,
2096            max: 10,
2097        }
2098    }
2099}
2100
2101impl Default for EventLimits {
2102    fn default() -> Self {
2103        Self {
2104            max_channels_at_once: 100,
2105            max_name_length: 200,
2106            max_payload_in_kb: 100,
2107            max_batch_size: 10,
2108        }
2109    }
2110}
2111
2112impl Default for IdempotencyConfig {
2113    fn default() -> Self {
2114        Self {
2115            enabled: true,
2116            ttl_seconds: 120,
2117            max_key_length: 128,
2118        }
2119    }
2120}
2121
2122impl Default for EphemeralConfig {
2123    fn default() -> Self {
2124        Self { enabled: true }
2125    }
2126}
2127
2128impl Default for EchoControlConfig {
2129    fn default() -> Self {
2130        Self {
2131            enabled: true,
2132            default_echo_messages: true,
2133        }
2134    }
2135}
2136
2137impl Default for EventNameFilteringConfig {
2138    fn default() -> Self {
2139        Self {
2140            enabled: true,
2141            max_events_per_filter: 50,
2142            max_event_name_length: 200,
2143        }
2144    }
2145}
2146
2147impl Default for ConnectionRecoveryConfig {
2148    fn default() -> Self {
2149        Self {
2150            enabled: false,
2151            buffer_ttl_seconds: 120,
2152            max_buffer_size: 100,
2153        }
2154    }
2155}
2156
2157impl Default for HttpApiConfig {
2158    fn default() -> Self {
2159        Self {
2160            request_limit_in_mb: 10,
2161            accept_traffic: AcceptTraffic::default(),
2162            usage_enabled: true,
2163        }
2164    }
2165}
2166
2167impl Default for AcceptTraffic {
2168    fn default() -> Self {
2169        Self {
2170            memory_threshold: 0.90,
2171        }
2172    }
2173}
2174
2175impl Default for InstanceConfig {
2176    fn default() -> Self {
2177        Self {
2178            process_id: uuid::Uuid::new_v4().to_string(),
2179        }
2180    }
2181}
2182
2183impl Default for LoggingConfig {
2184    fn default() -> Self {
2185        Self {
2186            colors_enabled: true,
2187            include_target: true,
2188        }
2189    }
2190}
2191
2192impl Default for MetricsConfig {
2193    fn default() -> Self {
2194        Self {
2195            enabled: true,
2196            driver: MetricsDriver::default(),
2197            host: "0.0.0.0".to_string(),
2198            prometheus: PrometheusConfig::default(),
2199            port: 9601,
2200        }
2201    }
2202}
2203
2204impl Default for PrometheusConfig {
2205    fn default() -> Self {
2206        Self {
2207            prefix: "sockudo_".to_string(),
2208        }
2209    }
2210}
2211
2212impl Default for PresenceConfig {
2213    fn default() -> Self {
2214        Self {
2215            max_members_per_channel: 100,
2216            max_member_size_in_kb: 2,
2217        }
2218    }
2219}
2220
2221impl Default for RedisQueueConfig {
2222    fn default() -> Self {
2223        Self {
2224            concurrency: 5,
2225            prefix: Some("sockudo_queue:".to_string()),
2226            url_override: None,
2227            cluster_mode: false,
2228        }
2229    }
2230}
2231
2232impl Default for RateLimit {
2233    fn default() -> Self {
2234        Self {
2235            max_requests: 60,
2236            window_seconds: 60,
2237            identifier: Some("default".to_string()),
2238            trust_hops: Some(0),
2239        }
2240    }
2241}
2242
2243impl Default for RateLimiterConfig {
2244    fn default() -> Self {
2245        Self {
2246            enabled: true,
2247            driver: CacheDriver::Memory,
2248            api_rate_limit: RateLimit {
2249                max_requests: 100,
2250                window_seconds: 60,
2251                identifier: Some("api".to_string()),
2252                trust_hops: Some(0),
2253            },
2254            websocket_rate_limit: RateLimit {
2255                max_requests: 20,
2256                window_seconds: 60,
2257                identifier: Some("websocket_connect".to_string()),
2258                trust_hops: Some(0),
2259            },
2260            redis: RedisConfig {
2261                prefix: Some("sockudo_rl:".to_string()),
2262                url_override: None,
2263                cluster_mode: false,
2264            },
2265        }
2266    }
2267}
2268
2269impl Default for SslConfig {
2270    fn default() -> Self {
2271        Self {
2272            enabled: false,
2273            cert_path: "".to_string(),
2274            key_path: "".to_string(),
2275            passphrase: None,
2276            ca_path: None,
2277            redirect_http: false,
2278            http_port: Some(80),
2279        }
2280    }
2281}
2282
2283impl Default for BatchingConfig {
2284    fn default() -> Self {
2285        Self {
2286            enabled: true,
2287            duration: 50,
2288            size: 100,
2289        }
2290    }
2291}
2292
2293impl Default for ClusterHealthConfig {
2294    fn default() -> Self {
2295        Self {
2296            enabled: true,
2297            heartbeat_interval_ms: 10000,
2298            node_timeout_ms: 30000,
2299            cleanup_interval_ms: 10000,
2300        }
2301    }
2302}
2303
2304impl Default for UnixSocketConfig {
2305    fn default() -> Self {
2306        Self {
2307            enabled: false,
2308            path: "/var/run/sockudo/sockudo.sock".to_string(),
2309            permission_mode: 0o660,
2310        }
2311    }
2312}
2313
2314impl Default for DeltaCompressionOptionsConfig {
2315    fn default() -> Self {
2316        Self {
2317            enabled: true,
2318            algorithm: "fossil".to_string(),
2319            full_message_interval: 10,
2320            min_message_size: 100,
2321            max_state_age_secs: 300,
2322            max_channel_states_per_socket: 100,
2323            max_conflation_states_per_channel: Some(100),
2324            conflation_key_path: None,
2325            cluster_coordination: false,
2326            coordination_backend: DeltaCoordinationBackend::Auto,
2327            omit_delta_algorithm: false,
2328        }
2329    }
2330}
2331
#[cfg(test)]
mod tests {
    use super::{DeltaCoordinationBackend, QueueDriver, ServerOptions, VersionStoreDriver};
    use std::str::FromStr;

    /// Sets an environment variable for the lifetime of the guard and puts the
    /// previous state back (restoring the old value or removing the key) when
    /// the guard is dropped.
    ///
    /// Restoring in `Drop` means the override is undone even if the code under
    /// test returns an error or panics, so it cannot leak into other tests that
    /// run in the same process.
    struct EnvVarGuard {
        key: &'static str,
        previous: Option<String>,
    }

    impl EnvVarGuard {
        /// Remembers the current value of `key` (if any) and overrides it with
        /// `value`.
        fn set(key: &'static str, value: &str) -> Self {
            let previous = std::env::var(key).ok();
            // SAFETY: The guard controls the lifecycle of this single key and
            // restores the prior state when it is dropped.
            unsafe { std::env::set_var(key, value) };
            Self { key, previous }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                // SAFETY: Restoring the pre-test value for the same key.
                Some(previous) => unsafe { std::env::set_var(self.key, previous) },
                // SAFETY: Removing the test-only environment variable.
                None => unsafe { std::env::remove_var(self.key) },
            }
        }
    }

    /// The queue driver parser recognises every broker-backed backend name.
    #[test]
    fn queue_driver_parses_broker_backends() {
        assert_eq!(
            QueueDriver::from_str("rabbitmq").unwrap(),
            QueueDriver::RabbitMq
        );
        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
        assert_eq!(
            QueueDriver::from_str("pulsar").unwrap(),
            QueueDriver::Pulsar
        );
        assert_eq!(
            QueueDriver::from_str("google-pubsub").unwrap(),
            QueueDriver::GooglePubSub
        );
    }

    /// The delta coordination backend parser accepts its documented spellings.
    #[test]
    fn delta_coordination_backend_parses_expected_values() {
        assert_eq!(
            DeltaCoordinationBackend::from_str("auto").unwrap(),
            DeltaCoordinationBackend::Auto
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
            DeltaCoordinationBackend::RedisCluster
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("nats").unwrap(),
            DeltaCoordinationBackend::Nats
        );
    }

    /// `VERSIONED_MESSAGES_DRIVER` in the environment overrides the configured
    /// version-store driver.
    #[tokio::test]
    async fn versioned_messages_driver_overrides_from_env() {
        let env_guard = EnvVarGuard::set("VERSIONED_MESSAGES_DRIVER", "postgres");

        let mut options = ServerOptions::default();
        let outcome = options.override_from_env().await;

        // Put the environment back before any assertion can fail, so a failing
        // override never leaves the variable set for other tests.
        drop(env_guard);

        outcome.unwrap();
        assert_eq!(
            options.versioned_messages.driver,
            VersionStoreDriver::Postgres
        );
    }
}
2394
2395impl ClusterHealthConfig {
2396    pub fn validate(&self) -> Result<(), String> {
2397        if self.heartbeat_interval_ms == 0 {
2398            return Err("heartbeat_interval_ms must be greater than 0".to_string());
2399        }
2400        if self.node_timeout_ms == 0 {
2401            return Err("node_timeout_ms must be greater than 0".to_string());
2402        }
2403        if self.cleanup_interval_ms == 0 {
2404            return Err("cleanup_interval_ms must be greater than 0".to_string());
2405        }
2406
2407        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2408            return Err(format!(
2409                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2410                self.heartbeat_interval_ms,
2411                self.node_timeout_ms,
2412                self.node_timeout_ms / 3
2413            ));
2414        }
2415
2416        if self.cleanup_interval_ms > self.node_timeout_ms {
2417            return Err(format!(
2418                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2419                self.cleanup_interval_ms, self.node_timeout_ms
2420            ));
2421        }
2422
2423        Ok(())
2424    }
2425}
2426
2427impl ServerOptions {
2428    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2429        let content = tokio::fs::read_to_string(path).await?;
2430        let options: Self = if path.ends_with(".toml") {
2431            toml::from_str(&content)?
2432        } else {
2433            // Legacy JSON support
2434            sonic_rs::from_str(&content)?
2435        };
2436        Ok(options)
2437    }
2438
2439    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2440        // --- General Settings ---
2441        if let Ok(mode) = std::env::var("ENVIRONMENT") {
2442            self.mode = mode;
2443        }
2444        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2445        if parse_bool_env("DEBUG", false) {
2446            self.debug = true;
2447            info!("DEBUG environment variable forces debug mode ON");
2448        }
2449
2450        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2451
2452        if let Ok(host) = std::env::var("HOST") {
2453            self.host = host;
2454        }
2455        self.port = parse_env::<u16>("PORT", self.port);
2456        self.shutdown_grace_period =
2457            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2458        self.user_authentication_timeout = parse_env::<u64>(
2459            "USER_AUTHENTICATION_TIMEOUT",
2460            self.user_authentication_timeout,
2461        );
2462        self.websocket_max_payload_kb =
2463            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2464        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2465            self.instance.process_id = id;
2466        }
2467
2468        // --- Driver Configuration ---
2469        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2470            self.adapter.driver =
2471                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2472        }
2473        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2474            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2475            self.adapter.buffer_multiplier_per_cpu,
2476        );
2477        self.adapter.enable_socket_counting = parse_env::<bool>(
2478            "ADAPTER_ENABLE_SOCKET_COUNTING",
2479            self.adapter.enable_socket_counting,
2480        );
2481        self.adapter.fallback_to_local =
2482            parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2483        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2484            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2485        }
2486        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2487            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2488        }
2489        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2490            self.app_manager.driver =
2491                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2492        }
2493        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2494            self.rate_limiter.driver = parse_driver_enum(
2495                driver_str,
2496                self.rate_limiter.driver.clone(),
2497                "RateLimiter Backend",
2498            );
2499        }
2500
2501        // --- Database: Redis ---
2502        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2503            self.database.redis.host = host;
2504        }
2505        self.database.redis.port =
2506            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2507        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2508            self.database.redis.username = if username.is_empty() {
2509                None
2510            } else {
2511                Some(username)
2512            };
2513        }
2514        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2515            self.database.redis.password = Some(password);
2516        }
2517        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2518        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2519            self.database.redis.key_prefix = prefix;
2520        }
2521        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2522            self.database.redis.cluster.username = if cluster_username.is_empty() {
2523                None
2524            } else {
2525                Some(cluster_username)
2526            };
2527        }
2528        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2529            self.database.redis.cluster.password = Some(cluster_password);
2530        }
2531        self.database.redis.cluster.use_tls = parse_bool_env(
2532            "DATABASE_REDIS_CLUSTER_USE_TLS",
2533            self.database.redis.cluster.use_tls,
2534        );
2535
2536        // --- Database: MySQL ---
2537        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2538            self.database.mysql.host = host;
2539        }
2540        self.database.mysql.port =
2541            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2542        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2543            self.database.mysql.username = user;
2544        }
2545        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2546            self.database.mysql.password = pass;
2547        }
2548        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2549            self.database.mysql.database = db;
2550        }
2551        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2552            self.database.mysql.table_name = table;
2553        }
2554        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2555
2556        // --- Database: PostgreSQL ---
2557        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2558            self.database.postgres.host = host;
2559        }
2560        self.database.postgres.port =
2561            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2562        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2563            self.database.postgres.username = user;
2564        }
2565        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2566            self.database.postgres.password = pass;
2567        }
2568        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2569            self.database.postgres.database = db;
2570        }
2571        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2572
2573        // --- Database: DynamoDB ---
2574        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2575            self.database.dynamodb.region = region;
2576        }
2577        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2578            self.database.dynamodb.table_name = table;
2579        }
2580        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2581            self.database.dynamodb.endpoint_url = Some(endpoint);
2582        }
2583        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2584            self.database.dynamodb.aws_access_key_id = Some(key_id);
2585        }
2586        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2587            self.database.dynamodb.aws_secret_access_key = Some(secret);
2588        }
2589
2590        // --- Database: SurrealDB ---
2591        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2592            self.database.surrealdb.url = url;
2593        }
2594        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2595            self.database.surrealdb.namespace = namespace;
2596        }
2597        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2598            self.database.surrealdb.database = database;
2599        }
2600        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2601            self.database.surrealdb.username = username;
2602        }
2603        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2604            self.database.surrealdb.password = password;
2605        }
2606        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2607            self.database.surrealdb.table_name = table;
2608        }
2609
2610        // --- Redis Cluster ---
2611        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2612            let node_list: Vec<String> = nodes
2613                .split(',')
2614                .map(|s| s.trim())
2615                .filter(|s| !s.is_empty())
2616                .map(ToString::to_string)
2617                .collect();
2618
2619            options.adapter.cluster.nodes = node_list.clone();
2620            options.queue.redis_cluster.nodes = node_list.clone();
2621
2622            let parsed_nodes: Vec<ClusterNode> = node_list
2623                .iter()
2624                .filter_map(|seed| ClusterNode::from_seed(seed))
2625                .collect();
2626            options.database.redis.cluster.nodes = parsed_nodes.clone();
2627            options.database.redis.cluster_nodes = parsed_nodes;
2628        };
2629
2630        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2631            apply_redis_cluster_nodes(self, &nodes);
2632        }
2633        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2634            apply_redis_cluster_nodes(self, &nodes);
2635        }
2636        self.queue.redis_cluster.concurrency = parse_env::<u32>(
2637            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2638            self.queue.redis_cluster.concurrency,
2639        );
2640        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2641            self.queue.redis_cluster.prefix = Some(prefix);
2642        }
2643
2644        // --- SSL Configuration ---
2645        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2646        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2647            self.ssl.cert_path = val;
2648        }
2649        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2650            self.ssl.key_path = val;
2651        }
2652        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2653        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2654            self.ssl.http_port = Some(port);
2655        }
2656
2657        // --- Unix Socket Configuration ---
2658        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2659        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2660            self.unix_socket.path = path;
2661        }
2662        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2663            if mode_str.chars().all(|c| c.is_digit(8)) {
2664                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2665                    if mode <= 0o777 {
2666                        self.unix_socket.permission_mode = mode;
2667                    } else {
2668                        warn!(
2669                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2670                            mode_str, self.unix_socket.permission_mode
2671                        );
2672                    }
2673                } else {
2674                    warn!(
2675                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2676                        mode_str, self.unix_socket.permission_mode
2677                    );
2678                }
2679            } else {
2680                warn!(
2681                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2682                    mode_str, self.unix_socket.permission_mode
2683                );
2684            }
2685        }
2686
2687        // --- Metrics ---
2688        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2689            self.metrics.driver =
2690                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2691        }
2692        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2693        if let Ok(val) = std::env::var("METRICS_HOST") {
2694            self.metrics.host = val;
2695        }
2696        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2697        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2698            self.metrics.prometheus.prefix = val;
2699        }
2700
2701        // --- HTTP API ---
2702        self.http_api.usage_enabled =
2703            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2704
2705        // --- Rate Limiter ---
2706        self.rate_limiter.enabled =
2707            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2708        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2709            "RATE_LIMITER_API_MAX_REQUESTS",
2710            self.rate_limiter.api_rate_limit.max_requests,
2711        );
2712        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2713            "RATE_LIMITER_API_WINDOW_SECONDS",
2714            self.rate_limiter.api_rate_limit.window_seconds,
2715        );
2716        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2717            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2718        }
2719        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2720            "RATE_LIMITER_WS_MAX_REQUESTS",
2721            self.rate_limiter.websocket_rate_limit.max_requests,
2722        );
2723        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2724            "RATE_LIMITER_WS_WINDOW_SECONDS",
2725            self.rate_limiter.websocket_rate_limit.window_seconds,
2726        );
2727        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2728            self.rate_limiter.redis.prefix = Some(prefix);
2729        }
2730
2731        // --- Queue: Redis ---
2732        self.queue.redis.concurrency =
2733            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2734        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2735            self.queue.redis.prefix = Some(prefix);
2736        }
2737
2738        // --- Queue: SQS ---
2739        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2740            self.queue.sqs.region = region;
2741        }
2742        self.queue.sqs.visibility_timeout = parse_env::<i32>(
2743            "QUEUE_SQS_VISIBILITY_TIMEOUT",
2744            self.queue.sqs.visibility_timeout,
2745        );
2746        self.queue.sqs.max_messages =
2747            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2748        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2749            "QUEUE_SQS_WAIT_TIME_SECONDS",
2750            self.queue.sqs.wait_time_seconds,
2751        );
2752        self.queue.sqs.concurrency =
2753            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2754        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2755        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2756            self.queue.sqs.endpoint_url = Some(endpoint);
2757        }
2758
2759        // --- Queue: SNS ---
2760        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2761            self.queue.sns.region = region;
2762        }
2763        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2764            self.queue.sns.topic_arn = topic_arn;
2765        }
2766        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2767            self.queue.sns.endpoint_url = Some(endpoint);
2768        }
2769
2770        // --- Webhooks ---
2771        self.webhooks.batching.enabled =
2772            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2773        self.webhooks.batching.duration =
2774            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2775        self.webhooks.batching.size =
2776            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2777
2778        // --- NATS Adapter ---
2779        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2780            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2781        }
2782        if let Ok(user) = std::env::var("NATS_USERNAME") {
2783            self.adapter.nats.username = Some(user);
2784        }
2785        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2786            self.adapter.nats.password = Some(pass);
2787        }
2788        if let Ok(token) = std::env::var("NATS_TOKEN") {
2789            self.adapter.nats.token = Some(token);
2790        }
2791        if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2792            self.adapter.nats.prefix = prefix;
2793        }
2794        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2795            "NATS_CONNECTION_TIMEOUT_MS",
2796            self.adapter.nats.connection_timeout_ms,
2797        );
2798        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2799            "NATS_REQUEST_TIMEOUT_MS",
2800            self.adapter.nats.request_timeout_ms,
2801        );
2802        self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2803            "NATS_DISCOVERY_MAX_WAIT_MS",
2804            self.adapter.nats.discovery_max_wait_ms,
2805        );
2806        self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2807            "NATS_DISCOVERY_IDLE_WAIT_MS",
2808            self.adapter.nats.discovery_idle_wait_ms,
2809        );
2810        if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2811            self.adapter.nats.nodes_number = Some(nodes);
2812        }
2813
2814        // --- Pulsar Adapter ---
2815        if let Ok(url) = std::env::var("PULSAR_URL") {
2816            self.adapter.pulsar.url = url;
2817        }
2818        if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2819            self.adapter.pulsar.prefix = prefix;
2820        }
2821        if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2822            self.adapter.pulsar.token = Some(token);
2823        }
2824        self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2825            "PULSAR_REQUEST_TIMEOUT_MS",
2826            self.adapter.pulsar.request_timeout_ms,
2827        );
2828        if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2829            self.adapter.pulsar.nodes_number = Some(nodes);
2830        }
2831
2832        // --- RabbitMQ Adapter ---
2833        if let Ok(url) = std::env::var("RABBITMQ_URL") {
2834            self.adapter.rabbitmq.url = url;
2835        }
2836        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2837            self.adapter.rabbitmq.prefix = prefix;
2838        }
2839        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2840            "RABBITMQ_CONNECTION_TIMEOUT_MS",
2841            self.adapter.rabbitmq.connection_timeout_ms,
2842        );
2843        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2844            "RABBITMQ_REQUEST_TIMEOUT_MS",
2845            self.adapter.rabbitmq.request_timeout_ms,
2846        );
2847        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2848            self.adapter.rabbitmq.nodes_number = Some(nodes);
2849        }
2850
2851        // --- Google Pub/Sub Adapter ---
2852        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2853            self.adapter.google_pubsub.project_id = project_id;
2854        }
2855        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2856            self.adapter.google_pubsub.prefix = prefix;
2857        }
2858        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2859            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2860        }
2861        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2862            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2863            self.adapter.google_pubsub.request_timeout_ms,
2864        );
2865        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2866            self.adapter.google_pubsub.nodes_number = Some(nodes);
2867        }
2868
2869        // --- Kafka Adapter ---
2870        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2871            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2872        }
2873        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2874            self.adapter.kafka.prefix = prefix;
2875        }
2876        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2877            self.adapter.kafka.security_protocol = Some(protocol);
2878        }
2879        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2880            self.adapter.kafka.sasl_mechanism = Some(mechanism);
2881        }
2882        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2883            self.adapter.kafka.sasl_username = Some(username);
2884        }
2885        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2886            self.adapter.kafka.sasl_password = Some(password);
2887        }
2888        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2889            "KAFKA_REQUEST_TIMEOUT_MS",
2890            self.adapter.kafka.request_timeout_ms,
2891        );
2892        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2893            self.adapter.kafka.nodes_number = Some(nodes);
2894        }
2895
2896        // --- Apache Iggy Adapter / Queue ---
2897        if let Ok(connection_string) = std::env::var("IGGY_CONNECTION_STRING") {
2898            self.adapter.iggy.connection_string = connection_string.clone();
2899            self.queue.iggy.connection_string = connection_string;
2900        }
2901        if let Ok(username) = std::env::var("IGGY_USERNAME") {
2902            let username = (!username.is_empty()).then_some(username);
2903            self.adapter.iggy.username = username.clone();
2904            self.queue.iggy.username = username;
2905        }
2906        if let Ok(password) = std::env::var("IGGY_PASSWORD") {
2907            let password = (!password.is_empty()).then_some(password);
2908            self.adapter.iggy.password = password.clone();
2909            self.queue.iggy.password = password;
2910        }
2911        if let Ok(consumer_name) = std::env::var("IGGY_CONSUMER_NAME") {
2912            let consumer_name = (!consumer_name.is_empty()).then_some(consumer_name);
2913            self.adapter.iggy.consumer_name = consumer_name.clone();
2914            self.queue.iggy.consumer_name = consumer_name;
2915        } else if let Ok(process_id) = std::env::var("INSTANCE_PROCESS_ID") {
2916            let process_id = (!process_id.is_empty()).then_some(process_id);
2917            self.adapter.iggy.consumer_name = process_id.clone();
2918            self.queue.iggy.consumer_name = process_id;
2919        }
2920        if let Ok(stream) = std::env::var("IGGY_STREAM") {
2921            self.adapter.iggy.stream = stream.clone();
2922            self.queue.iggy.stream = stream;
2923        }
2924        if let Ok(prefix) = std::env::var("IGGY_TOPIC_PREFIX") {
2925            self.adapter.iggy.topic_prefix = prefix;
2926        }
2927        if let Ok(prefix) = std::env::var("IGGY_QUEUE_TOPIC_PREFIX") {
2928            self.queue.iggy.queue_topic_prefix = prefix;
2929        }
2930        if let Ok(prefix) = std::env::var("IGGY_CONSUMER_GROUP_PREFIX") {
2931            self.queue.iggy.consumer_group_prefix = prefix;
2932        }
2933        self.adapter.iggy.request_timeout_ms = parse_env::<u64>(
2934            "IGGY_REQUEST_TIMEOUT_MS",
2935            self.adapter.iggy.request_timeout_ms,
2936        );
2937        self.queue.iggy.request_timeout_ms = parse_env::<u64>(
2938            "IGGY_REQUEST_TIMEOUT_MS",
2939            self.queue.iggy.request_timeout_ms,
2940        );
2941        self.adapter.iggy.poll_interval_ms =
2942            parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.adapter.iggy.poll_interval_ms);
2943        self.queue.iggy.poll_interval_ms =
2944            parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.queue.iggy.poll_interval_ms);
2945        self.adapter.iggy.poll_batch_size =
2946            parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.adapter.iggy.poll_batch_size);
2947        self.queue.iggy.poll_batch_size =
2948            parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.queue.iggy.poll_batch_size);
2949        self.adapter.iggy.partitions_count = parse_env::<u32>(
2950            "ADAPTER_IGGY_PARTITIONS_COUNT",
2951            parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.adapter.iggy.partitions_count),
2952        );
2953        self.queue.iggy.partitions_count = parse_env::<u32>(
2954            "QUEUE_IGGY_PARTITIONS_COUNT",
2955            parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.queue.iggy.partitions_count),
2956        );
2957        self.adapter.iggy.partition_id = parse_env::<u32>(
2958            "ADAPTER_IGGY_PARTITION_ID",
2959            parse_env::<u32>("IGGY_PARTITION_ID", self.adapter.iggy.partition_id),
2960        );
2961        self.queue.iggy.partition_id = parse_env::<u32>(
2962            "QUEUE_IGGY_PARTITION_ID",
2963            parse_env::<u32>("IGGY_PARTITION_ID", self.queue.iggy.partition_id),
2964        );
2965        self.adapter.iggy.auto_create =
2966            parse_env::<bool>("IGGY_AUTO_CREATE", self.adapter.iggy.auto_create);
2967        self.queue.iggy.auto_create =
2968            parse_env::<bool>("IGGY_AUTO_CREATE", self.queue.iggy.auto_create);
2969        self.adapter.iggy.start_from_latest = parse_env::<bool>(
2970            "IGGY_START_FROM_LATEST",
2971            self.adapter.iggy.start_from_latest,
2972        );
2973        if let Some(nodes) = parse_env_optional::<u32>("IGGY_NODES_NUMBER") {
2974            self.adapter.iggy.nodes_number = Some(nodes);
2975            self.queue.iggy.nodes_number = Some(nodes);
2976        }
2977
2978        // --- CORS ---
2979        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2980            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2981            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2982                warn!(
2983                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2984                    e
2985                );
2986            } else {
2987                self.cors.origin = parsed;
2988            }
2989        }
2990        if let Ok(methods) = std::env::var("CORS_METHODS") {
2991            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2992        }
2993        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2994            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2995        }
2996        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2997
2998        // --- Performance Tuning ---
2999        self.database_pooling.enabled =
3000            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
3001        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
3002            self.database_pooling.min = min;
3003        }
3004        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
3005            self.database_pooling.max = max;
3006        }
3007
3008        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
3009            self.database.mysql.connection_pool_size = pool_size;
3010            self.database.postgres.connection_pool_size = pool_size;
3011        }
3012        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
3013            self.app_manager.cache.ttl = cache_ttl;
3014            self.channel_limits.cache_ttl = cache_ttl;
3015            self.database.mysql.cache_ttl = cache_ttl;
3016            self.database.postgres.cache_ttl = cache_ttl;
3017            self.database.surrealdb.cache_ttl = cache_ttl;
3018            self.cache.memory.ttl = cache_ttl;
3019        }
3020        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
3021            self.database.mysql.cache_cleanup_interval = cleanup_interval;
3022            self.database.postgres.cache_cleanup_interval = cleanup_interval;
3023            self.cache.memory.cleanup_interval = cleanup_interval;
3024        }
3025        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
3026            self.database.mysql.cache_max_capacity = max_capacity;
3027            self.database.postgres.cache_max_capacity = max_capacity;
3028            self.database.surrealdb.cache_max_capacity = max_capacity;
3029            self.cache.memory.max_capacity = max_capacity;
3030        }
3031
3032        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
3033        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
3034        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
3035        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
3036
3037        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
3038            (default_app_id, default_app_key, default_app_secret)
3039            && default_app_enabled
3040        {
3041            let default_app = App::from_policy(
3042                app_id,
3043                app_key,
3044                app_secret,
3045                default_app_enabled,
3046                crate::app::AppPolicy {
3047                    limits: crate::app::AppLimitsPolicy {
3048                        max_connections: parse_env::<u32>(
3049                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
3050                            100,
3051                        ),
3052                        max_backend_events_per_second: Some(parse_env::<u32>(
3053                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
3054                            100,
3055                        )),
3056                        max_client_events_per_second: parse_env::<u32>(
3057                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
3058                            100,
3059                        ),
3060                        max_read_requests_per_second: Some(parse_env::<u32>(
3061                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
3062                            100,
3063                        )),
3064                        max_presence_members_per_channel: Some(parse_env::<u32>(
3065                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
3066                            100,
3067                        )),
3068                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
3069                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
3070                            100,
3071                        )),
3072                        max_channel_name_length: Some(parse_env::<u32>(
3073                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
3074                            100,
3075                        )),
3076                        max_event_channels_at_once: Some(parse_env::<u32>(
3077                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
3078                            100,
3079                        )),
3080                        max_event_name_length: Some(parse_env::<u32>(
3081                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
3082                            100,
3083                        )),
3084                        max_event_payload_in_kb: Some(parse_env::<u32>(
3085                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
3086                            100,
3087                        )),
3088                        max_event_batch_size: Some(parse_env::<u32>(
3089                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
3090                            100,
3091                        )),
3092                        decay_seconds: None,
3093                        terminate_on_limit: false,
3094                        message_rate_limit: None,
3095                    },
3096                    features: crate::app::AppFeaturesPolicy {
3097                        enable_client_messages: std::env::var(
3098                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
3099                        )
3100                        .ok()
3101                        .map(|value| {
3102                            matches!(
3103                                value.trim().to_ascii_lowercase().as_str(),
3104                                "true" | "1" | "yes" | "on"
3105                            )
3106                        })
3107                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
3108                        enable_user_authentication: Some(parse_bool_env(
3109                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
3110                            false,
3111                        )),
3112                        enable_watchlist_events: Some(parse_bool_env(
3113                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
3114                            false,
3115                        )),
3116                    },
3117                    channels: crate::app::AppChannelsPolicy {
3118                        allowed_origins: {
3119                            if let Ok(origins_str) =
3120                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
3121                            {
3122                                if !origins_str.is_empty() {
3123                                    Some(
3124                                        origins_str
3125                                            .split(',')
3126                                            .map(|s| s.trim().to_string())
3127                                            .collect(),
3128                                    )
3129                                } else {
3130                                    None
3131                                }
3132                            } else {
3133                                None
3134                            }
3135                        },
3136                        annotations_enabled: Some(parse_bool_env(
3137                            "SOCKUDO_DEFAULT_APP_ANNOTATIONS_ENABLED",
3138                            false,
3139                        )),
3140                        channel_delta_compression: None,
3141                        channel_namespaces: None,
3142                    },
3143                    webhooks: None,
3144                    idempotency: None,
3145                    connection_recovery: None,
3146                    history: None,
3147                    presence_history: None,
3148                },
3149            );
3150
3151            self.app_manager.array.apps.push(default_app);
3152            info!("Successfully registered default app from env");
3153        }
3154
3155        // Special handling for REDIS_URL
3156        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3157            info!("Applying REDIS_URL environment variable override");
3158
3159            let redis_url_json = sonic_rs::json!(redis_url_env);
3160
3161            self.adapter
3162                .redis
3163                .redis_pub_options
3164                .insert("url".to_string(), redis_url_json.clone());
3165            self.adapter
3166                .redis
3167                .redis_sub_options
3168                .insert("url".to_string(), redis_url_json);
3169
3170            self.cache.redis.url_override = Some(redis_url_env.clone());
3171            self.queue.redis.url_override = Some(redis_url_env.clone());
3172            self.rate_limiter.redis.url_override = Some(redis_url_env);
3173        }
3174
3175        // --- Logging Configuration ---
3176        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3177        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3178        if has_colors_env || has_target_env {
3179            let logging_config = self.logging.get_or_insert_with(Default::default);
3180            if has_colors_env {
3181                logging_config.colors_enabled =
3182                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3183            }
3184            if has_target_env {
3185                logging_config.include_target =
3186                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3187            }
3188        }
3189
3190        // --- Cleanup Configuration ---
3191        self.cleanup.async_enabled =
3192            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3193        self.cleanup.fallback_to_sync =
3194            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3195        self.cleanup.queue_buffer_size =
3196            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3197        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3198        self.cleanup.batch_timeout_ms =
3199            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3200        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3201            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3202                WorkerThreadsConfig::Auto
3203            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3204                WorkerThreadsConfig::Fixed(n)
3205            } else {
3206                warn!(
3207                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3208                    worker_threads_str
3209                );
3210                self.cleanup.worker_threads.clone()
3211            };
3212        }
3213        self.cleanup.max_retry_attempts = parse_env::<u32>(
3214            "CLEANUP_MAX_RETRY_ATTEMPTS",
3215            self.cleanup.max_retry_attempts,
3216        );
3217
3218        // Cluster health configuration
3219        self.cluster_health.enabled =
3220            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3221        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3222            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3223            self.cluster_health.heartbeat_interval_ms,
3224        );
3225        self.cluster_health.node_timeout_ms = parse_env::<u64>(
3226            "CLUSTER_HEALTH_NODE_TIMEOUT",
3227            self.cluster_health.node_timeout_ms,
3228        );
3229        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3230            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3231            self.cluster_health.cleanup_interval_ms,
3232        );
3233
3234        // Tag filtering configuration
3235        self.tag_filtering.enabled =
3236            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3237
3238        // WebSocket buffer configuration
3239        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3240            if val.to_lowercase() == "none" || val == "0" {
3241                self.websocket.max_messages = None;
3242            } else if let Ok(n) = val.parse::<usize>() {
3243                self.websocket.max_messages = Some(n);
3244            }
3245        }
3246        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3247            if val.to_lowercase() == "none" || val == "0" {
3248                self.websocket.max_bytes = None;
3249            } else if let Ok(n) = val.parse::<usize>() {
3250                self.websocket.max_bytes = Some(n);
3251            }
3252        }
3253        self.websocket.disconnect_on_buffer_full = parse_bool_env(
3254            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3255            self.websocket.disconnect_on_buffer_full,
3256        );
3257        self.websocket.max_message_size = parse_env::<usize>(
3258            "WEBSOCKET_MAX_MESSAGE_SIZE",
3259            self.websocket.max_message_size,
3260        );
3261        self.websocket.max_frame_size =
3262            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3263        self.websocket.write_buffer_size = parse_env::<usize>(
3264            "WEBSOCKET_WRITE_BUFFER_SIZE",
3265            self.websocket.write_buffer_size,
3266        );
3267        self.websocket.max_backpressure = parse_env::<usize>(
3268            "WEBSOCKET_MAX_BACKPRESSURE",
3269            self.websocket.max_backpressure,
3270        );
3271        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3272        self.websocket.ping_interval =
3273            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3274        self.websocket.idle_timeout =
3275            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3276        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3277            self.websocket.compression = mode;
3278        }
3279
3280        // Connection recovery (includes serial + message_id + replay buffer)
3281        self.connection_recovery.enabled = parse_bool_env(
3282            "CONNECTION_RECOVERY_ENABLED",
3283            self.connection_recovery.enabled,
3284        );
3285        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3286            "CONNECTION_RECOVERY_BUFFER_TTL",
3287            self.connection_recovery.buffer_ttl_seconds,
3288        );
3289        self.connection_recovery.max_buffer_size = parse_env::<usize>(
3290            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3291            self.connection_recovery.max_buffer_size,
3292        );
3293
3294        self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3295        self.history.rewind_enabled =
3296            parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3297        self.history.retention_window_seconds = parse_env::<u64>(
3298            "HISTORY_RETENTION_WINDOW_SECONDS",
3299            self.history.retention_window_seconds,
3300        );
3301        self.history.max_page_size =
3302            parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3303        self.history.writer_shards =
3304            parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3305        self.history.writer_queue_capacity = parse_env::<usize>(
3306            "HISTORY_WRITER_QUEUE_CAPACITY",
3307            self.history.writer_queue_capacity,
3308        );
3309        if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3310            self.history.backend = HistoryBackend::from_str(&backend)?;
3311        }
3312        if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3313            self.history.max_messages_per_channel = Some(
3314                max_messages
3315                    .parse::<usize>()
3316                    .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3317            );
3318        }
3319        if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3320            self.history.max_bytes_per_channel = Some(
3321                max_bytes
3322                    .parse::<u64>()
3323                    .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3324            );
3325        }
3326        if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3327            self.history.postgres.table_prefix = table_prefix;
3328        }
3329        self.history.postgres.write_timeout_ms = parse_env::<u64>(
3330            "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3331            self.history.postgres.write_timeout_ms,
3332        );
3333        self.history.purge_interval_seconds = parse_env::<u64>(
3334            "HISTORY_PURGE_INTERVAL_SECONDS",
3335            self.history.purge_interval_seconds,
3336        );
3337        self.history.purge_batch_size =
3338            parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3339        self.history.max_purge_per_tick = parse_env::<usize>(
3340            "HISTORY_MAX_PURGE_PER_TICK",
3341            self.history.max_purge_per_tick,
3342        );
3343
3344        self.presence_history.enabled =
3345            parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3346        self.presence_history.retention_window_seconds = parse_env::<u64>(
3347            "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3348            self.presence_history.retention_window_seconds,
3349        );
3350        self.presence_history.max_page_size = parse_env::<usize>(
3351            "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3352            self.presence_history.max_page_size,
3353        );
3354        if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3355            self.presence_history.max_events_per_channel =
3356                Some(max_events.parse::<usize>().map_err(|e| {
3357                    format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3358                })?);
3359        }
3360        if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3361            self.presence_history.max_bytes_per_channel = Some(
3362                max_bytes
3363                    .parse::<u64>()
3364                    .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3365            );
3366        }
3367        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3368        self.idempotency.ttl_seconds =
3369            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
3370        self.idempotency.max_key_length = parse_env::<usize>(
3371            "IDEMPOTENCY_MAX_KEY_LENGTH",
3372            self.idempotency.max_key_length,
3373        );
3374
3375        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
3376
3377        self.echo_control.enabled =
3378            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
3379        self.echo_control.default_echo_messages = parse_bool_env(
3380            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
3381            self.echo_control.default_echo_messages,
3382        );
3383
3384        self.event_name_filtering.enabled = parse_bool_env(
3385            "EVENT_NAME_FILTERING_ENABLED",
3386            self.event_name_filtering.enabled,
3387        );
3388        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
3389            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
3390            self.event_name_filtering.max_events_per_filter,
3391        );
3392        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
3393            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
3394            self.event_name_filtering.max_event_name_length,
3395        );
3396        self.versioned_messages.enabled = parse_bool_env(
3397            "VERSIONED_MESSAGES_ENABLED",
3398            self.versioned_messages.enabled,
3399        );
3400        if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
3401            self.versioned_messages.driver = parse_driver_enum(
3402                driver_str,
3403                self.versioned_messages.driver.clone(),
3404                "VersionedMessages Backend",
3405            );
3406        }
3407        self.versioned_messages.max_page_size = parse_env::<usize>(
3408            "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
3409            self.versioned_messages.max_page_size,
3410        );
3411        self.versioned_messages.retention_window_seconds = parse_env::<u64>(
3412            "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
3413            self.versioned_messages.retention_window_seconds,
3414        );
3415        self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
3416            "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
3417            self.versioned_messages.purge_interval_seconds,
3418        );
3419        self.versioned_messages.purge_batch_size = parse_env::<usize>(
3420            "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
3421            self.versioned_messages.purge_batch_size,
3422        );
3423        self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
3424            "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
3425            self.versioned_messages.max_purge_per_tick,
3426        );
3427        self.annotations.enabled = parse_bool_env("ANNOTATIONS_ENABLED", self.annotations.enabled);
3428
3429        Ok(())
3430    }
3431
3432    pub fn validate(&self) -> Result<(), String> {
3433        if self.unix_socket.enabled {
3434            if self.unix_socket.path.is_empty() {
3435                return Err(
3436                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
3437                );
3438            }
3439
3440            self.validate_unix_socket_security()?;
3441
3442            if self.ssl.enabled {
3443                tracing::warn!(
3444                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
3445                );
3446            }
3447
3448            if self.unix_socket.permission_mode > 0o777 {
3449                return Err(format!(
3450                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
3451                    self.unix_socket.permission_mode
3452                ));
3453            }
3454        }
3455
3456        if let Err(e) = self.cleanup.validate() {
3457            return Err(format!("Invalid cleanup configuration: {}", e));
3458        }
3459
3460        if self.history.enabled {
3461            if self.history.max_page_size == 0 {
3462                return Err("history.max_page_size must be greater than 0".to_string());
3463            }
3464            if self.history.writer_shards == 0 {
3465                return Err("history.writer_shards must be greater than 0".to_string());
3466            }
3467            if self.history.writer_queue_capacity == 0 {
3468                return Err("history.writer_queue_capacity must be greater than 0".to_string());
3469            }
3470            if self.history.retention_window_seconds == 0 {
3471                return Err("history.retention_window_seconds must be greater than 0".to_string());
3472            }
3473            if self.history.postgres.table_prefix.trim().is_empty() {
3474                return Err("history.postgres.table_prefix must not be empty".to_string());
3475            }
3476        }
3477
3478        if self.presence_history.enabled {
3479            if self.presence_history.max_page_size == 0 {
3480                return Err("presence_history.max_page_size must be greater than 0".to_string());
3481            }
3482            if self.presence_history.retention_window_seconds == 0 {
3483                return Err(
3484                    "presence_history.retention_window_seconds must be greater than 0".to_string(),
3485                );
3486            }
3487        }
3488
3489        if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
3490            return Err("versioned_messages.max_page_size must be greater than 0".to_string());
3491        }
3492        if self.annotations.enabled && !self.versioned_messages.enabled {
3493            return Err("annotations require versioned_messages.enabled".to_string());
3494        }
3495
3496        Ok(())
3497    }
3498
3499    fn validate_unix_socket_security(&self) -> Result<(), String> {
3500        let path = &self.unix_socket.path;
3501
3502        if path.contains("../") || path.contains("..\\") {
3503            return Err(
3504                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
3505            );
3506        }
3507
3508        if self.unix_socket.permission_mode & 0o002 != 0 {
3509            warn!(
3510                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
3511                self.unix_socket.permission_mode
3512            );
3513        }
3514
3515        if self.unix_socket.permission_mode & 0o007 > 0o005 {
3516            warn!(
3517                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
3518                self.unix_socket.permission_mode
3519            );
3520        }
3521
3522        if self.mode == "production" && path.starts_with("/tmp/") {
3523            warn!(
3524                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
3525                path
3526            );
3527        }
3528
3529        if !path.starts_with('/') {
3530            return Err(
3531                "Unix socket path must be absolute (start with /) for security and reliability."
3532                    .to_string(),
3533            );
3534        }
3535
3536        Ok(())
3537    }
3538}
3539
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Fully specified standalone connection that individual tests tweak through
    /// struct-update syntax. Every field is spelled out so the expected URLs never
    /// depend on what `RedisConnection::default()` happens to contain.
    fn local_connection() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: vec![],
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: vec![],
        }
    }

    /// Sentinel entry for `host` listening on port 26379.
    fn sentinel(host: &str) -> RedisSentinel {
        RedisSentinel {
            host: host.to_owned(),
            port: 26379,
        }
    }

    // --- Standalone URLs ---

    #[test]
    fn test_standard_url_basic() {
        assert_eq!(local_connection().to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..local_connection()
        };
        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..local_connection()
        };
        let expected = "redis://admin:pass123@redis.example.com:6380/1";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // `@` and `#` must come out percent-encoded in the URL.
        let conn = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..local_connection()
        };
        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    // --- Sentinel detection and URLs ---

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            ..Default::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1"), sentinel("sentinel2")],
            ..local_connection()
        };
        let expected = "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![sentinel("sentinel1")],
            sentinel_password: Some(String::from("sentinelpass")),
            ..local_connection()
        };
        let expected = "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let conn = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![sentinel("sentinel1")],
            ..local_connection()
        };
        let expected = "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let second_sentinel = RedisSentinel {
            port: 26380,
            ..sentinel("sentinel2")
        };
        let conn = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![sentinel("sentinel1"), second_sentinel],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..local_connection()
        };
        let expected = "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser";
        assert_eq!(conn.to_url(), expected);
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let rendered = sentinel("sentinel.example.com").to_host_port();
        assert_eq!(rendered, "sentinel.example.com:26379");
    }

    // --- Cluster node URLs ---

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // One bare host, one explicit `redis://` URL and one explicit `rediss://` URL.
        let nodes = vec![
            ClusterNode {
                host: String::from("node1.secure-cluster.com"),
                port: 7000,
            },
            ClusterNode {
                host: String::from("redis://node2.secure-cluster.com:7001"),
                port: 7001,
            },
            ClusterNode {
                host: String::from("rediss://node3.secure-cluster.com"),
                port: 7002,
            },
        ];
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some(String::from("cluster-secret")),
                use_tls: true,
            },
            ..Default::default()
        };

        let expected = vec![
            "rediss://:cluster-secret@node1.secure-cluster.com:7000",
            "redis://:cluster-secret@node2.secure-cluster.com:7001",
            "rediss://:cluster-secret@node3.secure-cluster.com:7002",
        ];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        let legacy_node = ClusterNode {
            host: String::from("legacy-node.example.com"),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![legacy_node],
            ..Default::default()
        };

        let expected = vec!["redis://:fallback-secret@legacy-node.example.com:7000"];
        assert_eq!(conn.cluster_node_urls(), expected);
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: vec![],
                username: Some(String::from("svc-user")),
                password: Some(String::from("svc-pass")),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let expected = vec![
            "rediss://svc-user:svc-pass@node1.example.com:7000",
            "redis://svc-user:svc-pass@node2.example.com:7001",
            "rediss://svc-user:svc-pass@node3.example.com:6379",
        ];
        assert_eq!(conn.normalize_cluster_seed_urls(&seeds), expected);
    }
}
3834
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Asserts that a node with the given `host` field and `port: 6379` renders
    /// `expected` from `to_url()`. `#[track_caller]` keeps failures pointing at the test.
    #[track_caller]
    fn assert_url_on_default_port(host: &str, expected: &str) {
        let node = ClusterNode {
            host: host.to_owned(),
            port: 6379,
        };
        assert_eq!(node.to_url(), expected);
    }

    // --- Plain hosts ---

    #[test]
    fn test_to_url_basic_host() {
        assert_url_on_default_port("localhost", "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_url_on_default_port("127.0.0.1", "redis://127.0.0.1:6379");
    }

    // --- Hosts that already carry a scheme ---

    #[test]
    fn test_to_url_with_redis_protocol() {
        assert_url_on_default_port("redis://example.com", "redis://example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        assert_url_on_default_port(
            "rediss://secure.example.com",
            "rediss://secure.example.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // The port embedded in the host string is the one expected in the output.
        assert_url_on_default_port(
            "rediss://secure.example.com:7000",
            "rediss://secure.example.com:7000",
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        assert_url_on_default_port("redis://example.com:7001", "redis://example.com:7001");
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        // Whitespace on both sides of the host string must not appear in the URL.
        assert_url_on_default_port(
            "  rediss://secure.example.com  ",
            "rediss://secure.example.com:6379",
        );
    }

    // --- Non-default ports ---

    #[test]
    fn test_to_url_custom_port() {
        let node = ClusterNode {
            host: String::from("redis-cluster.example.com"),
            port: 7000,
        };
        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        let node = ClusterNode {
            host: String::from("redis-cluster.example.com:7010"),
            port: 7000,
        };
        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
    }

    // --- Auth / TLS options ---

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let node = ClusterNode {
            host: String::from("node.example.com"),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        let node = ClusterNode {
            host: String::from("rediss://:node-secret@node.example.com:7000"),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    // --- Seed parsing ---

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    // --- Hostname / IPv6 shapes ---

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        assert_url_on_default_port(
            "rediss://my-cluster.use1.cache.amazonaws.com",
            "rediss://my-cluster.use1.cache.amazonaws.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_url_on_default_port("rediss://[::1]", "rediss://[::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_url_on_default_port("rediss://[::1]:7000", "rediss://[::1]:7000");
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_url_on_default_port("rediss://[2001:db8::1]", "rediss://[2001:db8::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_url_on_default_port(
            "rediss://[2001:db8::1]:7000",
            "rediss://[2001:db8::1]:7000",
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_url_on_default_port("redis://[::1]", "redis://[::1]:6379");
    }
}
4016
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Deserializes a `CorsConfig` from a JSON document using `sonic_rs`.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        // Catch-all markers, alone or mixed with an exact origin, must deserialize.
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted {
            assert!(
                cors_from_json(json).is_ok(),
                "expected config to be accepted: {}",
                json
            );
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected {
            assert!(
                cors_from_json(json).is_err(),
                "expected config to be rejected: {}",
                json
            );
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // A single bad entry must fail the whole list.
        let json = r#"{"origin": ["https://good.com", "*.*bad"]}"#;
        assert!(cors_from_json(json).is_err());
    }
}