Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82    Pulsar,
83    RabbitMq,
84    #[serde(rename = "google-pubsub")]
85    GooglePubSub,
86    Kafka,
87    Iggy,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(default)]
92pub struct DynamoDbSettings {
93    pub region: String,
94    pub table_name: String,
95    pub endpoint_url: Option<String>,
96    pub aws_access_key_id: Option<String>,
97    pub aws_secret_access_key: Option<String>,
98    pub aws_profile_name: Option<String>,
99}
100
101impl Default for DynamoDbSettings {
102    fn default() -> Self {
103        Self {
104            region: "us-east-1".to_string(),
105            table_name: "sockudo-applications".to_string(),
106            endpoint_url: None,
107            aws_access_key_id: None,
108            aws_secret_access_key: None,
109            aws_profile_name: None,
110        }
111    }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(default)]
116pub struct ScyllaDbSettings {
117    pub nodes: Vec<String>,
118    pub keyspace: String,
119    pub table_name: String,
120    pub username: Option<String>,
121    pub password: Option<String>,
122    pub replication_class: String,
123    pub replication_factor: u32,
124}
125
126impl Default for ScyllaDbSettings {
127    fn default() -> Self {
128        Self {
129            nodes: vec!["127.0.0.1:9042".to_string()],
130            keyspace: "sockudo".to_string(),
131            table_name: "applications".to_string(),
132            username: None,
133            password: None,
134            replication_class: "SimpleStrategy".to_string(),
135            replication_factor: 3,
136        }
137    }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(default)]
142pub struct SurrealDbSettings {
143    pub url: String,
144    pub namespace: String,
145    pub database: String,
146    pub username: String,
147    pub password: String,
148    pub table_name: String,
149    pub cache_ttl: u64,
150    pub cache_max_capacity: u64,
151}
152
153impl Default for SurrealDbSettings {
154    fn default() -> Self {
155        Self {
156            url: "ws://127.0.0.1:8000".to_string(),
157            namespace: "sockudo".to_string(),
158            database: "sockudo".to_string(),
159            username: "root".to_string(),
160            password: "root".to_string(),
161            table_name: "applications".to_string(),
162            cache_ttl: 300,
163            cache_max_capacity: 100,
164        }
165    }
166}
167
168impl FromStr for AdapterDriver {
169    type Err = String;
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        match s.to_lowercase().as_str() {
172            "local" => Ok(AdapterDriver::Local),
173            "redis" => Ok(AdapterDriver::Redis),
174            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
175            "nats" => Ok(AdapterDriver::Nats),
176            "pulsar" => Ok(AdapterDriver::Pulsar),
177            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
178            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
179            "kafka" => Ok(AdapterDriver::Kafka),
180            "iggy" | "apache-iggy" | "apache_iggy" => Ok(AdapterDriver::Iggy),
181            _ => Err(format!("Unknown adapter driver: {s}")),
182        }
183    }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
187#[serde(rename_all = "lowercase")]
188pub enum AppManagerDriver {
189    #[default]
190    Memory,
191    Mysql,
192    Dynamodb,
193    PgSql,
194    SurrealDb,
195    ScyllaDb,
196}
197impl FromStr for AppManagerDriver {
198    type Err = String;
199    fn from_str(s: &str) -> Result<Self, Self::Err> {
200        match s.to_lowercase().as_str() {
201            "memory" => Ok(AppManagerDriver::Memory),
202            "mysql" => Ok(AppManagerDriver::Mysql),
203            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
204            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
205            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
206            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
207            _ => Err(format!("Unknown app manager driver: {s}")),
208        }
209    }
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
213#[serde(rename_all = "lowercase")]
214pub enum CacheDriver {
215    #[default]
216    Memory,
217    Redis,
218    #[serde(rename = "redis-cluster")]
219    RedisCluster,
220    None,
221}
222
223impl FromStr for CacheDriver {
224    type Err = String;
225    fn from_str(s: &str) -> Result<Self, Self::Err> {
226        match s.to_lowercase().as_str() {
227            "memory" => Ok(CacheDriver::Memory),
228            "redis" => Ok(CacheDriver::Redis),
229            "redis-cluster" => Ok(CacheDriver::RedisCluster),
230            "none" => Ok(CacheDriver::None),
231            _ => Err(format!("Unknown cache driver: {s}")),
232        }
233    }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237#[serde(rename_all = "lowercase")]
238pub enum QueueDriver {
239    Memory,
240    #[default]
241    Redis,
242    #[serde(rename = "redis-cluster")]
243    RedisCluster,
244    Nats,
245    Pulsar,
246    RabbitMq,
247    #[serde(rename = "google-pubsub")]
248    GooglePubSub,
249    Kafka,
250    Iggy,
251    Sqs,
252    Sns,
253    None,
254}
255
256impl FromStr for QueueDriver {
257    type Err = String;
258    fn from_str(s: &str) -> Result<Self, Self::Err> {
259        match s.to_lowercase().as_str() {
260            "memory" => Ok(QueueDriver::Memory),
261            "redis" => Ok(QueueDriver::Redis),
262            "redis-cluster" => Ok(QueueDriver::RedisCluster),
263            "nats" => Ok(QueueDriver::Nats),
264            "pulsar" => Ok(QueueDriver::Pulsar),
265            "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
266            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
267            "kafka" => Ok(QueueDriver::Kafka),
268            "iggy" | "apache-iggy" | "apache_iggy" => Ok(QueueDriver::Iggy),
269            "sqs" => Ok(QueueDriver::Sqs),
270            "sns" => Ok(QueueDriver::Sns),
271            "none" => Ok(QueueDriver::None),
272            _ => Err(format!("Unknown queue driver: {s}")),
273        }
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
278#[serde(rename_all = "snake_case")]
279pub enum DeltaCoordinationBackend {
280    #[default]
281    Auto,
282    None,
283    Redis,
284    RedisCluster,
285    Nats,
286}
287
288impl FromStr for DeltaCoordinationBackend {
289    type Err = String;
290
291    fn from_str(s: &str) -> Result<Self, Self::Err> {
292        match s.trim().to_ascii_lowercase().as_str() {
293            "auto" => Ok(Self::Auto),
294            "none" => Ok(Self::None),
295            "redis" => Ok(Self::Redis),
296            "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
297            "nats" => Ok(Self::Nats),
298            _ => Err(format!("Unknown delta coordination backend: {s}")),
299        }
300    }
301}
302
303impl AsRef<str> for QueueDriver {
304    fn as_ref(&self) -> &str {
305        match self {
306            QueueDriver::Memory => "memory",
307            QueueDriver::Redis => "redis",
308            QueueDriver::RedisCluster => "redis-cluster",
309            QueueDriver::Nats => "nats",
310            QueueDriver::Pulsar => "pulsar",
311            QueueDriver::RabbitMq => "rabbitmq",
312            QueueDriver::GooglePubSub => "google-pubsub",
313            QueueDriver::Kafka => "kafka",
314            QueueDriver::Iggy => "iggy",
315            QueueDriver::Sqs => "sqs",
316            QueueDriver::Sns => "sns",
317            QueueDriver::None => "none",
318        }
319    }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
323#[serde(default)]
324pub struct RedisClusterQueueConfig {
325    pub concurrency: u32,
326    pub prefix: Option<String>,
327    pub nodes: Vec<String>,
328    pub request_timeout_ms: u64,
329}
330
331impl Default for RedisClusterQueueConfig {
332    fn default() -> Self {
333        Self {
334            concurrency: 5,
335            prefix: Some("sockudo_queue:".to_string()),
336            nodes: vec!["redis://127.0.0.1:6379".to_string()],
337            request_timeout_ms: 5000,
338        }
339    }
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
343#[serde(rename_all = "lowercase")]
344pub enum MetricsDriver {
345    #[default]
346    Prometheus,
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
350#[serde(rename_all = "lowercase")]
351pub enum LogOutputFormat {
352    #[default]
353    Human,
354    Json,
355}
356
357impl FromStr for LogOutputFormat {
358    type Err = String;
359    fn from_str(s: &str) -> Result<Self, Self::Err> {
360        match s.to_lowercase().as_str() {
361            "human" => Ok(LogOutputFormat::Human),
362            "json" => Ok(LogOutputFormat::Json),
363            _ => Err(format!("Unknown log output format: {s}")),
364        }
365    }
366}
367
368impl FromStr for MetricsDriver {
369    type Err = String;
370    fn from_str(s: &str) -> Result<Self, Self::Err> {
371        match s.to_lowercase().as_str() {
372            "prometheus" => Ok(MetricsDriver::Prometheus),
373            _ => Err(format!("Unknown metrics driver: {s}")),
374        }
375    }
376}
377
378impl AsRef<str> for MetricsDriver {
379    fn as_ref(&self) -> &str {
380        match self {
381            MetricsDriver::Prometheus => "prometheus",
382        }
383    }
384}
385
386// --- Main Configuration Struct ---
387#[derive(Debug, Clone, Serialize, Deserialize)]
388#[serde(default)]
389pub struct ServerOptions {
390    pub adapter: AdapterConfig,
391    pub app_manager: AppManagerConfig,
392    pub cache: CacheConfig,
393    pub channel_limits: ChannelLimits,
394    pub cors: CorsConfig,
395    pub database: DatabaseConfig,
396    pub database_pooling: DatabasePooling,
397    pub debug: bool,
398    pub event_limits: EventLimits,
399    pub host: String,
400    pub http_api: HttpApiConfig,
401    pub instance: InstanceConfig,
402    pub logging: Option<LoggingConfig>,
403    pub metrics: MetricsConfig,
404    pub mode: String,
405    pub port: u16,
406    pub path_prefix: String,
407    pub presence: PresenceConfig,
408    pub queue: QueueConfig,
409    pub rate_limiter: RateLimiterConfig,
410    pub shutdown_grace_period: u64,
411    pub ssl: SslConfig,
412    pub user_authentication_timeout: u64,
413    pub webhooks: WebhooksConfig,
414    pub websocket_max_payload_kb: u32,
415    pub cleanup: CleanupConfig,
416    pub activity_timeout: u64,
417    pub cluster_health: ClusterHealthConfig,
418    pub unix_socket: UnixSocketConfig,
419    pub delta_compression: DeltaCompressionOptionsConfig,
420    pub tag_filtering: TagFilteringConfig,
421    pub websocket: WebSocketConfig,
422    pub connection_recovery: ConnectionRecoveryConfig,
423    pub history: HistoryConfig,
424    pub presence_history: PresenceHistoryConfig,
425    pub idempotency: IdempotencyConfig,
426    pub ephemeral: EphemeralConfig,
427    pub echo_control: EchoControlConfig,
428    pub event_name_filtering: EventNameFilteringConfig,
429    pub versioned_messages: VersionedMessagesConfig,
430    pub annotations: AnnotationsConfig,
431    pub push: PushConfig,
432    /// Timeout in milliseconds for each subsystem check in the `/up` health endpoint.
433    /// Applies to adapter, cache, queue, and app manager checks independently.
434    pub health_check_timeout_ms: u64,
435}
436
437// --- Configuration Sub-Structs ---
438
439#[derive(Debug, Clone, Serialize, Deserialize)]
440#[serde(default)]
441pub struct SqsQueueConfig {
442    pub region: String,
443    pub queue_url_prefix: Option<String>,
444    pub visibility_timeout: i32,
445    pub endpoint_url: Option<String>,
446    pub max_messages: i32,
447    pub wait_time_seconds: i32,
448    pub concurrency: u32,
449    pub fifo: bool,
450    pub message_group_id: Option<String>,
451}
452
453#[derive(Debug, Clone, Serialize, Deserialize)]
454#[serde(default)]
455pub struct SnsQueueConfig {
456    pub region: String,
457    pub topic_arn: String,
458    pub endpoint_url: Option<String>,
459}
460
461#[derive(Debug, Clone, Serialize, Deserialize)]
462#[serde(default)]
463pub struct AdapterConfig {
464    pub driver: AdapterDriver,
465    pub redis: RedisAdapterConfig,
466    pub cluster: RedisClusterAdapterConfig,
467    pub nats: NatsAdapterConfig,
468    pub pulsar: PulsarAdapterConfig,
469    pub rabbitmq: RabbitMqAdapterConfig,
470    pub google_pubsub: GooglePubSubAdapterConfig,
471    pub kafka: KafkaAdapterConfig,
472    pub iggy: IggyConfig,
473    #[serde(default = "default_buffer_multiplier_per_cpu")]
474    pub buffer_multiplier_per_cpu: usize,
475    pub cluster_health: ClusterHealthConfig,
476    #[serde(default = "default_enable_socket_counting")]
477    pub enable_socket_counting: bool,
478    #[serde(default = "default_fallback_to_local")]
479    pub fallback_to_local: bool,
480}
481
482fn default_enable_socket_counting() -> bool {
483    true
484}
485
486fn default_fallback_to_local() -> bool {
487    true
488}
489
490fn default_buffer_multiplier_per_cpu() -> usize {
491    64
492}
493
494impl Default for AdapterConfig {
495    fn default() -> Self {
496        Self {
497            driver: AdapterDriver::default(),
498            redis: RedisAdapterConfig::default(),
499            cluster: RedisClusterAdapterConfig::default(),
500            nats: NatsAdapterConfig::default(),
501            pulsar: PulsarAdapterConfig::default(),
502            rabbitmq: RabbitMqAdapterConfig::default(),
503            google_pubsub: GooglePubSubAdapterConfig::default(),
504            kafka: KafkaAdapterConfig::default(),
505            iggy: IggyConfig::default(),
506            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
507            cluster_health: ClusterHealthConfig::default(),
508            enable_socket_counting: default_enable_socket_counting(),
509            fallback_to_local: default_fallback_to_local(),
510        }
511    }
512}
513
514#[derive(Debug, Clone, Serialize, Deserialize)]
515#[serde(default)]
516pub struct RedisAdapterConfig {
517    pub requests_timeout: u64,
518    pub prefix: String,
519    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
520    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
521    pub cluster_mode: bool,
522}
523
524#[derive(Debug, Clone, Serialize, Deserialize)]
525#[serde(default)]
526pub struct RedisClusterAdapterConfig {
527    pub nodes: Vec<String>,
528    pub prefix: String,
529    pub request_timeout_ms: u64,
530    pub use_connection_manager: bool,
531    #[serde(default)]
532    pub use_sharded_pubsub: bool,
533}
534
535#[derive(Debug, Clone, Serialize, Deserialize)]
536#[serde(default)]
537pub struct NatsAdapterConfig {
538    pub servers: Vec<String>,
539    pub prefix: String,
540    pub request_timeout_ms: u64,
541    pub username: Option<String>,
542    pub password: Option<String>,
543    pub token: Option<String>,
544    pub connection_timeout_ms: u64,
545    pub nodes_number: Option<u32>,
546    pub discovery_max_wait_ms: u64,
547    pub discovery_idle_wait_ms: u64,
548    pub subscription_capacity: Option<usize>,
549    pub client_capacity: Option<usize>,
550    pub max_reconnects: Option<usize>,
551    pub presence_sync_chunk_size: Option<usize>,
552}
553
554#[derive(Debug, Clone, Serialize, Deserialize)]
555#[serde(default)]
556pub struct PulsarAdapterConfig {
557    pub url: String,
558    pub prefix: String,
559    pub request_timeout_ms: u64,
560    pub token: Option<String>,
561    pub nodes_number: Option<u32>,
562}
563
564#[derive(Debug, Clone, Serialize, Deserialize)]
565#[serde(default)]
566pub struct RabbitMqAdapterConfig {
567    pub url: String,
568    pub prefix: String,
569    pub request_timeout_ms: u64,
570    pub connection_timeout_ms: u64,
571    pub nodes_number: Option<u32>,
572}
573
574#[derive(Debug, Clone, Serialize, Deserialize)]
575#[serde(default)]
576pub struct GooglePubSubAdapterConfig {
577    pub project_id: String,
578    pub prefix: String,
579    pub request_timeout_ms: u64,
580    pub emulator_host: Option<String>,
581    pub nodes_number: Option<u32>,
582}
583
584#[derive(Debug, Clone, Serialize, Deserialize)]
585#[serde(default)]
586pub struct KafkaAdapterConfig {
587    pub brokers: Vec<String>,
588    pub prefix: String,
589    pub request_timeout_ms: u64,
590    pub security_protocol: Option<String>,
591    pub sasl_mechanism: Option<String>,
592    pub sasl_username: Option<String>,
593    pub sasl_password: Option<String>,
594    pub nodes_number: Option<u32>,
595}
596
597#[derive(Debug, Clone, Serialize, Deserialize)]
598#[serde(default)]
599pub struct IggyConfig {
600    pub connection_string: String,
601    pub username: Option<String>,
602    pub password: Option<String>,
603    pub consumer_name: Option<String>,
604    pub stream: String,
605    pub topic_prefix: String,
606    pub queue_topic_prefix: String,
607    pub consumer_group_prefix: String,
608    pub request_timeout_ms: u64,
609    pub poll_interval_ms: u64,
610    pub poll_batch_size: u32,
611    pub partitions_count: u32,
612    pub partition_id: u32,
613    pub auto_create: bool,
614    pub start_from_latest: bool,
615    pub nodes_number: Option<u32>,
616}
617
618#[derive(Debug, Clone, Serialize, Deserialize, Default)]
619#[serde(default)]
620pub struct AppManagerConfig {
621    pub driver: AppManagerDriver,
622    pub array: ArrayConfig,
623    pub cache: CacheSettings,
624    pub scylladb: ScyllaDbSettings,
625}
626
627#[derive(Debug, Clone, Serialize, Deserialize, Default)]
628#[serde(default)]
629pub struct ArrayConfig {
630    pub apps: Vec<App>,
631}
632
633#[derive(Debug, Clone, Serialize, Deserialize)]
634#[serde(default)]
635pub struct CacheSettings {
636    pub enabled: bool,
637    pub ttl: u64,
638}
639
640#[derive(Debug, Clone, Serialize, Deserialize)]
641#[serde(default)]
642pub struct MemoryCacheOptions {
643    pub ttl: u64,
644    pub cleanup_interval: u64,
645    pub max_capacity: u64,
646}
647
648#[derive(Debug, Clone, Serialize, Deserialize)]
649#[serde(default)]
650pub struct CacheConfig {
651    pub driver: CacheDriver,
652    pub redis: RedisConfig,
653    pub memory: MemoryCacheOptions,
654}
655
656#[derive(Debug, Clone, Serialize, Deserialize, Default)]
657#[serde(default)]
658pub struct RedisConfig {
659    pub prefix: Option<String>,
660    pub url_override: Option<String>,
661    pub cluster_mode: bool,
662}
663
664#[derive(Debug, Clone, Serialize, Deserialize)]
665#[serde(default)]
666pub struct ChannelLimits {
667    pub max_name_length: u32,
668    pub cache_ttl: u64,
669}
670
671#[derive(Debug, Clone, Serialize, Deserialize)]
672#[serde(default)]
673pub struct CorsConfig {
674    pub credentials: bool,
675    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
676    pub origin: Vec<String>,
677    pub methods: Vec<String>,
678    pub allowed_headers: Vec<String>,
679}
680
681fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
682where
683    D: serde::Deserializer<'de>,
684{
685    use serde::de::Error;
686    let origins = Vec::<String>::deserialize(deserializer)?;
687
688    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
689        return Err(D::Error::custom(format!(
690            "CORS origin pattern validation failed: {}",
691            e
692        )));
693    }
694
695    Ok(origins)
696}
697
698#[derive(Debug, Clone, Serialize, Deserialize, Default)]
699#[serde(default)]
700pub struct DatabaseConfig {
701    pub mysql: DatabaseConnection,
702    pub postgres: DatabaseConnection,
703    pub redis: RedisConnection,
704    pub dynamodb: DynamoDbSettings,
705    pub surrealdb: SurrealDbSettings,
706    pub scylladb: ScyllaDbSettings,
707}
708
709#[derive(Debug, Clone, Serialize, Deserialize)]
710#[serde(default)]
711pub struct DatabaseConnection {
712    pub host: String,
713    pub port: u16,
714    pub username: String,
715    pub password: String,
716    pub database: String,
717    pub table_name: String,
718    pub connection_pool_size: u32,
719    pub pool_min: Option<u32>,
720    pub pool_max: Option<u32>,
721    pub cache_ttl: u64,
722    pub cache_cleanup_interval: u64,
723    pub cache_max_capacity: u64,
724}
725
726#[derive(Debug, Clone, Serialize, Deserialize)]
727#[serde(default)]
728pub struct RedisConnection {
729    pub host: String,
730    pub port: u16,
731    pub db: u32,
732    pub username: Option<String>,
733    pub password: Option<String>,
734    pub key_prefix: String,
735    pub sentinels: Vec<RedisSentinel>,
736    pub sentinel_password: Option<String>,
737    pub name: String,
738    pub cluster: RedisClusterConnection,
739    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
740    pub cluster_nodes: Vec<ClusterNode>,
741}
742
743#[derive(Debug, Clone, Serialize, Deserialize)]
744#[serde(default)]
745pub struct RedisSentinel {
746    pub host: String,
747    pub port: u16,
748}
749
750#[derive(Debug, Clone, Serialize, Deserialize, Default)]
751#[serde(default)]
752pub struct RedisClusterConnection {
753    pub nodes: Vec<ClusterNode>,
754    pub username: Option<String>,
755    pub password: Option<String>,
756    #[serde(alias = "useTLS")]
757    pub use_tls: bool,
758}
759
760impl RedisConnection {
761    /// Returns true if Redis Sentinel is configured.
762    pub fn is_sentinel_configured(&self) -> bool {
763        !self.sentinels.is_empty()
764    }
765
766    /// Builds a Redis connection URL based on the configuration.
767    pub fn to_url(&self) -> String {
768        if self.is_sentinel_configured() {
769            self.build_sentinel_url()
770        } else {
771            self.build_standard_url()
772        }
773    }
774
775    fn build_standard_url(&self) -> String {
776        // Extract scheme from host if present, otherwise default to redis://
777        let (scheme, host) = if self.host.starts_with("rediss://") {
778            ("rediss://", self.host.trim_start_matches("rediss://"))
779        } else if self.host.starts_with("redis://") {
780            ("redis://", self.host.trim_start_matches("redis://"))
781        } else {
782            ("redis://", self.host.as_str())
783        };
784
785        let mut url = String::from(scheme);
786
787        if let Some(ref username) = self.username {
788            url.push_str(username);
789            if let Some(ref password) = self.password {
790                url.push(':');
791                url.push_str(&urlencoding::encode(password));
792            }
793            url.push('@');
794        } else if let Some(ref password) = self.password {
795            url.push(':');
796            url.push_str(&urlencoding::encode(password));
797            url.push('@');
798        }
799
800        url.push_str(host);
801        url.push(':');
802        url.push_str(&self.port.to_string());
803        url.push('/');
804        url.push_str(&self.db.to_string());
805
806        url
807    }
808
809    fn build_sentinel_url(&self) -> String {
810        let mut url = String::from("redis+sentinel://");
811
812        if let Some(ref sentinel_password) = self.sentinel_password {
813            url.push(':');
814            url.push_str(&urlencoding::encode(sentinel_password));
815            url.push('@');
816        }
817
818        let sentinel_hosts: Vec<String> = self
819            .sentinels
820            .iter()
821            .map(|s| format!("{}:{}", s.host, s.port))
822            .collect();
823        url.push_str(&sentinel_hosts.join(","));
824
825        url.push('/');
826        url.push_str(&self.name);
827        url.push('/');
828        url.push_str(&self.db.to_string());
829
830        let mut params = Vec::new();
831        if let Some(ref password) = self.password {
832            params.push(format!("password={}", urlencoding::encode(password)));
833        }
834        if let Some(ref username) = self.username {
835            params.push(format!("username={}", urlencoding::encode(username)));
836        }
837
838        if !params.is_empty() {
839            url.push('?');
840            url.push_str(&params.join("&"));
841        }
842
843        url
844    }
845
846    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
847    /// or legacy (`cluster_nodes`) field.
848    pub fn has_cluster_nodes(&self) -> bool {
849        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
850    }
851
852    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
853    /// Falls back to legacy `cluster_nodes` for backward compatibility.
854    pub fn cluster_node_urls(&self) -> Vec<String> {
855        if !self.cluster.nodes.is_empty() {
856            return self.build_cluster_urls(&self.cluster.nodes);
857        }
858        self.build_cluster_urls(&self.cluster_nodes)
859    }
860
861    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
862    /// shared cluster auth/TLS options.
863    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
864        self.build_cluster_urls(
865            &seeds
866                .iter()
867                .filter_map(|seed| ClusterNode::from_seed(seed))
868                .collect::<Vec<ClusterNode>>(),
869        )
870    }
871
872    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
873        let username = self
874            .cluster
875            .username
876            .as_deref()
877            .or(self.username.as_deref());
878        let password = self
879            .cluster
880            .password
881            .as_deref()
882            .or(self.password.as_deref());
883        let use_tls = self.cluster.use_tls;
884
885        nodes
886            .iter()
887            .map(|node| node.to_url_with_options(use_tls, username, password))
888            .collect()
889    }
890}
891
892impl RedisSentinel {
893    pub fn to_host_port(&self) -> String {
894        format!("{}:{}", self.host, self.port)
895    }
896}
897
898#[derive(Debug, Clone, Serialize, Deserialize)]
899#[serde(default)]
900pub struct ClusterNode {
901    pub host: String,
902    pub port: u16,
903}
904
905impl ClusterNode {
906    pub fn to_url(&self) -> String {
907        self.to_url_with_options(false, None, None)
908    }
909
910    pub fn to_url_with_options(
911        &self,
912        use_tls: bool,
913        username: Option<&str>,
914        password: Option<&str>,
915    ) -> String {
916        let host = self.host.trim();
917
918        if host.starts_with("redis://") || host.starts_with("rediss://") {
919            if let Ok(parsed) = Url::parse(host)
920                && let Some(host_str) = parsed.host_str()
921            {
922                let scheme = parsed.scheme();
923                let port = parsed.port_or_known_default().unwrap_or(self.port);
924                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
925                let parsed_password = parsed.password();
926                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
927                let (effective_username, effective_password) = if has_embedded_auth {
928                    (parsed_username, parsed_password)
929                } else {
930                    (username, password)
931                };
932
933                return build_redis_url(
934                    scheme,
935                    host_str,
936                    port,
937                    effective_username,
938                    effective_password,
939                );
940            }
941
942            // Fallback for malformed URLs
943            let has_port = if let Some(bracket_pos) = host.rfind(']') {
944                host[bracket_pos..].contains(':')
945            } else {
946                host.split(':').count() >= 3
947            };
948            let base = if has_port {
949                host.to_string()
950            } else {
951                format!("{}:{}", host, self.port)
952            };
953
954            if let Ok(parsed) = Url::parse(&base) {
955                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
956                let parsed_password = parsed.password();
957                if let Some(host_str) = parsed.host_str() {
958                    let port = parsed.port_or_known_default().unwrap_or(self.port);
959                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
960                    let (effective_username, effective_password) = if has_embedded_auth {
961                        (parsed_username, parsed_password)
962                    } else {
963                        (username, password)
964                    };
965                    return build_redis_url(
966                        parsed.scheme(),
967                        host_str,
968                        port,
969                        effective_username,
970                        effective_password,
971                    );
972                }
973            }
974            return base;
975        }
976
977        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
978        let scheme = if use_tls { "rediss" } else { "redis" };
979        build_redis_url(
980            scheme,
981            &normalized_host,
982            normalized_port,
983            username,
984            password,
985        )
986    }
987
988    pub fn from_seed(seed: &str) -> Option<Self> {
989        let trimmed = seed.trim();
990        if trimmed.is_empty() {
991            return None;
992        }
993
994        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
995            let port = Url::parse(trimmed)
996                .ok()
997                .and_then(|parsed| parsed.port_or_known_default())
998                .unwrap_or(6379);
999            return Some(Self {
1000                host: trimmed.to_string(),
1001                port,
1002            });
1003        }
1004
1005        let (host, port) = split_plain_host_and_port(trimmed, 6379);
1006        Some(Self { host, port })
1007    }
1008}
1009
1010fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
1011    let host = raw_host.trim();
1012
1013    // Handle bracketed IPv6: [::1]:6379
1014    if host.starts_with('[') {
1015        if let Some(end_bracket) = host.find(']') {
1016            let host_part = host[1..end_bracket].to_string();
1017            let remainder = &host[end_bracket + 1..];
1018            if let Some(port_str) = remainder.strip_prefix(':')
1019                && let Ok(port) = port_str.parse::<u16>()
1020            {
1021                return (host_part, port);
1022            }
1023            return (host_part, default_port);
1024        }
1025        return (host.to_string(), default_port);
1026    }
1027
1028    // Handle hostname/IP with port: host:6379
1029    if host.matches(':').count() == 1
1030        && let Some((host_part, port_part)) = host.rsplit_once(':')
1031        && let Ok(port) = port_part.parse::<u16>()
1032    {
1033        return (host_part.to_string(), port);
1034    }
1035
1036    (host.to_string(), default_port)
1037}
1038
1039fn build_redis_url(
1040    scheme: &str,
1041    host: &str,
1042    port: u16,
1043    username: Option<&str>,
1044    password: Option<&str>,
1045) -> String {
1046    let mut url = format!("{scheme}://");
1047
1048    if let Some(user) = username {
1049        url.push_str(&urlencoding::encode(user));
1050        if let Some(pass) = password {
1051            url.push(':');
1052            url.push_str(&urlencoding::encode(pass));
1053        }
1054        url.push('@');
1055    } else if let Some(pass) = password {
1056        url.push(':');
1057        url.push_str(&urlencoding::encode(pass));
1058        url.push('@');
1059    }
1060
1061    if host.contains(':') && !host.starts_with('[') {
1062        url.push('[');
1063        url.push_str(host);
1064        url.push(']');
1065    } else {
1066        url.push_str(host);
1067    }
1068    url.push(':');
1069    url.push_str(&port.to_string());
1070    url
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct DatabasePooling {
1076    pub enabled: bool,
1077    pub min: u32,
1078    pub max: u32,
1079}
1080
1081#[derive(Debug, Clone, Serialize, Deserialize)]
1082#[serde(default)]
1083pub struct EventLimits {
1084    pub max_channels_at_once: u32,
1085    pub max_name_length: u32,
1086    pub max_payload_in_kb: u32,
1087    pub max_batch_size: u32,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1091#[serde(default)]
1092pub struct IdempotencyConfig {
1093    /// Whether idempotency key support is enabled
1094    pub enabled: bool,
1095    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
1096    pub ttl_seconds: u64,
1097    /// Maximum length of an idempotency key
1098    pub max_key_length: usize,
1099}
1100
1101#[derive(Debug, Clone, Serialize, Deserialize)]
1102#[serde(default)]
1103pub struct EphemeralConfig {
1104    /// Whether ephemeral message handling is enabled.
1105    pub enabled: bool,
1106}
1107
1108#[derive(Debug, Clone, Serialize, Deserialize)]
1109#[serde(default)]
1110pub struct EchoControlConfig {
1111    /// Whether connection-level and per-message echo control is enabled.
1112    pub enabled: bool,
1113    /// Default echo behavior for new V2 connections when the query param is omitted.
1114    pub default_echo_messages: bool,
1115}
1116
1117#[derive(Debug, Clone, Serialize, Deserialize)]
1118#[serde(default)]
1119pub struct EventNameFilteringConfig {
1120    /// Whether per-subscription event name filtering is enabled.
1121    pub enabled: bool,
1122    /// Maximum event names allowed in a single filter.
1123    pub max_events_per_filter: usize,
1124    /// Maximum length for each event name in a filter.
1125    pub max_event_name_length: usize,
1126}
1127
1128#[derive(Debug, Clone, Serialize, Deserialize)]
1129#[serde(default)]
1130pub struct ConnectionRecoveryConfig {
1131    /// Whether connection recovery (resume) is enabled.
1132    /// When enabled, the server keeps a bounded replay buffer per channel so that
1133    /// reconnecting clients can receive missed messages. Disabled by default for
1134    /// Pusher protocol compatibility.
1135    pub enabled: bool,
1136    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
1137    pub buffer_ttl_seconds: u64,
1138    /// Maximum number of messages kept per channel. Default: 100.
1139    pub max_buffer_size: usize,
1140}
1141
1142#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1143#[serde(rename_all = "lowercase")]
1144pub enum VersionStoreDriver {
1145    #[default]
1146    Memory,
1147    Postgres,
1148    Mysql,
1149    DynamoDb,
1150    ScyllaDb,
1151    SurrealDb,
1152}
1153
1154impl FromStr for VersionStoreDriver {
1155    type Err = String;
1156    fn from_str(s: &str) -> Result<Self, Self::Err> {
1157        match s.to_lowercase().as_str() {
1158            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1159            "mysql" => Ok(Self::Mysql),
1160            "dynamodb" => Ok(Self::DynamoDb),
1161            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1162            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1163            "memory" => Ok(Self::Memory),
1164            _ => Err(format!("Unknown version store driver: {s}")),
1165        }
1166    }
1167}
1168
1169#[derive(Debug, Clone, Serialize, Deserialize)]
1170#[serde(default)]
1171pub struct VersionedMessagesConfig {
1172    /// Whether V2 mutable message HTTP/retrieval surfaces are enabled.
1173    pub enabled: bool,
1174    /// Storage driver for versioned messages. Defaults to memory.
1175    pub driver: VersionStoreDriver,
1176    /// Maximum page size for version-history retrieval.
1177    pub max_page_size: usize,
1178    /// Retention window for version entries, in seconds. `0` disables expiry
1179    /// (entries are kept forever).
1180    ///
1181    /// Backends with native row TTL (ScyllaDB `USING TTL`, DynamoDB TTL
1182    /// attribute) apply this at the storage layer. Other backends (MySQL,
1183    /// PostgreSQL, SurrealDB, Memory) enforce it via a periodic background
1184    /// purge worker controlled by `purge_interval_seconds`.
1185    pub retention_window_seconds: u64,
1186    /// Interval between purge worker runs, in seconds. Only consulted for
1187    /// backends without native TTL. Clamped to a minimum of 10 seconds.
1188    pub purge_interval_seconds: u64,
1189    /// Maximum rows deleted per purge query. Bounds lock/transaction sizes
1190    /// for SQL backends. The worker loops until no more expired rows remain
1191    /// or `max_purge_per_tick` is reached.
1192    pub purge_batch_size: usize,
1193    /// Hard cap on rows deleted per purge tick across all loop iterations.
1194    /// Prevents a backlog of expired rows from monopolising a worker run.
1195    pub max_purge_per_tick: usize,
1196}
1197
1198#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1199#[serde(default)]
1200pub struct AnnotationsConfig {
1201    /// Whether Sockudo-native annotation APIs and realtime annotation protocol
1202    /// surfaces are enabled. Disabled by default while the feature is opt-in.
1203    pub enabled: bool,
1204}
1205
1206#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1207#[serde(rename_all = "lowercase")]
1208pub enum HistoryBackend {
1209    #[default]
1210    Postgres,
1211    Mysql,
1212    DynamoDb,
1213    SurrealDb,
1214    ScyllaDb,
1215    Memory,
1216}
1217
1218impl FromStr for HistoryBackend {
1219    type Err = String;
1220    fn from_str(s: &str) -> Result<Self, Self::Err> {
1221        match s.to_lowercase().as_str() {
1222            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1223            "mysql" => Ok(Self::Mysql),
1224            "dynamodb" => Ok(Self::DynamoDb),
1225            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1226            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1227            "memory" => Ok(Self::Memory),
1228            _ => Err(format!("Unknown history backend: {s}")),
1229        }
1230    }
1231}
1232
1233#[derive(Debug, Clone, Serialize, Deserialize)]
1234#[serde(default)]
1235pub struct PostgresHistoryConfig {
1236    pub table_prefix: String,
1237    pub write_timeout_ms: u64,
1238}
1239
1240impl Default for PostgresHistoryConfig {
1241    fn default() -> Self {
1242        Self {
1243            table_prefix: "sockudo_history".to_string(),
1244            write_timeout_ms: 5000,
1245        }
1246    }
1247}
1248
1249#[derive(Debug, Clone, Serialize, Deserialize)]
1250#[serde(default)]
1251pub struct MySqlHistoryConfig {
1252    pub table_prefix: String,
1253    pub write_timeout_ms: u64,
1254}
1255
1256impl Default for MySqlHistoryConfig {
1257    fn default() -> Self {
1258        Self {
1259            table_prefix: "sockudo_history".to_string(),
1260            write_timeout_ms: 5000,
1261        }
1262    }
1263}
1264
1265#[derive(Debug, Clone, Serialize, Deserialize)]
1266#[serde(default)]
1267pub struct DynamoDbHistoryConfig {
1268    pub table_prefix: String,
1269    pub write_timeout_ms: u64,
1270}
1271
1272impl Default for DynamoDbHistoryConfig {
1273    fn default() -> Self {
1274        Self {
1275            table_prefix: "sockudo_history".to_string(),
1276            write_timeout_ms: 5000,
1277        }
1278    }
1279}
1280
1281#[derive(Debug, Clone, Serialize, Deserialize)]
1282#[serde(default)]
1283pub struct SurrealDbHistoryConfig {
1284    pub table_prefix: String,
1285    pub write_timeout_ms: u64,
1286}
1287
1288impl Default for SurrealDbHistoryConfig {
1289    fn default() -> Self {
1290        Self {
1291            table_prefix: "sockudo_history".to_string(),
1292            write_timeout_ms: 5000,
1293        }
1294    }
1295}
1296
1297#[derive(Debug, Clone, Serialize, Deserialize)]
1298#[serde(default)]
1299pub struct ScyllaDbHistoryConfig {
1300    pub table_prefix: String,
1301    pub write_timeout_ms: u64,
1302}
1303
1304impl Default for ScyllaDbHistoryConfig {
1305    fn default() -> Self {
1306        Self {
1307            table_prefix: "sockudo_history".to_string(),
1308            write_timeout_ms: 5000,
1309        }
1310    }
1311}
1312
1313#[derive(Debug, Clone, Serialize, Deserialize)]
1314#[serde(default)]
1315pub struct HistoryConfig {
1316    pub enabled: bool,
1317    pub rewind_enabled: bool,
1318    pub backend: HistoryBackend,
1319    pub retention_window_seconds: u64,
1320    pub max_page_size: usize,
1321    pub max_messages_per_channel: Option<usize>,
1322    pub max_bytes_per_channel: Option<u64>,
1323    pub writer_shards: usize,
1324    pub writer_queue_capacity: usize,
1325    pub purge_interval_seconds: u64,
1326    pub purge_batch_size: usize,
1327    pub max_purge_per_tick: usize,
1328    pub postgres: PostgresHistoryConfig,
1329    pub mysql: MySqlHistoryConfig,
1330    pub dynamodb: DynamoDbHistoryConfig,
1331    pub surrealdb: SurrealDbHistoryConfig,
1332    pub scylladb: ScyllaDbHistoryConfig,
1333}
1334
1335impl Default for HistoryConfig {
1336    fn default() -> Self {
1337        Self {
1338            enabled: false,
1339            rewind_enabled: true,
1340            backend: HistoryBackend::Postgres,
1341            retention_window_seconds: 86400,
1342            max_page_size: 100,
1343            max_messages_per_channel: None,
1344            max_bytes_per_channel: None,
1345            writer_shards: 16,
1346            writer_queue_capacity: 4096,
1347            purge_interval_seconds: 300,
1348            purge_batch_size: 1000,
1349            max_purge_per_tick: 100_000,
1350            postgres: PostgresHistoryConfig::default(),
1351            mysql: MySqlHistoryConfig::default(),
1352            dynamodb: DynamoDbHistoryConfig::default(),
1353            surrealdb: SurrealDbHistoryConfig::default(),
1354            scylladb: ScyllaDbHistoryConfig::default(),
1355        }
1356    }
1357}
1358
1359impl Default for VersionedMessagesConfig {
1360    fn default() -> Self {
1361        Self {
1362            enabled: false,
1363            driver: VersionStoreDriver::Memory,
1364            max_page_size: 100,
1365            retention_window_seconds: 0,
1366            purge_interval_seconds: 300,
1367            purge_batch_size: 1000,
1368            max_purge_per_tick: 100_000,
1369        }
1370    }
1371}
1372
1373#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1374#[serde(rename_all = "lowercase")]
1375pub enum PushStorageDriver {
1376    #[default]
1377    Memory,
1378    Postgres,
1379    Mysql,
1380    DynamoDb,
1381    SurrealDb,
1382    ScyllaDb,
1383}
1384
1385impl FromStr for PushStorageDriver {
1386    type Err = String;
1387    fn from_str(s: &str) -> Result<Self, Self::Err> {
1388        match s.to_lowercase().as_str() {
1389            "memory" => Ok(Self::Memory),
1390            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1391            "mysql" => Ok(Self::Mysql),
1392            "dynamodb" => Ok(Self::DynamoDb),
1393            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1394            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1395            _ => Err(format!("Unknown push storage driver: {s}")),
1396        }
1397    }
1398}
1399
1400#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1401#[serde(rename_all = "lowercase")]
1402pub enum PushQueueDriver {
1403    #[default]
1404    Memory,
1405    Redis,
1406    #[serde(rename = "redis-cluster")]
1407    RedisCluster,
1408    Nats,
1409    Pulsar,
1410    RabbitMq,
1411    #[serde(rename = "google-pubsub")]
1412    GooglePubsub,
1413    Kafka,
1414    Iggy,
1415    Sqs,
1416    Sns,
1417}
1418
1419impl FromStr for PushQueueDriver {
1420    type Err = String;
1421    fn from_str(s: &str) -> Result<Self, Self::Err> {
1422        match s.to_lowercase().as_str() {
1423            "memory" => Ok(Self::Memory),
1424            "redis" => Ok(Self::Redis),
1425            "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
1426            "nats" => Ok(Self::Nats),
1427            "pulsar" => Ok(Self::Pulsar),
1428            "rabbitmq" | "rabbit-mq" => Ok(Self::RabbitMq),
1429            "google-pubsub" | "google_pubsub" | "gcp-pubsub" | "pubsub" => Ok(Self::GooglePubsub),
1430            "kafka" => Ok(Self::Kafka),
1431            "iggy" | "apache-iggy" | "apache_iggy" => Ok(Self::Iggy),
1432            "sqs" => Ok(Self::Sqs),
1433            "sns" => Ok(Self::Sns),
1434            _ => Err(format!("Unknown push queue driver: {s}")),
1435        }
1436    }
1437}
1438
1439#[derive(Debug, Clone, Serialize, Deserialize)]
1440#[serde(default)]
1441pub struct PushConfig {
1442    pub storage_driver: PushStorageDriver,
1443    pub queue_driver: PushQueueDriver,
1444    pub fcm_enabled: bool,
1445    pub apns_enabled: bool,
1446    pub webpush_enabled: bool,
1447    pub hms_enabled: bool,
1448    pub wns_enabled: bool,
1449    pub fcm_credential_ref: Option<String>,
1450    pub apns_credential_ref: Option<String>,
1451    pub webpush_credential_ref: Option<String>,
1452    pub hms_credential_ref: Option<String>,
1453    pub wns_credential_ref: Option<String>,
1454    pub accept_worker_count: u32,
1455    pub planner_worker_count: u32,
1456    pub shard_worker_count: u32,
1457    pub dispatch_worker_count: u32,
1458    pub feedback_worker_count: u32,
1459    pub queue_partition_count: u32,
1460    pub channel_shard_count: u32,
1461    pub fanout_fast_threshold: u64,
1462    pub fanout_shard_size: u64,
1463    pub fanout_sync_threshold: u64,
1464    pub backpressure_lag_threshold_secs: u64,
1465    pub publish_status_ttl_days: u64,
1466    pub stale_device_max_age_days: u64,
1467    pub retry: PushRetryConfig,
1468    pub circuit_breaker: PushCircuitBreakerConfig,
1469    pub default_quotas: PushDefaultQuotas,
1470    pub credential_encryption_key: Option<String>,
1471    pub kms_key_ref: Option<String>,
1472    pub vault_secret_ref: Option<String>,
1473    pub dry_run: bool,
1474    pub analytics_enabled: bool,
1475    pub analytics_retention_days: u64,
1476    pub payload_redaction: PushPayloadRedactionConfig,
1477    pub scheduler_interval_secs: u64,
1478}
1479
1480impl Default for PushConfig {
1481    fn default() -> Self {
1482        Self {
1483            storage_driver: PushStorageDriver::Memory,
1484            queue_driver: PushQueueDriver::Memory,
1485            fcm_enabled: false,
1486            apns_enabled: false,
1487            webpush_enabled: false,
1488            hms_enabled: false,
1489            wns_enabled: false,
1490            fcm_credential_ref: None,
1491            apns_credential_ref: None,
1492            webpush_credential_ref: None,
1493            hms_credential_ref: None,
1494            wns_credential_ref: None,
1495            accept_worker_count: 1,
1496            planner_worker_count: 1,
1497            shard_worker_count: 1,
1498            dispatch_worker_count: 1,
1499            feedback_worker_count: 1,
1500            queue_partition_count: 1,
1501            channel_shard_count: 1,
1502            fanout_fast_threshold: 10_000,
1503            fanout_shard_size: 100_000,
1504            fanout_sync_threshold: 0,
1505            backpressure_lag_threshold_secs: 60,
1506            publish_status_ttl_days: 30,
1507            stale_device_max_age_days: 90,
1508            retry: PushRetryConfig::default(),
1509            circuit_breaker: PushCircuitBreakerConfig::default(),
1510            default_quotas: PushDefaultQuotas::default(),
1511            credential_encryption_key: None,
1512            kms_key_ref: None,
1513            vault_secret_ref: None,
1514            dry_run: false,
1515            analytics_enabled: false,
1516            analytics_retention_days: 30,
1517            payload_redaction: PushPayloadRedactionConfig::default(),
1518            scheduler_interval_secs: 5,
1519        }
1520    }
1521}
1522
1523#[derive(Debug, Clone, Serialize, Deserialize)]
1524#[serde(default)]
1525pub struct PushRetryConfig {
1526    pub max_attempts: u32,
1527    pub initial_backoff_ms: u64,
1528    pub max_backoff_ms: u64,
1529    pub max_elapsed_secs: u64,
1530    pub jitter: bool,
1531    pub respect_retry_after: bool,
1532}
1533
1534impl Default for PushRetryConfig {
1535    fn default() -> Self {
1536        Self {
1537            max_attempts: 5,
1538            initial_backoff_ms: 1_000,
1539            max_backoff_ms: 60_000,
1540            max_elapsed_secs: 86_400,
1541            jitter: true,
1542            respect_retry_after: true,
1543        }
1544    }
1545}
1546
1547#[derive(Debug, Clone, Serialize, Deserialize)]
1548#[serde(default)]
1549pub struct PushCircuitBreakerConfig {
1550    pub failure_threshold: u32,
1551    pub cooldown_secs: u64,
1552    pub half_open_max_inflight: u32,
1553}
1554
1555impl Default for PushCircuitBreakerConfig {
1556    fn default() -> Self {
1557        Self {
1558            failure_threshold: 5,
1559            cooldown_secs: 60,
1560            half_open_max_inflight: 10,
1561        }
1562    }
1563}
1564
1565#[derive(Debug, Clone, Serialize, Deserialize)]
1566#[serde(default)]
1567pub struct PushDefaultQuotas {
1568    pub acceptance_rps: u64,
1569    pub delivery_quota_daily: u64,
1570    pub fanout_max: u64,
1571    pub inflight_max: u64,
1572}
1573
1574impl Default for PushDefaultQuotas {
1575    fn default() -> Self {
1576        Self {
1577            acceptance_rps: 100,
1578            delivery_quota_daily: 0,
1579            fanout_max: 0,
1580            inflight_max: 1_000,
1581        }
1582    }
1583}
1584
1585#[derive(Debug, Clone, Serialize, Deserialize)]
1586#[serde(default)]
1587pub struct PushPayloadRedactionConfig {
1588    pub redact_payload: bool,
1589    pub redact_template_data: bool,
1590    pub redact_provider_overrides: bool,
1591    pub allow_debug_payload_logging: bool,
1592}
1593
1594impl Default for PushPayloadRedactionConfig {
1595    fn default() -> Self {
1596        Self {
1597            redact_payload: true,
1598            redact_template_data: true,
1599            redact_provider_overrides: true,
1600            allow_debug_payload_logging: false,
1601        }
1602    }
1603}
1604
1605#[derive(Debug, Clone, Serialize, Deserialize)]
1606#[serde(default)]
1607pub struct PresenceHistoryConfig {
1608    pub enabled: bool,
1609    pub retention_window_seconds: u64,
1610    pub max_page_size: usize,
1611    pub max_events_per_channel: Option<usize>,
1612    pub max_bytes_per_channel: Option<u64>,
1613}
1614
1615impl Default for PresenceHistoryConfig {
1616    fn default() -> Self {
1617        Self {
1618            enabled: false,
1619            retention_window_seconds: 86400,
1620            max_page_size: 100,
1621            max_events_per_channel: None,
1622            max_bytes_per_channel: None,
1623        }
1624    }
1625}
1626
1627#[derive(Debug, Clone, Serialize, Deserialize)]
1628#[serde(default)]
1629pub struct HttpApiConfig {
1630    pub request_limit_in_mb: u32,
1631    pub accept_traffic: AcceptTraffic,
1632    pub usage_enabled: bool,
1633}
1634
1635#[derive(Debug, Clone, Serialize, Deserialize)]
1636#[serde(default)]
1637pub struct AcceptTraffic {
1638    pub memory_threshold: f64,
1639}
1640
1641#[derive(Debug, Clone, Serialize, Deserialize)]
1642#[serde(default)]
1643pub struct InstanceConfig {
1644    pub process_id: String,
1645}
1646
1647#[derive(Debug, Clone, Serialize, Deserialize)]
1648#[serde(default)]
1649pub struct MetricsConfig {
1650    pub enabled: bool,
1651    pub driver: MetricsDriver,
1652    pub host: String,
1653    pub prometheus: PrometheusConfig,
1654    pub tcp_exporter: MetricsTcpExporterConfig,
1655    pub port: u16,
1656}
1657
1658#[derive(Debug, Clone, Serialize, Deserialize)]
1659#[serde(default)]
1660pub struct PrometheusConfig {
1661    pub prefix: String,
1662}
1663
1664#[derive(Debug, Clone, Serialize, Deserialize)]
1665#[serde(default)]
1666pub struct MetricsTcpExporterConfig {
1667    pub enabled: bool,
1668    pub host: String,
1669    pub port: u16,
1670    pub buffer_size: Option<usize>,
1671}
1672
1673#[derive(Debug, Clone, Serialize, Deserialize)]
1674#[serde(default)]
1675pub struct LoggingConfig {
1676    pub colors_enabled: bool,
1677    pub include_target: bool,
1678}
1679
1680#[derive(Debug, Clone, Serialize, Deserialize)]
1681#[serde(default)]
1682pub struct PresenceConfig {
1683    pub max_members_per_channel: u32,
1684    pub max_member_size_in_kb: u32,
1685}
1686
1687/// WebSocket connection buffer configuration
1688/// Controls backpressure handling for slow consumers
1689#[derive(Debug, Clone, Serialize, Deserialize)]
1690#[serde(default)]
1691pub struct WebSocketConfig {
1692    pub max_messages: Option<usize>,
1693    pub max_bytes: Option<usize>,
1694    pub disconnect_on_buffer_full: bool,
1695    pub max_message_size: usize,
1696    pub max_frame_size: usize,
1697    pub write_buffer_size: usize,
1698    pub max_backpressure: usize,
1699    pub auto_ping: bool,
1700    pub ping_interval: u32,
1701    pub idle_timeout: u32,
1702    pub compression: String,
1703}
1704
1705impl Default for WebSocketConfig {
1706    fn default() -> Self {
1707        Self {
1708            max_messages: Some(1000),
1709            max_bytes: None,
1710            disconnect_on_buffer_full: true,
1711            max_message_size: 64 * 1024 * 1024,
1712            max_frame_size: 16 * 1024 * 1024,
1713            write_buffer_size: 16 * 1024,
1714            max_backpressure: 1024 * 1024,
1715            auto_ping: true,
1716            ping_interval: 30,
1717            idle_timeout: 120,
1718            compression: "disabled".to_string(),
1719        }
1720    }
1721}
1722
1723impl WebSocketConfig {
1724    /// Convert to WebSocketBufferConfig for runtime use
1725    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1726        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1727
1728        let limit = match (self.max_messages, self.max_bytes) {
1729            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1730            (Some(messages), None) => BufferLimit::Messages(messages),
1731            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1732            (None, None) => BufferLimit::Messages(1000),
1733        };
1734
1735        WebSocketBufferConfig {
1736            limit,
1737            disconnect_on_full: self.disconnect_on_buffer_full,
1738        }
1739    }
1740
1741    /// Convert to native sockudo-ws runtime configuration.
1742    pub fn to_sockudo_ws_config(
1743        &self,
1744        websocket_max_payload_kb: u32,
1745        activity_timeout: u64,
1746    ) -> sockudo_ws::Config {
1747        use sockudo_ws::Compression;
1748
1749        let compression = match self.compression.to_lowercase().as_str() {
1750            "dedicated" => Compression::Dedicated,
1751            "shared" => Compression::Shared,
1752            "window256b" => Compression::Window256B,
1753            "window1kb" => Compression::Window1KB,
1754            "window2kb" => Compression::Window2KB,
1755            "window4kb" => Compression::Window4KB,
1756            "window8kb" => Compression::Window8KB,
1757            "window16kb" => Compression::Window16KB,
1758            "window32kb" => Compression::Window32KB,
1759            _ => Compression::Disabled,
1760        };
1761
1762        sockudo_ws::Config::builder()
1763            .max_payload_length(
1764                self.max_bytes
1765                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1766            )
1767            .max_message_size(self.max_message_size)
1768            .max_frame_size(self.max_frame_size)
1769            .write_buffer_size(self.write_buffer_size)
1770            .max_backpressure(self.max_backpressure)
1771            .idle_timeout(self.idle_timeout)
1772            .auto_ping(self.auto_ping)
1773            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1774            .compression(compression)
1775            .build()
1776    }
1777}
1778
1779#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1780#[serde(default)]
1781pub struct QueueConfig {
1782    pub driver: QueueDriver,
1783    pub redis: RedisQueueConfig,
1784    pub redis_cluster: RedisClusterQueueConfig,
1785    pub nats: NatsAdapterConfig,
1786    pub pulsar: PulsarAdapterConfig,
1787    pub rabbitmq: RabbitMqAdapterConfig,
1788    pub google_pubsub: GooglePubSubAdapterConfig,
1789    pub kafka: KafkaAdapterConfig,
1790    pub iggy: IggyConfig,
1791    pub sqs: SqsQueueConfig,
1792    pub sns: SnsQueueConfig,
1793}
1794
1795#[derive(Debug, Clone, Serialize, Deserialize)]
1796#[serde(default)]
1797pub struct RedisQueueConfig {
1798    pub concurrency: u32,
1799    pub prefix: Option<String>,
1800    pub url_override: Option<String>,
1801    pub cluster_mode: bool,
1802}
1803
1804#[derive(Clone, Debug, Serialize, Deserialize)]
1805#[serde(default)]
1806pub struct RateLimit {
1807    pub max_requests: u32,
1808    pub window_seconds: u64,
1809    pub identifier: Option<String>,
1810    pub trust_hops: Option<u32>,
1811}
1812
1813#[derive(Debug, Clone, Serialize, Deserialize)]
1814#[serde(default)]
1815pub struct RateLimiterConfig {
1816    pub enabled: bool,
1817    pub driver: CacheDriver,
1818    pub api_rate_limit: RateLimit,
1819    pub websocket_rate_limit: RateLimit,
1820    pub redis: RedisConfig,
1821}
1822
1823#[derive(Debug, Clone, Serialize, Deserialize)]
1824#[serde(default)]
1825pub struct SslConfig {
1826    pub enabled: bool,
1827    pub cert_path: String,
1828    pub key_path: String,
1829    pub passphrase: Option<String>,
1830    pub ca_path: Option<String>,
1831    pub redirect_http: bool,
1832    pub http_port: Option<u16>,
1833}
1834
1835#[derive(Debug, Clone, Serialize, Deserialize)]
1836#[serde(default)]
1837pub struct WebhooksConfig {
1838    pub batching: BatchingConfig,
1839    pub retry: WebhookRetryConfig,
1840    pub request_timeout_ms: u64,
1841}
1842
1843#[derive(Debug, Clone, Serialize, Deserialize)]
1844#[serde(default)]
1845pub struct BatchingConfig {
1846    pub enabled: bool,
1847    pub duration: u64,
1848    pub size: usize,
1849}
1850
1851#[derive(Debug, Clone, Serialize, Deserialize)]
1852#[serde(default)]
1853pub struct WebhookRetryConfig {
1854    pub enabled: bool,
1855    pub max_attempts: Option<u32>,
1856    pub max_elapsed_time_ms: u64,
1857    pub initial_backoff_ms: u64,
1858    pub max_backoff_ms: u64,
1859}
1860
1861impl Default for WebhooksConfig {
1862    fn default() -> Self {
1863        Self {
1864            batching: BatchingConfig::default(),
1865            retry: WebhookRetryConfig::default(),
1866            request_timeout_ms: 10_000,
1867        }
1868    }
1869}
1870
1871impl Default for WebhookRetryConfig {
1872    fn default() -> Self {
1873        Self {
1874            enabled: true,
1875            max_attempts: None,
1876            max_elapsed_time_ms: 300_000,
1877            initial_backoff_ms: 1_000,
1878            max_backoff_ms: 60_000,
1879        }
1880    }
1881}
1882
1883#[derive(Debug, Clone, Serialize, Deserialize)]
1884#[serde(default)]
1885pub struct ClusterHealthConfig {
1886    pub enabled: bool,
1887    pub heartbeat_interval_ms: u64,
1888    pub node_timeout_ms: u64,
1889    pub cleanup_interval_ms: u64,
1890}
1891
1892#[derive(Debug, Clone, Serialize, Deserialize)]
1893#[serde(default)]
1894pub struct UnixSocketConfig {
1895    pub enabled: bool,
1896    pub path: String,
1897    #[serde(deserialize_with = "deserialize_octal_permission")]
1898    pub permission_mode: u32,
1899}
1900
1901#[derive(Debug, Clone, Serialize, Deserialize)]
1902#[serde(default)]
1903pub struct DeltaCompressionOptionsConfig {
1904    pub enabled: bool,
1905    pub algorithm: String,
1906    pub full_message_interval: u32,
1907    pub min_message_size: usize,
1908    pub max_state_age_secs: u64,
1909    pub max_channel_states_per_socket: usize,
1910    pub max_conflation_states_per_channel: Option<usize>,
1911    pub conflation_key_path: Option<String>,
1912    pub cluster_coordination: bool,
1913    pub coordination_backend: DeltaCoordinationBackend,
1914    pub omit_delta_algorithm: bool,
1915}
1916
1917/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1918#[derive(Debug, Clone, Serialize, Deserialize)]
1919#[serde(default)]
1920pub struct CleanupConfig {
1921    pub queue_buffer_size: usize,
1922    pub batch_size: usize,
1923    pub batch_timeout_ms: u64,
1924    pub worker_threads: WorkerThreadsConfig,
1925    pub max_retry_attempts: u32,
1926    pub async_enabled: bool,
1927    pub fallback_to_sync: bool,
1928}
1929
1930/// Worker threads configuration for the cleanup system
1931#[derive(Debug, Clone)]
1932pub enum WorkerThreadsConfig {
1933    Auto,
1934    Fixed(usize),
1935}
1936
1937impl serde::Serialize for WorkerThreadsConfig {
1938    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1939    where
1940        S: serde::Serializer,
1941    {
1942        match self {
1943            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1944            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1945        }
1946    }
1947}
1948
1949impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1950    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1951    where
1952        D: serde::Deserializer<'de>,
1953    {
1954        use serde::de;
1955        struct WorkerThreadsVisitor;
1956        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1957            type Value = WorkerThreadsConfig;
1958
1959            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1960                formatter.write_str(r#""auto" or a positive integer"#)
1961            }
1962
1963            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1964                if value.eq_ignore_ascii_case("auto") {
1965                    Ok(WorkerThreadsConfig::Auto)
1966                } else if let Ok(n) = value.parse::<usize>() {
1967                    Ok(WorkerThreadsConfig::Fixed(n))
1968                } else {
1969                    Err(E::custom(format!(
1970                        "expected 'auto' or a number, got '{value}'"
1971                    )))
1972                }
1973            }
1974
1975            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1976                Ok(WorkerThreadsConfig::Fixed(value as usize))
1977            }
1978
1979            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1980                if value >= 0 {
1981                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1982                } else {
1983                    Err(E::custom("worker_threads must be non-negative"))
1984                }
1985            }
1986        }
1987        deserializer.deserialize_any(WorkerThreadsVisitor)
1988    }
1989}
1990
1991impl Default for CleanupConfig {
1992    fn default() -> Self {
1993        Self {
1994            queue_buffer_size: 1024,
1995            batch_size: 64,
1996            batch_timeout_ms: 100,
1997            worker_threads: WorkerThreadsConfig::Auto,
1998            max_retry_attempts: 3,
1999            async_enabled: true,
2000            fallback_to_sync: true,
2001        }
2002    }
2003}
2004
2005impl CleanupConfig {
2006    pub fn validate(&self) -> Result<(), String> {
2007        if self.queue_buffer_size == 0 {
2008            return Err("queue_buffer_size must be greater than 0".to_string());
2009        }
2010        if self.batch_size == 0 {
2011            return Err("batch_size must be greater than 0".to_string());
2012        }
2013        if self.batch_timeout_ms == 0 {
2014            return Err("batch_timeout_ms must be greater than 0".to_string());
2015        }
2016        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
2017            && n == 0
2018        {
2019            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
2020        }
2021        Ok(())
2022    }
2023}
2024
2025#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2026#[serde(default)]
2027pub struct TagFilteringConfig {
2028    #[serde(default)]
2029    pub enabled: bool,
2030    #[serde(default = "default_true")]
2031    pub enable_tags: bool,
2032}
2033
2034fn default_true() -> bool {
2035    true
2036}
2037
2038// --- Default Implementations ---
2039
2040impl Default for ServerOptions {
2041    fn default() -> Self {
2042        Self {
2043            adapter: AdapterConfig::default(),
2044            app_manager: AppManagerConfig::default(),
2045            cache: CacheConfig::default(),
2046            channel_limits: ChannelLimits::default(),
2047            cors: CorsConfig::default(),
2048            database: DatabaseConfig::default(),
2049            database_pooling: DatabasePooling::default(),
2050            debug: false,
2051            tag_filtering: TagFilteringConfig::default(),
2052            event_limits: EventLimits::default(),
2053            host: "0.0.0.0".to_string(),
2054            http_api: HttpApiConfig::default(),
2055            instance: InstanceConfig::default(),
2056            logging: None,
2057            metrics: MetricsConfig::default(),
2058            mode: "production".to_string(),
2059            port: 6001,
2060            path_prefix: "/".to_string(),
2061            presence: PresenceConfig::default(),
2062            queue: QueueConfig::default(),
2063            rate_limiter: RateLimiterConfig::default(),
2064            shutdown_grace_period: 10,
2065            ssl: SslConfig::default(),
2066            user_authentication_timeout: 3600,
2067            webhooks: WebhooksConfig::default(),
2068            websocket_max_payload_kb: 64,
2069            cleanup: CleanupConfig::default(),
2070            activity_timeout: 120,
2071            cluster_health: ClusterHealthConfig::default(),
2072            unix_socket: UnixSocketConfig::default(),
2073            delta_compression: DeltaCompressionOptionsConfig::default(),
2074            websocket: WebSocketConfig::default(),
2075            connection_recovery: ConnectionRecoveryConfig::default(),
2076            history: HistoryConfig::default(),
2077            presence_history: PresenceHistoryConfig::default(),
2078            idempotency: IdempotencyConfig::default(),
2079            ephemeral: EphemeralConfig::default(),
2080            echo_control: EchoControlConfig::default(),
2081            event_name_filtering: EventNameFilteringConfig::default(),
2082            versioned_messages: VersionedMessagesConfig::default(),
2083            annotations: AnnotationsConfig::default(),
2084            push: PushConfig::default(),
2085            health_check_timeout_ms: 400,
2086        }
2087    }
2088}
2089
2090impl Default for SqsQueueConfig {
2091    fn default() -> Self {
2092        Self {
2093            region: "us-east-1".to_string(),
2094            queue_url_prefix: None,
2095            visibility_timeout: 30,
2096            endpoint_url: None,
2097            max_messages: 10,
2098            wait_time_seconds: 5,
2099            concurrency: 5,
2100            fifo: false,
2101            message_group_id: Some("default".to_string()),
2102        }
2103    }
2104}
2105
2106impl Default for SnsQueueConfig {
2107    fn default() -> Self {
2108        Self {
2109            region: "us-east-1".to_string(),
2110            topic_arn: String::new(),
2111            endpoint_url: None,
2112        }
2113    }
2114}
2115
2116impl Default for RedisAdapterConfig {
2117    fn default() -> Self {
2118        Self {
2119            requests_timeout: 5000,
2120            prefix: "sockudo_adapter:".to_string(),
2121            redis_pub_options: AHashMap::new(),
2122            redis_sub_options: AHashMap::new(),
2123            cluster_mode: false,
2124        }
2125    }
2126}
2127
2128impl Default for RedisClusterAdapterConfig {
2129    fn default() -> Self {
2130        Self {
2131            nodes: vec![],
2132            prefix: "sockudo_adapter:".to_string(),
2133            request_timeout_ms: 1000,
2134            use_connection_manager: true,
2135            use_sharded_pubsub: false,
2136        }
2137    }
2138}
2139
2140impl Default for NatsAdapterConfig {
2141    fn default() -> Self {
2142        Self {
2143            servers: vec!["nats://localhost:4222".to_string()],
2144            prefix: "sockudo_adapter:".to_string(),
2145            request_timeout_ms: 5000,
2146            username: None,
2147            password: None,
2148            token: None,
2149            connection_timeout_ms: 5000,
2150            nodes_number: None,
2151            discovery_max_wait_ms: 1000,
2152            discovery_idle_wait_ms: 150,
2153            subscription_capacity: None,
2154            client_capacity: None,
2155            max_reconnects: None,
2156            presence_sync_chunk_size: None,
2157        }
2158    }
2159}
2160
2161impl Default for PulsarAdapterConfig {
2162    fn default() -> Self {
2163        Self {
2164            url: "pulsar://127.0.0.1:6650".to_string(),
2165            prefix: "sockudo-adapter".to_string(),
2166            request_timeout_ms: 5000,
2167            token: None,
2168            nodes_number: None,
2169        }
2170    }
2171}
2172
2173impl Default for RabbitMqAdapterConfig {
2174    fn default() -> Self {
2175        Self {
2176            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
2177            prefix: "sockudo_adapter".to_string(),
2178            request_timeout_ms: 5000,
2179            connection_timeout_ms: 5000,
2180            nodes_number: None,
2181        }
2182    }
2183}
2184
2185impl Default for GooglePubSubAdapterConfig {
2186    fn default() -> Self {
2187        Self {
2188            project_id: "".to_string(),
2189            prefix: "sockudo-adapter".to_string(),
2190            request_timeout_ms: 5000,
2191            emulator_host: None,
2192            nodes_number: None,
2193        }
2194    }
2195}
2196
2197impl Default for KafkaAdapterConfig {
2198    fn default() -> Self {
2199        Self {
2200            brokers: vec!["localhost:9092".to_string()],
2201            prefix: "sockudo_adapter".to_string(),
2202            request_timeout_ms: 5000,
2203            security_protocol: None,
2204            sasl_mechanism: None,
2205            sasl_username: None,
2206            sasl_password: None,
2207            nodes_number: None,
2208        }
2209    }
2210}
2211
2212impl Default for IggyConfig {
2213    fn default() -> Self {
2214        Self {
2215            connection_string: "iggy://iggy:iggy@127.0.0.1:8090".to_string(),
2216            username: None,
2217            password: None,
2218            consumer_name: None,
2219            stream: "sockudo".to_string(),
2220            topic_prefix: "sockudo-adapter".to_string(),
2221            queue_topic_prefix: "sockudo-queue".to_string(),
2222            consumer_group_prefix: "sockudo-workers".to_string(),
2223            request_timeout_ms: 5000,
2224            poll_interval_ms: 50,
2225            poll_batch_size: 100,
2226            partitions_count: 1,
2227            partition_id: 0,
2228            auto_create: true,
2229            start_from_latest: true,
2230            nodes_number: None,
2231        }
2232    }
2233}
2234
2235impl Default for CacheSettings {
2236    fn default() -> Self {
2237        Self {
2238            enabled: true,
2239            ttl: 300,
2240        }
2241    }
2242}
2243
2244impl Default for MemoryCacheOptions {
2245    fn default() -> Self {
2246        Self {
2247            ttl: 300,
2248            cleanup_interval: 60,
2249            max_capacity: 10000,
2250        }
2251    }
2252}
2253
2254impl Default for CacheConfig {
2255    fn default() -> Self {
2256        Self {
2257            driver: CacheDriver::default(),
2258            redis: RedisConfig {
2259                prefix: Some("sockudo_cache:".to_string()),
2260                url_override: None,
2261                cluster_mode: false,
2262            },
2263            memory: MemoryCacheOptions::default(),
2264        }
2265    }
2266}
2267
2268impl Default for ChannelLimits {
2269    fn default() -> Self {
2270        Self {
2271            max_name_length: 200,
2272            cache_ttl: 3600,
2273        }
2274    }
2275}
2276impl Default for CorsConfig {
2277    fn default() -> Self {
2278        Self {
2279            credentials: true,
2280            origin: vec!["*".to_string()],
2281            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
2282            allowed_headers: vec![
2283                "Authorization".to_string(),
2284                "Content-Type".to_string(),
2285                "X-Requested-With".to_string(),
2286                "Accept".to_string(),
2287            ],
2288        }
2289    }
2290}
2291
2292impl Default for DatabaseConnection {
2293    fn default() -> Self {
2294        Self {
2295            host: "localhost".to_string(),
2296            port: 3306,
2297            username: "root".to_string(),
2298            password: "".to_string(),
2299            database: "sockudo".to_string(),
2300            table_name: "applications".to_string(),
2301            connection_pool_size: 10,
2302            pool_min: None,
2303            pool_max: None,
2304            cache_ttl: 300,
2305            cache_cleanup_interval: 60,
2306            cache_max_capacity: 100,
2307        }
2308    }
2309}
2310
2311impl Default for RedisConnection {
2312    fn default() -> Self {
2313        Self {
2314            host: "127.0.0.1".to_string(),
2315            port: 6379,
2316            db: 0,
2317            username: None,
2318            password: None,
2319            key_prefix: "sockudo:".to_string(),
2320            sentinels: Vec::new(),
2321            sentinel_password: None,
2322            name: "mymaster".to_string(),
2323            cluster: RedisClusterConnection::default(),
2324            cluster_nodes: Vec::new(),
2325        }
2326    }
2327}
2328
2329impl Default for RedisSentinel {
2330    fn default() -> Self {
2331        Self {
2332            host: "localhost".to_string(),
2333            port: 26379,
2334        }
2335    }
2336}
2337
2338impl Default for ClusterNode {
2339    fn default() -> Self {
2340        Self {
2341            host: "127.0.0.1".to_string(),
2342            port: 7000,
2343        }
2344    }
2345}
2346
2347impl Default for DatabasePooling {
2348    fn default() -> Self {
2349        Self {
2350            enabled: true,
2351            min: 2,
2352            max: 10,
2353        }
2354    }
2355}
2356
2357impl Default for EventLimits {
2358    fn default() -> Self {
2359        Self {
2360            max_channels_at_once: 100,
2361            max_name_length: 200,
2362            max_payload_in_kb: 100,
2363            max_batch_size: 10,
2364        }
2365    }
2366}
2367
2368impl Default for IdempotencyConfig {
2369    fn default() -> Self {
2370        Self {
2371            enabled: true,
2372            ttl_seconds: 120,
2373            max_key_length: 128,
2374        }
2375    }
2376}
2377
2378impl Default for EphemeralConfig {
2379    fn default() -> Self {
2380        Self { enabled: true }
2381    }
2382}
2383
2384impl Default for EchoControlConfig {
2385    fn default() -> Self {
2386        Self {
2387            enabled: true,
2388            default_echo_messages: true,
2389        }
2390    }
2391}
2392
2393impl Default for EventNameFilteringConfig {
2394    fn default() -> Self {
2395        Self {
2396            enabled: true,
2397            max_events_per_filter: 50,
2398            max_event_name_length: 200,
2399        }
2400    }
2401}
2402
2403impl Default for ConnectionRecoveryConfig {
2404    fn default() -> Self {
2405        Self {
2406            enabled: false,
2407            buffer_ttl_seconds: 120,
2408            max_buffer_size: 100,
2409        }
2410    }
2411}
2412
2413impl Default for HttpApiConfig {
2414    fn default() -> Self {
2415        Self {
2416            request_limit_in_mb: 10,
2417            accept_traffic: AcceptTraffic::default(),
2418            usage_enabled: true,
2419        }
2420    }
2421}
2422
2423impl Default for AcceptTraffic {
2424    fn default() -> Self {
2425        Self {
2426            memory_threshold: 0.90,
2427        }
2428    }
2429}
2430
2431impl Default for InstanceConfig {
2432    fn default() -> Self {
2433        Self {
2434            process_id: uuid::Uuid::new_v4().to_string(),
2435        }
2436    }
2437}
2438
2439impl Default for LoggingConfig {
2440    fn default() -> Self {
2441        Self {
2442            colors_enabled: true,
2443            include_target: true,
2444        }
2445    }
2446}
2447
2448impl Default for MetricsConfig {
2449    fn default() -> Self {
2450        Self {
2451            enabled: true,
2452            driver: MetricsDriver::default(),
2453            host: "0.0.0.0".to_string(),
2454            prometheus: PrometheusConfig::default(),
2455            tcp_exporter: MetricsTcpExporterConfig::default(),
2456            port: 9601,
2457        }
2458    }
2459}
2460
2461impl Default for PrometheusConfig {
2462    fn default() -> Self {
2463        Self {
2464            prefix: "sockudo_".to_string(),
2465        }
2466    }
2467}
2468
2469impl Default for MetricsTcpExporterConfig {
2470    fn default() -> Self {
2471        Self {
2472            enabled: false,
2473            host: "127.0.0.1".to_string(),
2474            port: 5000,
2475            buffer_size: Some(1024),
2476        }
2477    }
2478}
2479
2480impl Default for PresenceConfig {
2481    fn default() -> Self {
2482        Self {
2483            max_members_per_channel: 100,
2484            max_member_size_in_kb: 2,
2485        }
2486    }
2487}
2488
2489impl Default for RedisQueueConfig {
2490    fn default() -> Self {
2491        Self {
2492            concurrency: 5,
2493            prefix: Some("sockudo_queue:".to_string()),
2494            url_override: None,
2495            cluster_mode: false,
2496        }
2497    }
2498}
2499
2500impl Default for RateLimit {
2501    fn default() -> Self {
2502        Self {
2503            max_requests: 60,
2504            window_seconds: 60,
2505            identifier: Some("default".to_string()),
2506            trust_hops: Some(0),
2507        }
2508    }
2509}
2510
2511impl Default for RateLimiterConfig {
2512    fn default() -> Self {
2513        Self {
2514            enabled: true,
2515            driver: CacheDriver::Memory,
2516            api_rate_limit: RateLimit {
2517                max_requests: 100,
2518                window_seconds: 60,
2519                identifier: Some("api".to_string()),
2520                trust_hops: Some(0),
2521            },
2522            websocket_rate_limit: RateLimit {
2523                max_requests: 20,
2524                window_seconds: 60,
2525                identifier: Some("websocket_connect".to_string()),
2526                trust_hops: Some(0),
2527            },
2528            redis: RedisConfig {
2529                prefix: Some("sockudo_rl:".to_string()),
2530                url_override: None,
2531                cluster_mode: false,
2532            },
2533        }
2534    }
2535}
2536
2537impl Default for SslConfig {
2538    fn default() -> Self {
2539        Self {
2540            enabled: false,
2541            cert_path: "".to_string(),
2542            key_path: "".to_string(),
2543            passphrase: None,
2544            ca_path: None,
2545            redirect_http: false,
2546            http_port: Some(80),
2547        }
2548    }
2549}
2550
2551impl Default for BatchingConfig {
2552    fn default() -> Self {
2553        Self {
2554            enabled: true,
2555            duration: 50,
2556            size: 100,
2557        }
2558    }
2559}
2560
2561impl Default for ClusterHealthConfig {
2562    fn default() -> Self {
2563        Self {
2564            enabled: true,
2565            heartbeat_interval_ms: 10000,
2566            node_timeout_ms: 30000,
2567            cleanup_interval_ms: 10000,
2568        }
2569    }
2570}
2571
2572impl Default for UnixSocketConfig {
2573    fn default() -> Self {
2574        Self {
2575            enabled: false,
2576            path: "/var/run/sockudo/sockudo.sock".to_string(),
2577            permission_mode: 0o660,
2578        }
2579    }
2580}
2581
2582impl Default for DeltaCompressionOptionsConfig {
2583    fn default() -> Self {
2584        Self {
2585            enabled: true,
2586            algorithm: "fossil".to_string(),
2587            full_message_interval: 10,
2588            min_message_size: 100,
2589            max_state_age_secs: 300,
2590            max_channel_states_per_socket: 100,
2591            max_conflation_states_per_channel: Some(100),
2592            conflation_key_path: None,
2593            cluster_coordination: false,
2594            coordination_backend: DeltaCoordinationBackend::Auto,
2595            omit_delta_algorithm: false,
2596        }
2597    }
2598}
2599
2600#[cfg(test)]
2601mod tests {
2602    use super::{
2603        DeltaCoordinationBackend, PushQueueDriver, PushStorageDriver, QueueDriver, ServerOptions,
2604        VersionStoreDriver,
2605    };
2606    use crate::app::{App, AppPolicy};
2607    use std::str::FromStr;
2608
2609    const APP_BOOTSTRAP_ENV_KEYS: &[&str] = &[
2610        "SOCKUDO_DEFAULT_APP_ID",
2611        "SOCKUDO_DEFAULT_APP_KEY",
2612        "SOCKUDO_DEFAULT_APP_SECRET",
2613        "SOCKUDO_DEFAULT_APP_ENABLED",
2614        "SOCKUDO_SKIP_INLINE_APPS",
2615        "APP_MANAGER_REGISTER_INLINE_APPS",
2616    ];
2617
2618    struct EnvGuard {
2619        previous: Vec<(&'static str, Option<String>)>,
2620    }
2621
2622    impl EnvGuard {
2623        fn app_bootstrap(overrides: &[(&'static str, &'static str)]) -> Self {
2624            let previous = APP_BOOTSTRAP_ENV_KEYS
2625                .iter()
2626                .map(|key| (*key, std::env::var(key).ok()))
2627                .collect();
2628
2629            // SAFETY: These tests isolate the app-bootstrap environment keys
2630            // before applying per-test overrides and restore them in Drop.
2631            unsafe {
2632                for key in APP_BOOTSTRAP_ENV_KEYS {
2633                    std::env::remove_var(key);
2634                }
2635                for (key, value) in overrides {
2636                    std::env::set_var(key, value);
2637                }
2638            }
2639
2640            Self { previous }
2641        }
2642    }
2643
2644    impl Drop for EnvGuard {
2645        fn drop(&mut self) {
2646            // SAFETY: Restores each key to its pre-test value or removes it if
2647            // it did not exist before the test.
2648            unsafe {
2649                for (key, value) in &self.previous {
2650                    if let Some(value) = value {
2651                        std::env::set_var(key, value);
2652                    } else {
2653                        std::env::remove_var(key);
2654                    }
2655                }
2656            }
2657        }
2658    }
2659
2660    fn inline_test_app() -> App {
2661        App::from_policy(
2662            "app-id".to_string(),
2663            "app-key".to_string(),
2664            "app-secret".to_string(),
2665            true,
2666            AppPolicy::default(),
2667        )
2668    }
2669
2670    #[test]
2671    fn queue_driver_parses_broker_backends() {
2672        assert_eq!(
2673            QueueDriver::from_str("rabbitmq").unwrap(),
2674            QueueDriver::RabbitMq
2675        );
2676        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
2677        assert_eq!(
2678            QueueDriver::from_str("pulsar").unwrap(),
2679            QueueDriver::Pulsar
2680        );
2681        assert_eq!(
2682            QueueDriver::from_str("google-pubsub").unwrap(),
2683            QueueDriver::GooglePubSub
2684        );
2685    }
2686
2687    #[test]
2688    fn delta_coordination_backend_parses_expected_values() {
2689        assert_eq!(
2690            DeltaCoordinationBackend::from_str("auto").unwrap(),
2691            DeltaCoordinationBackend::Auto
2692        );
2693        assert_eq!(
2694            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
2695            DeltaCoordinationBackend::RedisCluster
2696        );
2697        assert_eq!(
2698            DeltaCoordinationBackend::from_str("nats").unwrap(),
2699            DeltaCoordinationBackend::Nats
2700        );
2701    }
2702
2703    #[tokio::test]
2704    async fn app_bootstrap_env_overrides_inline_apps() {
2705        {
2706            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "false")]);
2707            let mut options = ServerOptions::default();
2708            options.app_manager.array.apps.push(inline_test_app());
2709
2710            options.override_from_env().await.unwrap();
2711
2712            assert!(options.app_manager.array.apps.is_empty());
2713        }
2714
2715        {
2716            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "true")]);
2717            let mut options = ServerOptions::default();
2718            options.app_manager.array.apps.push(inline_test_app());
2719
2720            options.override_from_env().await.unwrap();
2721
2722            assert_eq!(options.app_manager.array.apps.len(), 1);
2723            assert_eq!(options.app_manager.array.apps[0].id, "app-id");
2724        }
2725
2726        {
2727            let _env = EnvGuard::app_bootstrap(&[
2728                ("SOCKUDO_DEFAULT_APP_ID", "prod-app"),
2729                ("SOCKUDO_DEFAULT_APP_KEY", "prod-key"),
2730                ("SOCKUDO_DEFAULT_APP_SECRET", "prod-secret"),
2731                ("SOCKUDO_DEFAULT_APP_ENABLED", "true"),
2732            ]);
2733            let mut options = ServerOptions::default();
2734            options.app_manager.array.apps.push(inline_test_app());
2735
2736            options.override_from_env().await.unwrap();
2737
2738            assert_eq!(options.app_manager.array.apps.len(), 1);
2739            let app = &options.app_manager.array.apps[0];
2740            assert_eq!(app.id, "prod-app");
2741            assert_eq!(app.key, "prod-key");
2742            assert_eq!(app.secret, "prod-secret");
2743            assert!(app.enabled);
2744        }
2745
2746        {
2747            let _env = EnvGuard::app_bootstrap(&[("APP_MANAGER_REGISTER_INLINE_APPS", "false")]);
2748            let mut options = ServerOptions::default();
2749            options.app_manager.array.apps.push(inline_test_app());
2750
2751            options.override_from_env().await.unwrap();
2752
2753            assert!(options.app_manager.array.apps.is_empty());
2754        }
2755    }
2756
2757    #[tokio::test]
2758    async fn versioned_messages_driver_overrides_from_env() {
2759        let previous = std::env::var("VERSIONED_MESSAGES_DRIVER").ok();
2760        // SAFETY: This test controls the environment variable lifecycle for a
2761        // single key and restores the prior value before it returns.
2762        unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", "postgres") };
2763
2764        let mut options = ServerOptions::default();
2765        options.override_from_env().await.unwrap();
2766
2767        if let Some(previous) = previous {
2768            // SAFETY: Restoring the pre-test value for the same key.
2769            unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", previous) };
2770        } else {
2771            // SAFETY: Removing the test-only environment variable before exit.
2772            unsafe { std::env::remove_var("VERSIONED_MESSAGES_DRIVER") };
2773        }
2774
2775        assert_eq!(
2776            options.versioned_messages.driver,
2777            VersionStoreDriver::Postgres
2778        );
2779    }
2780
2781    #[tokio::test]
2782    async fn push_storage_driver_overrides_from_env() {
2783        let previous = std::env::var("PUSH_STORAGE_DRIVER").ok();
2784        // SAFETY: This test controls the environment variable lifecycle for a
2785        // single key and restores the prior value before it returns.
2786        unsafe { std::env::set_var("PUSH_STORAGE_DRIVER", "mysql") };
2787
2788        let mut options = ServerOptions::default();
2789        options.override_from_env().await.unwrap();
2790
2791        if let Some(previous) = previous {
2792            // SAFETY: Restoring the pre-test value for the same key.
2793            unsafe { std::env::set_var("PUSH_STORAGE_DRIVER", previous) };
2794        } else {
2795            // SAFETY: Removing the test-only environment variable before exit.
2796            unsafe { std::env::remove_var("PUSH_STORAGE_DRIVER") };
2797        }
2798
2799        assert_eq!(options.push.storage_driver, PushStorageDriver::Mysql);
2800    }
2801
2802    #[tokio::test]
2803    async fn push_queue_driver_overrides_from_env() {
2804        let previous = std::env::var("PUSH_QUEUE_DRIVER").ok();
2805        // SAFETY: This test controls the environment variable lifecycle for a
2806        // single key and restores the prior value before it returns.
2807        unsafe { std::env::set_var("PUSH_QUEUE_DRIVER", "redis-cluster") };
2808
2809        let mut options = ServerOptions::default();
2810        options.override_from_env().await.unwrap();
2811
2812        if let Some(previous) = previous {
2813            // SAFETY: Restoring the pre-test value for the same key.
2814            unsafe { std::env::set_var("PUSH_QUEUE_DRIVER", previous) };
2815        } else {
2816            // SAFETY: Removing the test-only environment variable before exit.
2817            unsafe { std::env::remove_var("PUSH_QUEUE_DRIVER") };
2818        }
2819
2820        assert_eq!(options.push.queue_driver, PushQueueDriver::RedisCluster);
2821    }
2822
2823    #[tokio::test]
2824    async fn websocket_rate_limit_trust_hops_overrides_from_env() {
2825        let previous = std::env::var("RATE_LIMITER_WS_TRUST_HOPS").ok();
2826        // SAFETY: This test controls the environment variable lifecycle for a
2827        // single key and restores the prior value before it returns.
2828        unsafe { std::env::set_var("RATE_LIMITER_WS_TRUST_HOPS", "2") };
2829
2830        let mut options = ServerOptions::default();
2831        options.override_from_env().await.unwrap();
2832
2833        if let Some(previous) = previous {
2834            // SAFETY: Restoring the pre-test value for the same key.
2835            unsafe { std::env::set_var("RATE_LIMITER_WS_TRUST_HOPS", previous) };
2836        } else {
2837            // SAFETY: Removing the test-only environment variable before exit.
2838            unsafe { std::env::remove_var("RATE_LIMITER_WS_TRUST_HOPS") };
2839        }
2840
2841        assert_eq!(
2842            options.rate_limiter.websocket_rate_limit.trust_hops,
2843            Some(2)
2844        );
2845    }
2846
2847    #[test]
2848    fn push_config_defaults_follow_capacity_model() {
2849        let options = ServerOptions::default();
2850
2851        assert_eq!(options.push.storage_driver, PushStorageDriver::Memory);
2852        assert_eq!(options.push.queue_driver, PushQueueDriver::Memory);
2853        assert!(!options.push.fcm_enabled);
2854        assert!(!options.push.apns_enabled);
2855        assert!(!options.push.webpush_enabled);
2856        assert!(!options.push.hms_enabled);
2857        assert!(!options.push.wns_enabled);
2858        assert_eq!(options.push.fanout_fast_threshold, 10_000);
2859        assert_eq!(options.push.fanout_shard_size, 100_000);
2860        assert_eq!(options.push.publish_status_ttl_days, 30);
2861        assert_eq!(options.push.default_quotas.acceptance_rps, 100);
2862        assert_eq!(options.push.circuit_breaker.failure_threshold, 5);
2863        assert!(options.push.payload_redaction.redact_payload);
2864    }
2865
2866    #[tokio::test]
2867    async fn push_release_env_overrides_are_parsed() {
2868        let keys = [
2869            "PUSH_FCM_ENABLED",
2870            "PUSH_APNS_ENABLED",
2871            "PUSH_WEBPUSH_ENABLED",
2872            "PUSH_HMS_ENABLED",
2873            "PUSH_WNS_ENABLED",
2874            "PUSH_CREDENTIAL_ENCRYPTION_KEY",
2875            "PUSH_FANOUT_FAST_THRESHOLD",
2876            "PUSH_FANOUT_SHARD_SIZE",
2877            "PUSH_FANOUT_SYNC_THRESHOLD",
2878            "PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS",
2879            "PUSH_PUBLISH_STATUS_TTL_DAYS",
2880            "PUSH_FAILURE_THRESHOLD",
2881            "PUSH_SCHEDULER_INTERVAL_SECS",
2882            "PUSH_STALE_DEVICE_MAX_AGE_DAYS",
2883            "PUSH_ANALYTICS_ENABLED",
2884            "PUSH_DEFAULT_ACCEPTANCE_RPS",
2885            "PUSH_DEFAULT_DELIVERY_QUOTA_DAILY",
2886            "PUSH_DEFAULT_FANOUT_MAX",
2887            "PUSH_DEFAULT_INFLIGHT_MAX",
2888        ];
2889        let previous: Vec<_> = keys
2890            .iter()
2891            .map(|key| (*key, std::env::var(key).ok()))
2892            .collect();
2893
2894        // SAFETY: This test owns the listed environment variables and restores
2895        // their previous values before returning.
2896        unsafe {
2897            std::env::set_var("PUSH_FCM_ENABLED", "true");
2898            std::env::set_var("PUSH_APNS_ENABLED", "true");
2899            std::env::set_var("PUSH_WEBPUSH_ENABLED", "true");
2900            std::env::set_var("PUSH_HMS_ENABLED", "true");
2901            std::env::set_var("PUSH_WNS_ENABLED", "true");
2902            std::env::set_var("PUSH_CREDENTIAL_ENCRYPTION_KEY", "env:key:v1");
2903            std::env::set_var("PUSH_FANOUT_FAST_THRESHOLD", "12345");
2904            std::env::set_var("PUSH_FANOUT_SHARD_SIZE", "54321");
2905            std::env::set_var("PUSH_FANOUT_SYNC_THRESHOLD", "250");
2906            std::env::set_var("PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS", "42");
2907            std::env::set_var("PUSH_PUBLISH_STATUS_TTL_DAYS", "14");
2908            std::env::set_var("PUSH_FAILURE_THRESHOLD", "9");
2909            std::env::set_var("PUSH_SCHEDULER_INTERVAL_SECS", "11");
2910            std::env::set_var("PUSH_STALE_DEVICE_MAX_AGE_DAYS", "120");
2911            std::env::set_var("PUSH_ANALYTICS_ENABLED", "true");
2912            std::env::set_var("PUSH_DEFAULT_ACCEPTANCE_RPS", "700");
2913            std::env::set_var("PUSH_DEFAULT_DELIVERY_QUOTA_DAILY", "8000");
2914            std::env::set_var("PUSH_DEFAULT_FANOUT_MAX", "9000");
2915            std::env::set_var("PUSH_DEFAULT_INFLIGHT_MAX", "1000");
2916        }
2917
2918        let mut options = ServerOptions::default();
2919        options.override_from_env().await.unwrap();
2920
2921        for (key, value) in previous {
2922            // SAFETY: Restoring each pre-test value or removing the test-only
2923            // variable for the same key.
2924            unsafe {
2925                if let Some(value) = value {
2926                    std::env::set_var(key, value);
2927                } else {
2928                    std::env::remove_var(key);
2929                }
2930            }
2931        }
2932
2933        assert!(options.push.fcm_enabled);
2934        assert!(options.push.apns_enabled);
2935        assert!(options.push.webpush_enabled);
2936        assert!(options.push.hms_enabled);
2937        assert!(options.push.wns_enabled);
2938        assert_eq!(
2939            options.push.credential_encryption_key.as_deref(),
2940            Some("env:key:v1")
2941        );
2942        assert_eq!(options.push.fanout_fast_threshold, 12_345);
2943        assert_eq!(options.push.fanout_shard_size, 54_321);
2944        assert_eq!(options.push.fanout_sync_threshold, 250);
2945        assert_eq!(options.push.backpressure_lag_threshold_secs, 42);
2946        assert_eq!(options.push.publish_status_ttl_days, 14);
2947        assert_eq!(options.push.circuit_breaker.failure_threshold, 9);
2948        assert_eq!(options.push.scheduler_interval_secs, 11);
2949        assert_eq!(options.push.stale_device_max_age_days, 120);
2950        assert!(options.push.analytics_enabled);
2951        assert_eq!(options.push.default_quotas.acceptance_rps, 700);
2952        assert_eq!(options.push.default_quotas.delivery_quota_daily, 8_000);
2953        assert_eq!(options.push.default_quotas.fanout_max, 9_000);
2954        assert_eq!(options.push.default_quotas.inflight_max, 1_000);
2955    }
2956}
2957
2958impl ClusterHealthConfig {
2959    pub fn validate(&self) -> Result<(), String> {
2960        if self.heartbeat_interval_ms == 0 {
2961            return Err("heartbeat_interval_ms must be greater than 0".to_string());
2962        }
2963        if self.node_timeout_ms == 0 {
2964            return Err("node_timeout_ms must be greater than 0".to_string());
2965        }
2966        if self.cleanup_interval_ms == 0 {
2967            return Err("cleanup_interval_ms must be greater than 0".to_string());
2968        }
2969
2970        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2971            return Err(format!(
2972                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2973                self.heartbeat_interval_ms,
2974                self.node_timeout_ms,
2975                self.node_timeout_ms / 3
2976            ));
2977        }
2978
2979        if self.cleanup_interval_ms > self.node_timeout_ms {
2980            return Err(format!(
2981                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2982                self.cleanup_interval_ms, self.node_timeout_ms
2983            ));
2984        }
2985
2986        Ok(())
2987    }
2988}
2989
2990impl ServerOptions {
2991    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2992        let content = tokio::fs::read_to_string(path).await?;
2993        let options: Self = if path.ends_with(".toml") {
2994            toml::from_str(&content)?
2995        } else {
2996            // Legacy JSON support
2997            sonic_rs::from_str(&content)?
2998        };
2999        Ok(options)
3000    }
3001
3002    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
3003        // --- General Settings ---
3004        if let Ok(mode) = std::env::var("ENVIRONMENT") {
3005            self.mode = mode;
3006        }
3007        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
3008        if parse_bool_env("DEBUG", false) {
3009            self.debug = true;
3010            info!("DEBUG environment variable forces debug mode ON");
3011        }
3012
3013        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
3014
3015        if let Ok(host) = std::env::var("HOST") {
3016            self.host = host;
3017        }
3018        self.port = parse_env::<u16>("PORT", self.port);
3019        self.shutdown_grace_period =
3020            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
3021        self.user_authentication_timeout = parse_env::<u64>(
3022            "USER_AUTHENTICATION_TIMEOUT",
3023            self.user_authentication_timeout,
3024        );
3025        self.websocket_max_payload_kb =
3026            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
3027        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
3028            self.instance.process_id = id;
3029        }
3030
3031        // --- Driver Configuration ---
3032        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
3033            self.adapter.driver =
3034                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
3035        }
3036        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
3037            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
3038            self.adapter.buffer_multiplier_per_cpu,
3039        );
3040        self.adapter.enable_socket_counting = parse_env::<bool>(
3041            "ADAPTER_ENABLE_SOCKET_COUNTING",
3042            self.adapter.enable_socket_counting,
3043        );
3044        self.adapter.fallback_to_local =
3045            parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
3046        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
3047            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
3048        }
3049        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
3050            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
3051        }
3052        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
3053            self.app_manager.driver =
3054                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
3055        }
3056        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
3057            self.rate_limiter.driver = parse_driver_enum(
3058                driver_str,
3059                self.rate_limiter.driver.clone(),
3060                "RateLimiter Backend",
3061            );
3062        }
3063
3064        // --- Database: Redis ---
3065        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
3066            self.database.redis.host = host;
3067        }
3068        self.database.redis.port =
3069            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
3070        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
3071            self.database.redis.username = if username.is_empty() {
3072                None
3073            } else {
3074                Some(username)
3075            };
3076        }
3077        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
3078            self.database.redis.password = Some(password);
3079        }
3080        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
3081        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
3082            self.database.redis.key_prefix = prefix;
3083        }
3084        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
3085            self.database.redis.cluster.username = if cluster_username.is_empty() {
3086                None
3087            } else {
3088                Some(cluster_username)
3089            };
3090        }
3091        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
3092            self.database.redis.cluster.password = Some(cluster_password);
3093        }
3094        self.database.redis.cluster.use_tls = parse_bool_env(
3095            "DATABASE_REDIS_CLUSTER_USE_TLS",
3096            self.database.redis.cluster.use_tls,
3097        );
3098
3099        // --- Database: MySQL ---
3100        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
3101            self.database.mysql.host = host;
3102        }
3103        self.database.mysql.port =
3104            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
3105        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
3106            self.database.mysql.username = user;
3107        }
3108        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
3109            self.database.mysql.password = pass;
3110        }
3111        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
3112            self.database.mysql.database = db;
3113        }
3114        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
3115            self.database.mysql.table_name = table;
3116        }
3117        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
3118
3119        // --- Database: PostgreSQL ---
3120        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
3121            self.database.postgres.host = host;
3122        }
3123        self.database.postgres.port =
3124            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
3125        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
3126            self.database.postgres.username = user;
3127        }
3128        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
3129            self.database.postgres.password = pass;
3130        }
3131        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
3132            self.database.postgres.database = db;
3133        }
3134        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
3135
3136        // --- Database: DynamoDB ---
3137        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
3138            self.database.dynamodb.region = region;
3139        }
3140        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
3141            self.database.dynamodb.table_name = table;
3142        }
3143        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
3144            self.database.dynamodb.endpoint_url = Some(endpoint);
3145        }
3146        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
3147            self.database.dynamodb.aws_access_key_id = Some(key_id);
3148        }
3149        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
3150            self.database.dynamodb.aws_secret_access_key = Some(secret);
3151        }
3152
3153        // --- Database: SurrealDB ---
3154        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
3155            self.database.surrealdb.url = url;
3156        }
3157        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
3158            self.database.surrealdb.namespace = namespace;
3159        }
3160        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
3161            self.database.surrealdb.database = database;
3162        }
3163        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
3164            self.database.surrealdb.username = username;
3165        }
3166        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
3167            self.database.surrealdb.password = password;
3168        }
3169        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
3170            self.database.surrealdb.table_name = table;
3171        }
3172
3173        // --- Redis Cluster ---
3174        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
3175            let node_list: Vec<String> = nodes
3176                .split(',')
3177                .map(|s| s.trim())
3178                .filter(|s| !s.is_empty())
3179                .map(ToString::to_string)
3180                .collect();
3181
3182            options.adapter.cluster.nodes = node_list.clone();
3183            options.queue.redis_cluster.nodes = node_list.clone();
3184
3185            let parsed_nodes: Vec<ClusterNode> = node_list
3186                .iter()
3187                .filter_map(|seed| ClusterNode::from_seed(seed))
3188                .collect();
3189            options.database.redis.cluster.nodes = parsed_nodes.clone();
3190            options.database.redis.cluster_nodes = parsed_nodes;
3191        };
3192
3193        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
3194            apply_redis_cluster_nodes(self, &nodes);
3195        }
3196        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
3197            apply_redis_cluster_nodes(self, &nodes);
3198        }
3199        self.queue.redis_cluster.concurrency = parse_env::<u32>(
3200            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
3201            self.queue.redis_cluster.concurrency,
3202        );
3203        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
3204            self.queue.redis_cluster.prefix = Some(prefix);
3205        }
3206
3207        // --- SSL Configuration ---
3208        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
3209        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
3210            self.ssl.cert_path = val;
3211        }
3212        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
3213            self.ssl.key_path = val;
3214        }
3215        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
3216        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
3217            self.ssl.http_port = Some(port);
3218        }
3219
3220        // --- Unix Socket Configuration ---
3221        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
3222        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
3223            self.unix_socket.path = path;
3224        }
3225        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
3226            if mode_str.chars().all(|c| c.is_digit(8)) {
3227                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
3228                    if mode <= 0o777 {
3229                        self.unix_socket.permission_mode = mode;
3230                    } else {
3231                        warn!(
3232                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
3233                            mode_str, self.unix_socket.permission_mode
3234                        );
3235                    }
3236                } else {
3237                    warn!(
3238                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
3239                        mode_str, self.unix_socket.permission_mode
3240                    );
3241                }
3242            } else {
3243                warn!(
3244                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
3245                    mode_str, self.unix_socket.permission_mode
3246                );
3247            }
3248        }
3249
3250        // --- Metrics ---
3251        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
3252            self.metrics.driver =
3253                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
3254        }
3255        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
3256        if let Ok(val) = std::env::var("METRICS_HOST") {
3257            self.metrics.host = val;
3258        }
3259        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
3260        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
3261            self.metrics.prometheus.prefix = val;
3262        }
3263        self.metrics.tcp_exporter.enabled = parse_bool_env(
3264            "METRICS_TCP_EXPORTER_ENABLED",
3265            self.metrics.tcp_exporter.enabled,
3266        );
3267        if let Ok(val) = std::env::var("METRICS_TCP_EXPORTER_HOST") {
3268            self.metrics.tcp_exporter.host = val;
3269        }
3270        self.metrics.tcp_exporter.port =
3271            parse_env::<u16>("METRICS_TCP_EXPORTER_PORT", self.metrics.tcp_exporter.port);
3272        if let Some(buffer_size) = parse_env_optional::<usize>("METRICS_TCP_EXPORTER_BUFFER_SIZE") {
3273            self.metrics.tcp_exporter.buffer_size = Some(buffer_size);
3274        }
3275
3276        // --- HTTP API ---
3277        self.http_api.usage_enabled =
3278            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
3279
3280        // --- Rate Limiter ---
3281        self.rate_limiter.enabled =
3282            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
3283        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
3284            "RATE_LIMITER_API_MAX_REQUESTS",
3285            self.rate_limiter.api_rate_limit.max_requests,
3286        );
3287        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
3288            "RATE_LIMITER_API_WINDOW_SECONDS",
3289            self.rate_limiter.api_rate_limit.window_seconds,
3290        );
3291        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
3292            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
3293        }
3294        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
3295            "RATE_LIMITER_WS_MAX_REQUESTS",
3296            self.rate_limiter.websocket_rate_limit.max_requests,
3297        );
3298        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
3299            "RATE_LIMITER_WS_WINDOW_SECONDS",
3300            self.rate_limiter.websocket_rate_limit.window_seconds,
3301        );
3302        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_WS_TRUST_HOPS") {
3303            self.rate_limiter.websocket_rate_limit.trust_hops = Some(hops);
3304        }
3305        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
3306            self.rate_limiter.redis.prefix = Some(prefix);
3307        }
3308
3309        // --- Queue: Redis ---
3310        self.queue.redis.concurrency =
3311            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
3312        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
3313            self.queue.redis.prefix = Some(prefix);
3314        }
3315
3316        // --- Queue: SQS ---
3317        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
3318            self.queue.sqs.region = region;
3319        }
3320        self.queue.sqs.visibility_timeout = parse_env::<i32>(
3321            "QUEUE_SQS_VISIBILITY_TIMEOUT",
3322            self.queue.sqs.visibility_timeout,
3323        );
3324        self.queue.sqs.max_messages =
3325            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
3326        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
3327            "QUEUE_SQS_WAIT_TIME_SECONDS",
3328            self.queue.sqs.wait_time_seconds,
3329        );
3330        self.queue.sqs.concurrency =
3331            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
3332        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
3333        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
3334            self.queue.sqs.endpoint_url = Some(endpoint);
3335        }
3336
3337        // --- Queue: SNS ---
3338        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
3339            self.queue.sns.region = region;
3340        }
3341        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
3342            self.queue.sns.topic_arn = topic_arn;
3343        }
3344        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
3345            self.queue.sns.endpoint_url = Some(endpoint);
3346        }
3347
3348        // --- Webhooks ---
3349        self.webhooks.batching.enabled =
3350            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
3351        self.webhooks.batching.duration =
3352            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
3353        self.webhooks.batching.size =
3354            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
3355
3356        // --- NATS Adapter ---
3357        if let Ok(servers) = std::env::var("NATS_SERVERS") {
3358            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
3359        }
3360        if let Ok(user) = std::env::var("NATS_USERNAME") {
3361            self.adapter.nats.username = Some(user);
3362        }
3363        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
3364            self.adapter.nats.password = Some(pass);
3365        }
3366        if let Ok(token) = std::env::var("NATS_TOKEN") {
3367            self.adapter.nats.token = Some(token);
3368        }
3369        if let Ok(prefix) = std::env::var("NATS_PREFIX") {
3370            self.adapter.nats.prefix = prefix;
3371        }
3372        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
3373            "NATS_CONNECTION_TIMEOUT_MS",
3374            self.adapter.nats.connection_timeout_ms,
3375        );
3376        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
3377            "NATS_REQUEST_TIMEOUT_MS",
3378            self.adapter.nats.request_timeout_ms,
3379        );
3380        self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
3381            "NATS_DISCOVERY_MAX_WAIT_MS",
3382            self.adapter.nats.discovery_max_wait_ms,
3383        );
3384        self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
3385            "NATS_DISCOVERY_IDLE_WAIT_MS",
3386            self.adapter.nats.discovery_idle_wait_ms,
3387        );
3388        if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
3389            self.adapter.nats.nodes_number = Some(nodes);
3390        }
3391        if let Some(v) = parse_env_optional::<usize>("NATS_SUBSCRIPTION_CAPACITY") {
3392            self.adapter.nats.subscription_capacity = Some(v);
3393        }
3394        if let Some(v) = parse_env_optional::<usize>("NATS_CLIENT_CAPACITY") {
3395            self.adapter.nats.client_capacity = Some(v);
3396        }
3397        if let Some(v) = parse_env_optional::<usize>("NATS_MAX_RECONNECTS") {
3398            self.adapter.nats.max_reconnects = Some(v);
3399        }
3400        if let Some(v) = parse_env_optional::<usize>("NATS_PRESENCE_SYNC_CHUNK_SIZE") {
3401            self.adapter.nats.presence_sync_chunk_size = Some(v);
3402        }
3403
3404        // --- Pulsar Adapter ---
3405        if let Ok(url) = std::env::var("PULSAR_URL") {
3406            self.adapter.pulsar.url = url;
3407        }
3408        if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
3409            self.adapter.pulsar.prefix = prefix;
3410        }
3411        if let Ok(token) = std::env::var("PULSAR_TOKEN") {
3412            self.adapter.pulsar.token = Some(token);
3413        }
3414        self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
3415            "PULSAR_REQUEST_TIMEOUT_MS",
3416            self.adapter.pulsar.request_timeout_ms,
3417        );
3418        if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
3419            self.adapter.pulsar.nodes_number = Some(nodes);
3420        }
3421
3422        // --- RabbitMQ Adapter ---
3423        if let Ok(url) = std::env::var("RABBITMQ_URL") {
3424            self.adapter.rabbitmq.url = url;
3425        }
3426        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
3427            self.adapter.rabbitmq.prefix = prefix;
3428        }
3429        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
3430            "RABBITMQ_CONNECTION_TIMEOUT_MS",
3431            self.adapter.rabbitmq.connection_timeout_ms,
3432        );
3433        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
3434            "RABBITMQ_REQUEST_TIMEOUT_MS",
3435            self.adapter.rabbitmq.request_timeout_ms,
3436        );
3437        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
3438            self.adapter.rabbitmq.nodes_number = Some(nodes);
3439        }
3440
3441        // --- Google Pub/Sub Adapter ---
3442        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
3443            self.adapter.google_pubsub.project_id = project_id;
3444        }
3445        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
3446            self.adapter.google_pubsub.prefix = prefix;
3447        }
3448        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
3449            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
3450        }
3451        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
3452            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
3453            self.adapter.google_pubsub.request_timeout_ms,
3454        );
3455        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
3456            self.adapter.google_pubsub.nodes_number = Some(nodes);
3457        }
3458
3459        // --- Kafka Adapter ---
3460        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
3461            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
3462        }
3463        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
3464            self.adapter.kafka.prefix = prefix;
3465        }
3466        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
3467            self.adapter.kafka.security_protocol = Some(protocol);
3468        }
3469        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
3470            self.adapter.kafka.sasl_mechanism = Some(mechanism);
3471        }
3472        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
3473            self.adapter.kafka.sasl_username = Some(username);
3474        }
3475        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
3476            self.adapter.kafka.sasl_password = Some(password);
3477        }
3478        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
3479            "KAFKA_REQUEST_TIMEOUT_MS",
3480            self.adapter.kafka.request_timeout_ms,
3481        );
3482        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
3483            self.adapter.kafka.nodes_number = Some(nodes);
3484        }
3485
3486        // --- Apache Iggy Adapter / Queue ---
3487        if let Ok(connection_string) = std::env::var("IGGY_CONNECTION_STRING") {
3488            self.adapter.iggy.connection_string = connection_string.clone();
3489            self.queue.iggy.connection_string = connection_string;
3490        }
3491        if let Ok(username) = std::env::var("IGGY_USERNAME") {
3492            let username = (!username.is_empty()).then_some(username);
3493            self.adapter.iggy.username = username.clone();
3494            self.queue.iggy.username = username;
3495        }
3496        if let Ok(password) = std::env::var("IGGY_PASSWORD") {
3497            let password = (!password.is_empty()).then_some(password);
3498            self.adapter.iggy.password = password.clone();
3499            self.queue.iggy.password = password;
3500        }
3501        if let Ok(consumer_name) = std::env::var("IGGY_CONSUMER_NAME") {
3502            let consumer_name = (!consumer_name.is_empty()).then_some(consumer_name);
3503            self.adapter.iggy.consumer_name = consumer_name.clone();
3504            self.queue.iggy.consumer_name = consumer_name;
3505        } else if let Ok(process_id) = std::env::var("INSTANCE_PROCESS_ID") {
3506            let process_id = (!process_id.is_empty()).then_some(process_id);
3507            self.adapter.iggy.consumer_name = process_id.clone();
3508            self.queue.iggy.consumer_name = process_id;
3509        }
3510        if let Ok(stream) = std::env::var("IGGY_STREAM") {
3511            self.adapter.iggy.stream = stream.clone();
3512            self.queue.iggy.stream = stream;
3513        }
3514        if let Ok(prefix) = std::env::var("IGGY_TOPIC_PREFIX") {
3515            self.adapter.iggy.topic_prefix = prefix;
3516        }
3517        if let Ok(prefix) = std::env::var("IGGY_QUEUE_TOPIC_PREFIX") {
3518            self.queue.iggy.queue_topic_prefix = prefix;
3519        }
3520        if let Ok(prefix) = std::env::var("IGGY_CONSUMER_GROUP_PREFIX") {
3521            self.queue.iggy.consumer_group_prefix = prefix;
3522        }
3523        self.adapter.iggy.request_timeout_ms = parse_env::<u64>(
3524            "IGGY_REQUEST_TIMEOUT_MS",
3525            self.adapter.iggy.request_timeout_ms,
3526        );
3527        self.queue.iggy.request_timeout_ms = parse_env::<u64>(
3528            "IGGY_REQUEST_TIMEOUT_MS",
3529            self.queue.iggy.request_timeout_ms,
3530        );
3531        self.adapter.iggy.poll_interval_ms =
3532            parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.adapter.iggy.poll_interval_ms);
3533        self.queue.iggy.poll_interval_ms =
3534            parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.queue.iggy.poll_interval_ms);
3535        self.adapter.iggy.poll_batch_size =
3536            parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.adapter.iggy.poll_batch_size);
3537        self.queue.iggy.poll_batch_size =
3538            parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.queue.iggy.poll_batch_size);
3539        self.adapter.iggy.partitions_count = parse_env::<u32>(
3540            "ADAPTER_IGGY_PARTITIONS_COUNT",
3541            parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.adapter.iggy.partitions_count),
3542        );
3543        self.queue.iggy.partitions_count = parse_env::<u32>(
3544            "QUEUE_IGGY_PARTITIONS_COUNT",
3545            parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.queue.iggy.partitions_count),
3546        );
3547        self.adapter.iggy.partition_id = parse_env::<u32>(
3548            "ADAPTER_IGGY_PARTITION_ID",
3549            parse_env::<u32>("IGGY_PARTITION_ID", self.adapter.iggy.partition_id),
3550        );
3551        self.queue.iggy.partition_id = parse_env::<u32>(
3552            "QUEUE_IGGY_PARTITION_ID",
3553            parse_env::<u32>("IGGY_PARTITION_ID", self.queue.iggy.partition_id),
3554        );
3555        self.adapter.iggy.auto_create =
3556            parse_env::<bool>("IGGY_AUTO_CREATE", self.adapter.iggy.auto_create);
3557        self.queue.iggy.auto_create =
3558            parse_env::<bool>("IGGY_AUTO_CREATE", self.queue.iggy.auto_create);
3559        self.adapter.iggy.start_from_latest = parse_env::<bool>(
3560            "IGGY_START_FROM_LATEST",
3561            self.adapter.iggy.start_from_latest,
3562        );
3563        if let Some(nodes) = parse_env_optional::<u32>("IGGY_NODES_NUMBER") {
3564            self.adapter.iggy.nodes_number = Some(nodes);
3565            self.queue.iggy.nodes_number = Some(nodes);
3566        }
3567
3568        // --- CORS ---
3569        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
3570            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
3571            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
3572                warn!(
3573                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
3574                    e
3575                );
3576            } else {
3577                self.cors.origin = parsed;
3578            }
3579        }
3580        if let Ok(methods) = std::env::var("CORS_METHODS") {
3581            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
3582        }
3583        if let Ok(headers) = std::env::var("CORS_HEADERS") {
3584            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
3585        }
3586        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
3587
3588        // --- Performance Tuning ---
3589        self.database_pooling.enabled =
3590            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
3591        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
3592            self.database_pooling.min = min;
3593        }
3594        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
3595            self.database_pooling.max = max;
3596        }
3597
3598        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
3599            self.database.mysql.connection_pool_size = pool_size;
3600            self.database.postgres.connection_pool_size = pool_size;
3601        }
3602        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
3603            self.app_manager.cache.ttl = cache_ttl;
3604            self.channel_limits.cache_ttl = cache_ttl;
3605            self.database.mysql.cache_ttl = cache_ttl;
3606            self.database.postgres.cache_ttl = cache_ttl;
3607            self.database.surrealdb.cache_ttl = cache_ttl;
3608            self.cache.memory.ttl = cache_ttl;
3609        }
3610        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
3611            self.database.mysql.cache_cleanup_interval = cleanup_interval;
3612            self.database.postgres.cache_cleanup_interval = cleanup_interval;
3613            self.cache.memory.cleanup_interval = cleanup_interval;
3614        }
3615        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
3616            self.database.mysql.cache_max_capacity = max_capacity;
3617            self.database.postgres.cache_max_capacity = max_capacity;
3618            self.database.surrealdb.cache_max_capacity = max_capacity;
3619            self.cache.memory.max_capacity = max_capacity;
3620        }
3621
3622        let skip_inline_apps = parse_bool_env("SOCKUDO_SKIP_INLINE_APPS", false)
3623            || !parse_bool_env("APP_MANAGER_REGISTER_INLINE_APPS", true);
3624        if skip_inline_apps {
3625            let app_count = self.app_manager.array.apps.len();
3626            self.app_manager.array.apps.clear();
3627            if app_count > 0 {
3628                info!(
3629                    "Skipping {} inline app(s) from configuration due to environment override",
3630                    app_count
3631                );
3632            }
3633        }
3634
3635        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID").ok();
3636        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY").ok();
3637        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET").ok();
3638        let default_app_enabled_env = std::env::var("SOCKUDO_DEFAULT_APP_ENABLED").ok();
3639        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
3640        let default_app_credentials_configured =
3641            default_app_id.is_some() || default_app_key.is_some() || default_app_secret.is_some();
3642        let default_app_env_configured =
3643            default_app_credentials_configured || default_app_enabled_env.is_some();
3644        let default_app_should_override_inline = default_app_credentials_configured
3645            || default_app_enabled_env.is_some_and(|_| !default_app_enabled);
3646
3647        if default_app_should_override_inline {
3648            let app_count = self.app_manager.array.apps.len();
3649            self.app_manager.array.apps.clear();
3650            if app_count > 0 {
3651                info!(
3652                    "Replacing {} inline app(s) from configuration with SOCKUDO_DEFAULT_APP_* settings",
3653                    app_count
3654                );
3655            }
3656        }
3657
3658        if let (Some(app_id), Some(app_key), Some(app_secret)) =
3659            (default_app_id, default_app_key, default_app_secret)
3660            && default_app_enabled
3661        {
3662            let default_app = App::from_policy(
3663                app_id,
3664                app_key,
3665                app_secret,
3666                default_app_enabled,
3667                crate::app::AppPolicy {
3668                    limits: crate::app::AppLimitsPolicy {
3669                        max_connections: parse_env::<u32>(
3670                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
3671                            100,
3672                        ),
3673                        max_backend_events_per_second: Some(parse_env::<u32>(
3674                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
3675                            100,
3676                        )),
3677                        max_client_events_per_second: parse_env::<u32>(
3678                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
3679                            100,
3680                        ),
3681                        max_read_requests_per_second: Some(parse_env::<u32>(
3682                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
3683                            100,
3684                        )),
3685                        max_presence_members_per_channel: Some(parse_env::<u32>(
3686                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
3687                            100,
3688                        )),
3689                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
3690                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
3691                            100,
3692                        )),
3693                        max_channel_name_length: Some(parse_env::<u32>(
3694                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
3695                            100,
3696                        )),
3697                        max_event_channels_at_once: Some(parse_env::<u32>(
3698                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
3699                            100,
3700                        )),
3701                        max_event_name_length: Some(parse_env::<u32>(
3702                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
3703                            100,
3704                        )),
3705                        max_event_payload_in_kb: Some(parse_env::<u32>(
3706                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
3707                            100,
3708                        )),
3709                        max_event_batch_size: Some(parse_env::<u32>(
3710                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
3711                            100,
3712                        )),
3713                        decay_seconds: None,
3714                        terminate_on_limit: false,
3715                        message_rate_limit: None,
3716                    },
3717                    features: crate::app::AppFeaturesPolicy {
3718                        enable_client_messages: std::env::var(
3719                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
3720                        )
3721                        .ok()
3722                        .map(|value| {
3723                            matches!(
3724                                value.trim().to_ascii_lowercase().as_str(),
3725                                "true" | "1" | "yes" | "on"
3726                            )
3727                        })
3728                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
3729                        enable_user_authentication: Some(parse_bool_env(
3730                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
3731                            false,
3732                        )),
3733                        enable_watchlist_events: Some(parse_bool_env(
3734                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
3735                            false,
3736                        )),
3737                    },
3738                    channels: crate::app::AppChannelsPolicy {
3739                        allowed_origins: {
3740                            if let Ok(origins_str) =
3741                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
3742                            {
3743                                if !origins_str.is_empty() {
3744                                    Some(
3745                                        origins_str
3746                                            .split(',')
3747                                            .map(|s| s.trim().to_string())
3748                                            .collect(),
3749                                    )
3750                                } else {
3751                                    None
3752                                }
3753                            } else {
3754                                None
3755                            }
3756                        },
3757                        annotations_enabled: Some(parse_bool_env(
3758                            "SOCKUDO_DEFAULT_APP_ANNOTATIONS_ENABLED",
3759                            false,
3760                        )),
3761                        channel_delta_compression: None,
3762                        channel_namespaces: None,
3763                    },
3764                    webhooks: None,
3765                    idempotency: None,
3766                    connection_recovery: None,
3767                    history: None,
3768                    presence_history: None,
3769                },
3770            );
3771
3772            self.app_manager.array.apps.push(default_app);
3773            info!("Successfully registered default app from env");
3774        } else if default_app_env_configured && !default_app_enabled {
3775            info!("Default app registration disabled by SOCKUDO_DEFAULT_APP_ENABLED=false");
3776        } else if default_app_credentials_configured {
3777            warn!(
3778                "SOCKUDO_DEFAULT_APP_* environment was configured but id, key, and secret were not all provided; no default app registered"
3779            );
3780        }
3781
3782        // Special handling for REDIS_URL
3783        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3784            info!("Applying REDIS_URL environment variable override");
3785
3786            let redis_url_json = sonic_rs::json!(redis_url_env);
3787
3788            self.adapter
3789                .redis
3790                .redis_pub_options
3791                .insert("url".to_string(), redis_url_json.clone());
3792            self.adapter
3793                .redis
3794                .redis_sub_options
3795                .insert("url".to_string(), redis_url_json);
3796
3797            self.cache.redis.url_override = Some(redis_url_env.clone());
3798            self.queue.redis.url_override = Some(redis_url_env.clone());
3799            self.rate_limiter.redis.url_override = Some(redis_url_env);
3800        }
3801
3802        // --- Logging Configuration ---
3803        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3804        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3805        if has_colors_env || has_target_env {
3806            let logging_config = self.logging.get_or_insert_with(Default::default);
3807            if has_colors_env {
3808                logging_config.colors_enabled =
3809                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3810            }
3811            if has_target_env {
3812                logging_config.include_target =
3813                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3814            }
3815        }
3816
3817        // --- Cleanup Configuration ---
3818        self.cleanup.async_enabled =
3819            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3820        self.cleanup.fallback_to_sync =
3821            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3822        self.cleanup.queue_buffer_size =
3823            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3824        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3825        self.cleanup.batch_timeout_ms =
3826            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3827        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3828            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3829                WorkerThreadsConfig::Auto
3830            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3831                WorkerThreadsConfig::Fixed(n)
3832            } else {
3833                warn!(
3834                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3835                    worker_threads_str
3836                );
3837                self.cleanup.worker_threads.clone()
3838            };
3839        }
3840        self.cleanup.max_retry_attempts = parse_env::<u32>(
3841            "CLEANUP_MAX_RETRY_ATTEMPTS",
3842            self.cleanup.max_retry_attempts,
3843        );
3844
3845        // Cluster health configuration
3846        self.cluster_health.enabled =
3847            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3848        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3849            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3850            self.cluster_health.heartbeat_interval_ms,
3851        );
3852        self.cluster_health.node_timeout_ms = parse_env::<u64>(
3853            "CLUSTER_HEALTH_NODE_TIMEOUT",
3854            self.cluster_health.node_timeout_ms,
3855        );
3856        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3857            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3858            self.cluster_health.cleanup_interval_ms,
3859        );
3860
3861        // Health check endpoint timeout
3862        self.health_check_timeout_ms =
3863            parse_env::<u64>("HEALTH_CHECK_TIMEOUT_MS", self.health_check_timeout_ms);
3864
3865        // Tag filtering configuration
3866        self.tag_filtering.enabled =
3867            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3868
3869        // WebSocket buffer configuration
3870        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3871            if val.to_lowercase() == "none" || val == "0" {
3872                self.websocket.max_messages = None;
3873            } else if let Ok(n) = val.parse::<usize>() {
3874                self.websocket.max_messages = Some(n);
3875            }
3876        }
3877        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3878            if val.to_lowercase() == "none" || val == "0" {
3879                self.websocket.max_bytes = None;
3880            } else if let Ok(n) = val.parse::<usize>() {
3881                self.websocket.max_bytes = Some(n);
3882            }
3883        }
3884        self.websocket.disconnect_on_buffer_full = parse_bool_env(
3885            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3886            self.websocket.disconnect_on_buffer_full,
3887        );
3888        self.websocket.max_message_size = parse_env::<usize>(
3889            "WEBSOCKET_MAX_MESSAGE_SIZE",
3890            self.websocket.max_message_size,
3891        );
3892        self.websocket.max_frame_size =
3893            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3894        self.websocket.write_buffer_size = parse_env::<usize>(
3895            "WEBSOCKET_WRITE_BUFFER_SIZE",
3896            self.websocket.write_buffer_size,
3897        );
3898        self.websocket.max_backpressure = parse_env::<usize>(
3899            "WEBSOCKET_MAX_BACKPRESSURE",
3900            self.websocket.max_backpressure,
3901        );
3902        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3903        self.websocket.ping_interval =
3904            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3905        self.websocket.idle_timeout =
3906            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3907        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3908            self.websocket.compression = mode;
3909        }
3910
3911        // Connection recovery (includes serial + message_id + replay buffer)
3912        self.connection_recovery.enabled = parse_bool_env(
3913            "CONNECTION_RECOVERY_ENABLED",
3914            self.connection_recovery.enabled,
3915        );
3916        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3917            "CONNECTION_RECOVERY_BUFFER_TTL",
3918            self.connection_recovery.buffer_ttl_seconds,
3919        );
3920        self.connection_recovery.max_buffer_size = parse_env::<usize>(
3921            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3922            self.connection_recovery.max_buffer_size,
3923        );
3924
3925        self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3926        self.history.rewind_enabled =
3927            parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3928        self.history.retention_window_seconds = parse_env::<u64>(
3929            "HISTORY_RETENTION_WINDOW_SECONDS",
3930            self.history.retention_window_seconds,
3931        );
3932        self.history.max_page_size =
3933            parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3934        self.history.writer_shards =
3935            parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3936        self.history.writer_queue_capacity = parse_env::<usize>(
3937            "HISTORY_WRITER_QUEUE_CAPACITY",
3938            self.history.writer_queue_capacity,
3939        );
3940        if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3941            self.history.backend = HistoryBackend::from_str(&backend)?;
3942        }
3943        if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3944            self.history.max_messages_per_channel = Some(
3945                max_messages
3946                    .parse::<usize>()
3947                    .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3948            );
3949        }
3950        if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3951            self.history.max_bytes_per_channel = Some(
3952                max_bytes
3953                    .parse::<u64>()
3954                    .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3955            );
3956        }
3957        if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3958            self.history.postgres.table_prefix = table_prefix;
3959        }
3960        self.history.postgres.write_timeout_ms = parse_env::<u64>(
3961            "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3962            self.history.postgres.write_timeout_ms,
3963        );
3964        self.history.purge_interval_seconds = parse_env::<u64>(
3965            "HISTORY_PURGE_INTERVAL_SECONDS",
3966            self.history.purge_interval_seconds,
3967        );
3968        self.history.purge_batch_size =
3969            parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3970        self.history.max_purge_per_tick = parse_env::<usize>(
3971            "HISTORY_MAX_PURGE_PER_TICK",
3972            self.history.max_purge_per_tick,
3973        );
3974
3975        self.presence_history.enabled =
3976            parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3977        self.presence_history.retention_window_seconds = parse_env::<u64>(
3978            "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3979            self.presence_history.retention_window_seconds,
3980        );
3981        self.presence_history.max_page_size = parse_env::<usize>(
3982            "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3983            self.presence_history.max_page_size,
3984        );
3985        if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3986            self.presence_history.max_events_per_channel =
3987                Some(max_events.parse::<usize>().map_err(|e| {
3988                    format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3989                })?);
3990        }
3991        if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3992            self.presence_history.max_bytes_per_channel = Some(
3993                max_bytes
3994                    .parse::<u64>()
3995                    .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3996            );
3997        }
3998        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3999        self.idempotency.ttl_seconds =
4000            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
4001        self.idempotency.max_key_length = parse_env::<usize>(
4002            "IDEMPOTENCY_MAX_KEY_LENGTH",
4003            self.idempotency.max_key_length,
4004        );
4005
4006        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
4007
4008        self.echo_control.enabled =
4009            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
4010        self.echo_control.default_echo_messages = parse_bool_env(
4011            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
4012            self.echo_control.default_echo_messages,
4013        );
4014
4015        self.event_name_filtering.enabled = parse_bool_env(
4016            "EVENT_NAME_FILTERING_ENABLED",
4017            self.event_name_filtering.enabled,
4018        );
4019        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
4020            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
4021            self.event_name_filtering.max_events_per_filter,
4022        );
4023        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
4024            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
4025            self.event_name_filtering.max_event_name_length,
4026        );
4027        self.versioned_messages.enabled = parse_bool_env(
4028            "VERSIONED_MESSAGES_ENABLED",
4029            self.versioned_messages.enabled,
4030        );
4031        if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
4032            self.versioned_messages.driver = parse_driver_enum(
4033                driver_str,
4034                self.versioned_messages.driver.clone(),
4035                "VersionedMessages Backend",
4036            );
4037        }
4038        if let Ok(driver_str) = std::env::var("PUSH_STORAGE_DRIVER") {
4039            self.push.storage_driver = parse_driver_enum(
4040                driver_str,
4041                self.push.storage_driver.clone(),
4042                "Push Storage Backend",
4043            );
4044        }
4045        if let Ok(driver_str) = std::env::var("PUSH_QUEUE_DRIVER") {
4046            self.push.queue_driver = parse_driver_enum(
4047                driver_str,
4048                self.push.queue_driver.clone(),
4049                "Push Queue Backend",
4050            );
4051        }
4052        self.push.fcm_enabled = parse_bool_env("PUSH_FCM_ENABLED", self.push.fcm_enabled);
4053        self.push.apns_enabled = parse_bool_env("PUSH_APNS_ENABLED", self.push.apns_enabled);
4054        self.push.webpush_enabled =
4055            parse_bool_env("PUSH_WEBPUSH_ENABLED", self.push.webpush_enabled);
4056        self.push.hms_enabled = parse_bool_env("PUSH_HMS_ENABLED", self.push.hms_enabled);
4057        self.push.wns_enabled = parse_bool_env("PUSH_WNS_ENABLED", self.push.wns_enabled);
4058        if let Some(key) = parse_env_optional::<String>("PUSH_CREDENTIAL_ENCRYPTION_KEY") {
4059            self.push.credential_encryption_key = Some(key);
4060        }
4061        self.push.fanout_fast_threshold = parse_env::<u64>(
4062            "PUSH_FANOUT_FAST_THRESHOLD",
4063            self.push.fanout_fast_threshold,
4064        );
4065        self.push.fanout_shard_size =
4066            parse_env::<u64>("PUSH_FANOUT_SHARD_SIZE", self.push.fanout_shard_size);
4067        self.push.fanout_sync_threshold = parse_env::<u64>(
4068            "PUSH_FANOUT_SYNC_THRESHOLD",
4069            self.push.fanout_sync_threshold,
4070        );
4071        self.push.backpressure_lag_threshold_secs = parse_env::<u64>(
4072            "PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS",
4073            self.push.backpressure_lag_threshold_secs,
4074        );
4075        self.push.publish_status_ttl_days = parse_env::<u64>(
4076            "PUSH_PUBLISH_STATUS_TTL_DAYS",
4077            self.push.publish_status_ttl_days,
4078        );
4079        self.push.circuit_breaker.failure_threshold = parse_env::<u32>(
4080            "PUSH_FAILURE_THRESHOLD",
4081            self.push.circuit_breaker.failure_threshold,
4082        );
4083        self.push.scheduler_interval_secs = parse_env::<u64>(
4084            "PUSH_SCHEDULER_INTERVAL_SECS",
4085            self.push.scheduler_interval_secs,
4086        );
4087        self.push.stale_device_max_age_days = parse_env::<u64>(
4088            "PUSH_STALE_DEVICE_MAX_AGE_DAYS",
4089            self.push.stale_device_max_age_days,
4090        );
4091        self.push.analytics_enabled =
4092            parse_bool_env("PUSH_ANALYTICS_ENABLED", self.push.analytics_enabled);
4093        self.push.default_quotas.acceptance_rps = parse_env::<u64>(
4094            "PUSH_DEFAULT_ACCEPTANCE_RPS",
4095            self.push.default_quotas.acceptance_rps,
4096        );
4097        self.push.default_quotas.delivery_quota_daily = parse_env::<u64>(
4098            "PUSH_DEFAULT_DELIVERY_QUOTA_DAILY",
4099            self.push.default_quotas.delivery_quota_daily,
4100        );
4101        self.push.default_quotas.fanout_max = parse_env::<u64>(
4102            "PUSH_DEFAULT_FANOUT_MAX",
4103            self.push.default_quotas.fanout_max,
4104        );
4105        self.push.default_quotas.inflight_max = parse_env::<u64>(
4106            "PUSH_DEFAULT_INFLIGHT_MAX",
4107            self.push.default_quotas.inflight_max,
4108        );
4109        self.versioned_messages.max_page_size = parse_env::<usize>(
4110            "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
4111            self.versioned_messages.max_page_size,
4112        );
4113        self.versioned_messages.retention_window_seconds = parse_env::<u64>(
4114            "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
4115            self.versioned_messages.retention_window_seconds,
4116        );
4117        self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
4118            "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
4119            self.versioned_messages.purge_interval_seconds,
4120        );
4121        self.versioned_messages.purge_batch_size = parse_env::<usize>(
4122            "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
4123            self.versioned_messages.purge_batch_size,
4124        );
4125        self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
4126            "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
4127            self.versioned_messages.max_purge_per_tick,
4128        );
4129        self.annotations.enabled = parse_bool_env("ANNOTATIONS_ENABLED", self.annotations.enabled);
4130
4131        Ok(())
4132    }
4133
4134    pub fn validate(&self) -> Result<(), String> {
4135        if self.unix_socket.enabled {
4136            if self.unix_socket.path.is_empty() {
4137                return Err(
4138                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
4139                );
4140            }
4141
4142            self.validate_unix_socket_security()?;
4143
4144            if self.ssl.enabled {
4145                tracing::warn!(
4146                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
4147                );
4148            }
4149
4150            if self.unix_socket.permission_mode > 0o777 {
4151                return Err(format!(
4152                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
4153                    self.unix_socket.permission_mode
4154                ));
4155            }
4156        }
4157
4158        if let Err(e) = self.cleanup.validate() {
4159            return Err(format!("Invalid cleanup configuration: {}", e));
4160        }
4161
4162        if self.history.enabled {
4163            if self.history.max_page_size == 0 {
4164                return Err("history.max_page_size must be greater than 0".to_string());
4165            }
4166            if self.history.writer_shards == 0 {
4167                return Err("history.writer_shards must be greater than 0".to_string());
4168            }
4169            if self.history.writer_queue_capacity == 0 {
4170                return Err("history.writer_queue_capacity must be greater than 0".to_string());
4171            }
4172            if self.history.retention_window_seconds == 0 {
4173                return Err("history.retention_window_seconds must be greater than 0".to_string());
4174            }
4175            if self.history.postgres.table_prefix.trim().is_empty() {
4176                return Err("history.postgres.table_prefix must not be empty".to_string());
4177            }
4178        }
4179
4180        if self.presence_history.enabled {
4181            if self.presence_history.max_page_size == 0 {
4182                return Err("presence_history.max_page_size must be greater than 0".to_string());
4183            }
4184            if self.presence_history.retention_window_seconds == 0 {
4185                return Err(
4186                    "presence_history.retention_window_seconds must be greater than 0".to_string(),
4187                );
4188            }
4189        }
4190
4191        if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
4192            return Err("versioned_messages.max_page_size must be greater than 0".to_string());
4193        }
4194        if self.annotations.enabled && !self.versioned_messages.enabled {
4195            return Err("annotations require versioned_messages.enabled".to_string());
4196        }
4197
4198        if self.adapter.nats.presence_sync_chunk_size == Some(0) {
4199            return Err("nats.presence_sync_chunk_size must be > 0 when set".to_string());
4200        }
4201
4202        Ok(())
4203    }
4204
4205    fn validate_unix_socket_security(&self) -> Result<(), String> {
4206        let path = &self.unix_socket.path;
4207
4208        if path.contains("../") || path.contains("..\\") {
4209            return Err(
4210                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
4211            );
4212        }
4213
4214        if self.unix_socket.permission_mode & 0o002 != 0 {
4215            warn!(
4216                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
4217                self.unix_socket.permission_mode
4218            );
4219        }
4220
4221        if self.unix_socket.permission_mode & 0o007 > 0o005 {
4222            warn!(
4223                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
4224                self.unix_socket.permission_mode
4225            );
4226        }
4227
4228        if self.mode == "production" && path.starts_with("/tmp/") {
4229            warn!(
4230                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
4231                path
4232            );
4233        }
4234
4235        if !path.starts_with('/') {
4236            return Err(
4237                "Unix socket path must be absolute (start with /) for security and reliability."
4238                    .to_string(),
4239            );
4240        }
4241
4242        Ok(())
4243    }
4244}
4245
4246#[cfg(test)]
4247mod redis_connection_tests {
4248    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
4249
4250    #[test]
4251    fn test_standard_url_basic() {
4252        let conn = RedisConnection {
4253            host: "127.0.0.1".to_string(),
4254            port: 6379,
4255            db: 0,
4256            username: None,
4257            password: None,
4258            key_prefix: "sockudo:".to_string(),
4259            sentinels: Vec::new(),
4260            sentinel_password: None,
4261            name: "mymaster".to_string(),
4262            cluster: RedisClusterConnection::default(),
4263            cluster_nodes: Vec::new(),
4264        };
4265        assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
4266    }
4267
4268    #[test]
4269    fn test_standard_url_with_password() {
4270        let conn = RedisConnection {
4271            host: "127.0.0.1".to_string(),
4272            port: 6379,
4273            db: 2,
4274            username: None,
4275            password: Some("secret".to_string()),
4276            key_prefix: "sockudo:".to_string(),
4277            sentinels: Vec::new(),
4278            sentinel_password: None,
4279            name: "mymaster".to_string(),
4280            cluster: RedisClusterConnection::default(),
4281            cluster_nodes: Vec::new(),
4282        };
4283        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
4284    }
4285
4286    #[test]
4287    fn test_standard_url_with_username_and_password() {
4288        let conn = RedisConnection {
4289            host: "redis.example.com".to_string(),
4290            port: 6380,
4291            db: 1,
4292            username: Some("admin".to_string()),
4293            password: Some("pass123".to_string()),
4294            key_prefix: "sockudo:".to_string(),
4295            sentinels: Vec::new(),
4296            sentinel_password: None,
4297            name: "mymaster".to_string(),
4298            cluster: RedisClusterConnection::default(),
4299            cluster_nodes: Vec::new(),
4300        };
4301        assert_eq!(
4302            conn.to_url(),
4303            "redis://admin:pass123@redis.example.com:6380/1"
4304        );
4305    }
4306
4307    #[test]
4308    fn test_standard_url_with_special_chars_in_password() {
4309        let conn = RedisConnection {
4310            host: "127.0.0.1".to_string(),
4311            port: 6379,
4312            db: 0,
4313            username: None,
4314            password: Some("pass@word#123".to_string()),
4315            key_prefix: "sockudo:".to_string(),
4316            sentinels: Vec::new(),
4317            sentinel_password: None,
4318            name: "mymaster".to_string(),
4319            cluster: RedisClusterConnection::default(),
4320            cluster_nodes: Vec::new(),
4321        };
4322        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
4323    }
4324
4325    #[test]
4326    fn test_is_sentinel_configured_false() {
4327        let conn = RedisConnection::default();
4328        assert!(!conn.is_sentinel_configured());
4329    }
4330
4331    #[test]
4332    fn test_is_sentinel_configured_true() {
4333        let conn = RedisConnection {
4334            sentinels: vec![RedisSentinel {
4335                host: "sentinel1".to_string(),
4336                port: 26379,
4337            }],
4338            ..Default::default()
4339        };
4340        assert!(conn.is_sentinel_configured());
4341    }
4342
4343    #[test]
4344    fn test_sentinel_url_basic() {
4345        let conn = RedisConnection {
4346            host: "127.0.0.1".to_string(),
4347            port: 6379,
4348            db: 0,
4349            username: None,
4350            password: None,
4351            key_prefix: "sockudo:".to_string(),
4352            sentinels: vec![
4353                RedisSentinel {
4354                    host: "sentinel1".to_string(),
4355                    port: 26379,
4356                },
4357                RedisSentinel {
4358                    host: "sentinel2".to_string(),
4359                    port: 26379,
4360                },
4361            ],
4362            sentinel_password: None,
4363            name: "mymaster".to_string(),
4364            cluster: RedisClusterConnection::default(),
4365            cluster_nodes: Vec::new(),
4366        };
4367        assert_eq!(
4368            conn.to_url(),
4369            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
4370        );
4371    }
4372
4373    #[test]
4374    fn test_sentinel_url_with_sentinel_password() {
4375        let conn = RedisConnection {
4376            host: "127.0.0.1".to_string(),
4377            port: 6379,
4378            db: 0,
4379            username: None,
4380            password: None,
4381            key_prefix: "sockudo:".to_string(),
4382            sentinels: vec![RedisSentinel {
4383                host: "sentinel1".to_string(),
4384                port: 26379,
4385            }],
4386            sentinel_password: Some("sentinelpass".to_string()),
4387            name: "mymaster".to_string(),
4388            cluster: RedisClusterConnection::default(),
4389            cluster_nodes: Vec::new(),
4390        };
4391        assert_eq!(
4392            conn.to_url(),
4393            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
4394        );
4395    }
4396
4397    #[test]
4398    fn test_sentinel_url_with_master_password() {
4399        let conn = RedisConnection {
4400            host: "127.0.0.1".to_string(),
4401            port: 6379,
4402            db: 1,
4403            username: None,
4404            password: Some("masterpass".to_string()),
4405            key_prefix: "sockudo:".to_string(),
4406            sentinels: vec![RedisSentinel {
4407                host: "sentinel1".to_string(),
4408                port: 26379,
4409            }],
4410            sentinel_password: None,
4411            name: "mymaster".to_string(),
4412            cluster: RedisClusterConnection::default(),
4413            cluster_nodes: Vec::new(),
4414        };
4415        assert_eq!(
4416            conn.to_url(),
4417            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
4418        );
4419    }
4420
4421    #[test]
4422    fn test_sentinel_url_with_all_auth() {
4423        let conn = RedisConnection {
4424            host: "127.0.0.1".to_string(),
4425            port: 6379,
4426            db: 2,
4427            username: Some("redisuser".to_string()),
4428            password: Some("redispass".to_string()),
4429            key_prefix: "sockudo:".to_string(),
4430            sentinels: vec![
4431                RedisSentinel {
4432                    host: "sentinel1".to_string(),
4433                    port: 26379,
4434                },
4435                RedisSentinel {
4436                    host: "sentinel2".to_string(),
4437                    port: 26380,
4438                },
4439            ],
4440            sentinel_password: Some("sentinelauth".to_string()),
4441            name: "production-master".to_string(),
4442            cluster: RedisClusterConnection::default(),
4443            cluster_nodes: Vec::new(),
4444        };
4445        assert_eq!(
4446            conn.to_url(),
4447            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
4448        );
4449    }
4450
4451    #[test]
4452    fn test_sentinel_to_host_port() {
4453        let sentinel = RedisSentinel {
4454            host: "sentinel.example.com".to_string(),
4455            port: 26379,
4456        };
4457        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
4458    }
4459
4460    #[test]
4461    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
4462        let conn = RedisConnection {
4463            cluster: RedisClusterConnection {
4464                nodes: vec![
4465                    ClusterNode {
4466                        host: "node1.secure-cluster.com".to_string(),
4467                        port: 7000,
4468                    },
4469                    ClusterNode {
4470                        host: "redis://node2.secure-cluster.com:7001".to_string(),
4471                        port: 7001,
4472                    },
4473                    ClusterNode {
4474                        host: "rediss://node3.secure-cluster.com".to_string(),
4475                        port: 7002,
4476                    },
4477                ],
4478                username: None,
4479                password: Some("cluster-secret".to_string()),
4480                use_tls: true,
4481            },
4482            ..Default::default()
4483        };
4484
4485        assert_eq!(
4486            conn.cluster_node_urls(),
4487            vec![
4488                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
4489                "redis://:cluster-secret@node2.secure-cluster.com:7001",
4490                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
4491            ]
4492        );
4493    }
4494
4495    #[test]
4496    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
4497        let conn = RedisConnection {
4498            password: Some("fallback-secret".to_string()),
4499            cluster_nodes: vec![ClusterNode {
4500                host: "legacy-node.example.com".to_string(),
4501                port: 7000,
4502            }],
4503            ..Default::default()
4504        };
4505
4506        assert_eq!(
4507            conn.cluster_node_urls(),
4508            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
4509        );
4510    }
4511
4512    #[test]
4513    fn test_normalize_cluster_seed_urls() {
4514        let conn = RedisConnection {
4515            cluster: RedisClusterConnection {
4516                nodes: Vec::new(),
4517                username: Some("svc-user".to_string()),
4518                password: Some("svc-pass".to_string()),
4519                use_tls: true,
4520            },
4521            ..Default::default()
4522        };
4523
4524        let seeds = vec![
4525            "node1.example.com:7000".to_string(),
4526            "redis://node2.example.com:7001".to_string(),
4527            "rediss://node3.example.com".to_string(),
4528        ];
4529
4530        assert_eq!(
4531            conn.normalize_cluster_seed_urls(&seeds),
4532            vec![
4533                "rediss://svc-user:svc-pass@node1.example.com:7000",
4534                "redis://svc-user:svc-pass@node2.example.com:7001",
4535                "rediss://svc-user:svc-pass@node3.example.com:6379",
4536            ]
4537        );
4538    }
4539}
4540
4541#[cfg(test)]
4542mod cluster_node_tests {
4543    use super::ClusterNode;
4544
4545    #[test]
4546    fn test_to_url_basic_host() {
4547        let node = ClusterNode {
4548            host: "localhost".to_string(),
4549            port: 6379,
4550        };
4551        assert_eq!(node.to_url(), "redis://localhost:6379");
4552    }
4553
4554    #[test]
4555    fn test_to_url_ip_address() {
4556        let node = ClusterNode {
4557            host: "127.0.0.1".to_string(),
4558            port: 6379,
4559        };
4560        assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
4561    }
4562
4563    #[test]
4564    fn test_to_url_with_redis_protocol() {
4565        let node = ClusterNode {
4566            host: "redis://example.com".to_string(),
4567            port: 6379,
4568        };
4569        assert_eq!(node.to_url(), "redis://example.com:6379");
4570    }
4571
4572    #[test]
4573    fn test_to_url_with_rediss_protocol() {
4574        let node = ClusterNode {
4575            host: "rediss://secure.example.com".to_string(),
4576            port: 6379,
4577        };
4578        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
4579    }
4580
4581    #[test]
4582    fn test_to_url_with_rediss_protocol_and_port_in_url() {
4583        let node = ClusterNode {
4584            host: "rediss://secure.example.com:7000".to_string(),
4585            port: 6379,
4586        };
4587        assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
4588    }
4589
4590    #[test]
4591    fn test_to_url_with_redis_protocol_and_port_in_url() {
4592        let node = ClusterNode {
4593            host: "redis://example.com:7001".to_string(),
4594            port: 6379,
4595        };
4596        assert_eq!(node.to_url(), "redis://example.com:7001");
4597    }
4598
4599    #[test]
4600    fn test_to_url_with_trailing_whitespace() {
4601        let node = ClusterNode {
4602            host: "  rediss://secure.example.com  ".to_string(),
4603            port: 6379,
4604        };
4605        assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
4606    }
4607
4608    #[test]
4609    fn test_to_url_custom_port() {
4610        let node = ClusterNode {
4611            host: "redis-cluster.example.com".to_string(),
4612            port: 7000,
4613        };
4614        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
4615    }
4616
4617    #[test]
4618    fn test_to_url_plain_host_with_port_in_host_field() {
4619        let node = ClusterNode {
4620            host: "redis-cluster.example.com:7010".to_string(),
4621            port: 7000,
4622        };
4623        assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
4624    }
4625
4626    #[test]
4627    fn test_to_url_with_options_adds_auth_and_tls() {
4628        let node = ClusterNode {
4629            host: "node.example.com".to_string(),
4630            port: 7000,
4631        };
4632        assert_eq!(
4633            node.to_url_with_options(true, Some("svc-user"), Some("secret")),
4634            "rediss://svc-user:secret@node.example.com:7000"
4635        );
4636    }
4637
4638    #[test]
4639    fn test_to_url_with_options_keeps_embedded_auth() {
4640        let node = ClusterNode {
4641            host: "rediss://:node-secret@node.example.com:7000".to_string(),
4642            port: 7000,
4643        };
4644        assert_eq!(
4645            node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
4646            "rediss://:node-secret@node.example.com:7000"
4647        );
4648    }
4649
4650    #[test]
4651    fn test_from_seed_parses_plain_host_port() {
4652        let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
4653        assert_eq!(node.host, "cluster-node-1");
4654        assert_eq!(node.port, 7005);
4655    }
4656
4657    #[test]
4658    fn test_from_seed_keeps_scheme_urls() {
4659        let node =
4660            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
4661        assert_eq!(node.host, "rediss://secure.example.com:7005");
4662        assert_eq!(node.port, 7005);
4663    }
4664
4665    #[test]
4666    fn test_to_url_aws_elasticache_hostname() {
4667        let node = ClusterNode {
4668            host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
4669            port: 6379,
4670        };
4671        assert_eq!(
4672            node.to_url(),
4673            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
4674        );
4675    }
4676
4677    #[test]
4678    fn test_to_url_with_ipv6_no_port() {
4679        let node = ClusterNode {
4680            host: "rediss://[::1]".to_string(),
4681            port: 6379,
4682        };
4683        assert_eq!(node.to_url(), "rediss://[::1]:6379");
4684    }
4685
4686    #[test]
4687    fn test_to_url_with_ipv6_and_port_in_url() {
4688        let node = ClusterNode {
4689            host: "rediss://[::1]:7000".to_string(),
4690            port: 6379,
4691        };
4692        assert_eq!(node.to_url(), "rediss://[::1]:7000");
4693    }
4694
4695    #[test]
4696    fn test_to_url_with_ipv6_full_address_no_port() {
4697        let node = ClusterNode {
4698            host: "rediss://[2001:db8::1]".to_string(),
4699            port: 6379,
4700        };
4701        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
4702    }
4703
4704    #[test]
4705    fn test_to_url_with_ipv6_full_address_with_port() {
4706        let node = ClusterNode {
4707            host: "rediss://[2001:db8::1]:7000".to_string(),
4708            port: 6379,
4709        };
4710        assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
4711    }
4712
4713    #[test]
4714    fn test_to_url_with_redis_protocol_ipv6() {
4715        let node = ClusterNode {
4716            host: "redis://[::1]".to_string(),
4717            port: 6379,
4718        };
4719        assert_eq!(node.to_url(), "redis://[::1]:6379");
4720    }
4721}
4722
4723#[cfg(test)]
4724mod cors_config_tests {
4725    use super::CorsConfig;
4726
4727    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
4728        sonic_rs::from_str(json)
4729    }
4730
4731    #[test]
4732    fn test_deserialize_valid_exact_origins() {
4733        let config =
4734            cors_from_json(r#"{"origin": ["https://example.com", "https://other.com"]}"#).unwrap();
4735        assert_eq!(config.origin.len(), 2);
4736    }
4737
4738    #[test]
4739    fn test_deserialize_valid_wildcard_patterns() {
4740        let config =
4741            cors_from_json(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
4742                .unwrap();
4743        assert_eq!(config.origin.len(), 2);
4744    }
4745
4746    #[test]
4747    fn test_deserialize_allows_special_markers() {
4748        assert!(cors_from_json(r#"{"origin": ["*"]}"#).is_ok());
4749        assert!(cors_from_json(r#"{"origin": ["Any"]}"#).is_ok());
4750        assert!(cors_from_json(r#"{"origin": ["any"]}"#).is_ok());
4751        assert!(cors_from_json(r#"{"origin": ["*", "https://example.com"]}"#).is_ok());
4752    }
4753
4754    #[test]
4755    fn test_deserialize_rejects_invalid_patterns() {
4756        assert!(cors_from_json(r#"{"origin": ["*.*bad"]}"#).is_err());
4757        assert!(cors_from_json(r#"{"origin": ["*."]}"#).is_err());
4758        assert!(cors_from_json(r#"{"origin": [""]}"#).is_err());
4759        assert!(cors_from_json(r#"{"origin": ["https://"]}"#).is_err());
4760    }
4761
4762    #[test]
4763    fn test_deserialize_rejects_mixed_valid_and_invalid() {
4764        assert!(cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#).is_err());
4765    }
4766}