Skip to main content

sockudo_core/
options.rs

1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9// Custom deserializer for octal permission mode (string format only, like chmod)
10fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12    D: serde::Deserializer<'de>,
13{
14    use serde::de::{self};
15
16    let s = String::deserialize(deserializer)?;
17
18    // Validate that the string contains only octal digits
19    if !s.chars().all(|c| c.is_digit(8)) {
20        return Err(de::Error::custom(format!(
21            "invalid octal permission mode '{}': must contain only digits 0-7",
22            s
23        )));
24    }
25
26    // Parse as octal
27    let mode = u32::from_str_radix(&s, 8)
28        .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30    // Validate it's within valid Unix permission range
31    if mode > 0o777 {
32        return Err(de::Error::custom(format!(
33            "permission mode '{}' exceeds maximum value 777",
34            s
35        )));
36    }
37
38    Ok(mode)
39}
40
41// Helper function to parse driver enums with fallback behavior (matches main.rs)
42fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43    driver_str: String,
44    default_driver: T,
45    driver_name: &str,
46) -> T
47where
48    <T as FromStr>::Err: std::fmt::Debug,
49{
50    match T::from_str(&driver_str.to_lowercase()) {
51        Ok(driver_enum) => driver_enum,
52        Err(e) => {
53            warn!(
54                "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55                driver_name, driver_str, e, default_driver
56            );
57            default_driver
58        }
59    }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63    if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64        db_conn.pool_min = Some(min);
65    }
66    if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67        db_conn.pool_max = Some(max);
68    }
69}
70
71// --- Enums for Driver Types ---
72
73#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76    #[default]
77    Local,
78    Redis,
79    #[serde(rename = "redis-cluster")]
80    RedisCluster,
81    Nats,
82    Pulsar,
83    RabbitMq,
84    #[serde(rename = "google-pubsub")]
85    GooglePubSub,
86    Kafka,
87    Iggy,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(default)]
92pub struct DynamoDbSettings {
93    pub region: String,
94    pub table_name: String,
95    pub endpoint_url: Option<String>,
96    pub aws_access_key_id: Option<String>,
97    pub aws_secret_access_key: Option<String>,
98    pub aws_profile_name: Option<String>,
99}
100
101impl Default for DynamoDbSettings {
102    fn default() -> Self {
103        Self {
104            region: "us-east-1".to_string(),
105            table_name: "sockudo-applications".to_string(),
106            endpoint_url: None,
107            aws_access_key_id: None,
108            aws_secret_access_key: None,
109            aws_profile_name: None,
110        }
111    }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115#[serde(default)]
116pub struct ScyllaDbSettings {
117    pub nodes: Vec<String>,
118    pub keyspace: String,
119    pub table_name: String,
120    pub username: Option<String>,
121    pub password: Option<String>,
122    pub replication_class: String,
123    pub replication_factor: u32,
124}
125
126impl Default for ScyllaDbSettings {
127    fn default() -> Self {
128        Self {
129            nodes: vec!["127.0.0.1:9042".to_string()],
130            keyspace: "sockudo".to_string(),
131            table_name: "applications".to_string(),
132            username: None,
133            password: None,
134            replication_class: "SimpleStrategy".to_string(),
135            replication_factor: 3,
136        }
137    }
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(default)]
142pub struct SurrealDbSettings {
143    pub url: String,
144    pub namespace: String,
145    pub database: String,
146    pub username: String,
147    pub password: String,
148    pub table_name: String,
149    pub cache_ttl: u64,
150    pub cache_max_capacity: u64,
151}
152
153impl Default for SurrealDbSettings {
154    fn default() -> Self {
155        Self {
156            url: "ws://127.0.0.1:8000".to_string(),
157            namespace: "sockudo".to_string(),
158            database: "sockudo".to_string(),
159            username: "root".to_string(),
160            password: "root".to_string(),
161            table_name: "applications".to_string(),
162            cache_ttl: 300,
163            cache_max_capacity: 100,
164        }
165    }
166}
167
168impl FromStr for AdapterDriver {
169    type Err = String;
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        match s.to_lowercase().as_str() {
172            "local" => Ok(AdapterDriver::Local),
173            "redis" => Ok(AdapterDriver::Redis),
174            "redis-cluster" => Ok(AdapterDriver::RedisCluster),
175            "nats" => Ok(AdapterDriver::Nats),
176            "pulsar" => Ok(AdapterDriver::Pulsar),
177            "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
178            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
179            "kafka" => Ok(AdapterDriver::Kafka),
180            "iggy" | "apache-iggy" | "apache_iggy" => Ok(AdapterDriver::Iggy),
181            _ => Err(format!("Unknown adapter driver: {s}")),
182        }
183    }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
187#[serde(rename_all = "lowercase")]
188pub enum AppManagerDriver {
189    #[default]
190    Memory,
191    Mysql,
192    Dynamodb,
193    PgSql,
194    SurrealDb,
195    ScyllaDb,
196}
197impl FromStr for AppManagerDriver {
198    type Err = String;
199    fn from_str(s: &str) -> Result<Self, Self::Err> {
200        match s.to_lowercase().as_str() {
201            "memory" => Ok(AppManagerDriver::Memory),
202            "mysql" => Ok(AppManagerDriver::Mysql),
203            "dynamodb" => Ok(AppManagerDriver::Dynamodb),
204            "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
205            "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
206            "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
207            _ => Err(format!("Unknown app manager driver: {s}")),
208        }
209    }
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
213#[serde(rename_all = "lowercase")]
214pub enum CacheDriver {
215    #[default]
216    Memory,
217    Redis,
218    #[serde(rename = "redis-cluster")]
219    RedisCluster,
220    None,
221}
222
223impl FromStr for CacheDriver {
224    type Err = String;
225    fn from_str(s: &str) -> Result<Self, Self::Err> {
226        match s.to_lowercase().as_str() {
227            "memory" => Ok(CacheDriver::Memory),
228            "redis" => Ok(CacheDriver::Redis),
229            "redis-cluster" => Ok(CacheDriver::RedisCluster),
230            "none" => Ok(CacheDriver::None),
231            _ => Err(format!("Unknown cache driver: {s}")),
232        }
233    }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
237#[serde(rename_all = "lowercase")]
238pub enum QueueDriver {
239    Memory,
240    #[default]
241    Redis,
242    #[serde(rename = "redis-cluster")]
243    RedisCluster,
244    Nats,
245    Pulsar,
246    RabbitMq,
247    #[serde(rename = "google-pubsub")]
248    GooglePubSub,
249    Kafka,
250    Iggy,
251    Sqs,
252    Sns,
253    None,
254}
255
256impl FromStr for QueueDriver {
257    type Err = String;
258    fn from_str(s: &str) -> Result<Self, Self::Err> {
259        match s.to_lowercase().as_str() {
260            "memory" => Ok(QueueDriver::Memory),
261            "redis" => Ok(QueueDriver::Redis),
262            "redis-cluster" => Ok(QueueDriver::RedisCluster),
263            "nats" => Ok(QueueDriver::Nats),
264            "pulsar" => Ok(QueueDriver::Pulsar),
265            "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
266            "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
267            "kafka" => Ok(QueueDriver::Kafka),
268            "iggy" | "apache-iggy" | "apache_iggy" => Ok(QueueDriver::Iggy),
269            "sqs" => Ok(QueueDriver::Sqs),
270            "sns" => Ok(QueueDriver::Sns),
271            "none" => Ok(QueueDriver::None),
272            _ => Err(format!("Unknown queue driver: {s}")),
273        }
274    }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
278#[serde(rename_all = "snake_case")]
279pub enum DeltaCoordinationBackend {
280    #[default]
281    Auto,
282    None,
283    Redis,
284    RedisCluster,
285    Nats,
286}
287
288impl FromStr for DeltaCoordinationBackend {
289    type Err = String;
290
291    fn from_str(s: &str) -> Result<Self, Self::Err> {
292        match s.trim().to_ascii_lowercase().as_str() {
293            "auto" => Ok(Self::Auto),
294            "none" => Ok(Self::None),
295            "redis" => Ok(Self::Redis),
296            "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
297            "nats" => Ok(Self::Nats),
298            _ => Err(format!("Unknown delta coordination backend: {s}")),
299        }
300    }
301}
302
303impl AsRef<str> for QueueDriver {
304    fn as_ref(&self) -> &str {
305        match self {
306            QueueDriver::Memory => "memory",
307            QueueDriver::Redis => "redis",
308            QueueDriver::RedisCluster => "redis-cluster",
309            QueueDriver::Nats => "nats",
310            QueueDriver::Pulsar => "pulsar",
311            QueueDriver::RabbitMq => "rabbitmq",
312            QueueDriver::GooglePubSub => "google-pubsub",
313            QueueDriver::Kafka => "kafka",
314            QueueDriver::Iggy => "iggy",
315            QueueDriver::Sqs => "sqs",
316            QueueDriver::Sns => "sns",
317            QueueDriver::None => "none",
318        }
319    }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
323#[serde(default)]
324pub struct RedisClusterQueueConfig {
325    pub concurrency: u32,
326    pub prefix: Option<String>,
327    pub nodes: Vec<String>,
328    pub request_timeout_ms: u64,
329}
330
331impl Default for RedisClusterQueueConfig {
332    fn default() -> Self {
333        Self {
334            concurrency: 5,
335            prefix: Some("sockudo_queue:".to_string()),
336            nodes: vec!["redis://127.0.0.1:6379".to_string()],
337            request_timeout_ms: 5000,
338        }
339    }
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
343#[serde(rename_all = "lowercase")]
344pub enum MetricsDriver {
345    #[default]
346    Prometheus,
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
350#[serde(rename_all = "lowercase")]
351pub enum LogOutputFormat {
352    #[default]
353    Human,
354    Json,
355}
356
357impl FromStr for LogOutputFormat {
358    type Err = String;
359    fn from_str(s: &str) -> Result<Self, Self::Err> {
360        match s.to_lowercase().as_str() {
361            "human" => Ok(LogOutputFormat::Human),
362            "json" => Ok(LogOutputFormat::Json),
363            _ => Err(format!("Unknown log output format: {s}")),
364        }
365    }
366}
367
368impl FromStr for MetricsDriver {
369    type Err = String;
370    fn from_str(s: &str) -> Result<Self, Self::Err> {
371        match s.to_lowercase().as_str() {
372            "prometheus" => Ok(MetricsDriver::Prometheus),
373            _ => Err(format!("Unknown metrics driver: {s}")),
374        }
375    }
376}
377
378impl AsRef<str> for MetricsDriver {
379    fn as_ref(&self) -> &str {
380        match self {
381            MetricsDriver::Prometheus => "prometheus",
382        }
383    }
384}
385
386// --- Main Configuration Struct ---
387#[derive(Debug, Clone, Serialize, Deserialize)]
388#[serde(default)]
389pub struct ServerOptions {
390    pub adapter: AdapterConfig,
391    pub app_manager: AppManagerConfig,
392    pub cache: CacheConfig,
393    pub channel_limits: ChannelLimits,
394    pub cors: CorsConfig,
395    pub database: DatabaseConfig,
396    pub database_pooling: DatabasePooling,
397    pub debug: bool,
398    pub event_limits: EventLimits,
399    pub host: String,
400    pub http_api: HttpApiConfig,
401    pub instance: InstanceConfig,
402    pub logging: Option<LoggingConfig>,
403    pub metrics: MetricsConfig,
404    pub mode: String,
405    pub port: u16,
406    pub path_prefix: String,
407    pub presence: PresenceConfig,
408    pub queue: QueueConfig,
409    pub rate_limiter: RateLimiterConfig,
410    pub shutdown_grace_period: u64,
411    pub ssl: SslConfig,
412    pub user_authentication_timeout: u64,
413    pub webhooks: WebhooksConfig,
414    pub websocket_max_payload_kb: u32,
415    pub cleanup: CleanupConfig,
416    pub activity_timeout: u64,
417    pub cluster_health: ClusterHealthConfig,
418    pub unix_socket: UnixSocketConfig,
419    pub delta_compression: DeltaCompressionOptionsConfig,
420    pub tag_filtering: TagFilteringConfig,
421    pub websocket: WebSocketConfig,
422    pub connection_recovery: ConnectionRecoveryConfig,
423    pub history: HistoryConfig,
424    pub presence_history: PresenceHistoryConfig,
425    pub idempotency: IdempotencyConfig,
426    pub ephemeral: EphemeralConfig,
427    pub echo_control: EchoControlConfig,
428    pub event_name_filtering: EventNameFilteringConfig,
429    pub versioned_messages: VersionedMessagesConfig,
430    pub annotations: AnnotationsConfig,
431    pub push: PushConfig,
432    /// Timeout in milliseconds for each subsystem check in the `/up` health endpoint.
433    /// Applies to adapter, cache, queue, and app manager checks independently.
434    pub health_check_timeout_ms: u64,
435}
436
437// --- Configuration Sub-Structs ---
438
439#[derive(Debug, Clone, Serialize, Deserialize)]
440#[serde(default)]
441pub struct SqsQueueConfig {
442    pub region: String,
443    pub queue_url_prefix: Option<String>,
444    pub visibility_timeout: i32,
445    pub endpoint_url: Option<String>,
446    pub max_messages: i32,
447    pub wait_time_seconds: i32,
448    pub concurrency: u32,
449    pub fifo: bool,
450    pub message_group_id: Option<String>,
451}
452
453#[derive(Debug, Clone, Serialize, Deserialize)]
454#[serde(default)]
455pub struct SnsQueueConfig {
456    pub region: String,
457    pub topic_arn: String,
458    pub endpoint_url: Option<String>,
459}
460
461#[derive(Debug, Clone, Serialize, Deserialize)]
462#[serde(default)]
463pub struct AdapterConfig {
464    pub driver: AdapterDriver,
465    pub redis: RedisAdapterConfig,
466    pub cluster: RedisClusterAdapterConfig,
467    pub nats: NatsAdapterConfig,
468    pub pulsar: PulsarAdapterConfig,
469    pub rabbitmq: RabbitMqAdapterConfig,
470    pub google_pubsub: GooglePubSubAdapterConfig,
471    pub kafka: KafkaAdapterConfig,
472    pub iggy: IggyConfig,
473    #[serde(default = "default_buffer_multiplier_per_cpu")]
474    pub buffer_multiplier_per_cpu: usize,
475    pub cluster_health: ClusterHealthConfig,
476    #[serde(default = "default_enable_socket_counting")]
477    pub enable_socket_counting: bool,
478    #[serde(default = "default_fallback_to_local")]
479    pub fallback_to_local: bool,
480    /// Tier 1A: maintain cluster-wide channel counts locally via gossip so count
481    /// reads (subscription_count, /channels, occupancy) become local with zero
482    /// cross-node fan-out. Off by default; falls back to request/reply when off.
483    #[serde(default = "default_aggregate_counts")]
484    pub aggregate_counts: bool,
485}
486
/// Serde default for `AdapterConfig::aggregate_counts`: disabled.
fn default_aggregate_counts() -> bool {
    false
}

/// Serde default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    true
}

/// Serde default for `AdapterConfig::fallback_to_local`: enabled.
fn default_fallback_to_local() -> bool {
    true
}

/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`.
fn default_buffer_multiplier_per_cpu() -> usize {
    const DEFAULT_MULTIPLIER: usize = 64;
    DEFAULT_MULTIPLIER
}
502
503impl Default for AdapterConfig {
504    fn default() -> Self {
505        Self {
506            driver: AdapterDriver::default(),
507            redis: RedisAdapterConfig::default(),
508            cluster: RedisClusterAdapterConfig::default(),
509            nats: NatsAdapterConfig::default(),
510            pulsar: PulsarAdapterConfig::default(),
511            rabbitmq: RabbitMqAdapterConfig::default(),
512            google_pubsub: GooglePubSubAdapterConfig::default(),
513            kafka: KafkaAdapterConfig::default(),
514            iggy: IggyConfig::default(),
515            buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
516            cluster_health: ClusterHealthConfig::default(),
517            enable_socket_counting: default_enable_socket_counting(),
518            fallback_to_local: default_fallback_to_local(),
519            aggregate_counts: default_aggregate_counts(),
520        }
521    }
522}
523
524#[derive(Debug, Clone, Serialize, Deserialize)]
525#[serde(default)]
526pub struct RedisAdapterConfig {
527    pub requests_timeout: u64,
528    pub prefix: String,
529    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
530    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
531    pub cluster_mode: bool,
532}
533
534#[derive(Debug, Clone, Serialize, Deserialize)]
535#[serde(default)]
536pub struct RedisClusterAdapterConfig {
537    pub nodes: Vec<String>,
538    pub prefix: String,
539    pub request_timeout_ms: u64,
540    pub use_connection_manager: bool,
541    #[serde(default)]
542    pub use_sharded_pubsub: bool,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize)]
546#[serde(default)]
547pub struct NatsAdapterConfig {
548    pub servers: Vec<String>,
549    pub prefix: String,
550    pub request_timeout_ms: u64,
551    pub username: Option<String>,
552    pub password: Option<String>,
553    pub token: Option<String>,
554    pub connection_timeout_ms: u64,
555    pub nodes_number: Option<u32>,
556    pub discovery_max_wait_ms: u64,
557    pub discovery_idle_wait_ms: u64,
558    pub subscription_capacity: Option<usize>,
559    pub client_capacity: Option<usize>,
560    pub max_reconnects: Option<usize>,
561    pub presence_sync_chunk_size: Option<usize>,
562}
563
564#[derive(Debug, Clone, Serialize, Deserialize)]
565#[serde(default)]
566pub struct PulsarAdapterConfig {
567    pub url: String,
568    pub prefix: String,
569    pub request_timeout_ms: u64,
570    pub token: Option<String>,
571    pub nodes_number: Option<u32>,
572}
573
574#[derive(Debug, Clone, Serialize, Deserialize)]
575#[serde(default)]
576pub struct RabbitMqAdapterConfig {
577    pub url: String,
578    pub prefix: String,
579    pub request_timeout_ms: u64,
580    pub connection_timeout_ms: u64,
581    pub nodes_number: Option<u32>,
582}
583
584#[derive(Debug, Clone, Serialize, Deserialize)]
585#[serde(default)]
586pub struct GooglePubSubAdapterConfig {
587    pub project_id: String,
588    pub prefix: String,
589    pub request_timeout_ms: u64,
590    pub emulator_host: Option<String>,
591    pub nodes_number: Option<u32>,
592}
593
594#[derive(Debug, Clone, Serialize, Deserialize)]
595#[serde(default)]
596pub struct KafkaAdapterConfig {
597    pub brokers: Vec<String>,
598    pub prefix: String,
599    pub request_timeout_ms: u64,
600    pub security_protocol: Option<String>,
601    pub sasl_mechanism: Option<String>,
602    pub sasl_username: Option<String>,
603    pub sasl_password: Option<String>,
604    pub nodes_number: Option<u32>,
605}
606
607#[derive(Debug, Clone, Serialize, Deserialize)]
608#[serde(default)]
609pub struct IggyConfig {
610    pub connection_string: String,
611    pub username: Option<String>,
612    pub password: Option<String>,
613    pub consumer_name: Option<String>,
614    pub stream: String,
615    pub topic_prefix: String,
616    pub queue_topic_prefix: String,
617    pub consumer_group_prefix: String,
618    pub request_timeout_ms: u64,
619    pub poll_interval_ms: u64,
620    pub poll_batch_size: u32,
621    pub partitions_count: u32,
622    pub partition_id: u32,
623    pub auto_create: bool,
624    pub start_from_latest: bool,
625    pub nodes_number: Option<u32>,
626}
627
628#[derive(Debug, Clone, Serialize, Deserialize, Default)]
629#[serde(default)]
630pub struct AppManagerConfig {
631    pub driver: AppManagerDriver,
632    pub array: ArrayConfig,
633    pub cache: CacheSettings,
634    pub scylladb: ScyllaDbSettings,
635}
636
637#[derive(Debug, Clone, Serialize, Deserialize, Default)]
638#[serde(default)]
639pub struct ArrayConfig {
640    pub apps: Vec<App>,
641}
642
643#[derive(Debug, Clone, Serialize, Deserialize)]
644#[serde(default)]
645pub struct CacheSettings {
646    pub enabled: bool,
647    pub ttl: u64,
648}
649
650#[derive(Debug, Clone, Serialize, Deserialize)]
651#[serde(default)]
652pub struct MemoryCacheOptions {
653    pub ttl: u64,
654    pub cleanup_interval: u64,
655    pub max_capacity: u64,
656}
657
658#[derive(Debug, Clone, Serialize, Deserialize)]
659#[serde(default)]
660pub struct CacheConfig {
661    pub driver: CacheDriver,
662    pub redis: RedisConfig,
663    pub memory: MemoryCacheOptions,
664}
665
666#[derive(Debug, Clone, Serialize, Deserialize, Default)]
667#[serde(default)]
668pub struct RedisConfig {
669    pub prefix: Option<String>,
670    pub url_override: Option<String>,
671    pub cluster_mode: bool,
672}
673
674#[derive(Debug, Clone, Serialize, Deserialize)]
675#[serde(default)]
676pub struct ChannelLimits {
677    pub max_name_length: u32,
678    pub cache_ttl: u64,
679}
680
681#[derive(Debug, Clone, Serialize, Deserialize)]
682#[serde(default)]
683pub struct CorsConfig {
684    pub credentials: bool,
685    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
686    pub origin: Vec<String>,
687    pub methods: Vec<String>,
688    pub allowed_headers: Vec<String>,
689}
690
691fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
692where
693    D: serde::Deserializer<'de>,
694{
695    use serde::de::Error;
696    let origins = Vec::<String>::deserialize(deserializer)?;
697
698    if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
699        return Err(D::Error::custom(format!(
700            "CORS origin pattern validation failed: {}",
701            e
702        )));
703    }
704
705    Ok(origins)
706}
707
708#[derive(Debug, Clone, Serialize, Deserialize, Default)]
709#[serde(default)]
710pub struct DatabaseConfig {
711    pub mysql: DatabaseConnection,
712    pub postgres: DatabaseConnection,
713    pub redis: RedisConnection,
714    pub dynamodb: DynamoDbSettings,
715    pub surrealdb: SurrealDbSettings,
716    pub scylladb: ScyllaDbSettings,
717}
718
719#[derive(Debug, Clone, Serialize, Deserialize)]
720#[serde(default)]
721pub struct DatabaseConnection {
722    pub host: String,
723    pub port: u16,
724    pub username: String,
725    pub password: String,
726    pub database: String,
727    pub table_name: String,
728    pub connection_pool_size: u32,
729    pub pool_min: Option<u32>,
730    pub pool_max: Option<u32>,
731    pub cache_ttl: u64,
732    pub cache_cleanup_interval: u64,
733    pub cache_max_capacity: u64,
734}
735
736#[derive(Debug, Clone, Serialize, Deserialize)]
737#[serde(default)]
738pub struct RedisConnection {
739    pub host: String,
740    pub port: u16,
741    pub db: u32,
742    pub username: Option<String>,
743    pub password: Option<String>,
744    pub key_prefix: String,
745    pub sentinels: Vec<RedisSentinel>,
746    pub sentinel_password: Option<String>,
747    pub name: String,
748    pub cluster: RedisClusterConnection,
749    /// Legacy field kept for backward compatibility. Prefer `database.redis.cluster.nodes`.
750    pub cluster_nodes: Vec<ClusterNode>,
751}
752
753#[derive(Debug, Clone, Serialize, Deserialize)]
754#[serde(default)]
755pub struct RedisSentinel {
756    pub host: String,
757    pub port: u16,
758}
759
760#[derive(Debug, Clone, Serialize, Deserialize, Default)]
761#[serde(default)]
762pub struct RedisClusterConnection {
763    pub nodes: Vec<ClusterNode>,
764    pub username: Option<String>,
765    pub password: Option<String>,
766    #[serde(alias = "useTLS")]
767    pub use_tls: bool,
768}
769
770impl RedisConnection {
771    /// Returns true if Redis Sentinel is configured.
772    pub fn is_sentinel_configured(&self) -> bool {
773        !self.sentinels.is_empty()
774    }
775
776    /// Builds a Redis connection URL based on the configuration.
777    pub fn to_url(&self) -> String {
778        if self.is_sentinel_configured() {
779            self.build_sentinel_url()
780        } else {
781            self.build_standard_url()
782        }
783    }
784
785    fn build_standard_url(&self) -> String {
786        // Extract scheme from host if present, otherwise default to redis://
787        let (scheme, host) = if self.host.starts_with("rediss://") {
788            ("rediss://", self.host.trim_start_matches("rediss://"))
789        } else if self.host.starts_with("redis://") {
790            ("redis://", self.host.trim_start_matches("redis://"))
791        } else {
792            ("redis://", self.host.as_str())
793        };
794
795        let mut url = String::from(scheme);
796
797        if let Some(ref username) = self.username {
798            url.push_str(username);
799            if let Some(ref password) = self.password {
800                url.push(':');
801                url.push_str(&urlencoding::encode(password));
802            }
803            url.push('@');
804        } else if let Some(ref password) = self.password {
805            url.push(':');
806            url.push_str(&urlencoding::encode(password));
807            url.push('@');
808        }
809
810        url.push_str(host);
811        url.push(':');
812        url.push_str(&self.port.to_string());
813        url.push('/');
814        url.push_str(&self.db.to_string());
815
816        url
817    }
818
819    fn build_sentinel_url(&self) -> String {
820        let mut url = String::from("redis+sentinel://");
821
822        if let Some(ref sentinel_password) = self.sentinel_password {
823            url.push(':');
824            url.push_str(&urlencoding::encode(sentinel_password));
825            url.push('@');
826        }
827
828        let sentinel_hosts: Vec<String> = self
829            .sentinels
830            .iter()
831            .map(|s| format!("{}:{}", s.host, s.port))
832            .collect();
833        url.push_str(&sentinel_hosts.join(","));
834
835        url.push('/');
836        url.push_str(&self.name);
837        url.push('/');
838        url.push_str(&self.db.to_string());
839
840        let mut params = Vec::new();
841        if let Some(ref password) = self.password {
842            params.push(format!("password={}", urlencoding::encode(password)));
843        }
844        if let Some(ref username) = self.username {
845            params.push(format!("username={}", urlencoding::encode(username)));
846        }
847
848        if !params.is_empty() {
849            url.push('?');
850            url.push_str(&params.join("&"));
851        }
852
853        url
854    }
855
856    /// Returns true when cluster nodes are configured via either the new (`cluster.nodes`)
857    /// or legacy (`cluster_nodes`) field.
858    pub fn has_cluster_nodes(&self) -> bool {
859        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
860    }
861
862    /// Returns normalized Redis Cluster seed URLs from the canonical cluster configuration.
863    /// Falls back to legacy `cluster_nodes` for backward compatibility.
864    pub fn cluster_node_urls(&self) -> Vec<String> {
865        if !self.cluster.nodes.is_empty() {
866            return self.build_cluster_urls(&self.cluster.nodes);
867        }
868        self.build_cluster_urls(&self.cluster_nodes)
869    }
870
871    /// Normalizes any list of seed strings (`host:port`, `redis://...`, `rediss://...`) using
872    /// shared cluster auth/TLS options.
873    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
874        self.build_cluster_urls(
875            &seeds
876                .iter()
877                .filter_map(|seed| ClusterNode::from_seed(seed))
878                .collect::<Vec<ClusterNode>>(),
879        )
880    }
881
882    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
883        let username = self
884            .cluster
885            .username
886            .as_deref()
887            .or(self.username.as_deref());
888        let password = self
889            .cluster
890            .password
891            .as_deref()
892            .or(self.password.as_deref());
893        let use_tls = self.cluster.use_tls;
894
895        nodes
896            .iter()
897            .map(|node| node.to_url_with_options(use_tls, username, password))
898            .collect()
899    }
900}
901
902impl RedisSentinel {
903    pub fn to_host_port(&self) -> String {
904        format!("{}:{}", self.host, self.port)
905    }
906}
907
908#[derive(Debug, Clone, Serialize, Deserialize)]
909#[serde(default)]
910pub struct ClusterNode {
911    pub host: String,
912    pub port: u16,
913}
914
915impl ClusterNode {
916    pub fn to_url(&self) -> String {
917        self.to_url_with_options(false, None, None)
918    }
919
920    pub fn to_url_with_options(
921        &self,
922        use_tls: bool,
923        username: Option<&str>,
924        password: Option<&str>,
925    ) -> String {
926        let host = self.host.trim();
927
928        if host.starts_with("redis://") || host.starts_with("rediss://") {
929            if let Ok(parsed) = Url::parse(host)
930                && let Some(host_str) = parsed.host_str()
931            {
932                let scheme = parsed.scheme();
933                let port = parsed.port_or_known_default().unwrap_or(self.port);
934                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
935                let parsed_password = parsed.password();
936                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
937                let (effective_username, effective_password) = if has_embedded_auth {
938                    (parsed_username, parsed_password)
939                } else {
940                    (username, password)
941                };
942
943                return build_redis_url(
944                    scheme,
945                    host_str,
946                    port,
947                    effective_username,
948                    effective_password,
949                );
950            }
951
952            // Fallback for malformed URLs
953            let has_port = if let Some(bracket_pos) = host.rfind(']') {
954                host[bracket_pos..].contains(':')
955            } else {
956                host.split(':').count() >= 3
957            };
958            let base = if has_port {
959                host.to_string()
960            } else {
961                format!("{}:{}", host, self.port)
962            };
963
964            if let Ok(parsed) = Url::parse(&base) {
965                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
966                let parsed_password = parsed.password();
967                if let Some(host_str) = parsed.host_str() {
968                    let port = parsed.port_or_known_default().unwrap_or(self.port);
969                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
970                    let (effective_username, effective_password) = if has_embedded_auth {
971                        (parsed_username, parsed_password)
972                    } else {
973                        (username, password)
974                    };
975                    return build_redis_url(
976                        parsed.scheme(),
977                        host_str,
978                        port,
979                        effective_username,
980                        effective_password,
981                    );
982                }
983            }
984            return base;
985        }
986
987        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
988        let scheme = if use_tls { "rediss" } else { "redis" };
989        build_redis_url(
990            scheme,
991            &normalized_host,
992            normalized_port,
993            username,
994            password,
995        )
996    }
997
998    pub fn from_seed(seed: &str) -> Option<Self> {
999        let trimmed = seed.trim();
1000        if trimmed.is_empty() {
1001            return None;
1002        }
1003
1004        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
1005            let port = Url::parse(trimmed)
1006                .ok()
1007                .and_then(|parsed| parsed.port_or_known_default())
1008                .unwrap_or(6379);
1009            return Some(Self {
1010                host: trimmed.to_string(),
1011                port,
1012            });
1013        }
1014
1015        let (host, port) = split_plain_host_and_port(trimmed, 6379);
1016        Some(Self { host, port })
1017    }
1018}
1019
/// Splits a scheme-less host string into `(host, port)`.
///
/// `raw_host` is trimmed first. Recognised shapes:
///
/// * `[v6addr]:port` / `[v6addr]` — the brackets are stripped; the port is
///   used when it parses as `u16`, otherwise `default_port`.
/// * `host:port` (exactly one colon) — split when the port parses as `u16`.
/// * `host:` (exactly one colon, empty port) — the dangling colon is dropped
///   and `default_port` is used, so the colon does not end up inside the
///   host name (where it would later be mistaken for an IPv6 literal).
/// * anything else (bare host, unbracketed IPv6, unparsable port, unclosed
///   bracket) — returned unchanged with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let host = raw_host.trim();

    // Bracketed IPv6: [::1]:6379
    if let Some(inner) = host.strip_prefix('[') {
        return match inner.split_once(']') {
            Some((address, remainder)) => {
                let port = remainder
                    .strip_prefix(':')
                    .and_then(|text| text.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (address.to_string(), port)
            }
            // No closing bracket: leave the input untouched.
            None => (host.to_string(), default_port),
        };
    }

    // Hostname/IPv4 with port: host:6379. More than one colon means an
    // unbracketed IPv6 address, which must not be split.
    if host.matches(':').count() == 1 {
        if let Some((name, port_text)) = host.rsplit_once(':') {
            if port_text.is_empty() {
                // An empty port component means "use the default port".
                return (name.to_string(), default_port);
            }
            if let Ok(port) = port_text.parse::<u16>() {
                return (name.to_string(), port);
            }
        }
    }

    (host.to_string(), default_port)
}
1048
1049fn build_redis_url(
1050    scheme: &str,
1051    host: &str,
1052    port: u16,
1053    username: Option<&str>,
1054    password: Option<&str>,
1055) -> String {
1056    let mut url = format!("{scheme}://");
1057
1058    if let Some(user) = username {
1059        url.push_str(&urlencoding::encode(user));
1060        if let Some(pass) = password {
1061            url.push(':');
1062            url.push_str(&urlencoding::encode(pass));
1063        }
1064        url.push('@');
1065    } else if let Some(pass) = password {
1066        url.push(':');
1067        url.push_str(&urlencoding::encode(pass));
1068        url.push('@');
1069    }
1070
1071    if host.contains(':') && !host.starts_with('[') {
1072        url.push('[');
1073        url.push_str(host);
1074        url.push(']');
1075    } else {
1076        url.push_str(host);
1077    }
1078    url.push(':');
1079    url.push_str(&port.to_string());
1080    url
1081}
1082
/// Database connection-pool settings.
///
/// `#[serde(default)]` fills any field missing from the input with the value
/// from this type's `Default` implementation (defined outside this section).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    /// Pooling on/off switch.
    pub enabled: bool,
    /// Lower pool bound — presumably the minimum number of connections;
    /// confirm against the pool builder.
    pub min: u32,
    /// Upper pool bound — presumably the maximum number of connections;
    /// confirm against the pool builder.
    pub max: u32,
}
1090
/// Limits applied to events.
///
/// NOTE(review): the field descriptions are read off the field names; the
/// enforcement code is not part of this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    /// Presumably the maximum number of channels a single event may target.
    pub max_channels_at_once: u32,
    /// Presumably the maximum length of an event name.
    pub max_name_length: u32,
    /// Maximum payload size; kilobytes per the `_in_kb` suffix.
    pub max_payload_in_kb: u32,
    /// Presumably the maximum number of events in one batch.
    pub max_batch_size: u32,
}
1099
/// Settings for idempotency-key handling.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
    /// Whether idempotency key support is enabled
    pub enabled: bool,
    /// TTL in seconds for idempotency keys (default: 120s, like Ably's 2-minute window)
    pub ttl_seconds: u64,
    /// Maximum length of an idempotency key
    pub max_key_length: usize,
}
1110
/// Settings for ephemeral message handling.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EphemeralConfig {
    /// Whether ephemeral message handling is enabled.
    pub enabled: bool,
}
1117
/// Settings for echo control (whether a sender receives its own messages).
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EchoControlConfig {
    /// Whether connection-level and per-message echo control is enabled.
    pub enabled: bool,
    /// Default echo behavior for new V2 connections when the query param is omitted.
    pub default_echo_messages: bool,
}
1126
/// Settings for per-subscription event name filtering.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventNameFilteringConfig {
    /// Whether per-subscription event name filtering is enabled.
    pub enabled: bool,
    /// Maximum event names allowed in a single filter.
    pub max_events_per_filter: usize,
    /// Maximum length for each event name in a filter.
    pub max_event_name_length: usize,
}
1137
/// Settings for connection recovery (resume after reconnect).
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionRecoveryConfig {
    /// Whether connection recovery (resume) is enabled.
    /// When enabled, the server keeps a bounded replay buffer per channel so that
    /// reconnecting clients can receive missed messages. Disabled by default for
    /// Pusher protocol compatibility.
    pub enabled: bool,
    /// How long messages stay in the replay buffer (seconds). Default: 120 (2 min).
    pub buffer_ttl_seconds: u64,
    /// Maximum number of messages kept per channel. Default: 100.
    pub max_buffer_size: usize,
}
1151
/// Storage backend selector for versioned messages.
///
/// With `rename_all = "lowercase"` each variant (de)serializes as its name in
/// lower case, e.g. `DynamoDb` <-> `"dynamodb"`. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum VersionStoreDriver {
    /// In-memory store (default).
    #[default]
    Memory,
    /// PostgreSQL.
    Postgres,
    /// MySQL.
    Mysql,
    /// DynamoDB.
    DynamoDb,
    /// ScyllaDB.
    ScyllaDb,
    /// SurrealDB.
    SurrealDb,
}
1163
1164impl FromStr for VersionStoreDriver {
1165    type Err = String;
1166    fn from_str(s: &str) -> Result<Self, Self::Err> {
1167        match s.to_lowercase().as_str() {
1168            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1169            "mysql" => Ok(Self::Mysql),
1170            "dynamodb" => Ok(Self::DynamoDb),
1171            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1172            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1173            "memory" => Ok(Self::Memory),
1174            _ => Err(format!("Unknown version store driver: {s}")),
1175        }
1176    }
1177}
1178
/// Settings for versioned (mutable) messages: storage driver, paging and
/// retention/purge behaviour.
///
/// Fields missing from the input are taken from the `Default` implementation
/// of this type, found further down in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct VersionedMessagesConfig {
    /// Whether V2 mutable message HTTP/retrieval surfaces are enabled.
    pub enabled: bool,
    /// Storage driver for versioned messages. Defaults to memory.
    pub driver: VersionStoreDriver,
    /// Maximum page size for version-history retrieval.
    pub max_page_size: usize,
    /// Retention window for version entries, in seconds. `0` disables expiry
    /// (entries are kept forever).
    ///
    /// Backends with native row TTL (ScyllaDB `USING TTL`, DynamoDB TTL
    /// attribute) apply this at the storage layer. Other backends (MySQL,
    /// PostgreSQL, SurrealDB, Memory) enforce it via a periodic background
    /// purge worker controlled by `purge_interval_seconds`.
    pub retention_window_seconds: u64,
    /// Interval between purge worker runs, in seconds. Only consulted for
    /// backends without native TTL. Clamped to a minimum of 10 seconds.
    pub purge_interval_seconds: u64,
    /// Maximum rows deleted per purge query. Bounds lock/transaction sizes
    /// for SQL backends. The worker loops until no more expired rows remain
    /// or `max_purge_per_tick` is reached.
    pub purge_batch_size: usize,
    /// Hard cap on rows deleted per purge tick across all loop iterations.
    /// Prevents a backlog of expired rows from monopolising a worker run.
    pub max_purge_per_tick: usize,
}
1207
/// Settings for the annotations feature.
///
/// `Default` is derived, so `enabled` defaults to `false`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AnnotationsConfig {
    /// Whether Sockudo-native annotation APIs and realtime annotation protocol
    /// surfaces are enabled. Disabled by default while the feature is opt-in.
    pub enabled: bool,
}
1215
/// Storage backend selector for message history.
///
/// With `rename_all = "lowercase"` each variant (de)serializes as its name in
/// lower case, e.g. `SurrealDb` <-> `"surrealdb"`. `Postgres` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum HistoryBackend {
    /// PostgreSQL (default).
    #[default]
    Postgres,
    /// MySQL.
    Mysql,
    /// DynamoDB.
    DynamoDb,
    /// SurrealDB.
    SurrealDb,
    /// ScyllaDB.
    ScyllaDb,
    /// In-memory store.
    Memory,
}
1227
1228impl FromStr for HistoryBackend {
1229    type Err = String;
1230    fn from_str(s: &str) -> Result<Self, Self::Err> {
1231        match s.to_lowercase().as_str() {
1232            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1233            "mysql" => Ok(Self::Mysql),
1234            "dynamodb" => Ok(Self::DynamoDb),
1235            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1236            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1237            "memory" => Ok(Self::Memory),
1238            _ => Err(format!("Unknown history backend: {s}")),
1239        }
1240    }
1241}
1242
1243#[derive(Debug, Clone, Serialize, Deserialize)]
1244#[serde(default)]
1245pub struct PostgresHistoryConfig {
1246    pub table_prefix: String,
1247    pub write_timeout_ms: u64,
1248}
1249
1250impl Default for PostgresHistoryConfig {
1251    fn default() -> Self {
1252        Self {
1253            table_prefix: "sockudo_history".to_string(),
1254            write_timeout_ms: 5000,
1255        }
1256    }
1257}
1258
1259#[derive(Debug, Clone, Serialize, Deserialize)]
1260#[serde(default)]
1261pub struct MySqlHistoryConfig {
1262    pub table_prefix: String,
1263    pub write_timeout_ms: u64,
1264}
1265
1266impl Default for MySqlHistoryConfig {
1267    fn default() -> Self {
1268        Self {
1269            table_prefix: "sockudo_history".to_string(),
1270            write_timeout_ms: 5000,
1271        }
1272    }
1273}
1274
1275#[derive(Debug, Clone, Serialize, Deserialize)]
1276#[serde(default)]
1277pub struct DynamoDbHistoryConfig {
1278    pub table_prefix: String,
1279    pub write_timeout_ms: u64,
1280}
1281
1282impl Default for DynamoDbHistoryConfig {
1283    fn default() -> Self {
1284        Self {
1285            table_prefix: "sockudo_history".to_string(),
1286            write_timeout_ms: 5000,
1287        }
1288    }
1289}
1290
1291#[derive(Debug, Clone, Serialize, Deserialize)]
1292#[serde(default)]
1293pub struct SurrealDbHistoryConfig {
1294    pub table_prefix: String,
1295    pub write_timeout_ms: u64,
1296}
1297
1298impl Default for SurrealDbHistoryConfig {
1299    fn default() -> Self {
1300        Self {
1301            table_prefix: "sockudo_history".to_string(),
1302            write_timeout_ms: 5000,
1303        }
1304    }
1305}
1306
1307#[derive(Debug, Clone, Serialize, Deserialize)]
1308#[serde(default)]
1309pub struct ScyllaDbHistoryConfig {
1310    pub table_prefix: String,
1311    pub write_timeout_ms: u64,
1312}
1313
1314impl Default for ScyllaDbHistoryConfig {
1315    fn default() -> Self {
1316        Self {
1317            table_prefix: "sockudo_history".to_string(),
1318            write_timeout_ms: 5000,
1319        }
1320    }
1321}
1322
/// Message-history settings: feature switches, backend choice, retention,
/// writer sizing, purge tuning and one sub-section per backend.
///
/// Fields missing from the input are taken from the `Default` implementation
/// of this type (see the `impl Default for HistoryConfig` in this file).
/// NOTE(review): descriptions marked "presumably" are read off field names;
/// the consuming code is not part of this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HistoryConfig {
    /// History on/off switch (default `false`).
    pub enabled: bool,
    /// Presumably toggles the "rewind" feature on top of history (default `true`).
    pub rewind_enabled: bool,
    /// Which storage backend to use (default `Postgres`).
    pub backend: HistoryBackend,
    /// Retention window in seconds (default 86400).
    pub retention_window_seconds: u64,
    /// Maximum page size for history retrieval (default 100).
    pub max_page_size: usize,
    /// Optional per-channel message cap; `None` by default.
    pub max_messages_per_channel: Option<usize>,
    /// Optional per-channel byte cap; `None` by default.
    pub max_bytes_per_channel: Option<u64>,
    /// Presumably the number of writer shards (default 16).
    pub writer_shards: usize,
    /// Presumably the capacity of each writer queue (default 4096).
    pub writer_queue_capacity: usize,
    /// Interval between purge runs, in seconds (default 300).
    pub purge_interval_seconds: u64,
    /// Presumably the number of rows removed per purge query (default 1000).
    pub purge_batch_size: usize,
    /// Presumably the cap on rows removed per purge run (default 100000).
    pub max_purge_per_tick: usize,
    /// PostgreSQL-specific settings.
    pub postgres: PostgresHistoryConfig,
    /// MySQL-specific settings.
    pub mysql: MySqlHistoryConfig,
    /// DynamoDB-specific settings.
    pub dynamodb: DynamoDbHistoryConfig,
    /// SurrealDB-specific settings.
    pub surrealdb: SurrealDbHistoryConfig,
    /// ScyllaDB-specific settings.
    pub scylladb: ScyllaDbHistoryConfig,
}
1344
1345impl Default for HistoryConfig {
1346    fn default() -> Self {
1347        Self {
1348            enabled: false,
1349            rewind_enabled: true,
1350            backend: HistoryBackend::Postgres,
1351            retention_window_seconds: 86400,
1352            max_page_size: 100,
1353            max_messages_per_channel: None,
1354            max_bytes_per_channel: None,
1355            writer_shards: 16,
1356            writer_queue_capacity: 4096,
1357            purge_interval_seconds: 300,
1358            purge_batch_size: 1000,
1359            max_purge_per_tick: 100_000,
1360            postgres: PostgresHistoryConfig::default(),
1361            mysql: MySqlHistoryConfig::default(),
1362            dynamodb: DynamoDbHistoryConfig::default(),
1363            surrealdb: SurrealDbHistoryConfig::default(),
1364            scylladb: ScyllaDbHistoryConfig::default(),
1365        }
1366    }
1367}
1368
1369impl Default for VersionedMessagesConfig {
1370    fn default() -> Self {
1371        Self {
1372            enabled: false,
1373            driver: VersionStoreDriver::Memory,
1374            max_page_size: 100,
1375            retention_window_seconds: 0,
1376            purge_interval_seconds: 300,
1377            purge_batch_size: 1000,
1378            max_purge_per_tick: 100_000,
1379        }
1380    }
1381}
1382
/// Storage backend selector for the push subsystem.
///
/// With `rename_all = "lowercase"` each variant (de)serializes as its name in
/// lower case, e.g. `ScyllaDb` <-> `"scylladb"`. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PushStorageDriver {
    /// In-memory store (default).
    #[default]
    Memory,
    /// PostgreSQL.
    Postgres,
    /// MySQL.
    Mysql,
    /// DynamoDB.
    DynamoDb,
    /// SurrealDB.
    SurrealDb,
    /// ScyllaDB.
    ScyllaDb,
}
1394
1395impl FromStr for PushStorageDriver {
1396    type Err = String;
1397    fn from_str(s: &str) -> Result<Self, Self::Err> {
1398        match s.to_lowercase().as_str() {
1399            "memory" => Ok(Self::Memory),
1400            "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1401            "mysql" => Ok(Self::Mysql),
1402            "dynamodb" => Ok(Self::DynamoDb),
1403            "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1404            "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1405            _ => Err(format!("Unknown push storage driver: {s}")),
1406        }
1407    }
1408}
1409
/// Queue backend selector for the push subsystem.
///
/// Variants (de)serialize as their lower-cased name (`rename_all =
/// "lowercase"`), except the two with an explicit `rename`, which use a
/// hyphenated form. `Memory` is the default.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PushQueueDriver {
    /// In-memory queue (default).
    #[default]
    Memory,
    /// Redis.
    Redis,
    /// Redis Cluster; serialized as `"redis-cluster"`.
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    /// NATS.
    Nats,
    /// Pulsar.
    Pulsar,
    /// RabbitMQ; serialized as `"rabbitmq"`.
    RabbitMq,
    /// Google Pub/Sub; serialized as `"google-pubsub"`.
    #[serde(rename = "google-pubsub")]
    GooglePubsub,
    /// Kafka.
    Kafka,
    /// Iggy.
    Iggy,
    /// SQS.
    Sqs,
    /// SNS.
    Sns,
}
1428
1429impl FromStr for PushQueueDriver {
1430    type Err = String;
1431    fn from_str(s: &str) -> Result<Self, Self::Err> {
1432        match s.to_lowercase().as_str() {
1433            "memory" => Ok(Self::Memory),
1434            "redis" => Ok(Self::Redis),
1435            "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
1436            "nats" => Ok(Self::Nats),
1437            "pulsar" => Ok(Self::Pulsar),
1438            "rabbitmq" | "rabbit-mq" => Ok(Self::RabbitMq),
1439            "google-pubsub" | "google_pubsub" | "gcp-pubsub" | "pubsub" => Ok(Self::GooglePubsub),
1440            "kafka" => Ok(Self::Kafka),
1441            "iggy" | "apache-iggy" | "apache_iggy" => Ok(Self::Iggy),
1442            "sqs" => Ok(Self::Sqs),
1443            "sns" => Ok(Self::Sns),
1444            _ => Err(format!("Unknown push queue driver: {s}")),
1445        }
1446    }
1447}
1448
/// Top-level settings for the push-notification subsystem.
///
/// Fields missing from the input are taken from the `Default` implementation
/// of this type (see `impl Default for PushConfig` in this file).
/// NOTE(review): most field meanings below are read off the field names; the
/// consuming code is not part of this section — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PushConfig {
    /// Storage backend (default `Memory`).
    pub storage_driver: PushStorageDriver,
    /// Queue backend (default `Memory`).
    pub queue_driver: PushQueueDriver,

    // Per-provider on/off switches; all default to `false`.
    pub fcm_enabled: bool,
    pub apns_enabled: bool,
    pub webpush_enabled: bool,
    pub hms_enabled: bool,
    pub wns_enabled: bool,

    // Per-provider credential references; all default to `None`.
    pub fcm_credential_ref: Option<String>,
    pub apns_credential_ref: Option<String>,
    pub webpush_credential_ref: Option<String>,
    pub hms_credential_ref: Option<String>,
    pub wns_credential_ref: Option<String>,

    // Worker / partition counts; all default to 1.
    pub accept_worker_count: u32,
    pub planner_worker_count: u32,
    pub shard_worker_count: u32,
    pub dispatch_worker_count: u32,
    pub feedback_worker_count: u32,
    pub queue_partition_count: u32,
    pub channel_shard_count: u32,

    // Fan-out tuning (defaults: 10000 / 100000 / 0).
    pub fanout_fast_threshold: u64,
    pub fanout_shard_size: u64,
    pub fanout_sync_threshold: u64,

    /// Seconds per the `_secs` suffix (default 60).
    pub backpressure_lag_threshold_secs: u64,
    /// Days per the `_days` suffix (default 30).
    pub publish_status_ttl_days: u64,
    /// Days per the `_days` suffix (default 90).
    pub stale_device_max_age_days: u64,

    /// Retry policy.
    pub retry: PushRetryConfig,
    /// Circuit-breaker policy.
    pub circuit_breaker: PushCircuitBreakerConfig,
    /// Default quotas.
    pub default_quotas: PushDefaultQuotas,

    // Secret-handling references; all default to `None`.
    // NOTE(review): this struct derives `Debug` and `Serialize`, so a value
    // placed in `credential_encryption_key` will appear in debug output and
    // serialized config — confirm that is acceptable.
    pub credential_encryption_key: Option<String>,
    pub kms_key_ref: Option<String>,
    pub vault_secret_ref: Option<String>,

    /// Dry-run switch (default `false`).
    pub dry_run: bool,
    /// Analytics switch (default `false`).
    pub analytics_enabled: bool,
    /// Days per the `_days` suffix (default 30).
    pub analytics_retention_days: u64,
    /// Payload redaction policy.
    pub payload_redaction: PushPayloadRedactionConfig,
    /// Seconds per the `_secs` suffix (default 5).
    pub scheduler_interval_secs: u64,
}
1489
1490impl Default for PushConfig {
1491    fn default() -> Self {
1492        Self {
1493            storage_driver: PushStorageDriver::Memory,
1494            queue_driver: PushQueueDriver::Memory,
1495            fcm_enabled: false,
1496            apns_enabled: false,
1497            webpush_enabled: false,
1498            hms_enabled: false,
1499            wns_enabled: false,
1500            fcm_credential_ref: None,
1501            apns_credential_ref: None,
1502            webpush_credential_ref: None,
1503            hms_credential_ref: None,
1504            wns_credential_ref: None,
1505            accept_worker_count: 1,
1506            planner_worker_count: 1,
1507            shard_worker_count: 1,
1508            dispatch_worker_count: 1,
1509            feedback_worker_count: 1,
1510            queue_partition_count: 1,
1511            channel_shard_count: 1,
1512            fanout_fast_threshold: 10_000,
1513            fanout_shard_size: 100_000,
1514            fanout_sync_threshold: 0,
1515            backpressure_lag_threshold_secs: 60,
1516            publish_status_ttl_days: 30,
1517            stale_device_max_age_days: 90,
1518            retry: PushRetryConfig::default(),
1519            circuit_breaker: PushCircuitBreakerConfig::default(),
1520            default_quotas: PushDefaultQuotas::default(),
1521            credential_encryption_key: None,
1522            kms_key_ref: None,
1523            vault_secret_ref: None,
1524            dry_run: false,
1525            analytics_enabled: false,
1526            analytics_retention_days: 30,
1527            payload_redaction: PushPayloadRedactionConfig::default(),
1528            scheduler_interval_secs: 5,
1529        }
1530    }
1531}
1532
1533#[derive(Debug, Clone, Serialize, Deserialize)]
1534#[serde(default)]
1535pub struct PushRetryConfig {
1536    pub max_attempts: u32,
1537    pub initial_backoff_ms: u64,
1538    pub max_backoff_ms: u64,
1539    pub max_elapsed_secs: u64,
1540    pub jitter: bool,
1541    pub respect_retry_after: bool,
1542}
1543
1544impl Default for PushRetryConfig {
1545    fn default() -> Self {
1546        Self {
1547            max_attempts: 5,
1548            initial_backoff_ms: 1_000,
1549            max_backoff_ms: 60_000,
1550            max_elapsed_secs: 86_400,
1551            jitter: true,
1552            respect_retry_after: true,
1553        }
1554    }
1555}
1556
1557#[derive(Debug, Clone, Serialize, Deserialize)]
1558#[serde(default)]
1559pub struct PushCircuitBreakerConfig {
1560    pub failure_threshold: u32,
1561    pub cooldown_secs: u64,
1562    pub half_open_max_inflight: u32,
1563}
1564
1565impl Default for PushCircuitBreakerConfig {
1566    fn default() -> Self {
1567        Self {
1568            failure_threshold: 5,
1569            cooldown_secs: 60,
1570            half_open_max_inflight: 10,
1571        }
1572    }
1573}
1574
1575#[derive(Debug, Clone, Serialize, Deserialize)]
1576#[serde(default)]
1577pub struct PushDefaultQuotas {
1578    pub acceptance_rps: u64,
1579    pub delivery_quota_daily: u64,
1580    pub fanout_max: u64,
1581    pub inflight_max: u64,
1582}
1583
1584impl Default for PushDefaultQuotas {
1585    fn default() -> Self {
1586        Self {
1587            acceptance_rps: 100,
1588            delivery_quota_daily: 0,
1589            fanout_max: 0,
1590            inflight_max: 1_000,
1591        }
1592    }
1593}
1594
1595#[derive(Debug, Clone, Serialize, Deserialize)]
1596#[serde(default)]
1597pub struct PushPayloadRedactionConfig {
1598    pub redact_payload: bool,
1599    pub redact_template_data: bool,
1600    pub redact_provider_overrides: bool,
1601    pub allow_debug_payload_logging: bool,
1602}
1603
1604impl Default for PushPayloadRedactionConfig {
1605    fn default() -> Self {
1606        Self {
1607            redact_payload: true,
1608            redact_template_data: true,
1609            redact_provider_overrides: true,
1610            allow_debug_payload_logging: false,
1611        }
1612    }
1613}
1614
1615#[derive(Debug, Clone, Serialize, Deserialize)]
1616#[serde(default)]
1617pub struct PresenceHistoryConfig {
1618    pub enabled: bool,
1619    pub retention_window_seconds: u64,
1620    pub max_page_size: usize,
1621    pub max_events_per_channel: Option<usize>,
1622    pub max_bytes_per_channel: Option<u64>,
1623}
1624
1625impl Default for PresenceHistoryConfig {
1626    fn default() -> Self {
1627        Self {
1628            enabled: false,
1629            retention_window_seconds: 86400,
1630            max_page_size: 100,
1631            max_events_per_channel: None,
1632            max_bytes_per_channel: None,
1633        }
1634    }
1635}
1636
/// HTTP API settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    /// Request size limit; megabytes per the `_in_mb` suffix.
    pub request_limit_in_mb: u32,
    /// Traffic-acceptance settings.
    pub accept_traffic: AcceptTraffic,
    /// Presumably toggles the usage endpoint/reporting — confirm against the HTTP layer.
    pub usage_enabled: bool,
}
1644
/// Traffic-acceptance settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    /// Memory threshold. NOTE(review): unit/scale (fraction vs. percent) is
    /// not visible here — confirm against the code that reads it.
    pub memory_threshold: f64,
}
1650
/// Per-instance settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    /// Identifier of this process/instance.
    pub process_id: String,
}
1656
/// Metrics settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    /// Metrics on/off switch.
    pub enabled: bool,
    /// Which metrics driver to use.
    pub driver: MetricsDriver,
    /// Host for the metrics endpoint — presumably the bind address; confirm.
    pub host: String,
    /// Prometheus-specific settings.
    pub prometheus: PrometheusConfig,
    /// TCP exporter settings.
    pub tcp_exporter: MetricsTcpExporterConfig,
    /// Port for the metrics endpoint.
    pub port: u16,
}
1667
/// Prometheus-specific metrics settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    /// Prefix — presumably prepended to metric names; confirm against the exporter.
    pub prefix: String,
}
1673
/// Settings for the TCP metrics exporter.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsTcpExporterConfig {
    /// Exporter on/off switch.
    pub enabled: bool,
    /// Exporter host.
    pub host: String,
    /// Exporter port.
    pub port: u16,
    /// Optional buffer size. NOTE(review): unit and the behaviour when
    /// `None` are not visible here.
    pub buffer_size: Option<usize>,
}
1682
/// Log-output formatting settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    /// Whether coloured output is enabled.
    pub colors_enabled: bool,
    /// Presumably whether the log target (module path) is included in each line.
    pub include_target: bool,
}
1689
/// Limits for presence channels.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    /// Maximum number of members per presence channel.
    pub max_members_per_channel: u32,
    /// Maximum size of a member record; kilobytes per the `_in_kb` suffix.
    pub max_member_size_in_kb: u32,
}
1696
1697/// WebSocket connection buffer configuration
1698/// Controls backpressure handling for slow consumers
1699#[derive(Debug, Clone, Serialize, Deserialize)]
1700#[serde(default)]
1701pub struct WebSocketConfig {
1702    pub max_messages: Option<usize>,
1703    pub max_bytes: Option<usize>,
1704    pub disconnect_on_buffer_full: bool,
1705    pub max_message_size: usize,
1706    pub max_frame_size: usize,
1707    pub write_buffer_size: usize,
1708    pub max_backpressure: usize,
1709    pub auto_ping: bool,
1710    pub ping_interval: u32,
1711    pub idle_timeout: u32,
1712    pub compression: String,
1713}
1714
1715impl Default for WebSocketConfig {
1716    fn default() -> Self {
1717        Self {
1718            max_messages: Some(1000),
1719            max_bytes: None,
1720            disconnect_on_buffer_full: true,
1721            max_message_size: 64 * 1024 * 1024,
1722            max_frame_size: 16 * 1024 * 1024,
1723            write_buffer_size: 16 * 1024,
1724            max_backpressure: 1024 * 1024,
1725            auto_ping: true,
1726            ping_interval: 30,
1727            idle_timeout: 120,
1728            compression: "disabled".to_string(),
1729        }
1730    }
1731}
1732
1733impl WebSocketConfig {
1734    /// Convert to WebSocketBufferConfig for runtime use
1735    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1736        use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1737
1738        let limit = match (self.max_messages, self.max_bytes) {
1739            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1740            (Some(messages), None) => BufferLimit::Messages(messages),
1741            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1742            (None, None) => BufferLimit::Messages(1000),
1743        };
1744
1745        WebSocketBufferConfig {
1746            limit,
1747            disconnect_on_full: self.disconnect_on_buffer_full,
1748        }
1749    }
1750
1751    /// Convert to native sockudo-ws runtime configuration.
1752    pub fn to_sockudo_ws_config(
1753        &self,
1754        websocket_max_payload_kb: u32,
1755        activity_timeout: u64,
1756    ) -> sockudo_ws::Config {
1757        use sockudo_ws::Compression;
1758
1759        let compression = match self.compression.to_lowercase().as_str() {
1760            "dedicated" => Compression::Dedicated,
1761            "shared" => Compression::Shared,
1762            "window256b" => Compression::Window256B,
1763            "window1kb" => Compression::Window1KB,
1764            "window2kb" => Compression::Window2KB,
1765            "window4kb" => Compression::Window4KB,
1766            "window8kb" => Compression::Window8KB,
1767            "window16kb" => Compression::Window16KB,
1768            "window32kb" => Compression::Window32KB,
1769            _ => Compression::Disabled,
1770        };
1771
1772        sockudo_ws::Config::builder()
1773            .max_payload_length(
1774                self.max_bytes
1775                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
1776            )
1777            .max_message_size(self.max_message_size)
1778            .max_frame_size(self.max_frame_size)
1779            .write_buffer_size(self.write_buffer_size)
1780            .max_backpressure(self.max_backpressure)
1781            .idle_timeout(self.idle_timeout)
1782            .auto_ping(self.auto_ping)
1783            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1784            .compression(compression)
1785            .build()
1786    }
1787}
1788
/// Queue settings: the selected driver plus one settings section per
/// supported backend.
///
/// `Default` is derived, so every field starts from its own type's default.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    /// Which queue driver to use.
    pub driver: QueueDriver,
    /// Redis queue settings.
    pub redis: RedisQueueConfig,
    /// Redis Cluster queue settings.
    pub redis_cluster: RedisClusterQueueConfig,
    /// NATS settings.
    pub nats: NatsAdapterConfig,
    /// Pulsar settings.
    pub pulsar: PulsarAdapterConfig,
    /// RabbitMQ settings.
    pub rabbitmq: RabbitMqAdapterConfig,
    /// Google Pub/Sub settings.
    pub google_pubsub: GooglePubSubAdapterConfig,
    /// Kafka settings.
    pub kafka: KafkaAdapterConfig,
    /// Iggy settings.
    pub iggy: IggyConfig,
    /// SQS settings.
    pub sqs: SqsQueueConfig,
    /// SNS settings.
    pub sns: SnsQueueConfig,
}
1804
/// Settings for the Redis-backed queue.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    /// Presumably the number of concurrent queue workers — confirm against the queue driver.
    pub concurrency: u32,
    /// Optional prefix — presumably applied to queue key names.
    pub prefix: Option<String>,
    /// Optional URL that, going by the name, replaces the otherwise derived Redis URL.
    pub url_override: Option<String>,
    /// Cluster-mode switch.
    pub cluster_mode: bool,
}
1813
/// A single rate-limit rule: at most `max_requests` per `window_seconds`.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimit {
    /// Maximum number of requests allowed in one window.
    pub max_requests: u32,
    /// Window length in seconds.
    pub window_seconds: u64,
    /// Optional identifier. NOTE(review): what it identifies (key strategy,
    /// header name, ...) is not visible here.
    pub identifier: Option<String>,
    /// Optional hop count — presumably the number of trusted proxy hops used
    /// when resolving the client address; confirm against the limiter.
    pub trust_hops: Option<u32>,
}
1822
/// Rate-limiter settings for the HTTP API and WebSocket endpoints.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimiterConfig {
    /// Rate limiting on/off switch.
    pub enabled: bool,
    /// Backing store for rate-limit state; selected with the cache driver enum.
    pub driver: CacheDriver,
    /// Rule for the API.
    pub api_rate_limit: RateLimit,
    /// Rule for WebSocket traffic.
    pub websocket_rate_limit: RateLimit,
    /// Redis settings — presumably used when `driver` selects Redis.
    pub redis: RedisConfig,
}
1832
/// TLS settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SslConfig {
    /// TLS on/off switch.
    pub enabled: bool,
    /// Path to the certificate file.
    pub cert_path: String,
    /// Path to the private-key file.
    pub key_path: String,
    /// Optional passphrase for the private key.
    /// NOTE(review): the struct derives `Debug` and `Serialize`, so this
    /// value will appear in debug output and serialized config — confirm
    /// that is acceptable.
    pub passphrase: Option<String>,
    /// Optional path to a CA file.
    pub ca_path: Option<String>,
    /// Presumably whether plain-HTTP requests are redirected to HTTPS.
    pub redirect_http: bool,
    /// Optional HTTP port — presumably the listener used for the redirect.
    pub http_port: Option<u16>,
}
1844
/// Webhook delivery settings.
///
/// Fields missing from the input are taken from the `Default` implementation
/// of this type (see `impl Default for WebhooksConfig` in this file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhooksConfig {
    /// Batching settings.
    pub batching: BatchingConfig,
    /// Retry settings.
    pub retry: WebhookRetryConfig,
    /// Request timeout in milliseconds (default 10000).
    pub request_timeout_ms: u64,
}
1852
/// Webhook batching settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BatchingConfig {
    /// Batching on/off switch.
    pub enabled: bool,
    /// Batch duration. NOTE(review): the unit (ms vs. s) is not visible
    /// here — confirm against the batching code.
    pub duration: u64,
    /// Presumably the maximum number of events per batch.
    pub size: usize,
}
1860
/// Webhook retry settings.
///
/// Fields missing from the input are taken from the `Default` implementation
/// of this type (see `impl Default for WebhookRetryConfig` in this file).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhookRetryConfig {
    /// Retry on/off switch (default `true`).
    pub enabled: bool,
    /// Optional cap on attempts (default `None`).
    pub max_attempts: Option<u32>,
    /// Total retry budget in milliseconds (default 300000).
    pub max_elapsed_time_ms: u64,
    /// First backoff delay in milliseconds (default 1000).
    pub initial_backoff_ms: u64,
    /// Largest backoff delay in milliseconds (default 60000).
    pub max_backoff_ms: u64,
}
1870
1871impl Default for WebhooksConfig {
1872    fn default() -> Self {
1873        Self {
1874            batching: BatchingConfig::default(),
1875            retry: WebhookRetryConfig::default(),
1876            request_timeout_ms: 10_000,
1877        }
1878    }
1879}
1880
1881impl Default for WebhookRetryConfig {
1882    fn default() -> Self {
1883        Self {
1884            enabled: true,
1885            max_attempts: None,
1886            max_elapsed_time_ms: 300_000,
1887            initial_backoff_ms: 1_000,
1888            max_backoff_ms: 60_000,
1889        }
1890    }
1891}
1892
/// Cluster health-monitoring settings; all intervals are in milliseconds
/// per the `_ms` suffix.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterHealthConfig {
    /// Health monitoring on/off switch.
    pub enabled: bool,
    /// Interval between heartbeats.
    pub heartbeat_interval_ms: u64,
    /// Presumably how long a node may stay silent before it is considered gone.
    pub node_timeout_ms: u64,
    /// Interval between cleanup passes.
    pub cleanup_interval_ms: u64,
}
1901
/// Unix domain socket settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UnixSocketConfig {
    /// Unix socket on/off switch.
    pub enabled: bool,
    /// Filesystem path of the socket.
    pub path: String,
    /// Permission bits for the socket file.
    ///
    /// Deserialized by `deserialize_octal_permission`, which expects a
    /// string of octal digits (chmod style, e.g. `"755"`) and rejects values
    /// above `777`.
    /// NOTE(review): there is no matching `serialize_with`, so the value is
    /// serialized as a plain integer and a serialized config cannot be read
    /// back through this deserializer — confirm whether round-tripping
    /// matters.
    #[serde(deserialize_with = "deserialize_octal_permission")]
    pub permission_mode: u32,
}
1910
/// Delta-compression settings.
///
/// Fields missing from the input are taken from this type's `Default`
/// implementation (`#[serde(default)]`), which lives outside this section.
/// NOTE(review): field meanings below are read off the field names; the
/// delta-compression code is not part of this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DeltaCompressionOptionsConfig {
    /// Delta compression on/off switch.
    pub enabled: bool,
    /// Name of the delta algorithm (free-form string; accepted values are not visible here).
    pub algorithm: String,
    /// Presumably how often (in messages) a full message is sent instead of a delta.
    pub full_message_interval: u32,
    /// Presumably the smallest message size for which a delta is attempted.
    pub min_message_size: usize,
    /// Maximum state age; seconds per the `_secs` suffix.
    pub max_state_age_secs: u64,
    /// Presumably the cap on per-socket channel states.
    pub max_channel_states_per_socket: usize,
    /// Optional cap on conflation states per channel.
    pub max_conflation_states_per_channel: Option<usize>,
    /// Optional path used to extract a conflation key.
    pub conflation_key_path: Option<String>,
    /// Cluster coordination on/off switch.
    pub cluster_coordination: bool,
    /// Backend used for cluster coordination.
    pub coordination_backend: DeltaCoordinationBackend,
    /// Presumably whether the algorithm name is left out of delta messages.
    pub omit_delta_algorithm: bool,
}
1926
1927/// Cleanup system configuration (minimal version for options; full impl lives in sockudo crate)
1928#[derive(Debug, Clone, Serialize, Deserialize)]
1929#[serde(default)]
1930pub struct CleanupConfig {
1931    pub queue_buffer_size: usize,
1932    pub batch_size: usize,
1933    pub batch_timeout_ms: u64,
1934    pub worker_threads: WorkerThreadsConfig,
1935    pub max_retry_attempts: u32,
1936    pub async_enabled: bool,
1937    pub fallback_to_sync: bool,
1938}
1939
/// How many worker threads the cleanup system should use.
#[derive(Clone, Debug)]
pub enum WorkerThreadsConfig {
    /// Written as the string `"auto"` in serialized form.
    Auto,
    /// An explicit thread count, written as an integer in serialized form.
    Fixed(usize),
}
1946
1947impl serde::Serialize for WorkerThreadsConfig {
1948    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1949    where
1950        S: serde::Serializer,
1951    {
1952        match self {
1953            WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1954            WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1955        }
1956    }
1957}
1958
1959impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1960    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1961    where
1962        D: serde::Deserializer<'de>,
1963    {
1964        use serde::de;
1965        struct WorkerThreadsVisitor;
1966        impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1967            type Value = WorkerThreadsConfig;
1968
1969            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1970                formatter.write_str(r#""auto" or a positive integer"#)
1971            }
1972
1973            fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1974                if value.eq_ignore_ascii_case("auto") {
1975                    Ok(WorkerThreadsConfig::Auto)
1976                } else if let Ok(n) = value.parse::<usize>() {
1977                    Ok(WorkerThreadsConfig::Fixed(n))
1978                } else {
1979                    Err(E::custom(format!(
1980                        "expected 'auto' or a number, got '{value}'"
1981                    )))
1982                }
1983            }
1984
1985            fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1986                Ok(WorkerThreadsConfig::Fixed(value as usize))
1987            }
1988
1989            fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1990                if value >= 0 {
1991                    Ok(WorkerThreadsConfig::Fixed(value as usize))
1992                } else {
1993                    Err(E::custom("worker_threads must be non-negative"))
1994                }
1995            }
1996        }
1997        deserializer.deserialize_any(WorkerThreadsVisitor)
1998    }
1999}
2000
2001impl Default for CleanupConfig {
2002    fn default() -> Self {
2003        Self {
2004            queue_buffer_size: 1024,
2005            batch_size: 64,
2006            batch_timeout_ms: 100,
2007            worker_threads: WorkerThreadsConfig::Auto,
2008            max_retry_attempts: 3,
2009            async_enabled: true,
2010            fallback_to_sync: true,
2011        }
2012    }
2013}
2014
2015impl CleanupConfig {
2016    pub fn validate(&self) -> Result<(), String> {
2017        if self.queue_buffer_size == 0 {
2018            return Err("queue_buffer_size must be greater than 0".to_string());
2019        }
2020        if self.batch_size == 0 {
2021            return Err("batch_size must be greater than 0".to_string());
2022        }
2023        if self.batch_timeout_ms == 0 {
2024            return Err("batch_timeout_ms must be greater than 0".to_string());
2025        }
2026        if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
2027            && n == 0
2028        {
2029            return Err("worker_threads must be greater than 0 when using fixed count".to_string());
2030        }
2031        Ok(())
2032    }
2033}
2034
2035#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2036#[serde(default)]
2037pub struct TagFilteringConfig {
2038    #[serde(default)]
2039    pub enabled: bool,
2040    #[serde(default = "default_true")]
2041    pub enable_tags: bool,
2042}
2043
/// Serde default helper for boolean fields that should start out as `true`.
fn default_true() -> bool {
    true
}
2047
2048// --- Default Implementations ---
2049
2050impl Default for ServerOptions {
2051    fn default() -> Self {
2052        Self {
2053            adapter: AdapterConfig::default(),
2054            app_manager: AppManagerConfig::default(),
2055            cache: CacheConfig::default(),
2056            channel_limits: ChannelLimits::default(),
2057            cors: CorsConfig::default(),
2058            database: DatabaseConfig::default(),
2059            database_pooling: DatabasePooling::default(),
2060            debug: false,
2061            tag_filtering: TagFilteringConfig::default(),
2062            event_limits: EventLimits::default(),
2063            host: "0.0.0.0".to_string(),
2064            http_api: HttpApiConfig::default(),
2065            instance: InstanceConfig::default(),
2066            logging: None,
2067            metrics: MetricsConfig::default(),
2068            mode: "production".to_string(),
2069            port: 6001,
2070            path_prefix: "/".to_string(),
2071            presence: PresenceConfig::default(),
2072            queue: QueueConfig::default(),
2073            rate_limiter: RateLimiterConfig::default(),
2074            shutdown_grace_period: 10,
2075            ssl: SslConfig::default(),
2076            user_authentication_timeout: 3600,
2077            webhooks: WebhooksConfig::default(),
2078            websocket_max_payload_kb: 64,
2079            cleanup: CleanupConfig::default(),
2080            activity_timeout: 120,
2081            cluster_health: ClusterHealthConfig::default(),
2082            unix_socket: UnixSocketConfig::default(),
2083            delta_compression: DeltaCompressionOptionsConfig::default(),
2084            websocket: WebSocketConfig::default(),
2085            connection_recovery: ConnectionRecoveryConfig::default(),
2086            history: HistoryConfig::default(),
2087            presence_history: PresenceHistoryConfig::default(),
2088            idempotency: IdempotencyConfig::default(),
2089            ephemeral: EphemeralConfig::default(),
2090            echo_control: EchoControlConfig::default(),
2091            event_name_filtering: EventNameFilteringConfig::default(),
2092            versioned_messages: VersionedMessagesConfig::default(),
2093            annotations: AnnotationsConfig::default(),
2094            push: PushConfig::default(),
2095            health_check_timeout_ms: 400,
2096        }
2097    }
2098}
2099
2100impl Default for SqsQueueConfig {
2101    fn default() -> Self {
2102        Self {
2103            region: "us-east-1".to_string(),
2104            queue_url_prefix: None,
2105            visibility_timeout: 30,
2106            endpoint_url: None,
2107            max_messages: 10,
2108            wait_time_seconds: 5,
2109            concurrency: 5,
2110            fifo: false,
2111            message_group_id: Some("default".to_string()),
2112        }
2113    }
2114}
2115
2116impl Default for SnsQueueConfig {
2117    fn default() -> Self {
2118        Self {
2119            region: "us-east-1".to_string(),
2120            topic_arn: String::new(),
2121            endpoint_url: None,
2122        }
2123    }
2124}
2125
2126impl Default for RedisAdapterConfig {
2127    fn default() -> Self {
2128        Self {
2129            requests_timeout: 5000,
2130            prefix: "sockudo_adapter:".to_string(),
2131            redis_pub_options: AHashMap::new(),
2132            redis_sub_options: AHashMap::new(),
2133            cluster_mode: false,
2134        }
2135    }
2136}
2137
2138impl Default for RedisClusterAdapterConfig {
2139    fn default() -> Self {
2140        Self {
2141            nodes: vec![],
2142            prefix: "sockudo_adapter:".to_string(),
2143            request_timeout_ms: 1000,
2144            use_connection_manager: true,
2145            use_sharded_pubsub: false,
2146        }
2147    }
2148}
2149
2150impl Default for NatsAdapterConfig {
2151    fn default() -> Self {
2152        Self {
2153            servers: vec!["nats://localhost:4222".to_string()],
2154            prefix: "sockudo_adapter:".to_string(),
2155            request_timeout_ms: 5000,
2156            username: None,
2157            password: None,
2158            token: None,
2159            connection_timeout_ms: 5000,
2160            nodes_number: None,
2161            discovery_max_wait_ms: 1000,
2162            discovery_idle_wait_ms: 150,
2163            subscription_capacity: None,
2164            client_capacity: None,
2165            max_reconnects: None,
2166            presence_sync_chunk_size: None,
2167        }
2168    }
2169}
2170
2171impl Default for PulsarAdapterConfig {
2172    fn default() -> Self {
2173        Self {
2174            url: "pulsar://127.0.0.1:6650".to_string(),
2175            prefix: "sockudo-adapter".to_string(),
2176            request_timeout_ms: 5000,
2177            token: None,
2178            nodes_number: None,
2179        }
2180    }
2181}
2182
2183impl Default for RabbitMqAdapterConfig {
2184    fn default() -> Self {
2185        Self {
2186            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
2187            prefix: "sockudo_adapter".to_string(),
2188            request_timeout_ms: 5000,
2189            connection_timeout_ms: 5000,
2190            nodes_number: None,
2191        }
2192    }
2193}
2194
2195impl Default for GooglePubSubAdapterConfig {
2196    fn default() -> Self {
2197        Self {
2198            project_id: "".to_string(),
2199            prefix: "sockudo-adapter".to_string(),
2200            request_timeout_ms: 5000,
2201            emulator_host: None,
2202            nodes_number: None,
2203        }
2204    }
2205}
2206
2207impl Default for KafkaAdapterConfig {
2208    fn default() -> Self {
2209        Self {
2210            brokers: vec!["localhost:9092".to_string()],
2211            prefix: "sockudo_adapter".to_string(),
2212            request_timeout_ms: 5000,
2213            security_protocol: None,
2214            sasl_mechanism: None,
2215            sasl_username: None,
2216            sasl_password: None,
2217            nodes_number: None,
2218        }
2219    }
2220}
2221
2222impl Default for IggyConfig {
2223    fn default() -> Self {
2224        Self {
2225            connection_string: "iggy://iggy:iggy@127.0.0.1:8090".to_string(),
2226            username: None,
2227            password: None,
2228            consumer_name: None,
2229            stream: "sockudo".to_string(),
2230            topic_prefix: "sockudo-adapter".to_string(),
2231            queue_topic_prefix: "sockudo-queue".to_string(),
2232            consumer_group_prefix: "sockudo-workers".to_string(),
2233            request_timeout_ms: 5000,
2234            poll_interval_ms: 50,
2235            poll_batch_size: 100,
2236            partitions_count: 1,
2237            partition_id: 0,
2238            auto_create: true,
2239            start_from_latest: true,
2240            nodes_number: None,
2241        }
2242    }
2243}
2244
2245impl Default for CacheSettings {
2246    fn default() -> Self {
2247        Self {
2248            enabled: true,
2249            ttl: 300,
2250        }
2251    }
2252}
2253
2254impl Default for MemoryCacheOptions {
2255    fn default() -> Self {
2256        Self {
2257            ttl: 300,
2258            cleanup_interval: 60,
2259            max_capacity: 10000,
2260        }
2261    }
2262}
2263
2264impl Default for CacheConfig {
2265    fn default() -> Self {
2266        Self {
2267            driver: CacheDriver::default(),
2268            redis: RedisConfig {
2269                prefix: Some("sockudo_cache:".to_string()),
2270                url_override: None,
2271                cluster_mode: false,
2272            },
2273            memory: MemoryCacheOptions::default(),
2274        }
2275    }
2276}
2277
2278impl Default for ChannelLimits {
2279    fn default() -> Self {
2280        Self {
2281            max_name_length: 200,
2282            cache_ttl: 3600,
2283        }
2284    }
2285}
2286impl Default for CorsConfig {
2287    fn default() -> Self {
2288        Self {
2289            credentials: true,
2290            origin: vec!["*".to_string()],
2291            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
2292            allowed_headers: vec![
2293                "Authorization".to_string(),
2294                "Content-Type".to_string(),
2295                "X-Requested-With".to_string(),
2296                "Accept".to_string(),
2297            ],
2298        }
2299    }
2300}
2301
2302impl Default for DatabaseConnection {
2303    fn default() -> Self {
2304        Self {
2305            host: "localhost".to_string(),
2306            port: 3306,
2307            username: "root".to_string(),
2308            password: "".to_string(),
2309            database: "sockudo".to_string(),
2310            table_name: "applications".to_string(),
2311            connection_pool_size: 10,
2312            pool_min: None,
2313            pool_max: None,
2314            cache_ttl: 300,
2315            cache_cleanup_interval: 60,
2316            cache_max_capacity: 100,
2317        }
2318    }
2319}
2320
2321impl Default for RedisConnection {
2322    fn default() -> Self {
2323        Self {
2324            host: "127.0.0.1".to_string(),
2325            port: 6379,
2326            db: 0,
2327            username: None,
2328            password: None,
2329            key_prefix: "sockudo:".to_string(),
2330            sentinels: Vec::new(),
2331            sentinel_password: None,
2332            name: "mymaster".to_string(),
2333            cluster: RedisClusterConnection::default(),
2334            cluster_nodes: Vec::new(),
2335        }
2336    }
2337}
2338
2339impl Default for RedisSentinel {
2340    fn default() -> Self {
2341        Self {
2342            host: "localhost".to_string(),
2343            port: 26379,
2344        }
2345    }
2346}
2347
2348impl Default for ClusterNode {
2349    fn default() -> Self {
2350        Self {
2351            host: "127.0.0.1".to_string(),
2352            port: 7000,
2353        }
2354    }
2355}
2356
2357impl Default for DatabasePooling {
2358    fn default() -> Self {
2359        Self {
2360            enabled: true,
2361            min: 2,
2362            max: 10,
2363        }
2364    }
2365}
2366
2367impl Default for EventLimits {
2368    fn default() -> Self {
2369        Self {
2370            max_channels_at_once: 100,
2371            max_name_length: 200,
2372            max_payload_in_kb: 100,
2373            max_batch_size: 10,
2374        }
2375    }
2376}
2377
2378impl Default for IdempotencyConfig {
2379    fn default() -> Self {
2380        Self {
2381            enabled: true,
2382            ttl_seconds: 120,
2383            max_key_length: 128,
2384        }
2385    }
2386}
2387
2388impl Default for EphemeralConfig {
2389    fn default() -> Self {
2390        Self { enabled: true }
2391    }
2392}
2393
2394impl Default for EchoControlConfig {
2395    fn default() -> Self {
2396        Self {
2397            enabled: true,
2398            default_echo_messages: true,
2399        }
2400    }
2401}
2402
2403impl Default for EventNameFilteringConfig {
2404    fn default() -> Self {
2405        Self {
2406            enabled: true,
2407            max_events_per_filter: 50,
2408            max_event_name_length: 200,
2409        }
2410    }
2411}
2412
2413impl Default for ConnectionRecoveryConfig {
2414    fn default() -> Self {
2415        Self {
2416            enabled: false,
2417            buffer_ttl_seconds: 120,
2418            max_buffer_size: 100,
2419        }
2420    }
2421}
2422
2423impl Default for HttpApiConfig {
2424    fn default() -> Self {
2425        Self {
2426            request_limit_in_mb: 10,
2427            accept_traffic: AcceptTraffic::default(),
2428            usage_enabled: true,
2429        }
2430    }
2431}
2432
2433impl Default for AcceptTraffic {
2434    fn default() -> Self {
2435        Self {
2436            memory_threshold: 0.90,
2437        }
2438    }
2439}
2440
2441impl Default for InstanceConfig {
2442    fn default() -> Self {
2443        Self {
2444            process_id: uuid::Uuid::new_v4().to_string(),
2445        }
2446    }
2447}
2448
2449impl Default for LoggingConfig {
2450    fn default() -> Self {
2451        Self {
2452            colors_enabled: true,
2453            include_target: true,
2454        }
2455    }
2456}
2457
2458impl Default for MetricsConfig {
2459    fn default() -> Self {
2460        Self {
2461            enabled: true,
2462            driver: MetricsDriver::default(),
2463            host: "0.0.0.0".to_string(),
2464            prometheus: PrometheusConfig::default(),
2465            tcp_exporter: MetricsTcpExporterConfig::default(),
2466            port: 9601,
2467        }
2468    }
2469}
2470
2471impl Default for PrometheusConfig {
2472    fn default() -> Self {
2473        Self {
2474            prefix: "sockudo_".to_string(),
2475        }
2476    }
2477}
2478
2479impl Default for MetricsTcpExporterConfig {
2480    fn default() -> Self {
2481        Self {
2482            enabled: false,
2483            host: "127.0.0.1".to_string(),
2484            port: 5000,
2485            buffer_size: Some(1024),
2486        }
2487    }
2488}
2489
2490impl Default for PresenceConfig {
2491    fn default() -> Self {
2492        Self {
2493            max_members_per_channel: 100,
2494            max_member_size_in_kb: 2,
2495        }
2496    }
2497}
2498
2499impl Default for RedisQueueConfig {
2500    fn default() -> Self {
2501        Self {
2502            concurrency: 5,
2503            prefix: Some("sockudo_queue:".to_string()),
2504            url_override: None,
2505            cluster_mode: false,
2506        }
2507    }
2508}
2509
2510impl Default for RateLimit {
2511    fn default() -> Self {
2512        Self {
2513            max_requests: 60,
2514            window_seconds: 60,
2515            identifier: Some("default".to_string()),
2516            trust_hops: Some(0),
2517        }
2518    }
2519}
2520
2521impl Default for RateLimiterConfig {
2522    fn default() -> Self {
2523        Self {
2524            enabled: true,
2525            driver: CacheDriver::Memory,
2526            api_rate_limit: RateLimit {
2527                max_requests: 100,
2528                window_seconds: 60,
2529                identifier: Some("api".to_string()),
2530                trust_hops: Some(0),
2531            },
2532            websocket_rate_limit: RateLimit {
2533                max_requests: 20,
2534                window_seconds: 60,
2535                identifier: Some("websocket_connect".to_string()),
2536                trust_hops: Some(0),
2537            },
2538            redis: RedisConfig {
2539                prefix: Some("sockudo_rl:".to_string()),
2540                url_override: None,
2541                cluster_mode: false,
2542            },
2543        }
2544    }
2545}
2546
2547impl Default for SslConfig {
2548    fn default() -> Self {
2549        Self {
2550            enabled: false,
2551            cert_path: "".to_string(),
2552            key_path: "".to_string(),
2553            passphrase: None,
2554            ca_path: None,
2555            redirect_http: false,
2556            http_port: Some(80),
2557        }
2558    }
2559}
2560
2561impl Default for BatchingConfig {
2562    fn default() -> Self {
2563        Self {
2564            enabled: true,
2565            duration: 50,
2566            size: 100,
2567        }
2568    }
2569}
2570
2571impl Default for ClusterHealthConfig {
2572    fn default() -> Self {
2573        Self {
2574            enabled: true,
2575            heartbeat_interval_ms: 10000,
2576            node_timeout_ms: 30000,
2577            cleanup_interval_ms: 10000,
2578        }
2579    }
2580}
2581
2582impl Default for UnixSocketConfig {
2583    fn default() -> Self {
2584        Self {
2585            enabled: false,
2586            path: "/var/run/sockudo/sockudo.sock".to_string(),
2587            permission_mode: 0o660,
2588        }
2589    }
2590}
2591
2592impl Default for DeltaCompressionOptionsConfig {
2593    fn default() -> Self {
2594        Self {
2595            enabled: true,
2596            algorithm: "fossil".to_string(),
2597            full_message_interval: 10,
2598            min_message_size: 100,
2599            max_state_age_secs: 300,
2600            max_channel_states_per_socket: 100,
2601            max_conflation_states_per_channel: Some(100),
2602            conflation_key_path: None,
2603            cluster_coordination: false,
2604            coordination_backend: DeltaCoordinationBackend::Auto,
2605            omit_delta_algorithm: false,
2606        }
2607    }
2608}
2609
2610#[cfg(test)]
2611mod tests {
2612    use super::{
2613        DeltaCoordinationBackend, PushQueueDriver, PushStorageDriver, QueueDriver, ServerOptions,
2614        VersionStoreDriver,
2615    };
2616    use crate::app::{App, AppPolicy};
2617    use std::str::FromStr;
2618
2619    const APP_BOOTSTRAP_ENV_KEYS: &[&str] = &[
2620        "SOCKUDO_DEFAULT_APP_ID",
2621        "SOCKUDO_DEFAULT_APP_KEY",
2622        "SOCKUDO_DEFAULT_APP_SECRET",
2623        "SOCKUDO_DEFAULT_APP_ENABLED",
2624        "SOCKUDO_SKIP_INLINE_APPS",
2625        "APP_MANAGER_REGISTER_INLINE_APPS",
2626    ];
2627
2628    struct EnvGuard {
2629        previous: Vec<(&'static str, Option<String>)>,
2630    }
2631
2632    impl EnvGuard {
2633        fn app_bootstrap(overrides: &[(&'static str, &'static str)]) -> Self {
2634            let previous = APP_BOOTSTRAP_ENV_KEYS
2635                .iter()
2636                .map(|key| (*key, std::env::var(key).ok()))
2637                .collect();
2638
2639            // SAFETY: These tests isolate the app-bootstrap environment keys
2640            // before applying per-test overrides and restore them in Drop.
2641            unsafe {
2642                for key in APP_BOOTSTRAP_ENV_KEYS {
2643                    std::env::remove_var(key);
2644                }
2645                for (key, value) in overrides {
2646                    std::env::set_var(key, value);
2647                }
2648            }
2649
2650            Self { previous }
2651        }
2652    }
2653
2654    impl Drop for EnvGuard {
2655        fn drop(&mut self) {
2656            // SAFETY: Restores each key to its pre-test value or removes it if
2657            // it did not exist before the test.
2658            unsafe {
2659                for (key, value) in &self.previous {
2660                    if let Some(value) = value {
2661                        std::env::set_var(key, value);
2662                    } else {
2663                        std::env::remove_var(key);
2664                    }
2665                }
2666            }
2667        }
2668    }
2669
2670    fn inline_test_app() -> App {
2671        App::from_policy(
2672            "app-id".to_string(),
2673            "app-key".to_string(),
2674            "app-secret".to_string(),
2675            true,
2676            AppPolicy::default(),
2677        )
2678    }
2679
2680    #[test]
2681    fn queue_driver_parses_broker_backends() {
2682        assert_eq!(
2683            QueueDriver::from_str("rabbitmq").unwrap(),
2684            QueueDriver::RabbitMq
2685        );
2686        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
2687        assert_eq!(
2688            QueueDriver::from_str("pulsar").unwrap(),
2689            QueueDriver::Pulsar
2690        );
2691        assert_eq!(
2692            QueueDriver::from_str("google-pubsub").unwrap(),
2693            QueueDriver::GooglePubSub
2694        );
2695    }
2696
2697    #[test]
2698    fn delta_coordination_backend_parses_expected_values() {
2699        assert_eq!(
2700            DeltaCoordinationBackend::from_str("auto").unwrap(),
2701            DeltaCoordinationBackend::Auto
2702        );
2703        assert_eq!(
2704            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
2705            DeltaCoordinationBackend::RedisCluster
2706        );
2707        assert_eq!(
2708            DeltaCoordinationBackend::from_str("nats").unwrap(),
2709            DeltaCoordinationBackend::Nats
2710        );
2711    }
2712
2713    #[tokio::test]
2714    async fn app_bootstrap_env_overrides_inline_apps() {
2715        {
2716            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "false")]);
2717            let mut options = ServerOptions::default();
2718            options.app_manager.array.apps.push(inline_test_app());
2719
2720            options.override_from_env().await.unwrap();
2721
2722            assert!(options.app_manager.array.apps.is_empty());
2723        }
2724
2725        {
2726            let _env = EnvGuard::app_bootstrap(&[("SOCKUDO_DEFAULT_APP_ENABLED", "true")]);
2727            let mut options = ServerOptions::default();
2728            options.app_manager.array.apps.push(inline_test_app());
2729
2730            options.override_from_env().await.unwrap();
2731
2732            assert_eq!(options.app_manager.array.apps.len(), 1);
2733            assert_eq!(options.app_manager.array.apps[0].id, "app-id");
2734        }
2735
2736        {
2737            let _env = EnvGuard::app_bootstrap(&[
2738                ("SOCKUDO_DEFAULT_APP_ID", "prod-app"),
2739                ("SOCKUDO_DEFAULT_APP_KEY", "prod-key"),
2740                ("SOCKUDO_DEFAULT_APP_SECRET", "prod-secret"),
2741                ("SOCKUDO_DEFAULT_APP_ENABLED", "true"),
2742            ]);
2743            let mut options = ServerOptions::default();
2744            options.app_manager.array.apps.push(inline_test_app());
2745
2746            options.override_from_env().await.unwrap();
2747
2748            assert_eq!(options.app_manager.array.apps.len(), 1);
2749            let app = &options.app_manager.array.apps[0];
2750            assert_eq!(app.id, "prod-app");
2751            assert_eq!(app.key, "prod-key");
2752            assert_eq!(app.secret, "prod-secret");
2753            assert!(app.enabled);
2754        }
2755
2756        {
2757            let _env = EnvGuard::app_bootstrap(&[("APP_MANAGER_REGISTER_INLINE_APPS", "false")]);
2758            let mut options = ServerOptions::default();
2759            options.app_manager.array.apps.push(inline_test_app());
2760
2761            options.override_from_env().await.unwrap();
2762
2763            assert!(options.app_manager.array.apps.is_empty());
2764        }
2765    }
2766
2767    #[tokio::test]
2768    async fn versioned_messages_driver_overrides_from_env() {
2769        let previous = std::env::var("VERSIONED_MESSAGES_DRIVER").ok();
2770        // SAFETY: This test controls the environment variable lifecycle for a
2771        // single key and restores the prior value before it returns.
2772        unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", "postgres") };
2773
2774        let mut options = ServerOptions::default();
2775        options.override_from_env().await.unwrap();
2776
2777        if let Some(previous) = previous {
2778            // SAFETY: Restoring the pre-test value for the same key.
2779            unsafe { std::env::set_var("VERSIONED_MESSAGES_DRIVER", previous) };
2780        } else {
2781            // SAFETY: Removing the test-only environment variable before exit.
2782            unsafe { std::env::remove_var("VERSIONED_MESSAGES_DRIVER") };
2783        }
2784
2785        assert_eq!(
2786            options.versioned_messages.driver,
2787            VersionStoreDriver::Postgres
2788        );
2789    }
2790
2791    #[tokio::test]
2792    async fn push_storage_driver_overrides_from_env() {
2793        let previous = std::env::var("PUSH_STORAGE_DRIVER").ok();
2794        // SAFETY: This test controls the environment variable lifecycle for a
2795        // single key and restores the prior value before it returns.
2796        unsafe { std::env::set_var("PUSH_STORAGE_DRIVER", "mysql") };
2797
2798        let mut options = ServerOptions::default();
2799        options.override_from_env().await.unwrap();
2800
2801        if let Some(previous) = previous {
2802            // SAFETY: Restoring the pre-test value for the same key.
2803            unsafe { std::env::set_var("PUSH_STORAGE_DRIVER", previous) };
2804        } else {
2805            // SAFETY: Removing the test-only environment variable before exit.
2806            unsafe { std::env::remove_var("PUSH_STORAGE_DRIVER") };
2807        }
2808
2809        assert_eq!(options.push.storage_driver, PushStorageDriver::Mysql);
2810    }
2811
2812    #[tokio::test]
2813    async fn push_queue_driver_overrides_from_env() {
2814        let previous = std::env::var("PUSH_QUEUE_DRIVER").ok();
2815        // SAFETY: This test controls the environment variable lifecycle for a
2816        // single key and restores the prior value before it returns.
2817        unsafe { std::env::set_var("PUSH_QUEUE_DRIVER", "redis-cluster") };
2818
2819        let mut options = ServerOptions::default();
2820        options.override_from_env().await.unwrap();
2821
2822        if let Some(previous) = previous {
2823            // SAFETY: Restoring the pre-test value for the same key.
2824            unsafe { std::env::set_var("PUSH_QUEUE_DRIVER", previous) };
2825        } else {
2826            // SAFETY: Removing the test-only environment variable before exit.
2827            unsafe { std::env::remove_var("PUSH_QUEUE_DRIVER") };
2828        }
2829
2830        assert_eq!(options.push.queue_driver, PushQueueDriver::RedisCluster);
2831    }
2832
2833    #[tokio::test]
2834    async fn websocket_rate_limit_trust_hops_overrides_from_env() {
2835        let previous = std::env::var("RATE_LIMITER_WS_TRUST_HOPS").ok();
2836        // SAFETY: This test controls the environment variable lifecycle for a
2837        // single key and restores the prior value before it returns.
2838        unsafe { std::env::set_var("RATE_LIMITER_WS_TRUST_HOPS", "2") };
2839
2840        let mut options = ServerOptions::default();
2841        options.override_from_env().await.unwrap();
2842
2843        if let Some(previous) = previous {
2844            // SAFETY: Restoring the pre-test value for the same key.
2845            unsafe { std::env::set_var("RATE_LIMITER_WS_TRUST_HOPS", previous) };
2846        } else {
2847            // SAFETY: Removing the test-only environment variable before exit.
2848            unsafe { std::env::remove_var("RATE_LIMITER_WS_TRUST_HOPS") };
2849        }
2850
2851        assert_eq!(
2852            options.rate_limiter.websocket_rate_limit.trust_hops,
2853            Some(2)
2854        );
2855    }
2856
2857    #[test]
2858    fn push_config_defaults_follow_capacity_model() {
2859        let options = ServerOptions::default();
2860
2861        assert_eq!(options.push.storage_driver, PushStorageDriver::Memory);
2862        assert_eq!(options.push.queue_driver, PushQueueDriver::Memory);
2863        assert!(!options.push.fcm_enabled);
2864        assert!(!options.push.apns_enabled);
2865        assert!(!options.push.webpush_enabled);
2866        assert!(!options.push.hms_enabled);
2867        assert!(!options.push.wns_enabled);
2868        assert_eq!(options.push.fanout_fast_threshold, 10_000);
2869        assert_eq!(options.push.fanout_shard_size, 100_000);
2870        assert_eq!(options.push.publish_status_ttl_days, 30);
2871        assert_eq!(options.push.default_quotas.acceptance_rps, 100);
2872        assert_eq!(options.push.circuit_breaker.failure_threshold, 5);
2873        assert!(options.push.payload_redaction.redact_payload);
2874    }
2875
2876    #[tokio::test]
2877    async fn push_release_env_overrides_are_parsed() {
2878        let keys = [
2879            "PUSH_FCM_ENABLED",
2880            "PUSH_APNS_ENABLED",
2881            "PUSH_WEBPUSH_ENABLED",
2882            "PUSH_HMS_ENABLED",
2883            "PUSH_WNS_ENABLED",
2884            "PUSH_CREDENTIAL_ENCRYPTION_KEY",
2885            "PUSH_FANOUT_FAST_THRESHOLD",
2886            "PUSH_FANOUT_SHARD_SIZE",
2887            "PUSH_FANOUT_SYNC_THRESHOLD",
2888            "PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS",
2889            "PUSH_PUBLISH_STATUS_TTL_DAYS",
2890            "PUSH_FAILURE_THRESHOLD",
2891            "PUSH_SCHEDULER_INTERVAL_SECS",
2892            "PUSH_STALE_DEVICE_MAX_AGE_DAYS",
2893            "PUSH_ANALYTICS_ENABLED",
2894            "PUSH_DEFAULT_ACCEPTANCE_RPS",
2895            "PUSH_DEFAULT_DELIVERY_QUOTA_DAILY",
2896            "PUSH_DEFAULT_FANOUT_MAX",
2897            "PUSH_DEFAULT_INFLIGHT_MAX",
2898        ];
2899        let previous: Vec<_> = keys
2900            .iter()
2901            .map(|key| (*key, std::env::var(key).ok()))
2902            .collect();
2903
2904        // SAFETY: This test owns the listed environment variables and restores
2905        // their previous values before returning.
2906        unsafe {
2907            std::env::set_var("PUSH_FCM_ENABLED", "true");
2908            std::env::set_var("PUSH_APNS_ENABLED", "true");
2909            std::env::set_var("PUSH_WEBPUSH_ENABLED", "true");
2910            std::env::set_var("PUSH_HMS_ENABLED", "true");
2911            std::env::set_var("PUSH_WNS_ENABLED", "true");
2912            std::env::set_var("PUSH_CREDENTIAL_ENCRYPTION_KEY", "env:key:v1");
2913            std::env::set_var("PUSH_FANOUT_FAST_THRESHOLD", "12345");
2914            std::env::set_var("PUSH_FANOUT_SHARD_SIZE", "54321");
2915            std::env::set_var("PUSH_FANOUT_SYNC_THRESHOLD", "250");
2916            std::env::set_var("PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS", "42");
2917            std::env::set_var("PUSH_PUBLISH_STATUS_TTL_DAYS", "14");
2918            std::env::set_var("PUSH_FAILURE_THRESHOLD", "9");
2919            std::env::set_var("PUSH_SCHEDULER_INTERVAL_SECS", "11");
2920            std::env::set_var("PUSH_STALE_DEVICE_MAX_AGE_DAYS", "120");
2921            std::env::set_var("PUSH_ANALYTICS_ENABLED", "true");
2922            std::env::set_var("PUSH_DEFAULT_ACCEPTANCE_RPS", "700");
2923            std::env::set_var("PUSH_DEFAULT_DELIVERY_QUOTA_DAILY", "8000");
2924            std::env::set_var("PUSH_DEFAULT_FANOUT_MAX", "9000");
2925            std::env::set_var("PUSH_DEFAULT_INFLIGHT_MAX", "1000");
2926        }
2927
2928        let mut options = ServerOptions::default();
2929        options.override_from_env().await.unwrap();
2930
2931        for (key, value) in previous {
2932            // SAFETY: Restoring each pre-test value or removing the test-only
2933            // variable for the same key.
2934            unsafe {
2935                if let Some(value) = value {
2936                    std::env::set_var(key, value);
2937                } else {
2938                    std::env::remove_var(key);
2939                }
2940            }
2941        }
2942
2943        assert!(options.push.fcm_enabled);
2944        assert!(options.push.apns_enabled);
2945        assert!(options.push.webpush_enabled);
2946        assert!(options.push.hms_enabled);
2947        assert!(options.push.wns_enabled);
2948        assert_eq!(
2949            options.push.credential_encryption_key.as_deref(),
2950            Some("env:key:v1")
2951        );
2952        assert_eq!(options.push.fanout_fast_threshold, 12_345);
2953        assert_eq!(options.push.fanout_shard_size, 54_321);
2954        assert_eq!(options.push.fanout_sync_threshold, 250);
2955        assert_eq!(options.push.backpressure_lag_threshold_secs, 42);
2956        assert_eq!(options.push.publish_status_ttl_days, 14);
2957        assert_eq!(options.push.circuit_breaker.failure_threshold, 9);
2958        assert_eq!(options.push.scheduler_interval_secs, 11);
2959        assert_eq!(options.push.stale_device_max_age_days, 120);
2960        assert!(options.push.analytics_enabled);
2961        assert_eq!(options.push.default_quotas.acceptance_rps, 700);
2962        assert_eq!(options.push.default_quotas.delivery_quota_daily, 8_000);
2963        assert_eq!(options.push.default_quotas.fanout_max, 9_000);
2964        assert_eq!(options.push.default_quotas.inflight_max, 1_000);
2965    }
2966}
2967
2968impl ClusterHealthConfig {
2969    pub fn validate(&self) -> Result<(), String> {
2970        if self.heartbeat_interval_ms == 0 {
2971            return Err("heartbeat_interval_ms must be greater than 0".to_string());
2972        }
2973        if self.node_timeout_ms == 0 {
2974            return Err("node_timeout_ms must be greater than 0".to_string());
2975        }
2976        if self.cleanup_interval_ms == 0 {
2977            return Err("cleanup_interval_ms must be greater than 0".to_string());
2978        }
2979
2980        if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2981            return Err(format!(
2982                "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2983                self.heartbeat_interval_ms,
2984                self.node_timeout_ms,
2985                self.node_timeout_ms / 3
2986            ));
2987        }
2988
2989        if self.cleanup_interval_ms > self.node_timeout_ms {
2990            return Err(format!(
2991                "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2992                self.cleanup_interval_ms, self.node_timeout_ms
2993            ));
2994        }
2995
2996        Ok(())
2997    }
2998}
2999
3000impl ServerOptions {
3001    pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
3002        let content = tokio::fs::read_to_string(path).await?;
3003        let options: Self = if path.ends_with(".toml") {
3004            toml::from_str(&content)?
3005        } else {
3006            // Legacy JSON support
3007            sonic_rs::from_str(&content)?
3008        };
3009        Ok(options)
3010    }
3011
3012    pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
3013        // --- General Settings ---
3014        if let Ok(mode) = std::env::var("ENVIRONMENT") {
3015            self.mode = mode;
3016        }
3017        self.debug = parse_bool_env("DEBUG_MODE", self.debug);
3018        if parse_bool_env("DEBUG", false) {
3019            self.debug = true;
3020            info!("DEBUG environment variable forces debug mode ON");
3021        }
3022
3023        self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
3024
3025        if let Ok(host) = std::env::var("HOST") {
3026            self.host = host;
3027        }
3028        self.port = parse_env::<u16>("PORT", self.port);
3029        self.shutdown_grace_period =
3030            parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
3031        self.user_authentication_timeout = parse_env::<u64>(
3032            "USER_AUTHENTICATION_TIMEOUT",
3033            self.user_authentication_timeout,
3034        );
3035        self.websocket_max_payload_kb =
3036            parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
3037        if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
3038            self.instance.process_id = id;
3039        }
3040
3041        // --- Driver Configuration ---
3042        if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
3043            self.adapter.driver =
3044                parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
3045        }
3046        self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
3047            "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
3048            self.adapter.buffer_multiplier_per_cpu,
3049        );
3050        self.adapter.enable_socket_counting = parse_env::<bool>(
3051            "ADAPTER_ENABLE_SOCKET_COUNTING",
3052            self.adapter.enable_socket_counting,
3053        );
3054        self.adapter.aggregate_counts =
3055            parse_env::<bool>("ADAPTER_AGGREGATE_COUNTS", self.adapter.aggregate_counts);
3056        self.adapter.fallback_to_local =
3057            parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
3058        if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
3059            self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
3060        }
3061        if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
3062            self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
3063        }
3064        if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
3065            self.app_manager.driver =
3066                parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
3067        }
3068        if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
3069            self.rate_limiter.driver = parse_driver_enum(
3070                driver_str,
3071                self.rate_limiter.driver.clone(),
3072                "RateLimiter Backend",
3073            );
3074        }
3075
3076        // --- Database: Redis ---
3077        if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
3078            self.database.redis.host = host;
3079        }
3080        self.database.redis.port =
3081            parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
3082        if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
3083            self.database.redis.username = if username.is_empty() {
3084                None
3085            } else {
3086                Some(username)
3087            };
3088        }
3089        if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
3090            self.database.redis.password = Some(password);
3091        }
3092        self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
3093        if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
3094            self.database.redis.key_prefix = prefix;
3095        }
3096        if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
3097            self.database.redis.cluster.username = if cluster_username.is_empty() {
3098                None
3099            } else {
3100                Some(cluster_username)
3101            };
3102        }
3103        if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
3104            self.database.redis.cluster.password = Some(cluster_password);
3105        }
3106        self.database.redis.cluster.use_tls = parse_bool_env(
3107            "DATABASE_REDIS_CLUSTER_USE_TLS",
3108            self.database.redis.cluster.use_tls,
3109        );
3110
3111        // --- Database: MySQL ---
3112        if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
3113            self.database.mysql.host = host;
3114        }
3115        self.database.mysql.port =
3116            parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
3117        if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
3118            self.database.mysql.username = user;
3119        }
3120        if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
3121            self.database.mysql.password = pass;
3122        }
3123        if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
3124            self.database.mysql.database = db;
3125        }
3126        if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
3127            self.database.mysql.table_name = table;
3128        }
3129        override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
3130
3131        // --- Database: PostgreSQL ---
3132        if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
3133            self.database.postgres.host = host;
3134        }
3135        self.database.postgres.port =
3136            parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
3137        if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
3138            self.database.postgres.username = user;
3139        }
3140        if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
3141            self.database.postgres.password = pass;
3142        }
3143        if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
3144            self.database.postgres.database = db;
3145        }
3146        override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
3147
3148        // --- Database: DynamoDB ---
3149        if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
3150            self.database.dynamodb.region = region;
3151        }
3152        if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
3153            self.database.dynamodb.table_name = table;
3154        }
3155        if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
3156            self.database.dynamodb.endpoint_url = Some(endpoint);
3157        }
3158        if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
3159            self.database.dynamodb.aws_access_key_id = Some(key_id);
3160        }
3161        if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
3162            self.database.dynamodb.aws_secret_access_key = Some(secret);
3163        }
3164
3165        // --- Database: SurrealDB ---
3166        if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
3167            self.database.surrealdb.url = url;
3168        }
3169        if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
3170            self.database.surrealdb.namespace = namespace;
3171        }
3172        if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
3173            self.database.surrealdb.database = database;
3174        }
3175        if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
3176            self.database.surrealdb.username = username;
3177        }
3178        if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
3179            self.database.surrealdb.password = password;
3180        }
3181        if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
3182            self.database.surrealdb.table_name = table;
3183        }
3184
3185        // --- Redis Cluster ---
3186        let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
3187            let node_list: Vec<String> = nodes
3188                .split(',')
3189                .map(|s| s.trim())
3190                .filter(|s| !s.is_empty())
3191                .map(ToString::to_string)
3192                .collect();
3193
3194            options.adapter.cluster.nodes = node_list.clone();
3195            options.queue.redis_cluster.nodes = node_list.clone();
3196
3197            let parsed_nodes: Vec<ClusterNode> = node_list
3198                .iter()
3199                .filter_map(|seed| ClusterNode::from_seed(seed))
3200                .collect();
3201            options.database.redis.cluster.nodes = parsed_nodes.clone();
3202            options.database.redis.cluster_nodes = parsed_nodes;
3203        };
3204
3205        if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
3206            apply_redis_cluster_nodes(self, &nodes);
3207        }
3208        if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
3209            apply_redis_cluster_nodes(self, &nodes);
3210        }
3211        self.queue.redis_cluster.concurrency = parse_env::<u32>(
3212            "REDIS_CLUSTER_QUEUE_CONCURRENCY",
3213            self.queue.redis_cluster.concurrency,
3214        );
3215        if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
3216            self.queue.redis_cluster.prefix = Some(prefix);
3217        }
3218
3219        // --- SSL Configuration ---
3220        self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
3221        if let Ok(val) = std::env::var("SSL_CERT_PATH") {
3222            self.ssl.cert_path = val;
3223        }
3224        if let Ok(val) = std::env::var("SSL_KEY_PATH") {
3225            self.ssl.key_path = val;
3226        }
3227        self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
3228        if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
3229            self.ssl.http_port = Some(port);
3230        }
3231
3232        // --- Unix Socket Configuration ---
3233        self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
3234        if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
3235            self.unix_socket.path = path;
3236        }
3237        if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
3238            if mode_str.chars().all(|c| c.is_digit(8)) {
3239                if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
3240                    if mode <= 0o777 {
3241                        self.unix_socket.permission_mode = mode;
3242                    } else {
3243                        warn!(
3244                            "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
3245                            mode_str, self.unix_socket.permission_mode
3246                        );
3247                    }
3248                } else {
3249                    warn!(
3250                        "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
3251                        mode_str, self.unix_socket.permission_mode
3252                    );
3253                }
3254            } else {
3255                warn!(
3256                    "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
3257                    mode_str, self.unix_socket.permission_mode
3258                );
3259            }
3260        }
3261
3262        // --- Metrics ---
3263        if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
3264            self.metrics.driver =
3265                parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
3266        }
3267        self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
3268        if let Ok(val) = std::env::var("METRICS_HOST") {
3269            self.metrics.host = val;
3270        }
3271        self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
3272        if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
3273            self.metrics.prometheus.prefix = val;
3274        }
3275        self.metrics.tcp_exporter.enabled = parse_bool_env(
3276            "METRICS_TCP_EXPORTER_ENABLED",
3277            self.metrics.tcp_exporter.enabled,
3278        );
3279        if let Ok(val) = std::env::var("METRICS_TCP_EXPORTER_HOST") {
3280            self.metrics.tcp_exporter.host = val;
3281        }
3282        self.metrics.tcp_exporter.port =
3283            parse_env::<u16>("METRICS_TCP_EXPORTER_PORT", self.metrics.tcp_exporter.port);
3284        if let Some(buffer_size) = parse_env_optional::<usize>("METRICS_TCP_EXPORTER_BUFFER_SIZE") {
3285            self.metrics.tcp_exporter.buffer_size = Some(buffer_size);
3286        }
3287
3288        // --- HTTP API ---
3289        self.http_api.usage_enabled =
3290            parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
3291
3292        // --- Rate Limiter ---
3293        self.rate_limiter.enabled =
3294            parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
3295        self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
3296            "RATE_LIMITER_API_MAX_REQUESTS",
3297            self.rate_limiter.api_rate_limit.max_requests,
3298        );
3299        self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
3300            "RATE_LIMITER_API_WINDOW_SECONDS",
3301            self.rate_limiter.api_rate_limit.window_seconds,
3302        );
3303        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
3304            self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
3305        }
3306        self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
3307            "RATE_LIMITER_WS_MAX_REQUESTS",
3308            self.rate_limiter.websocket_rate_limit.max_requests,
3309        );
3310        self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
3311            "RATE_LIMITER_WS_WINDOW_SECONDS",
3312            self.rate_limiter.websocket_rate_limit.window_seconds,
3313        );
3314        if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_WS_TRUST_HOPS") {
3315            self.rate_limiter.websocket_rate_limit.trust_hops = Some(hops);
3316        }
3317        if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
3318            self.rate_limiter.redis.prefix = Some(prefix);
3319        }
3320
3321        // --- Queue: Redis ---
3322        self.queue.redis.concurrency =
3323            parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
3324        if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
3325            self.queue.redis.prefix = Some(prefix);
3326        }
3327
3328        // --- Queue: SQS ---
3329        if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
3330            self.queue.sqs.region = region;
3331        }
3332        self.queue.sqs.visibility_timeout = parse_env::<i32>(
3333            "QUEUE_SQS_VISIBILITY_TIMEOUT",
3334            self.queue.sqs.visibility_timeout,
3335        );
3336        self.queue.sqs.max_messages =
3337            parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
3338        self.queue.sqs.wait_time_seconds = parse_env::<i32>(
3339            "QUEUE_SQS_WAIT_TIME_SECONDS",
3340            self.queue.sqs.wait_time_seconds,
3341        );
3342        self.queue.sqs.concurrency =
3343            parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
3344        self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
3345        if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
3346            self.queue.sqs.endpoint_url = Some(endpoint);
3347        }
3348
3349        // --- Queue: SNS ---
3350        if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
3351            self.queue.sns.region = region;
3352        }
3353        if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
3354            self.queue.sns.topic_arn = topic_arn;
3355        }
3356        if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
3357            self.queue.sns.endpoint_url = Some(endpoint);
3358        }
3359
3360        // --- Webhooks ---
3361        self.webhooks.batching.enabled =
3362            parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
3363        self.webhooks.batching.duration =
3364            parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
3365        self.webhooks.batching.size =
3366            parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
3367
3368        // --- NATS Adapter ---
3369        if let Ok(servers) = std::env::var("NATS_SERVERS") {
3370            self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
3371        }
3372        if let Ok(user) = std::env::var("NATS_USERNAME") {
3373            self.adapter.nats.username = Some(user);
3374        }
3375        if let Ok(pass) = std::env::var("NATS_PASSWORD") {
3376            self.adapter.nats.password = Some(pass);
3377        }
3378        if let Ok(token) = std::env::var("NATS_TOKEN") {
3379            self.adapter.nats.token = Some(token);
3380        }
3381        if let Ok(prefix) = std::env::var("NATS_PREFIX") {
3382            self.adapter.nats.prefix = prefix;
3383        }
3384        self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
3385            "NATS_CONNECTION_TIMEOUT_MS",
3386            self.adapter.nats.connection_timeout_ms,
3387        );
3388        self.adapter.nats.request_timeout_ms = parse_env::<u64>(
3389            "NATS_REQUEST_TIMEOUT_MS",
3390            self.adapter.nats.request_timeout_ms,
3391        );
3392        self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
3393            "NATS_DISCOVERY_MAX_WAIT_MS",
3394            self.adapter.nats.discovery_max_wait_ms,
3395        );
3396        self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
3397            "NATS_DISCOVERY_IDLE_WAIT_MS",
3398            self.adapter.nats.discovery_idle_wait_ms,
3399        );
3400        if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
3401            self.adapter.nats.nodes_number = Some(nodes);
3402        }
3403        if let Some(v) = parse_env_optional::<usize>("NATS_SUBSCRIPTION_CAPACITY") {
3404            self.adapter.nats.subscription_capacity = Some(v);
3405        }
3406        if let Some(v) = parse_env_optional::<usize>("NATS_CLIENT_CAPACITY") {
3407            self.adapter.nats.client_capacity = Some(v);
3408        }
3409        if let Some(v) = parse_env_optional::<usize>("NATS_MAX_RECONNECTS") {
3410            self.adapter.nats.max_reconnects = Some(v);
3411        }
3412        if let Some(v) = parse_env_optional::<usize>("NATS_PRESENCE_SYNC_CHUNK_SIZE") {
3413            self.adapter.nats.presence_sync_chunk_size = Some(v);
3414        }
3415
3416        // --- Pulsar Adapter ---
3417        if let Ok(url) = std::env::var("PULSAR_URL") {
3418            self.adapter.pulsar.url = url;
3419        }
3420        if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
3421            self.adapter.pulsar.prefix = prefix;
3422        }
3423        if let Ok(token) = std::env::var("PULSAR_TOKEN") {
3424            self.adapter.pulsar.token = Some(token);
3425        }
3426        self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
3427            "PULSAR_REQUEST_TIMEOUT_MS",
3428            self.adapter.pulsar.request_timeout_ms,
3429        );
3430        if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
3431            self.adapter.pulsar.nodes_number = Some(nodes);
3432        }
3433
3434        // --- RabbitMQ Adapter ---
3435        if let Ok(url) = std::env::var("RABBITMQ_URL") {
3436            self.adapter.rabbitmq.url = url;
3437        }
3438        if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
3439            self.adapter.rabbitmq.prefix = prefix;
3440        }
3441        self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
3442            "RABBITMQ_CONNECTION_TIMEOUT_MS",
3443            self.adapter.rabbitmq.connection_timeout_ms,
3444        );
3445        self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
3446            "RABBITMQ_REQUEST_TIMEOUT_MS",
3447            self.adapter.rabbitmq.request_timeout_ms,
3448        );
3449        if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
3450            self.adapter.rabbitmq.nodes_number = Some(nodes);
3451        }
3452
3453        // --- Google Pub/Sub Adapter ---
3454        if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
3455            self.adapter.google_pubsub.project_id = project_id;
3456        }
3457        if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
3458            self.adapter.google_pubsub.prefix = prefix;
3459        }
3460        if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
3461            self.adapter.google_pubsub.emulator_host = Some(emulator_host);
3462        }
3463        self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
3464            "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
3465            self.adapter.google_pubsub.request_timeout_ms,
3466        );
3467        if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
3468            self.adapter.google_pubsub.nodes_number = Some(nodes);
3469        }
3470
3471        // --- Kafka Adapter ---
3472        if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
3473            self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
3474        }
3475        if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
3476            self.adapter.kafka.prefix = prefix;
3477        }
3478        if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
3479            self.adapter.kafka.security_protocol = Some(protocol);
3480        }
3481        if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
3482            self.adapter.kafka.sasl_mechanism = Some(mechanism);
3483        }
3484        if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
3485            self.adapter.kafka.sasl_username = Some(username);
3486        }
3487        if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
3488            self.adapter.kafka.sasl_password = Some(password);
3489        }
3490        self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
3491            "KAFKA_REQUEST_TIMEOUT_MS",
3492            self.adapter.kafka.request_timeout_ms,
3493        );
3494        if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
3495            self.adapter.kafka.nodes_number = Some(nodes);
3496        }
3497
3498        // --- Apache Iggy Adapter / Queue ---
3499        if let Ok(connection_string) = std::env::var("IGGY_CONNECTION_STRING") {
3500            self.adapter.iggy.connection_string = connection_string.clone();
3501            self.queue.iggy.connection_string = connection_string;
3502        }
3503        if let Ok(username) = std::env::var("IGGY_USERNAME") {
3504            let username = (!username.is_empty()).then_some(username);
3505            self.adapter.iggy.username = username.clone();
3506            self.queue.iggy.username = username;
3507        }
3508        if let Ok(password) = std::env::var("IGGY_PASSWORD") {
3509            let password = (!password.is_empty()).then_some(password);
3510            self.adapter.iggy.password = password.clone();
3511            self.queue.iggy.password = password;
3512        }
3513        if let Ok(consumer_name) = std::env::var("IGGY_CONSUMER_NAME") {
3514            let consumer_name = (!consumer_name.is_empty()).then_some(consumer_name);
3515            self.adapter.iggy.consumer_name = consumer_name.clone();
3516            self.queue.iggy.consumer_name = consumer_name;
3517        } else if let Ok(process_id) = std::env::var("INSTANCE_PROCESS_ID") {
3518            let process_id = (!process_id.is_empty()).then_some(process_id);
3519            self.adapter.iggy.consumer_name = process_id.clone();
3520            self.queue.iggy.consumer_name = process_id;
3521        }
3522        if let Ok(stream) = std::env::var("IGGY_STREAM") {
3523            self.adapter.iggy.stream = stream.clone();
3524            self.queue.iggy.stream = stream;
3525        }
3526        if let Ok(prefix) = std::env::var("IGGY_TOPIC_PREFIX") {
3527            self.adapter.iggy.topic_prefix = prefix;
3528        }
3529        if let Ok(prefix) = std::env::var("IGGY_QUEUE_TOPIC_PREFIX") {
3530            self.queue.iggy.queue_topic_prefix = prefix;
3531        }
3532        if let Ok(prefix) = std::env::var("IGGY_CONSUMER_GROUP_PREFIX") {
3533            self.queue.iggy.consumer_group_prefix = prefix;
3534        }
3535        self.adapter.iggy.request_timeout_ms = parse_env::<u64>(
3536            "IGGY_REQUEST_TIMEOUT_MS",
3537            self.adapter.iggy.request_timeout_ms,
3538        );
3539        self.queue.iggy.request_timeout_ms = parse_env::<u64>(
3540            "IGGY_REQUEST_TIMEOUT_MS",
3541            self.queue.iggy.request_timeout_ms,
3542        );
3543        self.adapter.iggy.poll_interval_ms =
3544            parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.adapter.iggy.poll_interval_ms);
3545        self.queue.iggy.poll_interval_ms =
3546            parse_env::<u64>("IGGY_POLL_INTERVAL_MS", self.queue.iggy.poll_interval_ms);
3547        self.adapter.iggy.poll_batch_size =
3548            parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.adapter.iggy.poll_batch_size);
3549        self.queue.iggy.poll_batch_size =
3550            parse_env::<u32>("IGGY_POLL_BATCH_SIZE", self.queue.iggy.poll_batch_size);
3551        self.adapter.iggy.partitions_count = parse_env::<u32>(
3552            "ADAPTER_IGGY_PARTITIONS_COUNT",
3553            parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.adapter.iggy.partitions_count),
3554        );
3555        self.queue.iggy.partitions_count = parse_env::<u32>(
3556            "QUEUE_IGGY_PARTITIONS_COUNT",
3557            parse_env::<u32>("IGGY_PARTITIONS_COUNT", self.queue.iggy.partitions_count),
3558        );
3559        self.adapter.iggy.partition_id = parse_env::<u32>(
3560            "ADAPTER_IGGY_PARTITION_ID",
3561            parse_env::<u32>("IGGY_PARTITION_ID", self.adapter.iggy.partition_id),
3562        );
3563        self.queue.iggy.partition_id = parse_env::<u32>(
3564            "QUEUE_IGGY_PARTITION_ID",
3565            parse_env::<u32>("IGGY_PARTITION_ID", self.queue.iggy.partition_id),
3566        );
3567        self.adapter.iggy.auto_create =
3568            parse_env::<bool>("IGGY_AUTO_CREATE", self.adapter.iggy.auto_create);
3569        self.queue.iggy.auto_create =
3570            parse_env::<bool>("IGGY_AUTO_CREATE", self.queue.iggy.auto_create);
3571        self.adapter.iggy.start_from_latest = parse_env::<bool>(
3572            "IGGY_START_FROM_LATEST",
3573            self.adapter.iggy.start_from_latest,
3574        );
3575        if let Some(nodes) = parse_env_optional::<u32>("IGGY_NODES_NUMBER") {
3576            self.adapter.iggy.nodes_number = Some(nodes);
3577            self.queue.iggy.nodes_number = Some(nodes);
3578        }
3579
3580        // --- CORS ---
3581        if let Ok(origins) = std::env::var("CORS_ORIGINS") {
3582            let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
3583            if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
3584                warn!(
3585                    "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
3586                    e
3587                );
3588            } else {
3589                self.cors.origin = parsed;
3590            }
3591        }
3592        if let Ok(methods) = std::env::var("CORS_METHODS") {
3593            self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
3594        }
3595        if let Ok(headers) = std::env::var("CORS_HEADERS") {
3596            self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
3597        }
3598        self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
3599
3600        // --- Performance Tuning ---
3601        self.database_pooling.enabled =
3602            parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
3603        if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
3604            self.database_pooling.min = min;
3605        }
3606        if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
3607            self.database_pooling.max = max;
3608        }
3609
3610        if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
3611            self.database.mysql.connection_pool_size = pool_size;
3612            self.database.postgres.connection_pool_size = pool_size;
3613        }
3614        if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
3615            self.app_manager.cache.ttl = cache_ttl;
3616            self.channel_limits.cache_ttl = cache_ttl;
3617            self.database.mysql.cache_ttl = cache_ttl;
3618            self.database.postgres.cache_ttl = cache_ttl;
3619            self.database.surrealdb.cache_ttl = cache_ttl;
3620            self.cache.memory.ttl = cache_ttl;
3621        }
3622        if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
3623            self.database.mysql.cache_cleanup_interval = cleanup_interval;
3624            self.database.postgres.cache_cleanup_interval = cleanup_interval;
3625            self.cache.memory.cleanup_interval = cleanup_interval;
3626        }
3627        if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
3628            self.database.mysql.cache_max_capacity = max_capacity;
3629            self.database.postgres.cache_max_capacity = max_capacity;
3630            self.database.surrealdb.cache_max_capacity = max_capacity;
3631            self.cache.memory.max_capacity = max_capacity;
3632        }
3633
3634        let skip_inline_apps = parse_bool_env("SOCKUDO_SKIP_INLINE_APPS", false)
3635            || !parse_bool_env("APP_MANAGER_REGISTER_INLINE_APPS", true);
3636        if skip_inline_apps {
3637            let app_count = self.app_manager.array.apps.len();
3638            self.app_manager.array.apps.clear();
3639            if app_count > 0 {
3640                info!(
3641                    "Skipping {} inline app(s) from configuration due to environment override",
3642                    app_count
3643                );
3644            }
3645        }
3646
3647        let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID").ok();
3648        let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY").ok();
3649        let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET").ok();
3650        let default_app_enabled_env = std::env::var("SOCKUDO_DEFAULT_APP_ENABLED").ok();
3651        let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
3652        let default_app_credentials_configured =
3653            default_app_id.is_some() || default_app_key.is_some() || default_app_secret.is_some();
3654        let default_app_env_configured =
3655            default_app_credentials_configured || default_app_enabled_env.is_some();
3656        let default_app_should_override_inline = default_app_credentials_configured
3657            || default_app_enabled_env.is_some_and(|_| !default_app_enabled);
3658
3659        if default_app_should_override_inline {
3660            let app_count = self.app_manager.array.apps.len();
3661            self.app_manager.array.apps.clear();
3662            if app_count > 0 {
3663                info!(
3664                    "Replacing {} inline app(s) from configuration with SOCKUDO_DEFAULT_APP_* settings",
3665                    app_count
3666                );
3667            }
3668        }
3669
3670        if let (Some(app_id), Some(app_key), Some(app_secret)) =
3671            (default_app_id, default_app_key, default_app_secret)
3672            && default_app_enabled
3673        {
3674            let default_app = App::from_policy(
3675                app_id,
3676                app_key,
3677                app_secret,
3678                default_app_enabled,
3679                crate::app::AppPolicy {
3680                    limits: crate::app::AppLimitsPolicy {
3681                        max_connections: parse_env::<u32>(
3682                            "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
3683                            100,
3684                        ),
3685                        max_backend_events_per_second: Some(parse_env::<u32>(
3686                            "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
3687                            100,
3688                        )),
3689                        max_client_events_per_second: parse_env::<u32>(
3690                            "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
3691                            100,
3692                        ),
3693                        max_read_requests_per_second: Some(parse_env::<u32>(
3694                            "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
3695                            100,
3696                        )),
3697                        max_presence_members_per_channel: Some(parse_env::<u32>(
3698                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
3699                            100,
3700                        )),
3701                        max_presence_member_size_in_kb: Some(parse_env::<u32>(
3702                            "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
3703                            100,
3704                        )),
3705                        max_channel_name_length: Some(parse_env::<u32>(
3706                            "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
3707                            100,
3708                        )),
3709                        max_event_channels_at_once: Some(parse_env::<u32>(
3710                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
3711                            100,
3712                        )),
3713                        max_event_name_length: Some(parse_env::<u32>(
3714                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
3715                            100,
3716                        )),
3717                        max_event_payload_in_kb: Some(parse_env::<u32>(
3718                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
3719                            100,
3720                        )),
3721                        max_event_batch_size: Some(parse_env::<u32>(
3722                            "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
3723                            100,
3724                        )),
3725                        decay_seconds: None,
3726                        terminate_on_limit: false,
3727                        message_rate_limit: None,
3728                    },
3729                    features: crate::app::AppFeaturesPolicy {
3730                        enable_client_messages: std::env::var(
3731                            "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
3732                        )
3733                        .ok()
3734                        .map(|value| {
3735                            matches!(
3736                                value.trim().to_ascii_lowercase().as_str(),
3737                                "true" | "1" | "yes" | "on"
3738                            )
3739                        })
3740                        .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
3741                        enable_user_authentication: Some(parse_bool_env(
3742                            "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
3743                            false,
3744                        )),
3745                        enable_watchlist_events: Some(parse_bool_env(
3746                            "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
3747                            false,
3748                        )),
3749                    },
3750                    channels: crate::app::AppChannelsPolicy {
3751                        allowed_origins: {
3752                            if let Ok(origins_str) =
3753                                std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
3754                            {
3755                                if !origins_str.is_empty() {
3756                                    Some(
3757                                        origins_str
3758                                            .split(',')
3759                                            .map(|s| s.trim().to_string())
3760                                            .collect(),
3761                                    )
3762                                } else {
3763                                    None
3764                                }
3765                            } else {
3766                                None
3767                            }
3768                        },
3769                        annotations_enabled: Some(parse_bool_env(
3770                            "SOCKUDO_DEFAULT_APP_ANNOTATIONS_ENABLED",
3771                            false,
3772                        )),
3773                        channel_delta_compression: None,
3774                        channel_namespaces: None,
3775                    },
3776                    webhooks: None,
3777                    idempotency: None,
3778                    connection_recovery: None,
3779                    history: None,
3780                    presence_history: None,
3781                },
3782            );
3783
3784            self.app_manager.array.apps.push(default_app);
3785            info!("Successfully registered default app from env");
3786        } else if default_app_env_configured && !default_app_enabled {
3787            info!("Default app registration disabled by SOCKUDO_DEFAULT_APP_ENABLED=false");
3788        } else if default_app_credentials_configured {
3789            warn!(
3790                "SOCKUDO_DEFAULT_APP_* environment was configured but id, key, and secret were not all provided; no default app registered"
3791            );
3792        }
3793
3794        // Special handling for REDIS_URL
3795        if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
3796            info!("Applying REDIS_URL environment variable override");
3797
3798            let redis_url_json = sonic_rs::json!(redis_url_env);
3799
3800            self.adapter
3801                .redis
3802                .redis_pub_options
3803                .insert("url".to_string(), redis_url_json.clone());
3804            self.adapter
3805                .redis
3806                .redis_sub_options
3807                .insert("url".to_string(), redis_url_json);
3808
3809            self.cache.redis.url_override = Some(redis_url_env.clone());
3810            self.queue.redis.url_override = Some(redis_url_env.clone());
3811            self.rate_limiter.redis.url_override = Some(redis_url_env);
3812        }
3813
3814        // --- Logging Configuration ---
3815        let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
3816        let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
3817        if has_colors_env || has_target_env {
3818            let logging_config = self.logging.get_or_insert_with(Default::default);
3819            if has_colors_env {
3820                logging_config.colors_enabled =
3821                    parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
3822            }
3823            if has_target_env {
3824                logging_config.include_target =
3825                    parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
3826            }
3827        }
3828
3829        // --- Cleanup Configuration ---
3830        self.cleanup.async_enabled =
3831            parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
3832        self.cleanup.fallback_to_sync =
3833            parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
3834        self.cleanup.queue_buffer_size =
3835            parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
3836        self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
3837        self.cleanup.batch_timeout_ms =
3838            parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
3839        if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
3840            self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
3841                WorkerThreadsConfig::Auto
3842            } else if let Ok(n) = worker_threads_str.parse::<usize>() {
3843                WorkerThreadsConfig::Fixed(n)
3844            } else {
3845                warn!(
3846                    "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
3847                    worker_threads_str
3848                );
3849                self.cleanup.worker_threads.clone()
3850            };
3851        }
3852        self.cleanup.max_retry_attempts = parse_env::<u32>(
3853            "CLEANUP_MAX_RETRY_ATTEMPTS",
3854            self.cleanup.max_retry_attempts,
3855        );
3856
3857        // Cluster health configuration
3858        self.cluster_health.enabled =
3859            parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
3860        self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
3861            "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
3862            self.cluster_health.heartbeat_interval_ms,
3863        );
3864        self.cluster_health.node_timeout_ms = parse_env::<u64>(
3865            "CLUSTER_HEALTH_NODE_TIMEOUT",
3866            self.cluster_health.node_timeout_ms,
3867        );
3868        self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
3869            "CLUSTER_HEALTH_CLEANUP_INTERVAL",
3870            self.cluster_health.cleanup_interval_ms,
3871        );
3872
3873        // Health check endpoint timeout
3874        self.health_check_timeout_ms =
3875            parse_env::<u64>("HEALTH_CHECK_TIMEOUT_MS", self.health_check_timeout_ms);
3876
3877        // Tag filtering configuration
3878        self.tag_filtering.enabled =
3879            parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
3880
3881        // WebSocket buffer configuration
3882        if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
3883            if val.to_lowercase() == "none" || val == "0" {
3884                self.websocket.max_messages = None;
3885            } else if let Ok(n) = val.parse::<usize>() {
3886                self.websocket.max_messages = Some(n);
3887            }
3888        }
3889        if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
3890            if val.to_lowercase() == "none" || val == "0" {
3891                self.websocket.max_bytes = None;
3892            } else if let Ok(n) = val.parse::<usize>() {
3893                self.websocket.max_bytes = Some(n);
3894            }
3895        }
3896        self.websocket.disconnect_on_buffer_full = parse_bool_env(
3897            "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3898            self.websocket.disconnect_on_buffer_full,
3899        );
3900        self.websocket.max_message_size = parse_env::<usize>(
3901            "WEBSOCKET_MAX_MESSAGE_SIZE",
3902            self.websocket.max_message_size,
3903        );
3904        self.websocket.max_frame_size =
3905            parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3906        self.websocket.write_buffer_size = parse_env::<usize>(
3907            "WEBSOCKET_WRITE_BUFFER_SIZE",
3908            self.websocket.write_buffer_size,
3909        );
3910        self.websocket.max_backpressure = parse_env::<usize>(
3911            "WEBSOCKET_MAX_BACKPRESSURE",
3912            self.websocket.max_backpressure,
3913        );
3914        self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3915        self.websocket.ping_interval =
3916            parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3917        self.websocket.idle_timeout =
3918            parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3919        if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3920            self.websocket.compression = mode;
3921        }
3922
3923        // Connection recovery (includes serial + message_id + replay buffer)
3924        self.connection_recovery.enabled = parse_bool_env(
3925            "CONNECTION_RECOVERY_ENABLED",
3926            self.connection_recovery.enabled,
3927        );
3928        self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3929            "CONNECTION_RECOVERY_BUFFER_TTL",
3930            self.connection_recovery.buffer_ttl_seconds,
3931        );
3932        self.connection_recovery.max_buffer_size = parse_env::<usize>(
3933            "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3934            self.connection_recovery.max_buffer_size,
3935        );
3936
3937        self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3938        self.history.rewind_enabled =
3939            parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3940        self.history.retention_window_seconds = parse_env::<u64>(
3941            "HISTORY_RETENTION_WINDOW_SECONDS",
3942            self.history.retention_window_seconds,
3943        );
3944        self.history.max_page_size =
3945            parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3946        self.history.writer_shards =
3947            parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3948        self.history.writer_queue_capacity = parse_env::<usize>(
3949            "HISTORY_WRITER_QUEUE_CAPACITY",
3950            self.history.writer_queue_capacity,
3951        );
3952        if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3953            self.history.backend = HistoryBackend::from_str(&backend)?;
3954        }
3955        if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3956            self.history.max_messages_per_channel = Some(
3957                max_messages
3958                    .parse::<usize>()
3959                    .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3960            );
3961        }
3962        if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3963            self.history.max_bytes_per_channel = Some(
3964                max_bytes
3965                    .parse::<u64>()
3966                    .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3967            );
3968        }
3969        if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3970            self.history.postgres.table_prefix = table_prefix;
3971        }
3972        self.history.postgres.write_timeout_ms = parse_env::<u64>(
3973            "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3974            self.history.postgres.write_timeout_ms,
3975        );
3976        self.history.purge_interval_seconds = parse_env::<u64>(
3977            "HISTORY_PURGE_INTERVAL_SECONDS",
3978            self.history.purge_interval_seconds,
3979        );
3980        self.history.purge_batch_size =
3981            parse_env::<usize>("HISTORY_PURGE_BATCH_SIZE", self.history.purge_batch_size);
3982        self.history.max_purge_per_tick = parse_env::<usize>(
3983            "HISTORY_MAX_PURGE_PER_TICK",
3984            self.history.max_purge_per_tick,
3985        );
3986
3987        self.presence_history.enabled =
3988            parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3989        self.presence_history.retention_window_seconds = parse_env::<u64>(
3990            "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3991            self.presence_history.retention_window_seconds,
3992        );
3993        self.presence_history.max_page_size = parse_env::<usize>(
3994            "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3995            self.presence_history.max_page_size,
3996        );
3997        if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3998            self.presence_history.max_events_per_channel =
3999                Some(max_events.parse::<usize>().map_err(|e| {
4000                    format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
4001                })?);
4002        }
4003        if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
4004            self.presence_history.max_bytes_per_channel = Some(
4005                max_bytes
4006                    .parse::<u64>()
4007                    .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
4008            );
4009        }
4010        self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
4011        self.idempotency.ttl_seconds =
4012            parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
4013        self.idempotency.max_key_length = parse_env::<usize>(
4014            "IDEMPOTENCY_MAX_KEY_LENGTH",
4015            self.idempotency.max_key_length,
4016        );
4017
4018        self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
4019
4020        self.echo_control.enabled =
4021            parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
4022        self.echo_control.default_echo_messages = parse_bool_env(
4023            "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
4024            self.echo_control.default_echo_messages,
4025        );
4026
4027        self.event_name_filtering.enabled = parse_bool_env(
4028            "EVENT_NAME_FILTERING_ENABLED",
4029            self.event_name_filtering.enabled,
4030        );
4031        self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
4032            "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
4033            self.event_name_filtering.max_events_per_filter,
4034        );
4035        self.event_name_filtering.max_event_name_length = parse_env::<usize>(
4036            "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
4037            self.event_name_filtering.max_event_name_length,
4038        );
4039        self.versioned_messages.enabled = parse_bool_env(
4040            "VERSIONED_MESSAGES_ENABLED",
4041            self.versioned_messages.enabled,
4042        );
4043        if let Ok(driver_str) = std::env::var("VERSIONED_MESSAGES_DRIVER") {
4044            self.versioned_messages.driver = parse_driver_enum(
4045                driver_str,
4046                self.versioned_messages.driver.clone(),
4047                "VersionedMessages Backend",
4048            );
4049        }
4050        if let Ok(driver_str) = std::env::var("PUSH_STORAGE_DRIVER") {
4051            self.push.storage_driver = parse_driver_enum(
4052                driver_str,
4053                self.push.storage_driver.clone(),
4054                "Push Storage Backend",
4055            );
4056        }
4057        if let Ok(driver_str) = std::env::var("PUSH_QUEUE_DRIVER") {
4058            self.push.queue_driver = parse_driver_enum(
4059                driver_str,
4060                self.push.queue_driver.clone(),
4061                "Push Queue Backend",
4062            );
4063        }
4064        self.push.fcm_enabled = parse_bool_env("PUSH_FCM_ENABLED", self.push.fcm_enabled);
4065        self.push.apns_enabled = parse_bool_env("PUSH_APNS_ENABLED", self.push.apns_enabled);
4066        self.push.webpush_enabled =
4067            parse_bool_env("PUSH_WEBPUSH_ENABLED", self.push.webpush_enabled);
4068        self.push.hms_enabled = parse_bool_env("PUSH_HMS_ENABLED", self.push.hms_enabled);
4069        self.push.wns_enabled = parse_bool_env("PUSH_WNS_ENABLED", self.push.wns_enabled);
4070        if let Some(key) = parse_env_optional::<String>("PUSH_CREDENTIAL_ENCRYPTION_KEY") {
4071            self.push.credential_encryption_key = Some(key);
4072        }
4073        self.push.fanout_fast_threshold = parse_env::<u64>(
4074            "PUSH_FANOUT_FAST_THRESHOLD",
4075            self.push.fanout_fast_threshold,
4076        );
4077        self.push.fanout_shard_size =
4078            parse_env::<u64>("PUSH_FANOUT_SHARD_SIZE", self.push.fanout_shard_size);
4079        self.push.fanout_sync_threshold = parse_env::<u64>(
4080            "PUSH_FANOUT_SYNC_THRESHOLD",
4081            self.push.fanout_sync_threshold,
4082        );
4083        self.push.backpressure_lag_threshold_secs = parse_env::<u64>(
4084            "PUSH_BACKPRESSURE_LAG_THRESHOLD_SECS",
4085            self.push.backpressure_lag_threshold_secs,
4086        );
4087        self.push.publish_status_ttl_days = parse_env::<u64>(
4088            "PUSH_PUBLISH_STATUS_TTL_DAYS",
4089            self.push.publish_status_ttl_days,
4090        );
4091        self.push.circuit_breaker.failure_threshold = parse_env::<u32>(
4092            "PUSH_FAILURE_THRESHOLD",
4093            self.push.circuit_breaker.failure_threshold,
4094        );
4095        self.push.scheduler_interval_secs = parse_env::<u64>(
4096            "PUSH_SCHEDULER_INTERVAL_SECS",
4097            self.push.scheduler_interval_secs,
4098        );
4099        self.push.stale_device_max_age_days = parse_env::<u64>(
4100            "PUSH_STALE_DEVICE_MAX_AGE_DAYS",
4101            self.push.stale_device_max_age_days,
4102        );
4103        self.push.analytics_enabled =
4104            parse_bool_env("PUSH_ANALYTICS_ENABLED", self.push.analytics_enabled);
4105        self.push.default_quotas.acceptance_rps = parse_env::<u64>(
4106            "PUSH_DEFAULT_ACCEPTANCE_RPS",
4107            self.push.default_quotas.acceptance_rps,
4108        );
4109        self.push.default_quotas.delivery_quota_daily = parse_env::<u64>(
4110            "PUSH_DEFAULT_DELIVERY_QUOTA_DAILY",
4111            self.push.default_quotas.delivery_quota_daily,
4112        );
4113        self.push.default_quotas.fanout_max = parse_env::<u64>(
4114            "PUSH_DEFAULT_FANOUT_MAX",
4115            self.push.default_quotas.fanout_max,
4116        );
4117        self.push.default_quotas.inflight_max = parse_env::<u64>(
4118            "PUSH_DEFAULT_INFLIGHT_MAX",
4119            self.push.default_quotas.inflight_max,
4120        );
4121        self.versioned_messages.max_page_size = parse_env::<usize>(
4122            "VERSIONED_MESSAGES_MAX_PAGE_SIZE",
4123            self.versioned_messages.max_page_size,
4124        );
4125        self.versioned_messages.retention_window_seconds = parse_env::<u64>(
4126            "VERSIONED_MESSAGES_RETENTION_WINDOW_SECONDS",
4127            self.versioned_messages.retention_window_seconds,
4128        );
4129        self.versioned_messages.purge_interval_seconds = parse_env::<u64>(
4130            "VERSIONED_MESSAGES_PURGE_INTERVAL_SECONDS",
4131            self.versioned_messages.purge_interval_seconds,
4132        );
4133        self.versioned_messages.purge_batch_size = parse_env::<usize>(
4134            "VERSIONED_MESSAGES_PURGE_BATCH_SIZE",
4135            self.versioned_messages.purge_batch_size,
4136        );
4137        self.versioned_messages.max_purge_per_tick = parse_env::<usize>(
4138            "VERSIONED_MESSAGES_MAX_PURGE_PER_TICK",
4139            self.versioned_messages.max_purge_per_tick,
4140        );
4141        self.annotations.enabled = parse_bool_env("ANNOTATIONS_ENABLED", self.annotations.enabled);
4142
4143        Ok(())
4144    }
4145
4146    pub fn validate(&self) -> Result<(), String> {
4147        if self.unix_socket.enabled {
4148            if self.unix_socket.path.is_empty() {
4149                return Err(
4150                    "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
4151                );
4152            }
4153
4154            self.validate_unix_socket_security()?;
4155
4156            if self.ssl.enabled {
4157                tracing::warn!(
4158                    "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
4159                );
4160            }
4161
4162            if self.unix_socket.permission_mode > 0o777 {
4163                return Err(format!(
4164                    "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
4165                    self.unix_socket.permission_mode
4166                ));
4167            }
4168        }
4169
4170        if let Err(e) = self.cleanup.validate() {
4171            return Err(format!("Invalid cleanup configuration: {}", e));
4172        }
4173
4174        if self.history.enabled {
4175            if self.history.max_page_size == 0 {
4176                return Err("history.max_page_size must be greater than 0".to_string());
4177            }
4178            if self.history.writer_shards == 0 {
4179                return Err("history.writer_shards must be greater than 0".to_string());
4180            }
4181            if self.history.writer_queue_capacity == 0 {
4182                return Err("history.writer_queue_capacity must be greater than 0".to_string());
4183            }
4184            if self.history.retention_window_seconds == 0 {
4185                return Err("history.retention_window_seconds must be greater than 0".to_string());
4186            }
4187            if self.history.postgres.table_prefix.trim().is_empty() {
4188                return Err("history.postgres.table_prefix must not be empty".to_string());
4189            }
4190        }
4191
4192        if self.presence_history.enabled {
4193            if self.presence_history.max_page_size == 0 {
4194                return Err("presence_history.max_page_size must be greater than 0".to_string());
4195            }
4196            if self.presence_history.retention_window_seconds == 0 {
4197                return Err(
4198                    "presence_history.retention_window_seconds must be greater than 0".to_string(),
4199                );
4200            }
4201        }
4202
4203        if self.versioned_messages.enabled && self.versioned_messages.max_page_size == 0 {
4204            return Err("versioned_messages.max_page_size must be greater than 0".to_string());
4205        }
4206        if self.annotations.enabled && !self.versioned_messages.enabled {
4207            return Err("annotations require versioned_messages.enabled".to_string());
4208        }
4209
4210        if self.adapter.nats.presence_sync_chunk_size == Some(0) {
4211            return Err("nats.presence_sync_chunk_size must be > 0 when set".to_string());
4212        }
4213
4214        Ok(())
4215    }
4216
4217    fn validate_unix_socket_security(&self) -> Result<(), String> {
4218        let path = &self.unix_socket.path;
4219
4220        if path.contains("../") || path.contains("..\\") {
4221            return Err(
4222                "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
4223            );
4224        }
4225
4226        if self.unix_socket.permission_mode & 0o002 != 0 {
4227            warn!(
4228                "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
4229                self.unix_socket.permission_mode
4230            );
4231        }
4232
4233        if self.unix_socket.permission_mode & 0o007 > 0o005 {
4234            warn!(
4235                "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
4236                self.unix_socket.permission_mode
4237            );
4238        }
4239
4240        if self.mode == "production" && path.starts_with("/tmp/") {
4241            warn!(
4242                "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
4243                path
4244            );
4245        }
4246
4247        if !path.starts_with('/') {
4248            return Err(
4249                "Unix socket path must be absolute (start with /) for security and reliability."
4250                    .to_string(),
4251            );
4252        }
4253
4254        Ok(())
4255    }
4256}
4257
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    // Builds a `Vec<RedisSentinel>` from `host => port` pairs.
    macro_rules! sentinels {
        ($($host:expr => $port:expr),+ $(,)?) => {
            vec![$(RedisSentinel { host: $host.to_string(), port: $port }),+]
        };
    }

    // Fully specified standalone connection used as the baseline for the URL
    // tests. Every field is spelled out so the tests do not depend on what
    // `RedisConnection::default()` returns.
    fn standalone_connection() -> RedisConnection {
        RedisConnection {
            host: "127.0.0.1".to_string(),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: "sockudo:".to_string(),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: "mymaster".to_string(),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    #[test]
    fn test_standard_url_basic() {
        let connection = standalone_connection();
        assert_eq!(connection.to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let connection = RedisConnection {
            db: 2,
            password: Some("secret".to_string()),
            ..standalone_connection()
        };
        assert_eq!(connection.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let connection = RedisConnection {
            host: "redis.example.com".to_string(),
            port: 6380,
            db: 1,
            username: Some("admin".to_string()),
            password: Some("pass123".to_string()),
            ..standalone_connection()
        };
        let expected = "redis://admin:pass123@redis.example.com:6380/1";
        assert_eq!(connection.to_url(), expected);
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // `@` and `#` are expected to come out percent-encoded.
        let connection = RedisConnection {
            password: Some("pass@word#123".to_string()),
            ..standalone_connection()
        };
        assert_eq!(
            connection.to_url(),
            "redis://:pass%40word%23123@127.0.0.1:6379/0"
        );
    }

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let connection = RedisConnection {
            sentinels: sentinels!["sentinel1" => 26379],
            ..Default::default()
        };
        assert!(connection.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let connection = RedisConnection {
            sentinels: sentinels!["sentinel1" => 26379, "sentinel2" => 26379],
            ..standalone_connection()
        };
        let expected = "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0";
        assert_eq!(connection.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let connection = RedisConnection {
            sentinels: sentinels!["sentinel1" => 26379],
            sentinel_password: Some("sentinelpass".to_string()),
            ..standalone_connection()
        };
        let expected = "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0";
        assert_eq!(connection.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let connection = RedisConnection {
            db: 1,
            password: Some("masterpass".to_string()),
            sentinels: sentinels!["sentinel1" => 26379],
            ..standalone_connection()
        };
        let expected = "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass";
        assert_eq!(connection.to_url(), expected);
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let connection = RedisConnection {
            db: 2,
            username: Some("redisuser".to_string()),
            password: Some("redispass".to_string()),
            sentinels: sentinels!["sentinel1" => 26379, "sentinel2" => 26380],
            sentinel_password: Some("sentinelauth".to_string()),
            name: "production-master".to_string(),
            ..standalone_connection()
        };
        let expected = "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser";
        assert_eq!(connection.to_url(), expected);
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let host_port = RedisSentinel {
            host: "sentinel.example.com".to_string(),
            port: 26379,
        }
        .to_host_port();
        assert_eq!(host_port, "sentinel.example.com:26379");
    }

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // Three node spellings: bare host, explicit `redis://` with a port in
        // the host field, and explicit `rediss://` without one.
        let nodes = vec![
            ClusterNode {
                host: "node1.secure-cluster.com".to_string(),
                port: 7000,
            },
            ClusterNode {
                host: "redis://node2.secure-cluster.com:7001".to_string(),
                port: 7001,
            },
            ClusterNode {
                host: "rediss://node3.secure-cluster.com".to_string(),
                port: 7002,
            },
        ];
        let cluster = RedisClusterConnection {
            nodes,
            username: None,
            password: Some("cluster-secret".to_string()),
            use_tls: true,
        };
        let connection = RedisConnection {
            cluster,
            ..Default::default()
        };

        let expected = vec![
            "rediss://:cluster-secret@node1.secure-cluster.com:7000",
            "redis://:cluster-secret@node2.secure-cluster.com:7001",
            "rediss://:cluster-secret@node3.secure-cluster.com:7002",
        ];
        assert_eq!(connection.cluster_node_urls(), expected);
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        let legacy_node = ClusterNode {
            host: "legacy-node.example.com".to_string(),
            port: 7000,
        };
        let connection = RedisConnection {
            password: Some("fallback-secret".to_string()),
            cluster_nodes: vec![legacy_node],
            ..Default::default()
        };

        let expected = vec!["redis://:fallback-secret@legacy-node.example.com:7000"];
        assert_eq!(connection.cluster_node_urls(), expected);
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let cluster = RedisClusterConnection {
            nodes: Vec::new(),
            username: Some("svc-user".to_string()),
            password: Some("svc-pass".to_string()),
            use_tls: true,
        };
        let connection = RedisConnection {
            cluster,
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let expected = vec![
            "rediss://svc-user:svc-pass@node1.example.com:7000",
            "redis://svc-user:svc-pass@node2.example.com:7001",
            "rediss://svc-user:svc-pass@node3.example.com:6379",
        ];
        assert_eq!(connection.normalize_cluster_seed_urls(&seeds), expected);
    }
}
4552
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    // Builds a `ClusterNode` from `host` / `port` and asserts what `to_url()`
    // renders for it.
    macro_rules! assert_to_url {
        ($host:expr, $port:expr, $expected:expr) => {{
            let node = ClusterNode {
                host: $host.to_string(),
                port: $port,
            };
            assert_eq!(node.to_url(), $expected);
        }};
    }

    #[test]
    fn test_to_url_basic_host() {
        assert_to_url!("localhost", 6379, "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_to_url!("127.0.0.1", 6379, "redis://127.0.0.1:6379");
    }

    #[test]
    fn test_to_url_with_redis_protocol() {
        assert_to_url!("redis://example.com", 6379, "redis://example.com:6379");
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        assert_to_url!(
            "rediss://secure.example.com",
            6379,
            "rediss://secure.example.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // The port embedded in the host string is expected to win over `port`.
        assert_to_url!(
            "rediss://secure.example.com:7000",
            6379,
            "rediss://secure.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        assert_to_url!("redis://example.com:7001", 6379, "redis://example.com:7001");
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        assert_to_url!(
            "  rediss://secure.example.com  ",
            6379,
            "rediss://secure.example.com:6379"
        );
    }

    #[test]
    fn test_to_url_custom_port() {
        assert_to_url!(
            "redis-cluster.example.com",
            7000,
            "redis://redis-cluster.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        assert_to_url!(
            "redis-cluster.example.com:7010",
            7000,
            "redis://redis-cluster.example.com:7010"
        );
    }

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let url = ClusterNode {
            host: "node.example.com".to_string(),
            port: 7000,
        }
        .to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        // Credentials already present in the host string are expected to be
        // kept instead of the ones passed as options.
        let url = ClusterNode {
            host: "rediss://:node-secret@node.example.com:7000".to_string(),
            port: 7000,
        }
        .to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        assert_to_url!(
            "rediss://my-cluster.use1.cache.amazonaws.com",
            6379,
            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_to_url!("rediss://[::1]", 6379, "rediss://[::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_to_url!("rediss://[::1]:7000", 6379, "rediss://[::1]:7000");
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_to_url!(
            "rediss://[2001:db8::1]",
            6379,
            "rediss://[2001:db8::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_to_url!(
            "rediss://[2001:db8::1]:7000",
            6379,
            "rediss://[2001:db8::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_to_url!("redis://[::1]", 6379, "redis://[::1]:6379");
    }
}
4734
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    // Deserializes a `CorsConfig` from a JSON document.
    fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let json = r#"{"origin": ["https://example.com", "https://other.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let json = r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#;
        let parsed = cors_from_json(json).unwrap();
        assert_eq!(parsed.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted.iter() {
            assert!(
                cors_from_json(json).is_ok(),
                "expected config to be accepted: {}",
                json
            );
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected.iter() {
            assert!(
                cors_from_json(json).is_err(),
                "expected config to be rejected: {}",
                json
            );
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // One bad entry is enough to reject the whole list.
        let json = r#"{"origin": ["https://good.com", "*.*bad"]}"#;
        assert!(cors_from_json(json).is_err());
    }
}