1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76 #[default]
77 Local,
78 Redis,
79 #[serde(rename = "redis-cluster")]
80 RedisCluster,
81 Nats,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(default)]
86pub struct DynamoDbSettings {
87 pub region: String,
88 pub table_name: String,
89 pub endpoint_url: Option<String>,
90 pub aws_access_key_id: Option<String>,
91 pub aws_secret_access_key: Option<String>,
92 pub aws_profile_name: Option<String>,
93}
94
95impl Default for DynamoDbSettings {
96 fn default() -> Self {
97 Self {
98 region: "us-east-1".to_string(),
99 table_name: "sockudo-applications".to_string(),
100 endpoint_url: None,
101 aws_access_key_id: None,
102 aws_secret_access_key: None,
103 aws_profile_name: None,
104 }
105 }
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(default)]
110pub struct ScyllaDbSettings {
111 pub nodes: Vec<String>,
112 pub keyspace: String,
113 pub table_name: String,
114 pub username: Option<String>,
115 pub password: Option<String>,
116 pub replication_class: String,
117 pub replication_factor: u32,
118}
119
120impl Default for ScyllaDbSettings {
121 fn default() -> Self {
122 Self {
123 nodes: vec!["127.0.0.1:9042".to_string()],
124 keyspace: "sockudo".to_string(),
125 table_name: "applications".to_string(),
126 username: None,
127 password: None,
128 replication_class: "SimpleStrategy".to_string(),
129 replication_factor: 3,
130 }
131 }
132}
133
134impl FromStr for AdapterDriver {
135 type Err = String;
136 fn from_str(s: &str) -> Result<Self, Self::Err> {
137 match s.to_lowercase().as_str() {
138 "local" => Ok(AdapterDriver::Local),
139 "redis" => Ok(AdapterDriver::Redis),
140 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
141 "nats" => Ok(AdapterDriver::Nats),
142 _ => Err(format!("Unknown adapter driver: {s}")),
143 }
144 }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
148#[serde(rename_all = "lowercase")]
149pub enum AppManagerDriver {
150 #[default]
151 Memory,
152 Mysql,
153 Dynamodb,
154 PgSql,
155 ScyllaDb,
156}
157impl FromStr for AppManagerDriver {
158 type Err = String;
159 fn from_str(s: &str) -> Result<Self, Self::Err> {
160 match s.to_lowercase().as_str() {
161 "memory" => Ok(AppManagerDriver::Memory),
162 "mysql" => Ok(AppManagerDriver::Mysql),
163 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
164 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
165 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
166 _ => Err(format!("Unknown app manager driver: {s}")),
167 }
168 }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
172#[serde(rename_all = "lowercase")]
173pub enum CacheDriver {
174 #[default]
175 Memory,
176 Redis,
177 #[serde(rename = "redis-cluster")]
178 RedisCluster,
179 None,
180}
181
182impl FromStr for CacheDriver {
183 type Err = String;
184 fn from_str(s: &str) -> Result<Self, Self::Err> {
185 match s.to_lowercase().as_str() {
186 "memory" => Ok(CacheDriver::Memory),
187 "redis" => Ok(CacheDriver::Redis),
188 "redis-cluster" => Ok(CacheDriver::RedisCluster),
189 "none" => Ok(CacheDriver::None),
190 _ => Err(format!("Unknown cache driver: {s}")),
191 }
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
196#[serde(rename_all = "lowercase")]
197pub enum QueueDriver {
198 Memory,
199 #[default]
200 Redis,
201 #[serde(rename = "redis-cluster")]
202 RedisCluster,
203 Sqs,
204 Sns,
205 None,
206}
207
208impl FromStr for QueueDriver {
209 type Err = String;
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 match s.to_lowercase().as_str() {
212 "memory" => Ok(QueueDriver::Memory),
213 "redis" => Ok(QueueDriver::Redis),
214 "redis-cluster" => Ok(QueueDriver::RedisCluster),
215 "sqs" => Ok(QueueDriver::Sqs),
216 "sns" => Ok(QueueDriver::Sns),
217 "none" => Ok(QueueDriver::None),
218 _ => Err(format!("Unknown queue driver: {s}")),
219 }
220 }
221}
222
223impl AsRef<str> for QueueDriver {
224 fn as_ref(&self) -> &str {
225 match self {
226 QueueDriver::Memory => "memory",
227 QueueDriver::Redis => "redis",
228 QueueDriver::RedisCluster => "redis-cluster",
229 QueueDriver::Sqs => "sqs",
230 QueueDriver::Sns => "sns",
231 QueueDriver::None => "none",
232 }
233 }
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(default)]
238pub struct RedisClusterQueueConfig {
239 pub concurrency: u32,
240 pub prefix: Option<String>,
241 pub nodes: Vec<String>,
242 pub request_timeout_ms: u64,
243}
244
245impl Default for RedisClusterQueueConfig {
246 fn default() -> Self {
247 Self {
248 concurrency: 5,
249 prefix: Some("sockudo_queue:".to_string()),
250 nodes: vec!["redis://127.0.0.1:6379".to_string()],
251 request_timeout_ms: 5000,
252 }
253 }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
257#[serde(rename_all = "lowercase")]
258pub enum MetricsDriver {
259 #[default]
260 Prometheus,
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
264#[serde(rename_all = "lowercase")]
265pub enum LogOutputFormat {
266 #[default]
267 Human,
268 Json,
269}
270
271impl FromStr for LogOutputFormat {
272 type Err = String;
273 fn from_str(s: &str) -> Result<Self, Self::Err> {
274 match s.to_lowercase().as_str() {
275 "human" => Ok(LogOutputFormat::Human),
276 "json" => Ok(LogOutputFormat::Json),
277 _ => Err(format!("Unknown log output format: {s}")),
278 }
279 }
280}
281
282impl FromStr for MetricsDriver {
283 type Err = String;
284 fn from_str(s: &str) -> Result<Self, Self::Err> {
285 match s.to_lowercase().as_str() {
286 "prometheus" => Ok(MetricsDriver::Prometheus),
287 _ => Err(format!("Unknown metrics driver: {s}")),
288 }
289 }
290}
291
292impl AsRef<str> for MetricsDriver {
293 fn as_ref(&self) -> &str {
294 match self {
295 MetricsDriver::Prometheus => "prometheus",
296 }
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302#[serde(default)]
303pub struct ServerOptions {
304 pub adapter: AdapterConfig,
305 pub app_manager: AppManagerConfig,
306 pub cache: CacheConfig,
307 pub channel_limits: ChannelLimits,
308 pub cors: CorsConfig,
309 pub database: DatabaseConfig,
310 pub database_pooling: DatabasePooling,
311 pub debug: bool,
312 pub event_limits: EventLimits,
313 pub host: String,
314 pub http_api: HttpApiConfig,
315 pub instance: InstanceConfig,
316 pub logging: Option<LoggingConfig>,
317 pub metrics: MetricsConfig,
318 pub mode: String,
319 pub port: u16,
320 pub path_prefix: String,
321 pub presence: PresenceConfig,
322 pub queue: QueueConfig,
323 pub rate_limiter: RateLimiterConfig,
324 pub shutdown_grace_period: u64,
325 pub ssl: SslConfig,
326 pub user_authentication_timeout: u64,
327 pub webhooks: WebhooksConfig,
328 pub websocket_max_payload_kb: u32,
329 pub cleanup: CleanupConfig,
330 pub activity_timeout: u64,
331 pub cluster_health: ClusterHealthConfig,
332 pub unix_socket: UnixSocketConfig,
333 pub delta_compression: DeltaCompressionOptionsConfig,
334 pub tag_filtering: TagFilteringConfig,
335 pub websocket: WebSocketConfig,
336}
337
338#[derive(Debug, Clone, Serialize, Deserialize)]
341#[serde(default)]
342pub struct SqsQueueConfig {
343 pub region: String,
344 pub queue_url_prefix: Option<String>,
345 pub visibility_timeout: i32,
346 pub endpoint_url: Option<String>,
347 pub max_messages: i32,
348 pub wait_time_seconds: i32,
349 pub concurrency: u32,
350 pub fifo: bool,
351 pub message_group_id: Option<String>,
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize)]
355#[serde(default)]
356pub struct SnsQueueConfig {
357 pub region: String,
358 pub topic_arn: String,
359 pub endpoint_url: Option<String>,
360}
361
362#[derive(Debug, Clone, Serialize, Deserialize)]
363#[serde(default)]
364pub struct AdapterConfig {
365 pub driver: AdapterDriver,
366 pub redis: RedisAdapterConfig,
367 pub cluster: RedisClusterAdapterConfig,
368 pub nats: NatsAdapterConfig,
369 #[serde(default = "default_buffer_multiplier_per_cpu")]
370 pub buffer_multiplier_per_cpu: usize,
371 pub cluster_health: ClusterHealthConfig,
372 #[serde(default = "default_enable_socket_counting")]
373 pub enable_socket_counting: bool,
374}
375
376fn default_enable_socket_counting() -> bool {
377 true
378}
379
380fn default_buffer_multiplier_per_cpu() -> usize {
381 64
382}
383
384impl Default for AdapterConfig {
385 fn default() -> Self {
386 Self {
387 driver: AdapterDriver::default(),
388 redis: RedisAdapterConfig::default(),
389 cluster: RedisClusterAdapterConfig::default(),
390 nats: NatsAdapterConfig::default(),
391 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
392 cluster_health: ClusterHealthConfig::default(),
393 enable_socket_counting: default_enable_socket_counting(),
394 }
395 }
396}
397
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(default)]
400pub struct RedisAdapterConfig {
401 pub requests_timeout: u64,
402 pub prefix: String,
403 pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
404 pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
405 pub cluster_mode: bool,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
409#[serde(default)]
410pub struct RedisClusterAdapterConfig {
411 pub nodes: Vec<String>,
412 pub prefix: String,
413 pub request_timeout_ms: u64,
414 pub use_connection_manager: bool,
415 #[serde(default)]
416 pub use_sharded_pubsub: bool,
417}
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
420#[serde(default)]
421pub struct NatsAdapterConfig {
422 pub servers: Vec<String>,
423 pub prefix: String,
424 pub request_timeout_ms: u64,
425 pub username: Option<String>,
426 pub password: Option<String>,
427 pub token: Option<String>,
428 pub connection_timeout_ms: u64,
429 pub nodes_number: Option<u32>,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, Default)]
433#[serde(default)]
434pub struct AppManagerConfig {
435 pub driver: AppManagerDriver,
436 pub array: ArrayConfig,
437 pub cache: CacheSettings,
438 pub scylladb: ScyllaDbSettings,
439}
440
441#[derive(Debug, Clone, Serialize, Deserialize, Default)]
442#[serde(default)]
443pub struct ArrayConfig {
444 pub apps: Vec<App>,
445}
446
447#[derive(Debug, Clone, Serialize, Deserialize)]
448#[serde(default)]
449pub struct CacheSettings {
450 pub enabled: bool,
451 pub ttl: u64,
452}
453
454#[derive(Debug, Clone, Serialize, Deserialize)]
455#[serde(default)]
456pub struct MemoryCacheOptions {
457 pub ttl: u64,
458 pub cleanup_interval: u64,
459 pub max_capacity: u64,
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
463#[serde(default)]
464pub struct CacheConfig {
465 pub driver: CacheDriver,
466 pub redis: RedisConfig,
467 pub memory: MemoryCacheOptions,
468}
469
470#[derive(Debug, Clone, Serialize, Deserialize, Default)]
471#[serde(default)]
472pub struct RedisConfig {
473 pub prefix: Option<String>,
474 pub url_override: Option<String>,
475 pub cluster_mode: bool,
476}
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(default)]
480pub struct ChannelLimits {
481 pub max_name_length: u32,
482 pub cache_ttl: u64,
483}
484
485#[derive(Debug, Clone, Serialize, Deserialize)]
486#[serde(default)]
487pub struct CorsConfig {
488 pub credentials: bool,
489 pub origin: Vec<String>,
490 pub methods: Vec<String>,
491 pub allowed_headers: Vec<String>,
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize, Default)]
495#[serde(default)]
496pub struct DatabaseConfig {
497 pub mysql: DatabaseConnection,
498 pub postgres: DatabaseConnection,
499 pub redis: RedisConnection,
500 pub dynamodb: DynamoDbSettings,
501 pub scylladb: ScyllaDbSettings,
502}
503
504#[derive(Debug, Clone, Serialize, Deserialize)]
505#[serde(default)]
506pub struct DatabaseConnection {
507 pub host: String,
508 pub port: u16,
509 pub username: String,
510 pub password: String,
511 pub database: String,
512 pub table_name: String,
513 pub connection_pool_size: u32,
514 pub pool_min: Option<u32>,
515 pub pool_max: Option<u32>,
516 pub cache_ttl: u64,
517 pub cache_cleanup_interval: u64,
518 pub cache_max_capacity: u64,
519}
520
521#[derive(Debug, Clone, Serialize, Deserialize)]
522#[serde(default)]
523pub struct RedisConnection {
524 pub host: String,
525 pub port: u16,
526 pub db: u32,
527 pub username: Option<String>,
528 pub password: Option<String>,
529 pub key_prefix: String,
530 pub sentinels: Vec<RedisSentinel>,
531 pub sentinel_password: Option<String>,
532 pub name: String,
533 pub cluster: RedisClusterConnection,
534 pub cluster_nodes: Vec<ClusterNode>,
536}
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
539#[serde(default)]
540pub struct RedisSentinel {
541 pub host: String,
542 pub port: u16,
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize, Default)]
546#[serde(default)]
547pub struct RedisClusterConnection {
548 pub nodes: Vec<ClusterNode>,
549 pub username: Option<String>,
550 pub password: Option<String>,
551 #[serde(alias = "useTLS")]
552 pub use_tls: bool,
553}
554
555impl RedisConnection {
556 pub fn is_sentinel_configured(&self) -> bool {
558 !self.sentinels.is_empty()
559 }
560
561 pub fn to_url(&self) -> String {
563 if self.is_sentinel_configured() {
564 self.build_sentinel_url()
565 } else {
566 self.build_standard_url()
567 }
568 }
569
570 fn build_standard_url(&self) -> String {
571 let (scheme, host) = if self.host.starts_with("rediss://") {
573 ("rediss://", self.host.trim_start_matches("rediss://"))
574 } else if self.host.starts_with("redis://") {
575 ("redis://", self.host.trim_start_matches("redis://"))
576 } else {
577 ("redis://", self.host.as_str())
578 };
579
580 let mut url = String::from(scheme);
581
582 if let Some(ref username) = self.username {
583 url.push_str(username);
584 if let Some(ref password) = self.password {
585 url.push(':');
586 url.push_str(&urlencoding::encode(password));
587 }
588 url.push('@');
589 } else if let Some(ref password) = self.password {
590 url.push(':');
591 url.push_str(&urlencoding::encode(password));
592 url.push('@');
593 }
594
595 url.push_str(host);
596 url.push(':');
597 url.push_str(&self.port.to_string());
598 url.push('/');
599 url.push_str(&self.db.to_string());
600
601 url
602 }
603
604 fn build_sentinel_url(&self) -> String {
605 let mut url = String::from("redis+sentinel://");
606
607 if let Some(ref sentinel_password) = self.sentinel_password {
608 url.push(':');
609 url.push_str(&urlencoding::encode(sentinel_password));
610 url.push('@');
611 }
612
613 let sentinel_hosts: Vec<String> = self
614 .sentinels
615 .iter()
616 .map(|s| format!("{}:{}", s.host, s.port))
617 .collect();
618 url.push_str(&sentinel_hosts.join(","));
619
620 url.push('/');
621 url.push_str(&self.name);
622 url.push('/');
623 url.push_str(&self.db.to_string());
624
625 let mut params = Vec::new();
626 if let Some(ref password) = self.password {
627 params.push(format!("password={}", urlencoding::encode(password)));
628 }
629 if let Some(ref username) = self.username {
630 params.push(format!("username={}", urlencoding::encode(username)));
631 }
632
633 if !params.is_empty() {
634 url.push('?');
635 url.push_str(¶ms.join("&"));
636 }
637
638 url
639 }
640
641 pub fn has_cluster_nodes(&self) -> bool {
644 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
645 }
646
647 pub fn cluster_node_urls(&self) -> Vec<String> {
650 if !self.cluster.nodes.is_empty() {
651 return self.build_cluster_urls(&self.cluster.nodes);
652 }
653 self.build_cluster_urls(&self.cluster_nodes)
654 }
655
656 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
659 self.build_cluster_urls(
660 &seeds
661 .iter()
662 .filter_map(|seed| ClusterNode::from_seed(seed))
663 .collect::<Vec<ClusterNode>>(),
664 )
665 }
666
667 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
668 let username = self
669 .cluster
670 .username
671 .as_deref()
672 .or(self.username.as_deref());
673 let password = self
674 .cluster
675 .password
676 .as_deref()
677 .or(self.password.as_deref());
678 let use_tls = self.cluster.use_tls;
679
680 nodes
681 .iter()
682 .map(|node| node.to_url_with_options(use_tls, username, password))
683 .collect()
684 }
685}
686
687impl RedisSentinel {
688 pub fn to_host_port(&self) -> String {
689 format!("{}:{}", self.host, self.port)
690 }
691}
692
693#[derive(Debug, Clone, Serialize, Deserialize)]
694#[serde(default)]
695pub struct ClusterNode {
696 pub host: String,
697 pub port: u16,
698}
699
700impl ClusterNode {
701 pub fn to_url(&self) -> String {
702 self.to_url_with_options(false, None, None)
703 }
704
705 pub fn to_url_with_options(
706 &self,
707 use_tls: bool,
708 username: Option<&str>,
709 password: Option<&str>,
710 ) -> String {
711 let host = self.host.trim();
712
713 if host.starts_with("redis://") || host.starts_with("rediss://") {
714 if let Ok(parsed) = Url::parse(host)
715 && let Some(host_str) = parsed.host_str()
716 {
717 let scheme = parsed.scheme();
718 let port = parsed.port_or_known_default().unwrap_or(self.port);
719 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
720 let parsed_password = parsed.password();
721 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
722 let (effective_username, effective_password) = if has_embedded_auth {
723 (parsed_username, parsed_password)
724 } else {
725 (username, password)
726 };
727
728 return build_redis_url(
729 scheme,
730 host_str,
731 port,
732 effective_username,
733 effective_password,
734 );
735 }
736
737 let has_port = if let Some(bracket_pos) = host.rfind(']') {
739 host[bracket_pos..].contains(':')
740 } else {
741 host.split(':').count() >= 3
742 };
743 let base = if has_port {
744 host.to_string()
745 } else {
746 format!("{}:{}", host, self.port)
747 };
748
749 if let Ok(parsed) = Url::parse(&base) {
750 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
751 let parsed_password = parsed.password();
752 if let Some(host_str) = parsed.host_str() {
753 let port = parsed.port_or_known_default().unwrap_or(self.port);
754 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
755 let (effective_username, effective_password) = if has_embedded_auth {
756 (parsed_username, parsed_password)
757 } else {
758 (username, password)
759 };
760 return build_redis_url(
761 parsed.scheme(),
762 host_str,
763 port,
764 effective_username,
765 effective_password,
766 );
767 }
768 }
769 return base;
770 }
771
772 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
773 let scheme = if use_tls { "rediss" } else { "redis" };
774 build_redis_url(
775 scheme,
776 &normalized_host,
777 normalized_port,
778 username,
779 password,
780 )
781 }
782
783 pub fn from_seed(seed: &str) -> Option<Self> {
784 let trimmed = seed.trim();
785 if trimmed.is_empty() {
786 return None;
787 }
788
789 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
790 let port = Url::parse(trimmed)
791 .ok()
792 .and_then(|parsed| parsed.port_or_known_default())
793 .unwrap_or(6379);
794 return Some(Self {
795 host: trimmed.to_string(),
796 port,
797 });
798 }
799
800 let (host, port) = split_plain_host_and_port(trimmed, 6379);
801 Some(Self { host, port })
802 }
803}
804
805fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
806 let host = raw_host.trim();
807
808 if host.starts_with('[') {
810 if let Some(end_bracket) = host.find(']') {
811 let host_part = host[1..end_bracket].to_string();
812 let remainder = &host[end_bracket + 1..];
813 if let Some(port_str) = remainder.strip_prefix(':')
814 && let Ok(port) = port_str.parse::<u16>()
815 {
816 return (host_part, port);
817 }
818 return (host_part, default_port);
819 }
820 return (host.to_string(), default_port);
821 }
822
823 if host.matches(':').count() == 1
825 && let Some((host_part, port_part)) = host.rsplit_once(':')
826 && let Ok(port) = port_part.parse::<u16>()
827 {
828 return (host_part.to_string(), port);
829 }
830
831 (host.to_string(), default_port)
832}
833
834fn build_redis_url(
835 scheme: &str,
836 host: &str,
837 port: u16,
838 username: Option<&str>,
839 password: Option<&str>,
840) -> String {
841 let mut url = format!("{scheme}://");
842
843 if let Some(user) = username {
844 url.push_str(&urlencoding::encode(user));
845 if let Some(pass) = password {
846 url.push(':');
847 url.push_str(&urlencoding::encode(pass));
848 }
849 url.push('@');
850 } else if let Some(pass) = password {
851 url.push(':');
852 url.push_str(&urlencoding::encode(pass));
853 url.push('@');
854 }
855
856 if host.contains(':') && !host.starts_with('[') {
857 url.push('[');
858 url.push_str(host);
859 url.push(']');
860 } else {
861 url.push_str(host);
862 }
863 url.push(':');
864 url.push_str(&port.to_string());
865 url
866}
867
868#[derive(Debug, Clone, Serialize, Deserialize)]
869#[serde(default)]
870pub struct DatabasePooling {
871 pub enabled: bool,
872 pub min: u32,
873 pub max: u32,
874}
875
876#[derive(Debug, Clone, Serialize, Deserialize)]
877#[serde(default)]
878pub struct EventLimits {
879 pub max_channels_at_once: u32,
880 pub max_name_length: u32,
881 pub max_payload_in_kb: u32,
882 pub max_batch_size: u32,
883}
884
885#[derive(Debug, Clone, Serialize, Deserialize)]
886#[serde(default)]
887pub struct HttpApiConfig {
888 pub request_limit_in_mb: u32,
889 pub accept_traffic: AcceptTraffic,
890 pub usage_enabled: bool,
891}
892
893#[derive(Debug, Clone, Serialize, Deserialize)]
894#[serde(default)]
895pub struct AcceptTraffic {
896 pub memory_threshold: f64,
897}
898
899#[derive(Debug, Clone, Serialize, Deserialize)]
900#[serde(default)]
901pub struct InstanceConfig {
902 pub process_id: String,
903}
904
905#[derive(Debug, Clone, Serialize, Deserialize)]
906#[serde(default)]
907pub struct MetricsConfig {
908 pub enabled: bool,
909 pub driver: MetricsDriver,
910 pub host: String,
911 pub prometheus: PrometheusConfig,
912 pub port: u16,
913}
914
915#[derive(Debug, Clone, Serialize, Deserialize)]
916#[serde(default)]
917pub struct PrometheusConfig {
918 pub prefix: String,
919}
920
921#[derive(Debug, Clone, Serialize, Deserialize)]
922#[serde(default)]
923pub struct LoggingConfig {
924 pub colors_enabled: bool,
925 pub include_target: bool,
926}
927
928#[derive(Debug, Clone, Serialize, Deserialize)]
929#[serde(default)]
930pub struct PresenceConfig {
931 pub max_members_per_channel: u32,
932 pub max_member_size_in_kb: u32,
933}
934
935#[derive(Debug, Clone, Serialize, Deserialize)]
938#[serde(default)]
939pub struct WebSocketConfig {
940 pub max_messages: Option<usize>,
941 pub max_bytes: Option<usize>,
942 pub disconnect_on_buffer_full: bool,
943 pub max_message_size: usize,
944 pub max_frame_size: usize,
945 pub write_buffer_size: usize,
946 pub max_backpressure: usize,
947 pub auto_ping: bool,
948 pub ping_interval: u32,
949 pub idle_timeout: u32,
950 pub compression: String,
951}
952
953impl Default for WebSocketConfig {
954 fn default() -> Self {
955 Self {
956 max_messages: Some(1000),
957 max_bytes: None,
958 disconnect_on_buffer_full: true,
959 max_message_size: 64 * 1024 * 1024,
960 max_frame_size: 16 * 1024 * 1024,
961 write_buffer_size: 16 * 1024,
962 max_backpressure: 1024 * 1024,
963 auto_ping: true,
964 ping_interval: 30,
965 idle_timeout: 120,
966 compression: "disabled".to_string(),
967 }
968 }
969}
970
971impl WebSocketConfig {
972 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
974 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
975
976 let limit = match (self.max_messages, self.max_bytes) {
977 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
978 (Some(messages), None) => BufferLimit::Messages(messages),
979 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
980 (None, None) => BufferLimit::Messages(1000),
981 };
982
983 WebSocketBufferConfig {
984 limit,
985 disconnect_on_full: self.disconnect_on_buffer_full,
986 }
987 }
988
989 pub fn to_sockudo_ws_config(
991 &self,
992 websocket_max_payload_kb: u32,
993 activity_timeout: u64,
994 ) -> sockudo_ws::Config {
995 use sockudo_ws::Compression;
996
997 let compression = match self.compression.to_lowercase().as_str() {
998 "dedicated" => Compression::Dedicated,
999 "shared" => Compression::Shared,
1000 "window256b" => Compression::Window256B,
1001 "window1kb" => Compression::Window1KB,
1002 "window2kb" => Compression::Window2KB,
1003 "window4kb" => Compression::Window4KB,
1004 "window8kb" => Compression::Window8KB,
1005 "window16kb" => Compression::Window16KB,
1006 "window32kb" => Compression::Window32KB,
1007 _ => Compression::Disabled,
1008 };
1009
1010 sockudo_ws::Config::builder()
1011 .max_payload_length(
1012 self.max_bytes
1013 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1014 )
1015 .max_message_size(self.max_message_size)
1016 .max_frame_size(self.max_frame_size)
1017 .write_buffer_size(self.write_buffer_size)
1018 .max_backpressure(self.max_backpressure)
1019 .idle_timeout(self.idle_timeout)
1020 .auto_ping(self.auto_ping)
1021 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1022 .compression(compression)
1023 .build()
1024 }
1025}
1026
1027#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1028#[serde(default)]
1029pub struct QueueConfig {
1030 pub driver: QueueDriver,
1031 pub redis: RedisQueueConfig,
1032 pub redis_cluster: RedisClusterQueueConfig,
1033 pub sqs: SqsQueueConfig,
1034 pub sns: SnsQueueConfig,
1035}
1036
1037#[derive(Debug, Clone, Serialize, Deserialize)]
1038#[serde(default)]
1039pub struct RedisQueueConfig {
1040 pub concurrency: u32,
1041 pub prefix: Option<String>,
1042 pub url_override: Option<String>,
1043 pub cluster_mode: bool,
1044}
1045
1046#[derive(Clone, Debug, Serialize, Deserialize)]
1047#[serde(default)]
1048pub struct RateLimit {
1049 pub max_requests: u32,
1050 pub window_seconds: u64,
1051 pub identifier: Option<String>,
1052 pub trust_hops: Option<u32>,
1053}
1054
1055#[derive(Debug, Clone, Serialize, Deserialize)]
1056#[serde(default)]
1057pub struct RateLimiterConfig {
1058 pub enabled: bool,
1059 pub driver: CacheDriver,
1060 pub api_rate_limit: RateLimit,
1061 pub websocket_rate_limit: RateLimit,
1062 pub redis: RedisConfig,
1063}
1064
1065#[derive(Debug, Clone, Serialize, Deserialize)]
1066#[serde(default)]
1067pub struct SslConfig {
1068 pub enabled: bool,
1069 pub cert_path: String,
1070 pub key_path: String,
1071 pub passphrase: Option<String>,
1072 pub ca_path: Option<String>,
1073 pub redirect_http: bool,
1074 pub http_port: Option<u16>,
1075}
1076
1077#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1078#[serde(default)]
1079pub struct WebhooksConfig {
1080 pub batching: BatchingConfig,
1081}
1082
1083#[derive(Debug, Clone, Serialize, Deserialize)]
1084#[serde(default)]
1085pub struct BatchingConfig {
1086 pub enabled: bool,
1087 pub duration: u64,
1088 pub size: usize,
1089}
1090
1091#[derive(Debug, Clone, Serialize, Deserialize)]
1092#[serde(default)]
1093pub struct ClusterHealthConfig {
1094 pub enabled: bool,
1095 pub heartbeat_interval_ms: u64,
1096 pub node_timeout_ms: u64,
1097 pub cleanup_interval_ms: u64,
1098}
1099
1100#[derive(Debug, Clone, Serialize, Deserialize)]
1101#[serde(default)]
1102pub struct UnixSocketConfig {
1103 pub enabled: bool,
1104 pub path: String,
1105 #[serde(deserialize_with = "deserialize_octal_permission")]
1106 pub permission_mode: u32,
1107}
1108
1109#[derive(Debug, Clone, Serialize, Deserialize)]
1110#[serde(default)]
1111pub struct DeltaCompressionOptionsConfig {
1112 pub enabled: bool,
1113 pub algorithm: String,
1114 pub full_message_interval: u32,
1115 pub min_message_size: usize,
1116 pub max_state_age_secs: u64,
1117 pub max_channel_states_per_socket: usize,
1118 pub max_conflation_states_per_channel: Option<usize>,
1119 pub conflation_key_path: Option<String>,
1120 pub cluster_coordination: bool,
1121 pub omit_delta_algorithm: bool,
1122}
1123
1124#[derive(Debug, Clone, Serialize, Deserialize)]
1126#[serde(default)]
1127pub struct CleanupConfig {
1128 pub queue_buffer_size: usize,
1129 pub batch_size: usize,
1130 pub batch_timeout_ms: u64,
1131 pub worker_threads: WorkerThreadsConfig,
1132 pub max_retry_attempts: u32,
1133 pub async_enabled: bool,
1134 pub fallback_to_sync: bool,
1135}
1136
1137#[derive(Debug, Clone)]
1139pub enum WorkerThreadsConfig {
1140 Auto,
1141 Fixed(usize),
1142}
1143
1144impl serde::Serialize for WorkerThreadsConfig {
1145 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1146 where
1147 S: serde::Serializer,
1148 {
1149 match self {
1150 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1151 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1152 }
1153 }
1154}
1155
1156impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1157 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1158 where
1159 D: serde::Deserializer<'de>,
1160 {
1161 use serde::de;
1162 struct WorkerThreadsVisitor;
1163 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1164 type Value = WorkerThreadsConfig;
1165
1166 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1167 formatter.write_str(r#""auto" or a positive integer"#)
1168 }
1169
1170 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1171 if value.eq_ignore_ascii_case("auto") {
1172 Ok(WorkerThreadsConfig::Auto)
1173 } else if let Ok(n) = value.parse::<usize>() {
1174 Ok(WorkerThreadsConfig::Fixed(n))
1175 } else {
1176 Err(E::custom(format!(
1177 "expected 'auto' or a number, got '{value}'"
1178 )))
1179 }
1180 }
1181
1182 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1183 Ok(WorkerThreadsConfig::Fixed(value as usize))
1184 }
1185
1186 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1187 if value >= 0 {
1188 Ok(WorkerThreadsConfig::Fixed(value as usize))
1189 } else {
1190 Err(E::custom("worker_threads must be non-negative"))
1191 }
1192 }
1193 }
1194 deserializer.deserialize_any(WorkerThreadsVisitor)
1195 }
1196}
1197
1198impl Default for CleanupConfig {
1199 fn default() -> Self {
1200 Self {
1201 queue_buffer_size: 1024,
1202 batch_size: 64,
1203 batch_timeout_ms: 100,
1204 worker_threads: WorkerThreadsConfig::Auto,
1205 max_retry_attempts: 3,
1206 async_enabled: true,
1207 fallback_to_sync: true,
1208 }
1209 }
1210}
1211
1212impl CleanupConfig {
1213 pub fn validate(&self) -> Result<(), String> {
1214 if self.queue_buffer_size == 0 {
1215 return Err("queue_buffer_size must be greater than 0".to_string());
1216 }
1217 if self.batch_size == 0 {
1218 return Err("batch_size must be greater than 0".to_string());
1219 }
1220 if self.batch_timeout_ms == 0 {
1221 return Err("batch_timeout_ms must be greater than 0".to_string());
1222 }
1223 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1224 && n == 0
1225 {
1226 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1227 }
1228 Ok(())
1229 }
1230}
1231
1232#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1233#[serde(default)]
1234pub struct TagFilteringConfig {
1235 #[serde(default)]
1236 pub enabled: bool,
1237 #[serde(default = "default_true")]
1238 pub enable_tags: bool,
1239}
1240
1241fn default_true() -> bool {
1242 true
1243}
1244
1245impl Default for ServerOptions {
1248 fn default() -> Self {
1249 Self {
1250 adapter: AdapterConfig::default(),
1251 app_manager: AppManagerConfig::default(),
1252 cache: CacheConfig::default(),
1253 channel_limits: ChannelLimits::default(),
1254 cors: CorsConfig::default(),
1255 database: DatabaseConfig::default(),
1256 database_pooling: DatabasePooling::default(),
1257 debug: false,
1258 tag_filtering: TagFilteringConfig::default(),
1259 event_limits: EventLimits::default(),
1260 host: "0.0.0.0".to_string(),
1261 http_api: HttpApiConfig::default(),
1262 instance: InstanceConfig::default(),
1263 logging: None,
1264 metrics: MetricsConfig::default(),
1265 mode: "production".to_string(),
1266 port: 6001,
1267 path_prefix: "/".to_string(),
1268 presence: PresenceConfig::default(),
1269 queue: QueueConfig::default(),
1270 rate_limiter: RateLimiterConfig::default(),
1271 shutdown_grace_period: 10,
1272 ssl: SslConfig::default(),
1273 user_authentication_timeout: 3600,
1274 webhooks: WebhooksConfig::default(),
1275 websocket_max_payload_kb: 64,
1276 cleanup: CleanupConfig::default(),
1277 activity_timeout: 120,
1278 cluster_health: ClusterHealthConfig::default(),
1279 unix_socket: UnixSocketConfig::default(),
1280 delta_compression: DeltaCompressionOptionsConfig::default(),
1281 websocket: WebSocketConfig::default(),
1282 }
1283 }
1284}
1285
1286impl Default for SqsQueueConfig {
1287 fn default() -> Self {
1288 Self {
1289 region: "us-east-1".to_string(),
1290 queue_url_prefix: None,
1291 visibility_timeout: 30,
1292 endpoint_url: None,
1293 max_messages: 10,
1294 wait_time_seconds: 5,
1295 concurrency: 5,
1296 fifo: false,
1297 message_group_id: Some("default".to_string()),
1298 }
1299 }
1300}
1301
1302impl Default for SnsQueueConfig {
1303 fn default() -> Self {
1304 Self {
1305 region: "us-east-1".to_string(),
1306 topic_arn: String::new(),
1307 endpoint_url: None,
1308 }
1309 }
1310}
1311
1312impl Default for RedisAdapterConfig {
1313 fn default() -> Self {
1314 Self {
1315 requests_timeout: 5000,
1316 prefix: "sockudo_adapter:".to_string(),
1317 redis_pub_options: AHashMap::new(),
1318 redis_sub_options: AHashMap::new(),
1319 cluster_mode: false,
1320 }
1321 }
1322}
1323
1324impl Default for RedisClusterAdapterConfig {
1325 fn default() -> Self {
1326 Self {
1327 nodes: vec![],
1328 prefix: "sockudo_adapter:".to_string(),
1329 request_timeout_ms: 1000,
1330 use_connection_manager: true,
1331 use_sharded_pubsub: false,
1332 }
1333 }
1334}
1335
1336impl Default for NatsAdapterConfig {
1337 fn default() -> Self {
1338 Self {
1339 servers: vec!["nats://localhost:4222".to_string()],
1340 prefix: "sockudo_adapter:".to_string(),
1341 request_timeout_ms: 5000,
1342 username: None,
1343 password: None,
1344 token: None,
1345 connection_timeout_ms: 5000,
1346 nodes_number: None,
1347 }
1348 }
1349}
1350
1351impl Default for CacheSettings {
1352 fn default() -> Self {
1353 Self {
1354 enabled: true,
1355 ttl: 300,
1356 }
1357 }
1358}
1359
1360impl Default for MemoryCacheOptions {
1361 fn default() -> Self {
1362 Self {
1363 ttl: 300,
1364 cleanup_interval: 60,
1365 max_capacity: 10000,
1366 }
1367 }
1368}
1369
1370impl Default for CacheConfig {
1371 fn default() -> Self {
1372 Self {
1373 driver: CacheDriver::default(),
1374 redis: RedisConfig {
1375 prefix: Some("sockudo_cache:".to_string()),
1376 url_override: None,
1377 cluster_mode: false,
1378 },
1379 memory: MemoryCacheOptions::default(),
1380 }
1381 }
1382}
1383
1384impl Default for ChannelLimits {
1385 fn default() -> Self {
1386 Self {
1387 max_name_length: 200,
1388 cache_ttl: 3600,
1389 }
1390 }
1391}
1392impl Default for CorsConfig {
1393 fn default() -> Self {
1394 Self {
1395 credentials: true,
1396 origin: vec!["*".to_string()],
1397 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1398 allowed_headers: vec![
1399 "Authorization".to_string(),
1400 "Content-Type".to_string(),
1401 "X-Requested-With".to_string(),
1402 "Accept".to_string(),
1403 ],
1404 }
1405 }
1406}
1407
1408impl Default for DatabaseConnection {
1409 fn default() -> Self {
1410 Self {
1411 host: "localhost".to_string(),
1412 port: 3306,
1413 username: "root".to_string(),
1414 password: "".to_string(),
1415 database: "sockudo".to_string(),
1416 table_name: "applications".to_string(),
1417 connection_pool_size: 10,
1418 pool_min: None,
1419 pool_max: None,
1420 cache_ttl: 300,
1421 cache_cleanup_interval: 60,
1422 cache_max_capacity: 100,
1423 }
1424 }
1425}
1426
1427impl Default for RedisConnection {
1428 fn default() -> Self {
1429 Self {
1430 host: "127.0.0.1".to_string(),
1431 port: 6379,
1432 db: 0,
1433 username: None,
1434 password: None,
1435 key_prefix: "sockudo:".to_string(),
1436 sentinels: Vec::new(),
1437 sentinel_password: None,
1438 name: "mymaster".to_string(),
1439 cluster: RedisClusterConnection::default(),
1440 cluster_nodes: Vec::new(),
1441 }
1442 }
1443}
1444
1445impl Default for RedisSentinel {
1446 fn default() -> Self {
1447 Self {
1448 host: "localhost".to_string(),
1449 port: 26379,
1450 }
1451 }
1452}
1453
1454impl Default for ClusterNode {
1455 fn default() -> Self {
1456 Self {
1457 host: "127.0.0.1".to_string(),
1458 port: 7000,
1459 }
1460 }
1461}
1462
1463impl Default for DatabasePooling {
1464 fn default() -> Self {
1465 Self {
1466 enabled: true,
1467 min: 2,
1468 max: 10,
1469 }
1470 }
1471}
1472
1473impl Default for EventLimits {
1474 fn default() -> Self {
1475 Self {
1476 max_channels_at_once: 100,
1477 max_name_length: 200,
1478 max_payload_in_kb: 100,
1479 max_batch_size: 10,
1480 }
1481 }
1482}
1483
1484impl Default for HttpApiConfig {
1485 fn default() -> Self {
1486 Self {
1487 request_limit_in_mb: 10,
1488 accept_traffic: AcceptTraffic::default(),
1489 usage_enabled: true,
1490 }
1491 }
1492}
1493
1494impl Default for AcceptTraffic {
1495 fn default() -> Self {
1496 Self {
1497 memory_threshold: 0.90,
1498 }
1499 }
1500}
1501
1502impl Default for InstanceConfig {
1503 fn default() -> Self {
1504 Self {
1505 process_id: uuid::Uuid::new_v4().to_string(),
1506 }
1507 }
1508}
1509
1510impl Default for LoggingConfig {
1511 fn default() -> Self {
1512 Self {
1513 colors_enabled: true,
1514 include_target: true,
1515 }
1516 }
1517}
1518
1519impl Default for MetricsConfig {
1520 fn default() -> Self {
1521 Self {
1522 enabled: true,
1523 driver: MetricsDriver::default(),
1524 host: "0.0.0.0".to_string(),
1525 prometheus: PrometheusConfig::default(),
1526 port: 9601,
1527 }
1528 }
1529}
1530
1531impl Default for PrometheusConfig {
1532 fn default() -> Self {
1533 Self {
1534 prefix: "sockudo_".to_string(),
1535 }
1536 }
1537}
1538
1539impl Default for PresenceConfig {
1540 fn default() -> Self {
1541 Self {
1542 max_members_per_channel: 100,
1543 max_member_size_in_kb: 2,
1544 }
1545 }
1546}
1547
1548impl Default for RedisQueueConfig {
1549 fn default() -> Self {
1550 Self {
1551 concurrency: 5,
1552 prefix: Some("sockudo_queue:".to_string()),
1553 url_override: None,
1554 cluster_mode: false,
1555 }
1556 }
1557}
1558
1559impl Default for RateLimit {
1560 fn default() -> Self {
1561 Self {
1562 max_requests: 60,
1563 window_seconds: 60,
1564 identifier: Some("default".to_string()),
1565 trust_hops: Some(0),
1566 }
1567 }
1568}
1569
1570impl Default for RateLimiterConfig {
1571 fn default() -> Self {
1572 Self {
1573 enabled: true,
1574 driver: CacheDriver::Memory,
1575 api_rate_limit: RateLimit {
1576 max_requests: 100,
1577 window_seconds: 60,
1578 identifier: Some("api".to_string()),
1579 trust_hops: Some(0),
1580 },
1581 websocket_rate_limit: RateLimit {
1582 max_requests: 20,
1583 window_seconds: 60,
1584 identifier: Some("websocket_connect".to_string()),
1585 trust_hops: Some(0),
1586 },
1587 redis: RedisConfig {
1588 prefix: Some("sockudo_rl:".to_string()),
1589 url_override: None,
1590 cluster_mode: false,
1591 },
1592 }
1593 }
1594}
1595
1596impl Default for SslConfig {
1597 fn default() -> Self {
1598 Self {
1599 enabled: false,
1600 cert_path: "".to_string(),
1601 key_path: "".to_string(),
1602 passphrase: None,
1603 ca_path: None,
1604 redirect_http: false,
1605 http_port: Some(80),
1606 }
1607 }
1608}
1609
1610impl Default for BatchingConfig {
1611 fn default() -> Self {
1612 Self {
1613 enabled: true,
1614 duration: 50,
1615 size: 100,
1616 }
1617 }
1618}
1619
1620impl Default for ClusterHealthConfig {
1621 fn default() -> Self {
1622 Self {
1623 enabled: true,
1624 heartbeat_interval_ms: 10000,
1625 node_timeout_ms: 30000,
1626 cleanup_interval_ms: 10000,
1627 }
1628 }
1629}
1630
1631impl Default for UnixSocketConfig {
1632 fn default() -> Self {
1633 Self {
1634 enabled: false,
1635 path: "/var/run/sockudo/sockudo.sock".to_string(),
1636 permission_mode: 0o660,
1637 }
1638 }
1639}
1640
1641impl Default for DeltaCompressionOptionsConfig {
1642 fn default() -> Self {
1643 Self {
1644 enabled: true,
1645 algorithm: "fossil".to_string(),
1646 full_message_interval: 10,
1647 min_message_size: 100,
1648 max_state_age_secs: 300,
1649 max_channel_states_per_socket: 100,
1650 max_conflation_states_per_channel: Some(100),
1651 conflation_key_path: None,
1652 cluster_coordination: false,
1653 omit_delta_algorithm: false,
1654 }
1655 }
1656}
1657
1658impl ClusterHealthConfig {
1659 pub fn validate(&self) -> Result<(), String> {
1660 if self.heartbeat_interval_ms == 0 {
1661 return Err("heartbeat_interval_ms must be greater than 0".to_string());
1662 }
1663 if self.node_timeout_ms == 0 {
1664 return Err("node_timeout_ms must be greater than 0".to_string());
1665 }
1666 if self.cleanup_interval_ms == 0 {
1667 return Err("cleanup_interval_ms must be greater than 0".to_string());
1668 }
1669
1670 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1671 return Err(format!(
1672 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1673 self.heartbeat_interval_ms,
1674 self.node_timeout_ms,
1675 self.node_timeout_ms / 3
1676 ));
1677 }
1678
1679 if self.cleanup_interval_ms > self.node_timeout_ms {
1680 return Err(format!(
1681 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1682 self.cleanup_interval_ms, self.node_timeout_ms
1683 ));
1684 }
1685
1686 Ok(())
1687 }
1688}
1689
1690impl ServerOptions {
1691 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1692 let content = tokio::fs::read_to_string(path).await?;
1693 let options: Self = sonic_rs::from_str(&content)?;
1694 Ok(options)
1695 }
1696
1697 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1698 if let Ok(mode) = std::env::var("ENVIRONMENT") {
1700 self.mode = mode;
1701 }
1702 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1703 if parse_bool_env("DEBUG", false) {
1704 self.debug = true;
1705 info!("DEBUG environment variable forces debug mode ON");
1706 }
1707
1708 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1709
1710 if let Ok(host) = std::env::var("HOST") {
1711 self.host = host;
1712 }
1713 self.port = parse_env::<u16>("PORT", self.port);
1714 self.shutdown_grace_period =
1715 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1716 self.user_authentication_timeout = parse_env::<u64>(
1717 "USER_AUTHENTICATION_TIMEOUT",
1718 self.user_authentication_timeout,
1719 );
1720 self.websocket_max_payload_kb =
1721 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
1722 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
1723 self.instance.process_id = id;
1724 }
1725
1726 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
1728 self.adapter.driver =
1729 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
1730 }
1731 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
1732 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
1733 self.adapter.buffer_multiplier_per_cpu,
1734 );
1735 self.adapter.enable_socket_counting = parse_env::<bool>(
1736 "ADAPTER_ENABLE_SOCKET_COUNTING",
1737 self.adapter.enable_socket_counting,
1738 );
1739 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
1740 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
1741 }
1742 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
1743 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
1744 }
1745 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
1746 self.app_manager.driver =
1747 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
1748 }
1749 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
1750 self.rate_limiter.driver = parse_driver_enum(
1751 driver_str,
1752 self.rate_limiter.driver.clone(),
1753 "RateLimiter Backend",
1754 );
1755 }
1756
1757 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
1759 self.database.redis.host = host;
1760 }
1761 self.database.redis.port =
1762 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
1763 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
1764 self.database.redis.username = if username.is_empty() {
1765 None
1766 } else {
1767 Some(username)
1768 };
1769 }
1770 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
1771 self.database.redis.password = Some(password);
1772 }
1773 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
1774 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
1775 self.database.redis.key_prefix = prefix;
1776 }
1777 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
1778 self.database.redis.cluster.username = if cluster_username.is_empty() {
1779 None
1780 } else {
1781 Some(cluster_username)
1782 };
1783 }
1784 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
1785 self.database.redis.cluster.password = Some(cluster_password);
1786 }
1787 self.database.redis.cluster.use_tls = parse_bool_env(
1788 "DATABASE_REDIS_CLUSTER_USE_TLS",
1789 self.database.redis.cluster.use_tls,
1790 );
1791
1792 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
1794 self.database.mysql.host = host;
1795 }
1796 self.database.mysql.port =
1797 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
1798 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
1799 self.database.mysql.username = user;
1800 }
1801 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
1802 self.database.mysql.password = pass;
1803 }
1804 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
1805 self.database.mysql.database = db;
1806 }
1807 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
1808 self.database.mysql.table_name = table;
1809 }
1810 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
1811
1812 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
1814 self.database.postgres.host = host;
1815 }
1816 self.database.postgres.port =
1817 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
1818 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
1819 self.database.postgres.username = user;
1820 }
1821 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
1822 self.database.postgres.password = pass;
1823 }
1824 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
1825 self.database.postgres.database = db;
1826 }
1827 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
1828
1829 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
1831 self.database.dynamodb.region = region;
1832 }
1833 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
1834 self.database.dynamodb.table_name = table;
1835 }
1836 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
1837 self.database.dynamodb.endpoint_url = Some(endpoint);
1838 }
1839 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
1840 self.database.dynamodb.aws_access_key_id = Some(key_id);
1841 }
1842 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
1843 self.database.dynamodb.aws_secret_access_key = Some(secret);
1844 }
1845
1846 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
1848 let node_list: Vec<String> = nodes
1849 .split(',')
1850 .map(|s| s.trim())
1851 .filter(|s| !s.is_empty())
1852 .map(ToString::to_string)
1853 .collect();
1854
1855 options.adapter.cluster.nodes = node_list.clone();
1856 options.queue.redis_cluster.nodes = node_list.clone();
1857
1858 let parsed_nodes: Vec<ClusterNode> = node_list
1859 .iter()
1860 .filter_map(|seed| ClusterNode::from_seed(seed))
1861 .collect();
1862 options.database.redis.cluster.nodes = parsed_nodes.clone();
1863 options.database.redis.cluster_nodes = parsed_nodes;
1864 };
1865
1866 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
1867 apply_redis_cluster_nodes(self, &nodes);
1868 }
1869 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
1870 apply_redis_cluster_nodes(self, &nodes);
1871 }
1872 self.queue.redis_cluster.concurrency = parse_env::<u32>(
1873 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
1874 self.queue.redis_cluster.concurrency,
1875 );
1876 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
1877 self.queue.redis_cluster.prefix = Some(prefix);
1878 }
1879
1880 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
1882 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
1883 self.ssl.cert_path = val;
1884 }
1885 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
1886 self.ssl.key_path = val;
1887 }
1888 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
1889 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
1890 self.ssl.http_port = Some(port);
1891 }
1892
1893 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
1895 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
1896 self.unix_socket.path = path;
1897 }
1898 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
1899 if mode_str.chars().all(|c| c.is_digit(8)) {
1900 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
1901 if mode <= 0o777 {
1902 self.unix_socket.permission_mode = mode;
1903 } else {
1904 warn!(
1905 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
1906 mode_str, self.unix_socket.permission_mode
1907 );
1908 }
1909 } else {
1910 warn!(
1911 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
1912 mode_str, self.unix_socket.permission_mode
1913 );
1914 }
1915 } else {
1916 warn!(
1917 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
1918 mode_str, self.unix_socket.permission_mode
1919 );
1920 }
1921 }
1922
1923 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
1925 self.metrics.driver =
1926 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
1927 }
1928 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
1929 if let Ok(val) = std::env::var("METRICS_HOST") {
1930 self.metrics.host = val;
1931 }
1932 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
1933 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
1934 self.metrics.prometheus.prefix = val;
1935 }
1936
1937 self.http_api.usage_enabled =
1939 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
1940
1941 self.rate_limiter.enabled =
1943 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
1944 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
1945 "RATE_LIMITER_API_MAX_REQUESTS",
1946 self.rate_limiter.api_rate_limit.max_requests,
1947 );
1948 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
1949 "RATE_LIMITER_API_WINDOW_SECONDS",
1950 self.rate_limiter.api_rate_limit.window_seconds,
1951 );
1952 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
1953 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
1954 }
1955 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
1956 "RATE_LIMITER_WS_MAX_REQUESTS",
1957 self.rate_limiter.websocket_rate_limit.max_requests,
1958 );
1959 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
1960 "RATE_LIMITER_WS_WINDOW_SECONDS",
1961 self.rate_limiter.websocket_rate_limit.window_seconds,
1962 );
1963 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
1964 self.rate_limiter.redis.prefix = Some(prefix);
1965 }
1966
1967 self.queue.redis.concurrency =
1969 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
1970 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
1971 self.queue.redis.prefix = Some(prefix);
1972 }
1973
1974 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
1976 self.queue.sqs.region = region;
1977 }
1978 self.queue.sqs.visibility_timeout = parse_env::<i32>(
1979 "QUEUE_SQS_VISIBILITY_TIMEOUT",
1980 self.queue.sqs.visibility_timeout,
1981 );
1982 self.queue.sqs.max_messages =
1983 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
1984 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
1985 "QUEUE_SQS_WAIT_TIME_SECONDS",
1986 self.queue.sqs.wait_time_seconds,
1987 );
1988 self.queue.sqs.concurrency =
1989 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
1990 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
1991 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
1992 self.queue.sqs.endpoint_url = Some(endpoint);
1993 }
1994
1995 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
1997 self.queue.sns.region = region;
1998 }
1999 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2000 self.queue.sns.topic_arn = topic_arn;
2001 }
2002 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2003 self.queue.sns.endpoint_url = Some(endpoint);
2004 }
2005
2006 self.webhooks.batching.enabled =
2008 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2009 self.webhooks.batching.duration =
2010 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2011 self.webhooks.batching.size =
2012 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2013
2014 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2016 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2017 }
2018 if let Ok(user) = std::env::var("NATS_USERNAME") {
2019 self.adapter.nats.username = Some(user);
2020 }
2021 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2022 self.adapter.nats.password = Some(pass);
2023 }
2024 if let Ok(token) = std::env::var("NATS_TOKEN") {
2025 self.adapter.nats.token = Some(token);
2026 }
2027 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2028 "NATS_CONNECTION_TIMEOUT_MS",
2029 self.adapter.nats.connection_timeout_ms,
2030 );
2031 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2032 "NATS_REQUEST_TIMEOUT_MS",
2033 self.adapter.nats.request_timeout_ms,
2034 );
2035
2036 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2038 self.cors.origin = origins.split(',').map(|s| s.trim().to_string()).collect();
2039 }
2040 if let Ok(methods) = std::env::var("CORS_METHODS") {
2041 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2042 }
2043 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2044 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2045 }
2046 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2047
2048 self.database_pooling.enabled =
2050 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2051 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2052 self.database_pooling.min = min;
2053 }
2054 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2055 self.database_pooling.max = max;
2056 }
2057
2058 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2059 self.database.mysql.connection_pool_size = pool_size;
2060 self.database.postgres.connection_pool_size = pool_size;
2061 }
2062 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2063 self.app_manager.cache.ttl = cache_ttl;
2064 self.channel_limits.cache_ttl = cache_ttl;
2065 self.database.mysql.cache_ttl = cache_ttl;
2066 self.database.postgres.cache_ttl = cache_ttl;
2067 self.cache.memory.ttl = cache_ttl;
2068 }
2069 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2070 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2071 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2072 self.cache.memory.cleanup_interval = cleanup_interval;
2073 }
2074 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2075 self.database.mysql.cache_max_capacity = max_capacity;
2076 self.database.postgres.cache_max_capacity = max_capacity;
2077 self.cache.memory.max_capacity = max_capacity;
2078 }
2079
2080 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2081 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2082 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2083 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2084
2085 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2086 (default_app_id, default_app_key, default_app_secret)
2087 && default_app_enabled
2088 {
2089 let default_app = App {
2090 id: app_id,
2091 key: app_key,
2092 secret: app_secret,
2093 enable_client_messages: parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false),
2094 enabled: default_app_enabled,
2095 max_connections: parse_env::<u32>("SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS", 100),
2096 max_client_events_per_second: parse_env::<u32>(
2097 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2098 100,
2099 ),
2100 max_read_requests_per_second: Some(parse_env::<u32>(
2101 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2102 100,
2103 )),
2104 max_presence_members_per_channel: Some(parse_env::<u32>(
2105 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2106 100,
2107 )),
2108 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2109 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2110 100,
2111 )),
2112 max_channel_name_length: Some(parse_env::<u32>(
2113 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2114 100,
2115 )),
2116 max_event_channels_at_once: Some(parse_env::<u32>(
2117 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2118 100,
2119 )),
2120 max_event_name_length: Some(parse_env::<u32>(
2121 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2122 100,
2123 )),
2124 max_event_payload_in_kb: Some(parse_env::<u32>(
2125 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2126 100,
2127 )),
2128 max_event_batch_size: Some(parse_env::<u32>(
2129 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2130 100,
2131 )),
2132 enable_user_authentication: Some(parse_bool_env(
2133 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2134 false,
2135 )),
2136 webhooks: None,
2137 max_backend_events_per_second: Some(parse_env::<u32>(
2138 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2139 100,
2140 )),
2141 enable_watchlist_events: Some(parse_bool_env(
2142 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2143 false,
2144 )),
2145 allowed_origins: {
2146 if let Ok(origins_str) = std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS") {
2147 if !origins_str.is_empty() {
2148 Some(
2149 origins_str
2150 .split(',')
2151 .map(|s| s.trim().to_string())
2152 .collect(),
2153 )
2154 } else {
2155 None
2156 }
2157 } else {
2158 None
2159 }
2160 },
2161 channel_delta_compression: None,
2162 };
2163
2164 self.app_manager.array.apps.push(default_app);
2165 info!("Successfully registered default app from env");
2166 }
2167
2168 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2170 info!("Applying REDIS_URL environment variable override");
2171
2172 let redis_url_json = sonic_rs::json!(redis_url_env);
2173
2174 self.adapter
2175 .redis
2176 .redis_pub_options
2177 .insert("url".to_string(), redis_url_json.clone());
2178 self.adapter
2179 .redis
2180 .redis_sub_options
2181 .insert("url".to_string(), redis_url_json);
2182
2183 self.cache.redis.url_override = Some(redis_url_env.clone());
2184 self.queue.redis.url_override = Some(redis_url_env.clone());
2185 self.rate_limiter.redis.url_override = Some(redis_url_env);
2186 }
2187
2188 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2190 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2191 if has_colors_env || has_target_env {
2192 let logging_config = self.logging.get_or_insert_with(Default::default);
2193 if has_colors_env {
2194 logging_config.colors_enabled =
2195 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2196 }
2197 if has_target_env {
2198 logging_config.include_target =
2199 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2200 }
2201 }
2202
2203 self.cleanup.async_enabled =
2205 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2206 self.cleanup.fallback_to_sync =
2207 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2208 self.cleanup.queue_buffer_size =
2209 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2210 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2211 self.cleanup.batch_timeout_ms =
2212 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2213 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2214 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2215 WorkerThreadsConfig::Auto
2216 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2217 WorkerThreadsConfig::Fixed(n)
2218 } else {
2219 warn!(
2220 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2221 worker_threads_str
2222 );
2223 self.cleanup.worker_threads.clone()
2224 };
2225 }
2226 self.cleanup.max_retry_attempts = parse_env::<u32>(
2227 "CLEANUP_MAX_RETRY_ATTEMPTS",
2228 self.cleanup.max_retry_attempts,
2229 );
2230
2231 self.cluster_health.enabled =
2233 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2234 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2235 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2236 self.cluster_health.heartbeat_interval_ms,
2237 );
2238 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2239 "CLUSTER_HEALTH_NODE_TIMEOUT",
2240 self.cluster_health.node_timeout_ms,
2241 );
2242 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2243 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2244 self.cluster_health.cleanup_interval_ms,
2245 );
2246
2247 self.tag_filtering.enabled =
2249 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2250
2251 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2253 if val.to_lowercase() == "none" || val == "0" {
2254 self.websocket.max_messages = None;
2255 } else if let Ok(n) = val.parse::<usize>() {
2256 self.websocket.max_messages = Some(n);
2257 }
2258 }
2259 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2260 if val.to_lowercase() == "none" || val == "0" {
2261 self.websocket.max_bytes = None;
2262 } else if let Ok(n) = val.parse::<usize>() {
2263 self.websocket.max_bytes = Some(n);
2264 }
2265 }
2266 self.websocket.disconnect_on_buffer_full = parse_bool_env(
2267 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2268 self.websocket.disconnect_on_buffer_full,
2269 );
2270 self.websocket.max_message_size = parse_env::<usize>(
2271 "WEBSOCKET_MAX_MESSAGE_SIZE",
2272 self.websocket.max_message_size,
2273 );
2274 self.websocket.max_frame_size =
2275 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2276 self.websocket.write_buffer_size = parse_env::<usize>(
2277 "WEBSOCKET_WRITE_BUFFER_SIZE",
2278 self.websocket.write_buffer_size,
2279 );
2280 self.websocket.max_backpressure = parse_env::<usize>(
2281 "WEBSOCKET_MAX_BACKPRESSURE",
2282 self.websocket.max_backpressure,
2283 );
2284 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2285 self.websocket.ping_interval =
2286 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2287 self.websocket.idle_timeout =
2288 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2289 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2290 self.websocket.compression = mode;
2291 }
2292
2293 Ok(())
2294 }
2295
2296 pub fn validate(&self) -> Result<(), String> {
2297 if self.unix_socket.enabled {
2298 if self.unix_socket.path.is_empty() {
2299 return Err(
2300 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2301 );
2302 }
2303
2304 self.validate_unix_socket_security()?;
2305
2306 if self.ssl.enabled {
2307 tracing::warn!(
2308 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2309 );
2310 }
2311
2312 if self.unix_socket.permission_mode > 0o777 {
2313 return Err(format!(
2314 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2315 self.unix_socket.permission_mode
2316 ));
2317 }
2318 }
2319
2320 if let Err(e) = self.cleanup.validate() {
2321 return Err(format!("Invalid cleanup configuration: {}", e));
2322 }
2323
2324 Ok(())
2325 }
2326
2327 fn validate_unix_socket_security(&self) -> Result<(), String> {
2328 let path = &self.unix_socket.path;
2329
2330 if path.contains("../") || path.contains("..\\") {
2331 return Err(
2332 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2333 );
2334 }
2335
2336 if self.unix_socket.permission_mode & 0o002 != 0 {
2337 warn!(
2338 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2339 self.unix_socket.permission_mode
2340 );
2341 }
2342
2343 if self.unix_socket.permission_mode & 0o007 > 0o005 {
2344 warn!(
2345 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2346 self.unix_socket.permission_mode
2347 );
2348 }
2349
2350 if self.mode == "production" && path.starts_with("/tmp/") {
2351 warn!(
2352 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2353 path
2354 );
2355 }
2356
2357 if !path.starts_with('/') {
2358 return Err(
2359 "Unix socket path must be absolute (start with /) for security and reliability."
2360 .to_string(),
2361 );
2362 }
2363
2364 Ok(())
2365 }
2366}
2367
2368#[cfg(test)]
2369mod redis_connection_tests {
2370 use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
2371
2372 #[test]
2373 fn test_standard_url_basic() {
2374 let conn = RedisConnection {
2375 host: "127.0.0.1".to_string(),
2376 port: 6379,
2377 db: 0,
2378 username: None,
2379 password: None,
2380 key_prefix: "sockudo:".to_string(),
2381 sentinels: Vec::new(),
2382 sentinel_password: None,
2383 name: "mymaster".to_string(),
2384 cluster: RedisClusterConnection::default(),
2385 cluster_nodes: Vec::new(),
2386 };
2387 assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
2388 }
2389
2390 #[test]
2391 fn test_standard_url_with_password() {
2392 let conn = RedisConnection {
2393 host: "127.0.0.1".to_string(),
2394 port: 6379,
2395 db: 2,
2396 username: None,
2397 password: Some("secret".to_string()),
2398 key_prefix: "sockudo:".to_string(),
2399 sentinels: Vec::new(),
2400 sentinel_password: None,
2401 name: "mymaster".to_string(),
2402 cluster: RedisClusterConnection::default(),
2403 cluster_nodes: Vec::new(),
2404 };
2405 assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
2406 }
2407
2408 #[test]
2409 fn test_standard_url_with_username_and_password() {
2410 let conn = RedisConnection {
2411 host: "redis.example.com".to_string(),
2412 port: 6380,
2413 db: 1,
2414 username: Some("admin".to_string()),
2415 password: Some("pass123".to_string()),
2416 key_prefix: "sockudo:".to_string(),
2417 sentinels: Vec::new(),
2418 sentinel_password: None,
2419 name: "mymaster".to_string(),
2420 cluster: RedisClusterConnection::default(),
2421 cluster_nodes: Vec::new(),
2422 };
2423 assert_eq!(
2424 conn.to_url(),
2425 "redis://admin:pass123@redis.example.com:6380/1"
2426 );
2427 }
2428
2429 #[test]
2430 fn test_standard_url_with_special_chars_in_password() {
2431 let conn = RedisConnection {
2432 host: "127.0.0.1".to_string(),
2433 port: 6379,
2434 db: 0,
2435 username: None,
2436 password: Some("pass@word#123".to_string()),
2437 key_prefix: "sockudo:".to_string(),
2438 sentinels: Vec::new(),
2439 sentinel_password: None,
2440 name: "mymaster".to_string(),
2441 cluster: RedisClusterConnection::default(),
2442 cluster_nodes: Vec::new(),
2443 };
2444 assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
2445 }
2446
2447 #[test]
2448 fn test_is_sentinel_configured_false() {
2449 let conn = RedisConnection::default();
2450 assert!(!conn.is_sentinel_configured());
2451 }
2452
2453 #[test]
2454 fn test_is_sentinel_configured_true() {
2455 let conn = RedisConnection {
2456 sentinels: vec![RedisSentinel {
2457 host: "sentinel1".to_string(),
2458 port: 26379,
2459 }],
2460 ..Default::default()
2461 };
2462 assert!(conn.is_sentinel_configured());
2463 }
2464
2465 #[test]
2466 fn test_sentinel_url_basic() {
2467 let conn = RedisConnection {
2468 host: "127.0.0.1".to_string(),
2469 port: 6379,
2470 db: 0,
2471 username: None,
2472 password: None,
2473 key_prefix: "sockudo:".to_string(),
2474 sentinels: vec![
2475 RedisSentinel {
2476 host: "sentinel1".to_string(),
2477 port: 26379,
2478 },
2479 RedisSentinel {
2480 host: "sentinel2".to_string(),
2481 port: 26379,
2482 },
2483 ],
2484 sentinel_password: None,
2485 name: "mymaster".to_string(),
2486 cluster: RedisClusterConnection::default(),
2487 cluster_nodes: Vec::new(),
2488 };
2489 assert_eq!(
2490 conn.to_url(),
2491 "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
2492 );
2493 }
2494
2495 #[test]
2496 fn test_sentinel_url_with_sentinel_password() {
2497 let conn = RedisConnection {
2498 host: "127.0.0.1".to_string(),
2499 port: 6379,
2500 db: 0,
2501 username: None,
2502 password: None,
2503 key_prefix: "sockudo:".to_string(),
2504 sentinels: vec![RedisSentinel {
2505 host: "sentinel1".to_string(),
2506 port: 26379,
2507 }],
2508 sentinel_password: Some("sentinelpass".to_string()),
2509 name: "mymaster".to_string(),
2510 cluster: RedisClusterConnection::default(),
2511 cluster_nodes: Vec::new(),
2512 };
2513 assert_eq!(
2514 conn.to_url(),
2515 "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
2516 );
2517 }
2518
2519 #[test]
2520 fn test_sentinel_url_with_master_password() {
2521 let conn = RedisConnection {
2522 host: "127.0.0.1".to_string(),
2523 port: 6379,
2524 db: 1,
2525 username: None,
2526 password: Some("masterpass".to_string()),
2527 key_prefix: "sockudo:".to_string(),
2528 sentinels: vec![RedisSentinel {
2529 host: "sentinel1".to_string(),
2530 port: 26379,
2531 }],
2532 sentinel_password: None,
2533 name: "mymaster".to_string(),
2534 cluster: RedisClusterConnection::default(),
2535 cluster_nodes: Vec::new(),
2536 };
2537 assert_eq!(
2538 conn.to_url(),
2539 "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
2540 );
2541 }
2542
2543 #[test]
2544 fn test_sentinel_url_with_all_auth() {
2545 let conn = RedisConnection {
2546 host: "127.0.0.1".to_string(),
2547 port: 6379,
2548 db: 2,
2549 username: Some("redisuser".to_string()),
2550 password: Some("redispass".to_string()),
2551 key_prefix: "sockudo:".to_string(),
2552 sentinels: vec![
2553 RedisSentinel {
2554 host: "sentinel1".to_string(),
2555 port: 26379,
2556 },
2557 RedisSentinel {
2558 host: "sentinel2".to_string(),
2559 port: 26380,
2560 },
2561 ],
2562 sentinel_password: Some("sentinelauth".to_string()),
2563 name: "production-master".to_string(),
2564 cluster: RedisClusterConnection::default(),
2565 cluster_nodes: Vec::new(),
2566 };
2567 assert_eq!(
2568 conn.to_url(),
2569 "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
2570 );
2571 }
2572
2573 #[test]
2574 fn test_sentinel_to_host_port() {
2575 let sentinel = RedisSentinel {
2576 host: "sentinel.example.com".to_string(),
2577 port: 26379,
2578 };
2579 assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
2580 }
2581
2582 #[test]
2583 fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
2584 let conn = RedisConnection {
2585 cluster: RedisClusterConnection {
2586 nodes: vec![
2587 ClusterNode {
2588 host: "node1.secure-cluster.com".to_string(),
2589 port: 7000,
2590 },
2591 ClusterNode {
2592 host: "redis://node2.secure-cluster.com:7001".to_string(),
2593 port: 7001,
2594 },
2595 ClusterNode {
2596 host: "rediss://node3.secure-cluster.com".to_string(),
2597 port: 7002,
2598 },
2599 ],
2600 username: None,
2601 password: Some("cluster-secret".to_string()),
2602 use_tls: true,
2603 },
2604 ..Default::default()
2605 };
2606
2607 assert_eq!(
2608 conn.cluster_node_urls(),
2609 vec![
2610 "rediss://:cluster-secret@node1.secure-cluster.com:7000",
2611 "redis://:cluster-secret@node2.secure-cluster.com:7001",
2612 "rediss://:cluster-secret@node3.secure-cluster.com:7002",
2613 ]
2614 );
2615 }
2616
2617 #[test]
2618 fn test_cluster_node_urls_fallback_to_legacy_nodes() {
2619 let conn = RedisConnection {
2620 password: Some("fallback-secret".to_string()),
2621 cluster_nodes: vec![ClusterNode {
2622 host: "legacy-node.example.com".to_string(),
2623 port: 7000,
2624 }],
2625 ..Default::default()
2626 };
2627
2628 assert_eq!(
2629 conn.cluster_node_urls(),
2630 vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
2631 );
2632 }
2633
2634 #[test]
2635 fn test_normalize_cluster_seed_urls() {
2636 let conn = RedisConnection {
2637 cluster: RedisClusterConnection {
2638 nodes: Vec::new(),
2639 username: Some("svc-user".to_string()),
2640 password: Some("svc-pass".to_string()),
2641 use_tls: true,
2642 },
2643 ..Default::default()
2644 };
2645
2646 let seeds = vec![
2647 "node1.example.com:7000".to_string(),
2648 "redis://node2.example.com:7001".to_string(),
2649 "rediss://node3.example.com".to_string(),
2650 ];
2651
2652 assert_eq!(
2653 conn.normalize_cluster_seed_urls(&seeds),
2654 vec![
2655 "rediss://svc-user:svc-pass@node1.example.com:7000",
2656 "redis://svc-user:svc-pass@node2.example.com:7001",
2657 "rediss://svc-user:svc-pass@node3.example.com:6379",
2658 ]
2659 );
2660 }
2661}
2662
2663#[cfg(test)]
2664mod cluster_node_tests {
2665 use super::ClusterNode;
2666
2667 #[test]
2668 fn test_to_url_basic_host() {
2669 let node = ClusterNode {
2670 host: "localhost".to_string(),
2671 port: 6379,
2672 };
2673 assert_eq!(node.to_url(), "redis://localhost:6379");
2674 }
2675
2676 #[test]
2677 fn test_to_url_ip_address() {
2678 let node = ClusterNode {
2679 host: "127.0.0.1".to_string(),
2680 port: 6379,
2681 };
2682 assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
2683 }
2684
2685 #[test]
2686 fn test_to_url_with_redis_protocol() {
2687 let node = ClusterNode {
2688 host: "redis://example.com".to_string(),
2689 port: 6379,
2690 };
2691 assert_eq!(node.to_url(), "redis://example.com:6379");
2692 }
2693
2694 #[test]
2695 fn test_to_url_with_rediss_protocol() {
2696 let node = ClusterNode {
2697 host: "rediss://secure.example.com".to_string(),
2698 port: 6379,
2699 };
2700 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
2701 }
2702
2703 #[test]
2704 fn test_to_url_with_rediss_protocol_and_port_in_url() {
2705 let node = ClusterNode {
2706 host: "rediss://secure.example.com:7000".to_string(),
2707 port: 6379,
2708 };
2709 assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
2710 }
2711
2712 #[test]
2713 fn test_to_url_with_redis_protocol_and_port_in_url() {
2714 let node = ClusterNode {
2715 host: "redis://example.com:7001".to_string(),
2716 port: 6379,
2717 };
2718 assert_eq!(node.to_url(), "redis://example.com:7001");
2719 }
2720
2721 #[test]
2722 fn test_to_url_with_trailing_whitespace() {
2723 let node = ClusterNode {
2724 host: " rediss://secure.example.com ".to_string(),
2725 port: 6379,
2726 };
2727 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
2728 }
2729
2730 #[test]
2731 fn test_to_url_custom_port() {
2732 let node = ClusterNode {
2733 host: "redis-cluster.example.com".to_string(),
2734 port: 7000,
2735 };
2736 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
2737 }
2738
2739 #[test]
2740 fn test_to_url_plain_host_with_port_in_host_field() {
2741 let node = ClusterNode {
2742 host: "redis-cluster.example.com:7010".to_string(),
2743 port: 7000,
2744 };
2745 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
2746 }
2747
2748 #[test]
2749 fn test_to_url_with_options_adds_auth_and_tls() {
2750 let node = ClusterNode {
2751 host: "node.example.com".to_string(),
2752 port: 7000,
2753 };
2754 assert_eq!(
2755 node.to_url_with_options(true, Some("svc-user"), Some("secret")),
2756 "rediss://svc-user:secret@node.example.com:7000"
2757 );
2758 }
2759
2760 #[test]
2761 fn test_to_url_with_options_keeps_embedded_auth() {
2762 let node = ClusterNode {
2763 host: "rediss://:node-secret@node.example.com:7000".to_string(),
2764 port: 7000,
2765 };
2766 assert_eq!(
2767 node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
2768 "rediss://:node-secret@node.example.com:7000"
2769 );
2770 }
2771
2772 #[test]
2773 fn test_from_seed_parses_plain_host_port() {
2774 let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
2775 assert_eq!(node.host, "cluster-node-1");
2776 assert_eq!(node.port, 7005);
2777 }
2778
2779 #[test]
2780 fn test_from_seed_keeps_scheme_urls() {
2781 let node =
2782 ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
2783 assert_eq!(node.host, "rediss://secure.example.com:7005");
2784 assert_eq!(node.port, 7005);
2785 }
2786
2787 #[test]
2788 fn test_to_url_aws_elasticache_hostname() {
2789 let node = ClusterNode {
2790 host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
2791 port: 6379,
2792 };
2793 assert_eq!(
2794 node.to_url(),
2795 "rediss://my-cluster.use1.cache.amazonaws.com:6379"
2796 );
2797 }
2798
2799 #[test]
2800 fn test_to_url_with_ipv6_no_port() {
2801 let node = ClusterNode {
2802 host: "rediss://[::1]".to_string(),
2803 port: 6379,
2804 };
2805 assert_eq!(node.to_url(), "rediss://[::1]:6379");
2806 }
2807
2808 #[test]
2809 fn test_to_url_with_ipv6_and_port_in_url() {
2810 let node = ClusterNode {
2811 host: "rediss://[::1]:7000".to_string(),
2812 port: 6379,
2813 };
2814 assert_eq!(node.to_url(), "rediss://[::1]:7000");
2815 }
2816
2817 #[test]
2818 fn test_to_url_with_ipv6_full_address_no_port() {
2819 let node = ClusterNode {
2820 host: "rediss://[2001:db8::1]".to_string(),
2821 port: 6379,
2822 };
2823 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
2824 }
2825
2826 #[test]
2827 fn test_to_url_with_ipv6_full_address_with_port() {
2828 let node = ClusterNode {
2829 host: "rediss://[2001:db8::1]:7000".to_string(),
2830 port: 6379,
2831 };
2832 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
2833 }
2834
2835 #[test]
2836 fn test_to_url_with_redis_protocol_ipv6() {
2837 let node = ClusterNode {
2838 host: "redis://[::1]".to_string(),
2839 port: 6379,
2840 };
2841 assert_eq!(node.to_url(), "redis://[::1]:6379");
2842 }
2843}