1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
/// Horizontal-scaling adapter backend selection; `local` (single node) is the
/// default. Serde names are lowercase, with explicit renames for multi-word
/// drivers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AdapterDriver {
    #[default]
    Local,
    Redis,
    #[serde(rename = "redis-cluster")]
    RedisCluster,
    Nats,
    Pulsar,
    RabbitMq,
    #[serde(rename = "google-pubsub")]
    GooglePubSub,
    Kafka,
}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(default)]
91pub struct DynamoDbSettings {
92 pub region: String,
93 pub table_name: String,
94 pub endpoint_url: Option<String>,
95 pub aws_access_key_id: Option<String>,
96 pub aws_secret_access_key: Option<String>,
97 pub aws_profile_name: Option<String>,
98}
99
100impl Default for DynamoDbSettings {
101 fn default() -> Self {
102 Self {
103 region: "us-east-1".to_string(),
104 table_name: "sockudo-applications".to_string(),
105 endpoint_url: None,
106 aws_access_key_id: None,
107 aws_secret_access_key: None,
108 aws_profile_name: None,
109 }
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(default)]
115pub struct ScyllaDbSettings {
116 pub nodes: Vec<String>,
117 pub keyspace: String,
118 pub table_name: String,
119 pub username: Option<String>,
120 pub password: Option<String>,
121 pub replication_class: String,
122 pub replication_factor: u32,
123}
124
125impl Default for ScyllaDbSettings {
126 fn default() -> Self {
127 Self {
128 nodes: vec!["127.0.0.1:9042".to_string()],
129 keyspace: "sockudo".to_string(),
130 table_name: "applications".to_string(),
131 username: None,
132 password: None,
133 replication_class: "SimpleStrategy".to_string(),
134 replication_factor: 3,
135 }
136 }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140#[serde(default)]
141pub struct SurrealDbSettings {
142 pub url: String,
143 pub namespace: String,
144 pub database: String,
145 pub username: String,
146 pub password: String,
147 pub table_name: String,
148 pub cache_ttl: u64,
149 pub cache_max_capacity: u64,
150}
151
152impl Default for SurrealDbSettings {
153 fn default() -> Self {
154 Self {
155 url: "ws://127.0.0.1:8000".to_string(),
156 namespace: "sockudo".to_string(),
157 database: "sockudo".to_string(),
158 username: "root".to_string(),
159 password: "root".to_string(),
160 table_name: "applications".to_string(),
161 cache_ttl: 300,
162 cache_max_capacity: 100,
163 }
164 }
165}
166
167impl FromStr for AdapterDriver {
168 type Err = String;
169 fn from_str(s: &str) -> Result<Self, Self::Err> {
170 match s.to_lowercase().as_str() {
171 "local" => Ok(AdapterDriver::Local),
172 "redis" => Ok(AdapterDriver::Redis),
173 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
174 "nats" => Ok(AdapterDriver::Nats),
175 "pulsar" => Ok(AdapterDriver::Pulsar),
176 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
177 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
178 "kafka" => Ok(AdapterDriver::Kafka),
179 _ => Err(format!("Unknown adapter driver: {s}")),
180 }
181 }
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
185#[serde(rename_all = "lowercase")]
186pub enum AppManagerDriver {
187 #[default]
188 Memory,
189 Mysql,
190 Dynamodb,
191 PgSql,
192 SurrealDb,
193 ScyllaDb,
194}
195impl FromStr for AppManagerDriver {
196 type Err = String;
197 fn from_str(s: &str) -> Result<Self, Self::Err> {
198 match s.to_lowercase().as_str() {
199 "memory" => Ok(AppManagerDriver::Memory),
200 "mysql" => Ok(AppManagerDriver::Mysql),
201 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
202 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
203 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
204 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
205 _ => Err(format!("Unknown app manager driver: {s}")),
206 }
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
211#[serde(rename_all = "lowercase")]
212pub enum CacheDriver {
213 #[default]
214 Memory,
215 Redis,
216 #[serde(rename = "redis-cluster")]
217 RedisCluster,
218 None,
219}
220
221impl FromStr for CacheDriver {
222 type Err = String;
223 fn from_str(s: &str) -> Result<Self, Self::Err> {
224 match s.to_lowercase().as_str() {
225 "memory" => Ok(CacheDriver::Memory),
226 "redis" => Ok(CacheDriver::Redis),
227 "redis-cluster" => Ok(CacheDriver::RedisCluster),
228 "none" => Ok(CacheDriver::None),
229 _ => Err(format!("Unknown cache driver: {s}")),
230 }
231 }
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
235#[serde(rename_all = "lowercase")]
236pub enum QueueDriver {
237 Memory,
238 #[default]
239 Redis,
240 #[serde(rename = "redis-cluster")]
241 RedisCluster,
242 Nats,
243 Pulsar,
244 RabbitMq,
245 #[serde(rename = "google-pubsub")]
246 GooglePubSub,
247 Kafka,
248 Sqs,
249 Sns,
250 None,
251}
252
253impl FromStr for QueueDriver {
254 type Err = String;
255 fn from_str(s: &str) -> Result<Self, Self::Err> {
256 match s.to_lowercase().as_str() {
257 "memory" => Ok(QueueDriver::Memory),
258 "redis" => Ok(QueueDriver::Redis),
259 "redis-cluster" => Ok(QueueDriver::RedisCluster),
260 "nats" => Ok(QueueDriver::Nats),
261 "pulsar" => Ok(QueueDriver::Pulsar),
262 "rabbitmq" | "rabbit-mq" => Ok(QueueDriver::RabbitMq),
263 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(QueueDriver::GooglePubSub),
264 "kafka" => Ok(QueueDriver::Kafka),
265 "sqs" => Ok(QueueDriver::Sqs),
266 "sns" => Ok(QueueDriver::Sns),
267 "none" => Ok(QueueDriver::None),
268 _ => Err(format!("Unknown queue driver: {s}")),
269 }
270 }
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
274#[serde(rename_all = "snake_case")]
275pub enum DeltaCoordinationBackend {
276 #[default]
277 Auto,
278 None,
279 Redis,
280 RedisCluster,
281 Nats,
282}
283
284impl FromStr for DeltaCoordinationBackend {
285 type Err = String;
286
287 fn from_str(s: &str) -> Result<Self, Self::Err> {
288 match s.trim().to_ascii_lowercase().as_str() {
289 "auto" => Ok(Self::Auto),
290 "none" => Ok(Self::None),
291 "redis" => Ok(Self::Redis),
292 "redis-cluster" | "redis_cluster" => Ok(Self::RedisCluster),
293 "nats" => Ok(Self::Nats),
294 _ => Err(format!("Unknown delta coordination backend: {s}")),
295 }
296 }
297}
298
299impl AsRef<str> for QueueDriver {
300 fn as_ref(&self) -> &str {
301 match self {
302 QueueDriver::Memory => "memory",
303 QueueDriver::Redis => "redis",
304 QueueDriver::RedisCluster => "redis-cluster",
305 QueueDriver::Nats => "nats",
306 QueueDriver::Pulsar => "pulsar",
307 QueueDriver::RabbitMq => "rabbitmq",
308 QueueDriver::GooglePubSub => "google-pubsub",
309 QueueDriver::Kafka => "kafka",
310 QueueDriver::Sqs => "sqs",
311 QueueDriver::Sns => "sns",
312 QueueDriver::None => "none",
313 }
314 }
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
318#[serde(default)]
319pub struct RedisClusterQueueConfig {
320 pub concurrency: u32,
321 pub prefix: Option<String>,
322 pub nodes: Vec<String>,
323 pub request_timeout_ms: u64,
324}
325
326impl Default for RedisClusterQueueConfig {
327 fn default() -> Self {
328 Self {
329 concurrency: 5,
330 prefix: Some("sockudo_queue:".to_string()),
331 nodes: vec!["redis://127.0.0.1:6379".to_string()],
332 request_timeout_ms: 5000,
333 }
334 }
335}
336
/// Metrics exporter selection; Prometheus is currently the only backend.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum MetricsDriver {
    #[default]
    Prometheus,
}
343
344#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
345#[serde(rename_all = "lowercase")]
346pub enum LogOutputFormat {
347 #[default]
348 Human,
349 Json,
350}
351
352impl FromStr for LogOutputFormat {
353 type Err = String;
354 fn from_str(s: &str) -> Result<Self, Self::Err> {
355 match s.to_lowercase().as_str() {
356 "human" => Ok(LogOutputFormat::Human),
357 "json" => Ok(LogOutputFormat::Json),
358 _ => Err(format!("Unknown log output format: {s}")),
359 }
360 }
361}
362
363impl FromStr for MetricsDriver {
364 type Err = String;
365 fn from_str(s: &str) -> Result<Self, Self::Err> {
366 match s.to_lowercase().as_str() {
367 "prometheus" => Ok(MetricsDriver::Prometheus),
368 _ => Err(format!("Unknown metrics driver: {s}")),
369 }
370 }
371}
372
373impl AsRef<str> for MetricsDriver {
374 fn as_ref(&self) -> &str {
375 match self {
376 MetricsDriver::Prometheus => "prometheus",
377 }
378 }
379}
380
/// Root configuration for the server: every subsystem's settings plus the
/// top-level listener (`host`/`port`), operating `mode`, and global timeouts.
/// All fields fall back to their `Default` values via `#[serde(default)]`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerOptions {
    pub adapter: AdapterConfig,
    pub app_manager: AppManagerConfig,
    pub cache: CacheConfig,
    pub channel_limits: ChannelLimits,
    pub cors: CorsConfig,
    pub database: DatabaseConfig,
    pub database_pooling: DatabasePooling,
    pub debug: bool,
    pub event_limits: EventLimits,
    pub host: String,
    pub http_api: HttpApiConfig,
    pub instance: InstanceConfig,
    // Optional so "not configured" is distinguishable from explicit settings.
    pub logging: Option<LoggingConfig>,
    pub metrics: MetricsConfig,
    pub mode: String,
    pub port: u16,
    pub path_prefix: String,
    pub presence: PresenceConfig,
    pub queue: QueueConfig,
    pub rate_limiter: RateLimiterConfig,
    // Seconds to wait for in-flight work during shutdown — TODO confirm unit.
    pub shutdown_grace_period: u64,
    pub ssl: SslConfig,
    pub user_authentication_timeout: u64,
    pub webhooks: WebhooksConfig,
    pub websocket_max_payload_kb: u32,
    pub cleanup: CleanupConfig,
    pub activity_timeout: u64,
    pub cluster_health: ClusterHealthConfig,
    pub unix_socket: UnixSocketConfig,
    pub delta_compression: DeltaCompressionOptionsConfig,
    pub tag_filtering: TagFilteringConfig,
    pub websocket: WebSocketConfig,
    pub connection_recovery: ConnectionRecoveryConfig,
    pub history: HistoryConfig,
    pub presence_history: PresenceHistoryConfig,
    pub idempotency: IdempotencyConfig,
    pub ephemeral: EphemeralConfig,
    pub echo_control: EchoControlConfig,
    pub event_name_filtering: EventNameFilteringConfig,
}
425
/// AWS SQS queue settings, including FIFO-queue options and long-poll
/// receive parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SqsQueueConfig {
    pub region: String,
    pub queue_url_prefix: Option<String>,
    pub visibility_timeout: i32,
    // Custom endpoint for LocalStack-style testing; None uses the AWS default.
    pub endpoint_url: Option<String>,
    pub max_messages: i32,
    pub wait_time_seconds: i32,
    pub concurrency: u32,
    pub fifo: bool,
    // Required by SQS when `fifo` is true — TODO confirm enforcement.
    pub message_group_id: Option<String>,
}
441
/// AWS SNS topic settings for the SNS queue driver.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SnsQueueConfig {
    pub region: String,
    pub topic_arn: String,
    pub endpoint_url: Option<String>,
}
449
/// Adapter configuration: the selected driver plus per-driver settings (only
/// the section matching `driver` is used at runtime — TODO confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AdapterConfig {
    pub driver: AdapterDriver,
    pub redis: RedisAdapterConfig,
    pub cluster: RedisClusterAdapterConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    // Per-CPU channel buffer multiplier for the adapter's internal queues.
    #[serde(default = "default_buffer_multiplier_per_cpu")]
    pub buffer_multiplier_per_cpu: usize,
    pub cluster_health: ClusterHealthConfig,
    #[serde(default = "default_enable_socket_counting")]
    pub enable_socket_counting: bool,
    // Fall back to the local adapter if the remote backend is unavailable.
    #[serde(default = "default_fallback_to_local")]
    pub fallback_to_local: bool,
}
469
/// Serde default for `AdapterConfig::enable_socket_counting` (on by default).
fn default_enable_socket_counting() -> bool {
    true
}

/// Serde default for `AdapterConfig::fallback_to_local` (on by default).
fn default_fallback_to_local() -> bool {
    true
}

/// Serde default for `AdapterConfig::buffer_multiplier_per_cpu`.
fn default_buffer_multiplier_per_cpu() -> usize {
    64
}
481
482impl Default for AdapterConfig {
483 fn default() -> Self {
484 Self {
485 driver: AdapterDriver::default(),
486 redis: RedisAdapterConfig::default(),
487 cluster: RedisClusterAdapterConfig::default(),
488 nats: NatsAdapterConfig::default(),
489 pulsar: PulsarAdapterConfig::default(),
490 rabbitmq: RabbitMqAdapterConfig::default(),
491 google_pubsub: GooglePubSubAdapterConfig::default(),
492 kafka: KafkaAdapterConfig::default(),
493 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
494 cluster_health: ClusterHealthConfig::default(),
495 enable_socket_counting: default_enable_socket_counting(),
496 fallback_to_local: default_fallback_to_local(),
497 }
498 }
499}
500
/// Single-node Redis adapter settings; the pub/sub option maps are passed
/// through to the client as raw JSON values.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisAdapterConfig {
    pub requests_timeout: u64,
    pub prefix: String,
    pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
    pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
    pub cluster_mode: bool,
}
510
/// Redis Cluster adapter settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisClusterAdapterConfig {
    pub nodes: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub use_connection_manager: bool,
    // Use Redis 7 sharded pub/sub (SSUBSCRIBE) instead of classic pub/sub.
    #[serde(default)]
    pub use_sharded_pubsub: bool,
}
521
/// NATS adapter settings, including node-discovery wait tuning.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct NatsAdapterConfig {
    pub servers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub username: Option<String>,
    pub password: Option<String>,
    pub token: Option<String>,
    pub connection_timeout_ms: u64,
    // Expected cluster size; None means discover dynamically — TODO confirm.
    pub nodes_number: Option<u32>,
    pub discovery_max_wait_ms: u64,
    pub discovery_idle_wait_ms: u64,
}
536
/// Apache Pulsar adapter settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PulsarAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub token: Option<String>,
    pub nodes_number: Option<u32>,
}
546
/// RabbitMQ adapter settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RabbitMqAdapterConfig {
    pub url: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub connection_timeout_ms: u64,
    pub nodes_number: Option<u32>,
}
556
/// Google Cloud Pub/Sub adapter settings; `emulator_host` targets the local
/// emulator for testing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GooglePubSubAdapterConfig {
    pub project_id: String,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub emulator_host: Option<String>,
    pub nodes_number: Option<u32>,
}
566
/// Apache Kafka adapter settings, with optional SASL authentication.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaAdapterConfig {
    pub brokers: Vec<String>,
    pub prefix: String,
    pub request_timeout_ms: u64,
    pub security_protocol: Option<String>,
    pub sasl_mechanism: Option<String>,
    pub sasl_username: Option<String>,
    pub sasl_password: Option<String>,
    pub nodes_number: Option<u32>,
}
579
/// App manager configuration: driver selection, the static app list for the
/// in-memory driver, lookup caching, and ScyllaDB backend settings.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct AppManagerConfig {
    pub driver: AppManagerDriver,
    pub array: ArrayConfig,
    pub cache: CacheSettings,
    pub scylladb: ScyllaDbSettings,
}
588
/// Statically configured application list (used by the memory app manager).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ArrayConfig {
    pub apps: Vec<App>,
}
594
/// Toggle and TTL for app-lookup caching.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheSettings {
    pub enabled: bool,
    pub ttl: u64,
}
601
/// In-memory cache tuning: entry TTL, sweep interval, and capacity bound.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryCacheOptions {
    pub ttl: u64,
    pub cleanup_interval: u64,
    pub max_capacity: u64,
}
609
/// Cache configuration: driver selection plus per-driver settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CacheConfig {
    pub driver: CacheDriver,
    pub redis: RedisConfig,
    pub memory: MemoryCacheOptions,
}
617
/// Redis cache settings; `url_override` bypasses the shared database
/// connection settings when set.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisConfig {
    pub prefix: Option<String>,
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}
625
/// Per-channel limits: maximum channel-name length and lookup-cache TTL.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ChannelLimits {
    pub max_name_length: u32,
    pub cache_ttl: u64,
}
632
/// CORS settings; origin patterns are validated at deserialization time.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorsConfig {
    pub credentials: bool,
    #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
    pub origin: Vec<String>,
    pub methods: Vec<String>,
    pub allowed_headers: Vec<String>,
}
642
643fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
644where
645 D: serde::Deserializer<'de>,
646{
647 use serde::de::Error;
648 let origins = Vec::<String>::deserialize(deserializer)?;
649
650 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
651 return Err(D::Error::custom(format!(
652 "CORS origin pattern validation failed: {}",
653 e
654 )));
655 }
656
657 Ok(origins)
658}
659
/// Shared database connection settings for every backend the server can use.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DatabaseConfig {
    pub mysql: DatabaseConnection,
    pub postgres: DatabaseConnection,
    pub redis: RedisConnection,
    pub dynamodb: DynamoDbSettings,
    pub surrealdb: SurrealDbSettings,
    pub scylladb: ScyllaDbSettings,
}
670
/// SQL connection settings (used for both MySQL and Postgres), including
/// pool sizing and per-connection cache tuning.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConnection {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub table_name: String,
    pub connection_pool_size: u32,
    // Optional explicit pool bounds; overridable via env (see
    // `override_db_pool_settings`).
    pub pool_min: Option<u32>,
    pub pool_max: Option<u32>,
    pub cache_ttl: u64,
    pub cache_cleanup_interval: u64,
    pub cache_max_capacity: u64,
}
687
/// Redis connection settings supporting standalone, Sentinel, and Cluster
/// topologies. `name` is the Sentinel service (master) name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisConnection {
    pub host: String,
    pub port: u16,
    pub db: u32,
    pub username: Option<String>,
    pub password: Option<String>,
    pub key_prefix: String,
    pub sentinels: Vec<RedisSentinel>,
    pub sentinel_password: Option<String>,
    pub name: String,
    pub cluster: RedisClusterConnection,
    // Legacy flat node list; `cluster.nodes` takes precedence when non-empty.
    pub cluster_nodes: Vec<ClusterNode>,
}
704
/// A single Redis Sentinel endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisSentinel {
    pub host: String,
    pub port: u16,
}
711
/// Redis Cluster connection settings; credentials here override the
/// connection-level ones when building node URLs.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RedisClusterConnection {
    pub nodes: Vec<ClusterNode>,
    pub username: Option<String>,
    pub password: Option<String>,
    // `useTLS` alias kept for backward-compatible configs.
    #[serde(alias = "useTLS")]
    pub use_tls: bool,
}
721
impl RedisConnection {
    /// True when at least one Sentinel endpoint is configured.
    pub fn is_sentinel_configured(&self) -> bool {
        !self.sentinels.is_empty()
    }

    /// Builds the connection URL, choosing the Sentinel scheme when
    /// sentinels are configured and a standard redis:// URL otherwise.
    pub fn to_url(&self) -> String {
        if self.is_sentinel_configured() {
            self.build_sentinel_url()
        } else {
            self.build_standard_url()
        }
    }

    // Builds "scheme://[user][:pass@]host:port/db". A scheme already present
    // on `host` (redis:// or rediss://) is stripped and reused so it is not
    // duplicated.
    fn build_standard_url(&self) -> String {
        let (scheme, host) = if self.host.starts_with("rediss://") {
            ("rediss://", self.host.trim_start_matches("rediss://"))
        } else if self.host.starts_with("redis://") {
            ("redis://", self.host.trim_start_matches("redis://"))
        } else {
            ("redis://", self.host.as_str())
        };

        let mut url = String::from(scheme);

        // Credentials: "user[:pass]@" or password-only ":pass@".
        // NOTE(review): the username is appended unencoded while the password
        // is percent-encoded — confirm usernames can never contain reserved
        // URL characters.
        if let Some(ref username) = self.username {
            url.push_str(username);
            if let Some(ref password) = self.password {
                url.push(':');
                url.push_str(&urlencoding::encode(password));
            }
            url.push('@');
        } else if let Some(ref password) = self.password {
            url.push(':');
            url.push_str(&urlencoding::encode(password));
            url.push('@');
        }

        url.push_str(host);
        url.push(':');
        url.push_str(&self.port.to_string());
        url.push('/');
        url.push_str(&self.db.to_string());

        url
    }

    // Builds a "redis+sentinel://" URL: optional sentinel password, the
    // comma-separated sentinel list, "/<service-name>/<db>", then any
    // redis-side credentials as query parameters.
    fn build_sentinel_url(&self) -> String {
        let mut url = String::from("redis+sentinel://");

        if let Some(ref sentinel_password) = self.sentinel_password {
            url.push(':');
            url.push_str(&urlencoding::encode(sentinel_password));
            url.push('@');
        }

        let sentinel_hosts: Vec<String> = self
            .sentinels
            .iter()
            .map(|s| format!("{}:{}", s.host, s.port))
            .collect();
        url.push_str(&sentinel_hosts.join(","));

        url.push('/');
        url.push_str(&self.name);
        url.push('/');
        url.push_str(&self.db.to_string());

        let mut params = Vec::new();
        if let Some(ref password) = self.password {
            params.push(format!("password={}", urlencoding::encode(password)));
        }
        if let Some(ref username) = self.username {
            params.push(format!("username={}", urlencoding::encode(username)));
        }

        if !params.is_empty() {
            url.push('?');
            url.push_str(&params.join("&"));
        }

        url
    }

    /// True when cluster nodes were configured via either the nested
    /// `cluster.nodes` list or the legacy flat `cluster_nodes` list.
    pub fn has_cluster_nodes(&self) -> bool {
        !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
    }

    /// Resolves cluster node URLs; the nested `cluster.nodes` list takes
    /// precedence over the legacy `cluster_nodes` list.
    pub fn cluster_node_urls(&self) -> Vec<String> {
        if !self.cluster.nodes.is_empty() {
            return self.build_cluster_urls(&self.cluster.nodes);
        }
        self.build_cluster_urls(&self.cluster_nodes)
    }

    /// Normalizes free-form seed strings ("host", "host:port", full URLs)
    /// into cluster node URLs; empty seeds are silently dropped.
    pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
        self.build_cluster_urls(
            &seeds
                .iter()
                .filter_map(|seed| ClusterNode::from_seed(seed))
                .collect::<Vec<ClusterNode>>(),
        )
    }

    // Renders node URLs, preferring cluster-level credentials and falling
    // back to the connection-level ones.
    fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
        let username = self
            .cluster
            .username
            .as_deref()
            .or(self.username.as_deref());
        let password = self
            .cluster
            .password
            .as_deref()
            .or(self.password.as_deref());
        let use_tls = self.cluster.use_tls;

        nodes
            .iter()
            .map(|node| node.to_url_with_options(use_tls, username, password))
            .collect()
    }
}
853
854impl RedisSentinel {
855 pub fn to_host_port(&self) -> String {
856 format!("{}:{}", self.host, self.port)
857 }
858}
859
/// A single Redis Cluster node address. `host` may also carry a full
/// redis:// / rediss:// URL (see `to_url_with_options`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterNode {
    pub host: String,
    pub port: u16,
}
866
impl ClusterNode {
    /// Renders this node as a plain (no TLS, no credentials) redis:// URL.
    pub fn to_url(&self) -> String {
        self.to_url_with_options(false, None, None)
    }

    /// Renders this node as a Redis URL, applying TLS and credentials.
    /// Credentials embedded in an already-URL-formed `host` take precedence
    /// over the `username`/`password` arguments.
    pub fn to_url_with_options(
        &self,
        use_tls: bool,
        username: Option<&str>,
        password: Option<&str>,
    ) -> String {
        let host = self.host.trim();

        if host.starts_with("redis://") || host.starts_with("rediss://") {
            // `host` is already a full URL: re-emit it normalized, keeping its
            // own scheme (use_tls is ignored in this branch).
            if let Ok(parsed) = Url::parse(host)
                && let Some(host_str) = parsed.host_str()
            {
                let scheme = parsed.scheme();
                let port = parsed.port_or_known_default().unwrap_or(self.port);
                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
                let parsed_password = parsed.password();
                let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
                let (effective_username, effective_password) = if has_embedded_auth {
                    (parsed_username, parsed_password)
                } else {
                    (username, password)
                };

                return build_redis_url(
                    scheme,
                    host_str,
                    port,
                    effective_username,
                    effective_password,
                );
            }

            // Fallback for scheme-prefixed strings the url crate rejected:
            // decide whether a port is already present, append self.port if
            // not, then retry the parse.
            // NOTE(review): the `count() >= 3` heuristic counts the scheme's
            // own colon — confirm it behaves as intended for
            // "redis://host:port" inputs that reach this path.
            let has_port = if let Some(bracket_pos) = host.rfind(']') {
                host[bracket_pos..].contains(':')
            } else {
                host.split(':').count() >= 3
            };
            let base = if has_port {
                host.to_string()
            } else {
                format!("{}:{}", host, self.port)
            };

            if let Ok(parsed) = Url::parse(&base) {
                let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
                let parsed_password = parsed.password();
                if let Some(host_str) = parsed.host_str() {
                    let port = parsed.port_or_known_default().unwrap_or(self.port);
                    let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
                    let (effective_username, effective_password) = if has_embedded_auth {
                        (parsed_username, parsed_password)
                    } else {
                        (username, password)
                    };
                    return build_redis_url(
                        parsed.scheme(),
                        host_str,
                        port,
                        effective_username,
                        effective_password,
                    );
                }
            }
            // Last resort: return the string as assembled, unparsed.
            return base;
        }

        // Plain "host"/"host:port"/"[ipv6]:port" form: normalize and build.
        let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
        let scheme = if use_tls { "rediss" } else { "redis" };
        build_redis_url(
            scheme,
            &normalized_host,
            normalized_port,
            username,
            password,
        )
    }

    /// Builds a node from a free-form seed string. Full URLs are kept intact
    /// as the `host` (with the port extracted for reference); plain strings
    /// are split into host and port with 6379 as the default. Empty seeds
    /// yield `None`.
    pub fn from_seed(seed: &str) -> Option<Self> {
        let trimmed = seed.trim();
        if trimmed.is_empty() {
            return None;
        }

        if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
            let port = Url::parse(trimmed)
                .ok()
                .and_then(|parsed| parsed.port_or_known_default())
                .unwrap_or(6379);
            return Some(Self {
                host: trimmed.to_string(),
                port,
            });
        }

        let (host, port) = split_plain_host_and_port(trimmed, 6379);
        Some(Self { host, port })
    }
}
971
/// Splits "host", "host:port", or "[ipv6]:port" into `(host, port)`, using
/// `default_port` when no valid port suffix is present. An unbracketed string
/// with two or more colons (a bare IPv6 literal) is treated as host-only.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    // Bracketed IPv6: "[::1]" or "[::1]:6379".
    if trimmed.starts_with('[') {
        if let Some(close) = trimmed.find(']') {
            let inner = trimmed[1..close].to_string();
            let tail = &trimmed[close + 1..];
            if let Some(port_str) = tail.strip_prefix(':') {
                if let Ok(port) = port_str.parse::<u16>() {
                    return (inner, port);
                }
            }
            return (inner, default_port);
        }
        // Unterminated bracket: pass through unchanged with the default port.
        return (trimmed.to_string(), default_port);
    }

    // Exactly one colon: candidate "host:port" pair.
    if trimmed.matches(':').count() == 1 {
        if let Some((head, tail)) = trimmed.rsplit_once(':') {
            if let Ok(port) = tail.parse::<u16>() {
                return (head.to_string(), port);
            }
        }
    }

    (trimmed.to_string(), default_port)
}
1000
1001fn build_redis_url(
1002 scheme: &str,
1003 host: &str,
1004 port: u16,
1005 username: Option<&str>,
1006 password: Option<&str>,
1007) -> String {
1008 let mut url = format!("{scheme}://");
1009
1010 if let Some(user) = username {
1011 url.push_str(&urlencoding::encode(user));
1012 if let Some(pass) = password {
1013 url.push(':');
1014 url.push_str(&urlencoding::encode(pass));
1015 }
1016 url.push('@');
1017 } else if let Some(pass) = password {
1018 url.push(':');
1019 url.push_str(&urlencoding::encode(pass));
1020 url.push('@');
1021 }
1022
1023 if host.contains(':') && !host.starts_with('[') {
1024 url.push('[');
1025 url.push_str(host);
1026 url.push(']');
1027 } else {
1028 url.push_str(host);
1029 }
1030 url.push(':');
1031 url.push_str(&port.to_string());
1032 url
1033}
1034
/// Global database pooling toggle and default pool bounds.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabasePooling {
    pub enabled: bool,
    pub min: u32,
    pub max: u32,
}
1042
/// Per-event API limits: channel fan-out, name length, payload size (KB),
/// and batch size.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventLimits {
    pub max_channels_at_once: u32,
    pub max_name_length: u32,
    pub max_payload_in_kb: u32,
    pub max_batch_size: u32,
}
1051
/// Idempotency-key support for event publishing: dedup window TTL and
/// maximum accepted key length.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct IdempotencyConfig {
    pub enabled: bool,
    pub ttl_seconds: u64,
    pub max_key_length: usize,
}
1062
/// Toggle for ephemeral (non-persisted) message support.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EphemeralConfig {
    pub enabled: bool,
}
1069
/// Echo control: whether publishers can opt out of receiving their own
/// messages, and the default echo behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EchoControlConfig {
    pub enabled: bool,
    pub default_echo_messages: bool,
}
1078
/// Subscription-side event-name filtering limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct EventNameFilteringConfig {
    pub enabled: bool,
    pub max_events_per_filter: usize,
    pub max_event_name_length: usize,
}
1089
/// Connection recovery: how long (seconds) and how many messages to buffer
/// for clients that reconnect after a drop.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConnectionRecoveryConfig {
    pub enabled: bool,
    pub buffer_ttl_seconds: u64,
    pub max_buffer_size: usize,
}
1103
1104#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
1105#[serde(rename_all = "lowercase")]
1106pub enum HistoryBackend {
1107 #[default]
1108 Postgres,
1109 Mysql,
1110 DynamoDb,
1111 SurrealDb,
1112 ScyllaDb,
1113 Memory,
1114}
1115
1116impl FromStr for HistoryBackend {
1117 type Err = String;
1118 fn from_str(s: &str) -> Result<Self, Self::Err> {
1119 match s.to_lowercase().as_str() {
1120 "postgres" | "postgresql" | "pgsql" => Ok(Self::Postgres),
1121 "mysql" => Ok(Self::Mysql),
1122 "dynamodb" => Ok(Self::DynamoDb),
1123 "surrealdb" | "surreal" => Ok(Self::SurrealDb),
1124 "scylladb" | "scylla" => Ok(Self::ScyllaDb),
1125 "memory" => Ok(Self::Memory),
1126 _ => Err(format!("Unknown history backend: {s}")),
1127 }
1128 }
1129}
1130
1131#[derive(Debug, Clone, Serialize, Deserialize)]
1132#[serde(default)]
1133pub struct PostgresHistoryConfig {
1134 pub table_prefix: String,
1135 pub write_timeout_ms: u64,
1136}
1137
1138impl Default for PostgresHistoryConfig {
1139 fn default() -> Self {
1140 Self {
1141 table_prefix: "sockudo_history".to_string(),
1142 write_timeout_ms: 5000,
1143 }
1144 }
1145}
1146
1147#[derive(Debug, Clone, Serialize, Deserialize)]
1148#[serde(default)]
1149pub struct MySqlHistoryConfig {
1150 pub table_prefix: String,
1151 pub write_timeout_ms: u64,
1152}
1153
1154impl Default for MySqlHistoryConfig {
1155 fn default() -> Self {
1156 Self {
1157 table_prefix: "sockudo_history".to_string(),
1158 write_timeout_ms: 5000,
1159 }
1160 }
1161}
1162
1163#[derive(Debug, Clone, Serialize, Deserialize)]
1164#[serde(default)]
1165pub struct DynamoDbHistoryConfig {
1166 pub table_prefix: String,
1167 pub write_timeout_ms: u64,
1168}
1169
1170impl Default for DynamoDbHistoryConfig {
1171 fn default() -> Self {
1172 Self {
1173 table_prefix: "sockudo_history".to_string(),
1174 write_timeout_ms: 5000,
1175 }
1176 }
1177}
1178
1179#[derive(Debug, Clone, Serialize, Deserialize)]
1180#[serde(default)]
1181pub struct SurrealDbHistoryConfig {
1182 pub table_prefix: String,
1183 pub write_timeout_ms: u64,
1184}
1185
1186impl Default for SurrealDbHistoryConfig {
1187 fn default() -> Self {
1188 Self {
1189 table_prefix: "sockudo_history".to_string(),
1190 write_timeout_ms: 5000,
1191 }
1192 }
1193}
1194
1195#[derive(Debug, Clone, Serialize, Deserialize)]
1196#[serde(default)]
1197pub struct ScyllaDbHistoryConfig {
1198 pub table_prefix: String,
1199 pub write_timeout_ms: u64,
1200}
1201
1202impl Default for ScyllaDbHistoryConfig {
1203 fn default() -> Self {
1204 Self {
1205 table_prefix: "sockudo_history".to_string(),
1206 write_timeout_ms: 5000,
1207 }
1208 }
1209}
1210
/// Channel message history: retention, paging, per-channel caps, writer
/// sharding, and per-backend settings. Disabled by default.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HistoryConfig {
    pub enabled: bool,
    // Allow clients to replay (rewind) missed messages.
    pub rewind_enabled: bool,
    pub backend: HistoryBackend,
    pub retention_window_seconds: u64,
    pub max_page_size: usize,
    // Optional per-channel caps; None means unbounded within retention.
    pub max_messages_per_channel: Option<usize>,
    pub max_bytes_per_channel: Option<u64>,
    pub writer_shards: usize,
    pub writer_queue_capacity: usize,
    pub postgres: PostgresHistoryConfig,
    pub mysql: MySqlHistoryConfig,
    pub dynamodb: DynamoDbHistoryConfig,
    pub surrealdb: SurrealDbHistoryConfig,
    pub scylladb: ScyllaDbHistoryConfig,
}

impl Default for HistoryConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            rewind_enabled: true,
            backend: HistoryBackend::Postgres,
            // 24 hours.
            retention_window_seconds: 86400,
            max_page_size: 100,
            max_messages_per_channel: None,
            max_bytes_per_channel: None,
            writer_shards: 16,
            writer_queue_capacity: 4096,
            postgres: PostgresHistoryConfig::default(),
            mysql: MySqlHistoryConfig::default(),
            dynamodb: DynamoDbHistoryConfig::default(),
            surrealdb: SurrealDbHistoryConfig::default(),
            scylladb: ScyllaDbHistoryConfig::default(),
        }
    }
}
1250
/// Retention and paging limits for presence-event history
/// (join/leave events on presence channels).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceHistoryConfig {
    pub enabled: bool,
    // How long presence events are retained, in seconds.
    pub retention_window_seconds: u64,
    pub max_page_size: usize,
    // Optional per-channel caps; `None` means unlimited.
    pub max_events_per_channel: Option<usize>,
    pub max_bytes_per_channel: Option<u64>,
}

impl Default for PresenceHistoryConfig {
    // Disabled by default; 24h retention window.
    fn default() -> Self {
        Self {
            enabled: false,
            retention_window_seconds: 86400,
            max_page_size: 100,
            max_events_per_channel: None,
            max_bytes_per_channel: None,
        }
    }
}
1272
/// HTTP REST API settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpApiConfig {
    // Maximum accepted request body size, in megabytes.
    pub request_limit_in_mb: u32,
    pub accept_traffic: AcceptTraffic,
    // Whether API usage accounting is collected.
    pub usage_enabled: bool,
}

/// Load-shedding threshold for the HTTP API.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcceptTraffic {
    // Fraction of memory usage (0.0..=1.0) above which traffic is refused.
    pub memory_threshold: f64,
}

/// Identity of this server instance within a cluster.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct InstanceConfig {
    pub process_id: String,
}

/// Metrics exporter settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetricsConfig {
    pub enabled: bool,
    pub driver: MetricsDriver,
    pub host: String,
    pub prometheus: PrometheusConfig,
    pub port: u16,
}

/// Prometheus-specific metric naming.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PrometheusConfig {
    // Prepended to every exported metric name.
    pub prefix: String,
}

/// Log output formatting toggles.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    pub colors_enabled: bool,
    // Include the tracing target (module path) in log lines.
    pub include_target: bool,
}

/// Limits applied to presence channels.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PresenceConfig {
    pub max_members_per_channel: u32,
    pub max_member_size_in_kb: u32,
}
1322
/// Low-level WebSocket transport tuning: buffering limits, frame/message
/// sizes, keep-alive pings, and per-message compression mode.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebSocketConfig {
    // Outgoing buffer limits; both `None` falls back to a 1000-message cap
    // (see `to_buffer_config`).
    pub max_messages: Option<usize>,
    pub max_bytes: Option<usize>,
    // Drop the connection (vs. dropping messages) when the buffer fills.
    pub disconnect_on_buffer_full: bool,
    pub max_message_size: usize,
    pub max_frame_size: usize,
    pub write_buffer_size: usize,
    pub max_backpressure: usize,
    // Server-initiated pings; interval/timeout in seconds.
    pub auto_ping: bool,
    pub ping_interval: u32,
    pub idle_timeout: u32,
    // Compression mode name; see `to_sockudo_ws_config` for accepted values.
    pub compression: String,
}

impl Default for WebSocketConfig {
    fn default() -> Self {
        Self {
            max_messages: Some(1000),
            max_bytes: None,
            disconnect_on_buffer_full: true,
            max_message_size: 64 * 1024 * 1024,
            max_frame_size: 16 * 1024 * 1024,
            write_buffer_size: 16 * 1024,
            max_backpressure: 1024 * 1024,
            auto_ping: true,
            ping_interval: 30,
            idle_timeout: 120,
            compression: "disabled".to_string(),
        }
    }
}
1358
impl WebSocketConfig {
    /// Translates the buffer-limit fields into the internal buffer config.
    /// When neither a message nor a byte limit is set, falls back to a
    /// 1000-message cap so the buffer is never unbounded.
    pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
        use crate::websocket::{BufferLimit, WebSocketBufferConfig};

        let limit = match (self.max_messages, self.max_bytes) {
            (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
            (Some(messages), None) => BufferLimit::Messages(messages),
            (None, Some(bytes)) => BufferLimit::Bytes(bytes),
            (None, None) => BufferLimit::Messages(1000),
        };

        WebSocketBufferConfig {
            limit,
            disconnect_on_full: self.disconnect_on_buffer_full,
        }
    }

    /// Builds the `sockudo_ws` transport config. `websocket_max_payload_kb`
    /// supplies the payload cap when `max_bytes` is unset; `activity_timeout`
    /// (seconds) bounds the ping interval from below so pings always beat
    /// the activity timeout.
    pub fn to_sockudo_ws_config(
        &self,
        websocket_max_payload_kb: u32,
        activity_timeout: u64,
    ) -> sockudo_ws::Config {
        use sockudo_ws::Compression;

        // Unknown compression names deliberately fall through to Disabled.
        let compression = match self.compression.to_lowercase().as_str() {
            "dedicated" => Compression::Dedicated,
            "shared" => Compression::Shared,
            "window256b" => Compression::Window256B,
            "window1kb" => Compression::Window1KB,
            "window2kb" => Compression::Window2KB,
            "window4kb" => Compression::Window4KB,
            "window8kb" => Compression::Window8KB,
            "window16kb" => Compression::Window16KB,
            "window32kb" => Compression::Window32KB,
            _ => Compression::Disabled,
        };

        sockudo_ws::Config::builder()
            .max_payload_length(
                self.max_bytes
                    .unwrap_or(websocket_max_payload_kb as usize * 1024),
            )
            .max_message_size(self.max_message_size)
            .max_frame_size(self.max_frame_size)
            .write_buffer_size(self.write_buffer_size)
            .max_backpressure(self.max_backpressure)
            .idle_timeout(self.idle_timeout)
            .auto_ping(self.auto_ping)
            // Ping at least every activity_timeout/2 (min 5s), even if the
            // configured interval is larger.
            .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
            .compression(compression)
            .build()
    }
}
1414
/// Background-job queue configuration; only the section matching `driver`
/// is consulted at runtime.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct QueueConfig {
    pub driver: QueueDriver,
    pub redis: RedisQueueConfig,
    pub redis_cluster: RedisClusterQueueConfig,
    pub nats: NatsAdapterConfig,
    pub pulsar: PulsarAdapterConfig,
    pub rabbitmq: RabbitMqAdapterConfig,
    pub google_pubsub: GooglePubSubAdapterConfig,
    pub kafka: KafkaAdapterConfig,
    pub sqs: SqsQueueConfig,
    pub sns: SnsQueueConfig,
}

/// Redis-backed queue settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RedisQueueConfig {
    // Number of concurrent queue workers.
    pub concurrency: u32,
    pub prefix: Option<String>,
    // Overrides the globally-configured Redis URL when set.
    pub url_override: Option<String>,
    pub cluster_mode: bool,
}

/// A single rate-limit rule: `max_requests` per `window_seconds`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimit {
    pub max_requests: u32,
    pub window_seconds: u64,
    pub identifier: Option<String>,
    // Number of trusted proxy hops when resolving the client IP.
    pub trust_hops: Option<u32>,
}

/// Rate limiter configuration with separate API and WebSocket rules.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RateLimiterConfig {
    pub enabled: bool,
    pub driver: CacheDriver,
    pub api_rate_limit: RateLimit,
    pub websocket_rate_limit: RateLimit,
    pub redis: RedisConfig,
}

/// TLS settings for the public listener.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct SslConfig {
    pub enabled: bool,
    pub cert_path: String,
    pub key_path: String,
    pub passphrase: Option<String>,
    pub ca_path: Option<String>,
    // When true, plain HTTP on `http_port` is redirected to HTTPS.
    pub redirect_http: bool,
    pub http_port: Option<u16>,
}

/// Outgoing webhook delivery settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhooksConfig {
    pub batching: BatchingConfig,
    pub retry: WebhookRetryConfig,
    pub request_timeout_ms: u64,
}

/// Webhook batching: coalesce events for `duration` ms or until `size` events.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct BatchingConfig {
    pub enabled: bool,
    pub duration: u64,
    pub size: usize,
}

/// Exponential-backoff retry policy for failed webhook deliveries.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WebhookRetryConfig {
    pub enabled: bool,
    // `None` means retries are bounded only by `max_elapsed_time_ms`.
    pub max_attempts: Option<u32>,
    pub max_elapsed_time_ms: u64,
    pub initial_backoff_ms: u64,
    pub max_backoff_ms: u64,
}
1495
impl Default for WebhooksConfig {
    // 10s per-request timeout with batching and retry at their own defaults.
    fn default() -> Self {
        Self {
            batching: BatchingConfig::default(),
            retry: WebhookRetryConfig::default(),
            request_timeout_ms: 10_000,
        }
    }
}

impl Default for WebhookRetryConfig {
    // Retry enabled: 1s initial backoff, 60s cap, give up after 5 minutes.
    fn default() -> Self {
        Self {
            enabled: true,
            max_attempts: None,
            max_elapsed_time_ms: 300_000,
            initial_backoff_ms: 1_000,
            max_backoff_ms: 60_000,
        }
    }
}
1517
/// Cluster node liveness tracking; see `ClusterHealthConfig::validate`
/// for the required relationships between the intervals.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ClusterHealthConfig {
    pub enabled: bool,
    pub heartbeat_interval_ms: u64,
    pub node_timeout_ms: u64,
    pub cleanup_interval_ms: u64,
}

/// Optional Unix domain socket listener.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UnixSocketConfig {
    pub enabled: bool,
    pub path: String,
    // Parsed from an octal string (e.g. "660"); capped at 0o777.
    #[serde(deserialize_with = "deserialize_octal_permission")]
    pub permission_mode: u32,
}

/// Delta (diff-based) message compression settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DeltaCompressionOptionsConfig {
    pub enabled: bool,
    pub algorithm: String,
    // Send an uncompressed full message every N deltas for resync.
    pub full_message_interval: u32,
    // Messages smaller than this are sent uncompressed.
    pub min_message_size: usize,
    pub max_state_age_secs: u64,
    pub max_channel_states_per_socket: usize,
    pub max_conflation_states_per_channel: Option<usize>,
    // JSON path used to conflate messages with the same key.
    pub conflation_key_path: Option<String>,
    pub cluster_coordination: bool,
    pub coordination_backend: DeltaCoordinationBackend,
    pub omit_delta_algorithm: bool,
}

/// Async connection-cleanup pipeline tuning; see `CleanupConfig::validate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CleanupConfig {
    pub queue_buffer_size: usize,
    pub batch_size: usize,
    pub batch_timeout_ms: u64,
    pub worker_threads: WorkerThreadsConfig,
    pub max_retry_attempts: u32,
    pub async_enabled: bool,
    // Fall back to synchronous cleanup when the async queue is unavailable.
    pub fallback_to_sync: bool,
}
1564
/// Worker-thread sizing: `Auto` lets the runtime pick, `Fixed(n)` pins the
/// count. Serialized as the string "auto" or a plain integer (see the
/// manual Serialize/Deserialize impls below).
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    Auto,
    Fixed(usize),
}
1571
1572impl serde::Serialize for WorkerThreadsConfig {
1573 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1574 where
1575 S: serde::Serializer,
1576 {
1577 match self {
1578 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1579 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1580 }
1581 }
1582}
1583
1584impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1585 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1586 where
1587 D: serde::Deserializer<'de>,
1588 {
1589 use serde::de;
1590 struct WorkerThreadsVisitor;
1591 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1592 type Value = WorkerThreadsConfig;
1593
1594 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1595 formatter.write_str(r#""auto" or a positive integer"#)
1596 }
1597
1598 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1599 if value.eq_ignore_ascii_case("auto") {
1600 Ok(WorkerThreadsConfig::Auto)
1601 } else if let Ok(n) = value.parse::<usize>() {
1602 Ok(WorkerThreadsConfig::Fixed(n))
1603 } else {
1604 Err(E::custom(format!(
1605 "expected 'auto' or a number, got '{value}'"
1606 )))
1607 }
1608 }
1609
1610 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1611 Ok(WorkerThreadsConfig::Fixed(value as usize))
1612 }
1613
1614 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1615 if value >= 0 {
1616 Ok(WorkerThreadsConfig::Fixed(value as usize))
1617 } else {
1618 Err(E::custom("worker_threads must be non-negative"))
1619 }
1620 }
1621 }
1622 deserializer.deserialize_any(WorkerThreadsVisitor)
1623 }
1624}
1625
impl Default for CleanupConfig {
    // Async cleanup on by default with modest batching and auto-sized workers.
    fn default() -> Self {
        Self {
            queue_buffer_size: 1024,
            batch_size: 64,
            batch_timeout_ms: 100,
            worker_threads: WorkerThreadsConfig::Auto,
            max_retry_attempts: 3,
            async_enabled: true,
            fallback_to_sync: true,
        }
    }
}
1639
1640impl CleanupConfig {
1641 pub fn validate(&self) -> Result<(), String> {
1642 if self.queue_buffer_size == 0 {
1643 return Err("queue_buffer_size must be greater than 0".to_string());
1644 }
1645 if self.batch_size == 0 {
1646 return Err("batch_size must be greater than 0".to_string());
1647 }
1648 if self.batch_timeout_ms == 0 {
1649 return Err("batch_timeout_ms must be greater than 0".to_string());
1650 }
1651 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1652 && n == 0
1653 {
1654 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1655 }
1656 Ok(())
1657 }
1658}
1659
1660#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1661#[serde(default)]
1662pub struct TagFilteringConfig {
1663 #[serde(default)]
1664 pub enabled: bool,
1665 #[serde(default = "default_true")]
1666 pub enable_tags: bool,
1667}
1668
// Serde default helper: used by `TagFilteringConfig::enable_tags`.
fn default_true() -> bool {
    true
}
1672
impl Default for ServerOptions {
    // Production-oriented defaults: bind 0.0.0.0:6001, debug off, every
    // subsystem at its own `Default`. Env vars applied later by
    // `override_from_env` take precedence over these values.
    fn default() -> Self {
        Self {
            adapter: AdapterConfig::default(),
            app_manager: AppManagerConfig::default(),
            cache: CacheConfig::default(),
            channel_limits: ChannelLimits::default(),
            cors: CorsConfig::default(),
            database: DatabaseConfig::default(),
            database_pooling: DatabasePooling::default(),
            debug: false,
            tag_filtering: TagFilteringConfig::default(),
            event_limits: EventLimits::default(),
            host: "0.0.0.0".to_string(),
            http_api: HttpApiConfig::default(),
            instance: InstanceConfig::default(),
            logging: None,
            metrics: MetricsConfig::default(),
            mode: "production".to_string(),
            port: 6001,
            path_prefix: "/".to_string(),
            presence: PresenceConfig::default(),
            queue: QueueConfig::default(),
            rate_limiter: RateLimiterConfig::default(),
            shutdown_grace_period: 10,
            ssl: SslConfig::default(),
            user_authentication_timeout: 3600,
            webhooks: WebhooksConfig::default(),
            websocket_max_payload_kb: 64,
            cleanup: CleanupConfig::default(),
            activity_timeout: 120,
            cluster_health: ClusterHealthConfig::default(),
            unix_socket: UnixSocketConfig::default(),
            delta_compression: DeltaCompressionOptionsConfig::default(),
            websocket: WebSocketConfig::default(),
            connection_recovery: ConnectionRecoveryConfig::default(),
            history: HistoryConfig::default(),
            presence_history: PresenceHistoryConfig::default(),
            idempotency: IdempotencyConfig::default(),
            ephemeral: EphemeralConfig::default(),
            echo_control: EchoControlConfig::default(),
            event_name_filtering: EventNameFilteringConfig::default(),
        }
    }
}
1720
// Defaults for the broker/queue adapters. Each points at the conventional
// localhost endpoint for its service so a dev setup works out of the box.

impl Default for SqsQueueConfig {
    fn default() -> Self {
        Self {
            region: "us-east-1".to_string(),
            queue_url_prefix: None,
            visibility_timeout: 30,
            endpoint_url: None,
            max_messages: 10,
            wait_time_seconds: 5,
            concurrency: 5,
            fifo: false,
            message_group_id: Some("default".to_string()),
        }
    }
}

impl Default for SnsQueueConfig {
    // `topic_arn` is intentionally empty: it has no sensible default and
    // must be supplied by configuration.
    fn default() -> Self {
        Self {
            region: "us-east-1".to_string(),
            topic_arn: String::new(),
            endpoint_url: None,
        }
    }
}

impl Default for RedisAdapterConfig {
    fn default() -> Self {
        Self {
            requests_timeout: 5000,
            prefix: "sockudo_adapter:".to_string(),
            redis_pub_options: AHashMap::new(),
            redis_sub_options: AHashMap::new(),
            cluster_mode: false,
        }
    }
}

impl Default for RedisClusterAdapterConfig {
    fn default() -> Self {
        Self {
            nodes: vec![],
            prefix: "sockudo_adapter:".to_string(),
            request_timeout_ms: 1000,
            use_connection_manager: true,
            use_sharded_pubsub: false,
        }
    }
}

impl Default for NatsAdapterConfig {
    fn default() -> Self {
        Self {
            servers: vec!["nats://localhost:4222".to_string()],
            prefix: "sockudo_adapter:".to_string(),
            request_timeout_ms: 5000,
            username: None,
            password: None,
            token: None,
            connection_timeout_ms: 5000,
            nodes_number: None,
            discovery_max_wait_ms: 1000,
            discovery_idle_wait_ms: 150,
        }
    }
}

impl Default for PulsarAdapterConfig {
    fn default() -> Self {
        Self {
            url: "pulsar://127.0.0.1:6650".to_string(),
            prefix: "sockudo-adapter".to_string(),
            request_timeout_ms: 5000,
            token: None,
            nodes_number: None,
        }
    }
}

impl Default for RabbitMqAdapterConfig {
    // "%2f" is the URL-encoded default vhost "/".
    fn default() -> Self {
        Self {
            url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
            prefix: "sockudo_adapter".to_string(),
            request_timeout_ms: 5000,
            connection_timeout_ms: 5000,
            nodes_number: None,
        }
    }
}

impl Default for GooglePubSubAdapterConfig {
    // `project_id` must be supplied by configuration; empty is not usable.
    fn default() -> Self {
        Self {
            project_id: "".to_string(),
            prefix: "sockudo-adapter".to_string(),
            request_timeout_ms: 5000,
            emulator_host: None,
            nodes_number: None,
        }
    }
}

impl Default for KafkaAdapterConfig {
    fn default() -> Self {
        Self {
            brokers: vec!["localhost:9092".to_string()],
            prefix: "sockudo_adapter".to_string(),
            request_timeout_ms: 5000,
            security_protocol: None,
            sasl_mechanism: None,
            sasl_username: None,
            sasl_password: None,
            nodes_number: None,
        }
    }
}
1838
impl Default for CacheSettings {
    // Caching on with a 5-minute TTL.
    fn default() -> Self {
        Self {
            enabled: true,
            ttl: 300,
        }
    }
}

impl Default for MemoryCacheOptions {
    // 5-minute TTL, cleanup sweep every minute, 10k entry cap.
    fn default() -> Self {
        Self {
            ttl: 300,
            cleanup_interval: 60,
            max_capacity: 10000,
        }
    }
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            driver: CacheDriver::default(),
            redis: RedisConfig {
                prefix: Some("sockudo_cache:".to_string()),
                url_override: None,
                cluster_mode: false,
            },
            memory: MemoryCacheOptions::default(),
        }
    }
}

impl Default for ChannelLimits {
    fn default() -> Self {
        Self {
            max_name_length: 200,
            cache_ttl: 3600,
        }
    }
}
impl Default for CorsConfig {
    // NOTE(review): per the Fetch/CORS spec, browsers reject
    // `Access-Control-Allow-Origin: *` when credentials are enabled — so
    // `credentials: true` with a wildcard origin presumably relies on the
    // server echoing the request origin instead of sending a literal "*".
    // Verify against the CORS middleware before changing these defaults.
    fn default() -> Self {
        Self {
            credentials: true,
            origin: vec!["*".to_string()],
            methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
            allowed_headers: vec![
                "Authorization".to_string(),
                "Content-Type".to_string(),
                "X-Requested-With".to_string(),
                "Accept".to_string(),
            ],
        }
    }
}
1895
impl Default for DatabaseConnection {
    // MySQL-style defaults (port 3306, root user, empty password) — callers
    // reuse this struct for Postgres too, overriding the port via config/env.
    fn default() -> Self {
        Self {
            host: "localhost".to_string(),
            port: 3306,
            username: "root".to_string(),
            password: "".to_string(),
            database: "sockudo".to_string(),
            table_name: "applications".to_string(),
            connection_pool_size: 10,
            pool_min: None,
            pool_max: None,
            cache_ttl: 300,
            cache_cleanup_interval: 60,
            cache_max_capacity: 100,
        }
    }
}

impl Default for RedisConnection {
    fn default() -> Self {
        Self {
            host: "127.0.0.1".to_string(),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: "sockudo:".to_string(),
            sentinels: Vec::new(),
            sentinel_password: None,
            // Conventional Redis Sentinel master-set name.
            name: "mymaster".to_string(),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }
}

impl Default for RedisSentinel {
    // 26379 is the standard Sentinel port.
    fn default() -> Self {
        Self {
            host: "localhost".to_string(),
            port: 26379,
        }
    }
}

impl Default for ClusterNode {
    // 7000 is the conventional first port of a local Redis Cluster.
    fn default() -> Self {
        Self {
            host: "127.0.0.1".to_string(),
            port: 7000,
        }
    }
}

impl Default for DatabasePooling {
    fn default() -> Self {
        Self {
            enabled: true,
            min: 2,
            max: 10,
        }
    }
}
1960
impl Default for EventLimits {
    fn default() -> Self {
        Self {
            max_channels_at_once: 100,
            max_name_length: 200,
            max_payload_in_kb: 100,
            max_batch_size: 10,
        }
    }
}

impl Default for IdempotencyConfig {
    // Duplicate suppression on, keys remembered for 2 minutes.
    fn default() -> Self {
        Self {
            enabled: true,
            ttl_seconds: 120,
            max_key_length: 128,
        }
    }
}

impl Default for EphemeralConfig {
    fn default() -> Self {
        Self { enabled: true }
    }
}

impl Default for EchoControlConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            default_echo_messages: true,
        }
    }
}

impl Default for EventNameFilteringConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            max_events_per_filter: 50,
            max_event_name_length: 200,
        }
    }
}

impl Default for ConnectionRecoveryConfig {
    // Recovery off by default; when enabled, 2-minute replay buffer.
    fn default() -> Self {
        Self {
            enabled: false,
            buffer_ttl_seconds: 120,
            max_buffer_size: 100,
        }
    }
}

impl Default for HttpApiConfig {
    fn default() -> Self {
        Self {
            request_limit_in_mb: 10,
            accept_traffic: AcceptTraffic::default(),
            usage_enabled: true,
        }
    }
}

impl Default for AcceptTraffic {
    // Shed traffic above 90% memory usage.
    fn default() -> Self {
        Self {
            memory_threshold: 0.90,
        }
    }
}

impl Default for InstanceConfig {
    // Note: generates a fresh random UUID per call — two `default()` calls
    // produce different process ids.
    fn default() -> Self {
        Self {
            process_id: uuid::Uuid::new_v4().to_string(),
        }
    }
}

impl Default for LoggingConfig {
    fn default() -> Self {
        Self {
            colors_enabled: true,
            include_target: true,
        }
    }
}
2051
impl Default for MetricsConfig {
    // Metrics exposed on 0.0.0.0:9601 by default.
    fn default() -> Self {
        Self {
            enabled: true,
            driver: MetricsDriver::default(),
            host: "0.0.0.0".to_string(),
            prometheus: PrometheusConfig::default(),
            port: 9601,
        }
    }
}

impl Default for PrometheusConfig {
    fn default() -> Self {
        Self {
            prefix: "sockudo_".to_string(),
        }
    }
}

impl Default for PresenceConfig {
    fn default() -> Self {
        Self {
            max_members_per_channel: 100,
            max_member_size_in_kb: 2,
        }
    }
}

impl Default for RedisQueueConfig {
    fn default() -> Self {
        Self {
            concurrency: 5,
            prefix: Some("sockudo_queue:".to_string()),
            url_override: None,
            cluster_mode: false,
        }
    }
}

impl Default for RateLimit {
    // 60 requests per 60-second window.
    fn default() -> Self {
        Self {
            max_requests: 60,
            window_seconds: 60,
            identifier: Some("default".to_string()),
            trust_hops: Some(0),
        }
    }
}
2102
impl Default for RateLimiterConfig {
    // Separate budgets: 100 req/min for the HTTP API, 20 connects/min for
    // WebSockets; in-memory driver so no external store is needed.
    fn default() -> Self {
        Self {
            enabled: true,
            driver: CacheDriver::Memory,
            api_rate_limit: RateLimit {
                max_requests: 100,
                window_seconds: 60,
                identifier: Some("api".to_string()),
                trust_hops: Some(0),
            },
            websocket_rate_limit: RateLimit {
                max_requests: 20,
                window_seconds: 60,
                identifier: Some("websocket_connect".to_string()),
                trust_hops: Some(0),
            },
            redis: RedisConfig {
                prefix: Some("sockudo_rl:".to_string()),
                url_override: None,
                cluster_mode: false,
            },
        }
    }
}
2128
impl Default for SslConfig {
    // TLS off by default; cert/key paths must be provided when enabling.
    fn default() -> Self {
        Self {
            enabled: false,
            cert_path: "".to_string(),
            key_path: "".to_string(),
            passphrase: None,
            ca_path: None,
            redirect_http: false,
            http_port: Some(80),
        }
    }
}

impl Default for BatchingConfig {
    // Batch webhooks for up to 50ms or 100 events.
    fn default() -> Self {
        Self {
            enabled: true,
            duration: 50,
            size: 100,
        }
    }
}

impl Default for ClusterHealthConfig {
    // 10s heartbeat against a 30s timeout — satisfies the 3x rule enforced
    // by `ClusterHealthConfig::validate`.
    fn default() -> Self {
        Self {
            enabled: true,
            heartbeat_interval_ms: 10000,
            node_timeout_ms: 30000,
            cleanup_interval_ms: 10000,
        }
    }
}

impl Default for UnixSocketConfig {
    // 0o660: owner and group may read/write the socket.
    fn default() -> Self {
        Self {
            enabled: false,
            path: "/var/run/sockudo/sockudo.sock".to_string(),
            permission_mode: 0o660,
        }
    }
}

impl Default for DeltaCompressionOptionsConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            algorithm: "fossil".to_string(),
            full_message_interval: 10,
            min_message_size: 100,
            max_state_age_secs: 300,
            max_channel_states_per_socket: 100,
            max_conflation_states_per_channel: Some(100),
            conflation_key_path: None,
            cluster_coordination: false,
            coordination_backend: DeltaCoordinationBackend::Auto,
            omit_delta_algorithm: false,
        }
    }
}
2191
#[cfg(test)]
mod tests {
    use super::{DeltaCoordinationBackend, QueueDriver};
    use std::str::FromStr;

    // FromStr round-trips for the driver enums used by env-var overrides.
    #[test]
    fn queue_driver_parses_broker_backends() {
        assert_eq!(
            QueueDriver::from_str("rabbitmq").unwrap(),
            QueueDriver::RabbitMq
        );
        assert_eq!(QueueDriver::from_str("kafka").unwrap(), QueueDriver::Kafka);
        assert_eq!(
            QueueDriver::from_str("pulsar").unwrap(),
            QueueDriver::Pulsar
        );
        assert_eq!(
            QueueDriver::from_str("google-pubsub").unwrap(),
            QueueDriver::GooglePubSub
        );
    }

    #[test]
    fn delta_coordination_backend_parses_expected_values() {
        assert_eq!(
            DeltaCoordinationBackend::from_str("auto").unwrap(),
            DeltaCoordinationBackend::Auto
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("redis-cluster").unwrap(),
            DeltaCoordinationBackend::RedisCluster
        );
        assert_eq!(
            DeltaCoordinationBackend::from_str("nats").unwrap(),
            DeltaCoordinationBackend::Nats
        );
    }
}
2230
2231impl ClusterHealthConfig {
2232 pub fn validate(&self) -> Result<(), String> {
2233 if self.heartbeat_interval_ms == 0 {
2234 return Err("heartbeat_interval_ms must be greater than 0".to_string());
2235 }
2236 if self.node_timeout_ms == 0 {
2237 return Err("node_timeout_ms must be greater than 0".to_string());
2238 }
2239 if self.cleanup_interval_ms == 0 {
2240 return Err("cleanup_interval_ms must be greater than 0".to_string());
2241 }
2242
2243 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
2244 return Err(format!(
2245 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
2246 self.heartbeat_interval_ms,
2247 self.node_timeout_ms,
2248 self.node_timeout_ms / 3
2249 ));
2250 }
2251
2252 if self.cleanup_interval_ms > self.node_timeout_ms {
2253 return Err(format!(
2254 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
2255 self.cleanup_interval_ms, self.node_timeout_ms
2256 ));
2257 }
2258
2259 Ok(())
2260 }
2261}
2262
2263impl ServerOptions {
2264 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
2265 let content = tokio::fs::read_to_string(path).await?;
2266 let options: Self = if path.ends_with(".toml") {
2267 toml::from_str(&content)?
2268 } else {
2269 sonic_rs::from_str(&content)?
2271 };
2272 Ok(options)
2273 }
2274
2275 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
2276 if let Ok(mode) = std::env::var("ENVIRONMENT") {
2278 self.mode = mode;
2279 }
2280 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
2281 if parse_bool_env("DEBUG", false) {
2282 self.debug = true;
2283 info!("DEBUG environment variable forces debug mode ON");
2284 }
2285
2286 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
2287
2288 if let Ok(host) = std::env::var("HOST") {
2289 self.host = host;
2290 }
2291 self.port = parse_env::<u16>("PORT", self.port);
2292 self.shutdown_grace_period =
2293 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
2294 self.user_authentication_timeout = parse_env::<u64>(
2295 "USER_AUTHENTICATION_TIMEOUT",
2296 self.user_authentication_timeout,
2297 );
2298 self.websocket_max_payload_kb =
2299 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2300 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2301 self.instance.process_id = id;
2302 }
2303
2304 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2306 self.adapter.driver =
2307 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2308 }
2309 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2310 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2311 self.adapter.buffer_multiplier_per_cpu,
2312 );
2313 self.adapter.enable_socket_counting = parse_env::<bool>(
2314 "ADAPTER_ENABLE_SOCKET_COUNTING",
2315 self.adapter.enable_socket_counting,
2316 );
2317 self.adapter.fallback_to_local =
2318 parse_env::<bool>("ADAPTER_FALLBACK_TO_LOCAL", self.adapter.fallback_to_local);
2319 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2320 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2321 }
2322 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2323 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2324 }
2325 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2326 self.app_manager.driver =
2327 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2328 }
2329 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2330 self.rate_limiter.driver = parse_driver_enum(
2331 driver_str,
2332 self.rate_limiter.driver.clone(),
2333 "RateLimiter Backend",
2334 );
2335 }
2336
2337 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2339 self.database.redis.host = host;
2340 }
2341 self.database.redis.port =
2342 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2343 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2344 self.database.redis.username = if username.is_empty() {
2345 None
2346 } else {
2347 Some(username)
2348 };
2349 }
2350 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2351 self.database.redis.password = Some(password);
2352 }
2353 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2354 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2355 self.database.redis.key_prefix = prefix;
2356 }
2357 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2358 self.database.redis.cluster.username = if cluster_username.is_empty() {
2359 None
2360 } else {
2361 Some(cluster_username)
2362 };
2363 }
2364 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2365 self.database.redis.cluster.password = Some(cluster_password);
2366 }
2367 self.database.redis.cluster.use_tls = parse_bool_env(
2368 "DATABASE_REDIS_CLUSTER_USE_TLS",
2369 self.database.redis.cluster.use_tls,
2370 );
2371
2372 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2374 self.database.mysql.host = host;
2375 }
2376 self.database.mysql.port =
2377 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2378 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2379 self.database.mysql.username = user;
2380 }
2381 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2382 self.database.mysql.password = pass;
2383 }
2384 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2385 self.database.mysql.database = db;
2386 }
2387 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2388 self.database.mysql.table_name = table;
2389 }
2390 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2391
2392 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2394 self.database.postgres.host = host;
2395 }
2396 self.database.postgres.port =
2397 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2398 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2399 self.database.postgres.username = user;
2400 }
2401 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2402 self.database.postgres.password = pass;
2403 }
2404 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2405 self.database.postgres.database = db;
2406 }
2407 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2408
2409 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2411 self.database.dynamodb.region = region;
2412 }
2413 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2414 self.database.dynamodb.table_name = table;
2415 }
2416 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2417 self.database.dynamodb.endpoint_url = Some(endpoint);
2418 }
2419 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2420 self.database.dynamodb.aws_access_key_id = Some(key_id);
2421 }
2422 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2423 self.database.dynamodb.aws_secret_access_key = Some(secret);
2424 }
2425
2426 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2428 self.database.surrealdb.url = url;
2429 }
2430 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2431 self.database.surrealdb.namespace = namespace;
2432 }
2433 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2434 self.database.surrealdb.database = database;
2435 }
2436 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2437 self.database.surrealdb.username = username;
2438 }
2439 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2440 self.database.surrealdb.password = password;
2441 }
2442 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2443 self.database.surrealdb.table_name = table;
2444 }
2445
2446 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2448 let node_list: Vec<String> = nodes
2449 .split(',')
2450 .map(|s| s.trim())
2451 .filter(|s| !s.is_empty())
2452 .map(ToString::to_string)
2453 .collect();
2454
2455 options.adapter.cluster.nodes = node_list.clone();
2456 options.queue.redis_cluster.nodes = node_list.clone();
2457
2458 let parsed_nodes: Vec<ClusterNode> = node_list
2459 .iter()
2460 .filter_map(|seed| ClusterNode::from_seed(seed))
2461 .collect();
2462 options.database.redis.cluster.nodes = parsed_nodes.clone();
2463 options.database.redis.cluster_nodes = parsed_nodes;
2464 };
2465
2466 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2467 apply_redis_cluster_nodes(self, &nodes);
2468 }
2469 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2470 apply_redis_cluster_nodes(self, &nodes);
2471 }
2472 self.queue.redis_cluster.concurrency = parse_env::<u32>(
2473 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2474 self.queue.redis_cluster.concurrency,
2475 );
2476 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2477 self.queue.redis_cluster.prefix = Some(prefix);
2478 }
2479
2480 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2482 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2483 self.ssl.cert_path = val;
2484 }
2485 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2486 self.ssl.key_path = val;
2487 }
2488 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2489 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2490 self.ssl.http_port = Some(port);
2491 }
2492
2493 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2495 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2496 self.unix_socket.path = path;
2497 }
2498 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2499 if mode_str.chars().all(|c| c.is_digit(8)) {
2500 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2501 if mode <= 0o777 {
2502 self.unix_socket.permission_mode = mode;
2503 } else {
2504 warn!(
2505 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2506 mode_str, self.unix_socket.permission_mode
2507 );
2508 }
2509 } else {
2510 warn!(
2511 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2512 mode_str, self.unix_socket.permission_mode
2513 );
2514 }
2515 } else {
2516 warn!(
2517 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2518 mode_str, self.unix_socket.permission_mode
2519 );
2520 }
2521 }
2522
2523 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2525 self.metrics.driver =
2526 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2527 }
2528 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2529 if let Ok(val) = std::env::var("METRICS_HOST") {
2530 self.metrics.host = val;
2531 }
2532 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2533 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2534 self.metrics.prometheus.prefix = val;
2535 }
2536
2537 self.http_api.usage_enabled =
2539 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2540
2541 self.rate_limiter.enabled =
2543 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2544 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2545 "RATE_LIMITER_API_MAX_REQUESTS",
2546 self.rate_limiter.api_rate_limit.max_requests,
2547 );
2548 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2549 "RATE_LIMITER_API_WINDOW_SECONDS",
2550 self.rate_limiter.api_rate_limit.window_seconds,
2551 );
2552 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2553 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2554 }
2555 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2556 "RATE_LIMITER_WS_MAX_REQUESTS",
2557 self.rate_limiter.websocket_rate_limit.max_requests,
2558 );
2559 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2560 "RATE_LIMITER_WS_WINDOW_SECONDS",
2561 self.rate_limiter.websocket_rate_limit.window_seconds,
2562 );
2563 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2564 self.rate_limiter.redis.prefix = Some(prefix);
2565 }
2566
2567 self.queue.redis.concurrency =
2569 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2570 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2571 self.queue.redis.prefix = Some(prefix);
2572 }
2573
2574 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2576 self.queue.sqs.region = region;
2577 }
2578 self.queue.sqs.visibility_timeout = parse_env::<i32>(
2579 "QUEUE_SQS_VISIBILITY_TIMEOUT",
2580 self.queue.sqs.visibility_timeout,
2581 );
2582 self.queue.sqs.max_messages =
2583 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2584 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2585 "QUEUE_SQS_WAIT_TIME_SECONDS",
2586 self.queue.sqs.wait_time_seconds,
2587 );
2588 self.queue.sqs.concurrency =
2589 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2590 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2591 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2592 self.queue.sqs.endpoint_url = Some(endpoint);
2593 }
2594
2595 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2597 self.queue.sns.region = region;
2598 }
2599 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2600 self.queue.sns.topic_arn = topic_arn;
2601 }
2602 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2603 self.queue.sns.endpoint_url = Some(endpoint);
2604 }
2605
2606 self.webhooks.batching.enabled =
2608 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2609 self.webhooks.batching.duration =
2610 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2611 self.webhooks.batching.size =
2612 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2613
2614 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2616 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2617 }
2618 if let Ok(user) = std::env::var("NATS_USERNAME") {
2619 self.adapter.nats.username = Some(user);
2620 }
2621 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2622 self.adapter.nats.password = Some(pass);
2623 }
2624 if let Ok(token) = std::env::var("NATS_TOKEN") {
2625 self.adapter.nats.token = Some(token);
2626 }
2627 if let Ok(prefix) = std::env::var("NATS_PREFIX") {
2628 self.adapter.nats.prefix = prefix;
2629 }
2630 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2631 "NATS_CONNECTION_TIMEOUT_MS",
2632 self.adapter.nats.connection_timeout_ms,
2633 );
2634 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2635 "NATS_REQUEST_TIMEOUT_MS",
2636 self.adapter.nats.request_timeout_ms,
2637 );
2638 self.adapter.nats.discovery_max_wait_ms = parse_env::<u64>(
2639 "NATS_DISCOVERY_MAX_WAIT_MS",
2640 self.adapter.nats.discovery_max_wait_ms,
2641 );
2642 self.adapter.nats.discovery_idle_wait_ms = parse_env::<u64>(
2643 "NATS_DISCOVERY_IDLE_WAIT_MS",
2644 self.adapter.nats.discovery_idle_wait_ms,
2645 );
2646 if let Some(nodes) = parse_env_optional::<u32>("NATS_NODES_NUMBER") {
2647 self.adapter.nats.nodes_number = Some(nodes);
2648 }
2649
2650 if let Ok(url) = std::env::var("PULSAR_URL") {
2652 self.adapter.pulsar.url = url;
2653 }
2654 if let Ok(prefix) = std::env::var("PULSAR_PREFIX") {
2655 self.adapter.pulsar.prefix = prefix;
2656 }
2657 if let Ok(token) = std::env::var("PULSAR_TOKEN") {
2658 self.adapter.pulsar.token = Some(token);
2659 }
2660 self.adapter.pulsar.request_timeout_ms = parse_env::<u64>(
2661 "PULSAR_REQUEST_TIMEOUT_MS",
2662 self.adapter.pulsar.request_timeout_ms,
2663 );
2664 if let Some(nodes) = parse_env_optional::<u32>("PULSAR_NODES_NUMBER") {
2665 self.adapter.pulsar.nodes_number = Some(nodes);
2666 }
2667
2668 if let Ok(url) = std::env::var("RABBITMQ_URL") {
2670 self.adapter.rabbitmq.url = url;
2671 }
2672 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2673 self.adapter.rabbitmq.prefix = prefix;
2674 }
2675 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2676 "RABBITMQ_CONNECTION_TIMEOUT_MS",
2677 self.adapter.rabbitmq.connection_timeout_ms,
2678 );
2679 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2680 "RABBITMQ_REQUEST_TIMEOUT_MS",
2681 self.adapter.rabbitmq.request_timeout_ms,
2682 );
2683 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2684 self.adapter.rabbitmq.nodes_number = Some(nodes);
2685 }
2686
2687 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2689 self.adapter.google_pubsub.project_id = project_id;
2690 }
2691 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2692 self.adapter.google_pubsub.prefix = prefix;
2693 }
2694 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2695 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2696 }
2697 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2698 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2699 self.adapter.google_pubsub.request_timeout_ms,
2700 );
2701 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2702 self.adapter.google_pubsub.nodes_number = Some(nodes);
2703 }
2704
2705 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2707 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2708 }
2709 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2710 self.adapter.kafka.prefix = prefix;
2711 }
2712 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2713 self.adapter.kafka.security_protocol = Some(protocol);
2714 }
2715 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2716 self.adapter.kafka.sasl_mechanism = Some(mechanism);
2717 }
2718 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2719 self.adapter.kafka.sasl_username = Some(username);
2720 }
2721 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2722 self.adapter.kafka.sasl_password = Some(password);
2723 }
2724 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2725 "KAFKA_REQUEST_TIMEOUT_MS",
2726 self.adapter.kafka.request_timeout_ms,
2727 );
2728 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2729 self.adapter.kafka.nodes_number = Some(nodes);
2730 }
2731
2732 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2734 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2735 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2736 warn!(
2737 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2738 e
2739 );
2740 } else {
2741 self.cors.origin = parsed;
2742 }
2743 }
2744 if let Ok(methods) = std::env::var("CORS_METHODS") {
2745 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2746 }
2747 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2748 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2749 }
2750 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2751
2752 self.database_pooling.enabled =
2754 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2755 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2756 self.database_pooling.min = min;
2757 }
2758 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2759 self.database_pooling.max = max;
2760 }
2761
2762 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2763 self.database.mysql.connection_pool_size = pool_size;
2764 self.database.postgres.connection_pool_size = pool_size;
2765 }
2766 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2767 self.app_manager.cache.ttl = cache_ttl;
2768 self.channel_limits.cache_ttl = cache_ttl;
2769 self.database.mysql.cache_ttl = cache_ttl;
2770 self.database.postgres.cache_ttl = cache_ttl;
2771 self.database.surrealdb.cache_ttl = cache_ttl;
2772 self.cache.memory.ttl = cache_ttl;
2773 }
2774 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2775 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2776 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2777 self.cache.memory.cleanup_interval = cleanup_interval;
2778 }
2779 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2780 self.database.mysql.cache_max_capacity = max_capacity;
2781 self.database.postgres.cache_max_capacity = max_capacity;
2782 self.database.surrealdb.cache_max_capacity = max_capacity;
2783 self.cache.memory.max_capacity = max_capacity;
2784 }
2785
2786 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2787 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2788 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2789 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2790
2791 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2792 (default_app_id, default_app_key, default_app_secret)
2793 && default_app_enabled
2794 {
2795 let default_app = App::from_policy(
2796 app_id,
2797 app_key,
2798 app_secret,
2799 default_app_enabled,
2800 crate::app::AppPolicy {
2801 limits: crate::app::AppLimitsPolicy {
2802 max_connections: parse_env::<u32>(
2803 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2804 100,
2805 ),
2806 max_backend_events_per_second: Some(parse_env::<u32>(
2807 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2808 100,
2809 )),
2810 max_client_events_per_second: parse_env::<u32>(
2811 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2812 100,
2813 ),
2814 max_read_requests_per_second: Some(parse_env::<u32>(
2815 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2816 100,
2817 )),
2818 max_presence_members_per_channel: Some(parse_env::<u32>(
2819 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2820 100,
2821 )),
2822 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2823 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2824 100,
2825 )),
2826 max_channel_name_length: Some(parse_env::<u32>(
2827 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2828 100,
2829 )),
2830 max_event_channels_at_once: Some(parse_env::<u32>(
2831 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2832 100,
2833 )),
2834 max_event_name_length: Some(parse_env::<u32>(
2835 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2836 100,
2837 )),
2838 max_event_payload_in_kb: Some(parse_env::<u32>(
2839 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2840 100,
2841 )),
2842 max_event_batch_size: Some(parse_env::<u32>(
2843 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2844 100,
2845 )),
2846 decay_seconds: None,
2847 terminate_on_limit: false,
2848 message_rate_limit: None,
2849 },
2850 features: crate::app::AppFeaturesPolicy {
2851 enable_client_messages: std::env::var(
2852 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2853 )
2854 .ok()
2855 .map(|value| {
2856 matches!(
2857 value.trim().to_ascii_lowercase().as_str(),
2858 "true" | "1" | "yes" | "on"
2859 )
2860 })
2861 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2862 enable_user_authentication: Some(parse_bool_env(
2863 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2864 false,
2865 )),
2866 enable_watchlist_events: Some(parse_bool_env(
2867 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2868 false,
2869 )),
2870 },
2871 channels: crate::app::AppChannelsPolicy {
2872 allowed_origins: {
2873 if let Ok(origins_str) =
2874 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2875 {
2876 if !origins_str.is_empty() {
2877 Some(
2878 origins_str
2879 .split(',')
2880 .map(|s| s.trim().to_string())
2881 .collect(),
2882 )
2883 } else {
2884 None
2885 }
2886 } else {
2887 None
2888 }
2889 },
2890 channel_delta_compression: None,
2891 channel_namespaces: None,
2892 },
2893 webhooks: None,
2894 idempotency: None,
2895 connection_recovery: None,
2896 history: None,
2897 presence_history: None,
2898 },
2899 );
2900
2901 self.app_manager.array.apps.push(default_app);
2902 info!("Successfully registered default app from env");
2903 }
2904
2905 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2907 info!("Applying REDIS_URL environment variable override");
2908
2909 let redis_url_json = sonic_rs::json!(redis_url_env);
2910
2911 self.adapter
2912 .redis
2913 .redis_pub_options
2914 .insert("url".to_string(), redis_url_json.clone());
2915 self.adapter
2916 .redis
2917 .redis_sub_options
2918 .insert("url".to_string(), redis_url_json);
2919
2920 self.cache.redis.url_override = Some(redis_url_env.clone());
2921 self.queue.redis.url_override = Some(redis_url_env.clone());
2922 self.rate_limiter.redis.url_override = Some(redis_url_env);
2923 }
2924
2925 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2927 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2928 if has_colors_env || has_target_env {
2929 let logging_config = self.logging.get_or_insert_with(Default::default);
2930 if has_colors_env {
2931 logging_config.colors_enabled =
2932 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2933 }
2934 if has_target_env {
2935 logging_config.include_target =
2936 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2937 }
2938 }
2939
2940 self.cleanup.async_enabled =
2942 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2943 self.cleanup.fallback_to_sync =
2944 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2945 self.cleanup.queue_buffer_size =
2946 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2947 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2948 self.cleanup.batch_timeout_ms =
2949 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2950 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2951 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2952 WorkerThreadsConfig::Auto
2953 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2954 WorkerThreadsConfig::Fixed(n)
2955 } else {
2956 warn!(
2957 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2958 worker_threads_str
2959 );
2960 self.cleanup.worker_threads.clone()
2961 };
2962 }
2963 self.cleanup.max_retry_attempts = parse_env::<u32>(
2964 "CLEANUP_MAX_RETRY_ATTEMPTS",
2965 self.cleanup.max_retry_attempts,
2966 );
2967
2968 self.cluster_health.enabled =
2970 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2971 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2972 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2973 self.cluster_health.heartbeat_interval_ms,
2974 );
2975 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2976 "CLUSTER_HEALTH_NODE_TIMEOUT",
2977 self.cluster_health.node_timeout_ms,
2978 );
2979 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2980 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2981 self.cluster_health.cleanup_interval_ms,
2982 );
2983
2984 self.tag_filtering.enabled =
2986 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2987
2988 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2990 if val.to_lowercase() == "none" || val == "0" {
2991 self.websocket.max_messages = None;
2992 } else if let Ok(n) = val.parse::<usize>() {
2993 self.websocket.max_messages = Some(n);
2994 }
2995 }
2996 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2997 if val.to_lowercase() == "none" || val == "0" {
2998 self.websocket.max_bytes = None;
2999 } else if let Ok(n) = val.parse::<usize>() {
3000 self.websocket.max_bytes = Some(n);
3001 }
3002 }
3003 self.websocket.disconnect_on_buffer_full = parse_bool_env(
3004 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
3005 self.websocket.disconnect_on_buffer_full,
3006 );
3007 self.websocket.max_message_size = parse_env::<usize>(
3008 "WEBSOCKET_MAX_MESSAGE_SIZE",
3009 self.websocket.max_message_size,
3010 );
3011 self.websocket.max_frame_size =
3012 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
3013 self.websocket.write_buffer_size = parse_env::<usize>(
3014 "WEBSOCKET_WRITE_BUFFER_SIZE",
3015 self.websocket.write_buffer_size,
3016 );
3017 self.websocket.max_backpressure = parse_env::<usize>(
3018 "WEBSOCKET_MAX_BACKPRESSURE",
3019 self.websocket.max_backpressure,
3020 );
3021 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
3022 self.websocket.ping_interval =
3023 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
3024 self.websocket.idle_timeout =
3025 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
3026 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
3027 self.websocket.compression = mode;
3028 }
3029
3030 self.connection_recovery.enabled = parse_bool_env(
3032 "CONNECTION_RECOVERY_ENABLED",
3033 self.connection_recovery.enabled,
3034 );
3035 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
3036 "CONNECTION_RECOVERY_BUFFER_TTL",
3037 self.connection_recovery.buffer_ttl_seconds,
3038 );
3039 self.connection_recovery.max_buffer_size = parse_env::<usize>(
3040 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
3041 self.connection_recovery.max_buffer_size,
3042 );
3043
3044 self.history.enabled = parse_bool_env("HISTORY_ENABLED", self.history.enabled);
3045 self.history.rewind_enabled =
3046 parse_bool_env("HISTORY_REWIND_ENABLED", self.history.rewind_enabled);
3047 self.history.retention_window_seconds = parse_env::<u64>(
3048 "HISTORY_RETENTION_WINDOW_SECONDS",
3049 self.history.retention_window_seconds,
3050 );
3051 self.history.max_page_size =
3052 parse_env::<usize>("HISTORY_MAX_PAGE_SIZE", self.history.max_page_size);
3053 self.history.writer_shards =
3054 parse_env::<usize>("HISTORY_WRITER_SHARDS", self.history.writer_shards);
3055 self.history.writer_queue_capacity = parse_env::<usize>(
3056 "HISTORY_WRITER_QUEUE_CAPACITY",
3057 self.history.writer_queue_capacity,
3058 );
3059 if let Ok(backend) = std::env::var("HISTORY_BACKEND") {
3060 self.history.backend = HistoryBackend::from_str(&backend)?;
3061 }
3062 if let Ok(max_messages) = std::env::var("HISTORY_MAX_MESSAGES_PER_CHANNEL") {
3063 self.history.max_messages_per_channel = Some(
3064 max_messages
3065 .parse::<usize>()
3066 .map_err(|e| format!("Invalid HISTORY_MAX_MESSAGES_PER_CHANNEL: {e}"))?,
3067 );
3068 }
3069 if let Ok(max_bytes) = std::env::var("HISTORY_MAX_BYTES_PER_CHANNEL") {
3070 self.history.max_bytes_per_channel = Some(
3071 max_bytes
3072 .parse::<u64>()
3073 .map_err(|e| format!("Invalid HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3074 );
3075 }
3076 if let Ok(table_prefix) = std::env::var("HISTORY_POSTGRES_TABLE_PREFIX") {
3077 self.history.postgres.table_prefix = table_prefix;
3078 }
3079 self.history.postgres.write_timeout_ms = parse_env::<u64>(
3080 "HISTORY_POSTGRES_WRITE_TIMEOUT_MS",
3081 self.history.postgres.write_timeout_ms,
3082 );
3083
3084 self.presence_history.enabled =
3085 parse_bool_env("PRESENCE_HISTORY_ENABLED", self.presence_history.enabled);
3086 self.presence_history.retention_window_seconds = parse_env::<u64>(
3087 "PRESENCE_HISTORY_RETENTION_WINDOW_SECONDS",
3088 self.presence_history.retention_window_seconds,
3089 );
3090 self.presence_history.max_page_size = parse_env::<usize>(
3091 "PRESENCE_HISTORY_MAX_PAGE_SIZE",
3092 self.presence_history.max_page_size,
3093 );
3094 if let Ok(max_events) = std::env::var("PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL") {
3095 self.presence_history.max_events_per_channel =
3096 Some(max_events.parse::<usize>().map_err(|e| {
3097 format!("Invalid PRESENCE_HISTORY_MAX_EVENTS_PER_CHANNEL: {e}")
3098 })?);
3099 }
3100 if let Ok(max_bytes) = std::env::var("PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL") {
3101 self.presence_history.max_bytes_per_channel = Some(
3102 max_bytes
3103 .parse::<u64>()
3104 .map_err(|e| format!("Invalid PRESENCE_HISTORY_MAX_BYTES_PER_CHANNEL: {e}"))?,
3105 );
3106 }
3107 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
3108 self.idempotency.ttl_seconds =
3109 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
3110 self.idempotency.max_key_length = parse_env::<usize>(
3111 "IDEMPOTENCY_MAX_KEY_LENGTH",
3112 self.idempotency.max_key_length,
3113 );
3114
3115 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
3116
3117 self.echo_control.enabled =
3118 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
3119 self.echo_control.default_echo_messages = parse_bool_env(
3120 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
3121 self.echo_control.default_echo_messages,
3122 );
3123
3124 self.event_name_filtering.enabled = parse_bool_env(
3125 "EVENT_NAME_FILTERING_ENABLED",
3126 self.event_name_filtering.enabled,
3127 );
3128 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
3129 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
3130 self.event_name_filtering.max_events_per_filter,
3131 );
3132 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
3133 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
3134 self.event_name_filtering.max_event_name_length,
3135 );
3136
3137 Ok(())
3138 }
3139
3140 pub fn validate(&self) -> Result<(), String> {
3141 if self.unix_socket.enabled {
3142 if self.unix_socket.path.is_empty() {
3143 return Err(
3144 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
3145 );
3146 }
3147
3148 self.validate_unix_socket_security()?;
3149
3150 if self.ssl.enabled {
3151 tracing::warn!(
3152 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
3153 );
3154 }
3155
3156 if self.unix_socket.permission_mode > 0o777 {
3157 return Err(format!(
3158 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
3159 self.unix_socket.permission_mode
3160 ));
3161 }
3162 }
3163
3164 if let Err(e) = self.cleanup.validate() {
3165 return Err(format!("Invalid cleanup configuration: {}", e));
3166 }
3167
3168 if self.history.enabled {
3169 if self.history.max_page_size == 0 {
3170 return Err("history.max_page_size must be greater than 0".to_string());
3171 }
3172 if self.history.writer_shards == 0 {
3173 return Err("history.writer_shards must be greater than 0".to_string());
3174 }
3175 if self.history.writer_queue_capacity == 0 {
3176 return Err("history.writer_queue_capacity must be greater than 0".to_string());
3177 }
3178 if self.history.retention_window_seconds == 0 {
3179 return Err("history.retention_window_seconds must be greater than 0".to_string());
3180 }
3181 if self.history.postgres.table_prefix.trim().is_empty() {
3182 return Err("history.postgres.table_prefix must not be empty".to_string());
3183 }
3184 }
3185
3186 if self.presence_history.enabled {
3187 if self.presence_history.max_page_size == 0 {
3188 return Err("presence_history.max_page_size must be greater than 0".to_string());
3189 }
3190 if self.presence_history.retention_window_seconds == 0 {
3191 return Err(
3192 "presence_history.retention_window_seconds must be greater than 0".to_string(),
3193 );
3194 }
3195 }
3196
3197 Ok(())
3198 }
3199
3200 fn validate_unix_socket_security(&self) -> Result<(), String> {
3201 let path = &self.unix_socket.path;
3202
3203 if path.contains("../") || path.contains("..\\") {
3204 return Err(
3205 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
3206 );
3207 }
3208
3209 if self.unix_socket.permission_mode & 0o002 != 0 {
3210 warn!(
3211 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
3212 self.unix_socket.permission_mode
3213 );
3214 }
3215
3216 if self.unix_socket.permission_mode & 0o007 > 0o005 {
3217 warn!(
3218 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
3219 self.unix_socket.permission_mode
3220 );
3221 }
3222
3223 if self.mode == "production" && path.starts_with("/tmp/") {
3224 warn!(
3225 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
3226 path
3227 );
3228 }
3229
3230 if !path.starts_with('/') {
3231 return Err(
3232 "Unix socket path must be absolute (start with /) for security and reliability."
3233 .to_string(),
3234 );
3235 }
3236
3237 Ok(())
3238 }
3239}
3240
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Baseline standalone connection shared by the URL-formatting tests.
    /// Each test overrides only the fields it actually exercises, via
    /// struct-update syntax, so the expected URLs stay easy to trace.
    fn base_conn() -> RedisConnection {
        RedisConnection {
            host: "127.0.0.1".to_string(),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: "sockudo:".to_string(),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: "mymaster".to_string(),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    #[test]
    fn test_standard_url_basic() {
        assert_eq!(base_conn().to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some("secret".to_string()),
            ..base_conn()
        };
        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: "redis.example.com".to_string(),
            port: 6380,
            db: 1,
            username: Some("admin".to_string()),
            password: Some("pass123".to_string()),
            ..base_conn()
        };
        assert_eq!(
            conn.to_url(),
            "redis://admin:pass123@redis.example.com:6380/1"
        );
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // Reserved URL characters in credentials must come out percent-encoded.
        let conn = RedisConnection {
            password: Some("pass@word#123".to_string()),
            ..base_conn()
        };
        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let conn = RedisConnection {
            sentinels: vec![RedisSentinel {
                host: "sentinel1".to_string(),
                port: 26379,
            }],
            ..Default::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let conn = RedisConnection {
            sentinels: vec![
                RedisSentinel {
                    host: "sentinel1".to_string(),
                    port: 26379,
                },
                RedisSentinel {
                    host: "sentinel2".to_string(),
                    port: 26379,
                },
            ],
            ..base_conn()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![RedisSentinel {
                host: "sentinel1".to_string(),
                port: 26379,
            }],
            sentinel_password: Some("sentinelpass".to_string()),
            ..base_conn()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        // Per the expected URL, the master password rides in a query
        // parameter rather than in the sentinel authority's userinfo.
        let conn = RedisConnection {
            db: 1,
            password: Some("masterpass".to_string()),
            sentinels: vec![RedisSentinel {
                host: "sentinel1".to_string(),
                port: 26379,
            }],
            ..base_conn()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let conn = RedisConnection {
            db: 2,
            username: Some("redisuser".to_string()),
            password: Some("redispass".to_string()),
            sentinels: vec![
                RedisSentinel {
                    host: "sentinel1".to_string(),
                    port: 26379,
                },
                RedisSentinel {
                    host: "sentinel2".to_string(),
                    port: 26380,
                },
            ],
            sentinel_password: Some("sentinelauth".to_string()),
            name: "production-master".to_string(),
            ..base_conn()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let endpoint = RedisSentinel {
            host: "sentinel.example.com".to_string(),
            port: 26379,
        };
        assert_eq!(endpoint.to_host_port(), "sentinel.example.com:26379");
    }

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        let nodes = [
            ("node1.secure-cluster.com", 7000),
            ("redis://node2.secure-cluster.com:7001", 7001),
            ("rediss://node3.secure-cluster.com", 7002),
        ]
        .into_iter()
        .map(|(host, port)| ClusterNode {
            host: host.to_string(),
            port,
        })
        .collect();

        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes,
                username: None,
                password: Some("cluster-secret".to_string()),
                use_tls: true,
            },
            ..Default::default()
        };

        assert_eq!(
            conn.cluster_node_urls(),
            vec![
                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
                "redis://:cluster-secret@node2.secure-cluster.com:7001",
                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
            ]
        );
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        // No cluster-level nodes configured: the expected URL shows the
        // legacy `cluster_nodes` list plus the top-level password being used.
        let conn = RedisConnection {
            password: Some("fallback-secret".to_string()),
            cluster_nodes: vec![ClusterNode {
                host: "legacy-node.example.com".to_string(),
                port: 7000,
            }],
            ..Default::default()
        };

        assert_eq!(
            conn.cluster_node_urls(),
            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
        );
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let conn = RedisConnection {
            cluster: RedisClusterConnection {
                nodes: Vec::new(),
                username: Some("svc-user".to_string()),
                password: Some("svc-pass".to_string()),
                use_tls: true,
            },
            ..Default::default()
        };

        let seeds: Vec<String> = [
            "node1.example.com:7000",
            "redis://node2.example.com:7001",
            "rediss://node3.example.com",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect();

        assert_eq!(
            conn.normalize_cluster_seed_urls(&seeds),
            vec![
                "rediss://svc-user:svc-pass@node1.example.com:7000",
                "redis://svc-user:svc-pass@node2.example.com:7001",
                "rediss://svc-user:svc-pass@node3.example.com:6379",
            ]
        );
    }
}
3535
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Builds a `ClusterNode` without repeating the struct literal in every
    /// test. A macro (rather than a typed helper fn) lets the `port` literal
    /// take whatever integer type the field declares.
    macro_rules! node {
        ($host:expr, $port:expr) => {
            ClusterNode {
                host: $host.to_string(),
                port: $port,
            }
        };
    }

    #[test]
    fn test_to_url_basic_host() {
        assert_eq!(node!("localhost", 6379).to_url(), "redis://localhost:6379");
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_eq!(node!("127.0.0.1", 6379).to_url(), "redis://127.0.0.1:6379");
    }

    #[test]
    fn test_to_url_with_redis_protocol() {
        assert_eq!(
            node!("redis://example.com", 6379).to_url(),
            "redis://example.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        assert_eq!(
            node!("rediss://secure.example.com", 6379).to_url(),
            "rediss://secure.example.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        assert_eq!(
            node!("rediss://secure.example.com:7000", 6379).to_url(),
            "rediss://secure.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        assert_eq!(
            node!("redis://example.com:7001", 6379).to_url(),
            "redis://example.com:7001"
        );
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        assert_eq!(
            node!(" rediss://secure.example.com ", 6379).to_url(),
            "rediss://secure.example.com:6379"
        );
    }

    #[test]
    fn test_to_url_custom_port() {
        assert_eq!(
            node!("redis-cluster.example.com", 7000).to_url(),
            "redis://redis-cluster.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        // A port embedded in the host string wins over the `port` field.
        assert_eq!(
            node!("redis-cluster.example.com:7010", 7000).to_url(),
            "redis://redis-cluster.example.com:7010"
        );
    }

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let target = node!("node.example.com", 7000);
        assert_eq!(
            target.to_url_with_options(true, Some("svc-user"), Some("secret")),
            "rediss://svc-user:secret@node.example.com:7000"
        );
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        // Credentials already present in the host URL take precedence over
        // the globally supplied ones.
        let target = node!("rediss://:node-secret@node.example.com:7000", 7000);
        assert_eq!(
            target.to_url_with_options(true, Some("global-user"), Some("global-secret")),
            "rediss://:node-secret@node.example.com:7000"
        );
    }

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let parsed =
            ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        assert_eq!(
            node!("rediss://my-cluster.use1.cache.amazonaws.com", 6379).to_url(),
            "rediss://my-cluster.use1.cache.amazonaws.com:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_eq!(node!("rediss://[::1]", 6379).to_url(), "rediss://[::1]:6379");
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_eq!(
            node!("rediss://[::1]:7000", 6379).to_url(),
            "rediss://[::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_eq!(
            node!("rediss://[2001:db8::1]", 6379).to_url(),
            "rediss://[2001:db8::1]:6379"
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_eq!(
            node!("rediss://[2001:db8::1]:7000", 6379).to_url(),
            "rediss://[2001:db8::1]:7000"
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_eq!(node!("redis://[::1]", 6379).to_url(), "redis://[::1]:6379");
    }
}
3717
#[cfg(test)]
mod cors_config_tests {
    use super::CorsConfig;

    /// Deserializes a JSON document into a `CorsConfig` via the same codec
    /// used for real configuration input.
    fn parse(json: &str) -> sonic_rs::Result<CorsConfig> {
        sonic_rs::from_str(json)
    }

    #[test]
    fn test_deserialize_valid_exact_origins() {
        let config = parse(r#"{"origin": ["https://example.com", "https://other.com"]}"#)
            .expect("exact origins should deserialize");
        assert_eq!(config.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_valid_wildcard_patterns() {
        let config = parse(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
            .expect("wildcard patterns should deserialize");
        assert_eq!(config.origin.len(), 2);
    }

    #[test]
    fn test_deserialize_allows_special_markers() {
        // "*"/"Any"/"any" are allow-all markers and may also be mixed with
        // exact origins.
        let accepted = [
            r#"{"origin": ["*"]}"#,
            r#"{"origin": ["Any"]}"#,
            r#"{"origin": ["any"]}"#,
            r#"{"origin": ["*", "https://example.com"]}"#,
        ];
        for json in accepted {
            assert!(parse(json).is_ok(), "expected {} to be accepted", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_invalid_patterns() {
        let rejected = [
            r#"{"origin": ["*.*bad"]}"#,
            r#"{"origin": ["*."]}"#,
            r#"{"origin": [""]}"#,
            r#"{"origin": ["https://"]}"#,
        ];
        for json in rejected {
            assert!(parse(json).is_err(), "expected {} to be rejected", json);
        }
    }

    #[test]
    fn test_deserialize_rejects_mixed_valid_and_invalid() {
        // A single bad entry fails the whole list.
        assert!(parse(r#"{"origin": ["https://good.com", "*.*bad"]}"#).is_err());
    }
}