1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76 #[default]
77 Local,
78 Redis,
79 #[serde(rename = "redis-cluster")]
80 RedisCluster,
81 Nats,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(default)]
86pub struct DynamoDbSettings {
87 pub region: String,
88 pub table_name: String,
89 pub endpoint_url: Option<String>,
90 pub aws_access_key_id: Option<String>,
91 pub aws_secret_access_key: Option<String>,
92 pub aws_profile_name: Option<String>,
93}
94
95impl Default for DynamoDbSettings {
96 fn default() -> Self {
97 Self {
98 region: "us-east-1".to_string(),
99 table_name: "sockudo-applications".to_string(),
100 endpoint_url: None,
101 aws_access_key_id: None,
102 aws_secret_access_key: None,
103 aws_profile_name: None,
104 }
105 }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(default)]
110pub struct ScyllaDbSettings {
111 pub nodes: Vec<String>,
112 pub keyspace: String,
113 pub table_name: String,
114 pub username: Option<String>,
115 pub password: Option<String>,
116 pub replication_class: String,
117 pub replication_factor: u32,
118}
119
120impl Default for ScyllaDbSettings {
121 fn default() -> Self {
122 Self {
123 nodes: vec!["127.0.0.1:9042".to_string()],
124 keyspace: "sockudo".to_string(),
125 table_name: "applications".to_string(),
126 username: None,
127 password: None,
128 replication_class: "SimpleStrategy".to_string(),
129 replication_factor: 3,
130 }
131 }
132}
133
134impl FromStr for AdapterDriver {
135 type Err = String;
136 fn from_str(s: &str) -> Result<Self, Self::Err> {
137 match s.to_lowercase().as_str() {
138 "local" => Ok(AdapterDriver::Local),
139 "redis" => Ok(AdapterDriver::Redis),
140 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141 "nats" => Ok(AdapterDriver::Nats),
142 _ => Err(format!("Unknown adapter driver: {s}")),
143 }
144 }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
148#[serde(rename_all = "lowercase")]
149pub enum AppManagerDriver {
150 #[default]
151 Memory,
152 Mysql,
153 Dynamodb,
154 PgSql,
155 ScyllaDb,
156}
157impl FromStr for AppManagerDriver {
158 type Err = String;
159 fn from_str(s: &str) -> Result<Self, Self::Err> {
160 match s.to_lowercase().as_str() {
161 "memory" => Ok(AppManagerDriver::Memory),
162 "mysql" => Ok(AppManagerDriver::Mysql),
163 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166 _ => Err(format!("Unknown app manager driver: {s}")),
167 }
168 }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
172#[serde(rename_all = "lowercase")]
173pub enum CacheDriver {
174 #[default]
175 Memory,
176 Redis,
177 #[serde(rename = "redis-cluster")]
178 RedisCluster,
179 None,
180}
181
182impl FromStr for CacheDriver {
183 type Err = String;
184 fn from_str(s: &str) -> Result<Self, Self::Err> {
185 match s.to_lowercase().as_str() {
186 "memory" => Ok(CacheDriver::Memory),
187 "redis" => Ok(CacheDriver::Redis),
188 "redis-cluster" => Ok(CacheDriver::RedisCluster),
189 "none" => Ok(CacheDriver::None),
190 _ => Err(format!("Unknown cache driver: {s}")),
191 }
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
196#[serde(rename_all = "lowercase")]
197pub enum QueueDriver {
198 Memory,
199 #[default]
200 Redis,
201 #[serde(rename = "redis-cluster")]
202 RedisCluster,
203 Sqs,
204 Sns,
205 None,
206}
207
208impl FromStr for QueueDriver {
209 type Err = String;
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 match s.to_lowercase().as_str() {
212 "memory" => Ok(QueueDriver::Memory),
213 "redis" => Ok(QueueDriver::Redis),
214 "redis-cluster" => Ok(QueueDriver::RedisCluster),
215 "sqs" => Ok(QueueDriver::Sqs),
216 "sns" => Ok(QueueDriver::Sns),
217 "none" => Ok(QueueDriver::None),
218 _ => Err(format!("Unknown queue driver: {s}")),
219 }
220 }
221}
222
223impl AsRef<str> for QueueDriver {
224 fn as_ref(&self) -> &str {
225 match self {
226 QueueDriver::Memory => "memory",
227 QueueDriver::Redis => "redis",
228 QueueDriver::RedisCluster => "redis-cluster",
229 QueueDriver::Sqs => "sqs",
230 QueueDriver::Sns => "sns",
231 QueueDriver::None => "none",
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(default)]
238pub struct RedisClusterQueueConfig {
239 pub concurrency: u32,
240 pub prefix: Option<String>,
241 pub nodes: Vec<String>,
242 pub request_timeout_ms: u64,
243}
244
245impl Default for RedisClusterQueueConfig {
246 fn default() -> Self {
247 Self {
248 concurrency: 5,
249 prefix: Some("sockudo_queue:".to_string()),
250 nodes: vec!["redis://127.0.0.1:6379".to_string()],
251 request_timeout_ms: 5000,
252 }
253 }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
257#[serde(rename_all = "lowercase")]
258pub enum MetricsDriver {
259 #[default]
260 Prometheus,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264#[serde(rename_all = "lowercase")]
265pub enum LogOutputFormat {
266 #[default]
267 Human,
268 Json,
269}
270
271impl FromStr for LogOutputFormat {
272 type Err = String;
273 fn from_str(s: &str) -> Result<Self, Self::Err> {
274 match s.to_lowercase().as_str() {
275 "human" => Ok(LogOutputFormat::Human),
276 "json" => Ok(LogOutputFormat::Json),
277 _ => Err(format!("Unknown log output format: {s}")),
278 }
279 }
280}
281
282impl FromStr for MetricsDriver {
283 type Err = String;
284 fn from_str(s: &str) -> Result<Self, Self::Err> {
285 match s.to_lowercase().as_str() {
286 "prometheus" => Ok(MetricsDriver::Prometheus),
287 _ => Err(format!("Unknown metrics driver: {s}")),
288 }
289 }
290}
291
292impl AsRef<str> for MetricsDriver {
293 fn as_ref(&self) -> &str {
294 match self {
295 MetricsDriver::Prometheus => "prometheus",
296 }
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(default)]
303pub struct ServerOptions {
304 pub adapter: AdapterConfig,
305 pub app_manager: AppManagerConfig,
306 pub cache: CacheConfig,
307 pub channel_limits: ChannelLimits,
308 pub cors: CorsConfig,
309 pub database: DatabaseConfig,
310 pub database_pooling: DatabasePooling,
311 pub debug: bool,
312 pub event_limits: EventLimits,
313 pub host: String,
314 pub http_api: HttpApiConfig,
315 pub instance: InstanceConfig,
316 pub logging: Option<LoggingConfig>,
317 pub metrics: MetricsConfig,
318 pub mode: String,
319 pub port: u16,
320 pub path_prefix: String,
321 pub presence: PresenceConfig,
322 pub queue: QueueConfig,
323 pub rate_limiter: RateLimiterConfig,
324 pub shutdown_grace_period: u64,
325 pub ssl: SslConfig,
326 pub user_authentication_timeout: u64,
327 pub webhooks: WebhooksConfig,
328 pub websocket_max_payload_kb: u32,
329 pub cleanup: CleanupConfig,
330 pub activity_timeout: u64,
331 pub cluster_health: ClusterHealthConfig,
332 pub unix_socket: UnixSocketConfig,
333 pub delta_compression: DeltaCompressionOptionsConfig,
334 pub tag_filtering: TagFilteringConfig,
335 pub websocket: WebSocketConfig,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct SqsQueueConfig {
343 pub region: String,
344 pub queue_url_prefix: Option<String>,
345 pub visibility_timeout: i32,
346 pub endpoint_url: Option<String>,
347 pub max_messages: i32,
348 pub wait_time_seconds: i32,
349 pub concurrency: u32,
350 pub fifo: bool,
351 pub message_group_id: Option<String>,
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
355#[serde(default)]
356pub struct SnsQueueConfig {
357 pub region: String,
358 pub topic_arn: String,
359 pub endpoint_url: Option<String>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
363#[serde(default)]
364pub struct AdapterConfig {
365 pub driver: AdapterDriver,
366 pub redis: RedisAdapterConfig,
367 pub cluster: RedisClusterAdapterConfig,
368 pub nats: NatsAdapterConfig,
369 #[serde(default = "default_buffer_multiplier_per_cpu")]
370 pub buffer_multiplier_per_cpu: usize,
371 pub cluster_health: ClusterHealthConfig,
372 #[serde(default = "default_enable_socket_counting")]
373 pub enable_socket_counting: bool,
374}
375
/// Default for `AdapterConfig::enable_socket_counting`: enabled.
fn default_enable_socket_counting() -> bool {
    true
}
379
/// Default for `AdapterConfig::buffer_multiplier_per_cpu`: 64.
fn default_buffer_multiplier_per_cpu() -> usize {
    64
}
383
384impl Default for AdapterConfig {
385 fn default() -> Self {
386 Self {
387 driver: AdapterDriver::default(),
388 redis: RedisAdapterConfig::default(),
389 cluster: RedisClusterAdapterConfig::default(),
390 nats: NatsAdapterConfig::default(),
391 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392 cluster_health: ClusterHealthConfig::default(),
393 enable_socket_counting: default_enable_socket_counting(),
394 }
395 }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct RedisAdapterConfig {
401 pub requests_timeout: u64,
402 pub prefix: String,
403 pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
404 pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
405 pub cluster_mode: bool,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
409#[serde(default)]
410pub struct RedisClusterAdapterConfig {
411 pub nodes: Vec<String>,
412 pub prefix: String,
413 pub request_timeout_ms: u64,
414 pub use_connection_manager: bool,
415 #[serde(default)]
416 pub use_sharded_pubsub: bool,
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[serde(default)]
421pub struct NatsAdapterConfig {
422 pub servers: Vec<String>,
423 pub prefix: String,
424 pub request_timeout_ms: u64,
425 pub username: Option<String>,
426 pub password: Option<String>,
427 pub token: Option<String>,
428 pub connection_timeout_ms: u64,
429 pub nodes_number: Option<u32>,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, Default)]
433#[serde(default)]
434pub struct AppManagerConfig {
435 pub driver: AppManagerDriver,
436 pub array: ArrayConfig,
437 pub cache: CacheSettings,
438 pub scylladb: ScyllaDbSettings,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
442#[serde(default)]
443pub struct ArrayConfig {
444 pub apps: Vec<App>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(default)]
449pub struct CacheSettings {
450 pub enabled: bool,
451 pub ttl: u64,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
455#[serde(default)]
456pub struct MemoryCacheOptions {
457 pub ttl: u64,
458 pub cleanup_interval: u64,
459 pub max_capacity: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
463#[serde(default)]
464pub struct CacheConfig {
465 pub driver: CacheDriver,
466 pub redis: RedisConfig,
467 pub memory: MemoryCacheOptions,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize, Default)]
471#[serde(default)]
472pub struct RedisConfig {
473 pub prefix: Option<String>,
474 pub url_override: Option<String>,
475 pub cluster_mode: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct ChannelLimits {
481 pub max_name_length: u32,
482 pub cache_ttl: u64,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
486#[serde(default)]
487pub struct CorsConfig {
488 pub credentials: bool,
489 pub origin: Vec<String>,
490 pub methods: Vec<String>,
491 pub allowed_headers: Vec<String>,
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize, Default)]
495#[serde(default)]
496pub struct DatabaseConfig {
497 pub mysql: DatabaseConnection,
498 pub postgres: DatabaseConnection,
499 pub redis: RedisConnection,
500 pub dynamodb: DynamoDbSettings,
501 pub scylladb: ScyllaDbSettings,
502}
503
504#[derive(Debug, Clone, Serialize, Deserialize)]
505#[serde(default)]
506pub struct DatabaseConnection {
507 pub host: String,
508 pub port: u16,
509 pub username: String,
510 pub password: String,
511 pub database: String,
512 pub table_name: String,
513 pub connection_pool_size: u32,
514 pub pool_min: Option<u32>,
515 pub pool_max: Option<u32>,
516 pub cache_ttl: u64,
517 pub cache_cleanup_interval: u64,
518 pub cache_max_capacity: u64,
519}
520
521#[derive(Debug, Clone, Serialize, Deserialize)]
522#[serde(default)]
523pub struct RedisConnection {
524 pub host: String,
525 pub port: u16,
526 pub db: u32,
527 pub username: Option<String>,
528 pub password: Option<String>,
529 pub key_prefix: String,
530 pub sentinels: Vec<RedisSentinel>,
531 pub sentinel_password: Option<String>,
532 pub name: String,
533 pub cluster: RedisClusterConnection,
534 pub cluster_nodes: Vec<ClusterNode>,
536}
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
539#[serde(default)]
540pub struct RedisSentinel {
541 pub host: String,
542 pub port: u16,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize, Default)]
546#[serde(default)]
547pub struct RedisClusterConnection {
548 pub nodes: Vec<ClusterNode>,
549 pub username: Option<String>,
550 pub password: Option<String>,
551 #[serde(alias = "useTLS")]
552 pub use_tls: bool,
553}
554
555impl RedisConnection {
556 pub fn is_sentinel_configured(&self) -> bool {
558 !self.sentinels.is_empty()
559 }
560
561 pub fn to_url(&self) -> String {
563 if self.is_sentinel_configured() {
564 self.build_sentinel_url()
565 } else {
566 self.build_standard_url()
567 }
568 }
569
570 fn build_standard_url(&self) -> String {
571 let (scheme, host) = if self.host.starts_with("rediss://") {
573 ("rediss://", self.host.trim_start_matches("rediss://"))
574 } else if self.host.starts_with("redis://") {
575 ("redis://", self.host.trim_start_matches("redis://"))
576 } else {
577 ("redis://", self.host.as_str())
578 };
579
580 let mut url = String::from(scheme);
581
582 if let Some(ref username) = self.username {
583 url.push_str(username);
584 if let Some(ref password) = self.password {
585 url.push(':');
586 url.push_str(&urlencoding::encode(password));
587 }
588 url.push('@');
589 } else if let Some(ref password) = self.password {
590 url.push(':');
591 url.push_str(&urlencoding::encode(password));
592 url.push('@');
593 }
594
595 url.push_str(host);
596 url.push(':');
597 url.push_str(&self.port.to_string());
598 url.push('/');
599 url.push_str(&self.db.to_string());
600
601 url
602 }
603
604 fn build_sentinel_url(&self) -> String {
605 let mut url = String::from("redis+sentinel://");
606
607 if let Some(ref sentinel_password) = self.sentinel_password {
608 url.push(':');
609 url.push_str(&urlencoding::encode(sentinel_password));
610 url.push('@');
611 }
612
613 let sentinel_hosts: Vec<String> = self
614 .sentinels
615 .iter()
616 .map(|s| format!("{}:{}", s.host, s.port))
617 .collect();
618 url.push_str(&sentinel_hosts.join(","));
619
620 url.push('/');
621 url.push_str(&self.name);
622 url.push('/');
623 url.push_str(&self.db.to_string());
624
625 let mut params = Vec::new();
626 if let Some(ref password) = self.password {
627 params.push(format!("password={}", urlencoding::encode(password)));
628 }
629 if let Some(ref username) = self.username {
630 params.push(format!("username={}", urlencoding::encode(username)));
631 }
632
633 if !params.is_empty() {
634 url.push('?');
635 url.push_str(¶ms.join("&"));
636 }
637
638 url
639 }
640
641 pub fn has_cluster_nodes(&self) -> bool {
644 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
645 }
646
647 pub fn cluster_node_urls(&self) -> Vec<String> {
650 if !self.cluster.nodes.is_empty() {
651 return self.build_cluster_urls(&self.cluster.nodes);
652 }
653 self.build_cluster_urls(&self.cluster_nodes)
654 }
655
656 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
659 self.build_cluster_urls(
660 &seeds
661 .iter()
662 .filter_map(|seed| ClusterNode::from_seed(seed))
663 .collect::<Vec<ClusterNode>>(),
664 )
665 }
666
667 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
668 let username = self
669 .cluster
670 .username
671 .as_deref()
672 .or(self.username.as_deref());
673 let password = self
674 .cluster
675 .password
676 .as_deref()
677 .or(self.password.as_deref());
678 let use_tls = self.cluster.use_tls;
679
680 nodes
681 .iter()
682 .map(|node| node.to_url_with_options(use_tls, username, password))
683 .collect()
684 }
685}
686
687impl RedisSentinel {
688 pub fn to_host_port(&self) -> String {
689 format!("{}:{}", self.host, self.port)
690 }
691}
692
693#[derive(Debug, Clone, Serialize, Deserialize)]
694#[serde(default)]
695pub struct ClusterNode {
696 pub host: String,
697 pub port: u16,
698}
699
700impl ClusterNode {
701 pub fn to_url(&self) -> String {
702 self.to_url_with_options(false, None, None)
703 }
704
705 pub fn to_url_with_options(
706 &self,
707 use_tls: bool,
708 username: Option<&str>,
709 password: Option<&str>,
710 ) -> String {
711 let host = self.host.trim();
712
713 if host.starts_with("redis://") || host.starts_with("rediss://") {
714 if let Ok(parsed) = Url::parse(host)
715 && let Some(host_str) = parsed.host_str()
716 {
717 let scheme = parsed.scheme();
718 let port = parsed.port_or_known_default().unwrap_or(self.port);
719 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
720 let parsed_password = parsed.password();
721 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
722 let (effective_username, effective_password) = if has_embedded_auth {
723 (parsed_username, parsed_password)
724 } else {
725 (username, password)
726 };
727
728 return build_redis_url(
729 scheme,
730 host_str,
731 port,
732 effective_username,
733 effective_password,
734 );
735 }
736
737 let has_port = if let Some(bracket_pos) = host.rfind(']') {
739 host[bracket_pos..].contains(':')
740 } else {
741 host.split(':').count() >= 3
742 };
743 let base = if has_port {
744 host.to_string()
745 } else {
746 format!("{}:{}", host, self.port)
747 };
748
749 if let Ok(parsed) = Url::parse(&base) {
750 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
751 let parsed_password = parsed.password();
752 if let Some(host_str) = parsed.host_str() {
753 let port = parsed.port_or_known_default().unwrap_or(self.port);
754 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
755 let (effective_username, effective_password) = if has_embedded_auth {
756 (parsed_username, parsed_password)
757 } else {
758 (username, password)
759 };
760 return build_redis_url(
761 parsed.scheme(),
762 host_str,
763 port,
764 effective_username,
765 effective_password,
766 );
767 }
768 }
769 return base;
770 }
771
772 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
773 let scheme = if use_tls { "rediss" } else { "redis" };
774 build_redis_url(
775 scheme,
776 &normalized_host,
777 normalized_port,
778 username,
779 password,
780 )
781 }
782
783 pub fn from_seed(seed: &str) -> Option<Self> {
784 let trimmed = seed.trim();
785 if trimmed.is_empty() {
786 return None;
787 }
788
789 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
790 let port = Url::parse(trimmed)
791 .ok()
792 .and_then(|parsed| parsed.port_or_known_default())
793 .unwrap_or(6379);
794 return Some(Self {
795 host: trimmed.to_string(),
796 port,
797 });
798 }
799
800 let (host, port) = split_plain_host_and_port(trimmed, 6379);
801 Some(Self { host, port })
802 }
803}
804
/// Splits a scheme-less host string into `(host, port)`.
///
/// * `[v6]:port` and `[v6]` yield the address without brackets; the port falls back to
///   `default_port` when absent or unparsable.
/// * `host:port` (exactly one colon, numeric port) yields both parts.
/// * Anything else is returned unchanged (trimmed) together with `default_port`.
fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
    let trimmed = raw_host.trim();

    if let Some(after_open) = trimmed.strip_prefix('[') {
        return match after_open.split_once(']') {
            // Unterminated bracket: hand the input back untouched.
            None => (trimmed.to_string(), default_port),
            Some((inner, rest)) => {
                let port = rest
                    .strip_prefix(':')
                    .and_then(|digits| digits.parse::<u16>().ok())
                    .unwrap_or(default_port);
                (inner.to_string(), port)
            }
        };
    }

    // More than one colon without brackets is treated as a bare IPv6 address, not host:port.
    let colon_count = trimmed.matches(':').count();
    if colon_count == 1 {
        if let Some((name, digits)) = trimmed.rsplit_once(':') {
            if let Ok(port) = digits.parse::<u16>() {
                return (name.to_string(), port);
            }
        }
    }

    (trimmed.to_string(), default_port)
}
833
834fn build_redis_url(
835 scheme: &str,
836 host: &str,
837 port: u16,
838 username: Option<&str>,
839 password: Option<&str>,
840) -> String {
841 let mut url = format!("{scheme}://");
842
843 if let Some(user) = username {
844 url.push_str(&urlencoding::encode(user));
845 if let Some(pass) = password {
846 url.push(':');
847 url.push_str(&urlencoding::encode(pass));
848 }
849 url.push('@');
850 } else if let Some(pass) = password {
851 url.push(':');
852 url.push_str(&urlencoding::encode(pass));
853 url.push('@');
854 }
855
856 if host.contains(':') && !host.starts_with('[') {
857 url.push('[');
858 url.push_str(host);
859 url.push(']');
860 } else {
861 url.push_str(host);
862 }
863 url.push(':');
864 url.push_str(&port.to_string());
865 url
866}
867
868#[derive(Debug, Clone, Serialize, Deserialize)]
869#[serde(default)]
870pub struct DatabasePooling {
871 pub enabled: bool,
872 pub min: u32,
873 pub max: u32,
874}
875
876#[derive(Debug, Clone, Serialize, Deserialize)]
877#[serde(default)]
878pub struct EventLimits {
879 pub max_channels_at_once: u32,
880 pub max_name_length: u32,
881 pub max_payload_in_kb: u32,
882 pub max_batch_size: u32,
883}
884
885#[derive(Debug, Clone, Serialize, Deserialize)]
886#[serde(default)]
887pub struct HttpApiConfig {
888 pub request_limit_in_mb: u32,
889 pub accept_traffic: AcceptTraffic,
890}
891
892#[derive(Debug, Clone, Serialize, Deserialize)]
893#[serde(default)]
894pub struct AcceptTraffic {
895 pub memory_threshold: f64,
896}
897
898#[derive(Debug, Clone, Serialize, Deserialize)]
899#[serde(default)]
900pub struct InstanceConfig {
901 pub process_id: String,
902}
903
904#[derive(Debug, Clone, Serialize, Deserialize)]
905#[serde(default)]
906pub struct MetricsConfig {
907 pub enabled: bool,
908 pub driver: MetricsDriver,
909 pub host: String,
910 pub prometheus: PrometheusConfig,
911 pub port: u16,
912}
913
914#[derive(Debug, Clone, Serialize, Deserialize)]
915#[serde(default)]
916pub struct PrometheusConfig {
917 pub prefix: String,
918}
919
920#[derive(Debug, Clone, Serialize, Deserialize)]
921#[serde(default)]
922pub struct LoggingConfig {
923 pub colors_enabled: bool,
924 pub include_target: bool,
925}
926
927#[derive(Debug, Clone, Serialize, Deserialize)]
928#[serde(default)]
929pub struct PresenceConfig {
930 pub max_members_per_channel: u32,
931 pub max_member_size_in_kb: u32,
932}
933
934#[derive(Debug, Clone, Serialize, Deserialize)]
937#[serde(default)]
938pub struct WebSocketConfig {
939 pub max_messages: Option<usize>,
940 pub max_bytes: Option<usize>,
941 pub disconnect_on_buffer_full: bool,
942 pub max_message_size: usize,
943 pub max_frame_size: usize,
944 pub write_buffer_size: usize,
945 pub max_backpressure: usize,
946 pub auto_ping: bool,
947 pub ping_interval: u32,
948 pub idle_timeout: u32,
949 pub compression: String,
950}
951
952impl Default for WebSocketConfig {
953 fn default() -> Self {
954 Self {
955 max_messages: Some(1000),
956 max_bytes: None,
957 disconnect_on_buffer_full: true,
958 max_message_size: 64 * 1024 * 1024,
959 max_frame_size: 16 * 1024 * 1024,
960 write_buffer_size: 16 * 1024,
961 max_backpressure: 1024 * 1024,
962 auto_ping: true,
963 ping_interval: 30,
964 idle_timeout: 120,
965 compression: "disabled".to_string(),
966 }
967 }
968}
969
970impl WebSocketConfig {
971 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
973 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
974
975 let limit = match (self.max_messages, self.max_bytes) {
976 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
977 (Some(messages), None) => BufferLimit::Messages(messages),
978 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
979 (None, None) => BufferLimit::Messages(1000),
980 };
981
982 WebSocketBufferConfig {
983 limit,
984 disconnect_on_full: self.disconnect_on_buffer_full,
985 }
986 }
987
988 pub fn to_sockudo_ws_config(
990 &self,
991 websocket_max_payload_kb: u32,
992 activity_timeout: u64,
993 ) -> sockudo_ws::Config {
994 use sockudo_ws::Compression;
995
996 let compression = match self.compression.to_lowercase().as_str() {
997 "dedicated" => Compression::Dedicated,
998 "shared" => Compression::Shared,
999 "window256b" => Compression::Window256B,
1000 "window1kb" => Compression::Window1KB,
1001 "window2kb" => Compression::Window2KB,
1002 "window4kb" => Compression::Window4KB,
1003 "window8kb" => Compression::Window8KB,
1004 "window16kb" => Compression::Window16KB,
1005 "window32kb" => Compression::Window32KB,
1006 _ => Compression::Disabled,
1007 };
1008
1009 sockudo_ws::Config::builder()
1010 .max_payload_length(
1011 self.max_bytes
1012 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1013 )
1014 .max_message_size(self.max_message_size)
1015 .max_frame_size(self.max_frame_size)
1016 .write_buffer_size(self.write_buffer_size)
1017 .max_backpressure(self.max_backpressure)
1018 .idle_timeout(self.idle_timeout)
1019 .auto_ping(self.auto_ping)
1020 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1021 .compression(compression)
1022 .build()
1023 }
1024}
1025
1026#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1027#[serde(default)]
1028pub struct QueueConfig {
1029 pub driver: QueueDriver,
1030 pub redis: RedisQueueConfig,
1031 pub redis_cluster: RedisClusterQueueConfig,
1032 pub sqs: SqsQueueConfig,
1033 pub sns: SnsQueueConfig,
1034}
1035
1036#[derive(Debug, Clone, Serialize, Deserialize)]
1037#[serde(default)]
1038pub struct RedisQueueConfig {
1039 pub concurrency: u32,
1040 pub prefix: Option<String>,
1041 pub url_override: Option<String>,
1042 pub cluster_mode: bool,
1043}
1044
1045#[derive(Clone, Debug, Serialize, Deserialize)]
1046#[serde(default)]
1047pub struct RateLimit {
1048 pub max_requests: u32,
1049 pub window_seconds: u64,
1050 pub identifier: Option<String>,
1051 pub trust_hops: Option<u32>,
1052}
1053
1054#[derive(Debug, Clone, Serialize, Deserialize)]
1055#[serde(default)]
1056pub struct RateLimiterConfig {
1057 pub enabled: bool,
1058 pub driver: CacheDriver,
1059 pub api_rate_limit: RateLimit,
1060 pub websocket_rate_limit: RateLimit,
1061 pub redis: RedisConfig,
1062}
1063
1064#[derive(Debug, Clone, Serialize, Deserialize)]
1065#[serde(default)]
1066pub struct SslConfig {
1067 pub enabled: bool,
1068 pub cert_path: String,
1069 pub key_path: String,
1070 pub passphrase: Option<String>,
1071 pub ca_path: Option<String>,
1072 pub redirect_http: bool,
1073 pub http_port: Option<u16>,
1074}
1075
1076#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1077#[serde(default)]
1078pub struct WebhooksConfig {
1079 pub batching: BatchingConfig,
1080}
1081
1082#[derive(Debug, Clone, Serialize, Deserialize)]
1083#[serde(default)]
1084pub struct BatchingConfig {
1085 pub enabled: bool,
1086 pub duration: u64,
1087 pub size: usize,
1088}
1089
1090#[derive(Debug, Clone, Serialize, Deserialize)]
1091#[serde(default)]
1092pub struct ClusterHealthConfig {
1093 pub enabled: bool,
1094 pub heartbeat_interval_ms: u64,
1095 pub node_timeout_ms: u64,
1096 pub cleanup_interval_ms: u64,
1097}
1098
1099#[derive(Debug, Clone, Serialize, Deserialize)]
1100#[serde(default)]
1101pub struct UnixSocketConfig {
1102 pub enabled: bool,
1103 pub path: String,
1104 #[serde(deserialize_with = "deserialize_octal_permission")]
1105 pub permission_mode: u32,
1106}
1107
1108#[derive(Debug, Clone, Serialize, Deserialize)]
1109#[serde(default)]
1110pub struct DeltaCompressionOptionsConfig {
1111 pub enabled: bool,
1112 pub algorithm: String,
1113 pub full_message_interval: u32,
1114 pub min_message_size: usize,
1115 pub max_state_age_secs: u64,
1116 pub max_channel_states_per_socket: usize,
1117 pub max_conflation_states_per_channel: Option<usize>,
1118 pub conflation_key_path: Option<String>,
1119 pub cluster_coordination: bool,
1120 pub omit_delta_algorithm: bool,
1121}
1122
1123#[derive(Debug, Clone, Serialize, Deserialize)]
1125#[serde(default)]
1126pub struct CleanupConfig {
1127 pub queue_buffer_size: usize,
1128 pub batch_size: usize,
1129 pub batch_timeout_ms: u64,
1130 pub worker_threads: WorkerThreadsConfig,
1131 pub max_retry_attempts: u32,
1132 pub async_enabled: bool,
1133 pub fallback_to_sync: bool,
1134}
1135
/// Worker thread setting; serialized as the string `"auto"` or as a number.
#[derive(Debug, Clone)]
pub enum WorkerThreadsConfig {
    /// Serialized as `"auto"`.
    Auto,
    /// A fixed count, serialized as a number.
    Fixed(usize),
}
1142
1143impl serde::Serialize for WorkerThreadsConfig {
1144 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1145 where
1146 S: serde::Serializer,
1147 {
1148 match self {
1149 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1150 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1151 }
1152 }
1153}
1154
1155impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1156 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1157 where
1158 D: serde::Deserializer<'de>,
1159 {
1160 use serde::de;
1161 struct WorkerThreadsVisitor;
1162 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1163 type Value = WorkerThreadsConfig;
1164
1165 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1166 formatter.write_str(r#""auto" or a positive integer"#)
1167 }
1168
1169 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1170 if value.eq_ignore_ascii_case("auto") {
1171 Ok(WorkerThreadsConfig::Auto)
1172 } else if let Ok(n) = value.parse::<usize>() {
1173 Ok(WorkerThreadsConfig::Fixed(n))
1174 } else {
1175 Err(E::custom(format!(
1176 "expected 'auto' or a number, got '{value}'"
1177 )))
1178 }
1179 }
1180
1181 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1182 Ok(WorkerThreadsConfig::Fixed(value as usize))
1183 }
1184
1185 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1186 if value >= 0 {
1187 Ok(WorkerThreadsConfig::Fixed(value as usize))
1188 } else {
1189 Err(E::custom("worker_threads must be non-negative"))
1190 }
1191 }
1192 }
1193 deserializer.deserialize_any(WorkerThreadsVisitor)
1194 }
1195}
1196
1197impl Default for CleanupConfig {
1198 fn default() -> Self {
1199 Self {
1200 queue_buffer_size: 1024,
1201 batch_size: 64,
1202 batch_timeout_ms: 100,
1203 worker_threads: WorkerThreadsConfig::Auto,
1204 max_retry_attempts: 3,
1205 async_enabled: true,
1206 fallback_to_sync: true,
1207 }
1208 }
1209}
1210
1211impl CleanupConfig {
1212 pub fn validate(&self) -> Result<(), String> {
1213 if self.queue_buffer_size == 0 {
1214 return Err("queue_buffer_size must be greater than 0".to_string());
1215 }
1216 if self.batch_size == 0 {
1217 return Err("batch_size must be greater than 0".to_string());
1218 }
1219 if self.batch_timeout_ms == 0 {
1220 return Err("batch_timeout_ms must be greater than 0".to_string());
1221 }
1222 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1223 && n == 0
1224 {
1225 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1226 }
1227 Ok(())
1228 }
1229}
1230
1231#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1232#[serde(default)]
1233pub struct TagFilteringConfig {
1234 #[serde(default)]
1235 pub enabled: bool,
1236 #[serde(default = "default_true")]
1237 pub enable_tags: bool,
1238}
1239
/// Serde default helper (referenced via `#[serde(default = "default_true")]`); always `true`.
fn default_true() -> bool {
    true
}
1243
1244impl Default for ServerOptions {
1247 fn default() -> Self {
1248 Self {
1249 adapter: AdapterConfig::default(),
1250 app_manager: AppManagerConfig::default(),
1251 cache: CacheConfig::default(),
1252 channel_limits: ChannelLimits::default(),
1253 cors: CorsConfig::default(),
1254 database: DatabaseConfig::default(),
1255 database_pooling: DatabasePooling::default(),
1256 debug: false,
1257 tag_filtering: TagFilteringConfig::default(),
1258 event_limits: EventLimits::default(),
1259 host: "0.0.0.0".to_string(),
1260 http_api: HttpApiConfig::default(),
1261 instance: InstanceConfig::default(),
1262 logging: None,
1263 metrics: MetricsConfig::default(),
1264 mode: "production".to_string(),
1265 port: 6001,
1266 path_prefix: "/".to_string(),
1267 presence: PresenceConfig::default(),
1268 queue: QueueConfig::default(),
1269 rate_limiter: RateLimiterConfig::default(),
1270 shutdown_grace_period: 10,
1271 ssl: SslConfig::default(),
1272 user_authentication_timeout: 3600,
1273 webhooks: WebhooksConfig::default(),
1274 websocket_max_payload_kb: 64,
1275 cleanup: CleanupConfig::default(),
1276 activity_timeout: 120,
1277 cluster_health: ClusterHealthConfig::default(),
1278 unix_socket: UnixSocketConfig::default(),
1279 delta_compression: DeltaCompressionOptionsConfig::default(),
1280 websocket: WebSocketConfig::default(),
1281 }
1282 }
1283}
1284
1285impl Default for SqsQueueConfig {
1286 fn default() -> Self {
1287 Self {
1288 region: "us-east-1".to_string(),
1289 queue_url_prefix: None,
1290 visibility_timeout: 30,
1291 endpoint_url: None,
1292 max_messages: 10,
1293 wait_time_seconds: 5,
1294 concurrency: 5,
1295 fifo: false,
1296 message_group_id: Some("default".to_string()),
1297 }
1298 }
1299}
1300
1301impl Default for SnsQueueConfig {
1302 fn default() -> Self {
1303 Self {
1304 region: "us-east-1".to_string(),
1305 topic_arn: String::new(),
1306 endpoint_url: None,
1307 }
1308 }
1309}
1310
1311impl Default for RedisAdapterConfig {
1312 fn default() -> Self {
1313 Self {
1314 requests_timeout: 5000,
1315 prefix: "sockudo_adapter:".to_string(),
1316 redis_pub_options: AHashMap::new(),
1317 redis_sub_options: AHashMap::new(),
1318 cluster_mode: false,
1319 }
1320 }
1321}
1322
1323impl Default for RedisClusterAdapterConfig {
1324 fn default() -> Self {
1325 Self {
1326 nodes: vec![],
1327 prefix: "sockudo_adapter:".to_string(),
1328 request_timeout_ms: 1000,
1329 use_connection_manager: true,
1330 use_sharded_pubsub: false,
1331 }
1332 }
1333}
1334
1335impl Default for NatsAdapterConfig {
1336 fn default() -> Self {
1337 Self {
1338 servers: vec!["nats://localhost:4222".to_string()],
1339 prefix: "sockudo_adapter:".to_string(),
1340 request_timeout_ms: 5000,
1341 username: None,
1342 password: None,
1343 token: None,
1344 connection_timeout_ms: 5000,
1345 nodes_number: None,
1346 }
1347 }
1348}
1349
1350impl Default for CacheSettings {
1351 fn default() -> Self {
1352 Self {
1353 enabled: true,
1354 ttl: 300,
1355 }
1356 }
1357}
1358
1359impl Default for MemoryCacheOptions {
1360 fn default() -> Self {
1361 Self {
1362 ttl: 300,
1363 cleanup_interval: 60,
1364 max_capacity: 10000,
1365 }
1366 }
1367}
1368
1369impl Default for CacheConfig {
1370 fn default() -> Self {
1371 Self {
1372 driver: CacheDriver::default(),
1373 redis: RedisConfig {
1374 prefix: Some("sockudo_cache:".to_string()),
1375 url_override: None,
1376 cluster_mode: false,
1377 },
1378 memory: MemoryCacheOptions::default(),
1379 }
1380 }
1381}
1382
1383impl Default for ChannelLimits {
1384 fn default() -> Self {
1385 Self {
1386 max_name_length: 200,
1387 cache_ttl: 3600,
1388 }
1389 }
1390}
1391impl Default for CorsConfig {
1392 fn default() -> Self {
1393 Self {
1394 credentials: true,
1395 origin: vec!["*".to_string()],
1396 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1397 allowed_headers: vec![
1398 "Authorization".to_string(),
1399 "Content-Type".to_string(),
1400 "X-Requested-With".to_string(),
1401 "Accept".to_string(),
1402 ],
1403 }
1404 }
1405}
1406
1407impl Default for DatabaseConnection {
1408 fn default() -> Self {
1409 Self {
1410 host: "localhost".to_string(),
1411 port: 3306,
1412 username: "root".to_string(),
1413 password: "".to_string(),
1414 database: "sockudo".to_string(),
1415 table_name: "applications".to_string(),
1416 connection_pool_size: 10,
1417 pool_min: None,
1418 pool_max: None,
1419 cache_ttl: 300,
1420 cache_cleanup_interval: 60,
1421 cache_max_capacity: 100,
1422 }
1423 }
1424}
1425
1426impl Default for RedisConnection {
1427 fn default() -> Self {
1428 Self {
1429 host: "127.0.0.1".to_string(),
1430 port: 6379,
1431 db: 0,
1432 username: None,
1433 password: None,
1434 key_prefix: "sockudo:".to_string(),
1435 sentinels: Vec::new(),
1436 sentinel_password: None,
1437 name: "mymaster".to_string(),
1438 cluster: RedisClusterConnection::default(),
1439 cluster_nodes: Vec::new(),
1440 }
1441 }
1442}
1443
1444impl Default for RedisSentinel {
1445 fn default() -> Self {
1446 Self {
1447 host: "localhost".to_string(),
1448 port: 26379,
1449 }
1450 }
1451}
1452
1453impl Default for ClusterNode {
1454 fn default() -> Self {
1455 Self {
1456 host: "127.0.0.1".to_string(),
1457 port: 7000,
1458 }
1459 }
1460}
1461
1462impl Default for DatabasePooling {
1463 fn default() -> Self {
1464 Self {
1465 enabled: true,
1466 min: 2,
1467 max: 10,
1468 }
1469 }
1470}
1471
1472impl Default for EventLimits {
1473 fn default() -> Self {
1474 Self {
1475 max_channels_at_once: 100,
1476 max_name_length: 200,
1477 max_payload_in_kb: 100,
1478 max_batch_size: 10,
1479 }
1480 }
1481}
1482
1483impl Default for HttpApiConfig {
1484 fn default() -> Self {
1485 Self {
1486 request_limit_in_mb: 10,
1487 accept_traffic: AcceptTraffic::default(),
1488 }
1489 }
1490}
1491
1492impl Default for AcceptTraffic {
1493 fn default() -> Self {
1494 Self {
1495 memory_threshold: 0.90,
1496 }
1497 }
1498}
1499
1500impl Default for InstanceConfig {
1501 fn default() -> Self {
1502 Self {
1503 process_id: uuid::Uuid::new_v4().to_string(),
1504 }
1505 }
1506}
1507
1508impl Default for LoggingConfig {
1509 fn default() -> Self {
1510 Self {
1511 colors_enabled: true,
1512 include_target: true,
1513 }
1514 }
1515}
1516
1517impl Default for MetricsConfig {
1518 fn default() -> Self {
1519 Self {
1520 enabled: true,
1521 driver: MetricsDriver::default(),
1522 host: "0.0.0.0".to_string(),
1523 prometheus: PrometheusConfig::default(),
1524 port: 9601,
1525 }
1526 }
1527}
1528
1529impl Default for PrometheusConfig {
1530 fn default() -> Self {
1531 Self {
1532 prefix: "sockudo_".to_string(),
1533 }
1534 }
1535}
1536
1537impl Default for PresenceConfig {
1538 fn default() -> Self {
1539 Self {
1540 max_members_per_channel: 100,
1541 max_member_size_in_kb: 2,
1542 }
1543 }
1544}
1545
1546impl Default for RedisQueueConfig {
1547 fn default() -> Self {
1548 Self {
1549 concurrency: 5,
1550 prefix: Some("sockudo_queue:".to_string()),
1551 url_override: None,
1552 cluster_mode: false,
1553 }
1554 }
1555}
1556
1557impl Default for RateLimit {
1558 fn default() -> Self {
1559 Self {
1560 max_requests: 60,
1561 window_seconds: 60,
1562 identifier: Some("default".to_string()),
1563 trust_hops: Some(0),
1564 }
1565 }
1566}
1567
1568impl Default for RateLimiterConfig {
1569 fn default() -> Self {
1570 Self {
1571 enabled: true,
1572 driver: CacheDriver::Memory,
1573 api_rate_limit: RateLimit {
1574 max_requests: 100,
1575 window_seconds: 60,
1576 identifier: Some("api".to_string()),
1577 trust_hops: Some(0),
1578 },
1579 websocket_rate_limit: RateLimit {
1580 max_requests: 20,
1581 window_seconds: 60,
1582 identifier: Some("websocket_connect".to_string()),
1583 trust_hops: Some(0),
1584 },
1585 redis: RedisConfig {
1586 prefix: Some("sockudo_rl:".to_string()),
1587 url_override: None,
1588 cluster_mode: false,
1589 },
1590 }
1591 }
1592}
1593
1594impl Default for SslConfig {
1595 fn default() -> Self {
1596 Self {
1597 enabled: false,
1598 cert_path: "".to_string(),
1599 key_path: "".to_string(),
1600 passphrase: None,
1601 ca_path: None,
1602 redirect_http: false,
1603 http_port: Some(80),
1604 }
1605 }
1606}
1607
1608impl Default for BatchingConfig {
1609 fn default() -> Self {
1610 Self {
1611 enabled: true,
1612 duration: 50,
1613 size: 100,
1614 }
1615 }
1616}
1617
1618impl Default for ClusterHealthConfig {
1619 fn default() -> Self {
1620 Self {
1621 enabled: true,
1622 heartbeat_interval_ms: 10000,
1623 node_timeout_ms: 30000,
1624 cleanup_interval_ms: 10000,
1625 }
1626 }
1627}
1628
1629impl Default for UnixSocketConfig {
1630 fn default() -> Self {
1631 Self {
1632 enabled: false,
1633 path: "/var/run/sockudo/sockudo.sock".to_string(),
1634 permission_mode: 0o660,
1635 }
1636 }
1637}
1638
1639impl Default for DeltaCompressionOptionsConfig {
1640 fn default() -> Self {
1641 Self {
1642 enabled: true,
1643 algorithm: "fossil".to_string(),
1644 full_message_interval: 10,
1645 min_message_size: 100,
1646 max_state_age_secs: 300,
1647 max_channel_states_per_socket: 100,
1648 max_conflation_states_per_channel: Some(100),
1649 conflation_key_path: None,
1650 cluster_coordination: false,
1651 omit_delta_algorithm: false,
1652 }
1653 }
1654}
1655
1656impl ClusterHealthConfig {
1657 pub fn validate(&self) -> Result<(), String> {
1658 if self.heartbeat_interval_ms == 0 {
1659 return Err("heartbeat_interval_ms must be greater than 0".to_string());
1660 }
1661 if self.node_timeout_ms == 0 {
1662 return Err("node_timeout_ms must be greater than 0".to_string());
1663 }
1664 if self.cleanup_interval_ms == 0 {
1665 return Err("cleanup_interval_ms must be greater than 0".to_string());
1666 }
1667
1668 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1669 return Err(format!(
1670 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1671 self.heartbeat_interval_ms,
1672 self.node_timeout_ms,
1673 self.node_timeout_ms / 3
1674 ));
1675 }
1676
1677 if self.cleanup_interval_ms > self.node_timeout_ms {
1678 return Err(format!(
1679 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1680 self.cleanup_interval_ms, self.node_timeout_ms
1681 ));
1682 }
1683
1684 Ok(())
1685 }
1686}
1687
1688impl ServerOptions {
1689 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1690 let content = tokio::fs::read_to_string(path).await?;
1691 let options: Self = sonic_rs::from_str(&content)?;
1692 Ok(options)
1693 }
1694
1695 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1696 if let Ok(mode) = std::env::var("ENVIRONMENT") {
1698 self.mode = mode;
1699 }
1700 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1701 if parse_bool_env("DEBUG", false) {
1702 self.debug = true;
1703 info!("DEBUG environment variable forces debug mode ON");
1704 }
1705
1706 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1707
1708 if let Ok(host) = std::env::var("HOST") {
1709 self.host = host;
1710 }
1711 self.port = parse_env::<u16>("PORT", self.port);
1712 self.shutdown_grace_period =
1713 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1714 self.user_authentication_timeout = parse_env::<u64>(
1715 "USER_AUTHENTICATION_TIMEOUT",
1716 self.user_authentication_timeout,
1717 );
1718 self.websocket_max_payload_kb =
1719 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1720 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1721 self.instance.process_id = id;
1722 }
1723
1724 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1726 self.adapter.driver =
1727 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1728 }
1729 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1730 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1731 self.adapter.buffer_multiplier_per_cpu,
1732 );
1733 self.adapter.enable_socket_counting = parse_env::<bool>(
1734 "ADAPTER_ENABLE_SOCKET_COUNTING",
1735 self.adapter.enable_socket_counting,
1736 );
1737 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1738 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1739 }
1740 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1741 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1742 }
1743 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1744 self.app_manager.driver =
1745 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1746 }
1747 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1748 self.rate_limiter.driver = parse_driver_enum(
1749 driver_str,
1750 self.rate_limiter.driver.clone(),
1751 "RateLimiter Backend",
1752 );
1753 }
1754
1755 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1757 self.database.redis.host = host;
1758 }
1759 self.database.redis.port =
1760 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1761 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1762 self.database.redis.username = if username.is_empty() {
1763 None
1764 } else {
1765 Some(username)
1766 };
1767 }
1768 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1769 self.database.redis.password = Some(password);
1770 }
1771 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1772 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1773 self.database.redis.key_prefix = prefix;
1774 }
1775 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1776 self.database.redis.cluster.username = if cluster_username.is_empty() {
1777 None
1778 } else {
1779 Some(cluster_username)
1780 };
1781 }
1782 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1783 self.database.redis.cluster.password = Some(cluster_password);
1784 }
1785 self.database.redis.cluster.use_tls = parse_bool_env(
1786 "DATABASE_REDIS_CLUSTER_USE_TLS",
1787 self.database.redis.cluster.use_tls,
1788 );
1789
1790 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1792 self.database.mysql.host = host;
1793 }
1794 self.database.mysql.port =
1795 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1796 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1797 self.database.mysql.username = user;
1798 }
1799 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1800 self.database.mysql.password = pass;
1801 }
1802 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1803 self.database.mysql.database = db;
1804 }
1805 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1806 self.database.mysql.table_name = table;
1807 }
1808 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1809
1810 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1812 self.database.postgres.host = host;
1813 }
1814 self.database.postgres.port =
1815 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1816 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1817 self.database.postgres.username = user;
1818 }
1819 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1820 self.database.postgres.password = pass;
1821 }
1822 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1823 self.database.postgres.database = db;
1824 }
1825 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1826
1827 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1829 self.database.dynamodb.region = region;
1830 }
1831 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1832 self.database.dynamodb.table_name = table;
1833 }
1834 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1835 self.database.dynamodb.endpoint_url = Some(endpoint);
1836 }
1837 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1838 self.database.dynamodb.aws_access_key_id = Some(key_id);
1839 }
1840 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1841 self.database.dynamodb.aws_secret_access_key = Some(secret);
1842 }
1843
1844 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1846 let node_list: Vec<String> = nodes
1847 .split(',')
1848 .map(|s| s.trim())
1849 .filter(|s| !s.is_empty())
1850 .map(ToString::to_string)
1851 .collect();
1852
1853 options.adapter.cluster.nodes = node_list.clone();
1854 options.queue.redis_cluster.nodes = node_list.clone();
1855
1856 let parsed_nodes: Vec<ClusterNode> = node_list
1857 .iter()
1858 .filter_map(|seed| ClusterNode::from_seed(seed))
1859 .collect();
1860 options.database.redis.cluster.nodes = parsed_nodes.clone();
1861 options.database.redis.cluster_nodes = parsed_nodes;
1862 };
1863
1864 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1865 apply_redis_cluster_nodes(self, &nodes);
1866 }
1867 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1868 apply_redis_cluster_nodes(self, &nodes);
1869 }
1870 self.queue.redis_cluster.concurrency = parse_env::<u32>(
1871 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1872 self.queue.redis_cluster.concurrency,
1873 );
1874 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1875 self.queue.redis_cluster.prefix = Some(prefix);
1876 }
1877
1878 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1880 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1881 self.ssl.cert_path = val;
1882 }
1883 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1884 self.ssl.key_path = val;
1885 }
1886 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1887 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1888 self.ssl.http_port = Some(port);
1889 }
1890
1891 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1893 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1894 self.unix_socket.path = path;
1895 }
1896 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1897 if mode_str.chars().all(|c| c.is_digit(8)) {
1898 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1899 if mode <= 0o777 {
1900 self.unix_socket.permission_mode = mode;
1901 } else {
1902 warn!(
1903 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1904 mode_str, self.unix_socket.permission_mode
1905 );
1906 }
1907 } else {
1908 warn!(
1909 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1910 mode_str, self.unix_socket.permission_mode
1911 );
1912 }
1913 } else {
1914 warn!(
1915 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1916 mode_str, self.unix_socket.permission_mode
1917 );
1918 }
1919 }
1920
1921 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1923 self.metrics.driver =
1924 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1925 }
1926 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1927 if let Ok(val) = std::env::var("METRICS_HOST") {
1928 self.metrics.host = val;
1929 }
1930 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1931 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1932 self.metrics.prometheus.prefix = val;
1933 }
1934
1935 self.rate_limiter.enabled =
1937 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1938 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1939 "RATE_LIMITER_API_MAX_REQUESTS",
1940 self.rate_limiter.api_rate_limit.max_requests,
1941 );
1942 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1943 "RATE_LIMITER_API_WINDOW_SECONDS",
1944 self.rate_limiter.api_rate_limit.window_seconds,
1945 );
1946 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1947 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1948 }
1949 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1950 "RATE_LIMITER_WS_MAX_REQUESTS",
1951 self.rate_limiter.websocket_rate_limit.max_requests,
1952 );
1953 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1954 "RATE_LIMITER_WS_WINDOW_SECONDS",
1955 self.rate_limiter.websocket_rate_limit.window_seconds,
1956 );
1957 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1958 self.rate_limiter.redis.prefix = Some(prefix);
1959 }
1960
1961 self.queue.redis.concurrency =
1963 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1964 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1965 self.queue.redis.prefix = Some(prefix);
1966 }
1967
1968 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1970 self.queue.sqs.region = region;
1971 }
1972 self.queue.sqs.visibility_timeout = parse_env::<i32>(
1973 "QUEUE_SQS_VISIBILITY_TIMEOUT",
1974 self.queue.sqs.visibility_timeout,
1975 );
1976 self.queue.sqs.max_messages =
1977 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
1978 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
1979 "QUEUE_SQS_WAIT_TIME_SECONDS",
1980 self.queue.sqs.wait_time_seconds,
1981 );
1982 self.queue.sqs.concurrency =
1983 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
1984 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
1985 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
1986 self.queue.sqs.endpoint_url = Some(endpoint);
1987 }
1988
1989 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
1991 self.queue.sns.region = region;
1992 }
1993 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
1994 self.queue.sns.topic_arn = topic_arn;
1995 }
1996 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
1997 self.queue.sns.endpoint_url = Some(endpoint);
1998 }
1999
2000 self.webhooks.batching.enabled =
2002 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2003 self.webhooks.batching.duration =
2004 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2005 self.webhooks.batching.size =
2006 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2007
2008 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2010 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2011 }
2012 if let Ok(user) = std::env::var("NATS_USERNAME") {
2013 self.adapter.nats.username = Some(user);
2014 }
2015 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2016 self.adapter.nats.password = Some(pass);
2017 }
2018 if let Ok(token) = std::env::var("NATS_TOKEN") {
2019 self.adapter.nats.token = Some(token);
2020 }
2021 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2022 "NATS_CONNECTION_TIMEOUT_MS",
2023 self.adapter.nats.connection_timeout_ms,
2024 );
2025 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2026 "NATS_REQUEST_TIMEOUT_MS",
2027 self.adapter.nats.request_timeout_ms,
2028 );
2029
2030 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2032 self.cors.origin = origins.split(',').map(|s| s.trim().to_string()).collect();
2033 }
2034 if let Ok(methods) = std::env::var("CORS_METHODS") {
2035 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2036 }
2037 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2038 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2039 }
2040 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2041
2042 self.database_pooling.enabled =
2044 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2045 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2046 self.database_pooling.min = min;
2047 }
2048 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2049 self.database_pooling.max = max;
2050 }
2051
2052 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2053 self.database.mysql.connection_pool_size = pool_size;
2054 self.database.postgres.connection_pool_size = pool_size;
2055 }
2056 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2057 self.app_manager.cache.ttl = cache_ttl;
2058 self.channel_limits.cache_ttl = cache_ttl;
2059 self.database.mysql.cache_ttl = cache_ttl;
2060 self.database.postgres.cache_ttl = cache_ttl;
2061 self.cache.memory.ttl = cache_ttl;
2062 }
2063 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2064 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2065 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2066 self.cache.memory.cleanup_interval = cleanup_interval;
2067 }
2068 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2069 self.database.mysql.cache_max_capacity = max_capacity;
2070 self.database.postgres.cache_max_capacity = max_capacity;
2071 self.cache.memory.max_capacity = max_capacity;
2072 }
2073
2074 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2075 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2076 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2077 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2078
2079 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2080 (default_app_id, default_app_key, default_app_secret)
2081 && default_app_enabled
2082 {
2083 let default_app = App {
2084 id: app_id,
2085 key: app_key,
2086 secret: app_secret,
2087 enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2088 enabled: default_app_enabled,
2089 max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2090 max_client_events_per_second: parse_env::<u32>(
2091 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2092 100,
2093 ),
2094 max_read_requests_per_second: Some(parse_env::<u32>(
2095 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2096 100,
2097 )),
2098 max_presence_members_per_channel: Some(parse_env::<u32>(
2099 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2100 100,
2101 )),
2102 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2103 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2104 100,
2105 )),
2106 max_channel_name_length: Some(parse_env::<u32>(
2107 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2108 100,
2109 )),
2110 max_event_channels_at_once: Some(parse_env::<u32>(
2111 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2112 100,
2113 )),
2114 max_event_name_length: Some(parse_env::<u32>(
2115 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2116 100,
2117 )),
2118 max_event_payload_in_kb: Some(parse_env::<u32>(
2119 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2120 100,
2121 )),
2122 max_event_batch_size: Some(parse_env::<u32>(
2123 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2124 100,
2125 )),
2126 enable_user_authentication: Some(parse_bool_env(
2127 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2128 false,
2129 )),
2130 webhooks: None,
2131 max_backend_events_per_second: Some(parse_env::<u32>(
2132 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2133 100,
2134 )),
2135 enable_watchlist_events: Some(parse_bool_env(
2136 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2137 false,
2138 )),
2139 allowed_origins: {
2140 if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2141 if !origins_str.is_empty() {
2142 Some(
2143 origins_str
2144 .split(',')
2145 .map(|s| s.trim().to_string())
2146 .collect(),
2147 )
2148 } else {
2149 None
2150 }
2151 } else {
2152 None
2153 }
2154 },
2155 channel_delta_compression: None,
2156 };
2157
2158 self.app_manager.array.apps.push(default_app);
2159 info!("Successfully registered default app from env");
2160 }
2161
2162 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2164 info!("Applying REDIS_URL environment variable override");
2165
2166 let redis_url_json = sonic_rs::json!(redis_url_env);
2167
2168 self.adapter
2169 .redis
2170 .redis_pub_options
2171 .insert("url".to_string(), redis_url_json.clone());
2172 self.adapter
2173 .redis
2174 .redis_sub_options
2175 .insert("url".to_string(), redis_url_json);
2176
2177 self.cache.redis.url_override = Some(redis_url_env.clone());
2178 self.queue.redis.url_override = Some(redis_url_env.clone());
2179 self.rate_limiter.redis.url_override = Some(redis_url_env);
2180 }
2181
2182 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2184 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2185 if has_colors_env || has_target_env {
2186 let logging_config = self.logging.get_or_insert_with(Default::default);
2187 if has_colors_env {
2188 logging_config.colors_enabled =
2189 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2190 }
2191 if has_target_env {
2192 logging_config.include_target =
2193 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2194 }
2195 }
2196
2197 self.cleanup.async_enabled =
2199 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2200 self.cleanup.fallback_to_sync =
2201 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2202 self.cleanup.queue_buffer_size =
2203 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2204 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2205 self.cleanup.batch_timeout_ms =
2206 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2207 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2208 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2209 WorkerThreadsConfig::Auto
2210 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2211 WorkerThreadsConfig::Fixed(n)
2212 } else {
2213 warn!(
2214 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2215 worker_threads_str
2216 );
2217 self.cleanup.worker_threads.clone()
2218 };
2219 }
2220 self.cleanup.max_retry_attempts = parse_env::<u32>(
2221 "CLEANUP_MAX_RETRY_ATTEMPTS",
2222 self.cleanup.max_retry_attempts,
2223 );
2224
2225 self.cluster_health.enabled =
2227 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2228 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2229 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2230 self.cluster_health.heartbeat_interval_ms,
2231 );
2232 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2233 "CLUSTER_HEALTH_NODE_TIMEOUT",
2234 self.cluster_health.node_timeout_ms,
2235 );
2236 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2237 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2238 self.cluster_health.cleanup_interval_ms,
2239 );
2240
2241 self.tag_filtering.enabled =
2243 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2244
2245 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2247 if val.to_lowercase() == "none" || val == "0" {
2248 self.websocket.max_messages = None;
2249 } else if let Ok(n) = val.parse::<usize>() {
2250 self.websocket.max_messages = Some(n);
2251 }
2252 }
2253 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2254 if val.to_lowercase() == "none" || val == "0" {
2255 self.websocket.max_bytes = None;
2256 } else if let Ok(n) = val.parse::<usize>() {
2257 self.websocket.max_bytes = Some(n);
2258 }
2259 }
2260 self.websocket.disconnect_on_buffer_full = parse_bool_env(
2261 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2262 self.websocket.disconnect_on_buffer_full,
2263 );
2264 self.websocket.max_message_size = parse_env::<usize>(
2265 "WEBSOCKET_MAX_MESSAGE_SIZE",
2266 self.websocket.max_message_size,
2267 );
2268 self.websocket.max_frame_size =
2269 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2270 self.websocket.write_buffer_size = parse_env::<usize>(
2271 "WEBSOCKET_WRITE_BUFFER_SIZE",
2272 self.websocket.write_buffer_size,
2273 );
2274 self.websocket.max_backpressure = parse_env::<usize>(
2275 "WEBSOCKET_MAX_BACKPRESSURE",
2276 self.websocket.max_backpressure,
2277 );
2278 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2279 self.websocket.ping_interval =
2280 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2281 self.websocket.idle_timeout =
2282 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2283 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2284 self.websocket.compression = mode;
2285 }
2286
2287 Ok(())
2288 }
2289
2290 pub fn validate(&self) -> Result<(), String> {
2291 if self.unix_socket.enabled {
2292 if self.unix_socket.path.is_empty() {
2293 return Err(
2294 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2295 );
2296 }
2297
2298 self.validate_unix_socket_security()?;
2299
2300 if self.ssl.enabled {
2301 tracing::warn!(
2302 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2303 );
2304 }
2305
2306 if self.unix_socket.permission_mode > 0o777 {
2307 return Err(format!(
2308 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2309 self.unix_socket.permission_mode
2310 ));
2311 }
2312 }
2313
2314 if let Err(e) = self.cleanup.validate() {
2315 return Err(format!("Invalid cleanup configuration: {}", e));
2316 }
2317
2318 Ok(())
2319 }
2320
2321 fn validate_unix_socket_security(&self) -> Result<(), String> {
2322 let path = &self.unix_socket.path;
2323
2324 if path.contains("../") || path.contains("..\\") {
2325 return Err(
2326 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2327 );
2328 }
2329
2330 if self.unix_socket.permission_mode & 0o002 != 0 {
2331 warn!(
2332 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2333 self.unix_socket.permission_mode
2334 );
2335 }
2336
2337 if self.unix_socket.permission_mode & 0o007 > 0o005 {
2338 warn!(
2339 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2340 self.unix_socket.permission_mode
2341 );
2342 }
2343
2344 if self.mode == "production" && path.starts_with("/tmp/") {
2345 warn!(
2346 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2347 path
2348 );
2349 }
2350
2351 if !path.starts_with('/') {
2352 return Err(
2353 "Unix socket path must be absolute (start with /) for security and reliability."
2354 .to_string(),
2355 );
2356 }
2357
2358 Ok(())
2359 }
2360}
2361
#[cfg(test)]
mod redis_connection_tests {
    use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};

    /// Builds the fully-specified standalone connection the URL tests start from.
    ///
    /// Every field is spelled out instead of being taken from `Default`, so the
    /// expected URLs below do not depend on the crate's default values. Tests
    /// override individual fields with struct-update syntax.
    fn standalone() -> RedisConnection {
        RedisConnection {
            host: String::from("127.0.0.1"),
            port: 6379,
            db: 0,
            username: None,
            password: None,
            key_prefix: String::from("sockudo:"),
            sentinels: Vec::new(),
            sentinel_password: None,
            name: String::from("mymaster"),
            cluster: RedisClusterConnection::default(),
            cluster_nodes: Vec::new(),
        }
    }

    // --- Standalone URLs -------------------------------------------------

    #[test]
    fn test_standard_url_basic() {
        assert_eq!(standalone().to_url(), "redis://127.0.0.1:6379/0");
    }

    #[test]
    fn test_standard_url_with_password() {
        let conn = RedisConnection {
            db: 2,
            password: Some(String::from("secret")),
            ..standalone()
        };
        assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
    }

    #[test]
    fn test_standard_url_with_username_and_password() {
        let conn = RedisConnection {
            host: String::from("redis.example.com"),
            port: 6380,
            db: 1,
            username: Some(String::from("admin")),
            password: Some(String::from("pass123")),
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis://admin:pass123@redis.example.com:6380/1"
        );
    }

    #[test]
    fn test_standard_url_with_special_chars_in_password() {
        // '@' and '#' are expected to come out percent-encoded.
        let conn = RedisConnection {
            password: Some(String::from("pass@word#123")),
            ..standalone()
        };
        assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
    }

    // --- Sentinel detection and URLs -------------------------------------

    #[test]
    fn test_is_sentinel_configured_false() {
        assert!(!RedisConnection::default().is_sentinel_configured());
    }

    #[test]
    fn test_is_sentinel_configured_true() {
        let sentinels = vec![RedisSentinel {
            host: String::from("sentinel1"),
            port: 26379,
        }];
        let conn = RedisConnection {
            sentinels,
            ..Default::default()
        };
        assert!(conn.is_sentinel_configured());
    }

    #[test]
    fn test_sentinel_url_basic() {
        let conn = RedisConnection {
            sentinels: vec![
                RedisSentinel {
                    host: String::from("sentinel1"),
                    port: 26379,
                },
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26379,
                },
            ],
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_sentinel_password() {
        let conn = RedisConnection {
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            sentinel_password: Some(String::from("sentinelpass")),
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
        );
    }

    #[test]
    fn test_sentinel_url_with_master_password() {
        let conn = RedisConnection {
            db: 1,
            password: Some(String::from("masterpass")),
            sentinels: vec![RedisSentinel {
                host: String::from("sentinel1"),
                port: 26379,
            }],
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
        );
    }

    #[test]
    fn test_sentinel_url_with_all_auth() {
        let conn = RedisConnection {
            db: 2,
            username: Some(String::from("redisuser")),
            password: Some(String::from("redispass")),
            sentinels: vec![
                RedisSentinel {
                    host: String::from("sentinel1"),
                    port: 26379,
                },
                RedisSentinel {
                    host: String::from("sentinel2"),
                    port: 26380,
                },
            ],
            sentinel_password: Some(String::from("sentinelauth")),
            name: String::from("production-master"),
            ..standalone()
        };
        assert_eq!(
            conn.to_url(),
            "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
        );
    }

    #[test]
    fn test_sentinel_to_host_port() {
        let sentinel = RedisSentinel {
            host: String::from("sentinel.example.com"),
            port: 26379,
        };
        assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
    }

    // --- Cluster node URLs -----------------------------------------------

    #[test]
    fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
        // Mix of a bare host, a host with an explicit `redis://` scheme and port,
        // and a host with an explicit `rediss://` scheme and no port.
        let cluster = RedisClusterConnection {
            nodes: vec![
                ClusterNode {
                    host: String::from("node1.secure-cluster.com"),
                    port: 7000,
                },
                ClusterNode {
                    host: String::from("redis://node2.secure-cluster.com:7001"),
                    port: 7001,
                },
                ClusterNode {
                    host: String::from("rediss://node3.secure-cluster.com"),
                    port: 7002,
                },
            ],
            username: None,
            password: Some(String::from("cluster-secret")),
            use_tls: true,
        };
        let conn = RedisConnection {
            cluster,
            ..Default::default()
        };

        let urls = conn.cluster_node_urls();
        assert_eq!(
            urls,
            vec![
                "rediss://:cluster-secret@node1.secure-cluster.com:7000",
                "redis://:cluster-secret@node2.secure-cluster.com:7001",
                "rediss://:cluster-secret@node3.secure-cluster.com:7002",
            ]
        );
    }

    #[test]
    fn test_cluster_node_urls_fallback_to_legacy_nodes() {
        // Only the top-level `cluster_nodes` and `password` are set here.
        let legacy_node = ClusterNode {
            host: String::from("legacy-node.example.com"),
            port: 7000,
        };
        let conn = RedisConnection {
            password: Some(String::from("fallback-secret")),
            cluster_nodes: vec![legacy_node],
            ..Default::default()
        };

        let urls = conn.cluster_node_urls();
        assert_eq!(
            urls,
            vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
        );
    }

    #[test]
    fn test_normalize_cluster_seed_urls() {
        let cluster = RedisClusterConnection {
            nodes: Vec::new(),
            username: Some(String::from("svc-user")),
            password: Some(String::from("svc-pass")),
            use_tls: true,
        };
        let conn = RedisConnection {
            cluster,
            ..Default::default()
        };

        let seeds = vec![
            String::from("node1.example.com:7000"),
            String::from("redis://node2.example.com:7001"),
            String::from("rediss://node3.example.com"),
        ];

        let normalized = conn.normalize_cluster_seed_urls(&seeds);
        assert_eq!(
            normalized,
            vec![
                "rediss://svc-user:svc-pass@node1.example.com:7000",
                "redis://svc-user:svc-pass@node2.example.com:7001",
                "rediss://svc-user:svc-pass@node3.example.com:6379",
            ]
        );
    }
}
2656
#[cfg(test)]
mod cluster_node_tests {
    use super::ClusterNode;

    /// Asserts that `node.to_url()` renders exactly `expected`.
    ///
    /// `#[track_caller]` makes a failure point at the calling test rather than
    /// at this helper.
    #[track_caller]
    fn assert_url(node: ClusterNode, expected: &str) {
        assert_eq!(node.to_url(), expected);
    }

    // --- Plain hosts -----------------------------------------------------

    #[test]
    fn test_to_url_basic_host() {
        assert_url(
            ClusterNode {
                host: String::from("localhost"),
                port: 6379,
            },
            "redis://localhost:6379",
        );
    }

    #[test]
    fn test_to_url_ip_address() {
        assert_url(
            ClusterNode {
                host: String::from("127.0.0.1"),
                port: 6379,
            },
            "redis://127.0.0.1:6379",
        );
    }

    // --- Hosts carrying an explicit scheme -------------------------------

    #[test]
    fn test_to_url_with_redis_protocol() {
        assert_url(
            ClusterNode {
                host: String::from("redis://example.com"),
                port: 6379,
            },
            "redis://example.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol() {
        assert_url(
            ClusterNode {
                host: String::from("rediss://secure.example.com"),
                port: 6379,
            },
            "rediss://secure.example.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_rediss_protocol_and_port_in_url() {
        // The port embedded in the host string is expected to win over `port`.
        assert_url(
            ClusterNode {
                host: String::from("rediss://secure.example.com:7000"),
                port: 6379,
            },
            "rediss://secure.example.com:7000",
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_and_port_in_url() {
        assert_url(
            ClusterNode {
                host: String::from("redis://example.com:7001"),
                port: 6379,
            },
            "redis://example.com:7001",
        );
    }

    #[test]
    fn test_to_url_with_trailing_whitespace() {
        // Surrounding whitespace in the host must not leak into the URL.
        assert_url(
            ClusterNode {
                host: String::from(" rediss://secure.example.com "),
                port: 6379,
            },
            "rediss://secure.example.com:6379",
        );
    }

    // --- Ports -----------------------------------------------------------

    #[test]
    fn test_to_url_custom_port() {
        assert_url(
            ClusterNode {
                host: String::from("redis-cluster.example.com"),
                port: 7000,
            },
            "redis://redis-cluster.example.com:7000",
        );
    }

    #[test]
    fn test_to_url_plain_host_with_port_in_host_field() {
        assert_url(
            ClusterNode {
                host: String::from("redis-cluster.example.com:7010"),
                port: 7000,
            },
            "redis://redis-cluster.example.com:7010",
        );
    }

    // --- Auth / TLS options ----------------------------------------------

    #[test]
    fn test_to_url_with_options_adds_auth_and_tls() {
        let node = ClusterNode {
            host: String::from("node.example.com"),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("svc-user"), Some("secret"));
        assert_eq!(url, "rediss://svc-user:secret@node.example.com:7000");
    }

    #[test]
    fn test_to_url_with_options_keeps_embedded_auth() {
        // Credentials already present in the host string take precedence over
        // the ones passed as options.
        let node = ClusterNode {
            host: String::from("rediss://:node-secret@node.example.com:7000"),
            port: 7000,
        };
        let url = node.to_url_with_options(true, Some("global-user"), Some("global-secret"));
        assert_eq!(url, "rediss://:node-secret@node.example.com:7000");
    }

    // --- Seed parsing ----------------------------------------------------

    #[test]
    fn test_from_seed_parses_plain_host_port() {
        let parsed = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
        assert_eq!(parsed.host, "cluster-node-1");
        assert_eq!(parsed.port, 7005);
    }

    #[test]
    fn test_from_seed_keeps_scheme_urls() {
        let seed = "rediss://secure.example.com:7005";
        let parsed = ClusterNode::from_seed(seed).expect("node should parse");
        assert_eq!(parsed.host, "rediss://secure.example.com:7005");
        assert_eq!(parsed.port, 7005);
    }

    // --- Real-world hostnames and IPv6 literals --------------------------

    #[test]
    fn test_to_url_aws_elasticache_hostname() {
        assert_url(
            ClusterNode {
                host: String::from("rediss://my-cluster.use1.cache.amazonaws.com"),
                port: 6379,
            },
            "rediss://my-cluster.use1.cache.amazonaws.com:6379",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_no_port() {
        assert_url(
            ClusterNode {
                host: String::from("rediss://[::1]"),
                port: 6379,
            },
            "rediss://[::1]:6379",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_and_port_in_url() {
        assert_url(
            ClusterNode {
                host: String::from("rediss://[::1]:7000"),
                port: 6379,
            },
            "rediss://[::1]:7000",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_no_port() {
        assert_url(
            ClusterNode {
                host: String::from("rediss://[2001:db8::1]"),
                port: 6379,
            },
            "rediss://[2001:db8::1]:6379",
        );
    }

    #[test]
    fn test_to_url_with_ipv6_full_address_with_port() {
        assert_url(
            ClusterNode {
                host: String::from("rediss://[2001:db8::1]:7000"),
                port: 6379,
            },
            "rediss://[2001:db8::1]:7000",
        );
    }

    #[test]
    fn test_to_url_with_redis_protocol_ipv6() {
        assert_url(
            ClusterNode {
                host: String::from("redis://[::1]"),
                port: 6379,
            },
            "redis://[::1]:6379",
        );
    }
}