Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82    Pulsar,
83    RabbitMq,
84    #[serde(rename = "google-pubsub")]
85    GooglePubSub,
86    Kafka,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(default)]
91pub struct DynamoDbSettings {
92    pub region: String,
93    pub table_name: String,
94    pub endpoint_url: Option<String>,
95    pub aws_access_key_id: Option<String>,
96    pub aws_secret_access_key: Option<String>,
97    pub aws_profile_name: Option<String>,
98}
99
100impl Default for DynamoDbSettings {
101    fn default() -> Self {
102        Self {
103            region: "us-east-1".to_string(),
104            table_name: "sockudo-applications".to_string(),
105            endpoint_url: None,
106            aws_access_key_id: None,
107            aws_secret_access_key: None,
108            aws_profile_name: None,
109        }
110    }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(default)]
115pub struct ScyllaDbSettings {
116    pub nodes: Vec<String>,
117    pub keyspace: String,
118    pub table_name: String,
119    pub username: Option<String>,
120    pub password: Option<String>,
121    pub replication_class: String,
122    pub replication_factor: u32,
123}
124
125impl Default for ScyllaDbSettings {
126    fn default() -> Self {
127        Self {
128            nodes: vec!["127.0.0.1:9042".to_string()],
129            keyspace: "sockudo".to_string(),
130            table_name: "applications".to_string(),
131            username: None,
132            password: None,
133            replication_class: "SimpleStrategy".to_string(),
134            replication_factor: 3,
135        }
136    }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140#[serde(default)]
141pub struct SurrealDbSettings {
142    pub url: String,
143    pub namespace: String,
144    pub database: String,
145    pub username: String,
146    pub password: String,
147    pub table_name: String,
148    pub cache_ttl: u64,
149    pub cache_max_capacity: u64,
150}
151
152impl Default for SurrealDbSettings {
153    fn default() -> Self {
154        Self {
155            url: "ws://127.0.0.1:8000".to_string(),
156            namespace: "sockudo".to_string(),
157            database: "sockudo".to_string(),
158            username: "root".to_string(),
159            password: "root".to_string(),
160            table_name: "applications".to_string(),
161            cache_ttl: 300,
162            cache_max_capacity: 100,
163        }
164    }
165}
166
167impl FromStr for AdapterDriver {
168    type Err = String;
169    fn from_str(s: &str) -> Result<Self, Self::Err> {
170        match s.to_lowercase().as_str() {
171            "local" => Ok(AdapterDriver::Local),
172            "redis" => Ok(AdapterDriver::Redis),
173            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174            "nats" => Ok(AdapterDriver::Nats),
175            "pulsar" => Ok(AdapterDriver::Pulsar),
176            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178            "kafka" => Ok(AdapterDriver::Kafka),
179            _ => Err(format!("Unknown adapter driver: {s}")),
180        }
181    }
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
185#[serde(rename_all = "lowercase")]
186pub enum AppManagerDriver {
187    #[default]
188    Memory,
189    Mysql,
190    Dynamodb,
191    PgSql,
192    SurrealDb,
193    ScyllaDb,
194}
195impl FromStr for AppManagerDriver {
196    type Err = String;
197    fn from_str(s: &str) -> Result<Self, Self::Err> {
198        match s.to_lowercase().as_str() {
199            "memory" => Ok(AppManagerDriver::Memory),
200            "mysql" => Ok(AppManagerDriver::Mysql),
201            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205            _ => Err(format!("Unknown app manager driver: {s}")),
206        }
207    }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
211#[serde(rename_all = "lowercase")]
212pub enum CacheDriver {
213    #[default]
214    Memory,
215    Redis,
216    #[serde(rename = "redis-cluster")]
217    RedisCluster,
218    None,
219}
220
221impl FromStr for CacheDriver {
222    type Err = String;
223    fn from_str(s: &str) -> Result<Self, Self::Err> {
224        match s.to_lowercase().as_str() {
225            "memory" => Ok(CacheDriver::Memory),
226            "redis" => Ok(CacheDriver::Redis),
227            "redis-cluster" => Ok(CacheDriver::RedisCluster),
228            "none" => Ok(CacheDriver::None),
229            _ => Err(format!("Unknown cache driver: {s}")),
230        }
231    }
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
235#[serde(rename_all = "lowercase")]
236pub enum QueueDriver {
237    Memory,
238    #[default]
239    Redis,
240    #[serde(rename = "redis-cluster")]
241    RedisCluster,
242    Nats,
243    Pulsar,
244    RabbitMq,
245    #[serde(rename = "google-pubsub")]
246    GooglePubSub,
247    Kafka,
248    Sqs,
249    Sns,
250    None,
251}
252
253impl FromStr for QueueDriver {
254    type Err = String;
255    fn from_str(s: &str) -> Result<Self, Self::Err> {
256        match s.to_lowercase().as_str() {
257            "memory" => Ok(QueueDriver::Memory),
258            "redis" => Ok(QueueDriver::Redis),
259            "redis-cluster" => Ok(QueueDriver::RedisCluster),
260            "nats" => Ok(QueueDriver::Nats),
261            "pulsar" => Ok(QueueDriver::Pulsar),
262            "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
263            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
264            "kafka" => Ok(QueueDriver::Kafka),
265            "sqs" => Ok(QueueDriver::Sqs),
266            "sns" => Ok(QueueDriver::Sns),
267            "none" => Ok(QueueDriver::None),
268            _ => Err(format!("Unknown queue driver: {s}")),
269        }
270    }
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
274#[serde(rename_all = "snake_case")]
275pub enum DeltaCoordinationBackend {
276    #[default]
277    Auto,
278    None,
279    Redis,
280    RedisCluster,
281    Nats,
282}
283
284impl FromStr for DeltaCoordinationBackend {
285    type Err = String;
286
287    fn from_str(s: &str) -> Result<Self, Self::Err> {
288        match s.trim().to_ascii_lowercase().as_str() {
289            "auto" => Ok(Self::Auto),
290            "none" => Ok(Self::None),
291            "redis" => Ok(Self::Redis),
292            "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
293            "nats" => Ok(Self::Nats),
294            _ => Err(format!("Unknown delta coordination backend: {s}")),
295        }
296    }
297}
298
299impl AsRef<str> for QueueDriver {
300    fn as_ref(&self) -> &str {
301        match self {
302            QueueDriver::Memory => "memory",
303            QueueDriver::Redis => "redis",
304            QueueDriver::RedisCluster => "redis-cluster",
305            QueueDriver::Nats => "nats",
306            QueueDriver::Pulsar => "pulsar",
307            QueueDriver::RabbitMq => "rabbitmq",
308            QueueDriver::GooglePubSub => "google-pubsub",
309            QueueDriver::Kafka => "kafka",
310            QueueDriver::Sqs => "sqs",
311            QueueDriver::Sns => "sns",
312            QueueDriver::None => "none",
313        }
314    }
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
318#[serde(default)]
319pub struct RedisClusterQueueConfig {
320    pub concurrency: u32,
321    pub prefix: Option<String>,
322    pub nodes: Vec<String>,
323    pub request_timeout_ms: u64,
324}
325
326impl Default for RedisClusterQueueConfig {
327    fn default() -> Self {
328        Self {
329            concurrency: 5,
330            prefix: Some("sockudo_queue:".to_string()),
331            nodes: vec!["redis://127.0.0.1:6379".to_string()],
332            request_timeout_ms: 5000,
333        }
334    }
335}
336
337#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
338#[serde(rename_all = "lowercase")]
339pub enum MetricsDriver {
340    #[default]
341    Prometheus,
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
345#[serde(rename_all = "lowercase")]
346pub enum LogOutputFormat {
347    #[default]
348    Human,
349    Json,
350}
351
352impl FromStr for LogOutputFormat {
353    type Err = String;
354    fn from_str(s: &str) -> Result<Self, Self::Err> {
355        match s.to_lowercase().as_str() {
356            "human" => Ok(LogOutputFormat::Human),
357            "json" => Ok(LogOutputFormat::Json),
358            _ => Err(format!("Unknown log output format: {s}")),
359        }
360    }
361}
362
363impl FromStr for MetricsDriver {
364    type Err = String;
365    fn from_str(s: &str) -> Result<Self, Self::Err> {
366        match s.to_lowercase().as_str() {
367            "prometheus" => Ok(MetricsDriver::Prometheus),
368            _ => Err(format!("Unknown metrics driver: {s}")),
369        }
370    }
371}
372
373impl AsRef<str> for MetricsDriver {
374    fn as_ref(&self) -> &str {
375        match self {
376            MetricsDriver::Prometheus => "prometheus",
377        }
378    }
379}
380
381// --- Main Configuration Struct ---
382#[derive(Debug, Clone, Serialize, Deserialize)]
383#[serde(default)]
384pub struct ServerOptions {
385    pub adapter: AdapterConfig,
386    pub app_manager: AppManagerConfig,
387    pub cache: CacheConfig,
388    pub channel_limits: ChannelLimits,
389    pub cors: CorsConfig,
390    pub database: DatabaseConfig,
391    pub database_pooling: DatabasePooling,
392    pub debug: bool,
393    pub event_limits: EventLimits,
394    pub host: String,
395    pub http_api: HttpApiConfig,
396    pub instance: InstanceConfig,
397    pub logging: Option<LoggingConfig>,
398    pub metrics: MetricsConfig,
399    pub mode: String,
400    pub port: u16,
401    pub path_prefix: String,
402    pub presence: PresenceConfig,
403    pub queue: QueueConfig,
404    pub rate_limiter: RateLimiterConfig,
405    pub shutdown_grace_period: u64,
406    pub ssl: SslConfig,
407    pub user_authentication_timeout: u64,
408    pub webhooks: WebhooksConfig,
409    pub websocket_max_payload_kb: u32,
410    pub cleanup: CleanupConfig,
411    pub activity_timeout: u64,
412    pub cluster_health: ClusterHealthConfig,
413    pub unix_socket: UnixSocketConfig,
414    pub delta_compression: DeltaCompressionOptionsConfig,
415    pub tag_filtering: TagFilteringConfig,
416    pub websocket: WebSocketConfig,
417    pub connection_recovery: ConnectionRecoveryConfig,
418    pub history: HistoryConfig,
419    pub presence_history: PresenceHistoryConfig,
420    pub idempotency: IdempotencyConfig,
421    pub ephemeral: EphemeralConfig,
422    pub echo_control: EchoControlConfig,
423    pub event_name_filtering: EventNameFilteringConfig,
424    pub versioned_messages: VersionedMessagesConfig,
425}
426
427// --- Configuration Sub-Structs ---
428
429#[derive(Debug, Clone, Serialize, Deserialize)]
430#[serde(default)]
431pub struct SqsQueueConfig {
432    pub region: String,
433    pub queue_url_prefix: Option<String>,
434    pub visibility_timeout: i32,
435    pub endpoint_url: Option<String>,
436    pub max_messages: i32,
437    pub wait_time_seconds: i32,
438    pub concurrency: u32,
439    pub fifo: bool,
440    pub message_group_id: Option<String>,
441}
442
443#[derive(Debug, Clone, Serialize, Deserialize)]
444#[serde(default)]
445pub struct SnsQueueConfig {
446    pub region: String,
447    pub topic_arn: String,
448    pub endpoint_url: Option<String>,
449}
450
451#[derive(Debug, Clone, Serialize, Deserialize)]
452#[serde(default)]
453pub struct AdapterConfig {
454    pub driver: AdapterDriver,
455    pub redis: RedisAdapterConfig,
456    pub cluster: RedisClusterAdapterConfig,
457    pub nats: NatsAdapterConfig,
458    pub pulsar: PulsarAdapterConfig,
459    pub rabbitmq: RabbitMqAdapterConfig,
460    pub google_pubsub: GooglePubSubAdapterConfig,
461    pub kafka: KafkaAdapterConfig,
462    #[serde(default = "default_buffer_multiplier_per_cpu")]
463    pub buffer_multiplier_per_cpu: usize,
464    pub cluster_health: ClusterHealthConfig,
465    #[serde(default = "default_enable_socket_counting")]
466    pub enable_socket_counting: bool,
467    #[serde(default = "default_fallback_to_local")]
468    pub fallback_to_local: bool,
469}
470
471fn default_enable_socket_counting() -> bool {
472    true
473}
474
475fn default_fallback_to_local() -> bool {
476    true
477}
478
479fn default_buffer_multiplier_per_cpu() -> usize {
480    64
481}
482
483impl Default for AdapterConfig {
484    fn default() -> Self {
485        Self {
486            driver: AdapterDriver::default(),
487            redis: RedisAdapterConfig::default(),
488            cluster: RedisClusterAdapterConfig::default(),
489            nats: NatsAdapterConfig::default(),
490            pulsar: PulsarAdapterConfig::default(),
491            rabbitmq: RabbitMqAdapterConfig::default(),
492            google_pubsub: GooglePubSubAdapterConfig::default(),
493            kafka: KafkaAdapterConfig::default(),
494            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
495            cluster_health: ClusterHealthConfig::default(),
496            enable_socket_counting: default_enable_socket_counting(),
497            fallback_to_local: default_fallback_to_local(),
498        }
499    }
500}
501
502#[derive(Debug, Clone, Serialize, Deserialize)]
503#[serde(default)]
504pub struct RedisAdapterConfig {
505    pub requests_timeout: u64,
506    pub prefix: String,
507    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
508    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
509    pub cluster_mode: bool,
510}
511
512#[derive(Debug, Clone, Serialize, Deserialize)]
513#[serde(default)]
514pub struct RedisClusterAdapterConfig {
515    pub nodes: Vec<String>,
516    pub prefix: String,
517    pub request_timeout_ms: u64,
518    pub use_connection_manager: bool,
519    #[serde(default)]
520    pub use_sharded_pubsub: bool,
521}
522
523#[derive(Debug, Clone, Serialize, Deserialize)]
524#[serde(default)]
525pub struct NatsAdapterConfig {
526    pub servers: Vec<String>,
527    pub prefix: String,
528    pub request_timeout_ms: u64,
529    pub username: Option<String>,
530    pub password: Option<String>,
531    pub token: Option<String>,
532    pub connection_timeout_ms: u64,
533    pub nodes_number: Option<u32>,
534    pub discovery_max_wait_ms: u64,
535    pub discovery_idle_wait_ms: u64,
536}
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
539#[serde(default)]
540pub struct PulsarAdapterConfig {
541    pub url: String,
542    pub prefix: String,
543    pub request_timeout_ms: u64,
544    pub token: Option<String>,
545    pub nodes_number: Option<u32>,
546}
547
548#[derive(Debug, Clone, Serialize, Deserialize)]
549#[serde(default)]
550pub struct RabbitMqAdapterConfig {
551    pub url: String,
552    pub prefix: String,
553    pub request_timeout_ms: u64,
554    pub connection_timeout_ms: u64,
555    pub nodes_number: Option<u32>,
556}
557
558#[derive(Debug, Clone, Serialize, Deserialize)]
559#[serde(default)]
560pub struct GooglePubSubAdapterConfig {
561    pub project_id: String,
562    pub prefix: String,
563    pub request_timeout_ms: u64,
564    pub emulator_host: Option<String>,
565    pub nodes_number: Option<u32>,
566}
567
568#[derive(Debug, Clone, Serialize, Deserialize)]
569#[serde(default)]
570pub struct KafkaAdapterConfig {
571    pub brokers: Vec<String>,
572    pub prefix: String,
573    pub request_timeout_ms: u64,
574    pub security_protocol: Option<String>,
575    pub sasl_mechanism: Option<String>,
576    pub sasl_username: Option<String>,
577    pub sasl_password: Option<String>,
578    pub nodes_number: Option<u32>,
579}
580
581#[derive(Debug, Clone, Serialize, Deserialize, Default)]
582#[serde(default)]
583pub struct AppManagerConfig {
584    pub driver: AppManagerDriver,
585    pub array: ArrayConfig,
586    pub cache: CacheSettings,
587    pub scylladb: ScyllaDbSettings,
588}
589
590#[derive(Debug, Clone, Serialize, Deserialize, Default)]
591#[serde(default)]
592pub struct ArrayConfig {
593    pub apps: Vec<App>,
594}
595
596#[derive(Debug, Clone, Serialize, Deserialize)]
597#[serde(default)]
598pub struct CacheSettings {
599    pub enabled: bool,
600    pub ttl: u64,
601}
602
603#[derive(Debug, Clone, Serialize, Deserialize)]
604#[serde(default)]
605pub struct MemoryCacheOptions {
606    pub ttl: u64,
607    pub cleanup_interval: u64,
608    pub max_capacity: u64,
609}
610
611#[derive(Debug, Clone, Serialize, Deserialize)]
612#[serde(default)]
613pub struct CacheConfig {
614    pub driver: CacheDriver,
615    pub redis: RedisConfig,
616    pub memory: MemoryCacheOptions,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize, Default)]
620#[serde(default)]
621pub struct RedisConfig {
622    pub prefix: Option<String>,
623    pub url_override: Option<String>,
624    pub cluster_mode: bool,
625}
626
627#[derive(Debug, Clone, Serialize, Deserialize)]
628#[serde(default)]
629pub struct ChannelLimits {
630    pub max_name_length: u32,
631    pub cache_ttl: u64,
632}
633
634#[derive(Debug, Clone, Serialize, Deserialize)]
635#[serde(default)]
636pub struct CorsConfig {
637    pub credentials: bool,
638    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
639    pub origin: Vec<String>,
640    pub methods: Vec<String>,
641    pub allowed_headers: Vec<String>,
642}
643
644fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
645where
646    D: serde::Deserializer<'de>,
647{
648    use serde::de::Error;
649    let origins = Vec::<String>::deserialize(deserializer)?;
650
651    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
652        return Err(D::Error::custom(format!(
653            "CORS origin pattern validation failed: {}",
654            e
655        )));
656    }
657
658    Ok(origins)
659}
660
661#[derive(Debug, Clone, Serialize, Deserialize, Default)]
662#[serde(default)]
663pub struct DatabaseConfig {
664    pub mysql: DatabaseConnection,
665    pub postgres: DatabaseConnection,
666    pub redis: RedisConnection,
667    pub dynamodb: DynamoDbSettings,
668    pub surrealdb: SurrealDbSettings,
669    pub scylladb: ScyllaDbSettings,
670}
671
672#[derive(Debug, Clone, Serialize, Deserialize)]
673#[serde(default)]
674pub struct DatabaseConnection {
675    pub host: String,
676    pub port: u16,
677    pub username: String,
678    pub password: String,
679    pub database: String,
680    pub table_name: String,
681    pub connection_pool_size: u32,
682    pub pool_min: Option<u32>,
683    pub pool_max: Option<u32>,
684    pub cache_ttl: u64,
685    pub cache_cleanup_interval: u64,
686    pub cache_max_capacity: u64,
687}
688
689#[derive(Debug, Clone, Serialize, Deserialize)]
690#[serde(default)]
691pub struct RedisConnection {
692    pub host: String,
693    pub port: u16,
694    pub db: u32,
695    pub username: Option<String>,
696    pub password: Option<String>,
697    pub key_prefix: String,
698    pub sentinels: Vec<RedisSentinel>,
699    pub sentinel_password: Option<String>,
700    pub name: String,
701    pub cluster: RedisClusterConnection,
702    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
703    pub cluster_nodes: Vec<ClusterNode>,
704}
705
706#[derive(Debug, Clone, Serialize, Deserialize)]
707#[serde(default)]
708pub struct RedisSentinel {
709    pub host: String,
710    pub port: u16,
711}
712
713#[derive(Debug, Clone, Serialize, Deserialize, Default)]
714#[serde(default)]
715pub struct RedisClusterConnection {
716    pub nodes: Vec<ClusterNode>,
717    pub username: Option<String>,
718    pub password: Option<String>,
719    #[serde(alias = "useTLS")]
720    pub use_tls: bool,
721}
722
723impl RedisConnection {
724    /// Returns true if Redis Sentinel is configured.
725    pub fn is_sentinel_configured(&self) -> bool {
726        !self.sentinels.is_empty()
727    }
728
729    /// Builds a Redis connection URL based on the configuration.
730    pub fn to_url(&self) -> String {
731        if self.is_sentinel_configured() {
732            self.build_sentinel_url()
733        } else {
734            self.build_standard_url()
735        }
736    }
737
738    fn build_standard_url(&self) -> String {
739        // Extract scheme from host if present, otherwise default to redis://
740        let (scheme, host) = if self.host.starts_with("rediss://") {
741            ("rediss://", self.host.trim_start_matches("rediss://"))
742        } else if self.host.starts_with("redis://") {
743            ("redis://", self.host.trim_start_matches("redis://"))
744        } else {
745            ("redis://", self.host.as_str())
746        };
747
748        let mut url = String::from(scheme);
749
750        if let Some(ref username) = self.username {
751            url.push_str(username);
752            if let Some(ref password) = self.password {
753                url.push(':');
754                url.push_str(&urlencoding::encode(password));
755            }
756            url.push('@');
757        } else if let Some(ref password) = self.password {
758            url.push(':');
759            url.push_str(&urlencoding::encode(password));
760            url.push('@');
761        }
762
763        url.push_str(host);
764        url.push(':');
765        url.push_str(&self.port.to_string());
766        url.push('/');
767        url.push_str(&self.db.to_string());
768
769        url
770    }
771
772    fn build_sentinel_url(&self) -> String {
773        let mut url = String::from("redis+sentinel://");
774
775        if let Some(ref sentinel_password) = self.sentinel_password {
776            url.push(':');
777            url.push_str(&urlencoding::encode(sentinel_password));
778            url.push('@');
779        }
780
781        let sentinel_hosts: Vec<String> = self
782            .sentinels
783            .iter()
784            .map(|s| format!("{}:{}", s.host, s.port))
785            .collect();
786        url.push_str(&sentinel_hosts.join(","));
787
788        url.push('/');
789        url.push_str(&self.name);
790        url.push('/');
791        url.push_str(&self.db.to_string());
792
793        let mut params = Vec::new();
794        if let Some(ref password) = self.password {
795            params.push(format!("password={}", urlencoding::encode(password)));
796        }
797        if let Some(ref username) = self.username {
798            params.push(format!("username={}", urlencoding::encode(username)));
799        }
800
801        if !params.is_empty() {
802            url.push('?');
803            url.push_str(&params.join("&"));
804        }
805
806        url
807    }
808
809    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
810    /// or legacy (`cluster_nodes`) field.
811    pub fn has_cluster_nodes(&self) -> bool {
812        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
813    }
814
815    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
816    /// Falls back to legacy `cluster_nodes` for backward compatibility.
817    pub fn cluster_node_urls(&self) -> Vec<String> {
818        if !self.cluster.nodes.is_empty() {
819            return self.build_cluster_urls(&self.cluster.nodes);
820        }
821        self.build_cluster_urls(&self.cluster_nodes)
822    }
823
824    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
825    /// shared cluster auth/TLS options.
826    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
827        self.build_cluster_urls(
828            &seeds
829                .iter()
830                .filter_map(|seed| ClusterNode::from_seed(seed))
831                .collect::<Vec<ClusterNode>>(),
832        )
833    }
834
835    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
836        let username = self
837            .cluster
838            .username
839            .as_deref()
840            .or(self.username.as_deref());
841        let password = self
842            .cluster
843            .password
844            .as_deref()
845            .or(self.password.as_deref());
846        let use_tls = self.cluster.use_tls;
847
848        nodes
849            .iter()
850            .map(|node| node.to_url_with_options(use_tls, username, password))
851            .collect()
852    }
853}
854
855impl RedisSentinel {
856    pub fn to_host_port(&self) -> String {
857        format!("{}:{}", self.host, self.port)
858    }
859}
860
861#[derive(Debug, Clone, Serialize, Deserialize)]
862#[serde(default)]
863pub struct ClusterNode {
864    pub host: String,
865    pub port: u16,
866}
867
868impl ClusterNode {
869    pub fn to_url(&self) -> String {
870        self.to_url_with_options(false, None, None)
871    }
872
873    pub fn to_url_with_options(
874        &self,
875        use_tls: bool,
876        username: Option<&str>,
877        password: Option<&str>,
878    ) -> String {
879        let host = self.host.trim();
880
881        if host.starts_with("redis://") || host.starts_with("rediss://") {
882            if let Ok(parsed) = Url::parse(host)
883                && let Some(host_str) = parsed.host_str()
884            {
885                let scheme = parsed.scheme();
886                let port = parsed.port_or_known_default().unwrap_or(self.port);
887                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
888                let parsed_password = parsed.password();
889                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
890                let (effective_username, effective_password) = if has_embedded_auth {
891                    (parsed_username, parsed_password)
892                } else {
893                    (username, password)
894                };
895
896                return build_redis_url(
897                    scheme,
898                    host_str,
899                    port,
900                    effective_username,
901                    effective_password,
902                );
903            }
904
905            // Fallback for malformed URLs
906            let has_port = if let Some(bracket_pos) = host.rfind(']') {
907                host[bracket_pos..].contains(':')
908            } else {
909                host.split(':').count() >= 3
910            };
911            let base = if has_port {
912                host.to_string()
913            } else {
914                format!("{}:{}", host, self.port)
915            };
916
917            if let Ok(parsed) = Url::parse(&base) {
918                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
919                let parsed_password = parsed.password();
920                if let Some(host_str) = parsed.host_str() {
921                    let port = parsed.port_or_known_default().unwrap_or(self.port);
922                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
923                    let (effective_username, effective_password) = if has_embedded_auth {
924                        (parsed_username, parsed_password)
925                    } else {
926                        (username, password)
927                    };
928                    return build_redis_url(
929                        parsed.scheme(),
930                        host_str,
931                        port,
932                        effective_username,
933                        effective_password,
934                    );
935                }
936            }
937            return base;
938        }
939
940        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
941        let scheme = if use_tls { "rediss" } else { "redis" };
942        build_redis_url(
943            scheme,
944            &normalized_host,
945            normalized_port,
946            username,
947            password,
948        )
949    }
950
951    pub fn from_seed(seed: &str) -> Option<Self> {
952        let trimmed = seed.trim();
953        if trimmed.is_empty() {
954            return None;
955        }
956
957        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
958            let port = Url::parse(trimmed)
959                .ok()
960                .and_then(|parsed| parsed.port_or_known_default())
961                .unwrap_or(6379);
962            return Some(Self {
963                host: trimmed.to_string(),
964                port,
965            });
966        }
967
968        let (host, port) = split_plain_host_and_port(trimmed, 6379);
969        Some(Self { host, port })
970    }
971}
972
973fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
974    let host = raw_host.trim();
975
976    // Handle bracketed IPv6: [::1]:6379
977    if host.starts_with('[') {
978        if let Some(end_bracket) = host.find(']') {
979            let host_part = host[1..end_bracket].to_string();
980            let remainder = &host[end_bracket + 1..];
981            if let Some(port_str) = remainder.strip_prefix(':')
982                && let Ok(port) = port_str.parse::<u16>()
983            {
984                return (host_part, port);
985            }
986            return (host_part, default_port);
987        }
988        return (host.to_string(), default_port);
989    }
990
991    // Handle hostname/IP with port: host:6379
992    if host.matches(':').count() == 1
993        && let Some((host_part, port_part)) = host.rsplit_once(':')
994        && let Ok(port) = port_part.parse::<u16>()
995    {
996        return (host_part.to_string(), port);
997    }
998
999    (host.to_string(), default_port)
1000}
1001
1002fn build_redis_url(
1003    scheme: &str,
1004    host: &str,
1005    port: u16,
1006    username: Option<&str>,
1007    password: Option<&str>,
1008) -> String {
1009    let mut url = format!("{scheme}://");
1010
1011    if let Some(user) = username {
1012        url.push_str(&urlencoding::encode(user));
1013        if let Some(pass) = password {
1014            url.push(':');
1015            url.push_str(&urlencoding::encode(pass));
1016        }
1017        url.push('@');
1018    } else if let Some(pass) = password {
1019        url.push(':');
1020        url.push_str(&urlencoding::encode(pass));
1021        url.push('@');
1022    }
1023
1024    if host.contains(':') && !host.starts_with('[') {
1025        url.push('[');
1026        url.push_str(host);
1027        url.push(']');
1028    } else {
1029        url.push_str(host);
1030    }
1031    url.push(':');
1032    url.push_str(&port.to_string());
1033    url
1034}
1035
1036#[derive(Debug, Clone, Serialize, Deserialize)]
1037#[serde(default)]
1038pub struct DatabasePooling {
1039    pub enabled: bool,
1040    pub min: u32,
1041    pub max: u32,
1042}
1043
1044#[derive(Debug, Clone, Serialize, Deserialize)]
1045#[serde(default)]
1046pub struct EventLimits {
1047    pub max_channels_at_once: u32,
1048    pub max_name_length: u32,
1049    pub max_payload_in_kb: u32,
1050    pub max_batch_size: u32,
1051}
1052
1053#[derive(Debug, Clone, Serialize, Deserialize)]
1054#[serde(default)]
1055pub struct IdempotencyConfig {
1056    /// Whether idempotency key support is enabled
1057    pub enabled: bool,
1058    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
1059    pub ttl_seconds: u64,
1060    /// Maximum length of an idempotency key
1061    pub max_key_length: usize,
1062}
1063
1064#[derive(Debug, Clone, Serialize, Deserialize)]
1065#[serde(default)]
1066pub struct EphemeralConfig {
1067    /// Whether ephemeral message handling is enabled.
1068    pub enabled: bool,
1069}
1070
1071#[derive(Debug, Clone, Serialize, Deserialize)]
1072#[serde(default)]
1073pub struct EchoControlConfig {
1074    /// Whether connection-level and per-message echo control is enabled.
1075    pub enabled: bool,
1076    /// Default echo behavior for new V2 connections when the query param is omitted.
1077    pub default_echo_messages: bool,
1078}
1079
1080#[derive(Debug, Clone, Serialize, Deserialize)]
1081#[serde(default)]
1082pub struct EventNameFilteringConfig {
1083    /// Whether per-subscription event name filtering is enabled.
1084    pub enabled: bool,
1085    /// Maximum event names allowed in a single filter.
1086    pub max_events_per_filter: usize,
1087    /// Maximum length for each event name in a filter.
1088    pub max_event_name_length: usize,
1089}
1090
1091#[derive(Debug, Clone, Serialize, Deserialize)]
1092#[serde(default)]
1093pub struct ConnectionRecoveryConfig {
1094    /// Whether connection recovery (resume) is enabled.
1095    /// When enabled, the server keeps a bounded replay buffer per channel so that
1096    /// reconnecting clients can receive missed messages. Disabled by default for
1097    /// Pusher protocol compatibility.
1098    pub enabled: bool,
1099    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
1100    pub buffer_ttl_seconds: u64,
1101    /// Maximum number of messages kept per channel. Default: 100.
1102    pub max_buffer_size: usize,
1103}
1104
1105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1106#[serde(rename_all = "lowercase")]
1107pub enum VersionStoreDriver {
1108    #[default]
1109    Memory,
1110    Postgres,
1111    Mysql,
1112    DynamoDb,
1113    ScyllaDb,
1114    SurrealDb,
1115}
1116
1117impl FromStr for VersionStoreDriver {
1118    type Err = String;
1119    fn from_str(s: &str) -> Result<Self, Self::Err> {
1120        match s.to_lowercase().as_str() {
1121            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1122            "mysql" => Ok(Self::Mysql),
1123            "dynamodb" => Ok(Self::DynamoDb),
1124            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1125            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1126            "memory" => Ok(Self::Memory),
1127            _ => Err(format!("Unknown version store driver: {s}")),
1128        }
1129    }
1130}
1131
1132#[derive(Debug, Clone, Serialize, Deserialize)]
1133#[serde(default)]
1134pub struct VersionedMessagesConfig {
1135    /// Whether V2 mutable message HTTP/retrieval surfaces are enabled.
1136    pub enabled: bool,
1137    /// Storage driver for versioned messages. Defaults to memory.
1138    pub driver: VersionStoreDriver,
1139    /// Maximum page size for version-history retrieval.
1140    pub max_page_size: usize,
1141    /// Retention window for version entries, in seconds. `0` disables expiry
1142    /// (entries are kept forever).
1143    ///
1144    /// Backends with native row TTL (ScyllaDB `USING TTL`, DynamoDB TTL
1145    /// attribute) apply this at the storage layer. Other backends (MySQL,
1146    /// PostgreSQL, SurrealDB, Memory) enforce it via a periodic background
1147    /// purge worker controlled by `purge_interval_seconds`.
1148    pub retention_window_seconds: u64,
1149    /// Interval between purge worker runs, in seconds. Only consulted for
1150    /// backends without native TTL. Clamped to a minimum of 10 seconds.
1151    pub purge_interval_seconds: u64,
1152    /// Maximum rows deleted per purge query. Bounds lock/transaction sizes
1153    /// for SQL backends. The worker loops until no more expired rows remain
1154    /// or `max_purge_per_tick` is reached.
1155    pub purge_batch_size: usize,
1156    /// Hard cap on rows deleted per purge tick across all loop iterations.
1157    /// Prevents a backlog of expired rows from monopolising a worker run.
1158    pub max_purge_per_tick: usize,
1159}
1160
1161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1162#[serde(rename_all = "lowercase")]
1163pub enum HistoryBackend {
1164    #[default]
1165    Postgres,
1166    Mysql,
1167    DynamoDb,
1168    SurrealDb,
1169    ScyllaDb,
1170    Memory,
1171}
1172
1173impl FromStr for HistoryBackend {
1174    type Err = String;
1175    fn from_str(s: &str) -> Result<Self, Self::Err> {
1176        match s.to_lowercase().as_str() {
1177            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1178            "mysql" => Ok(Self::Mysql),
1179            "dynamodb" => Ok(Self::DynamoDb),
1180            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1181            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1182            "memory" => Ok(Self::Memory),
1183            _ => Err(format!("Unknown history backend: {s}")),
1184        }
1185    }
1186}
1187
1188#[derive(Debug, Clone, Serialize, Deserialize)]
1189#[serde(default)]
1190pub struct PostgresHistoryConfig {
1191    pub table_prefix: String,
1192    pub write_timeout_ms: u64,
1193}
1194
1195impl Default for PostgresHistoryConfig {
1196    fn default() -> Self {
1197        Self {
1198            table_prefix: "sockudo_history".to_string(),
1199            write_timeout_ms: 5000,
1200        }
1201    }
1202}
1203
1204#[derive(Debug, Clone, Serialize, Deserialize)]
1205#[serde(default)]
1206pub struct MySqlHistoryConfig {
1207    pub table_prefix: String,
1208    pub write_timeout_ms: u64,
1209}
1210
1211impl Default for MySqlHistoryConfig {
1212    fn default() -> Self {
1213        Self {
1214            table_prefix: "sockudo_history".to_string(),
1215            write_timeout_ms: 5000,
1216        }
1217    }
1218}
1219
1220#[derive(Debug, Clone, Serialize, Deserialize)]
1221#[serde(default)]
1222pub struct DynamoDbHistoryConfig {
1223    pub table_prefix: String,
1224    pub write_timeout_ms: u64,
1225}
1226
1227impl Default for DynamoDbHistoryConfig {
1228    fn default() -> Self {
1229        Self {
1230            table_prefix: "sockudo_history".to_string(),
1231            write_timeout_ms: 5000,
1232        }
1233    }
1234}
1235
1236#[derive(Debug, Clone, Serialize, Deserialize)]
1237#[serde(default)]
1238pub struct SurrealDbHistoryConfig {
1239    pub table_prefix: String,
1240    pub write_timeout_ms: u64,
1241}
1242
1243impl Default for SurrealDbHistoryConfig {
1244    fn default() -> Self {
1245        Self {
1246            table_prefix: "sockudo_history".to_string(),
1247            write_timeout_ms: 5000,
1248        }
1249    }
1250}
1251
1252#[derive(Debug, Clone, Serialize, Deserialize)]
1253#[serde(default)]
1254pub struct ScyllaDbHistoryConfig {
1255    pub table_prefix: String,
1256    pub write_timeout_ms: u64,
1257}
1258
1259impl Default for ScyllaDbHistoryConfig {
1260    fn default() -> Self {
1261        Self {
1262            table_prefix: "sockudo_history".to_string(),
1263            write_timeout_ms: 5000,
1264        }
1265    }
1266}
1267
1268#[derive(Debug, Clone, Serialize, Deserialize)]
1269#[serde(default)]
1270pub struct HistoryConfig {
1271    pub enabled: bool,
1272    pub rewind_enabled: bool,
1273    pub backend: HistoryBackend,
1274    pub retention_window_seconds: u64,
1275    pub max_page_size: usize,
1276    pub max_messages_per_channel: Option<usize>,
1277    pub max_bytes_per_channel: Option<u64>,
1278    pub writer_shards: usize,
1279    pub writer_queue_capacity: usize,
1280    pub purge_interval_seconds: u64,
1281    pub purge_batch_size: usize,
1282    pub max_purge_per_tick: usize,
1283    pub postgres: PostgresHistoryConfig,
1284    pub mysql: MySqlHistoryConfig,
1285    pub dynamodb: DynamoDbHistoryConfig,
1286    pub surrealdb: SurrealDbHistoryConfig,
1287    pub scylladb: ScyllaDbHistoryConfig,
1288}
1289
1290impl Default for HistoryConfig {
1291    fn default() -> Self {
1292        Self {
1293            enabled: false,
1294            rewind_enabled: true,
1295            backend: HistoryBackend::Postgres,
1296            retention_window_seconds: 86400,
1297            max_page_size: 100,
1298            max_messages_per_channel: None,
1299            max_bytes_per_channel: None,
1300            writer_shards: 16,
1301            writer_queue_capacity: 4096,
1302            purge_interval_seconds: 300,
1303            purge_batch_size: 1000,
1304            max_purge_per_tick: 100_000,
1305            postgres: PostgresHistoryConfig::default(),
1306            mysql: MySqlHistoryConfig::default(),
1307            dynamodb: DynamoDbHistoryConfig::default(),
1308            surrealdb: SurrealDbHistoryConfig::default(),
1309            scylladb: ScyllaDbHistoryConfig::default(),
1310        }
1311    }
1312}
1313
1314impl Default for VersionedMessagesConfig {
1315    fn default() -> Self {
1316        Self {
1317            enabled: false,
1318            driver: VersionStoreDriver::Memory,
1319            max_page_size: 100,
1320            retention_window_seconds: 0,
1321            purge_interval_seconds: 300,
1322            purge_batch_size: 1000,
1323            max_purge_per_tick: 100_000,
1324        }
1325    }
1326}
1327
1328#[derive(Debug, Clone, Serialize, Deserialize)]
1329#[serde(default)]
1330pub struct PresenceHistoryConfig {
1331    pub enabled: bool,
1332    pub retention_window_seconds: u64,
1333    pub max_page_size: usize,
1334    pub max_events_per_channel: Option<usize>,
1335    pub max_bytes_per_channel: Option<u64>,
1336}
1337
1338impl Default for PresenceHistoryConfig {
1339    fn default() -> Self {
1340        Self {
1341            enabled: false,
1342            retention_window_seconds: 86400,
1343            max_page_size: 100,
1344            max_events_per_channel: None,
1345            max_bytes_per_channel: None,
1346        }
1347    }
1348}
1349
1350#[derive(Debug, Clone, Serialize, Deserialize)]
1351#[serde(default)]
1352pub struct HttpApiConfig {
1353    pub request_limit_in_mb: u32,
1354    pub accept_traffic: AcceptTraffic,
1355    pub usage_enabled: bool,
1356}
1357
1358#[derive(Debug, Clone, Serialize, Deserialize)]
1359#[serde(default)]
1360pub struct AcceptTraffic {
1361    pub memory_threshold: f64,
1362}
1363
1364#[derive(Debug, Clone, Serialize, Deserialize)]
1365#[serde(default)]
1366pub struct InstanceConfig {
1367    pub process_id: String,
1368}
1369
1370#[derive(Debug, Clone, Serialize, Deserialize)]
1371#[serde(default)]
1372pub struct MetricsConfig {
1373    pub enabled: bool,
1374    pub driver: MetricsDriver,
1375    pub host: String,
1376    pub prometheus: PrometheusConfig,
1377    pub port: u16,
1378}
1379
1380#[derive(Debug, Clone, Serialize, Deserialize)]
1381#[serde(default)]
1382pub struct PrometheusConfig {
1383    pub prefix: String,
1384}
1385
1386#[derive(Debug, Clone, Serialize, Deserialize)]
1387#[serde(default)]
1388pub struct LoggingConfig {
1389    pub colors_enabled: bool,
1390    pub include_target: bool,
1391}
1392
1393#[derive(Debug, Clone, Serialize, Deserialize)]
1394#[serde(default)]
1395pub struct PresenceConfig {
1396    pub max_members_per_channel: u32,
1397    pub max_member_size_in_kb: u32,
1398}
1399
1400/// WebSocket connection buffer configuration
1401/// Controls backpressure handling for slow consumers
1402#[derive(Debug, Clone, Serialize, Deserialize)]
1403#[serde(default)]
1404pub struct WebSocketConfig {
1405    pub max_messages: Option<usize>,
1406    pub max_bytes: Option<usize>,
1407    pub disconnect_on_buffer_full: bool,
1408    pub max_message_size: usize,
1409    pub max_frame_size: usize,
1410    pub write_buffer_size: usize,
1411    pub max_backpressure: usize,
1412    pub auto_ping: bool,
1413    pub ping_interval: u32,
1414    pub idle_timeout: u32,
1415    pub compression: String,
1416}
1417
1418impl Default for WebSocketConfig {
1419    fn default() -> Self {
1420        Self {
1421            max_messages: Some(1000),
1422            max_bytes: None,
1423            disconnect_on_buffer_full: true,
1424            max_message_size: 64 * 1024 * 1024,
1425            max_frame_size: 16 * 1024 * 1024,
1426            write_buffer_size: 16 * 1024,
1427            max_backpressure: 1024 * 1024,
1428            auto_ping: true,
1429            ping_interval: 30,
1430            idle_timeout: 120,
1431            compression: "disabled".to_string(),
1432        }
1433    }
1434}
1435
1436impl WebSocketConfig {
1437    /// Convert to WebSocketBufferConfig for runtime use
1438    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1439        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1440
1441        let limit = match (self.max_messages, self.max_bytes) {
1442            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1443            (Some(messages), None) => BufferLimit::Messages(messages),
1444            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1445            (None, None) => BufferLimit::Messages(1000),
1446        };
1447
1448        WebSocketBufferConfig {
1449            limit,
1450            disconnect_on_full: self.disconnect_on_buffer_full,
1451        }
1452    }
1453
1454    /// Convert to native sockudo-ws runtime configuration.
1455    pub fn to_sockudo_ws_config(
1456        &self,
1457        websocket_max_payload_kb: u32,
1458        activity_timeout: u64,
1459    ) -> sockudo_ws::Config {
1460        use sockudo_ws::Compression;
1461
1462        let compression = match self.compression.to_lowercase().as_str() {
1463            "dedicated" => Compression::Dedicated,
1464            "shared" => Compression::Shared,
1465            "window256b" => Compression::Window256B,
1466            "window1kb" => Compression::Window1KB,
1467            "window2kb" => Compression::Window2KB,
1468            "window4kb" => Compression::Window4KB,
1469            "window8kb" => Compression::Window8KB,
1470            "window16kb" => Compression::Window16KB,
1471            "window32kb" => Compression::Window32KB,
1472            _ => Compression::Disabled,
1473        };
1474
1475        sockudo_ws::Config::builder()
1476            .max_payload_length(
1477                self.max_bytes
1478                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1479            )
1480            .max_message_size(self.max_message_size)
1481            .max_frame_size(self.max_frame_size)
1482            .write_buffer_size(self.write_buffer_size)
1483            .max_backpressure(self.max_backpressure)
1484            .idle_timeout(self.idle_timeout)
1485            .auto_ping(self.auto_ping)
1486            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1487            .compression(compression)
1488            .build()
1489    }
1490}
1491
1492#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1493#[serde(default)]
1494pub struct QueueConfig {
1495    pub driver: QueueDriver,
1496    pub redis: RedisQueueConfig,
1497    pub redis_cluster: RedisClusterQueueConfig,
1498    pub nats: NatsAdapterConfig,
1499    pub pulsar: PulsarAdapterConfig,
1500    pub rabbitmq: RabbitMqAdapterConfig,
1501    pub google_pubsub: GooglePubSubAdapterConfig,
1502    pub kafka: KafkaAdapterConfig,
1503    pub sqs: SqsQueueConfig,
1504    pub sns: SnsQueueConfig,
1505}
1506
1507#[derive(Debug, Clone, Serialize, Deserialize)]
1508#[serde(default)]
1509pub struct RedisQueueConfig {
1510    pub concurrency: u32,
1511    pub prefix: Option<String>,
1512    pub url_override: Option<String>,
1513    pub cluster_mode: bool,
1514}
1515
1516#[derive(Clone, Debug, Serialize, Deserialize)]
1517#[serde(default)]
1518pub struct RateLimit {
1519    pub max_requests: u32,
1520    pub window_seconds: u64,
1521    pub identifier: Option<String>,
1522    pub trust_hops: Option<u32>,
1523}
1524
1525#[derive(Debug, Clone, Serialize, Deserialize)]
1526#[serde(default)]
1527pub struct RateLimiterConfig {
1528    pub enabled: bool,
1529    pub driver: CacheDriver,
1530    pub api_rate_limit: RateLimit,
1531    pub websocket_rate_limit: RateLimit,
1532    pub redis: RedisConfig,
1533}
1534
1535#[derive(Debug, Clone, Serialize, Deserialize)]
1536#[serde(default)]
1537pub struct SslConfig {
1538    pub enabled: bool,
1539    pub cert_path: String,
1540    pub key_path: String,
1541    pub passphrase: Option<String>,
1542    pub ca_path: Option<String>,
1543    pub redirect_http: bool,
1544    pub http_port: Option<u16>,
1545}
1546
1547#[derive(Debug, Clone, Serialize, Deserialize)]
1548#[serde(default)]
1549pub struct WebhooksConfig {
1550    pub batching: BatchingConfig,
1551    pub retry: WebhookRetryConfig,
1552    pub request_timeout_ms: u64,
1553}
1554
1555#[derive(Debug, Clone, Serialize, Deserialize)]
1556#[serde(default)]
1557pub struct BatchingConfig {
1558    pub enabled: bool,
1559    pub duration: u64,
1560    pub size: usize,
1561}
1562
1563#[derive(Debug, Clone, Serialize, Deserialize)]
1564#[serde(default)]
1565pub struct WebhookRetryConfig {
1566    pub enabled: bool,
1567    pub max_attempts: Option<u32>,
1568    pub max_elapsed_time_ms: u64,
1569    pub initial_backoff_ms: u64,
1570    pub max_backoff_ms: u64,
1571}
1572
1573impl Default for WebhooksConfig {
1574    fn default() -> Self {
1575        Self {
1576            batching: BatchingConfig::default(),
1577            retry: WebhookRetryConfig::default(),
1578            request_timeout_ms: 10_000,
1579        }
1580    }
1581}
1582
1583impl Default for WebhookRetryConfig {
1584    fn default() -> Self {
1585        Self {
1586            enabled: true,
1587            max_attempts: None,
1588            max_elapsed_time_ms: 300_000,
1589            initial_backoff_ms: 1_000,
1590            max_backoff_ms: 60_000,
1591        }
1592    }
1593}
1594
1595#[derive(Debug, Clone, Serialize, Deserialize)]
1596#[serde(default)]
1597pub struct ClusterHealthConfig {
1598    pub enabled: bool,
1599    pub heartbeat_interval_ms: u64,
1600    pub node_timeout_ms: u64,
1601    pub cleanup_interval_ms: u64,
1602}
1603
1604#[derive(Debug, Clone, Serialize, Deserialize)]
1605#[serde(default)]
1606pub struct UnixSocketConfig {
1607    pub enabled: bool,
1608    pub path: String,
1609    #[serde(deserialize_with = "deserialize_octal_permission")]
1610    pub permission_mode: u32,
1611}
1612
1613#[derive(Debug, Clone, Serialize, Deserialize)]
1614#[serde(default)]
1615pub struct DeltaCompressionOptionsConfig {
1616    pub enabled: bool,
1617    pub algorithm: String,
1618    pub full_message_interval: u32,
1619    pub min_message_size: usize,
1620    pub max_state_age_secs: u64,
1621    pub max_channel_states_per_socket: usize,
1622    pub max_conflation_states_per_channel: Option<usize>,
1623    pub conflation_key_path: Option<String>,
1624    pub cluster_coordination: bool,
1625    pub coordination_backend: DeltaCoordinationBackend,
1626    pub omit_delta_algorithm: bool,
1627}
1628
1629/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1630#[derive(Debug, Clone, Serialize, Deserialize)]
1631#[serde(default)]
1632pub struct CleanupConfig {
1633    pub queue_buffer_size: usize,
1634    pub batch_size: usize,
1635    pub batch_timeout_ms: u64,
1636    pub worker_threads: WorkerThreadsConfig,
1637    pub max_retry_attempts: u32,
1638    pub async_enabled: bool,
1639    pub fallback_to_sync: bool,
1640}
1641
1642/// Worker threads configuration for the cleanup system
1643#[derive(Debug, Clone)]
1644pub enum WorkerThreadsConfig {
1645    Auto,
1646    Fixed(usize),
1647}
1648
1649impl serde::Serialize for WorkerThreadsConfig {
1650    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1651    where
1652        S: serde::Serializer,
1653    {
1654        match self {
1655            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1656            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1657        }
1658    }
1659}
1660
1661impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1662    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1663    where
1664        D: serde::Deserializer<'de>,
1665    {
1666        use serde::de;
1667        struct WorkerThreadsVisitor;
1668        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1669            type Value = WorkerThreadsConfig;
1670
1671            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1672                formatter.write_str(r#""auto" or a positive integer"#)
1673            }
1674
1675            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1676                if value.eq_ignore_ascii_case("auto") {
1677                    Ok(WorkerThreadsConfig::Auto)
1678                } else if let Ok(n) = value.parse::<usize>() {
1679                    Ok(WorkerThreadsConfig::Fixed(n))
1680                } else {
1681                    Err(E::custom(format!(
1682                        "expected 'auto' or a number, got '{value}'"
1683                    )))
1684                }
1685            }
1686
1687            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1688                Ok(WorkerThreadsConfig::Fixed(value as usize))
1689            }
1690
1691            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1692                if value >= 0 {
1693                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1694                } else {
1695                    Err(E::custom("worker_threads must be non-negative"))
1696                }
1697            }
1698        }
1699        deserializer.deserialize_any(WorkerThreadsVisitor)
1700    }
1701}
1702
1703impl Default for CleanupConfig {
1704    fn default() -> Self {
1705        Self {
1706            queue_buffer_size: 1024,
1707            batch_size: 64,
1708            batch_timeout_ms: 100,
1709            worker_threads: WorkerThreadsConfig::Auto,
1710            max_retry_attempts: 3,
1711            async_enabled: true,
1712            fallback_to_sync: true,
1713        }
1714    }
1715}
1716
1717impl CleanupConfig {
1718    pub fn validate(&self) -> Result<(), String> {
1719        if self.queue_buffer_size == 0 {
1720            return Err("queue_buffer_size must be greater than 0".to_string());
1721        }
1722        if self.batch_size == 0 {
1723            return Err("batch_size must be greater than 0".to_string());
1724        }
1725        if self.batch_timeout_ms == 0 {
1726            return Err("batch_timeout_ms must be greater than 0".to_string());
1727        }
1728        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1729            && n == 0
1730        {
1731            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1732        }
1733        Ok(())
1734    }
1735}
1736
1737#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1738#[serde(default)]
1739pub struct TagFilteringConfig {
1740    #[serde(default)]
1741    pub enabled: bool,
1742    #[serde(default = "default_true")]
1743    pub enable_tags: bool,
1744}
1745
1746fn default_true() -> bool {
1747    true
1748}
1749
1750// --- Default Implementations ---
1751
1752impl Default for ServerOptions {
1753    fn default() -> Self {
1754        Self {
1755            adapter: AdapterConfig::default(),
1756            app_manager: AppManagerConfig::default(),
1757            cache: CacheConfig::default(),
1758            channel_limits: ChannelLimits::default(),
1759            cors: CorsConfig::default(),
1760            database: DatabaseConfig::default(),
1761            database_pooling: DatabasePooling::default(),
1762            debug: false,
1763            tag_filtering: TagFilteringConfig::default(),
1764            event_limits: EventLimits::default(),
1765            host: "0.0.0.0".to_string(),
1766            http_api: HttpApiConfig::default(),
1767            instance: InstanceConfig::default(),
1768            logging: None,
1769            metrics: MetricsConfig::default(),
1770            mode: "production".to_string(),
1771            port: 6001,
1772            path_prefix: "/".to_string(),
1773            presence: PresenceConfig::default(),
1774            queue: QueueConfig::default(),
1775            rate_limiter: RateLimiterConfig::default(),
1776            shutdown_grace_period: 10,
1777            ssl: SslConfig::default(),
1778            user_authentication_timeout: 3600,
1779            webhooks: WebhooksConfig::default(),
1780            websocket_max_payload_kb: 64,
1781            cleanup: CleanupConfig::default(),
1782            activity_timeout: 120,
1783            cluster_health: ClusterHealthConfig::default(),
1784            unix_socket: UnixSocketConfig::default(),
1785            delta_compression: DeltaCompressionOptionsConfig::default(),
1786            websocket: WebSocketConfig::default(),
1787            connection_recovery: ConnectionRecoveryConfig::default(),
1788            history: HistoryConfig::default(),
1789            presence_history: PresenceHistoryConfig::default(),
1790            idempotency: IdempotencyConfig::default(),
1791            ephemeral: EphemeralConfig::default(),
1792            echo_control: EchoControlConfig::default(),
1793            event_name_filtering: EventNameFilteringConfig::default(),
1794            versioned_messages: VersionedMessagesConfig::default(),
1795        }
1796    }
1797}
1798
1799impl Default for SqsQueueConfig {
1800    fn default() -> Self {
1801        Self {
1802            region: "us-east-1".to_string(),
1803            queue_url_prefix: None,
1804            visibility_timeout: 30,
1805            endpoint_url: None,
1806            max_messages: 10,
1807            wait_time_seconds: 5,
1808            concurrency: 5,
1809            fifo: false,
1810            message_group_id: Some("default".to_string()),
1811        }
1812    }
1813}
1814
1815impl Default for SnsQueueConfig {
1816    fn default() -> Self {
1817        Self {
1818            region: "us-east-1".to_string(),
1819            topic_arn: String::new(),
1820            endpoint_url: None,
1821        }
1822    }
1823}
1824
1825impl Default for RedisAdapterConfig {
1826    fn default() -> Self {
1827        Self {
1828            requests_timeout: 5000,
1829            prefix: "sockudo_adapter:".to_string(),
1830            redis_pub_options: AHashMap::new(),
1831            redis_sub_options: AHashMap::new(),
1832            cluster_mode: false,
1833        }
1834    }
1835}
1836
1837impl Default for RedisClusterAdapterConfig {
1838    fn default() -> Self {
1839        Self {
1840            nodes: vec![],
1841            prefix: "sockudo_adapter:".to_string(),
1842            request_timeout_ms: 1000,
1843            use_connection_manager: true,
1844            use_sharded_pubsub: false,
1845        }
1846    }
1847}
1848
1849impl Default for NatsAdapterConfig {
1850    fn default() -> Self {
1851        Self {
1852            servers: vec!["nats://localhost:4222".to_string()],
1853            prefix: "sockudo_adapter:".to_string(),
1854            request_timeout_ms: 5000,
1855            username: None,
1856            password: None,
1857            token: None,
1858            connection_timeout_ms: 5000,
1859            nodes_number: None,
1860            discovery_max_wait_ms: 1000,
1861            discovery_idle_wait_ms: 150,
1862        }
1863    }
1864}
1865
1866impl Default for PulsarAdapterConfig {
1867    fn default() -> Self {
1868        Self {
1869            url: "pulsar://127.0.0.1:6650".to_string(),
1870            prefix: "sockudo-adapter".to_string(),
1871            request_timeout_ms: 5000,
1872            token: None,
1873            nodes_number: None,
1874        }
1875    }
1876}
1877
1878impl Default for RabbitMqAdapterConfig {
1879    fn default() -> Self {
1880        Self {
1881            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1882            prefix: "sockudo_adapter".to_string(),
1883            request_timeout_ms: 5000,
1884            connection_timeout_ms: 5000,
1885            nodes_number: None,
1886        }
1887    }
1888}
1889
1890impl Default for GooglePubSubAdapterConfig {
1891    fn default() -> Self {
1892        Self {
1893            project_id: "".to_string(),
1894            prefix: "sockudo-adapter".to_string(),
1895            request_timeout_ms: 5000,
1896            emulator_host: None,
1897            nodes_number: None,
1898        }
1899    }
1900}
1901
1902impl Default for KafkaAdapterConfig {
1903    fn default() -> Self {
1904        Self {
1905            brokers: vec!["localhost:9092".to_string()],
1906            prefix: "sockudo_adapter".to_string(),
1907            request_timeout_ms: 5000,
1908            security_protocol: None,
1909            sasl_mechanism: None,
1910            sasl_username: None,
1911            sasl_password: None,
1912            nodes_number: None,
1913        }
1914    }
1915}
1916
1917impl Default for CacheSettings {
1918    fn default() -> Self {
1919        Self {
1920            enabled: true,
1921            ttl: 300,
1922        }
1923    }
1924}
1925
1926impl Default for MemoryCacheOptions {
1927    fn default() -> Self {
1928        Self {
1929            ttl: 300,
1930            cleanup_interval: 60,
1931            max_capacity: 10000,
1932        }
1933    }
1934}
1935
1936impl Default for CacheConfig {
1937    fn default() -> Self {
1938        Self {
1939            driver: CacheDriver::default(),
1940            redis: RedisConfig {
1941                prefix: Some("sockudo_cache:".to_string()),
1942                url_override: None,
1943                cluster_mode: false,
1944            },
1945            memory: MemoryCacheOptions::default(),
1946        }
1947    }
1948}
1949
1950impl Default for ChannelLimits {
1951    fn default() -> Self {
1952        Self {
1953            max_name_length: 200,
1954            cache_ttl: 3600,
1955        }
1956    }
1957}
1958impl Default for CorsConfig {
1959    fn default() -> Self {
1960        Self {
1961            credentials: true,
1962            origin: vec!["*".to_string()],
1963            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1964            allowed_headers: vec![
1965                "Authorization".to_string(),
1966                "Content-Type".to_string(),
1967                "X-Requested-With".to_string(),
1968                "Accept".to_string(),
1969            ],
1970        }
1971    }
1972}
1973
1974impl Default for DatabaseConnection {
1975    fn default() -> Self {
1976        Self {
1977            host: "localhost".to_string(),
1978            port: 3306,
1979            username: "root".to_string(),
1980            password: "".to_string(),
1981            database: "sockudo".to_string(),
1982            table_name: "applications".to_string(),
1983            connection_pool_size: 10,
1984            pool_min: None,
1985            pool_max: None,
1986            cache_ttl: 300,
1987            cache_cleanup_interval: 60,
1988            cache_max_capacity: 100,
1989        }
1990    }
1991}
1992
1993impl Default for RedisConnection {
1994    fn default() -> Self {
1995        Self {
1996            host: "127.0.0.1".to_string(),
1997            port: 6379,
1998            db: 0,
1999            username: None,
2000            password: None,
2001            key_prefix: "sockudo:".to_string(),
2002            sentinels: Vec::new(),
2003            sentinel_password: None,
2004            name: "mymaster".to_string(),
2005            cluster: RedisClusterConnection::default(),
2006            cluster_nodes: Vec::new(),
2007        }
2008    }
2009}
2010
2011impl Default for RedisSentinel {
2012    fn default() -> Self {
2013        Self {
2014            host: "localhost".to_string(),
2015            port: 26379,
2016        }
2017    }
2018}
2019
2020impl Default for ClusterNode {
2021    fn default() -> Self {
2022        Self {
2023            host: "127.0.0.1".to_string(),
2024            port: 7000,
2025        }
2026    }
2027}
2028
2029impl Default for DatabasePooling {
2030    fn default() -> Self {
2031        Self {
2032            enabled: true,
2033            min: 2,
2034            max: 10,
2035        }
2036    }
2037}
2038
2039impl Default for EventLimits {
2040    fn default() -> Self {
2041        Self {
2042            max_channels_at_once: 100,
2043            max_name_length: 200,
2044            max_payload_in_kb: 100,
2045            max_batch_size: 10,
2046        }
2047    }
2048}
2049
2050impl Default for IdempotencyConfig {
2051    fn default() -> Self {
2052        Self {
2053            enabled: true,
2054            ttl_seconds: 120,
2055            max_key_length: 128,
2056        }
2057    }
2058}
2059
2060impl Default for EphemeralConfig {
2061    fn default() -> Self {
2062        Self { enabled: true }
2063    }
2064}
2065
2066impl Default for EchoControlConfig {
2067    fn default() -> Self {
2068        Self {
2069            enabled: true,
2070            default_echo_messages: true,
2071        }
2072    }
2073}
2074
2075impl Default for EventNameFilteringConfig {
2076    fn default() -> Self {
2077        Self {
2078            enabled: true,
2079            max_events_per_filter: 50,
2080            max_event_name_length: 200,
2081        }
2082    }
2083}
2084
2085impl Default for ConnectionRecoveryConfig {
2086    fn default() -> Self {
2087        Self {
2088            enabled: false,
2089            buffer_ttl_seconds: 120,
2090            max_buffer_size: 100,
2091        }
2092    }
2093}
2094
2095impl Default for HttpApiConfig {
2096    fn default() -> Self {
2097        Self {
2098            request_limit_in_mb: 10,
2099            accept_traffic: AcceptTraffic::default(),
2100            usage_enabled: true,
2101        }
2102    }
2103}
2104
2105impl Default for AcceptTraffic {
2106    fn default() -> Self {
2107        Self {
2108            memory_threshold: 0.90,
2109        }
2110    }
2111}
2112
2113impl Default for InstanceConfig {
2114    fn default() -> Self {
2115        Self {
2116            process_id: uuid::Uuid::new_v4().to_string(),
2117        }
2118    }
2119}
2120
2121impl Default for LoggingConfig {
2122    fn default() -> Self {
2123        Self {
2124            colors_enabled: true,
2125            include_target: true,
2126        }
2127    }
2128}
2129
2130impl Default for MetricsConfig {
2131    fn default() -> Self {
2132        Self {
2133            enabled: true,
2134            driver: MetricsDriver::default(),
2135            host: "0.0.0.0".to_string(),
2136            prometheus: PrometheusConfig::default(),
2137            port: 9601,
2138        }
2139    }
2140}
2141
2142impl Default for PrometheusConfig {
2143    fn default() -> Self {
2144        Self {
2145            prefix: "sockudo_".to_string(),
2146        }
2147    }
2148}
2149
2150impl Default for PresenceConfig {
2151    fn default() -> Self {
2152        Self {
2153            max_members_per_channel: 100,
2154            max_member_size_in_kb: 2,
2155        }
2156    }
2157}
2158
2159impl Default for RedisQueueConfig {
2160    fn default() -> Self {
2161        Self {
2162            concurrency: 5,
2163            prefix: Some("sockudo_queue:".to_string()),
2164            url_override: None,
2165            cluster_mode: false,
2166        }
2167    }
2168}
2169
2170impl Default for RateLimit {
2171    fn default() -> Self {
2172        Self {
2173            max_requests: 60,
2174            window_seconds: 60,
2175            identifier: Some("default".to_string()),
2176            trust_hops: Some(0),
2177        }
2178    }
2179}
2180
2181impl Default for RateLimiterConfig {
2182    fn default() -> Self {
2183        Self {
2184            enabled: true,
2185            driver: CacheDriver::Memory,
2186            api_rate_limit: RateLimit {
2187                max_requests: 100,
2188                window_seconds: 60,
2189                identifier: Some("api".to_string()),
2190                trust_hops: Some(0),
2191            },
2192            websocket_rate_limit: RateLimit {
2193                max_requests: 20,
2194                window_seconds: 60,
2195                identifier: Some("websocket_connect".to_string()),
2196                trust_hops: Some(0),
2197            },
2198            redis: RedisConfig {
2199                prefix: Some("sockudo_rl:".to_string()),
2200                url_override: None,
2201                cluster_mode: false,
2202            },
2203        }
2204    }
2205}
2206
2207impl Default for SslConfig {
2208    fn default() -> Self {
2209        Self {
2210            enabled: false,
2211            cert_path: "".to_string(),
2212            key_path: "".to_string(),
2213            passphrase: None,
2214            ca_path: None,
2215            redirect_http: false,
2216            http_port: Some(80),
2217        }
2218    }
2219}
2220
2221impl Default for BatchingConfig {
2222    fn default() -> Self {
2223        Self {
2224            enabled: true,
2225            duration: 50,
2226            size: 100,
2227        }
2228    }
2229}
2230
2231impl Default for ClusterHealthConfig {
2232    fn default() -> Self {
2233        Self {
2234            enabled: true,
2235            heartbeat_interval_ms: 10000,
2236            node_timeout_ms: 30000,
2237            cleanup_interval_ms: 10000,
2238        }
2239    }
2240}
2241
2242impl Default for UnixSocketConfig {
2243    fn default() -> Self {
2244        Self {
2245            enabled: false,
2246            path: "/var/run/sockudo/sockudo.sock".to_string(),
2247            permission_mode: 0o660,
2248        }
2249    }
2250}
2251
2252impl Default for DeltaCompressionOptionsConfig {
2253    fn default() -> Self {
2254        Self {
2255            enabled: true,
2256            algorithm: "fossil".to_string(),
2257            full_message_interval: 10,
2258            min_message_size: 100,
2259            max_state_age_secs: 300,
2260            max_channel_states_per_socket: 100,
2261            max_conflation_states_per_channel: Some(100),
2262            conflation_key_path: None,
2263            cluster_coordination: false,
2264            coordination_backend: DeltaCoordinationBackend::Auto,
2265            omit_delta_algorithm: false,
2266        }
2267    }
2268}
2269
2270#[cfg(test)]
2271mod tests {
2272    use super::{DeltaCoordinationBackend, QueueDriver, ServerOptions, VersionStoreDriver};
2273    use std::str::FromStr;
2274
2275    #[test]
2276    fn queue_driver_parses_broker_backends() {
2277        assert_eq!(
2278            QueueDriver::from_str("rabbitmq").unwrap(),
2279            QueueDriver::RabbitMq
2280        );
2281        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
2282        assert_eq!(
2283            QueueDriver::from_str("pulsar").unwrap(),
2284            QueueDriver::Pulsar
2285        );
2286        assert_eq!(
2287            QueueDriver::from_str("google-pubsub").unwrap(),
2288            QueueDriver::GooglePubSub
2289        );
2290    }
2291
2292    #[test]
2293    fn delta_coordination_backend_parses_expected_values() {
2294        assert_eq!(
2295            DeltaCoordinationBackend::from_str("auto").unwrap(),
2296            DeltaCoordinationBackend::Auto
2297        );
2298        assert_eq!(
2299            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
2300            DeltaCoordinationBackend::RedisCluster
2301        );
2302        assert_eq!(
2303            DeltaCoordinationBackend::from_str("nats").unwrap(),
2304            DeltaCoordinationBackend::Nats
2305        );
2306    }
2307
2308    #[tokio::test]
2309    async fn versioned_messages_driver_overrides_from_env() {
2310        let previous = std::env::var("VERSIONED_MESSAGES_DRIVER").ok();
2311        // SAFETY: This test controls the environment variable lifecycle for a
2312        // single key and restores the prior value before it returns.
2313        unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", "postgres") };
2314
2315        let mut options = ServerOptions::default();
2316        options.override_from_env().await.unwrap();
2317
2318        if let Some(previous) = previous {
2319            // SAFETY: Restoring the pre-test value for the same key.
2320            unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", previous) };
2321        } else {
2322            // SAFETY: Removing the test-only environment variable before exit.
2323            unsafe { std::env::remove_var("VERSIONED_MESSAGES_DRIVER") };
2324        }
2325
2326        assert_eq!(
2327            options.versioned_messages.driver,
2328            VersionStoreDriver::Postgres
2329        );
2330    }
2331}
2332
2333impl ClusterHealthConfig {
2334    pub fn validate(&self) -> Result<(), String> {
2335        if self.heartbeat_interval_ms == 0 {
2336            return Err("heartbeat_interval_ms must be greater than 0".to_string());
2337        }
2338        if self.node_timeout_ms == 0 {
2339            return Err("node_timeout_ms must be greater than 0".to_string());
2340        }
2341        if self.cleanup_interval_ms == 0 {
2342            return Err("cleanup_interval_ms must be greater than 0".to_string());
2343        }
2344
2345        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2346            return Err(format!(
2347                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2348                self.heartbeat_interval_ms,
2349                self.node_timeout_ms,
2350                self.node_timeout_ms / 3
2351            ));
2352        }
2353
2354        if self.cleanup_interval_ms > self.node_timeout_ms {
2355            return Err(format!(
2356                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2357                self.cleanup_interval_ms, self.node_timeout_ms
2358            ));
2359        }
2360
2361        Ok(())
2362    }
2363}
2364
2365impl ServerOptions {
2366    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2367        let content = tokio::fs::read_to_string(path).await?;
2368        let options: Self = if path.ends_with(".toml") {
2369            toml::from_str(&content)?
2370        } else {
2371            // Legacy JSON support
2372            sonic_rs::from_str(&content)?
2373        };
2374        Ok(options)
2375    }
2376
2377    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2378        // --- General Settings ---
2379        if let Ok(mode) = std::env::var("ENVIRONMENT") {
2380            self.mode = mode;
2381        }
2382        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2383        if parse_bool_env("DEBUG", false) {
2384            self.debug = true;
2385            info!("DEBUG environment variable forces debug mode ON");
2386        }
2387
2388        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2389
2390        if let Ok(host) = std::env::var("HOST") {
2391            self.host = host;
2392        }
2393        self.port = parse_env::<u16>("PORT", self.port);
2394        self.shutdown_grace_period =
2395            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2396        self.user_authentication_timeout = parse_env::<u64>(
2397            "USER_AUTHENTICATION_TIMEOUT",
2398            self.user_authentication_timeout,
2399        );
2400        self.websocket_max_payload_kb =
2401            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2402        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2403            self.instance.process_id = id;
2404        }
2405
2406        // --- Driver Configuration ---
2407        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2408            self.adapter.driver =
2409                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2410        }
2411        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2412            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2413            self.adapter.buffer_multiplier_per_cpu,
2414        );
2415        self.adapter.enable_socket_counting = parse_env::<bool>(
2416            "ADAPTER_ENABLE_SOCKET_COUNTING",
2417            self.adapter.enable_socket_counting,
2418        );
2419        self.adapter.fallback_to_local =
2420            parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2421        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2422            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2423        }
2424        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2425            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2426        }
2427        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2428            self.app_manager.driver =
2429                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2430        }
2431        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2432            self.rate_limiter.driver = parse_driver_enum(
2433                driver_str,
2434                self.rate_limiter.driver.clone(),
2435                "RateLimiter Backend",
2436            );
2437        }
2438
2439        // --- Database: Redis ---
2440        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2441            self.database.redis.host = host;
2442        }
2443        self.database.redis.port =
2444            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2445        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2446            self.database.redis.username = if username.is_empty() {
2447                None
2448            } else {
2449                Some(username)
2450            };
2451        }
2452        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2453            self.database.redis.password = Some(password);
2454        }
2455        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2456        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2457            self.database.redis.key_prefix = prefix;
2458        }
2459        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2460            self.database.redis.cluster.username = if cluster_username.is_empty() {
2461                None
2462            } else {
2463                Some(cluster_username)
2464            };
2465        }
2466        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2467            self.database.redis.cluster.password = Some(cluster_password);
2468        }
2469        self.database.redis.cluster.use_tls = parse_bool_env(
2470            "DATABASE_REDIS_CLUSTER_USE_TLS",
2471            self.database.redis.cluster.use_tls,
2472        );
2473
2474        // --- Database: MySQL ---
2475        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2476            self.database.mysql.host = host;
2477        }
2478        self.database.mysql.port =
2479            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2480        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2481            self.database.mysql.username = user;
2482        }
2483        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2484            self.database.mysql.password = pass;
2485        }
2486        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2487            self.database.mysql.database = db;
2488        }
2489        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2490            self.database.mysql.table_name = table;
2491        }
2492        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2493
2494        // --- Database: PostgreSQL ---
2495        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2496            self.database.postgres.host = host;
2497        }
2498        self.database.postgres.port =
2499            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2500        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2501            self.database.postgres.username = user;
2502        }
2503        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2504            self.database.postgres.password = pass;
2505        }
2506        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2507            self.database.postgres.database = db;
2508        }
2509        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2510
2511        // --- Database: DynamoDB ---
2512        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2513            self.database.dynamodb.region = region;
2514        }
2515        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2516            self.database.dynamodb.table_name = table;
2517        }
2518        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2519            self.database.dynamodb.endpoint_url = Some(endpoint);
2520        }
2521        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2522            self.database.dynamodb.aws_access_key_id = Some(key_id);
2523        }
2524        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2525            self.database.dynamodb.aws_secret_access_key = Some(secret);
2526        }
2527
2528        // --- Database: SurrealDB ---
2529        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2530            self.database.surrealdb.url = url;
2531        }
2532        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2533            self.database.surrealdb.namespace = namespace;
2534        }
2535        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2536            self.database.surrealdb.database = database;
2537        }
2538        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2539            self.database.surrealdb.username = username;
2540        }
2541        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2542            self.database.surrealdb.password = password;
2543        }
2544        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2545            self.database.surrealdb.table_name = table;
2546        }
2547
2548        // --- Redis Cluster ---
2549        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2550            let node_list: Vec<String> = nodes
2551                .split(',')
2552                .map(|s| s.trim())
2553                .filter(|s| !s.is_empty())
2554                .map(ToString::to_string)
2555                .collect();
2556
2557            options.adapter.cluster.nodes = node_list.clone();
2558            options.queue.redis_cluster.nodes = node_list.clone();
2559
2560            let parsed_nodes: Vec<ClusterNode> = node_list
2561                .iter()
2562                .filter_map(|seed| ClusterNode::from_seed(seed))
2563                .collect();
2564            options.database.redis.cluster.nodes = parsed_nodes.clone();
2565            options.database.redis.cluster_nodes = parsed_nodes;
2566        };
2567
2568        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2569            apply_redis_cluster_nodes(self, &nodes);
2570        }
2571        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2572            apply_redis_cluster_nodes(self, &nodes);
2573        }
2574        self.queue.redis_cluster.concurrency = parse_env::<u32>(
2575            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2576            self.queue.redis_cluster.concurrency,
2577        );
2578        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2579            self.queue.redis_cluster.prefix = Some(prefix);
2580        }
2581
2582        // --- SSL Configuration ---
2583        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2584        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2585            self.ssl.cert_path = val;
2586        }
2587        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2588            self.ssl.key_path = val;
2589        }
2590        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2591        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2592            self.ssl.http_port = Some(port);
2593        }
2594
2595        // --- Unix Socket Configuration ---
2596        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2597        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2598            self.unix_socket.path = path;
2599        }
2600        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2601            if mode_str.chars().all(|c| c.is_digit(8)) {
2602                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2603                    if mode <= 0o777 {
2604                        self.unix_socket.permission_mode = mode;
2605                    } else {
2606                        warn!(
2607                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2608                            mode_str, self.unix_socket.permission_mode
2609                        );
2610                    }
2611                } else {
2612                    warn!(
2613                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2614                        mode_str, self.unix_socket.permission_mode
2615                    );
2616                }
2617            } else {
2618                warn!(
2619                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2620                    mode_str, self.unix_socket.permission_mode
2621                );
2622            }
2623        }
2624
2625        // --- Metrics ---
2626        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2627            self.metrics.driver =
2628                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2629        }
2630        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2631        if let Ok(val) = std::env::var("METRICS_HOST") {
2632            self.metrics.host = val;
2633        }
2634        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2635        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2636            self.metrics.prometheus.prefix = val;
2637        }
2638
2639        // --- HTTP API ---
2640        self.http_api.usage_enabled =
2641            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2642
2643        // --- Rate Limiter ---
2644        self.rate_limiter.enabled =
2645            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2646        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2647            "RATE_LIMITER_API_MAX_REQUESTS",
2648            self.rate_limiter.api_rate_limit.max_requests,
2649        );
2650        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2651            "RATE_LIMITER_API_WINDOW_SECONDS",
2652            self.rate_limiter.api_rate_limit.window_seconds,
2653        );
2654        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2655            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2656        }
2657        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2658            "RATE_LIMITER_WS_MAX_REQUESTS",
2659            self.rate_limiter.websocket_rate_limit.max_requests,
2660        );
2661        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2662            "RATE_LIMITER_WS_WINDOW_SECONDS",
2663            self.rate_limiter.websocket_rate_limit.window_seconds,
2664        );
2665        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2666            self.rate_limiter.redis.prefix = Some(prefix);
2667        }
2668
2669        // --- Queue: Redis ---
2670        self.queue.redis.concurrency =
2671            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2672        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2673            self.queue.redis.prefix = Some(prefix);
2674        }
2675
2676        // --- Queue: SQS ---
2677        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2678            self.queue.sqs.region = region;
2679        }
2680        self.queue.sqs.visibility_timeout = parse_env::<i32>(
2681            "QUEUE_SQS_VISIBILITY_TIMEOUT",
2682            self.queue.sqs.visibility_timeout,
2683        );
2684        self.queue.sqs.max_messages =
2685            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2686        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2687            "QUEUE_SQS_WAIT_TIME_SECONDS",
2688            self.queue.sqs.wait_time_seconds,
2689        );
2690        self.queue.sqs.concurrency =
2691            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2692        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2693        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2694            self.queue.sqs.endpoint_url = Some(endpoint);
2695        }
2696
2697        // --- Queue: SNS ---
2698        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2699            self.queue.sns.region = region;
2700        }
2701        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2702            self.queue.sns.topic_arn = topic_arn;
2703        }
2704        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2705            self.queue.sns.endpoint_url = Some(endpoint);
2706        }
2707
2708        // --- Webhooks ---
2709        self.webhooks.batching.enabled =
2710            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2711        self.webhooks.batching.duration =
2712            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2713        self.webhooks.batching.size =
2714            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2715
2716        // --- NATS Adapter ---
2717        if let Ok(servers) = std::env::var("NATS_SERVERS") {
2718            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2719        }
2720        if let Ok(user) = std::env::var("NATS_USERNAME") {
2721            self.adapter.nats.username = Some(user);
2722        }
2723        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2724            self.adapter.nats.password = Some(pass);
2725        }
2726        if let Ok(token) = std::env::var("NATS_TOKEN") {
2727            self.adapter.nats.token = Some(token);
2728        }
2729        if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2730            self.adapter.nats.prefix = prefix;
2731        }
2732        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2733            "NATS_CONNECTION_TIMEOUT_MS",
2734            self.adapter.nats.connection_timeout_ms,
2735        );
2736        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2737            "NATS_REQUEST_TIMEOUT_MS",
2738            self.adapter.nats.request_timeout_ms,
2739        );
2740        self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2741            "NATS_DISCOVERY_MAX_WAIT_MS",
2742            self.adapter.nats.discovery_max_wait_ms,
2743        );
2744        self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2745            "NATS_DISCOVERY_IDLE_WAIT_MS",
2746            self.adapter.nats.discovery_idle_wait_ms,
2747        );
2748        if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2749            self.adapter.nats.nodes_number = Some(nodes);
2750        }
2751
2752        // --- Pulsar Adapter ---
2753        if let Ok(url) = std::env::var("PULSAR_URL") {
2754            self.adapter.pulsar.url = url;
2755        }
2756        if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2757            self.adapter.pulsar.prefix = prefix;
2758        }
2759        if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2760            self.adapter.pulsar.token = Some(token);
2761        }
2762        self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2763            "PULSAR_REQUEST_TIMEOUT_MS",
2764            self.adapter.pulsar.request_timeout_ms,
2765        );
2766        if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2767            self.adapter.pulsar.nodes_number = Some(nodes);
2768        }
2769
2770        // --- RabbitMQ Adapter ---
2771        if let Ok(url) = std::env::var("RABBITMQ_URL") {
2772            self.adapter.rabbitmq.url = url;
2773        }
2774        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2775            self.adapter.rabbitmq.prefix = prefix;
2776        }
2777        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2778            "RABBITMQ_CONNECTION_TIMEOUT_MS",
2779            self.adapter.rabbitmq.connection_timeout_ms,
2780        );
2781        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2782            "RABBITMQ_REQUEST_TIMEOUT_MS",
2783            self.adapter.rabbitmq.request_timeout_ms,
2784        );
2785        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2786            self.adapter.rabbitmq.nodes_number = Some(nodes);
2787        }
2788
2789        // --- Google Pub/Sub Adapter ---
2790        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2791            self.adapter.google_pubsub.project_id = project_id;
2792        }
2793        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2794            self.adapter.google_pubsub.prefix = prefix;
2795        }
2796        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2797            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2798        }
2799        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2800            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2801            self.adapter.google_pubsub.request_timeout_ms,
2802        );
2803        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2804            self.adapter.google_pubsub.nodes_number = Some(nodes);
2805        }
2806
2807        // --- Kafka Adapter ---
2808        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2809            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2810        }
2811        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2812            self.adapter.kafka.prefix = prefix;
2813        }
2814        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2815            self.adapter.kafka.security_protocol = Some(protocol);
2816        }
2817        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2818            self.adapter.kafka.sasl_mechanism = Some(mechanism);
2819        }
2820        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2821            self.adapter.kafka.sasl_username = Some(username);
2822        }
2823        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2824            self.adapter.kafka.sasl_password = Some(password);
2825        }
2826        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2827            "KAFKA_REQUEST_TIMEOUT_MS",
2828            self.adapter.kafka.request_timeout_ms,
2829        );
2830        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2831            self.adapter.kafka.nodes_number = Some(nodes);
2832        }
2833
2834        // --- CORS ---
2835        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2836            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2837            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2838                warn!(
2839                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2840                    e
2841                );
2842            } else {
2843                self.cors.origin = parsed;
2844            }
2845        }
2846        if let Ok(methods) = std::env::var("CORS_METHODS") {
2847            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2848        }
2849        if let Ok(headers) = std::env::var("CORS_HEADERS") {
2850            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2851        }
2852        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2853
2854        // --- Performance Tuning ---
2855        self.database_pooling.enabled =
2856            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2857        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2858            self.database_pooling.min = min;
2859        }
2860        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2861            self.database_pooling.max = max;
2862        }
2863
2864        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2865            self.database.mysql.connection_pool_size = pool_size;
2866            self.database.postgres.connection_pool_size = pool_size;
2867        }
2868        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2869            self.app_manager.cache.ttl = cache_ttl;
2870            self.channel_limits.cache_ttl = cache_ttl;
2871            self.database.mysql.cache_ttl = cache_ttl;
2872            self.database.postgres.cache_ttl = cache_ttl;
2873            self.database.surrealdb.cache_ttl = cache_ttl;
2874            self.cache.memory.ttl = cache_ttl;
2875        }
2876        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2877            self.database.mysql.cache_cleanup_interval = cleanup_interval;
2878            self.database.postgres.cache_cleanup_interval = cleanup_interval;
2879            self.cache.memory.cleanup_interval = cleanup_interval;
2880        }
2881        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2882            self.database.mysql.cache_max_capacity = max_capacity;
2883            self.database.postgres.cache_max_capacity = max_capacity;
2884            self.database.surrealdb.cache_max_capacity = max_capacity;
2885            self.cache.memory.max_capacity = max_capacity;
2886        }
2887
2888        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2889        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2890        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2891        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2892
2893        if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2894            (default_app_id, default_app_key, default_app_secret)
2895            && default_app_enabled
2896        {
2897            let default_app = App::from_policy(
2898                app_id,
2899                app_key,
2900                app_secret,
2901                default_app_enabled,
2902                crate::app::AppPolicy {
2903                    limits: crate::app::AppLimitsPolicy {
2904                        max_connections: parse_env::<u32>(
2905                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2906                            100,
2907                        ),
2908                        max_backend_events_per_second: Some(parse_env::<u32>(
2909                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2910                            100,
2911                        )),
2912                        max_client_events_per_second: parse_env::<u32>(
2913                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2914                            100,
2915                        ),
2916                        max_read_requests_per_second: Some(parse_env::<u32>(
2917                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2918                            100,
2919                        )),
2920                        max_presence_members_per_channel: Some(parse_env::<u32>(
2921                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2922                            100,
2923                        )),
2924                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
2925                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2926                            100,
2927                        )),
2928                        max_channel_name_length: Some(parse_env::<u32>(
2929                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2930                            100,
2931                        )),
2932                        max_event_channels_at_once: Some(parse_env::<u32>(
2933                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2934                            100,
2935                        )),
2936                        max_event_name_length: Some(parse_env::<u32>(
2937                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2938                            100,
2939                        )),
2940                        max_event_payload_in_kb: Some(parse_env::<u32>(
2941                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2942                            100,
2943                        )),
2944                        max_event_batch_size: Some(parse_env::<u32>(
2945                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2946                            100,
2947                        )),
2948                        decay_seconds: None,
2949                        terminate_on_limit: false,
2950                        message_rate_limit: None,
2951                    },
2952                    features: crate::app::AppFeaturesPolicy {
2953                        enable_client_messages: std::env::var(
2954                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2955                        )
2956                        .ok()
2957                        .map(|value| {
2958                            matches!(
2959                                value.trim().to_ascii_lowercase().as_str(),
2960                                "true" | "1" | "yes" | "on"
2961                            )
2962                        })
2963                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2964                        enable_user_authentication: Some(parse_bool_env(
2965                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2966                            false,
2967                        )),
2968                        enable_watchlist_events: Some(parse_bool_env(
2969                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2970                            false,
2971                        )),
2972                    },
2973                    channels: crate::app::AppChannelsPolicy {
2974                        allowed_origins: {
2975                            if let Ok(origins_str) =
2976                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2977                            {
2978                                if !origins_str.is_empty() {
2979                                    Some(
2980                                        origins_str
2981                                            .split(',')
2982                                            .map(|s| s.trim().to_string())
2983                                            .collect(),
2984                                    )
2985                                } else {
2986                                    None
2987                                }
2988                            } else {
2989                                None
2990                            }
2991                        },
2992                        channel_delta_compression: None,
2993                        channel_namespaces: None,
2994                    },
2995                    webhooks: None,
2996                    idempotency: None,
2997                    connection_recovery: None,
2998                    history: None,
2999                    presence_history: None,
3000                },
3001            );
3002
3003            self.app_manager.array.apps.push(default_app);
3004            info!("Successfully registered default app from env");
3005        }
3006
3007        // Special handling for REDIS_URL
3008        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3009            info!("Applying REDIS_URL environment variable override");
3010
3011            let redis_url_json = sonic_rs::json!(redis_url_env);
3012
3013            self.adapter
3014                .redis
3015                .redis_pub_options
3016                .insert("url".to_string(), redis_url_json.clone());
3017            self.adapter
3018                .redis
3019                .redis_sub_options
3020                .insert("url".to_string(), redis_url_json);
3021
3022            self.cache.redis.url_override = Some(redis_url_env.clone());
3023            self.queue.redis.url_override = Some(redis_url_env.clone());
3024            self.rate_limiter.redis.url_override = Some(redis_url_env);
3025        }
3026
3027        // --- Logging Configuration ---
3028        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3029        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3030        if has_colors_env || has_target_env {
3031            let logging_config = self.logging.get_or_insert_with(Default::default);
3032            if has_colors_env {
3033                logging_config.colors_enabled =
3034                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3035            }
3036            if has_target_env {
3037                logging_config.include_target =
3038                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3039            }
3040        }
3041
3042        // --- Cleanup Configuration ---
3043        self.cleanup.async_enabled =
3044            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3045        self.cleanup.fallback_to_sync =
3046            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3047        self.cleanup.queue_buffer_size =
3048            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3049        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3050        self.cleanup.batch_timeout_ms =
3051            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3052        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3053            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3054                WorkerThreadsConfig::Auto
3055            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3056                WorkerThreadsConfig::Fixed(n)
3057            } else {
3058                warn!(
3059                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3060                    worker_threads_str
3061                );
3062                self.cleanup.worker_threads.clone()
3063            };
3064        }
3065        self.cleanup.max_retry_attempts = parse_env::<u32>(
3066            "CLEANUP_MAX_RETRY_ATTEMPTS",
3067            self.cleanup.max_retry_attempts,
3068        );
3069
3070        // Cluster health configuration
3071        self.cluster_health.enabled =
3072            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3073        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3074            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3075            self.cluster_health.heartbeat_interval_ms,
3076        );
3077        self.cluster_health.node_timeout_ms = parse_env::<u64>(
3078            "CLUSTER_HEALTH_NODE_TIMEOUT",
3079            self.cluster_health.node_timeout_ms,
3080        );
3081        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3082            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3083            self.cluster_health.cleanup_interval_ms,
3084        );
3085
3086        // Tag filtering configuration
3087        self.tag_filtering.enabled =
3088            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3089
3090        // WebSocket buffer configuration
3091        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3092            if val.to_lowercase() == "none" || val == "0" {
3093                self.websocket.max_messages = None;
3094            } else if let Ok(n) = val.parse::<usize>() {
3095                self.websocket.max_messages = Some(n);
3096            }
3097        }
3098        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3099            if val.to_lowercase() == "none" || val == "0" {
3100                self.websocket.max_bytes = None;
3101            } else if let Ok(n) = val.parse::<usize>() {
3102                self.websocket.max_bytes = Some(n);
3103            }
3104        }
3105        self.websocket.disconnect_on_buffer_full = parse_bool_env(
3106            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3107            self.websocket.disconnect_on_buffer_full,
3108        );
3109        self.websocket.max_message_size = parse_env::<usize>(
3110            "WEBSOCKET_MAX_MESSAGE_SIZE",
3111            self.websocket.max_message_size,
3112        );
3113        self.websocket.max_frame_size =
3114            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3115        self.websocket.write_buffer_size = parse_env::<usize>(
3116            "WEBSOCKET_WRITE_BUFFER_SIZE",
3117            self.websocket.write_buffer_size,
3118        );
3119        self.websocket.max_backpressure = parse_env::<usize>(
3120            "WEBSOCKET_MAX_BACKPRESSURE",
3121            self.websocket.max_backpressure,
3122        );
3123        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3124        self.websocket.ping_interval =
3125            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3126        self.websocket.idle_timeout =
3127            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3128        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3129            self.websocket.compression = mode;
3130        }
3131
3132        // Connection recovery (includes serial + message_id + replay buffer)
3133        self.connection_recovery.enabled = parse_bool_env(
3134            "CONNECTION_RECOVERY_ENABLED",
3135            self.connection_recovery.enabled,
3136        );
3137        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3138            "CONNECTION_RECOVERY_BUFFER_TTL",
3139            self.connection_recovery.buffer_ttl_seconds,
3140        );
3141        self.connection_recovery.max_buffer_size = parse_env::<usize>(
3142            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3143            self.connection_recovery.max_buffer_size,
3144        );
3145
3146        self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3147        self.history.rewind_enabled =
3148            parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3149        self.history.retention_window_seconds = parse_env::<u64>(
3150            "HISTORY_RETENTION_WINDOW_SECONDS",
3151            self.history.retention_window_seconds,
3152        );
3153        self.history.max_page_size =
3154            parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3155        self.history.writer_shards =
3156            parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3157        self.history.writer_queue_capacity = parse_env::<usize>(
3158            "HISTORY_WRITER_QUEUE_CAPACITY",
3159            self.history.writer_queue_capacity,
3160        );
3161        if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3162            self.history.backend = HistoryBackend::from_str(&backend)?;
3163        }
3164        if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3165            self.history.max_messages_per_channel = Some(
3166                max_messages
3167                    .parse::<usize>()
3168                    .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3169            );
3170        }
3171        if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3172            self.history.max_bytes_per_channel = Some(
3173                max_bytes
3174                    .parse::<u64>()
3175                    .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3176            );
3177        }
3178        if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3179            self.history.postgres.table_prefix = table_prefix;
3180        }
3181        self.history.postgres.write_timeout_ms = parse_env::<u64>(
3182            "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3183            self.history.postgres.write_timeout_ms,
3184        );
3185        self.history.purge_interval_seconds = parse_env::<u64>(
3186            "HISTORY_PURGE_INTERVAL_SECONDS",
3187            self.history.purge_interval_seconds,
3188        );
3189        self.history.purge_batch_size =
3190            parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3191        self.history.max_purge_per_tick = parse_env::<usize>(
3192            "HISTORY_MAX_PURGE_PER_TICK",
3193            self.history.max_purge_per_tick,
3194        );
3195
3196        self.presence_history.enabled =
3197            parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3198        self.presence_history.retention_window_seconds = parse_env::<u64>(
3199            "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3200            self.presence_history.retention_window_seconds,
3201        );
3202        self.presence_history.max_page_size = parse_env::<usize>(
3203            "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3204            self.presence_history.max_page_size,
3205        );
3206        if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3207            self.presence_history.max_events_per_channel =
3208                Some(max_events.parse::<usize>().map_err(|e| {
3209                    format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3210                })?);
3211        }
3212        if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3213            self.presence_history.max_bytes_per_channel = Some(
3214                max_bytes
3215                    .parse::<u64>()
3216                    .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3217            );
3218        }
3219        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3220        self.idempotency.ttl_seconds =
3221            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
3222        self.idempotency.max_key_length = parse_env::<usize>(
3223            "IDEMPOTENCY_MAX_KEY_LENGTH",
3224            self.idempotency.max_key_length,
3225        );
3226
3227        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
3228
3229        self.echo_control.enabled =
3230            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
3231        self.echo_control.default_echo_messages = parse_bool_env(
3232            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
3233            self.echo_control.default_echo_messages,
3234        );
3235
3236        self.event_name_filtering.enabled = parse_bool_env(
3237            "EVENT_NAME_FILTERING_ENABLED",
3238            self.event_name_filtering.enabled,
3239        );
3240        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
3241            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
3242            self.event_name_filtering.max_events_per_filter,
3243        );
3244        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
3245            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
3246            self.event_name_filtering.max_event_name_length,
3247        );
3248        self.versioned_messages.enabled = parse_bool_env(
3249            "VERSIONED_MESSAGES_ENABLED",
3250            self.versioned_messages.enabled,
3251        );
3252        if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
3253            self.versioned_messages.driver = parse_driver_enum(
3254                driver_str,
3255                self.versioned_messages.driver.clone(),
3256                "VersionedMessages Backend",
3257            );
3258        }
3259        self.versioned_messages.max_page_size = parse_env::<usize>(
3260            "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
3261            self.versioned_messages.max_page_size,
3262        );
3263        self.versioned_messages.retention_window_seconds = parse_env::<u64>(
3264            "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
3265            self.versioned_messages.retention_window_seconds,
3266        );
3267        self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
3268            "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
3269            self.versioned_messages.purge_interval_seconds,
3270        );
3271        self.versioned_messages.purge_batch_size = parse_env::<usize>(
3272            "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
3273            self.versioned_messages.purge_batch_size,
3274        );
3275        self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
3276            "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
3277            self.versioned_messages.max_purge_per_tick,
3278        );
3279
3280        Ok(())
3281    }
3282
3283    pub fn validate(&self) -> Result<(), String> {
3284        if self.unix_socket.enabled {
3285            if self.unix_socket.path.is_empty() {
3286                return Err(
3287                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
3288                );
3289            }
3290
3291            self.validate_unix_socket_security()?;
3292
3293            if self.ssl.enabled {
3294                tracing::warn!(
3295                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
3296                );
3297            }
3298
3299            if self.unix_socket.permission_mode > 0o777 {
3300                return Err(format!(
3301                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
3302                    self.unix_socket.permission_mode
3303                ));
3304            }
3305        }
3306
3307        if let Err(e) = self.cleanup.validate() {
3308            return Err(format!("Invalid cleanup configuration: {}", e));
3309        }
3310
3311        if self.history.enabled {
3312            if self.history.max_page_size == 0 {
3313                return Err("history.max_page_size must be greater than 0".to_string());
3314            }
3315            if self.history.writer_shards == 0 {
3316                return Err("history.writer_shards must be greater than 0".to_string());
3317            }
3318            if self.history.writer_queue_capacity == 0 {
3319                return Err("history.writer_queue_capacity must be greater than 0".to_string());
3320            }
3321            if self.history.retention_window_seconds == 0 {
3322                return Err("history.retention_window_seconds must be greater than 0".to_string());
3323            }
3324            if self.history.postgres.table_prefix.trim().is_empty() {
3325                return Err("history.postgres.table_prefix must not be empty".to_string());
3326            }
3327        }
3328
3329        if self.presence_history.enabled {
3330            if self.presence_history.max_page_size == 0 {
3331                return Err("presence_history.max_page_size must be greater than 0".to_string());
3332            }
3333            if self.presence_history.retention_window_seconds == 0 {
3334                return Err(
3335                    "presence_history.retention_window_seconds must be greater than 0".to_string(),
3336                );
3337            }
3338        }
3339
3340        if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
3341            return Err("versioned_messages.max_page_size must be greater than 0".to_string());
3342        }
3343
3344        Ok(())
3345    }
3346
3347    fn validate_unix_socket_security(&self) -> Result<(), String> {
3348        let path = &self.unix_socket.path;
3349
3350        if path.contains("../") || path.contains("..\\") {
3351            return Err(
3352                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
3353            );
3354        }
3355
3356        if self.unix_socket.permission_mode & 0o002 != 0 {
3357            warn!(
3358                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
3359                self.unix_socket.permission_mode
3360            );
3361        }
3362
3363        if self.unix_socket.permission_mode & 0o007 > 0o005 {
3364            warn!(
3365                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
3366                self.unix_socket.permission_mode
3367            );
3368        }
3369
3370        if self.mode == "production" && path.starts_with("/tmp/") {
3371            warn!(
3372                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
3373                path
3374            );
3375        }
3376
3377        if !path.starts_with('/') {
3378            return Err(
3379                "Unix socket path must be absolute (start with /) for security and reliability."
3380                    .to_string(),
3381            );
3382        }
3383
3384        Ok(())
3385    }
3386}
3387
3388#[cfg(test)]
3389mod redis_connection_tests {
3390    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
3391
3392    #[test]
3393    fn test_standard_url_basic() {
3394        let conn = RedisConnection {
3395            host: "127.0.0.1".to_string(),
3396            port: 6379,
3397            db: 0,
3398            username: None,
3399            password: None,
3400            key_prefix: "sockudo:".to_string(),
3401            sentinels: Vec::new(),
3402            sentinel_password: None,
3403            name: "mymaster".to_string(),
3404            cluster: RedisClusterConnection::default(),
3405            cluster_nodes: Vec::new(),
3406        };
3407        assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
3408    }
3409
3410    #[test]
3411    fn test_standard_url_with_password() {
3412        let conn = RedisConnection {
3413            host: "127.0.0.1".to_string(),
3414            port: 6379,
3415            db: 2,
3416            username: None,
3417            password: Some("secret".to_string()),
3418            key_prefix: "sockudo:".to_string(),
3419            sentinels: Vec::new(),
3420            sentinel_password: None,
3421            name: "mymaster".to_string(),
3422            cluster: RedisClusterConnection::default(),
3423            cluster_nodes: Vec::new(),
3424        };
3425        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
3426    }
3427
3428    #[test]
3429    fn test_standard_url_with_username_and_password() {
3430        let conn = RedisConnection {
3431            host: "redis.example.com".to_string(),
3432            port: 6380,
3433            db: 1,
3434            username: Some("admin".to_string()),
3435            password: Some("pass123".to_string()),
3436            key_prefix: "sockudo:".to_string(),
3437            sentinels: Vec::new(),
3438            sentinel_password: None,
3439            name: "mymaster".to_string(),
3440            cluster: RedisClusterConnection::default(),
3441            cluster_nodes: Vec::new(),
3442        };
3443        assert_eq!(
3444            conn.to_url(),
3445            "redis://admin:pass123@redis.example.com:6380/1"
3446        );
3447    }
3448
3449    #[test]
3450    fn test_standard_url_with_special_chars_in_password() {
3451        let conn = RedisConnection {
3452            host: "127.0.0.1".to_string(),
3453            port: 6379,
3454            db: 0,
3455            username: None,
3456            password: Some("pass@word#123".to_string()),
3457            key_prefix: "sockudo:".to_string(),
3458            sentinels: Vec::new(),
3459            sentinel_password: None,
3460            name: "mymaster".to_string(),
3461            cluster: RedisClusterConnection::default(),
3462            cluster_nodes: Vec::new(),
3463        };
3464        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
3465    }
3466
3467    #[test]
3468    fn test_is_sentinel_configured_false() {
3469        let conn = RedisConnection::default();
3470        assert!(!conn.is_sentinel_configured());
3471    }
3472
3473    #[test]
3474    fn test_is_sentinel_configured_true() {
3475        let conn = RedisConnection {
3476            sentinels: vec![RedisSentinel {
3477                host: "sentinel1".to_string(),
3478                port: 26379,
3479            }],
3480            ..Default::default()
3481        };
3482        assert!(conn.is_sentinel_configured());
3483    }
3484
3485    #[test]
3486    fn test_sentinel_url_basic() {
3487        let conn = RedisConnection {
3488            host: "127.0.0.1".to_string(),
3489            port: 6379,
3490            db: 0,
3491            username: None,
3492            password: None,
3493            key_prefix: "sockudo:".to_string(),
3494            sentinels: vec![
3495                RedisSentinel {
3496                    host: "sentinel1".to_string(),
3497                    port: 26379,
3498                },
3499                RedisSentinel {
3500                    host: "sentinel2".to_string(),
3501                    port: 26379,
3502                },
3503            ],
3504            sentinel_password: None,
3505            name: "mymaster".to_string(),
3506            cluster: RedisClusterConnection::default(),
3507            cluster_nodes: Vec::new(),
3508        };
3509        assert_eq!(
3510            conn.to_url(),
3511            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
3512        );
3513    }
3514
3515    #[test]
3516    fn test_sentinel_url_with_sentinel_password() {
3517        let conn = RedisConnection {
3518            host: "127.0.0.1".to_string(),
3519            port: 6379,
3520            db: 0,
3521            username: None,
3522            password: None,
3523            key_prefix: "sockudo:".to_string(),
3524            sentinels: vec![RedisSentinel {
3525                host: "sentinel1".to_string(),
3526                port: 26379,
3527            }],
3528            sentinel_password: Some("sentinelpass".to_string()),
3529            name: "mymaster".to_string(),
3530            cluster: RedisClusterConnection::default(),
3531            cluster_nodes: Vec::new(),
3532        };
3533        assert_eq!(
3534            conn.to_url(),
3535            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
3536        );
3537    }
3538
3539    #[test]
3540    fn test_sentinel_url_with_master_password() {
3541        let conn = RedisConnection {
3542            host: "127.0.0.1".to_string(),
3543            port: 6379,
3544            db: 1,
3545            username: None,
3546            password: Some("masterpass".to_string()),
3547            key_prefix: "sockudo:".to_string(),
3548            sentinels: vec![RedisSentinel {
3549                host: "sentinel1".to_string(),
3550                port: 26379,
3551            }],
3552            sentinel_password: None,
3553            name: "mymaster".to_string(),
3554            cluster: RedisClusterConnection::default(),
3555            cluster_nodes: Vec::new(),
3556        };
3557        assert_eq!(
3558            conn.to_url(),
3559            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
3560        );
3561    }
3562
3563    #[test]
3564    fn test_sentinel_url_with_all_auth() {
3565        let conn = RedisConnection {
3566            host: "127.0.0.1".to_string(),
3567            port: 6379,
3568            db: 2,
3569            username: Some("redisuser".to_string()),
3570            password: Some("redispass".to_string()),
3571            key_prefix: "sockudo:".to_string(),
3572            sentinels: vec![
3573                RedisSentinel {
3574                    host: "sentinel1".to_string(),
3575                    port: 26379,
3576                },
3577                RedisSentinel {
3578                    host: "sentinel2".to_string(),
3579                    port: 26380,
3580                },
3581            ],
3582            sentinel_password: Some("sentinelauth".to_string()),
3583            name: "production-master".to_string(),
3584            cluster: RedisClusterConnection::default(),
3585            cluster_nodes: Vec::new(),
3586        };
3587        assert_eq!(
3588            conn.to_url(),
3589            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
3590        );
3591    }
3592
3593    #[test]
3594    fn test_sentinel_to_host_port() {
3595        let sentinel = RedisSentinel {
3596            host: "sentinel.example.com".to_string(),
3597            port: 26379,
3598        };
3599        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
3600    }
3601
3602    #[test]
3603    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
3604        let conn = RedisConnection {
3605            cluster: RedisClusterConnection {
3606                nodes: vec![
3607                    ClusterNode {
3608                        host: "node1.secure-cluster.com".to_string(),
3609                        port: 7000,
3610                    },
3611                    ClusterNode {
3612                        host: "redis://node2.secure-cluster.com:7001".to_string(),
3613                        port: 7001,
3614                    },
3615                    ClusterNode {
3616                        host: "rediss://node3.secure-cluster.com".to_string(),
3617                        port: 7002,
3618                    },
3619                ],
3620                username: None,
3621                password: Some("cluster-secret".to_string()),
3622                use_tls: true,
3623            },
3624            ..Default::default()
3625        };
3626
3627        assert_eq!(
3628            conn.cluster_node_urls(),
3629            vec![
3630                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
3631                "redis://:cluster-secret@node2.secure-cluster.com:7001",
3632                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
3633            ]
3634        );
3635    }
3636
3637    #[test]
3638    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
3639        let conn = RedisConnection {
3640            password: Some("fallback-secret".to_string()),
3641            cluster_nodes: vec![ClusterNode {
3642                host: "legacy-node.example.com".to_string(),
3643                port: 7000,
3644            }],
3645            ..Default::default()
3646        };
3647
3648        assert_eq!(
3649            conn.cluster_node_urls(),
3650            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
3651        );
3652    }
3653
3654    #[test]
3655    fn test_normalize_cluster_seed_urls() {
3656        let conn = RedisConnection {
3657            cluster: RedisClusterConnection {
3658                nodes: Vec::new(),
3659                username: Some("svc-user".to_string()),
3660                password: Some("svc-pass".to_string()),
3661                use_tls: true,
3662            },
3663            ..Default::default()
3664        };
3665
3666        let seeds = vec![
3667            "node1.example.com:7000".to_string(),
3668            "redis://node2.example.com:7001".to_string(),
3669            "rediss://node3.example.com".to_string(),
3670        ];
3671
3672        assert_eq!(
3673            conn.normalize_cluster_seed_urls(&seeds),
3674            vec![
3675                "rediss://svc-user:svc-pass@node1.example.com:7000",
3676                "redis://svc-user:svc-pass@node2.example.com:7001",
3677                "rediss://svc-user:svc-pass@node3.example.com:6379",
3678            ]
3679        );
3680    }
3681}
3682
3683#[cfg(test)]
3684mod cluster_node_tests {
3685    use super::ClusterNode;
3686
3687    #[test]
3688    fn test_to_url_basic_host() {
3689        let node = ClusterNode {
3690            host: "localhost".to_string(),
3691            port: 6379,
3692        };
3693        assert_eq!(node.to_url(), "redis://localhost:6379");
3694    }
3695
3696    #[test]
3697    fn test_to_url_ip_address() {
3698        let node = ClusterNode {
3699            host: "127.0.0.1".to_string(),
3700            port: 6379,
3701        };
3702        assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
3703    }
3704
3705    #[test]
3706    fn test_to_url_with_redis_protocol() {
3707        let node = ClusterNode {
3708            host: "redis://example.com".to_string(),
3709            port: 6379,
3710        };
3711        assert_eq!(node.to_url(), "redis://example.com:6379");
3712    }
3713
3714    #[test]
3715    fn test_to_url_with_rediss_protocol() {
3716        let node = ClusterNode {
3717            host: "rediss://secure.example.com".to_string(),
3718            port: 6379,
3719        };
3720        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3721    }
3722
3723    #[test]
3724    fn test_to_url_with_rediss_protocol_and_port_in_url() {
3725        let node = ClusterNode {
3726            host: "rediss://secure.example.com:7000".to_string(),
3727            port: 6379,
3728        };
3729        assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
3730    }
3731
3732    #[test]
3733    fn test_to_url_with_redis_protocol_and_port_in_url() {
3734        let node = ClusterNode {
3735            host: "redis://example.com:7001".to_string(),
3736            port: 6379,
3737        };
3738        assert_eq!(node.to_url(), "redis://example.com:7001");
3739    }
3740
3741    #[test]
3742    fn test_to_url_with_trailing_whitespace() {
3743        let node = ClusterNode {
3744            host: "  rediss://secure.example.com  ".to_string(),
3745            port: 6379,
3746        };
3747        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3748    }
3749
3750    #[test]
3751    fn test_to_url_custom_port() {
3752        let node = ClusterNode {
3753            host: "redis-cluster.example.com".to_string(),
3754            port: 7000,
3755        };
3756        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
3757    }
3758
3759    #[test]
3760    fn test_to_url_plain_host_with_port_in_host_field() {
3761        let node = ClusterNode {
3762            host: "redis-cluster.example.com:7010".to_string(),
3763            port: 7000,
3764        };
3765        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
3766    }
3767
3768    #[test]
3769    fn test_to_url_with_options_adds_auth_and_tls() {
3770        let node = ClusterNode {
3771            host: "node.example.com".to_string(),
3772            port: 7000,
3773        };
3774        assert_eq!(
3775            node.to_url_with_options(true, Some("svc-user"), Some("secret")),
3776            "rediss://svc-user:secret@node.example.com:7000"
3777        );
3778    }
3779
3780    #[test]
3781    fn test_to_url_with_options_keeps_embedded_auth() {
3782        let node = ClusterNode {
3783            host: "rediss://:node-secret@node.example.com:7000".to_string(),
3784            port: 7000,
3785        };
3786        assert_eq!(
3787            node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
3788            "rediss://:node-secret@node.example.com:7000"
3789        );
3790    }
3791
3792    #[test]
3793    fn test_from_seed_parses_plain_host_port() {
3794        let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
3795        assert_eq!(node.host, "cluster-node-1");
3796        assert_eq!(node.port, 7005);
3797    }
3798
3799    #[test]
3800    fn test_from_seed_keeps_scheme_urls() {
3801        let node =
3802            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
3803        assert_eq!(node.host, "rediss://secure.example.com:7005");
3804        assert_eq!(node.port, 7005);
3805    }
3806
3807    #[test]
3808    fn test_to_url_aws_elasticache_hostname() {
3809        let node = ClusterNode {
3810            host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
3811            port: 6379,
3812        };
3813        assert_eq!(
3814            node.to_url(),
3815            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
3816        );
3817    }
3818
3819    #[test]
3820    fn test_to_url_with_ipv6_no_port() {
3821        let node = ClusterNode {
3822            host: "rediss://[::1]".to_string(),
3823            port: 6379,
3824        };
3825        assert_eq!(node.to_url(), "rediss://[::1]:6379");
3826    }
3827
3828    #[test]
3829    fn test_to_url_with_ipv6_and_port_in_url() {
3830        let node = ClusterNode {
3831            host: "rediss://[::1]:7000".to_string(),
3832            port: 6379,
3833        };
3834        assert_eq!(node.to_url(), "rediss://[::1]:7000");
3835    }
3836
3837    #[test]
3838    fn test_to_url_with_ipv6_full_address_no_port() {
3839        let node = ClusterNode {
3840            host: "rediss://[2001:db8::1]".to_string(),
3841            port: 6379,
3842        };
3843        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
3844    }
3845
3846    #[test]
3847    fn test_to_url_with_ipv6_full_address_with_port() {
3848        let node = ClusterNode {
3849            host: "rediss://[2001:db8::1]:7000".to_string(),
3850            port: 6379,
3851        };
3852        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
3853    }
3854
3855    #[test]
3856    fn test_to_url_with_redis_protocol_ipv6() {
3857        let node = ClusterNode {
3858            host: "redis://[::1]".to_string(),
3859            port: 6379,
3860        };
3861        assert_eq!(node.to_url(), "redis://[::1]:6379");
3862    }
3863}
3864
3865#[cfg(test)]
3866mod cors_config_tests {
3867    use super::CorsConfig;
3868
3869    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
3870        sonic_rs::from_str(json)
3871    }
3872
3873    #[test]
3874    fn test_deserialize_valid_exact_origins() {
3875        let config =
3876            cors_from_json(r#"{"origin": ["https://example.com", "https://other.com"]}"#).unwrap();
3877        assert_eq!(config.origin.len(), 2);
3878    }
3879
3880    #[test]
3881    fn test_deserialize_valid_wildcard_patterns() {
3882        let config =
3883            cors_from_json(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
3884                .unwrap();
3885        assert_eq!(config.origin.len(), 2);
3886    }
3887
3888    #[test]
3889    fn test_deserialize_allows_special_markers() {
3890        assert!(cors_from_json(r#"{"origin": ["*"]}"#).is_ok());
3891        assert!(cors_from_json(r#"{"origin": ["Any"]}"#).is_ok());
3892        assert!(cors_from_json(r#"{"origin": ["any"]}"#).is_ok());
3893        assert!(cors_from_json(r#"{"origin": ["*", "https://example.com"]}"#).is_ok());
3894    }
3895
3896    #[test]
3897    fn test_deserialize_rejects_invalid_patterns() {
3898        assert!(cors_from_json(r#"{"origin": ["*.*bad"]}"#).is_err());
3899        assert!(cors_from_json(r#"{"origin": ["*."]}"#).is_err());
3900        assert!(cors_from_json(r#"{"origin": [""]}"#).is_err());
3901        assert!(cors_from_json(r#"{"origin": ["https://"]}"#).is_err());
3902    }
3903
3904    #[test]
3905    fn test_deserialize_rejects_mixed_valid_and_invalid() {
3906        assert!(cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#).is_err());
3907    }
3908}