1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76 #[default]
77 Local,
78 Redis,
79 #[serde(rename = "redis-cluster")]
80 RedisCluster,
81 Nats,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(default)]
86pub struct DynamoDbSettings {
87 pub region: String,
88 pub table_name: String,
89 pub endpoint_url: Option<String>,
90 pub aws_access_key_id: Option<String>,
91 pub aws_secret_access_key: Option<String>,
92 pub aws_profile_name: Option<String>,
93}
94
95impl Default for DynamoDbSettings {
96 fn default() -> Self {
97 Self {
98 region: "us-east-1".to_string(),
99 table_name: "sockudo-applications".to_string(),
100 endpoint_url: None,
101 aws_access_key_id: None,
102 aws_secret_access_key: None,
103 aws_profile_name: None,
104 }
105 }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(default)]
110pub struct ScyllaDbSettings {
111 pub nodes: Vec<String>,
112 pub keyspace: String,
113 pub table_name: String,
114 pub username: Option<String>,
115 pub password: Option<String>,
116 pub replication_class: String,
117 pub replication_factor: u32,
118}
119
120impl Default for ScyllaDbSettings {
121 fn default() -> Self {
122 Self {
123 nodes: vec!["127.0.0.1:9042".to_string()],
124 keyspace: "sockudo".to_string(),
125 table_name: "applications".to_string(),
126 username: None,
127 password: None,
128 replication_class: "SimpleStrategy".to_string(),
129 replication_factor: 3,
130 }
131 }
132}
133
134impl FromStr for AdapterDriver {
135 type Err = String;
136 fn from_str(s: &str) -> Result<Self, Self::Err> {
137 match s.to_lowercase().as_str() {
138 "local" => Ok(AdapterDriver::Local),
139 "redis" => Ok(AdapterDriver::Redis),
140 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141 "nats" => Ok(AdapterDriver::Nats),
142 _ => Err(format!("Unknown adapter driver: {s}")),
143 }
144 }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
148#[serde(rename_all = "lowercase")]
149pub enum AppManagerDriver {
150 #[default]
151 Memory,
152 Mysql,
153 Dynamodb,
154 PgSql,
155 ScyllaDb,
156}
157impl FromStr for AppManagerDriver {
158 type Err = String;
159 fn from_str(s: &str) -> Result<Self, Self::Err> {
160 match s.to_lowercase().as_str() {
161 "memory" => Ok(AppManagerDriver::Memory),
162 "mysql" => Ok(AppManagerDriver::Mysql),
163 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166 _ => Err(format!("Unknown app manager driver: {s}")),
167 }
168 }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
172#[serde(rename_all = "lowercase")]
173pub enum CacheDriver {
174 #[default]
175 Memory,
176 Redis,
177 #[serde(rename = "redis-cluster")]
178 RedisCluster,
179 None,
180}
181
182impl FromStr for CacheDriver {
183 type Err = String;
184 fn from_str(s: &str) -> Result<Self, Self::Err> {
185 match s.to_lowercase().as_str() {
186 "memory" => Ok(CacheDriver::Memory),
187 "redis" => Ok(CacheDriver::Redis),
188 "redis-cluster" => Ok(CacheDriver::RedisCluster),
189 "none" => Ok(CacheDriver::None),
190 _ => Err(format!("Unknown cache driver: {s}")),
191 }
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
196#[serde(rename_all = "lowercase")]
197pub enum QueueDriver {
198 Memory,
199 #[default]
200 Redis,
201 #[serde(rename = "redis-cluster")]
202 RedisCluster,
203 Sqs,
204 Sns,
205 None,
206}
207
208impl FromStr for QueueDriver {
209 type Err = String;
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 match s.to_lowercase().as_str() {
212 "memory" => Ok(QueueDriver::Memory),
213 "redis" => Ok(QueueDriver::Redis),
214 "redis-cluster" => Ok(QueueDriver::RedisCluster),
215 "sqs" => Ok(QueueDriver::Sqs),
216 "sns" => Ok(QueueDriver::Sns),
217 "none" => Ok(QueueDriver::None),
218 _ => Err(format!("Unknown queue driver: {s}")),
219 }
220 }
221}
222
223impl AsRef<str> for QueueDriver {
224 fn as_ref(&self) -> &str {
225 match self {
226 QueueDriver::Memory => "memory",
227 QueueDriver::Redis => "redis",
228 QueueDriver::RedisCluster => "redis-cluster",
229 QueueDriver::Sqs => "sqs",
230 QueueDriver::Sns => "sns",
231 QueueDriver::None => "none",
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(default)]
238pub struct RedisClusterQueueConfig {
239 pub concurrency: u32,
240 pub prefix: Option<String>,
241 pub nodes: Vec<String>,
242 pub request_timeout_ms: u64,
243}
244
245impl Default for RedisClusterQueueConfig {
246 fn default() -> Self {
247 Self {
248 concurrency: 5,
249 prefix: Some("sockudo_queue:".to_string()),
250 nodes: vec!["redis://127.0.0.1:6379".to_string()],
251 request_timeout_ms: 5000,
252 }
253 }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
257#[serde(rename_all = "lowercase")]
258pub enum MetricsDriver {
259 #[default]
260 Prometheus,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264#[serde(rename_all = "lowercase")]
265pub enum LogOutputFormat {
266 #[default]
267 Human,
268 Json,
269}
270
271impl FromStr for LogOutputFormat {
272 type Err = String;
273 fn from_str(s: &str) -> Result<Self, Self::Err> {
274 match s.to_lowercase().as_str() {
275 "human" => Ok(LogOutputFormat::Human),
276 "json" => Ok(LogOutputFormat::Json),
277 _ => Err(format!("Unknown log output format: {s}")),
278 }
279 }
280}
281
282impl FromStr for MetricsDriver {
283 type Err = String;
284 fn from_str(s: &str) -> Result<Self, Self::Err> {
285 match s.to_lowercase().as_str() {
286 "prometheus" => Ok(MetricsDriver::Prometheus),
287 _ => Err(format!("Unknown metrics driver: {s}")),
288 }
289 }
290}
291
292impl AsRef<str> for MetricsDriver {
293 fn as_ref(&self) -> &str {
294 match self {
295 MetricsDriver::Prometheus => "prometheus",
296 }
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(default)]
303pub struct ServerOptions {
304 pub adapter: AdapterConfig,
305 pub app_manager: AppManagerConfig,
306 pub cache: CacheConfig,
307 pub channel_limits: ChannelLimits,
308 pub cors: CorsConfig,
309 pub database: DatabaseConfig,
310 pub database_pooling: DatabasePooling,
311 pub debug: bool,
312 pub event_limits: EventLimits,
313 pub host: String,
314 pub http_api: HttpApiConfig,
315 pub instance: InstanceConfig,
316 pub logging: Option<LoggingConfig>,
317 pub metrics: MetricsConfig,
318 pub mode: String,
319 pub port: u16,
320 pub path_prefix: String,
321 pub presence: PresenceConfig,
322 pub queue: QueueConfig,
323 pub rate_limiter: RateLimiterConfig,
324 pub shutdown_grace_period: u64,
325 pub ssl: SslConfig,
326 pub user_authentication_timeout: u64,
327 pub webhooks: WebhooksConfig,
328 pub websocket_max_payload_kb: u32,
329 pub cleanup: CleanupConfig,
330 pub activity_timeout: u64,
331 pub cluster_health: ClusterHealthConfig,
332 pub unix_socket: UnixSocketConfig,
333 pub delta_compression: DeltaCompressionOptionsConfig,
334 pub tag_filtering: TagFilteringConfig,
335 pub websocket: WebSocketConfig,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct SqsQueueConfig {
343 pub region: String,
344 pub queue_url_prefix: Option<String>,
345 pub visibility_timeout: i32,
346 pub endpoint_url: Option<String>,
347 pub max_messages: i32,
348 pub wait_time_seconds: i32,
349 pub concurrency: u32,
350 pub fifo: bool,
351 pub message_group_id: Option<String>,
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
355#[serde(default)]
356pub struct SnsQueueConfig {
357 pub region: String,
358 pub topic_arn: String,
359 pub endpoint_url: Option<String>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
363#[serde(default)]
364pub struct AdapterConfig {
365 pub driver: AdapterDriver,
366 pub redis: RedisAdapterConfig,
367 pub cluster: RedisClusterAdapterConfig,
368 pub nats: NatsAdapterConfig,
369 #[serde(default = "default_buffer_multiplier_per_cpu")]
370 pub buffer_multiplier_per_cpu: usize,
371 pub cluster_health: ClusterHealthConfig,
372 #[serde(default = "default_enable_socket_counting")]
373 pub enable_socket_counting: bool,
374}
375
376fn default_enable_socket_counting() -> bool {
377 true
378}
379
380fn default_buffer_multiplier_per_cpu() -> usize {
381 64
382}
383
384impl Default for AdapterConfig {
385 fn default() -> Self {
386 Self {
387 driver: AdapterDriver::default(),
388 redis: RedisAdapterConfig::default(),
389 cluster: RedisClusterAdapterConfig::default(),
390 nats: NatsAdapterConfig::default(),
391 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392 cluster_health: ClusterHealthConfig::default(),
393 enable_socket_counting: default_enable_socket_counting(),
394 }
395 }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct RedisAdapterConfig {
401 pub requests_timeout: u64,
402 pub prefix: String,
403 pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
404 pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
405 pub cluster_mode: bool,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
409#[serde(default)]
410pub struct RedisClusterAdapterConfig {
411 pub nodes: Vec<String>,
412 pub prefix: String,
413 pub request_timeout_ms: u64,
414 pub use_connection_manager: bool,
415 #[serde(default)]
416 pub use_sharded_pubsub: bool,
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[serde(default)]
421pub struct NatsAdapterConfig {
422 pub servers: Vec<String>,
423 pub prefix: String,
424 pub request_timeout_ms: u64,
425 pub username: Option<String>,
426 pub password: Option<String>,
427 pub token: Option<String>,
428 pub connection_timeout_ms: u64,
429 pub nodes_number: Option<u32>,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, Default)]
433#[serde(default)]
434pub struct AppManagerConfig {
435 pub driver: AppManagerDriver,
436 pub array: ArrayConfig,
437 pub cache: CacheSettings,
438 pub scylladb: ScyllaDbSettings,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
442#[serde(default)]
443pub struct ArrayConfig {
444 pub apps: Vec<App>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(default)]
449pub struct CacheSettings {
450 pub enabled: bool,
451 pub ttl: u64,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
455#[serde(default)]
456pub struct MemoryCacheOptions {
457 pub ttl: u64,
458 pub cleanup_interval: u64,
459 pub max_capacity: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
463#[serde(default)]
464pub struct CacheConfig {
465 pub driver: CacheDriver,
466 pub redis: RedisConfig,
467 pub memory: MemoryCacheOptions,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize, Default)]
471#[serde(default)]
472pub struct RedisConfig {
473 pub prefix: Option<String>,
474 pub url_override: Option<String>,
475 pub cluster_mode: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct ChannelLimits {
481 pub max_name_length: u32,
482 pub cache_ttl: u64,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
486#[serde(default)]
487pub struct CorsConfig {
488 pub credentials: bool,
489 #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
490 pub origin: Vec<String>,
491 pub methods: Vec<String>,
492 pub allowed_headers: Vec<String>,
493}
494
495fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
496where
497 D: serde::Deserializer<'de>,
498{
499 use serde::de::Error;
500 let origins = Vec::<String>::deserialize(deserializer)?;
501
502 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
503 return Err(D::Error::custom(format!(
504 "CORS origin pattern validation failed: {}",
505 e
506 )));
507 }
508
509 Ok(origins)
510}
511
512#[derive(Debug, Clone, Serialize, Deserialize, Default)]
513#[serde(default)]
514pub struct DatabaseConfig {
515 pub mysql: DatabaseConnection,
516 pub postgres: DatabaseConnection,
517 pub redis: RedisConnection,
518 pub dynamodb: DynamoDbSettings,
519 pub scylladb: ScyllaDbSettings,
520}
521
522#[derive(Debug, Clone, Serialize, Deserialize)]
523#[serde(default)]
524pub struct DatabaseConnection {
525 pub host: String,
526 pub port: u16,
527 pub username: String,
528 pub password: String,
529 pub database: String,
530 pub table_name: String,
531 pub connection_pool_size: u32,
532 pub pool_min: Option<u32>,
533 pub pool_max: Option<u32>,
534 pub cache_ttl: u64,
535 pub cache_cleanup_interval: u64,
536 pub cache_max_capacity: u64,
537}
538
539#[derive(Debug, Clone, Serialize, Deserialize)]
540#[serde(default)]
541pub struct RedisConnection {
542 pub host: String,
543 pub port: u16,
544 pub db: u32,
545 pub username: Option<String>,
546 pub password: Option<String>,
547 pub key_prefix: String,
548 pub sentinels: Vec<RedisSentinel>,
549 pub sentinel_password: Option<String>,
550 pub name: String,
551 pub cluster: RedisClusterConnection,
552 pub cluster_nodes: Vec<ClusterNode>,
554}
555
556#[derive(Debug, Clone, Serialize, Deserialize)]
557#[serde(default)]
558pub struct RedisSentinel {
559 pub host: String,
560 pub port: u16,
561}
562
563#[derive(Debug, Clone, Serialize, Deserialize, Default)]
564#[serde(default)]
565pub struct RedisClusterConnection {
566 pub nodes: Vec<ClusterNode>,
567 pub username: Option<String>,
568 pub password: Option<String>,
569 #[serde(alias = "useTLS")]
570 pub use_tls: bool,
571}
572
573impl RedisConnection {
574 pub fn is_sentinel_configured(&self) -> bool {
576 !self.sentinels.is_empty()
577 }
578
579 pub fn to_url(&self) -> String {
581 if self.is_sentinel_configured() {
582 self.build_sentinel_url()
583 } else {
584 self.build_standard_url()
585 }
586 }
587
588 fn build_standard_url(&self) -> String {
589 let (scheme, host) = if self.host.starts_with("rediss://") {
591 ("rediss://", self.host.trim_start_matches("rediss://"))
592 } else if self.host.starts_with("redis://") {
593 ("redis://", self.host.trim_start_matches("redis://"))
594 } else {
595 ("redis://", self.host.as_str())
596 };
597
598 let mut url = String::from(scheme);
599
600 if let Some(ref username) = self.username {
601 url.push_str(username);
602 if let Some(ref password) = self.password {
603 url.push(':');
604 url.push_str(&urlencoding::encode(password));
605 }
606 url.push('@');
607 } else if let Some(ref password) = self.password {
608 url.push(':');
609 url.push_str(&urlencoding::encode(password));
610 url.push('@');
611 }
612
613 url.push_str(host);
614 url.push(':');
615 url.push_str(&self.port.to_string());
616 url.push('/');
617 url.push_str(&self.db.to_string());
618
619 url
620 }
621
622 fn build_sentinel_url(&self) -> String {
623 let mut url = String::from("redis+sentinel://");
624
625 if let Some(ref sentinel_password) = self.sentinel_password {
626 url.push(':');
627 url.push_str(&urlencoding::encode(sentinel_password));
628 url.push('@');
629 }
630
631 let sentinel_hosts: Vec<String> = self
632 .sentinels
633 .iter()
634 .map(|s| format!("{}:{}", s.host, s.port))
635 .collect();
636 url.push_str(&sentinel_hosts.join(","));
637
638 url.push('/');
639 url.push_str(&self.name);
640 url.push('/');
641 url.push_str(&self.db.to_string());
642
643 let mut params = Vec::new();
644 if let Some(ref password) = self.password {
645 params.push(format!("password={}", urlencoding::encode(password)));
646 }
647 if let Some(ref username) = self.username {
648 params.push(format!("username={}", urlencoding::encode(username)));
649 }
650
651 if !params.is_empty() {
652 url.push('?');
653 url.push_str(¶ms.join("&"));
654 }
655
656 url
657 }
658
659 pub fn has_cluster_nodes(&self) -> bool {
662 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
663 }
664
665 pub fn cluster_node_urls(&self) -> Vec<String> {
668 if !self.cluster.nodes.is_empty() {
669 return self.build_cluster_urls(&self.cluster.nodes);
670 }
671 self.build_cluster_urls(&self.cluster_nodes)
672 }
673
674 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
677 self.build_cluster_urls(
678 &seeds
679 .iter()
680 .filter_map(|seed| ClusterNode::from_seed(seed))
681 .collect::<Vec<ClusterNode>>(),
682 )
683 }
684
685 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
686 let username = self
687 .cluster
688 .username
689 .as_deref()
690 .or(self.username.as_deref());
691 let password = self
692 .cluster
693 .password
694 .as_deref()
695 .or(self.password.as_deref());
696 let use_tls = self.cluster.use_tls;
697
698 nodes
699 .iter()
700 .map(|node| node.to_url_with_options(use_tls, username, password))
701 .collect()
702 }
703}
704
705impl RedisSentinel {
706 pub fn to_host_port(&self) -> String {
707 format!("{}:{}", self.host, self.port)
708 }
709}
710
711#[derive(Debug, Clone, Serialize, Deserialize)]
712#[serde(default)]
713pub struct ClusterNode {
714 pub host: String,
715 pub port: u16,
716}
717
718impl ClusterNode {
719 pub fn to_url(&self) -> String {
720 self.to_url_with_options(false, None, None)
721 }
722
723 pub fn to_url_with_options(
724 &self,
725 use_tls: bool,
726 username: Option<&str>,
727 password: Option<&str>,
728 ) -> String {
729 let host = self.host.trim();
730
731 if host.starts_with("redis://") || host.starts_with("rediss://") {
732 if let Ok(parsed) = Url::parse(host)
733 && let Some(host_str) = parsed.host_str()
734 {
735 let scheme = parsed.scheme();
736 let port = parsed.port_or_known_default().unwrap_or(self.port);
737 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
738 let parsed_password = parsed.password();
739 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
740 let (effective_username, effective_password) = if has_embedded_auth {
741 (parsed_username, parsed_password)
742 } else {
743 (username, password)
744 };
745
746 return build_redis_url(
747 scheme,
748 host_str,
749 port,
750 effective_username,
751 effective_password,
752 );
753 }
754
755 let has_port = if let Some(bracket_pos) = host.rfind(']') {
757 host[bracket_pos..].contains(':')
758 } else {
759 host.split(':').count() >= 3
760 };
761 let base = if has_port {
762 host.to_string()
763 } else {
764 format!("{}:{}", host, self.port)
765 };
766
767 if let Ok(parsed) = Url::parse(&base) {
768 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
769 let parsed_password = parsed.password();
770 if let Some(host_str) = parsed.host_str() {
771 let port = parsed.port_or_known_default().unwrap_or(self.port);
772 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
773 let (effective_username, effective_password) = if has_embedded_auth {
774 (parsed_username, parsed_password)
775 } else {
776 (username, password)
777 };
778 return build_redis_url(
779 parsed.scheme(),
780 host_str,
781 port,
782 effective_username,
783 effective_password,
784 );
785 }
786 }
787 return base;
788 }
789
790 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
791 let scheme = if use_tls { "rediss" } else { "redis" };
792 build_redis_url(
793 scheme,
794 &normalized_host,
795 normalized_port,
796 username,
797 password,
798 )
799 }
800
801 pub fn from_seed(seed: &str) -> Option<Self> {
802 let trimmed = seed.trim();
803 if trimmed.is_empty() {
804 return None;
805 }
806
807 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
808 let port = Url::parse(trimmed)
809 .ok()
810 .and_then(|parsed| parsed.port_or_known_default())
811 .unwrap_or(6379);
812 return Some(Self {
813 host: trimmed.to_string(),
814 port,
815 });
816 }
817
818 let (host, port) = split_plain_host_and_port(trimmed, 6379);
819 Some(Self { host, port })
820 }
821}
822
823fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
824 let host = raw_host.trim();
825
826 if host.starts_with('[') {
828 if let Some(end_bracket) = host.find(']') {
829 let host_part = host[1..end_bracket].to_string();
830 let remainder = &host[end_bracket + 1..];
831 if let Some(port_str) = remainder.strip_prefix(':')
832 && let Ok(port) = port_str.parse::<u16>()
833 {
834 return (host_part, port);
835 }
836 return (host_part, default_port);
837 }
838 return (host.to_string(), default_port);
839 }
840
841 if host.matches(':').count() == 1
843 && let Some((host_part, port_part)) = host.rsplit_once(':')
844 && let Ok(port) = port_part.parse::<u16>()
845 {
846 return (host_part.to_string(), port);
847 }
848
849 (host.to_string(), default_port)
850}
851
852fn build_redis_url(
853 scheme: &str,
854 host: &str,
855 port: u16,
856 username: Option<&str>,
857 password: Option<&str>,
858) -> String {
859 let mut url = format!("{scheme}://");
860
861 if let Some(user) = username {
862 url.push_str(&urlencoding::encode(user));
863 if let Some(pass) = password {
864 url.push(':');
865 url.push_str(&urlencoding::encode(pass));
866 }
867 url.push('@');
868 } else if let Some(pass) = password {
869 url.push(':');
870 url.push_str(&urlencoding::encode(pass));
871 url.push('@');
872 }
873
874 if host.contains(':') && !host.starts_with('[') {
875 url.push('[');
876 url.push_str(host);
877 url.push(']');
878 } else {
879 url.push_str(host);
880 }
881 url.push(':');
882 url.push_str(&port.to_string());
883 url
884}
885
886#[derive(Debug, Clone, Serialize, Deserialize)]
887#[serde(default)]
888pub struct DatabasePooling {
889 pub enabled: bool,
890 pub min: u32,
891 pub max: u32,
892}
893
894#[derive(Debug, Clone, Serialize, Deserialize)]
895#[serde(default)]
896pub struct EventLimits {
897 pub max_channels_at_once: u32,
898 pub max_name_length: u32,
899 pub max_payload_in_kb: u32,
900 pub max_batch_size: u32,
901}
902
903#[derive(Debug, Clone, Serialize, Deserialize)]
904#[serde(default)]
905pub struct HttpApiConfig {
906 pub request_limit_in_mb: u32,
907 pub accept_traffic: AcceptTraffic,
908 pub usage_enabled: bool,
909}
910
911#[derive(Debug, Clone, Serialize, Deserialize)]
912#[serde(default)]
913pub struct AcceptTraffic {
914 pub memory_threshold: f64,
915}
916
917#[derive(Debug, Clone, Serialize, Deserialize)]
918#[serde(default)]
919pub struct InstanceConfig {
920 pub process_id: String,
921}
922
923#[derive(Debug, Clone, Serialize, Deserialize)]
924#[serde(default)]
925pub struct MetricsConfig {
926 pub enabled: bool,
927 pub driver: MetricsDriver,
928 pub host: String,
929 pub prometheus: PrometheusConfig,
930 pub port: u16,
931}
932
933#[derive(Debug, Clone, Serialize, Deserialize)]
934#[serde(default)]
935pub struct PrometheusConfig {
936 pub prefix: String,
937}
938
939#[derive(Debug, Clone, Serialize, Deserialize)]
940#[serde(default)]
941pub struct LoggingConfig {
942 pub colors_enabled: bool,
943 pub include_target: bool,
944}
945
946#[derive(Debug, Clone, Serialize, Deserialize)]
947#[serde(default)]
948pub struct PresenceConfig {
949 pub max_members_per_channel: u32,
950 pub max_member_size_in_kb: u32,
951}
952
953#[derive(Debug, Clone, Serialize, Deserialize)]
956#[serde(default)]
957pub struct WebSocketConfig {
958 pub max_messages: Option<usize>,
959 pub max_bytes: Option<usize>,
960 pub disconnect_on_buffer_full: bool,
961 pub max_message_size: usize,
962 pub max_frame_size: usize,
963 pub write_buffer_size: usize,
964 pub max_backpressure: usize,
965 pub auto_ping: bool,
966 pub ping_interval: u32,
967 pub idle_timeout: u32,
968 pub compression: String,
969}
970
971impl Default for WebSocketConfig {
972 fn default() -> Self {
973 Self {
974 max_messages: Some(1000),
975 max_bytes: None,
976 disconnect_on_buffer_full: true,
977 max_message_size: 64 * 1024 * 1024,
978 max_frame_size: 16 * 1024 * 1024,
979 write_buffer_size: 16 * 1024,
980 max_backpressure: 1024 * 1024,
981 auto_ping: true,
982 ping_interval: 30,
983 idle_timeout: 120,
984 compression: "disabled".to_string(),
985 }
986 }
987}
988
989impl WebSocketConfig {
990 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
992 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
993
994 let limit = match (self.max_messages, self.max_bytes) {
995 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
996 (Some(messages), None) => BufferLimit::Messages(messages),
997 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
998 (None, None) => BufferLimit::Messages(1000),
999 };
1000
1001 WebSocketBufferConfig {
1002 limit,
1003 disconnect_on_full: self.disconnect_on_buffer_full,
1004 }
1005 }
1006
1007 pub fn to_sockudo_ws_config(
1009 &self,
1010 websocket_max_payload_kb: u32,
1011 activity_timeout: u64,
1012 ) -> sockudo_ws::Config {
1013 use sockudo_ws::Compression;
1014
1015 let compression = match self.compression.to_lowercase().as_str() {
1016 "dedicated" => Compression::Dedicated,
1017 "shared" => Compression::Shared,
1018 "window256b" => Compression::Window256B,
1019 "window1kb" => Compression::Window1KB,
1020 "window2kb" => Compression::Window2KB,
1021 "window4kb" => Compression::Window4KB,
1022 "window8kb" => Compression::Window8KB,
1023 "window16kb" => Compression::Window16KB,
1024 "window32kb" => Compression::Window32KB,
1025 _ => Compression::Disabled,
1026 };
1027
1028 sockudo_ws::Config::builder()
1029 .max_payload_length(
1030 self.max_bytes
1031 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1032 )
1033 .max_message_size(self.max_message_size)
1034 .max_frame_size(self.max_frame_size)
1035 .write_buffer_size(self.write_buffer_size)
1036 .max_backpressure(self.max_backpressure)
1037 .idle_timeout(self.idle_timeout)
1038 .auto_ping(self.auto_ping)
1039 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1040 .compression(compression)
1041 .build()
1042 }
1043}
1044
1045#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1046#[serde(default)]
1047pub struct QueueConfig {
1048 pub driver: QueueDriver,
1049 pub redis: RedisQueueConfig,
1050 pub redis_cluster: RedisClusterQueueConfig,
1051 pub sqs: SqsQueueConfig,
1052 pub sns: SnsQueueConfig,
1053}
1054
1055#[derive(Debug, Clone, Serialize, Deserialize)]
1056#[serde(default)]
1057pub struct RedisQueueConfig {
1058 pub concurrency: u32,
1059 pub prefix: Option<String>,
1060 pub url_override: Option<String>,
1061 pub cluster_mode: bool,
1062}
1063
1064#[derive(Clone, Debug, Serialize, Deserialize)]
1065#[serde(default)]
1066pub struct RateLimit {
1067 pub max_requests: u32,
1068 pub window_seconds: u64,
1069 pub identifier: Option<String>,
1070 pub trust_hops: Option<u32>,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct RateLimiterConfig {
1076 pub enabled: bool,
1077 pub driver: CacheDriver,
1078 pub api_rate_limit: RateLimit,
1079 pub websocket_rate_limit: RateLimit,
1080 pub redis: RedisConfig,
1081}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084#[serde(default)]
1085pub struct SslConfig {
1086 pub enabled: bool,
1087 pub cert_path: String,
1088 pub key_path: String,
1089 pub passphrase: Option<String>,
1090 pub ca_path: Option<String>,
1091 pub redirect_http: bool,
1092 pub http_port: Option<u16>,
1093}
1094
1095#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1096#[serde(default)]
1097pub struct WebhooksConfig {
1098 pub batching: BatchingConfig,
1099}
1100
1101#[derive(Debug, Clone, Serialize, Deserialize)]
1102#[serde(default)]
1103pub struct BatchingConfig {
1104 pub enabled: bool,
1105 pub duration: u64,
1106 pub size: usize,
1107}
1108
1109#[derive(Debug, Clone, Serialize, Deserialize)]
1110#[serde(default)]
1111pub struct ClusterHealthConfig {
1112 pub enabled: bool,
1113 pub heartbeat_interval_ms: u64,
1114 pub node_timeout_ms: u64,
1115 pub cleanup_interval_ms: u64,
1116}
1117
1118#[derive(Debug, Clone, Serialize, Deserialize)]
1119#[serde(default)]
1120pub struct UnixSocketConfig {
1121 pub enabled: bool,
1122 pub path: String,
1123 #[serde(deserialize_with = "deserialize_octal_permission")]
1124 pub permission_mode: u32,
1125}
1126
1127#[derive(Debug, Clone, Serialize, Deserialize)]
1128#[serde(default)]
1129pub struct DeltaCompressionOptionsConfig {
1130 pub enabled: bool,
1131 pub algorithm: String,
1132 pub full_message_interval: u32,
1133 pub min_message_size: usize,
1134 pub max_state_age_secs: u64,
1135 pub max_channel_states_per_socket: usize,
1136 pub max_conflation_states_per_channel: Option<usize>,
1137 pub conflation_key_path: Option<String>,
1138 pub cluster_coordination: bool,
1139 pub omit_delta_algorithm: bool,
1140}
1141
1142#[derive(Debug, Clone, Serialize, Deserialize)]
1144#[serde(default)]
1145pub struct CleanupConfig {
1146 pub queue_buffer_size: usize,
1147 pub batch_size: usize,
1148 pub batch_timeout_ms: u64,
1149 pub worker_threads: WorkerThreadsConfig,
1150 pub max_retry_attempts: u32,
1151 pub async_enabled: bool,
1152 pub fallback_to_sync: bool,
1153}
1154
1155#[derive(Debug, Clone)]
1157pub enum WorkerThreadsConfig {
1158 Auto,
1159 Fixed(usize),
1160}
1161
1162impl serde::Serialize for WorkerThreadsConfig {
1163 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1164 where
1165 S: serde::Serializer,
1166 {
1167 match self {
1168 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1169 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1170 }
1171 }
1172}
1173
1174impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1175 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1176 where
1177 D: serde::Deserializer<'de>,
1178 {
1179 use serde::de;
1180 struct WorkerThreadsVisitor;
1181 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1182 type Value = WorkerThreadsConfig;
1183
1184 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1185 formatter.write_str(r#""auto" or a positive integer"#)
1186 }
1187
1188 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1189 if value.eq_ignore_ascii_case("auto") {
1190 Ok(WorkerThreadsConfig::Auto)
1191 } else if let Ok(n) = value.parse::<usize>() {
1192 Ok(WorkerThreadsConfig::Fixed(n))
1193 } else {
1194 Err(E::custom(format!(
1195 "expected 'auto' or a number, got '{value}'"
1196 )))
1197 }
1198 }
1199
1200 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1201 Ok(WorkerThreadsConfig::Fixed(value as usize))
1202 }
1203
1204 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1205 if value >= 0 {
1206 Ok(WorkerThreadsConfig::Fixed(value as usize))
1207 } else {
1208 Err(E::custom("worker_threads must be non-negative"))
1209 }
1210 }
1211 }
1212 deserializer.deserialize_any(WorkerThreadsVisitor)
1213 }
1214}
1215
1216impl Default for CleanupConfig {
1217 fn default() -> Self {
1218 Self {
1219 queue_buffer_size: 1024,
1220 batch_size: 64,
1221 batch_timeout_ms: 100,
1222 worker_threads: WorkerThreadsConfig::Auto,
1223 max_retry_attempts: 3,
1224 async_enabled: true,
1225 fallback_to_sync: true,
1226 }
1227 }
1228}
1229
1230impl CleanupConfig {
1231 pub fn validate(&self) -> Result<(), String> {
1232 if self.queue_buffer_size == 0 {
1233 return Err("queue_buffer_size must be greater than 0".to_string());
1234 }
1235 if self.batch_size == 0 {
1236 return Err("batch_size must be greater than 0".to_string());
1237 }
1238 if self.batch_timeout_ms == 0 {
1239 return Err("batch_timeout_ms must be greater than 0".to_string());
1240 }
1241 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1242 && n == 0
1243 {
1244 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1245 }
1246 Ok(())
1247 }
1248}
1249
1250#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1251#[serde(default)]
1252pub struct TagFilteringConfig {
1253 #[serde(default)]
1254 pub enabled: bool,
1255 #[serde(default = "default_true")]
1256 pub enable_tags: bool,
1257}
1258
1259fn default_true() -> bool {
1260 true
1261}
1262
1263impl Default for ServerOptions {
1266 fn default() -> Self {
1267 Self {
1268 adapter: AdapterConfig::default(),
1269 app_manager: AppManagerConfig::default(),
1270 cache: CacheConfig::default(),
1271 channel_limits: ChannelLimits::default(),
1272 cors: CorsConfig::default(),
1273 database: DatabaseConfig::default(),
1274 database_pooling: DatabasePooling::default(),
1275 debug: false,
1276 tag_filtering: TagFilteringConfig::default(),
1277 event_limits: EventLimits::default(),
1278 host: "0.0.0.0".to_string(),
1279 http_api: HttpApiConfig::default(),
1280 instance: InstanceConfig::default(),
1281 logging: None,
1282 metrics: MetricsConfig::default(),
1283 mode: "production".to_string(),
1284 port: 6001,
1285 path_prefix: "/".to_string(),
1286 presence: PresenceConfig::default(),
1287 queue: QueueConfig::default(),
1288 rate_limiter: RateLimiterConfig::default(),
1289 shutdown_grace_period: 10,
1290 ssl: SslConfig::default(),
1291 user_authentication_timeout: 3600,
1292 webhooks: WebhooksConfig::default(),
1293 websocket_max_payload_kb: 64,
1294 cleanup: CleanupConfig::default(),
1295 activity_timeout: 120,
1296 cluster_health: ClusterHealthConfig::default(),
1297 unix_socket: UnixSocketConfig::default(),
1298 delta_compression: DeltaCompressionOptionsConfig::default(),
1299 websocket: WebSocketConfig::default(),
1300 }
1301 }
1302}
1303
1304impl Default for SqsQueueConfig {
1305 fn default() -> Self {
1306 Self {
1307 region: "us-east-1".to_string(),
1308 queue_url_prefix: None,
1309 visibility_timeout: 30,
1310 endpoint_url: None,
1311 max_messages: 10,
1312 wait_time_seconds: 5,
1313 concurrency: 5,
1314 fifo: false,
1315 message_group_id: Some("default".to_string()),
1316 }
1317 }
1318}
1319
1320impl Default for SnsQueueConfig {
1321 fn default() -> Self {
1322 Self {
1323 region: "us-east-1".to_string(),
1324 topic_arn: String::new(),
1325 endpoint_url: None,
1326 }
1327 }
1328}
1329
1330impl Default for RedisAdapterConfig {
1331 fn default() -> Self {
1332 Self {
1333 requests_timeout: 5000,
1334 prefix: "sockudo_adapter:".to_string(),
1335 redis_pub_options: AHashMap::new(),
1336 redis_sub_options: AHashMap::new(),
1337 cluster_mode: false,
1338 }
1339 }
1340}
1341
1342impl Default for RedisClusterAdapterConfig {
1343 fn default() -> Self {
1344 Self {
1345 nodes: vec![],
1346 prefix: "sockudo_adapter:".to_string(),
1347 request_timeout_ms: 1000,
1348 use_connection_manager: true,
1349 use_sharded_pubsub: false,
1350 }
1351 }
1352}
1353
1354impl Default for NatsAdapterConfig {
1355 fn default() -> Self {
1356 Self {
1357 servers: vec!["nats://localhost:4222".to_string()],
1358 prefix: "sockudo_adapter:".to_string(),
1359 request_timeout_ms: 5000,
1360 username: None,
1361 password: None,
1362 token: None,
1363 connection_timeout_ms: 5000,
1364 nodes_number: None,
1365 }
1366 }
1367}
1368
1369impl Default for CacheSettings {
1370 fn default() -> Self {
1371 Self {
1372 enabled: true,
1373 ttl: 300,
1374 }
1375 }
1376}
1377
1378impl Default for MemoryCacheOptions {
1379 fn default() -> Self {
1380 Self {
1381 ttl: 300,
1382 cleanup_interval: 60,
1383 max_capacity: 10000,
1384 }
1385 }
1386}
1387
1388impl Default for CacheConfig {
1389 fn default() -> Self {
1390 Self {
1391 driver: CacheDriver::default(),
1392 redis: RedisConfig {
1393 prefix: Some("sockudo_cache:".to_string()),
1394 url_override: None,
1395 cluster_mode: false,
1396 },
1397 memory: MemoryCacheOptions::default(),
1398 }
1399 }
1400}
1401
1402impl Default for ChannelLimits {
1403 fn default() -> Self {
1404 Self {
1405 max_name_length: 200,
1406 cache_ttl: 3600,
1407 }
1408 }
1409}
1410impl Default for CorsConfig {
1411 fn default() -> Self {
1412 Self {
1413 credentials: true,
1414 origin: vec!["*".to_string()],
1415 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1416 allowed_headers: vec![
1417 "Authorization".to_string(),
1418 "Content-Type".to_string(),
1419 "X-Requested-With".to_string(),
1420 "Accept".to_string(),
1421 ],
1422 }
1423 }
1424}
1425
1426impl Default for DatabaseConnection {
1427 fn default() -> Self {
1428 Self {
1429 host: "localhost".to_string(),
1430 port: 3306,
1431 username: "root".to_string(),
1432 password: "".to_string(),
1433 database: "sockudo".to_string(),
1434 table_name: "applications".to_string(),
1435 connection_pool_size: 10,
1436 pool_min: None,
1437 pool_max: None,
1438 cache_ttl: 300,
1439 cache_cleanup_interval: 60,
1440 cache_max_capacity: 100,
1441 }
1442 }
1443}
1444
1445impl Default for RedisConnection {
1446 fn default() -> Self {
1447 Self {
1448 host: "127.0.0.1".to_string(),
1449 port: 6379,
1450 db: 0,
1451 username: None,
1452 password: None,
1453 key_prefix: "sockudo:".to_string(),
1454 sentinels: Vec::new(),
1455 sentinel_password: None,
1456 name: "mymaster".to_string(),
1457 cluster: RedisClusterConnection::default(),
1458 cluster_nodes: Vec::new(),
1459 }
1460 }
1461}
1462
1463impl Default for RedisSentinel {
1464 fn default() -> Self {
1465 Self {
1466 host: "localhost".to_string(),
1467 port: 26379,
1468 }
1469 }
1470}
1471
1472impl Default for ClusterNode {
1473 fn default() -> Self {
1474 Self {
1475 host: "127.0.0.1".to_string(),
1476 port: 7000,
1477 }
1478 }
1479}
1480
1481impl Default for DatabasePooling {
1482 fn default() -> Self {
1483 Self {
1484 enabled: true,
1485 min: 2,
1486 max: 10,
1487 }
1488 }
1489}
1490
1491impl Default for EventLimits {
1492 fn default() -> Self {
1493 Self {
1494 max_channels_at_once: 100,
1495 max_name_length: 200,
1496 max_payload_in_kb: 100,
1497 max_batch_size: 10,
1498 }
1499 }
1500}
1501
1502impl Default for HttpApiConfig {
1503 fn default() -> Self {
1504 Self {
1505 request_limit_in_mb: 10,
1506 accept_traffic: AcceptTraffic::default(),
1507 usage_enabled: true,
1508 }
1509 }
1510}
1511
1512impl Default for AcceptTraffic {
1513 fn default() -> Self {
1514 Self {
1515 memory_threshold: 0.90,
1516 }
1517 }
1518}
1519
1520impl Default for InstanceConfig {
1521 fn default() -> Self {
1522 Self {
1523 process_id: uuid::Uuid::new_v4().to_string(),
1524 }
1525 }
1526}
1527
1528impl Default for LoggingConfig {
1529 fn default() -> Self {
1530 Self {
1531 colors_enabled: true,
1532 include_target: true,
1533 }
1534 }
1535}
1536
1537impl Default for MetricsConfig {
1538 fn default() -> Self {
1539 Self {
1540 enabled: true,
1541 driver: MetricsDriver::default(),
1542 host: "0.0.0.0".to_string(),
1543 prometheus: PrometheusConfig::default(),
1544 port: 9601,
1545 }
1546 }
1547}
1548
1549impl Default for PrometheusConfig {
1550 fn default() -> Self {
1551 Self {
1552 prefix: "sockudo_".to_string(),
1553 }
1554 }
1555}
1556
1557impl Default for PresenceConfig {
1558 fn default() -> Self {
1559 Self {
1560 max_members_per_channel: 100,
1561 max_member_size_in_kb: 2,
1562 }
1563 }
1564}
1565
1566impl Default for RedisQueueConfig {
1567 fn default() -> Self {
1568 Self {
1569 concurrency: 5,
1570 prefix: Some("sockudo_queue:".to_string()),
1571 url_override: None,
1572 cluster_mode: false,
1573 }
1574 }
1575}
1576
1577impl Default for RateLimit {
1578 fn default() -> Self {
1579 Self {
1580 max_requests: 60,
1581 window_seconds: 60,
1582 identifier: Some("default".to_string()),
1583 trust_hops: Some(0),
1584 }
1585 }
1586}
1587
1588impl Default for RateLimiterConfig {
1589 fn default() -> Self {
1590 Self {
1591 enabled: true,
1592 driver: CacheDriver::Memory,
1593 api_rate_limit: RateLimit {
1594 max_requests: 100,
1595 window_seconds: 60,
1596 identifier: Some("api".to_string()),
1597 trust_hops: Some(0),
1598 },
1599 websocket_rate_limit: RateLimit {
1600 max_requests: 20,
1601 window_seconds: 60,
1602 identifier: Some("websocket_connect".to_string()),
1603 trust_hops: Some(0),
1604 },
1605 redis: RedisConfig {
1606 prefix: Some("sockudo_rl:".to_string()),
1607 url_override: None,
1608 cluster_mode: false,
1609 },
1610 }
1611 }
1612}
1613
1614impl Default for SslConfig {
1615 fn default() -> Self {
1616 Self {
1617 enabled: false,
1618 cert_path: "".to_string(),
1619 key_path: "".to_string(),
1620 passphrase: None,
1621 ca_path: None,
1622 redirect_http: false,
1623 http_port: Some(80),
1624 }
1625 }
1626}
1627
1628impl Default for BatchingConfig {
1629 fn default() -> Self {
1630 Self {
1631 enabled: true,
1632 duration: 50,
1633 size: 100,
1634 }
1635 }
1636}
1637
1638impl Default for ClusterHealthConfig {
1639 fn default() -> Self {
1640 Self {
1641 enabled: true,
1642 heartbeat_interval_ms: 10000,
1643 node_timeout_ms: 30000,
1644 cleanup_interval_ms: 10000,
1645 }
1646 }
1647}
1648
1649impl Default for UnixSocketConfig {
1650 fn default() -> Self {
1651 Self {
1652 enabled: false,
1653 path: "/var/run/sockudo/sockudo.sock".to_string(),
1654 permission_mode: 0o660,
1655 }
1656 }
1657}
1658
1659impl Default for DeltaCompressionOptionsConfig {
1660 fn default() -> Self {
1661 Self {
1662 enabled: true,
1663 algorithm: "fossil".to_string(),
1664 full_message_interval: 10,
1665 min_message_size: 100,
1666 max_state_age_secs: 300,
1667 max_channel_states_per_socket: 100,
1668 max_conflation_states_per_channel: Some(100),
1669 conflation_key_path: None,
1670 cluster_coordination: false,
1671 omit_delta_algorithm: false,
1672 }
1673 }
1674}
1675
1676impl ClusterHealthConfig {
1677 pub fn validate(&self) -> Result<(), String> {
1678 if self.heartbeat_interval_ms == 0 {
1679 return Err("heartbeat_interval_ms must be greater than 0".to_string());
1680 }
1681 if self.node_timeout_ms == 0 {
1682 return Err("node_timeout_ms must be greater than 0".to_string());
1683 }
1684 if self.cleanup_interval_ms == 0 {
1685 return Err("cleanup_interval_ms must be greater than 0".to_string());
1686 }
1687
1688 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1689 return Err(format!(
1690 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1691 self.heartbeat_interval_ms,
1692 self.node_timeout_ms,
1693 self.node_timeout_ms / 3
1694 ));
1695 }
1696
1697 if self.cleanup_interval_ms > self.node_timeout_ms {
1698 return Err(format!(
1699 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1700 self.cleanup_interval_ms, self.node_timeout_ms
1701 ));
1702 }
1703
1704 Ok(())
1705 }
1706}
1707
1708impl ServerOptions {
1709 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1710 let content = tokio::fs::read_to_string(path).await?;
1711 let options: Self = sonic_rs::from_str(&content)?;
1712 Ok(options)
1713 }
1714
1715 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1716 if let Ok(mode) = std::env::var("ENVIRONMENT") {
1718 self.mode = mode;
1719 }
1720 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1721 if parse_bool_env("DEBUG", false) {
1722 self.debug = true;
1723 info!("DEBUG environment variable forces debug mode ON");
1724 }
1725
1726 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1727
1728 if let Ok(host) = std::env::var("HOST") {
1729 self.host = host;
1730 }
1731 self.port = parse_env::<u16>("PORT", self.port);
1732 self.shutdown_grace_period =
1733 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1734 self.user_authentication_timeout = parse_env::<u64>(
1735 "USER_AUTHENTICATION_TIMEOUT",
1736 self.user_authentication_timeout,
1737 );
1738 self.websocket_max_payload_kb =
1739 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1740 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1741 self.instance.process_id = id;
1742 }
1743
1744 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1746 self.adapter.driver =
1747 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1748 }
1749 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1750 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1751 self.adapter.buffer_multiplier_per_cpu,
1752 );
1753 self.adapter.enable_socket_counting = parse_env::<bool>(
1754 "ADAPTER_ENABLE_SOCKET_COUNTING",
1755 self.adapter.enable_socket_counting,
1756 );
1757 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1758 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1759 }
1760 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1761 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1762 }
1763 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1764 self.app_manager.driver =
1765 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1766 }
1767 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1768 self.rate_limiter.driver = parse_driver_enum(
1769 driver_str,
1770 self.rate_limiter.driver.clone(),
1771 "RateLimiter Backend",
1772 );
1773 }
1774
1775 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1777 self.database.redis.host = host;
1778 }
1779 self.database.redis.port =
1780 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1781 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1782 self.database.redis.username = if username.is_empty() {
1783 None
1784 } else {
1785 Some(username)
1786 };
1787 }
1788 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1789 self.database.redis.password = Some(password);
1790 }
1791 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1792 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1793 self.database.redis.key_prefix = prefix;
1794 }
1795 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1796 self.database.redis.cluster.username = if cluster_username.is_empty() {
1797 None
1798 } else {
1799 Some(cluster_username)
1800 };
1801 }
1802 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1803 self.database.redis.cluster.password = Some(cluster_password);
1804 }
1805 self.database.redis.cluster.use_tls = parse_bool_env(
1806 "DATABASE_REDIS_CLUSTER_USE_TLS",
1807 self.database.redis.cluster.use_tls,
1808 );
1809
1810 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1812 self.database.mysql.host = host;
1813 }
1814 self.database.mysql.port =
1815 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1816 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1817 self.database.mysql.username = user;
1818 }
1819 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1820 self.database.mysql.password = pass;
1821 }
1822 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1823 self.database.mysql.database = db;
1824 }
1825 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1826 self.database.mysql.table_name = table;
1827 }
1828 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1829
1830 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1832 self.database.postgres.host = host;
1833 }
1834 self.database.postgres.port =
1835 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1836 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1837 self.database.postgres.username = user;
1838 }
1839 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1840 self.database.postgres.password = pass;
1841 }
1842 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1843 self.database.postgres.database = db;
1844 }
1845 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1846
1847 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1849 self.database.dynamodb.region = region;
1850 }
1851 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1852 self.database.dynamodb.table_name = table;
1853 }
1854 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1855 self.database.dynamodb.endpoint_url = Some(endpoint);
1856 }
1857 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1858 self.database.dynamodb.aws_access_key_id = Some(key_id);
1859 }
1860 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1861 self.database.dynamodb.aws_secret_access_key = Some(secret);
1862 }
1863
1864 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1866 let node_list: Vec<String> = nodes
1867 .split(',')
1868 .map(|s| s.trim())
1869 .filter(|s| !s.is_empty())
1870 .map(ToString::to_string)
1871 .collect();
1872
1873 options.adapter.cluster.nodes = node_list.clone();
1874 options.queue.redis_cluster.nodes = node_list.clone();
1875
1876 let parsed_nodes: Vec<ClusterNode> = node_list
1877 .iter()
1878 .filter_map(|seed| ClusterNode::from_seed(seed))
1879 .collect();
1880 options.database.redis.cluster.nodes = parsed_nodes.clone();
1881 options.database.redis.cluster_nodes = parsed_nodes;
1882 };
1883
1884 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1885 apply_redis_cluster_nodes(self, &nodes);
1886 }
1887 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1888 apply_redis_cluster_nodes(self, &nodes);
1889 }
1890 self.queue.redis_cluster.concurrency = parse_env::<u32>(
1891 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1892 self.queue.redis_cluster.concurrency,
1893 );
1894 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1895 self.queue.redis_cluster.prefix = Some(prefix);
1896 }
1897
1898 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1900 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1901 self.ssl.cert_path = val;
1902 }
1903 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1904 self.ssl.key_path = val;
1905 }
1906 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1907 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1908 self.ssl.http_port = Some(port);
1909 }
1910
1911 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1913 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1914 self.unix_socket.path = path;
1915 }
1916 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1917 if mode_str.chars().all(|c| c.is_digit(8)) {
1918 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1919 if mode <= 0o777 {
1920 self.unix_socket.permission_mode = mode;
1921 } else {
1922 warn!(
1923 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1924 mode_str, self.unix_socket.permission_mode
1925 );
1926 }
1927 } else {
1928 warn!(
1929 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1930 mode_str, self.unix_socket.permission_mode
1931 );
1932 }
1933 } else {
1934 warn!(
1935 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1936 mode_str, self.unix_socket.permission_mode
1937 );
1938 }
1939 }
1940
1941 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1943 self.metrics.driver =
1944 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1945 }
1946 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1947 if let Ok(val) = std::env::var("METRICS_HOST") {
1948 self.metrics.host = val;
1949 }
1950 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1951 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1952 self.metrics.prometheus.prefix = val;
1953 }
1954
1955 self.http_api.usage_enabled =
1957 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
1958
1959 self.rate_limiter.enabled =
1961 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1962 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1963 "RATE_LIMITER_API_MAX_REQUESTS",
1964 self.rate_limiter.api_rate_limit.max_requests,
1965 );
1966 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1967 "RATE_LIMITER_API_WINDOW_SECONDS",
1968 self.rate_limiter.api_rate_limit.window_seconds,
1969 );
1970 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1971 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1972 }
1973 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1974 "RATE_LIMITER_WS_MAX_REQUESTS",
1975 self.rate_limiter.websocket_rate_limit.max_requests,
1976 );
1977 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1978 "RATE_LIMITER_WS_WINDOW_SECONDS",
1979 self.rate_limiter.websocket_rate_limit.window_seconds,
1980 );
1981 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1982 self.rate_limiter.redis.prefix = Some(prefix);
1983 }
1984
1985 self.queue.redis.concurrency =
1987 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1988 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1989 self.queue.redis.prefix = Some(prefix);
1990 }
1991
1992 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1994 self.queue.sqs.region = region;
1995 }
1996 self.queue.sqs.visibility_timeout = parse_env::<i32>(
1997 "QUEUE_SQS_VISIBILITY_TIMEOUT",
1998 self.queue.sqs.visibility_timeout,
1999 );
2000 self.queue.sqs.max_messages =
2001 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2002 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2003 "QUEUE_SQS_WAIT_TIME_SECONDS",
2004 self.queue.sqs.wait_time_seconds,
2005 );
2006 self.queue.sqs.concurrency =
2007 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2008 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2009 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2010 self.queue.sqs.endpoint_url = Some(endpoint);
2011 }
2012
2013 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2015 self.queue.sns.region = region;
2016 }
2017 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2018 self.queue.sns.topic_arn = topic_arn;
2019 }
2020 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2021 self.queue.sns.endpoint_url = Some(endpoint);
2022 }
2023
2024 self.webhooks.batching.enabled =
2026 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2027 self.webhooks.batching.duration =
2028 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2029 self.webhooks.batching.size =
2030 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2031
2032 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2034 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2035 }
2036 if let Ok(user) = std::env::var("NATS_USERNAME") {
2037 self.adapter.nats.username = Some(user);
2038 }
2039 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2040 self.adapter.nats.password = Some(pass);
2041 }
2042 if let Ok(token) = std::env::var("NATS_TOKEN") {
2043 self.adapter.nats.token = Some(token);
2044 }
2045 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2046 "NATS_CONNECTION_TIMEOUT_MS",
2047 self.adapter.nats.connection_timeout_ms,
2048 );
2049 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2050 "NATS_REQUEST_TIMEOUT_MS",
2051 self.adapter.nats.request_timeout_ms,
2052 );
2053
2054 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2056 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2057 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2058 warn!(
2059 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2060 e
2061 );
2062 } else {
2063 self.cors.origin = parsed;
2064 }
2065 }
2066 if let Ok(methods) = std::env::var("CORS_METHODS") {
2067 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2068 }
2069 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2070 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2071 }
2072 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2073
2074 self.database_pooling.enabled =
2076 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2077 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2078 self.database_pooling.min = min;
2079 }
2080 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2081 self.database_pooling.max = max;
2082 }
2083
2084 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2085 self.database.mysql.connection_pool_size = pool_size;
2086 self.database.postgres.connection_pool_size = pool_size;
2087 }
2088 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2089 self.app_manager.cache.ttl = cache_ttl;
2090 self.channel_limits.cache_ttl = cache_ttl;
2091 self.database.mysql.cache_ttl = cache_ttl;
2092 self.database.postgres.cache_ttl = cache_ttl;
2093 self.cache.memory.ttl = cache_ttl;
2094 }
2095 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2096 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2097 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2098 self.cache.memory.cleanup_interval = cleanup_interval;
2099 }
2100 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2101 self.database.mysql.cache_max_capacity = max_capacity;
2102 self.database.postgres.cache_max_capacity = max_capacity;
2103 self.cache.memory.max_capacity = max_capacity;
2104 }
2105
2106 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2107 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2108 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2109 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2110
2111 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2112 (default_app_id, default_app_key, default_app_secret)
2113 && default_app_enabled
2114 {
2115 let default_app = App {
2116 id: app_id,
2117 key: app_key,
2118 secret: app_secret,
2119 enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2120 enabled: default_app_enabled,
2121 max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2122 max_client_events_per_second: parse_env::<u32>(
2123 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2124 100,
2125 ),
2126 max_read_requests_per_second: Some(parse_env::<u32>(
2127 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2128 100,
2129 )),
2130 max_presence_members_per_channel: Some(parse_env::<u32>(
2131 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2132 100,
2133 )),
2134 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2135 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2136 100,
2137 )),
2138 max_channel_name_length: Some(parse_env::<u32>(
2139 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2140 100,
2141 )),
2142 max_event_channels_at_once: Some(parse_env::<u32>(
2143 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2144 100,
2145 )),
2146 max_event_name_length: Some(parse_env::<u32>(
2147 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2148 100,
2149 )),
2150 max_event_payload_in_kb: Some(parse_env::<u32>(
2151 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2152 100,
2153 )),
2154 max_event_batch_size: Some(parse_env::<u32>(
2155 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2156 100,
2157 )),
2158 enable_user_authentication: Some(parse_bool_env(
2159 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2160 false,
2161 )),
2162 webhooks: None,
2163 max_backend_events_per_second: Some(parse_env::<u32>(
2164 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2165 100,
2166 )),
2167 enable_watchlist_events: Some(parse_bool_env(
2168 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2169 false,
2170 )),
2171 allowed_origins: {
2172 if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2173 if !origins_str.is_empty() {
2174 Some(
2175 origins_str
2176 .split(',')
2177 .map(|s| s.trim().to_string())
2178 .collect(),
2179 )
2180 } else {
2181 None
2182 }
2183 } else {
2184 None
2185 }
2186 },
2187 channel_delta_compression: None,
2188 };
2189
2190 self.app_manager.array.apps.push(default_app);
2191 info!("Successfully registered default app from env");
2192 }
2193
2194 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2196 info!("Applying REDIS_URL environment variable override");
2197
2198 let redis_url_json = sonic_rs::json!(redis_url_env);
2199
2200 self.adapter
2201 .redis
2202 .redis_pub_options
2203 .insert("url".to_string(), redis_url_json.clone());
2204 self.adapter
2205 .redis
2206 .redis_sub_options
2207 .insert("url".to_string(), redis_url_json);
2208
2209 self.cache.redis.url_override = Some(redis_url_env.clone());
2210 self.queue.redis.url_override = Some(redis_url_env.clone());
2211 self.rate_limiter.redis.url_override = Some(redis_url_env);
2212 }
2213
2214 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2216 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2217 if has_colors_env || has_target_env {
2218 let logging_config = self.logging.get_or_insert_with(Default::default);
2219 if has_colors_env {
2220 logging_config.colors_enabled =
2221 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2222 }
2223 if has_target_env {
2224 logging_config.include_target =
2225 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2226 }
2227 }
2228
2229 self.cleanup.async_enabled =
2231 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2232 self.cleanup.fallback_to_sync =
2233 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2234 self.cleanup.queue_buffer_size =
2235 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2236 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2237 self.cleanup.batch_timeout_ms =
2238 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2239 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2240 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2241 WorkerThreadsConfig::Auto
2242 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2243 WorkerThreadsConfig::Fixed(n)
2244 } else {
2245 warn!(
2246 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2247 worker_threads_str
2248 );
2249 self.cleanup.worker_threads.clone()
2250 };
2251 }
2252 self.cleanup.max_retry_attempts = parse_env::<u32>(
2253 "CLEANUP_MAX_RETRY_ATTEMPTS",
2254 self.cleanup.max_retry_attempts,
2255 );
2256
2257 self.cluster_health.enabled =
2259 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2260 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2261 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2262 self.cluster_health.heartbeat_interval_ms,
2263 );
2264 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2265 "CLUSTER_HEALTH_NODE_TIMEOUT",
2266 self.cluster_health.node_timeout_ms,
2267 );
2268 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2269 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2270 self.cluster_health.cleanup_interval_ms,
2271 );
2272
2273 self.tag_filtering.enabled =
2275 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2276
2277 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2279 if val.to_lowercase() == "none" || val == "0" {
2280 self.websocket.max_messages = None;
2281 } else if let Ok(n) = val.parse::<usize>() {
2282 self.websocket.max_messages = Some(n);
2283 }
2284 }
2285 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2286 if val.to_lowercase() == "none" || val == "0" {
2287 self.websocket.max_bytes = None;
2288 } else if let Ok(n) = val.parse::<usize>() {
2289 self.websocket.max_bytes = Some(n);
2290 }
2291 }
2292 self.websocket.disconnect_on_buffer_full = parse_bool_env(
2293 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2294 self.websocket.disconnect_on_buffer_full,
2295 );
2296 self.websocket.max_message_size = parse_env::<usize>(
2297 "WEBSOCKET_MAX_MESSAGE_SIZE",
2298 self.websocket.max_message_size,
2299 );
2300 self.websocket.max_frame_size =
2301 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2302 self.websocket.write_buffer_size = parse_env::<usize>(
2303 "WEBSOCKET_WRITE_BUFFER_SIZE",
2304 self.websocket.write_buffer_size,
2305 );
2306 self.websocket.max_backpressure = parse_env::<usize>(
2307 "WEBSOCKET_MAX_BACKPRESSURE",
2308 self.websocket.max_backpressure,
2309 );
2310 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2311 self.websocket.ping_interval =
2312 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2313 self.websocket.idle_timeout =
2314 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2315 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2316 self.websocket.compression = mode;
2317 }
2318
2319 Ok(())
2320 }
2321
2322 pub fn validate(&self) -> Result<(), String> {
2323 if self.unix_socket.enabled {
2324 if self.unix_socket.path.is_empty() {
2325 return Err(
2326 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2327 );
2328 }
2329
2330 self.validate_unix_socket_security()?;
2331
2332 if self.ssl.enabled {
2333 tracing::warn!(
2334 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2335 );
2336 }
2337
2338 if self.unix_socket.permission_mode > 0o777 {
2339 return Err(format!(
2340 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2341 self.unix_socket.permission_mode
2342 ));
2343 }
2344 }
2345
2346 if let Err(e) = self.cleanup.validate() {
2347 return Err(format!("Invalid cleanup configuration: {}", e));
2348 }
2349
2350 Ok(())
2351 }
2352
2353 fn validate_unix_socket_security(&self) -> Result<(), String> {
2354 let path = &self.unix_socket.path;
2355
2356 if path.contains("../") || path.contains("..\\") {
2357 return Err(
2358 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2359 );
2360 }
2361
2362 if self.unix_socket.permission_mode & 0o002 != 0 {
2363 warn!(
2364 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2365 self.unix_socket.permission_mode
2366 );
2367 }
2368
2369 if self.unix_socket.permission_mode & 0o007 > 0o005 {
2370 warn!(
2371 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2372 self.unix_socket.permission_mode
2373 );
2374 }
2375
2376 if self.mode == "production" && path.starts_with("/tmp/") {
2377 warn!(
2378 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2379 path
2380 );
2381 }
2382
2383 if !path.starts_with('/') {
2384 return Err(
2385 "Unix socket path must be absolute (start with /) for security and reliability."
2386 .to_string(),
2387 );
2388 }
2389
2390 Ok(())
2391 }
2392}
2393
2394#[cfg(test)]
2395mod redis_connection_tests {
2396 use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
2397
2398 #[test]
2399 fn test_standard_url_basic() {
2400 let conn = RedisConnection {
2401 host: "127.0.0.1".to_string(),
2402 port: 6379,
2403 db: 0,
2404 username: None,
2405 password: None,
2406 key_prefix: "sockudo:".to_string(),
2407 sentinels: Vec::new(),
2408 sentinel_password: None,
2409 name: "mymaster".to_string(),
2410 cluster: RedisClusterConnection::default(),
2411 cluster_nodes: Vec::new(),
2412 };
2413 assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
2414 }
2415
2416 #[test]
2417 fn test_standard_url_with_password() {
2418 let conn = RedisConnection {
2419 host: "127.0.0.1".to_string(),
2420 port: 6379,
2421 db: 2,
2422 username: None,
2423 password: Some("secret".to_string()),
2424 key_prefix: "sockudo:".to_string(),
2425 sentinels: Vec::new(),
2426 sentinel_password: None,
2427 name: "mymaster".to_string(),
2428 cluster: RedisClusterConnection::default(),
2429 cluster_nodes: Vec::new(),
2430 };
2431 assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
2432 }
2433
2434 #[test]
2435 fn test_standard_url_with_username_and_password() {
2436 let conn = RedisConnection {
2437 host: "redis.example.com".to_string(),
2438 port: 6380,
2439 db: 1,
2440 username: Some("admin".to_string()),
2441 password: Some("pass123".to_string()),
2442 key_prefix: "sockudo:".to_string(),
2443 sentinels: Vec::new(),
2444 sentinel_password: None,
2445 name: "mymaster".to_string(),
2446 cluster: RedisClusterConnection::default(),
2447 cluster_nodes: Vec::new(),
2448 };
2449 assert_eq!(
2450 conn.to_url(),
2451 "redis://admin:pass123@redis.example.com:6380/1"
2452 );
2453 }
2454
2455 #[test]
2456 fn test_standard_url_with_special_chars_in_password() {
2457 let conn = RedisConnection {
2458 host: "127.0.0.1".to_string(),
2459 port: 6379,
2460 db: 0,
2461 username: None,
2462 password: Some("pass@word#123".to_string()),
2463 key_prefix: "sockudo:".to_string(),
2464 sentinels: Vec::new(),
2465 sentinel_password: None,
2466 name: "mymaster".to_string(),
2467 cluster: RedisClusterConnection::default(),
2468 cluster_nodes: Vec::new(),
2469 };
2470 assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
2471 }
2472
2473 #[test]
2474 fn test_is_sentinel_configured_false() {
2475 let conn = RedisConnection::default();
2476 assert!(!conn.is_sentinel_configured());
2477 }
2478
2479 #[test]
2480 fn test_is_sentinel_configured_true() {
2481 let conn = RedisConnection {
2482 sentinels: vec![RedisSentinel {
2483 host: "sentinel1".to_string(),
2484 port: 26379,
2485 }],
2486 ..Default::default()
2487 };
2488 assert!(conn.is_sentinel_configured());
2489 }
2490
2491 #[test]
2492 fn test_sentinel_url_basic() {
2493 let conn = RedisConnection {
2494 host: "127.0.0.1".to_string(),
2495 port: 6379,
2496 db: 0,
2497 username: None,
2498 password: None,
2499 key_prefix: "sockudo:".to_string(),
2500 sentinels: vec![
2501 RedisSentinel {
2502 host: "sentinel1".to_string(),
2503 port: 26379,
2504 },
2505 RedisSentinel {
2506 host: "sentinel2".to_string(),
2507 port: 26379,
2508 },
2509 ],
2510 sentinel_password: None,
2511 name: "mymaster".to_string(),
2512 cluster: RedisClusterConnection::default(),
2513 cluster_nodes: Vec::new(),
2514 };
2515 assert_eq!(
2516 conn.to_url(),
2517 "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
2518 );
2519 }
2520
2521 #[test]
2522 fn test_sentinel_url_with_sentinel_password() {
2523 let conn = RedisConnection {
2524 host: "127.0.0.1".to_string(),
2525 port: 6379,
2526 db: 0,
2527 username: None,
2528 password: None,
2529 key_prefix: "sockudo:".to_string(),
2530 sentinels: vec![RedisSentinel {
2531 host: "sentinel1".to_string(),
2532 port: 26379,
2533 }],
2534 sentinel_password: Some("sentinelpass".to_string()),
2535 name: "mymaster".to_string(),
2536 cluster: RedisClusterConnection::default(),
2537 cluster_nodes: Vec::new(),
2538 };
2539 assert_eq!(
2540 conn.to_url(),
2541 "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
2542 );
2543 }
2544
2545 #[test]
2546 fn test_sentinel_url_with_master_password() {
2547 let conn = RedisConnection {
2548 host: "127.0.0.1".to_string(),
2549 port: 6379,
2550 db: 1,
2551 username: None,
2552 password: Some("masterpass".to_string()),
2553 key_prefix: "sockudo:".to_string(),
2554 sentinels: vec![RedisSentinel {
2555 host: "sentinel1".to_string(),
2556 port: 26379,
2557 }],
2558 sentinel_password: None,
2559 name: "mymaster".to_string(),
2560 cluster: RedisClusterConnection::default(),
2561 cluster_nodes: Vec::new(),
2562 };
2563 assert_eq!(
2564 conn.to_url(),
2565 "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
2566 );
2567 }
2568
2569 #[test]
2570 fn test_sentinel_url_with_all_auth() {
2571 let conn = RedisConnection {
2572 host: "127.0.0.1".to_string(),
2573 port: 6379,
2574 db: 2,
2575 username: Some("redisuser".to_string()),
2576 password: Some("redispass".to_string()),
2577 key_prefix: "sockudo:".to_string(),
2578 sentinels: vec![
2579 RedisSentinel {
2580 host: "sentinel1".to_string(),
2581 port: 26379,
2582 },
2583 RedisSentinel {
2584 host: "sentinel2".to_string(),
2585 port: 26380,
2586 },
2587 ],
2588 sentinel_password: Some("sentinelauth".to_string()),
2589 name: "production-master".to_string(),
2590 cluster: RedisClusterConnection::default(),
2591 cluster_nodes: Vec::new(),
2592 };
2593 assert_eq!(
2594 conn.to_url(),
2595 "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
2596 );
2597 }
2598
2599 #[test]
2600 fn test_sentinel_to_host_port() {
2601 let sentinel = RedisSentinel {
2602 host: "sentinel.example.com".to_string(),
2603 port: 26379,
2604 };
2605 assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
2606 }
2607
2608 #[test]
2609 fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
2610 let conn = RedisConnection {
2611 cluster: RedisClusterConnection {
2612 nodes: vec![
2613 ClusterNode {
2614 host: "node1.secure-cluster.com".to_string(),
2615 port: 7000,
2616 },
2617 ClusterNode {
2618 host: "redis://node2.secure-cluster.com:7001".to_string(),
2619 port: 7001,
2620 },
2621 ClusterNode {
2622 host: "rediss://node3.secure-cluster.com".to_string(),
2623 port: 7002,
2624 },
2625 ],
2626 username: None,
2627 password: Some("cluster-secret".to_string()),
2628 use_tls: true,
2629 },
2630 ..Default::default()
2631 };
2632
2633 assert_eq!(
2634 conn.cluster_node_urls(),
2635 vec![
2636 "rediss://:cluster-secret@node1.secure-cluster.com:7000",
2637 "redis://:cluster-secret@node2.secure-cluster.com:7001",
2638 "rediss://:cluster-secret@node3.secure-cluster.com:7002",
2639 ]
2640 );
2641 }
2642
2643 #[test]
2644 fn test_cluster_node_urls_fallback_to_legacy_nodes() {
2645 let conn = RedisConnection {
2646 password: Some("fallback-secret".to_string()),
2647 cluster_nodes: vec![ClusterNode {
2648 host: "legacy-node.example.com".to_string(),
2649 port: 7000,
2650 }],
2651 ..Default::default()
2652 };
2653
2654 assert_eq!(
2655 conn.cluster_node_urls(),
2656 vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
2657 );
2658 }
2659
2660 #[test]
2661 fn test_normalize_cluster_seed_urls() {
2662 let conn = RedisConnection {
2663 cluster: RedisClusterConnection {
2664 nodes: Vec::new(),
2665 username: Some("svc-user".to_string()),
2666 password: Some("svc-pass".to_string()),
2667 use_tls: true,
2668 },
2669 ..Default::default()
2670 };
2671
2672 let seeds = vec![
2673 "node1.example.com:7000".to_string(),
2674 "redis://node2.example.com:7001".to_string(),
2675 "rediss://node3.example.com".to_string(),
2676 ];
2677
2678 assert_eq!(
2679 conn.normalize_cluster_seed_urls(&seeds),
2680 vec![
2681 "rediss://svc-user:svc-pass@node1.example.com:7000",
2682 "redis://svc-user:svc-pass@node2.example.com:7001",
2683 "rediss://svc-user:svc-pass@node3.example.com:6379",
2684 ]
2685 );
2686 }
2687}
2688
2689#[cfg(test)]
2690mod cluster_node_tests {
2691 use super::ClusterNode;
2692
2693 #[test]
2694 fn test_to_url_basic_host() {
2695 let node = ClusterNode {
2696 host: "localhost".to_string(),
2697 port: 6379,
2698 };
2699 assert_eq!(node.to_url(), "redis://localhost:6379");
2700 }
2701
2702 #[test]
2703 fn test_to_url_ip_address() {
2704 let node = ClusterNode {
2705 host: "127.0.0.1".to_string(),
2706 port: 6379,
2707 };
2708 assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
2709 }
2710
2711 #[test]
2712 fn test_to_url_with_redis_protocol() {
2713 let node = ClusterNode {
2714 host: "redis://example.com".to_string(),
2715 port: 6379,
2716 };
2717 assert_eq!(node.to_url(), "redis://example.com:6379");
2718 }
2719
2720 #[test]
2721 fn test_to_url_with_rediss_protocol() {
2722 let node = ClusterNode {
2723 host: "rediss://secure.example.com".to_string(),
2724 port: 6379,
2725 };
2726 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
2727 }
2728
2729 #[test]
2730 fn test_to_url_with_rediss_protocol_and_port_in_url() {
2731 let node = ClusterNode {
2732 host: "rediss://secure.example.com:7000".to_string(),
2733 port: 6379,
2734 };
2735 assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
2736 }
2737
2738 #[test]
2739 fn test_to_url_with_redis_protocol_and_port_in_url() {
2740 let node = ClusterNode {
2741 host: "redis://example.com:7001".to_string(),
2742 port: 6379,
2743 };
2744 assert_eq!(node.to_url(), "redis://example.com:7001");
2745 }
2746
2747 #[test]
2748 fn test_to_url_with_trailing_whitespace() {
2749 let node = ClusterNode {
2750 host: " rediss://secure.example.com ".to_string(),
2751 port: 6379,
2752 };
2753 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
2754 }
2755
2756 #[test]
2757 fn test_to_url_custom_port() {
2758 let node = ClusterNode {
2759 host: "redis-cluster.example.com".to_string(),
2760 port: 7000,
2761 };
2762 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
2763 }
2764
2765 #[test]
2766 fn test_to_url_plain_host_with_port_in_host_field() {
2767 let node = ClusterNode {
2768 host: "redis-cluster.example.com:7010".to_string(),
2769 port: 7000,
2770 };
2771 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
2772 }
2773
2774 #[test]
2775 fn test_to_url_with_options_adds_auth_and_tls() {
2776 let node = ClusterNode {
2777 host: "node.example.com".to_string(),
2778 port: 7000,
2779 };
2780 assert_eq!(
2781 node.to_url_with_options(true, Some("svc-user"), Some("secret")),
2782 "rediss://svc-user:secret@node.example.com:7000"
2783 );
2784 }
2785
2786 #[test]
2787 fn test_to_url_with_options_keeps_embedded_auth() {
2788 let node = ClusterNode {
2789 host: "rediss://:node-secret@node.example.com:7000".to_string(),
2790 port: 7000,
2791 };
2792 assert_eq!(
2793 node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
2794 "rediss://:node-secret@node.example.com:7000"
2795 );
2796 }
2797
2798 #[test]
2799 fn test_from_seed_parses_plain_host_port() {
2800 let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
2801 assert_eq!(node.host, "cluster-node-1");
2802 assert_eq!(node.port, 7005);
2803 }
2804
2805 #[test]
2806 fn test_from_seed_keeps_scheme_urls() {
2807 let node =
2808 ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
2809 assert_eq!(node.host, "rediss://secure.example.com:7005");
2810 assert_eq!(node.port, 7005);
2811 }
2812
2813 #[test]
2814 fn test_to_url_aws_elasticache_hostname() {
2815 let node = ClusterNode {
2816 host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
2817 port: 6379,
2818 };
2819 assert_eq!(
2820 node.to_url(),
2821 "rediss://my-cluster.use1.cache.amazonaws.com:6379"
2822 );
2823 }
2824
2825 #[test]
2826 fn test_to_url_with_ipv6_no_port() {
2827 let node = ClusterNode {
2828 host: "rediss://[::1]".to_string(),
2829 port: 6379,
2830 };
2831 assert_eq!(node.to_url(), "rediss://[::1]:6379");
2832 }
2833
2834 #[test]
2835 fn test_to_url_with_ipv6_and_port_in_url() {
2836 let node = ClusterNode {
2837 host: "rediss://[::1]:7000".to_string(),
2838 port: 6379,
2839 };
2840 assert_eq!(node.to_url(), "rediss://[::1]:7000");
2841 }
2842
2843 #[test]
2844 fn test_to_url_with_ipv6_full_address_no_port() {
2845 let node = ClusterNode {
2846 host: "rediss://[2001:db8::1]".to_string(),
2847 port: 6379,
2848 };
2849 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
2850 }
2851
2852 #[test]
2853 fn test_to_url_with_ipv6_full_address_with_port() {
2854 let node = ClusterNode {
2855 host: "rediss://[2001:db8::1]:7000".to_string(),
2856 port: 6379,
2857 };
2858 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
2859 }
2860
2861 #[test]
2862 fn test_to_url_with_redis_protocol_ipv6() {
2863 let node = ClusterNode {
2864 host: "redis://[::1]".to_string(),
2865 port: 6379,
2866 };
2867 assert_eq!(node.to_url(), "redis://[::1]:6379");
2868 }
2869}
2870
2871#[cfg(test)]
2872mod cors_config_tests {
2873 use super::CorsConfig;
2874
2875 fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
2876 sonic_rs::from_str(json)
2877 }
2878
2879 #[test]
2880 fn test_deserialize_valid_exact_origins() {
2881 let config =
2882 cors_from_json(r#"{"origin": ["https://example.com", "https://other.com"]}"#).unwrap();
2883 assert_eq!(config.origin.len(), 2);
2884 }
2885
2886 #[test]
2887 fn test_deserialize_valid_wildcard_patterns() {
2888 let config =
2889 cors_from_json(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
2890 .unwrap();
2891 assert_eq!(config.origin.len(), 2);
2892 }
2893
2894 #[test]
2895 fn test_deserialize_allows_special_markers() {
2896 assert!(cors_from_json(r#"{"origin": ["*"]}"#).is_ok());
2897 assert!(cors_from_json(r#"{"origin": ["Any"]}"#).is_ok());
2898 assert!(cors_from_json(r#"{"origin": ["any"]}"#).is_ok());
2899 assert!(cors_from_json(r#"{"origin": ["*", "https://example.com"]}"#).is_ok());
2900 }
2901
2902 #[test]
2903 fn test_deserialize_rejects_invalid_patterns() {
2904 assert!(cors_from_json(r#"{"origin": ["*.*bad"]}"#).is_err());
2905 assert!(cors_from_json(r#"{"origin": ["*."]}"#).is_err());
2906 assert!(cors_from_json(r#"{"origin": [""]}"#).is_err());
2907 assert!(cors_from_json(r#"{"origin": ["https://"]}"#).is_err());
2908 }
2909
2910 #[test]
2911 fn test_deserialize_rejects_mixed_valid_and_invalid() {
2912 assert!(cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#).is_err());
2913 }
2914}