1use crate::app::App;
2use crate::utils::{parse_bool_env, parse_env, parse_env_optional};
3use ahash::AHashMap;
4use serde::{Deserialize, Serialize};
5use std::str::FromStr;
6use tracing::{info, warn};
7use url::Url;
8
9fn deserialize_octal_permission<'de, D>(deserializer: D) -> Result<u32, D::Error>
11where
12 D: serde::Deserializer<'de>,
13{
14 use serde::de::{self};
15
16 let s = String::deserialize(deserializer)?;
17
18 if !s.chars().all(|c| c.is_digit(8)) {
20 return Err(de::Error::custom(format!(
21 "invalid octal permission mode '{}': must contain only digits 0-7",
22 s
23 )));
24 }
25
26 let mode = u32::from_str_radix(&s, 8)
28 .map_err(|_| de::Error::custom(format!("invalid octal permission mode: {}", s)))?;
29
30 if mode > 0o777 {
32 return Err(de::Error::custom(format!(
33 "permission mode '{}' exceeds maximum value 777",
34 s
35 )));
36 }
37
38 Ok(mode)
39}
40
41fn parse_driver_enum<T: FromStr + Clone + std::fmt::Debug>(
43 driver_str: String,
44 default_driver: T,
45 driver_name: &str,
46) -> T
47where
48 <T as FromStr>::Err: std::fmt::Debug,
49{
50 match T::from_str(&driver_str.to_lowercase()) {
51 Ok(driver_enum) => driver_enum,
52 Err(e) => {
53 warn!(
54 "Failed to parse {} driver from string '{}': {:?}. Using default: {:?}.",
55 driver_name, driver_str, e, default_driver
56 );
57 default_driver
58 }
59 }
60}
61
62fn override_db_pool_settings(db_conn: &mut DatabaseConnection, prefix: &str) {
63 if let Some(min) = parse_env_optional::<u32>(&format!("{}_POOL_MIN", prefix)) {
64 db_conn.pool_min = Some(min);
65 }
66 if let Some(max) = parse_env_optional::<u32>(&format!("{}_POOL_MAX", prefix)) {
67 db_conn.pool_max = Some(max);
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
74#[serde(rename_all = "lowercase")]
75pub enum AdapterDriver {
76 #[default]
77 Local,
78 Redis,
79 #[serde(rename = "redis-cluster")]
80 RedisCluster,
81 Nats,
82 RabbitMq,
83 #[serde(rename = "google-pubsub")]
84 GooglePubSub,
85 Kafka,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(default)]
90pub struct DynamoDbSettings {
91 pub region: String,
92 pub table_name: String,
93 pub endpoint_url: Option<String>,
94 pub aws_access_key_id: Option<String>,
95 pub aws_secret_access_key: Option<String>,
96 pub aws_profile_name: Option<String>,
97}
98
99impl Default for DynamoDbSettings {
100 fn default() -> Self {
101 Self {
102 region: "us-east-1".to_string(),
103 table_name: "sockudo-applications".to_string(),
104 endpoint_url: None,
105 aws_access_key_id: None,
106 aws_secret_access_key: None,
107 aws_profile_name: None,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113#[serde(default)]
114pub struct ScyllaDbSettings {
115 pub nodes: Vec<String>,
116 pub keyspace: String,
117 pub table_name: String,
118 pub username: Option<String>,
119 pub password: Option<String>,
120 pub replication_class: String,
121 pub replication_factor: u32,
122}
123
124impl Default for ScyllaDbSettings {
125 fn default() -> Self {
126 Self {
127 nodes: vec!["127.0.0.1:9042".to_string()],
128 keyspace: "sockudo".to_string(),
129 table_name: "applications".to_string(),
130 username: None,
131 password: None,
132 replication_class: "SimpleStrategy".to_string(),
133 replication_factor: 3,
134 }
135 }
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(default)]
140pub struct SurrealDbSettings {
141 pub url: String,
142 pub namespace: String,
143 pub database: String,
144 pub username: String,
145 pub password: String,
146 pub table_name: String,
147 pub cache_ttl: u64,
148 pub cache_max_capacity: u64,
149}
150
151impl Default for SurrealDbSettings {
152 fn default() -> Self {
153 Self {
154 url: "ws://127.0.0.1:8000".to_string(),
155 namespace: "sockudo".to_string(),
156 database: "sockudo".to_string(),
157 username: "root".to_string(),
158 password: "root".to_string(),
159 table_name: "applications".to_string(),
160 cache_ttl: 300,
161 cache_max_capacity: 100,
162 }
163 }
164}
165
166impl FromStr for AdapterDriver {
167 type Err = String;
168 fn from_str(s: &str) -> Result<Self, Self::Err> {
169 match s.to_lowercase().as_str() {
170 "local" => Ok(AdapterDriver::Local),
171 "redis" => Ok(AdapterDriver::Redis),
172 "redis-cluster" => Ok(AdapterDriver::RedisCluster),
173 "nats" => Ok(AdapterDriver::Nats),
174 "rabbitmq" | "rabbit-mq" => Ok(AdapterDriver::RabbitMq),
175 "google-pubsub" | "gcp-pubsub" | "pubsub" => Ok(AdapterDriver::GooglePubSub),
176 "kafka" => Ok(AdapterDriver::Kafka),
177 _ => Err(format!("Unknown adapter driver: {s}")),
178 }
179 }
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
183#[serde(rename_all = "lowercase")]
184pub enum AppManagerDriver {
185 #[default]
186 Memory,
187 Mysql,
188 Dynamodb,
189 PgSql,
190 SurrealDb,
191 ScyllaDb,
192}
193impl FromStr for AppManagerDriver {
194 type Err = String;
195 fn from_str(s: &str) -> Result<Self, Self::Err> {
196 match s.to_lowercase().as_str() {
197 "memory" => Ok(AppManagerDriver::Memory),
198 "mysql" => Ok(AppManagerDriver::Mysql),
199 "dynamodb" => Ok(AppManagerDriver::Dynamodb),
200 "pgsql" | "postgres" | "postgresql" => Ok(AppManagerDriver::PgSql),
201 "surreal" | "surrealdb" => Ok(AppManagerDriver::SurrealDb),
202 "scylladb" | "scylla" => Ok(AppManagerDriver::ScyllaDb),
203 _ => Err(format!("Unknown app manager driver: {s}")),
204 }
205 }
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
209#[serde(rename_all = "lowercase")]
210pub enum CacheDriver {
211 #[default]
212 Memory,
213 Redis,
214 #[serde(rename = "redis-cluster")]
215 RedisCluster,
216 None,
217}
218
219impl FromStr for CacheDriver {
220 type Err = String;
221 fn from_str(s: &str) -> Result<Self, Self::Err> {
222 match s.to_lowercase().as_str() {
223 "memory" => Ok(CacheDriver::Memory),
224 "redis" => Ok(CacheDriver::Redis),
225 "redis-cluster" => Ok(CacheDriver::RedisCluster),
226 "none" => Ok(CacheDriver::None),
227 _ => Err(format!("Unknown cache driver: {s}")),
228 }
229 }
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
233#[serde(rename_all = "lowercase")]
234pub enum QueueDriver {
235 Memory,
236 #[default]
237 Redis,
238 #[serde(rename = "redis-cluster")]
239 RedisCluster,
240 Sqs,
241 Sns,
242 None,
243}
244
245impl FromStr for QueueDriver {
246 type Err = String;
247 fn from_str(s: &str) -> Result<Self, Self::Err> {
248 match s.to_lowercase().as_str() {
249 "memory" => Ok(QueueDriver::Memory),
250 "redis" => Ok(QueueDriver::Redis),
251 "redis-cluster" => Ok(QueueDriver::RedisCluster),
252 "sqs" => Ok(QueueDriver::Sqs),
253 "sns" => Ok(QueueDriver::Sns),
254 "none" => Ok(QueueDriver::None),
255 _ => Err(format!("Unknown queue driver: {s}")),
256 }
257 }
258}
259
260impl AsRef<str> for QueueDriver {
261 fn as_ref(&self) -> &str {
262 match self {
263 QueueDriver::Memory => "memory",
264 QueueDriver::Redis => "redis",
265 QueueDriver::RedisCluster => "redis-cluster",
266 QueueDriver::Sqs => "sqs",
267 QueueDriver::Sns => "sns",
268 QueueDriver::None => "none",
269 }
270 }
271}
272
273#[derive(Debug, Clone, Serialize, Deserialize)]
274#[serde(default)]
275pub struct RedisClusterQueueConfig {
276 pub concurrency: u32,
277 pub prefix: Option<String>,
278 pub nodes: Vec<String>,
279 pub request_timeout_ms: u64,
280}
281
282impl Default for RedisClusterQueueConfig {
283 fn default() -> Self {
284 Self {
285 concurrency: 5,
286 prefix: Some("sockudo_queue:".to_string()),
287 nodes: vec!["redis://127.0.0.1:6379".to_string()],
288 request_timeout_ms: 5000,
289 }
290 }
291}
292
293#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
294#[serde(rename_all = "lowercase")]
295pub enum MetricsDriver {
296 #[default]
297 Prometheus,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
301#[serde(rename_all = "lowercase")]
302pub enum LogOutputFormat {
303 #[default]
304 Human,
305 Json,
306}
307
308impl FromStr for LogOutputFormat {
309 type Err = String;
310 fn from_str(s: &str) -> Result<Self, Self::Err> {
311 match s.to_lowercase().as_str() {
312 "human" => Ok(LogOutputFormat::Human),
313 "json" => Ok(LogOutputFormat::Json),
314 _ => Err(format!("Unknown log output format: {s}")),
315 }
316 }
317}
318
319impl FromStr for MetricsDriver {
320 type Err = String;
321 fn from_str(s: &str) -> Result<Self, Self::Err> {
322 match s.to_lowercase().as_str() {
323 "prometheus" => Ok(MetricsDriver::Prometheus),
324 _ => Err(format!("Unknown metrics driver: {s}")),
325 }
326 }
327}
328
329impl AsRef<str> for MetricsDriver {
330 fn as_ref(&self) -> &str {
331 match self {
332 MetricsDriver::Prometheus => "prometheus",
333 }
334 }
335}
336
337#[derive(Debug, Clone, Serialize, Deserialize)]
339#[serde(default)]
340pub struct ServerOptions {
341 pub adapter: AdapterConfig,
342 pub app_manager: AppManagerConfig,
343 pub cache: CacheConfig,
344 pub channel_limits: ChannelLimits,
345 pub cors: CorsConfig,
346 pub database: DatabaseConfig,
347 pub database_pooling: DatabasePooling,
348 pub debug: bool,
349 pub event_limits: EventLimits,
350 pub host: String,
351 pub http_api: HttpApiConfig,
352 pub instance: InstanceConfig,
353 pub logging: Option<LoggingConfig>,
354 pub metrics: MetricsConfig,
355 pub mode: String,
356 pub port: u16,
357 pub path_prefix: String,
358 pub presence: PresenceConfig,
359 pub queue: QueueConfig,
360 pub rate_limiter: RateLimiterConfig,
361 pub shutdown_grace_period: u64,
362 pub ssl: SslConfig,
363 pub user_authentication_timeout: u64,
364 pub webhooks: WebhooksConfig,
365 pub websocket_max_payload_kb: u32,
366 pub cleanup: CleanupConfig,
367 pub activity_timeout: u64,
368 pub cluster_health: ClusterHealthConfig,
369 pub unix_socket: UnixSocketConfig,
370 pub delta_compression: DeltaCompressionOptionsConfig,
371 pub tag_filtering: TagFilteringConfig,
372 pub websocket: WebSocketConfig,
373 pub connection_recovery: ConnectionRecoveryConfig,
374 pub idempotency: IdempotencyConfig,
375 pub ephemeral: EphemeralConfig,
376 pub echo_control: EchoControlConfig,
377 pub event_name_filtering: EventNameFilteringConfig,
378}
379
380#[derive(Debug, Clone, Serialize, Deserialize)]
383#[serde(default)]
384pub struct SqsQueueConfig {
385 pub region: String,
386 pub queue_url_prefix: Option<String>,
387 pub visibility_timeout: i32,
388 pub endpoint_url: Option<String>,
389 pub max_messages: i32,
390 pub wait_time_seconds: i32,
391 pub concurrency: u32,
392 pub fifo: bool,
393 pub message_group_id: Option<String>,
394}
395
396#[derive(Debug, Clone, Serialize, Deserialize)]
397#[serde(default)]
398pub struct SnsQueueConfig {
399 pub region: String,
400 pub topic_arn: String,
401 pub endpoint_url: Option<String>,
402}
403
404#[derive(Debug, Clone, Serialize, Deserialize)]
405#[serde(default)]
406pub struct AdapterConfig {
407 pub driver: AdapterDriver,
408 pub redis: RedisAdapterConfig,
409 pub cluster: RedisClusterAdapterConfig,
410 pub nats: NatsAdapterConfig,
411 pub rabbitmq: RabbitMqAdapterConfig,
412 pub google_pubsub: GooglePubSubAdapterConfig,
413 pub kafka: KafkaAdapterConfig,
414 #[serde(default = "default_buffer_multiplier_per_cpu")]
415 pub buffer_multiplier_per_cpu: usize,
416 pub cluster_health: ClusterHealthConfig,
417 #[serde(default = "default_enable_socket_counting")]
418 pub enable_socket_counting: bool,
419}
420
421fn default_enable_socket_counting() -> bool {
422 true
423}
424
425fn default_buffer_multiplier_per_cpu() -> usize {
426 64
427}
428
429impl Default for AdapterConfig {
430 fn default() -> Self {
431 Self {
432 driver: AdapterDriver::default(),
433 redis: RedisAdapterConfig::default(),
434 cluster: RedisClusterAdapterConfig::default(),
435 nats: NatsAdapterConfig::default(),
436 rabbitmq: RabbitMqAdapterConfig::default(),
437 google_pubsub: GooglePubSubAdapterConfig::default(),
438 kafka: KafkaAdapterConfig::default(),
439 buffer_multiplier_per_cpu: default_buffer_multiplier_per_cpu(),
440 cluster_health: ClusterHealthConfig::default(),
441 enable_socket_counting: default_enable_socket_counting(),
442 }
443 }
444}
445
446#[derive(Debug, Clone, Serialize, Deserialize)]
447#[serde(default)]
448pub struct RedisAdapterConfig {
449 pub requests_timeout: u64,
450 pub prefix: String,
451 pub redis_pub_options: AHashMap<String, sonic_rs::Value>,
452 pub redis_sub_options: AHashMap<String, sonic_rs::Value>,
453 pub cluster_mode: bool,
454}
455
456#[derive(Debug, Clone, Serialize, Deserialize)]
457#[serde(default)]
458pub struct RedisClusterAdapterConfig {
459 pub nodes: Vec<String>,
460 pub prefix: String,
461 pub request_timeout_ms: u64,
462 pub use_connection_manager: bool,
463 #[serde(default)]
464 pub use_sharded_pubsub: bool,
465}
466
467#[derive(Debug, Clone, Serialize, Deserialize)]
468#[serde(default)]
469pub struct NatsAdapterConfig {
470 pub servers: Vec<String>,
471 pub prefix: String,
472 pub request_timeout_ms: u64,
473 pub username: Option<String>,
474 pub password: Option<String>,
475 pub token: Option<String>,
476 pub connection_timeout_ms: u64,
477 pub nodes_number: Option<u32>,
478}
479
480#[derive(Debug, Clone, Serialize, Deserialize)]
481#[serde(default)]
482pub struct RabbitMqAdapterConfig {
483 pub url: String,
484 pub prefix: String,
485 pub request_timeout_ms: u64,
486 pub connection_timeout_ms: u64,
487 pub nodes_number: Option<u32>,
488}
489
490#[derive(Debug, Clone, Serialize, Deserialize)]
491#[serde(default)]
492pub struct GooglePubSubAdapterConfig {
493 pub project_id: String,
494 pub prefix: String,
495 pub request_timeout_ms: u64,
496 pub emulator_host: Option<String>,
497 pub nodes_number: Option<u32>,
498}
499
500#[derive(Debug, Clone, Serialize, Deserialize)]
501#[serde(default)]
502pub struct KafkaAdapterConfig {
503 pub brokers: Vec<String>,
504 pub prefix: String,
505 pub request_timeout_ms: u64,
506 pub security_protocol: Option<String>,
507 pub sasl_mechanism: Option<String>,
508 pub sasl_username: Option<String>,
509 pub sasl_password: Option<String>,
510 pub nodes_number: Option<u32>,
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize, Default)]
514#[serde(default)]
515pub struct AppManagerConfig {
516 pub driver: AppManagerDriver,
517 pub array: ArrayConfig,
518 pub cache: CacheSettings,
519 pub scylladb: ScyllaDbSettings,
520}
521
522#[derive(Debug, Clone, Serialize, Deserialize, Default)]
523#[serde(default)]
524pub struct ArrayConfig {
525 pub apps: Vec<App>,
526}
527
528#[derive(Debug, Clone, Serialize, Deserialize)]
529#[serde(default)]
530pub struct CacheSettings {
531 pub enabled: bool,
532 pub ttl: u64,
533}
534
535#[derive(Debug, Clone, Serialize, Deserialize)]
536#[serde(default)]
537pub struct MemoryCacheOptions {
538 pub ttl: u64,
539 pub cleanup_interval: u64,
540 pub max_capacity: u64,
541}
542
543#[derive(Debug, Clone, Serialize, Deserialize)]
544#[serde(default)]
545pub struct CacheConfig {
546 pub driver: CacheDriver,
547 pub redis: RedisConfig,
548 pub memory: MemoryCacheOptions,
549}
550
551#[derive(Debug, Clone, Serialize, Deserialize, Default)]
552#[serde(default)]
553pub struct RedisConfig {
554 pub prefix: Option<String>,
555 pub url_override: Option<String>,
556 pub cluster_mode: bool,
557}
558
559#[derive(Debug, Clone, Serialize, Deserialize)]
560#[serde(default)]
561pub struct ChannelLimits {
562 pub max_name_length: u32,
563 pub cache_ttl: u64,
564}
565
566#[derive(Debug, Clone, Serialize, Deserialize)]
567#[serde(default)]
568pub struct CorsConfig {
569 pub credentials: bool,
570 #[serde(deserialize_with = "deserialize_and_validate_cors_origins")]
571 pub origin: Vec<String>,
572 pub methods: Vec<String>,
573 pub allowed_headers: Vec<String>,
574}
575
576fn deserialize_and_validate_cors_origins<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
577where
578 D: serde::Deserializer<'de>,
579{
580 use serde::de::Error;
581 let origins = Vec::<String>::deserialize(deserializer)?;
582
583 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&origins) {
584 return Err(D::Error::custom(format!(
585 "CORS origin pattern validation failed: {}",
586 e
587 )));
588 }
589
590 Ok(origins)
591}
592
593#[derive(Debug, Clone, Serialize, Deserialize, Default)]
594#[serde(default)]
595pub struct DatabaseConfig {
596 pub mysql: DatabaseConnection,
597 pub postgres: DatabaseConnection,
598 pub redis: RedisConnection,
599 pub dynamodb: DynamoDbSettings,
600 pub surrealdb: SurrealDbSettings,
601 pub scylladb: ScyllaDbSettings,
602}
603
604#[derive(Debug, Clone, Serialize, Deserialize)]
605#[serde(default)]
606pub struct DatabaseConnection {
607 pub host: String,
608 pub port: u16,
609 pub username: String,
610 pub password: String,
611 pub database: String,
612 pub table_name: String,
613 pub connection_pool_size: u32,
614 pub pool_min: Option<u32>,
615 pub pool_max: Option<u32>,
616 pub cache_ttl: u64,
617 pub cache_cleanup_interval: u64,
618 pub cache_max_capacity: u64,
619}
620
621#[derive(Debug, Clone, Serialize, Deserialize)]
622#[serde(default)]
623pub struct RedisConnection {
624 pub host: String,
625 pub port: u16,
626 pub db: u32,
627 pub username: Option<String>,
628 pub password: Option<String>,
629 pub key_prefix: String,
630 pub sentinels: Vec<RedisSentinel>,
631 pub sentinel_password: Option<String>,
632 pub name: String,
633 pub cluster: RedisClusterConnection,
634 pub cluster_nodes: Vec<ClusterNode>,
636}
637
638#[derive(Debug, Clone, Serialize, Deserialize)]
639#[serde(default)]
640pub struct RedisSentinel {
641 pub host: String,
642 pub port: u16,
643}
644
645#[derive(Debug, Clone, Serialize, Deserialize, Default)]
646#[serde(default)]
647pub struct RedisClusterConnection {
648 pub nodes: Vec<ClusterNode>,
649 pub username: Option<String>,
650 pub password: Option<String>,
651 #[serde(alias = "useTLS")]
652 pub use_tls: bool,
653}
654
655impl RedisConnection {
656 pub fn is_sentinel_configured(&self) -> bool {
658 !self.sentinels.is_empty()
659 }
660
661 pub fn to_url(&self) -> String {
663 if self.is_sentinel_configured() {
664 self.build_sentinel_url()
665 } else {
666 self.build_standard_url()
667 }
668 }
669
670 fn build_standard_url(&self) -> String {
671 let (scheme, host) = if self.host.starts_with("rediss://") {
673 ("rediss://", self.host.trim_start_matches("rediss://"))
674 } else if self.host.starts_with("redis://") {
675 ("redis://", self.host.trim_start_matches("redis://"))
676 } else {
677 ("redis://", self.host.as_str())
678 };
679
680 let mut url = String::from(scheme);
681
682 if let Some(ref username) = self.username {
683 url.push_str(username);
684 if let Some(ref password) = self.password {
685 url.push(':');
686 url.push_str(&urlencoding::encode(password));
687 }
688 url.push('@');
689 } else if let Some(ref password) = self.password {
690 url.push(':');
691 url.push_str(&urlencoding::encode(password));
692 url.push('@');
693 }
694
695 url.push_str(host);
696 url.push(':');
697 url.push_str(&self.port.to_string());
698 url.push('/');
699 url.push_str(&self.db.to_string());
700
701 url
702 }
703
704 fn build_sentinel_url(&self) -> String {
705 let mut url = String::from("redis+sentinel://");
706
707 if let Some(ref sentinel_password) = self.sentinel_password {
708 url.push(':');
709 url.push_str(&urlencoding::encode(sentinel_password));
710 url.push('@');
711 }
712
713 let sentinel_hosts: Vec<String> = self
714 .sentinels
715 .iter()
716 .map(|s| format!("{}:{}", s.host, s.port))
717 .collect();
718 url.push_str(&sentinel_hosts.join(","));
719
720 url.push('/');
721 url.push_str(&self.name);
722 url.push('/');
723 url.push_str(&self.db.to_string());
724
725 let mut params = Vec::new();
726 if let Some(ref password) = self.password {
727 params.push(format!("password={}", urlencoding::encode(password)));
728 }
729 if let Some(ref username) = self.username {
730 params.push(format!("username={}", urlencoding::encode(username)));
731 }
732
733 if !params.is_empty() {
734 url.push('?');
735 url.push_str(¶ms.join("&"));
736 }
737
738 url
739 }
740
741 pub fn has_cluster_nodes(&self) -> bool {
744 !self.cluster.nodes.is_empty() || !self.cluster_nodes.is_empty()
745 }
746
747 pub fn cluster_node_urls(&self) -> Vec<String> {
750 if !self.cluster.nodes.is_empty() {
751 return self.build_cluster_urls(&self.cluster.nodes);
752 }
753 self.build_cluster_urls(&self.cluster_nodes)
754 }
755
756 pub fn normalize_cluster_seed_urls(&self, seeds: &[String]) -> Vec<String> {
759 self.build_cluster_urls(
760 &seeds
761 .iter()
762 .filter_map(|seed| ClusterNode::from_seed(seed))
763 .collect::<Vec<ClusterNode>>(),
764 )
765 }
766
767 fn build_cluster_urls(&self, nodes: &[ClusterNode]) -> Vec<String> {
768 let username = self
769 .cluster
770 .username
771 .as_deref()
772 .or(self.username.as_deref());
773 let password = self
774 .cluster
775 .password
776 .as_deref()
777 .or(self.password.as_deref());
778 let use_tls = self.cluster.use_tls;
779
780 nodes
781 .iter()
782 .map(|node| node.to_url_with_options(use_tls, username, password))
783 .collect()
784 }
785}
786
787impl RedisSentinel {
788 pub fn to_host_port(&self) -> String {
789 format!("{}:{}", self.host, self.port)
790 }
791}
792
793#[derive(Debug, Clone, Serialize, Deserialize)]
794#[serde(default)]
795pub struct ClusterNode {
796 pub host: String,
797 pub port: u16,
798}
799
800impl ClusterNode {
801 pub fn to_url(&self) -> String {
802 self.to_url_with_options(false, None, None)
803 }
804
805 pub fn to_url_with_options(
806 &self,
807 use_tls: bool,
808 username: Option<&str>,
809 password: Option<&str>,
810 ) -> String {
811 let host = self.host.trim();
812
813 if host.starts_with("redis://") || host.starts_with("rediss://") {
814 if let Ok(parsed) = Url::parse(host)
815 && let Some(host_str) = parsed.host_str()
816 {
817 let scheme = parsed.scheme();
818 let port = parsed.port_or_known_default().unwrap_or(self.port);
819 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
820 let parsed_password = parsed.password();
821 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
822 let (effective_username, effective_password) = if has_embedded_auth {
823 (parsed_username, parsed_password)
824 } else {
825 (username, password)
826 };
827
828 return build_redis_url(
829 scheme,
830 host_str,
831 port,
832 effective_username,
833 effective_password,
834 );
835 }
836
837 let has_port = if let Some(bracket_pos) = host.rfind(']') {
839 host[bracket_pos..].contains(':')
840 } else {
841 host.split(':').count() >= 3
842 };
843 let base = if has_port {
844 host.to_string()
845 } else {
846 format!("{}:{}", host, self.port)
847 };
848
849 if let Ok(parsed) = Url::parse(&base) {
850 let parsed_username = (!parsed.username().is_empty()).then_some(parsed.username());
851 let parsed_password = parsed.password();
852 if let Some(host_str) = parsed.host_str() {
853 let port = parsed.port_or_known_default().unwrap_or(self.port);
854 let has_embedded_auth = parsed_username.is_some() || parsed_password.is_some();
855 let (effective_username, effective_password) = if has_embedded_auth {
856 (parsed_username, parsed_password)
857 } else {
858 (username, password)
859 };
860 return build_redis_url(
861 parsed.scheme(),
862 host_str,
863 port,
864 effective_username,
865 effective_password,
866 );
867 }
868 }
869 return base;
870 }
871
872 let (normalized_host, normalized_port) = split_plain_host_and_port(host, self.port);
873 let scheme = if use_tls { "rediss" } else { "redis" };
874 build_redis_url(
875 scheme,
876 &normalized_host,
877 normalized_port,
878 username,
879 password,
880 )
881 }
882
883 pub fn from_seed(seed: &str) -> Option<Self> {
884 let trimmed = seed.trim();
885 if trimmed.is_empty() {
886 return None;
887 }
888
889 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
890 let port = Url::parse(trimmed)
891 .ok()
892 .and_then(|parsed| parsed.port_or_known_default())
893 .unwrap_or(6379);
894 return Some(Self {
895 host: trimmed.to_string(),
896 port,
897 });
898 }
899
900 let (host, port) = split_plain_host_and_port(trimmed, 6379);
901 Some(Self { host, port })
902 }
903}
904
905fn split_plain_host_and_port(raw_host: &str, default_port: u16) -> (String, u16) {
906 let host = raw_host.trim();
907
908 if host.starts_with('[') {
910 if let Some(end_bracket) = host.find(']') {
911 let host_part = host[1..end_bracket].to_string();
912 let remainder = &host[end_bracket + 1..];
913 if let Some(port_str) = remainder.strip_prefix(':')
914 && let Ok(port) = port_str.parse::<u16>()
915 {
916 return (host_part, port);
917 }
918 return (host_part, default_port);
919 }
920 return (host.to_string(), default_port);
921 }
922
923 if host.matches(':').count() == 1
925 && let Some((host_part, port_part)) = host.rsplit_once(':')
926 && let Ok(port) = port_part.parse::<u16>()
927 {
928 return (host_part.to_string(), port);
929 }
930
931 (host.to_string(), default_port)
932}
933
934fn build_redis_url(
935 scheme: &str,
936 host: &str,
937 port: u16,
938 username: Option<&str>,
939 password: Option<&str>,
940) -> String {
941 let mut url = format!("{scheme}://");
942
943 if let Some(user) = username {
944 url.push_str(&urlencoding::encode(user));
945 if let Some(pass) = password {
946 url.push(':');
947 url.push_str(&urlencoding::encode(pass));
948 }
949 url.push('@');
950 } else if let Some(pass) = password {
951 url.push(':');
952 url.push_str(&urlencoding::encode(pass));
953 url.push('@');
954 }
955
956 if host.contains(':') && !host.starts_with('[') {
957 url.push('[');
958 url.push_str(host);
959 url.push(']');
960 } else {
961 url.push_str(host);
962 }
963 url.push(':');
964 url.push_str(&port.to_string());
965 url
966}
967
968#[derive(Debug, Clone, Serialize, Deserialize)]
969#[serde(default)]
970pub struct DatabasePooling {
971 pub enabled: bool,
972 pub min: u32,
973 pub max: u32,
974}
975
976#[derive(Debug, Clone, Serialize, Deserialize)]
977#[serde(default)]
978pub struct EventLimits {
979 pub max_channels_at_once: u32,
980 pub max_name_length: u32,
981 pub max_payload_in_kb: u32,
982 pub max_batch_size: u32,
983}
984
985#[derive(Debug, Clone, Serialize, Deserialize)]
986#[serde(default)]
987pub struct IdempotencyConfig {
988 pub enabled: bool,
990 pub ttl_seconds: u64,
992 pub max_key_length: usize,
994}
995
996#[derive(Debug, Clone, Serialize, Deserialize)]
997#[serde(default)]
998pub struct EphemeralConfig {
999 pub enabled: bool,
1001}
1002
1003#[derive(Debug, Clone, Serialize, Deserialize)]
1004#[serde(default)]
1005pub struct EchoControlConfig {
1006 pub enabled: bool,
1008 pub default_echo_messages: bool,
1010}
1011
1012#[derive(Debug, Clone, Serialize, Deserialize)]
1013#[serde(default)]
1014pub struct EventNameFilteringConfig {
1015 pub enabled: bool,
1017 pub max_events_per_filter: usize,
1019 pub max_event_name_length: usize,
1021}
1022
1023#[derive(Debug, Clone, Serialize, Deserialize)]
1024#[serde(default)]
1025pub struct ConnectionRecoveryConfig {
1026 pub enabled: bool,
1031 pub buffer_ttl_seconds: u64,
1033 pub max_buffer_size: usize,
1035}
1036
1037#[derive(Debug, Clone, Serialize, Deserialize)]
1038#[serde(default)]
1039pub struct HttpApiConfig {
1040 pub request_limit_in_mb: u32,
1041 pub accept_traffic: AcceptTraffic,
1042 pub usage_enabled: bool,
1043}
1044
1045#[derive(Debug, Clone, Serialize, Deserialize)]
1046#[serde(default)]
1047pub struct AcceptTraffic {
1048 pub memory_threshold: f64,
1049}
1050
1051#[derive(Debug, Clone, Serialize, Deserialize)]
1052#[serde(default)]
1053pub struct InstanceConfig {
1054 pub process_id: String,
1055}
1056
1057#[derive(Debug, Clone, Serialize, Deserialize)]
1058#[serde(default)]
1059pub struct MetricsConfig {
1060 pub enabled: bool,
1061 pub driver: MetricsDriver,
1062 pub host: String,
1063 pub prometheus: PrometheusConfig,
1064 pub port: u16,
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize)]
1068#[serde(default)]
1069pub struct PrometheusConfig {
1070 pub prefix: String,
1071}
1072
1073#[derive(Debug, Clone, Serialize, Deserialize)]
1074#[serde(default)]
1075pub struct LoggingConfig {
1076 pub colors_enabled: bool,
1077 pub include_target: bool,
1078}
1079
1080#[derive(Debug, Clone, Serialize, Deserialize)]
1081#[serde(default)]
1082pub struct PresenceConfig {
1083 pub max_members_per_channel: u32,
1084 pub max_member_size_in_kb: u32,
1085}
1086
1087#[derive(Debug, Clone, Serialize, Deserialize)]
1090#[serde(default)]
1091pub struct WebSocketConfig {
1092 pub max_messages: Option<usize>,
1093 pub max_bytes: Option<usize>,
1094 pub disconnect_on_buffer_full: bool,
1095 pub max_message_size: usize,
1096 pub max_frame_size: usize,
1097 pub write_buffer_size: usize,
1098 pub max_backpressure: usize,
1099 pub auto_ping: bool,
1100 pub ping_interval: u32,
1101 pub idle_timeout: u32,
1102 pub compression: String,
1103}
1104
1105impl Default for WebSocketConfig {
1106 fn default() -> Self {
1107 Self {
1108 max_messages: Some(1000),
1109 max_bytes: None,
1110 disconnect_on_buffer_full: true,
1111 max_message_size: 64 * 1024 * 1024,
1112 max_frame_size: 16 * 1024 * 1024,
1113 write_buffer_size: 16 * 1024,
1114 max_backpressure: 1024 * 1024,
1115 auto_ping: true,
1116 ping_interval: 30,
1117 idle_timeout: 120,
1118 compression: "disabled".to_string(),
1119 }
1120 }
1121}
1122
1123impl WebSocketConfig {
1124 pub fn to_buffer_config(&self) -> crate::websocket::WebSocketBufferConfig {
1126 use crate::websocket::{BufferLimit, WebSocketBufferConfig};
1127
1128 let limit = match (self.max_messages, self.max_bytes) {
1129 (Some(messages), Some(bytes)) => BufferLimit::Both { messages, bytes },
1130 (Some(messages), None) => BufferLimit::Messages(messages),
1131 (None, Some(bytes)) => BufferLimit::Bytes(bytes),
1132 (None, None) => BufferLimit::Messages(1000),
1133 };
1134
1135 WebSocketBufferConfig {
1136 limit,
1137 disconnect_on_full: self.disconnect_on_buffer_full,
1138 }
1139 }
1140
1141 pub fn to_sockudo_ws_config(
1143 &self,
1144 websocket_max_payload_kb: u32,
1145 activity_timeout: u64,
1146 ) -> sockudo_ws::Config {
1147 use sockudo_ws::Compression;
1148
1149 let compression = match self.compression.to_lowercase().as_str() {
1150 "dedicated" => Compression::Dedicated,
1151 "shared" => Compression::Shared,
1152 "window256b" => Compression::Window256B,
1153 "window1kb" => Compression::Window1KB,
1154 "window2kb" => Compression::Window2KB,
1155 "window4kb" => Compression::Window4KB,
1156 "window8kb" => Compression::Window8KB,
1157 "window16kb" => Compression::Window16KB,
1158 "window32kb" => Compression::Window32KB,
1159 _ => Compression::Disabled,
1160 };
1161
1162 sockudo_ws::Config::builder()
1163 .max_payload_length(
1164 self.max_bytes
1165 .unwrap_or(websocket_max_payload_kb as usize * 1024),
1166 )
1167 .max_message_size(self.max_message_size)
1168 .max_frame_size(self.max_frame_size)
1169 .write_buffer_size(self.write_buffer_size)
1170 .max_backpressure(self.max_backpressure)
1171 .idle_timeout(self.idle_timeout)
1172 .auto_ping(self.auto_ping)
1173 .ping_interval(self.ping_interval.max((activity_timeout / 2).max(5) as u32))
1174 .compression(compression)
1175 .build()
1176 }
1177}
1178
1179#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1180#[serde(default)]
1181pub struct QueueConfig {
1182 pub driver: QueueDriver,
1183 pub redis: RedisQueueConfig,
1184 pub redis_cluster: RedisClusterQueueConfig,
1185 pub sqs: SqsQueueConfig,
1186 pub sns: SnsQueueConfig,
1187}
1188
1189#[derive(Debug, Clone, Serialize, Deserialize)]
1190#[serde(default)]
1191pub struct RedisQueueConfig {
1192 pub concurrency: u32,
1193 pub prefix: Option<String>,
1194 pub url_override: Option<String>,
1195 pub cluster_mode: bool,
1196}
1197
1198#[derive(Clone, Debug, Serialize, Deserialize)]
1199#[serde(default)]
1200pub struct RateLimit {
1201 pub max_requests: u32,
1202 pub window_seconds: u64,
1203 pub identifier: Option<String>,
1204 pub trust_hops: Option<u32>,
1205}
1206
1207#[derive(Debug, Clone, Serialize, Deserialize)]
1208#[serde(default)]
1209pub struct RateLimiterConfig {
1210 pub enabled: bool,
1211 pub driver: CacheDriver,
1212 pub api_rate_limit: RateLimit,
1213 pub websocket_rate_limit: RateLimit,
1214 pub redis: RedisConfig,
1215}
1216
1217#[derive(Debug, Clone, Serialize, Deserialize)]
1218#[serde(default)]
1219pub struct SslConfig {
1220 pub enabled: bool,
1221 pub cert_path: String,
1222 pub key_path: String,
1223 pub passphrase: Option<String>,
1224 pub ca_path: Option<String>,
1225 pub redirect_http: bool,
1226 pub http_port: Option<u16>,
1227}
1228
1229#[derive(Debug, Clone, Serialize, Deserialize)]
1230#[serde(default)]
1231pub struct WebhooksConfig {
1232 pub batching: BatchingConfig,
1233 pub retry: WebhookRetryConfig,
1234 pub request_timeout_ms: u64,
1235}
1236
1237#[derive(Debug, Clone, Serialize, Deserialize)]
1238#[serde(default)]
1239pub struct BatchingConfig {
1240 pub enabled: bool,
1241 pub duration: u64,
1242 pub size: usize,
1243}
1244
1245#[derive(Debug, Clone, Serialize, Deserialize)]
1246#[serde(default)]
1247pub struct WebhookRetryConfig {
1248 pub enabled: bool,
1249 pub max_attempts: Option<u32>,
1250 pub max_elapsed_time_ms: u64,
1251 pub initial_backoff_ms: u64,
1252 pub max_backoff_ms: u64,
1253}
1254
1255impl Default for WebhooksConfig {
1256 fn default() -> Self {
1257 Self {
1258 batching: BatchingConfig::default(),
1259 retry: WebhookRetryConfig::default(),
1260 request_timeout_ms: 10_000,
1261 }
1262 }
1263}
1264
1265impl Default for WebhookRetryConfig {
1266 fn default() -> Self {
1267 Self {
1268 enabled: true,
1269 max_attempts: None,
1270 max_elapsed_time_ms: 300_000,
1271 initial_backoff_ms: 1_000,
1272 max_backoff_ms: 60_000,
1273 }
1274 }
1275}
1276
1277#[derive(Debug, Clone, Serialize, Deserialize)]
1278#[serde(default)]
1279pub struct ClusterHealthConfig {
1280 pub enabled: bool,
1281 pub heartbeat_interval_ms: u64,
1282 pub node_timeout_ms: u64,
1283 pub cleanup_interval_ms: u64,
1284}
1285
1286#[derive(Debug, Clone, Serialize, Deserialize)]
1287#[serde(default)]
1288pub struct UnixSocketConfig {
1289 pub enabled: bool,
1290 pub path: String,
1291 #[serde(deserialize_with = "deserialize_octal_permission")]
1292 pub permission_mode: u32,
1293}
1294
1295#[derive(Debug, Clone, Serialize, Deserialize)]
1296#[serde(default)]
1297pub struct DeltaCompressionOptionsConfig {
1298 pub enabled: bool,
1299 pub algorithm: String,
1300 pub full_message_interval: u32,
1301 pub min_message_size: usize,
1302 pub max_state_age_secs: u64,
1303 pub max_channel_states_per_socket: usize,
1304 pub max_conflation_states_per_channel: Option<usize>,
1305 pub conflation_key_path: Option<String>,
1306 pub cluster_coordination: bool,
1307 pub omit_delta_algorithm: bool,
1308}
1309
1310#[derive(Debug, Clone, Serialize, Deserialize)]
1312#[serde(default)]
1313pub struct CleanupConfig {
1314 pub queue_buffer_size: usize,
1315 pub batch_size: usize,
1316 pub batch_timeout_ms: u64,
1317 pub worker_threads: WorkerThreadsConfig,
1318 pub max_retry_attempts: u32,
1319 pub async_enabled: bool,
1320 pub fallback_to_sync: bool,
1321}
1322
1323#[derive(Debug, Clone)]
1325pub enum WorkerThreadsConfig {
1326 Auto,
1327 Fixed(usize),
1328}
1329
1330impl serde::Serialize for WorkerThreadsConfig {
1331 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1332 where
1333 S: serde::Serializer,
1334 {
1335 match self {
1336 WorkerThreadsConfig::Auto => serializer.serialize_str("auto"),
1337 WorkerThreadsConfig::Fixed(n) => serializer.serialize_u64(*n as u64),
1338 }
1339 }
1340}
1341
1342impl<'de> serde::Deserialize<'de> for WorkerThreadsConfig {
1343 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1344 where
1345 D: serde::Deserializer<'de>,
1346 {
1347 use serde::de;
1348 struct WorkerThreadsVisitor;
1349 impl<'de> de::Visitor<'de> for WorkerThreadsVisitor {
1350 type Value = WorkerThreadsConfig;
1351
1352 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1353 formatter.write_str(r#""auto" or a positive integer"#)
1354 }
1355
1356 fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
1357 if value.eq_ignore_ascii_case("auto") {
1358 Ok(WorkerThreadsConfig::Auto)
1359 } else if let Ok(n) = value.parse::<usize>() {
1360 Ok(WorkerThreadsConfig::Fixed(n))
1361 } else {
1362 Err(E::custom(format!(
1363 "expected 'auto' or a number, got '{value}'"
1364 )))
1365 }
1366 }
1367
1368 fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
1369 Ok(WorkerThreadsConfig::Fixed(value as usize))
1370 }
1371
1372 fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
1373 if value >= 0 {
1374 Ok(WorkerThreadsConfig::Fixed(value as usize))
1375 } else {
1376 Err(E::custom("worker_threads must be non-negative"))
1377 }
1378 }
1379 }
1380 deserializer.deserialize_any(WorkerThreadsVisitor)
1381 }
1382}
1383
1384impl Default for CleanupConfig {
1385 fn default() -> Self {
1386 Self {
1387 queue_buffer_size: 1024,
1388 batch_size: 64,
1389 batch_timeout_ms: 100,
1390 worker_threads: WorkerThreadsConfig::Auto,
1391 max_retry_attempts: 3,
1392 async_enabled: true,
1393 fallback_to_sync: true,
1394 }
1395 }
1396}
1397
1398impl CleanupConfig {
1399 pub fn validate(&self) -> Result<(), String> {
1400 if self.queue_buffer_size == 0 {
1401 return Err("queue_buffer_size must be greater than 0".to_string());
1402 }
1403 if self.batch_size == 0 {
1404 return Err("batch_size must be greater than 0".to_string());
1405 }
1406 if self.batch_timeout_ms == 0 {
1407 return Err("batch_timeout_ms must be greater than 0".to_string());
1408 }
1409 if let WorkerThreadsConfig::Fixed(n) = self.worker_threads
1410 && n == 0
1411 {
1412 return Err("worker_threads must be greater than 0 when using fixed count".to_string());
1413 }
1414 Ok(())
1415 }
1416}
1417
1418#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1419#[serde(default)]
1420pub struct TagFilteringConfig {
1421 #[serde(default)]
1422 pub enabled: bool,
1423 #[serde(default = "default_true")]
1424 pub enable_tags: bool,
1425}
1426
1427fn default_true() -> bool {
1428 true
1429}
1430
1431impl Default for ServerOptions {
1434 fn default() -> Self {
1435 Self {
1436 adapter: AdapterConfig::default(),
1437 app_manager: AppManagerConfig::default(),
1438 cache: CacheConfig::default(),
1439 channel_limits: ChannelLimits::default(),
1440 cors: CorsConfig::default(),
1441 database: DatabaseConfig::default(),
1442 database_pooling: DatabasePooling::default(),
1443 debug: false,
1444 tag_filtering: TagFilteringConfig::default(),
1445 event_limits: EventLimits::default(),
1446 host: "0.0.0.0".to_string(),
1447 http_api: HttpApiConfig::default(),
1448 instance: InstanceConfig::default(),
1449 logging: None,
1450 metrics: MetricsConfig::default(),
1451 mode: "production".to_string(),
1452 port: 6001,
1453 path_prefix: "/".to_string(),
1454 presence: PresenceConfig::default(),
1455 queue: QueueConfig::default(),
1456 rate_limiter: RateLimiterConfig::default(),
1457 shutdown_grace_period: 10,
1458 ssl: SslConfig::default(),
1459 user_authentication_timeout: 3600,
1460 webhooks: WebhooksConfig::default(),
1461 websocket_max_payload_kb: 64,
1462 cleanup: CleanupConfig::default(),
1463 activity_timeout: 120,
1464 cluster_health: ClusterHealthConfig::default(),
1465 unix_socket: UnixSocketConfig::default(),
1466 delta_compression: DeltaCompressionOptionsConfig::default(),
1467 websocket: WebSocketConfig::default(),
1468 connection_recovery: ConnectionRecoveryConfig::default(),
1469 idempotency: IdempotencyConfig::default(),
1470 ephemeral: EphemeralConfig::default(),
1471 echo_control: EchoControlConfig::default(),
1472 event_name_filtering: EventNameFilteringConfig::default(),
1473 }
1474 }
1475}
1476
1477impl Default for SqsQueueConfig {
1478 fn default() -> Self {
1479 Self {
1480 region: "us-east-1".to_string(),
1481 queue_url_prefix: None,
1482 visibility_timeout: 30,
1483 endpoint_url: None,
1484 max_messages: 10,
1485 wait_time_seconds: 5,
1486 concurrency: 5,
1487 fifo: false,
1488 message_group_id: Some("default".to_string()),
1489 }
1490 }
1491}
1492
1493impl Default for SnsQueueConfig {
1494 fn default() -> Self {
1495 Self {
1496 region: "us-east-1".to_string(),
1497 topic_arn: String::new(),
1498 endpoint_url: None,
1499 }
1500 }
1501}
1502
1503impl Default for RedisAdapterConfig {
1504 fn default() -> Self {
1505 Self {
1506 requests_timeout: 5000,
1507 prefix: "sockudo_adapter:".to_string(),
1508 redis_pub_options: AHashMap::new(),
1509 redis_sub_options: AHashMap::new(),
1510 cluster_mode: false,
1511 }
1512 }
1513}
1514
1515impl Default for RedisClusterAdapterConfig {
1516 fn default() -> Self {
1517 Self {
1518 nodes: vec![],
1519 prefix: "sockudo_adapter:".to_string(),
1520 request_timeout_ms: 1000,
1521 use_connection_manager: true,
1522 use_sharded_pubsub: false,
1523 }
1524 }
1525}
1526
1527impl Default for NatsAdapterConfig {
1528 fn default() -> Self {
1529 Self {
1530 servers: vec!["nats://localhost:4222".to_string()],
1531 prefix: "sockudo_adapter:".to_string(),
1532 request_timeout_ms: 5000,
1533 username: None,
1534 password: None,
1535 token: None,
1536 connection_timeout_ms: 5000,
1537 nodes_number: None,
1538 }
1539 }
1540}
1541
1542impl Default for RabbitMqAdapterConfig {
1543 fn default() -> Self {
1544 Self {
1545 url: "amqp://guest:guest@127.0.0.1:5672/%2f".to_string(),
1546 prefix: "sockudo_adapter".to_string(),
1547 request_timeout_ms: 5000,
1548 connection_timeout_ms: 5000,
1549 nodes_number: None,
1550 }
1551 }
1552}
1553
1554impl Default for GooglePubSubAdapterConfig {
1555 fn default() -> Self {
1556 Self {
1557 project_id: "".to_string(),
1558 prefix: "sockudo-adapter".to_string(),
1559 request_timeout_ms: 5000,
1560 emulator_host: None,
1561 nodes_number: None,
1562 }
1563 }
1564}
1565
1566impl Default for KafkaAdapterConfig {
1567 fn default() -> Self {
1568 Self {
1569 brokers: vec!["localhost:9092".to_string()],
1570 prefix: "sockudo_adapter".to_string(),
1571 request_timeout_ms: 5000,
1572 security_protocol: None,
1573 sasl_mechanism: None,
1574 sasl_username: None,
1575 sasl_password: None,
1576 nodes_number: None,
1577 }
1578 }
1579}
1580
1581impl Default for CacheSettings {
1582 fn default() -> Self {
1583 Self {
1584 enabled: true,
1585 ttl: 300,
1586 }
1587 }
1588}
1589
1590impl Default for MemoryCacheOptions {
1591 fn default() -> Self {
1592 Self {
1593 ttl: 300,
1594 cleanup_interval: 60,
1595 max_capacity: 10000,
1596 }
1597 }
1598}
1599
1600impl Default for CacheConfig {
1601 fn default() -> Self {
1602 Self {
1603 driver: CacheDriver::default(),
1604 redis: RedisConfig {
1605 prefix: Some("sockudo_cache:".to_string()),
1606 url_override: None,
1607 cluster_mode: false,
1608 },
1609 memory: MemoryCacheOptions::default(),
1610 }
1611 }
1612}
1613
1614impl Default for ChannelLimits {
1615 fn default() -> Self {
1616 Self {
1617 max_name_length: 200,
1618 cache_ttl: 3600,
1619 }
1620 }
1621}
1622impl Default for CorsConfig {
1623 fn default() -> Self {
1624 Self {
1625 credentials: true,
1626 origin: vec!["*".to_string()],
1627 methods: vec!["GET".to_string(), "POST".to_string(), "OPTIONS".to_string()],
1628 allowed_headers: vec![
1629 "Authorization".to_string(),
1630 "Content-Type".to_string(),
1631 "X-Requested-With".to_string(),
1632 "Accept".to_string(),
1633 ],
1634 }
1635 }
1636}
1637
1638impl Default for DatabaseConnection {
1639 fn default() -> Self {
1640 Self {
1641 host: "localhost".to_string(),
1642 port: 3306,
1643 username: "root".to_string(),
1644 password: "".to_string(),
1645 database: "sockudo".to_string(),
1646 table_name: "applications".to_string(),
1647 connection_pool_size: 10,
1648 pool_min: None,
1649 pool_max: None,
1650 cache_ttl: 300,
1651 cache_cleanup_interval: 60,
1652 cache_max_capacity: 100,
1653 }
1654 }
1655}
1656
1657impl Default for RedisConnection {
1658 fn default() -> Self {
1659 Self {
1660 host: "127.0.0.1".to_string(),
1661 port: 6379,
1662 db: 0,
1663 username: None,
1664 password: None,
1665 key_prefix: "sockudo:".to_string(),
1666 sentinels: Vec::new(),
1667 sentinel_password: None,
1668 name: "mymaster".to_string(),
1669 cluster: RedisClusterConnection::default(),
1670 cluster_nodes: Vec::new(),
1671 }
1672 }
1673}
1674
1675impl Default for RedisSentinel {
1676 fn default() -> Self {
1677 Self {
1678 host: "localhost".to_string(),
1679 port: 26379,
1680 }
1681 }
1682}
1683
1684impl Default for ClusterNode {
1685 fn default() -> Self {
1686 Self {
1687 host: "127.0.0.1".to_string(),
1688 port: 7000,
1689 }
1690 }
1691}
1692
1693impl Default for DatabasePooling {
1694 fn default() -> Self {
1695 Self {
1696 enabled: true,
1697 min: 2,
1698 max: 10,
1699 }
1700 }
1701}
1702
1703impl Default for EventLimits {
1704 fn default() -> Self {
1705 Self {
1706 max_channels_at_once: 100,
1707 max_name_length: 200,
1708 max_payload_in_kb: 100,
1709 max_batch_size: 10,
1710 }
1711 }
1712}
1713
1714impl Default for IdempotencyConfig {
1715 fn default() -> Self {
1716 Self {
1717 enabled: true,
1718 ttl_seconds: 120,
1719 max_key_length: 128,
1720 }
1721 }
1722}
1723
1724impl Default for EphemeralConfig {
1725 fn default() -> Self {
1726 Self { enabled: true }
1727 }
1728}
1729
1730impl Default for EchoControlConfig {
1731 fn default() -> Self {
1732 Self {
1733 enabled: true,
1734 default_echo_messages: true,
1735 }
1736 }
1737}
1738
1739impl Default for EventNameFilteringConfig {
1740 fn default() -> Self {
1741 Self {
1742 enabled: true,
1743 max_events_per_filter: 50,
1744 max_event_name_length: 200,
1745 }
1746 }
1747}
1748
1749impl Default for ConnectionRecoveryConfig {
1750 fn default() -> Self {
1751 Self {
1752 enabled: false,
1753 buffer_ttl_seconds: 120,
1754 max_buffer_size: 100,
1755 }
1756 }
1757}
1758
1759impl Default for HttpApiConfig {
1760 fn default() -> Self {
1761 Self {
1762 request_limit_in_mb: 10,
1763 accept_traffic: AcceptTraffic::default(),
1764 usage_enabled: true,
1765 }
1766 }
1767}
1768
1769impl Default for AcceptTraffic {
1770 fn default() -> Self {
1771 Self {
1772 memory_threshold: 0.90,
1773 }
1774 }
1775}
1776
1777impl Default for InstanceConfig {
1778 fn default() -> Self {
1779 Self {
1780 process_id: uuid::Uuid::new_v4().to_string(),
1781 }
1782 }
1783}
1784
1785impl Default for LoggingConfig {
1786 fn default() -> Self {
1787 Self {
1788 colors_enabled: true,
1789 include_target: true,
1790 }
1791 }
1792}
1793
1794impl Default for MetricsConfig {
1795 fn default() -> Self {
1796 Self {
1797 enabled: true,
1798 driver: MetricsDriver::default(),
1799 host: "0.0.0.0".to_string(),
1800 prometheus: PrometheusConfig::default(),
1801 port: 9601,
1802 }
1803 }
1804}
1805
1806impl Default for PrometheusConfig {
1807 fn default() -> Self {
1808 Self {
1809 prefix: "sockudo_".to_string(),
1810 }
1811 }
1812}
1813
1814impl Default for PresenceConfig {
1815 fn default() -> Self {
1816 Self {
1817 max_members_per_channel: 100,
1818 max_member_size_in_kb: 2,
1819 }
1820 }
1821}
1822
1823impl Default for RedisQueueConfig {
1824 fn default() -> Self {
1825 Self {
1826 concurrency: 5,
1827 prefix: Some("sockudo_queue:".to_string()),
1828 url_override: None,
1829 cluster_mode: false,
1830 }
1831 }
1832}
1833
1834impl Default for RateLimit {
1835 fn default() -> Self {
1836 Self {
1837 max_requests: 60,
1838 window_seconds: 60,
1839 identifier: Some("default".to_string()),
1840 trust_hops: Some(0),
1841 }
1842 }
1843}
1844
1845impl Default for RateLimiterConfig {
1846 fn default() -> Self {
1847 Self {
1848 enabled: true,
1849 driver: CacheDriver::Memory,
1850 api_rate_limit: RateLimit {
1851 max_requests: 100,
1852 window_seconds: 60,
1853 identifier: Some("api".to_string()),
1854 trust_hops: Some(0),
1855 },
1856 websocket_rate_limit: RateLimit {
1857 max_requests: 20,
1858 window_seconds: 60,
1859 identifier: Some("websocket_connect".to_string()),
1860 trust_hops: Some(0),
1861 },
1862 redis: RedisConfig {
1863 prefix: Some("sockudo_rl:".to_string()),
1864 url_override: None,
1865 cluster_mode: false,
1866 },
1867 }
1868 }
1869}
1870
1871impl Default for SslConfig {
1872 fn default() -> Self {
1873 Self {
1874 enabled: false,
1875 cert_path: "".to_string(),
1876 key_path: "".to_string(),
1877 passphrase: None,
1878 ca_path: None,
1879 redirect_http: false,
1880 http_port: Some(80),
1881 }
1882 }
1883}
1884
1885impl Default for BatchingConfig {
1886 fn default() -> Self {
1887 Self {
1888 enabled: true,
1889 duration: 50,
1890 size: 100,
1891 }
1892 }
1893}
1894
1895impl Default for ClusterHealthConfig {
1896 fn default() -> Self {
1897 Self {
1898 enabled: true,
1899 heartbeat_interval_ms: 10000,
1900 node_timeout_ms: 30000,
1901 cleanup_interval_ms: 10000,
1902 }
1903 }
1904}
1905
1906impl Default for UnixSocketConfig {
1907 fn default() -> Self {
1908 Self {
1909 enabled: false,
1910 path: "/var/run/sockudo/sockudo.sock".to_string(),
1911 permission_mode: 0o660,
1912 }
1913 }
1914}
1915
1916impl Default for DeltaCompressionOptionsConfig {
1917 fn default() -> Self {
1918 Self {
1919 enabled: true,
1920 algorithm: "fossil".to_string(),
1921 full_message_interval: 10,
1922 min_message_size: 100,
1923 max_state_age_secs: 300,
1924 max_channel_states_per_socket: 100,
1925 max_conflation_states_per_channel: Some(100),
1926 conflation_key_path: None,
1927 cluster_coordination: false,
1928 omit_delta_algorithm: false,
1929 }
1930 }
1931}
1932
1933impl ClusterHealthConfig {
1934 pub fn validate(&self) -> Result<(), String> {
1935 if self.heartbeat_interval_ms == 0 {
1936 return Err("heartbeat_interval_ms must be greater than 0".to_string());
1937 }
1938 if self.node_timeout_ms == 0 {
1939 return Err("node_timeout_ms must be greater than 0".to_string());
1940 }
1941 if self.cleanup_interval_ms == 0 {
1942 return Err("cleanup_interval_ms must be greater than 0".to_string());
1943 }
1944
1945 if self.heartbeat_interval_ms > self.node_timeout_ms / 3 {
1946 return Err(format!(
1947 "heartbeat_interval_ms ({}) should be at least 3x smaller than node_timeout_ms ({}) to avoid false positive dead node detection. Recommended: heartbeat_interval_ms <= {}",
1948 self.heartbeat_interval_ms,
1949 self.node_timeout_ms,
1950 self.node_timeout_ms / 3
1951 ));
1952 }
1953
1954 if self.cleanup_interval_ms > self.node_timeout_ms {
1955 return Err(format!(
1956 "cleanup_interval_ms ({}) should not be larger than node_timeout_ms ({}) to ensure timely dead node detection",
1957 self.cleanup_interval_ms, self.node_timeout_ms
1958 ));
1959 }
1960
1961 Ok(())
1962 }
1963}
1964
1965impl ServerOptions {
1966 pub async fn load_from_file(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
1967 let content = tokio::fs::read_to_string(path).await?;
1968 let options: Self = if path.ends_with(".toml") {
1969 toml::from_str(&content)?
1970 } else {
1971 sonic_rs::from_str(&content)?
1973 };
1974 Ok(options)
1975 }
1976
1977 pub async fn override_from_env(&mut self) -> Result<(), Box<dyn std::error::Error>> {
1978 if let Ok(mode) = std::env::var("ENVIRONMENT") {
1980 self.mode = mode;
1981 }
1982 self.debug = parse_bool_env("DEBUG_MODE", self.debug);
1983 if parse_bool_env("DEBUG", false) {
1984 self.debug = true;
1985 info!("DEBUG environment variable forces debug mode ON");
1986 }
1987
1988 self.activity_timeout = parse_env::<u64>("ACTIVITY_TIMEOUT", self.activity_timeout);
1989
1990 if let Ok(host) = std::env::var("HOST") {
1991 self.host = host;
1992 }
1993 self.port = parse_env::<u16>("PORT", self.port);
1994 self.shutdown_grace_period =
1995 parse_env::<u64>("SHUTDOWN_GRACE_PERIOD", self.shutdown_grace_period);
1996 self.user_authentication_timeout = parse_env::<u64>(
1997 "USER_AUTHENTICATION_TIMEOUT",
1998 self.user_authentication_timeout,
1999 );
2000 self.websocket_max_payload_kb =
2001 parse_env::<u32>("WEBSOCKET_MAX_PAYLOAD_KB", self.websocket_max_payload_kb);
2002 if let Ok(id) = std::env::var("INSTANCE_PROCESS_ID") {
2003 self.instance.process_id = id;
2004 }
2005
2006 if let Ok(driver_str) = std::env::var("ADAPTER_DRIVER") {
2008 self.adapter.driver =
2009 parse_driver_enum(driver_str, self.adapter.driver.clone(), "Adapter");
2010 }
2011 self.adapter.buffer_multiplier_per_cpu = parse_env::<usize>(
2012 "ADAPTER_BUFFER_MULTIPLIER_PER_CPU",
2013 self.adapter.buffer_multiplier_per_cpu,
2014 );
2015 self.adapter.enable_socket_counting = parse_env::<bool>(
2016 "ADAPTER_ENABLE_SOCKET_COUNTING",
2017 self.adapter.enable_socket_counting,
2018 );
2019 if let Ok(driver_str) = std::env::var("CACHE_DRIVER") {
2020 self.cache.driver = parse_driver_enum(driver_str, self.cache.driver.clone(), "Cache");
2021 }
2022 if let Ok(driver_str) = std::env::var("QUEUE_DRIVER") {
2023 self.queue.driver = parse_driver_enum(driver_str, self.queue.driver.clone(), "Queue");
2024 }
2025 if let Ok(driver_str) = std::env::var("APP_MANAGER_DRIVER") {
2026 self.app_manager.driver =
2027 parse_driver_enum(driver_str, self.app_manager.driver.clone(), "AppManager");
2028 }
2029 if let Ok(driver_str) = std::env::var("RATE_LIMITER_DRIVER") {
2030 self.rate_limiter.driver = parse_driver_enum(
2031 driver_str,
2032 self.rate_limiter.driver.clone(),
2033 "RateLimiter Backend",
2034 );
2035 }
2036
2037 if let Ok(host) = std::env::var("DATABASE_REDIS_HOST") {
2039 self.database.redis.host = host;
2040 }
2041 self.database.redis.port =
2042 parse_env::<u16>("DATABASE_REDIS_PORT", self.database.redis.port);
2043 if let Ok(username) = std::env::var("DATABASE_REDIS_USERNAME") {
2044 self.database.redis.username = if username.is_empty() {
2045 None
2046 } else {
2047 Some(username)
2048 };
2049 }
2050 if let Ok(password) = std::env::var("DATABASE_REDIS_PASSWORD") {
2051 self.database.redis.password = Some(password);
2052 }
2053 self.database.redis.db = parse_env::<u32>("DATABASE_REDIS_DB", self.database.redis.db);
2054 if let Ok(prefix) = std::env::var("DATABASE_REDIS_KEY_PREFIX") {
2055 self.database.redis.key_prefix = prefix;
2056 }
2057 if let Ok(cluster_username) = std::env::var("DATABASE_REDIS_CLUSTER_USERNAME") {
2058 self.database.redis.cluster.username = if cluster_username.is_empty() {
2059 None
2060 } else {
2061 Some(cluster_username)
2062 };
2063 }
2064 if let Ok(cluster_password) = std::env::var("DATABASE_REDIS_CLUSTER_PASSWORD") {
2065 self.database.redis.cluster.password = Some(cluster_password);
2066 }
2067 self.database.redis.cluster.use_tls = parse_bool_env(
2068 "DATABASE_REDIS_CLUSTER_USE_TLS",
2069 self.database.redis.cluster.use_tls,
2070 );
2071
2072 if let Ok(host) = std::env::var("DATABASE_MYSQL_HOST") {
2074 self.database.mysql.host = host;
2075 }
2076 self.database.mysql.port =
2077 parse_env::<u16>("DATABASE_MYSQL_PORT", self.database.mysql.port);
2078 if let Ok(user) = std::env::var("DATABASE_MYSQL_USERNAME") {
2079 self.database.mysql.username = user;
2080 }
2081 if let Ok(pass) = std::env::var("DATABASE_MYSQL_PASSWORD") {
2082 self.database.mysql.password = pass;
2083 }
2084 if let Ok(db) = std::env::var("DATABASE_MYSQL_DATABASE") {
2085 self.database.mysql.database = db;
2086 }
2087 if let Ok(table) = std::env::var("DATABASE_MYSQL_TABLE_NAME") {
2088 self.database.mysql.table_name = table;
2089 }
2090 override_db_pool_settings(&mut self.database.mysql, "DATABASE_MYSQL");
2091
2092 if let Ok(host) = std::env::var("DATABASE_POSTGRES_HOST") {
2094 self.database.postgres.host = host;
2095 }
2096 self.database.postgres.port =
2097 parse_env::<u16>("DATABASE_POSTGRES_PORT", self.database.postgres.port);
2098 if let Ok(user) = std::env::var("DATABASE_POSTGRES_USERNAME") {
2099 self.database.postgres.username = user;
2100 }
2101 if let Ok(pass) = std::env::var("DATABASE_POSTGRES_PASSWORD") {
2102 self.database.postgres.password = pass;
2103 }
2104 if let Ok(db) = std::env::var("DATABASE_POSTGRES_DATABASE") {
2105 self.database.postgres.database = db;
2106 }
2107 override_db_pool_settings(&mut self.database.postgres, "DATABASE_POSTGRES");
2108
2109 if let Ok(region) = std::env::var("DATABASE_DYNAMODB_REGION") {
2111 self.database.dynamodb.region = region;
2112 }
2113 if let Ok(table) = std::env::var("DATABASE_DYNAMODB_TABLE_NAME") {
2114 self.database.dynamodb.table_name = table;
2115 }
2116 if let Ok(endpoint) = std::env::var("DATABASE_DYNAMODB_ENDPOINT_URL") {
2117 self.database.dynamodb.endpoint_url = Some(endpoint);
2118 }
2119 if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
2120 self.database.dynamodb.aws_access_key_id = Some(key_id);
2121 }
2122 if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
2123 self.database.dynamodb.aws_secret_access_key = Some(secret);
2124 }
2125
2126 if let Ok(url) = std::env::var("DATABASE_SURREALDB_URL") {
2128 self.database.surrealdb.url = url;
2129 }
2130 if let Ok(namespace) = std::env::var("DATABASE_SURREALDB_NAMESPACE") {
2131 self.database.surrealdb.namespace = namespace;
2132 }
2133 if let Ok(database) = std::env::var("DATABASE_SURREALDB_DATABASE") {
2134 self.database.surrealdb.database = database;
2135 }
2136 if let Ok(username) = std::env::var("DATABASE_SURREALDB_USERNAME") {
2137 self.database.surrealdb.username = username;
2138 }
2139 if let Ok(password) = std::env::var("DATABASE_SURREALDB_PASSWORD") {
2140 self.database.surrealdb.password = password;
2141 }
2142 if let Ok(table) = std::env::var("DATABASE_SURREALDB_TABLE_NAME") {
2143 self.database.surrealdb.table_name = table;
2144 }
2145
2146 let apply_redis_cluster_nodes = |options: &mut Self, nodes: &str| {
2148 let node_list: Vec<String> = nodes
2149 .split(',')
2150 .map(|s| s.trim())
2151 .filter(|s| !s.is_empty())
2152 .map(ToString::to_string)
2153 .collect();
2154
2155 options.adapter.cluster.nodes = node_list.clone();
2156 options.queue.redis_cluster.nodes = node_list.clone();
2157
2158 let parsed_nodes: Vec<ClusterNode> = node_list
2159 .iter()
2160 .filter_map(|seed| ClusterNode::from_seed(seed))
2161 .collect();
2162 options.database.redis.cluster.nodes = parsed_nodes.clone();
2163 options.database.redis.cluster_nodes = parsed_nodes;
2164 };
2165
2166 if let Ok(nodes) = std::env::var("REDIS_CLUSTER_NODES") {
2167 apply_redis_cluster_nodes(self, &nodes);
2168 }
2169 if let Ok(nodes) = std::env::var("DATABASE_REDIS_CLUSTER_NODES") {
2170 apply_redis_cluster_nodes(self, &nodes);
2171 }
2172 self.queue.redis_cluster.concurrency = parse_env::<u32>(
2173 "REDIS_CLUSTER_QUEUE_CONCURRENCY",
2174 self.queue.redis_cluster.concurrency,
2175 );
2176 if let Ok(prefix) = std::env::var("REDIS_CLUSTER_QUEUE_PREFIX") {
2177 self.queue.redis_cluster.prefix = Some(prefix);
2178 }
2179
2180 self.ssl.enabled = parse_bool_env("SSL_ENABLED", self.ssl.enabled);
2182 if let Ok(val) = std::env::var("SSL_CERT_PATH") {
2183 self.ssl.cert_path = val;
2184 }
2185 if let Ok(val) = std::env::var("SSL_KEY_PATH") {
2186 self.ssl.key_path = val;
2187 }
2188 self.ssl.redirect_http = parse_bool_env("SSL_REDIRECT_HTTP", self.ssl.redirect_http);
2189 if let Some(port) = parse_env_optional::<u16>("SSL_HTTP_PORT") {
2190 self.ssl.http_port = Some(port);
2191 }
2192
2193 self.unix_socket.enabled = parse_bool_env("UNIX_SOCKET_ENABLED", self.unix_socket.enabled);
2195 if let Ok(path) = std::env::var("UNIX_SOCKET_PATH") {
2196 self.unix_socket.path = path;
2197 }
2198 if let Ok(mode_str) = std::env::var("UNIX_SOCKET_PERMISSION_MODE") {
2199 if mode_str.chars().all(|c| c.is_digit(8)) {
2200 if let Ok(mode) = u32::from_str_radix(&mode_str, 8) {
2201 if mode <= 0o777 {
2202 self.unix_socket.permission_mode = mode;
2203 } else {
2204 warn!(
2205 "UNIX_SOCKET_PERMISSION_MODE '{}' exceeds maximum value 777. Using default: {:o}",
2206 mode_str, self.unix_socket.permission_mode
2207 );
2208 }
2209 } else {
2210 warn!(
2211 "Failed to parse UNIX_SOCKET_PERMISSION_MODE '{}' as octal. Using default: {:o}",
2212 mode_str, self.unix_socket.permission_mode
2213 );
2214 }
2215 } else {
2216 warn!(
2217 "UNIX_SOCKET_PERMISSION_MODE '{}' must contain only octal digits (0-7). Using default: {:o}",
2218 mode_str, self.unix_socket.permission_mode
2219 );
2220 }
2221 }
2222
2223 if let Ok(driver_str) = std::env::var("METRICS_DRIVER") {
2225 self.metrics.driver =
2226 parse_driver_enum(driver_str, self.metrics.driver.clone(), "Metrics");
2227 }
2228 self.metrics.enabled = parse_bool_env("METRICS_ENABLED", self.metrics.enabled);
2229 if let Ok(val) = std::env::var("METRICS_HOST") {
2230 self.metrics.host = val;
2231 }
2232 self.metrics.port = parse_env::<u16>("METRICS_PORT", self.metrics.port);
2233 if let Ok(val) = std::env::var("METRICS_PROMETHEUS_PREFIX") {
2234 self.metrics.prometheus.prefix = val;
2235 }
2236
2237 self.http_api.usage_enabled =
2239 parse_bool_env("HTTP_API_USAGE_ENABLED", self.http_api.usage_enabled);
2240
2241 self.rate_limiter.enabled =
2243 parse_bool_env("RATE_LIMITER_ENABLED", self.rate_limiter.enabled);
2244 self.rate_limiter.api_rate_limit.max_requests = parse_env::<u32>(
2245 "RATE_LIMITER_API_MAX_REQUESTS",
2246 self.rate_limiter.api_rate_limit.max_requests,
2247 );
2248 self.rate_limiter.api_rate_limit.window_seconds = parse_env::<u64>(
2249 "RATE_LIMITER_API_WINDOW_SECONDS",
2250 self.rate_limiter.api_rate_limit.window_seconds,
2251 );
2252 if let Some(hops) = parse_env_optional::<u32>("RATE_LIMITER_API_TRUST_HOPS") {
2253 self.rate_limiter.api_rate_limit.trust_hops = Some(hops);
2254 }
2255 self.rate_limiter.websocket_rate_limit.max_requests = parse_env::<u32>(
2256 "RATE_LIMITER_WS_MAX_REQUESTS",
2257 self.rate_limiter.websocket_rate_limit.max_requests,
2258 );
2259 self.rate_limiter.websocket_rate_limit.window_seconds = parse_env::<u64>(
2260 "RATE_LIMITER_WS_WINDOW_SECONDS",
2261 self.rate_limiter.websocket_rate_limit.window_seconds,
2262 );
2263 if let Ok(prefix) = std::env::var("RATE_LIMITER_REDIS_PREFIX") {
2264 self.rate_limiter.redis.prefix = Some(prefix);
2265 }
2266
2267 self.queue.redis.concurrency =
2269 parse_env::<u32>("QUEUE_REDIS_CONCURRENCY", self.queue.redis.concurrency);
2270 if let Ok(prefix) = std::env::var("QUEUE_REDIS_PREFIX") {
2271 self.queue.redis.prefix = Some(prefix);
2272 }
2273
2274 if let Ok(region) = std::env::var("QUEUE_SQS_REGION") {
2276 self.queue.sqs.region = region;
2277 }
2278 self.queue.sqs.visibility_timeout = parse_env::<i32>(
2279 "QUEUE_SQS_VISIBILITY_TIMEOUT",
2280 self.queue.sqs.visibility_timeout,
2281 );
2282 self.queue.sqs.max_messages =
2283 parse_env::<i32>("QUEUE_SQS_MAX_MESSAGES", self.queue.sqs.max_messages);
2284 self.queue.sqs.wait_time_seconds = parse_env::<i32>(
2285 "QUEUE_SQS_WAIT_TIME_SECONDS",
2286 self.queue.sqs.wait_time_seconds,
2287 );
2288 self.queue.sqs.concurrency =
2289 parse_env::<u32>("QUEUE_SQS_CONCURRENCY", self.queue.sqs.concurrency);
2290 self.queue.sqs.fifo = parse_bool_env("QUEUE_SQS_FIFO", self.queue.sqs.fifo);
2291 if let Ok(endpoint) = std::env::var("QUEUE_SQS_ENDPOINT_URL") {
2292 self.queue.sqs.endpoint_url = Some(endpoint);
2293 }
2294
2295 if let Ok(region) = std::env::var("QUEUE_SNS_REGION") {
2297 self.queue.sns.region = region;
2298 }
2299 if let Ok(topic_arn) = std::env::var("QUEUE_SNS_TOPIC_ARN") {
2300 self.queue.sns.topic_arn = topic_arn;
2301 }
2302 if let Ok(endpoint) = std::env::var("QUEUE_SNS_ENDPOINT_URL") {
2303 self.queue.sns.endpoint_url = Some(endpoint);
2304 }
2305
2306 self.webhooks.batching.enabled =
2308 parse_bool_env("WEBHOOK_BATCHING_ENABLED", self.webhooks.batching.enabled);
2309 self.webhooks.batching.duration =
2310 parse_env::<u64>("WEBHOOK_BATCHING_DURATION", self.webhooks.batching.duration);
2311 self.webhooks.batching.size =
2312 parse_env::<usize>("WEBHOOK_BATCHING_SIZE", self.webhooks.batching.size);
2313
2314 if let Ok(servers) = std::env::var("NATS_SERVERS") {
2316 self.adapter.nats.servers = servers.split(',').map(|s| s.trim().to_string()).collect();
2317 }
2318 if let Ok(user) = std::env::var("NATS_USERNAME") {
2319 self.adapter.nats.username = Some(user);
2320 }
2321 if let Ok(pass) = std::env::var("NATS_PASSWORD") {
2322 self.adapter.nats.password = Some(pass);
2323 }
2324 if let Ok(token) = std::env::var("NATS_TOKEN") {
2325 self.adapter.nats.token = Some(token);
2326 }
2327 self.adapter.nats.connection_timeout_ms = parse_env::<u64>(
2328 "NATS_CONNECTION_TIMEOUT_MS",
2329 self.adapter.nats.connection_timeout_ms,
2330 );
2331 self.adapter.nats.request_timeout_ms = parse_env::<u64>(
2332 "NATS_REQUEST_TIMEOUT_MS",
2333 self.adapter.nats.request_timeout_ms,
2334 );
2335
2336 if let Ok(url) = std::env::var("RABBITMQ_URL") {
2338 self.adapter.rabbitmq.url = url;
2339 }
2340 if let Ok(prefix) = std::env::var("RABBITMQ_PREFIX") {
2341 self.adapter.rabbitmq.prefix = prefix;
2342 }
2343 self.adapter.rabbitmq.connection_timeout_ms = parse_env::<u64>(
2344 "RABBITMQ_CONNECTION_TIMEOUT_MS",
2345 self.adapter.rabbitmq.connection_timeout_ms,
2346 );
2347 self.adapter.rabbitmq.request_timeout_ms = parse_env::<u64>(
2348 "RABBITMQ_REQUEST_TIMEOUT_MS",
2349 self.adapter.rabbitmq.request_timeout_ms,
2350 );
2351 if let Some(nodes) = parse_env_optional::<u32>("RABBITMQ_NODES_NUMBER") {
2352 self.adapter.rabbitmq.nodes_number = Some(nodes);
2353 }
2354
2355 if let Ok(project_id) = std::env::var("GOOGLE_PUBSUB_PROJECT_ID") {
2357 self.adapter.google_pubsub.project_id = project_id;
2358 }
2359 if let Ok(prefix) = std::env::var("GOOGLE_PUBSUB_PREFIX") {
2360 self.adapter.google_pubsub.prefix = prefix;
2361 }
2362 if let Ok(emulator_host) = std::env::var("PUBSUB_EMULATOR_HOST") {
2363 self.adapter.google_pubsub.emulator_host = Some(emulator_host);
2364 }
2365 self.adapter.google_pubsub.request_timeout_ms = parse_env::<u64>(
2366 "GOOGLE_PUBSUB_REQUEST_TIMEOUT_MS",
2367 self.adapter.google_pubsub.request_timeout_ms,
2368 );
2369 if let Some(nodes) = parse_env_optional::<u32>("GOOGLE_PUBSUB_NODES_NUMBER") {
2370 self.adapter.google_pubsub.nodes_number = Some(nodes);
2371 }
2372
2373 if let Ok(brokers) = std::env::var("KAFKA_BROKERS") {
2375 self.adapter.kafka.brokers = brokers.split(',').map(|s| s.trim().to_string()).collect();
2376 }
2377 if let Ok(prefix) = std::env::var("KAFKA_PREFIX") {
2378 self.adapter.kafka.prefix = prefix;
2379 }
2380 if let Ok(protocol) = std::env::var("KAFKA_SECURITY_PROTOCOL") {
2381 self.adapter.kafka.security_protocol = Some(protocol);
2382 }
2383 if let Ok(mechanism) = std::env::var("KAFKA_SASL_MECHANISM") {
2384 self.adapter.kafka.sasl_mechanism = Some(mechanism);
2385 }
2386 if let Ok(username) = std::env::var("KAFKA_SASL_USERNAME") {
2387 self.adapter.kafka.sasl_username = Some(username);
2388 }
2389 if let Ok(password) = std::env::var("KAFKA_SASL_PASSWORD") {
2390 self.adapter.kafka.sasl_password = Some(password);
2391 }
2392 self.adapter.kafka.request_timeout_ms = parse_env::<u64>(
2393 "KAFKA_REQUEST_TIMEOUT_MS",
2394 self.adapter.kafka.request_timeout_ms,
2395 );
2396 if let Some(nodes) = parse_env_optional::<u32>("KAFKA_NODES_NUMBER") {
2397 self.adapter.kafka.nodes_number = Some(nodes);
2398 }
2399
2400 if let Ok(origins) = std::env::var("CORS_ORIGINS") {
2402 let parsed: Vec<String> = origins.split(',').map(|s| s.trim().to_string()).collect();
2403 if let Err(e) = crate::origin_validation::OriginValidator::validate_patterns(&parsed) {
2404 warn!(
2405 "CORS_ORIGINS contains invalid patterns: {}. Keeping previous CORS origins.",
2406 e
2407 );
2408 } else {
2409 self.cors.origin = parsed;
2410 }
2411 }
2412 if let Ok(methods) = std::env::var("CORS_METHODS") {
2413 self.cors.methods = methods.split(',').map(|s| s.trim().to_string()).collect();
2414 }
2415 if let Ok(headers) = std::env::var("CORS_HEADERS") {
2416 self.cors.allowed_headers = headers.split(',').map(|s| s.trim().to_string()).collect();
2417 }
2418 self.cors.credentials = parse_bool_env("CORS_CREDENTIALS", self.cors.credentials);
2419
2420 self.database_pooling.enabled =
2422 parse_bool_env("DATABASE_POOLING_ENABLED", self.database_pooling.enabled);
2423 if let Some(min) = parse_env_optional::<u32>("DATABASE_POOL_MIN") {
2424 self.database_pooling.min = min;
2425 }
2426 if let Some(max) = parse_env_optional::<u32>("DATABASE_POOL_MAX") {
2427 self.database_pooling.max = max;
2428 }
2429
2430 if let Some(pool_size) = parse_env_optional::<u32>("DATABASE_CONNECTION_POOL_SIZE") {
2431 self.database.mysql.connection_pool_size = pool_size;
2432 self.database.postgres.connection_pool_size = pool_size;
2433 }
2434 if let Some(cache_ttl) = parse_env_optional::<u64>("CACHE_TTL_SECONDS") {
2435 self.app_manager.cache.ttl = cache_ttl;
2436 self.channel_limits.cache_ttl = cache_ttl;
2437 self.database.mysql.cache_ttl = cache_ttl;
2438 self.database.postgres.cache_ttl = cache_ttl;
2439 self.database.surrealdb.cache_ttl = cache_ttl;
2440 self.cache.memory.ttl = cache_ttl;
2441 }
2442 if let Some(cleanup_interval) = parse_env_optional::<u64>("CACHE_CLEANUP_INTERVAL") {
2443 self.database.mysql.cache_cleanup_interval = cleanup_interval;
2444 self.database.postgres.cache_cleanup_interval = cleanup_interval;
2445 self.cache.memory.cleanup_interval = cleanup_interval;
2446 }
2447 if let Some(max_capacity) = parse_env_optional::<u64>("CACHE_MAX_CAPACITY") {
2448 self.database.mysql.cache_max_capacity = max_capacity;
2449 self.database.postgres.cache_max_capacity = max_capacity;
2450 self.database.surrealdb.cache_max_capacity = max_capacity;
2451 self.cache.memory.max_capacity = max_capacity;
2452 }
2453
2454 let default_app_id = std::env::var("SOCKUDO_DEFAULT_APP_ID");
2455 let default_app_key = std::env::var("SOCKUDO_DEFAULT_APP_KEY");
2456 let default_app_secret = std::env::var("SOCKUDO_DEFAULT_APP_SECRET");
2457 let default_app_enabled = parse_bool_env("SOCKUDO_DEFAULT_APP_ENABLED", true);
2458
2459 if let (Ok(app_id), Ok(app_key), Ok(app_secret)) =
2460 (default_app_id, default_app_key, default_app_secret)
2461 && default_app_enabled
2462 {
2463 let default_app = App::from_policy(
2464 app_id,
2465 app_key,
2466 app_secret,
2467 default_app_enabled,
2468 crate::app::AppPolicy {
2469 limits: crate::app::AppLimitsPolicy {
2470 max_connections: parse_env::<u32>(
2471 "SOCKUDO_DEFAULT_APP_MAX_CONNECTIONS",
2472 100,
2473 ),
2474 max_backend_events_per_second: Some(parse_env::<u32>(
2475 "SOCKUDO_DEFAULT_APP_MAX_BACKEND_EVENTS_PER_SECOND",
2476 100,
2477 )),
2478 max_client_events_per_second: parse_env::<u32>(
2479 "SOCKUDO_DEFAULT_APP_MAX_CLIENT_EVENTS_PER_SECOND",
2480 100,
2481 ),
2482 max_read_requests_per_second: Some(parse_env::<u32>(
2483 "SOCKUDO_DEFAULT_APP_MAX_READ_REQUESTS_PER_SECOND",
2484 100,
2485 )),
2486 max_presence_members_per_channel: Some(parse_env::<u32>(
2487 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBERS_PER_CHANNEL",
2488 100,
2489 )),
2490 max_presence_member_size_in_kb: Some(parse_env::<u32>(
2491 "SOCKUDO_DEFAULT_APP_MAX_PRESENCE_MEMBER_SIZE_IN_KB",
2492 100,
2493 )),
2494 max_channel_name_length: Some(parse_env::<u32>(
2495 "SOCKUDO_DEFAULT_APP_MAX_CHANNEL_NAME_LENGTH",
2496 100,
2497 )),
2498 max_event_channels_at_once: Some(parse_env::<u32>(
2499 "SOCKUDO_DEFAULT_APP_MAX_EVENT_CHANNELS_AT_ONCE",
2500 100,
2501 )),
2502 max_event_name_length: Some(parse_env::<u32>(
2503 "SOCKUDO_DEFAULT_APP_MAX_EVENT_NAME_LENGTH",
2504 100,
2505 )),
2506 max_event_payload_in_kb: Some(parse_env::<u32>(
2507 "SOCKUDO_DEFAULT_APP_MAX_EVENT_PAYLOAD_IN_KB",
2508 100,
2509 )),
2510 max_event_batch_size: Some(parse_env::<u32>(
2511 "SOCKUDO_DEFAULT_APP_MAX_EVENT_BATCH_SIZE",
2512 100,
2513 )),
2514 },
2515 features: crate::app::AppFeaturesPolicy {
2516 enable_client_messages: std::env::var(
2517 "SOCKUDO_DEFAULT_APP_ENABLE_CLIENT_MESSAGES",
2518 )
2519 .ok()
2520 .map(|value| {
2521 matches!(
2522 value.trim().to_ascii_lowercase().as_str(),
2523 "true" | "1" | "yes" | "on"
2524 )
2525 })
2526 .unwrap_or_else(|| parse_bool_env("SOCKUDO_ENABLE_CLIENT_MESSAGES", false)),
2527 enable_user_authentication: Some(parse_bool_env(
2528 "SOCKUDO_DEFAULT_APP_ENABLE_USER_AUTHENTICATION",
2529 false,
2530 )),
2531 enable_watchlist_events: Some(parse_bool_env(
2532 "SOCKUDO_DEFAULT_APP_ENABLE_WATCHLIST_EVENTS",
2533 false,
2534 )),
2535 },
2536 channels: crate::app::AppChannelsPolicy {
2537 allowed_origins: {
2538 if let Ok(origins_str) =
2539 std::env::var("SOCKUDO_DEFAULT_APP_ALLOWED_ORIGINS")
2540 {
2541 if !origins_str.is_empty() {
2542 Some(
2543 origins_str
2544 .split(',')
2545 .map(|s| s.trim().to_string())
2546 .collect(),
2547 )
2548 } else {
2549 None
2550 }
2551 } else {
2552 None
2553 }
2554 },
2555 channel_delta_compression: None,
2556 channel_namespaces: None,
2557 },
2558 webhooks: None,
2559 idempotency: None,
2560 connection_recovery: None,
2561 },
2562 );
2563
2564 self.app_manager.array.apps.push(default_app);
2565 info!("Successfully registered default app from env");
2566 }
2567
2568 if let Ok(redis_url_env) = std::env::var("REDIS_URL") {
2570 info!("Applying REDIS_URL environment variable override");
2571
2572 let redis_url_json = sonic_rs::json!(redis_url_env);
2573
2574 self.adapter
2575 .redis
2576 .redis_pub_options
2577 .insert("url".to_string(), redis_url_json.clone());
2578 self.adapter
2579 .redis
2580 .redis_sub_options
2581 .insert("url".to_string(), redis_url_json);
2582
2583 self.cache.redis.url_override = Some(redis_url_env.clone());
2584 self.queue.redis.url_override = Some(redis_url_env.clone());
2585 self.rate_limiter.redis.url_override = Some(redis_url_env);
2586 }
2587
2588 let has_colors_env = std::env::var("LOG_COLORS_ENABLED").is_ok();
2590 let has_target_env = std::env::var("LOG_INCLUDE_TARGET").is_ok();
2591 if has_colors_env || has_target_env {
2592 let logging_config = self.logging.get_or_insert_with(Default::default);
2593 if has_colors_env {
2594 logging_config.colors_enabled =
2595 parse_bool_env("LOG_COLORS_ENABLED", logging_config.colors_enabled);
2596 }
2597 if has_target_env {
2598 logging_config.include_target =
2599 parse_bool_env("LOG_INCLUDE_TARGET", logging_config.include_target);
2600 }
2601 }
2602
2603 self.cleanup.async_enabled =
2605 parse_bool_env("CLEANUP_ASYNC_ENABLED", self.cleanup.async_enabled);
2606 self.cleanup.fallback_to_sync =
2607 parse_bool_env("CLEANUP_FALLBACK_TO_SYNC", self.cleanup.fallback_to_sync);
2608 self.cleanup.queue_buffer_size =
2609 parse_env::<usize>("CLEANUP_QUEUE_BUFFER_SIZE", self.cleanup.queue_buffer_size);
2610 self.cleanup.batch_size = parse_env::<usize>("CLEANUP_BATCH_SIZE", self.cleanup.batch_size);
2611 self.cleanup.batch_timeout_ms =
2612 parse_env::<u64>("CLEANUP_BATCH_TIMEOUT_MS", self.cleanup.batch_timeout_ms);
2613 if let Ok(worker_threads_str) = std::env::var("CLEANUP_WORKER_THREADS") {
2614 self.cleanup.worker_threads = if worker_threads_str.to_lowercase() == "auto" {
2615 WorkerThreadsConfig::Auto
2616 } else if let Ok(n) = worker_threads_str.parse::<usize>() {
2617 WorkerThreadsConfig::Fixed(n)
2618 } else {
2619 warn!(
2620 "Invalid CLEANUP_WORKER_THREADS value '{}', keeping current setting",
2621 worker_threads_str
2622 );
2623 self.cleanup.worker_threads.clone()
2624 };
2625 }
2626 self.cleanup.max_retry_attempts = parse_env::<u32>(
2627 "CLEANUP_MAX_RETRY_ATTEMPTS",
2628 self.cleanup.max_retry_attempts,
2629 );
2630
2631 self.cluster_health.enabled =
2633 parse_bool_env("CLUSTER_HEALTH_ENABLED", self.cluster_health.enabled);
2634 self.cluster_health.heartbeat_interval_ms = parse_env::<u64>(
2635 "CLUSTER_HEALTH_HEARTBEAT_INTERVAL",
2636 self.cluster_health.heartbeat_interval_ms,
2637 );
2638 self.cluster_health.node_timeout_ms = parse_env::<u64>(
2639 "CLUSTER_HEALTH_NODE_TIMEOUT",
2640 self.cluster_health.node_timeout_ms,
2641 );
2642 self.cluster_health.cleanup_interval_ms = parse_env::<u64>(
2643 "CLUSTER_HEALTH_CLEANUP_INTERVAL",
2644 self.cluster_health.cleanup_interval_ms,
2645 );
2646
2647 self.tag_filtering.enabled =
2649 parse_bool_env("TAG_FILTERING_ENABLED", self.tag_filtering.enabled);
2650
2651 if let Ok(val) = std::env::var("WEBSOCKET_MAX_MESSAGES") {
2653 if val.to_lowercase() == "none" || val == "0" {
2654 self.websocket.max_messages = None;
2655 } else if let Ok(n) = val.parse::<usize>() {
2656 self.websocket.max_messages = Some(n);
2657 }
2658 }
2659 if let Ok(val) = std::env::var("WEBSOCKET_MAX_BYTES") {
2660 if val.to_lowercase() == "none" || val == "0" {
2661 self.websocket.max_bytes = None;
2662 } else if let Ok(n) = val.parse::<usize>() {
2663 self.websocket.max_bytes = Some(n);
2664 }
2665 }
2666 self.websocket.disconnect_on_buffer_full = parse_bool_env(
2667 "WEBSOCKET_DISCONNECT_ON_BUFFER_FULL",
2668 self.websocket.disconnect_on_buffer_full,
2669 );
2670 self.websocket.max_message_size = parse_env::<usize>(
2671 "WEBSOCKET_MAX_MESSAGE_SIZE",
2672 self.websocket.max_message_size,
2673 );
2674 self.websocket.max_frame_size =
2675 parse_env::<usize>("WEBSOCKET_MAX_FRAME_SIZE", self.websocket.max_frame_size);
2676 self.websocket.write_buffer_size = parse_env::<usize>(
2677 "WEBSOCKET_WRITE_BUFFER_SIZE",
2678 self.websocket.write_buffer_size,
2679 );
2680 self.websocket.max_backpressure = parse_env::<usize>(
2681 "WEBSOCKET_MAX_BACKPRESSURE",
2682 self.websocket.max_backpressure,
2683 );
2684 self.websocket.auto_ping = parse_bool_env("WEBSOCKET_AUTO_PING", self.websocket.auto_ping);
2685 self.websocket.ping_interval =
2686 parse_env::<u32>("WEBSOCKET_PING_INTERVAL", self.websocket.ping_interval);
2687 self.websocket.idle_timeout =
2688 parse_env::<u32>("WEBSOCKET_IDLE_TIMEOUT", self.websocket.idle_timeout);
2689 if let Ok(mode) = std::env::var("WEBSOCKET_COMPRESSION") {
2690 self.websocket.compression = mode;
2691 }
2692
2693 self.connection_recovery.enabled = parse_bool_env(
2695 "CONNECTION_RECOVERY_ENABLED",
2696 self.connection_recovery.enabled,
2697 );
2698 self.connection_recovery.buffer_ttl_seconds = parse_env::<u64>(
2699 "CONNECTION_RECOVERY_BUFFER_TTL",
2700 self.connection_recovery.buffer_ttl_seconds,
2701 );
2702 self.connection_recovery.max_buffer_size = parse_env::<usize>(
2703 "CONNECTION_RECOVERY_MAX_BUFFER_SIZE",
2704 self.connection_recovery.max_buffer_size,
2705 );
2706
2707 self.idempotency.enabled = parse_bool_env("IDEMPOTENCY_ENABLED", self.idempotency.enabled);
2708 self.idempotency.ttl_seconds =
2709 parse_env::<u64>("IDEMPOTENCY_TTL_SECONDS", self.idempotency.ttl_seconds);
2710 self.idempotency.max_key_length = parse_env::<usize>(
2711 "IDEMPOTENCY_MAX_KEY_LENGTH",
2712 self.idempotency.max_key_length,
2713 );
2714
2715 self.ephemeral.enabled = parse_bool_env("EPHEMERAL_ENABLED", self.ephemeral.enabled);
2716
2717 self.echo_control.enabled =
2718 parse_bool_env("ECHO_CONTROL_ENABLED", self.echo_control.enabled);
2719 self.echo_control.default_echo_messages = parse_bool_env(
2720 "ECHO_CONTROL_DEFAULT_ECHO_MESSAGES",
2721 self.echo_control.default_echo_messages,
2722 );
2723
2724 self.event_name_filtering.enabled = parse_bool_env(
2725 "EVENT_NAME_FILTERING_ENABLED",
2726 self.event_name_filtering.enabled,
2727 );
2728 self.event_name_filtering.max_events_per_filter = parse_env::<usize>(
2729 "EVENT_NAME_FILTERING_MAX_EVENTS_PER_FILTER",
2730 self.event_name_filtering.max_events_per_filter,
2731 );
2732 self.event_name_filtering.max_event_name_length = parse_env::<usize>(
2733 "EVENT_NAME_FILTERING_MAX_EVENT_NAME_LENGTH",
2734 self.event_name_filtering.max_event_name_length,
2735 );
2736
2737 Ok(())
2738 }
2739
2740 pub fn validate(&self) -> Result<(), String> {
2741 if self.unix_socket.enabled {
2742 if self.unix_socket.path.is_empty() {
2743 return Err(
2744 "Unix socket path cannot be empty when Unix socket is enabled".to_string(),
2745 );
2746 }
2747
2748 self.validate_unix_socket_security()?;
2749
2750 if self.ssl.enabled {
2751 tracing::warn!(
2752 "Both Unix socket and SSL are enabled. This is unusual as Unix sockets are typically used behind reverse proxies that handle SSL termination."
2753 );
2754 }
2755
2756 if self.unix_socket.permission_mode > 0o777 {
2757 return Err(format!(
2758 "Unix socket permission_mode ({:o}) is invalid. Must be a valid octal mode (0o000 to 0o777)",
2759 self.unix_socket.permission_mode
2760 ));
2761 }
2762 }
2763
2764 if let Err(e) = self.cleanup.validate() {
2765 return Err(format!("Invalid cleanup configuration: {}", e));
2766 }
2767
2768 Ok(())
2769 }
2770
2771 fn validate_unix_socket_security(&self) -> Result<(), String> {
2772 let path = &self.unix_socket.path;
2773
2774 if path.contains("../") || path.contains("..\\") {
2775 return Err(
2776 "Unix socket path contains directory traversal sequences (../). This is not allowed for security reasons.".to_string()
2777 );
2778 }
2779
2780 if self.unix_socket.permission_mode & 0o002 != 0 {
2781 warn!(
2782 "Unix socket permission mode ({:o}) allows world write access. This may be a security risk. Consider using more restrictive permissions like 0o660 or 0o750.",
2783 self.unix_socket.permission_mode
2784 );
2785 }
2786
2787 if self.unix_socket.permission_mode & 0o007 > 0o005 {
2788 warn!(
2789 "Unix socket permission mode ({:o}) grants write permissions to others. Consider using more restrictive permissions.",
2790 self.unix_socket.permission_mode
2791 );
2792 }
2793
2794 if self.mode == "production" && path.starts_with("/tmp/") {
2795 warn!(
2796 "Unix socket path '{}' is in /tmp directory. In production, consider using a more permanent location like /var/run/sockudo/ for better security and persistence.",
2797 path
2798 );
2799 }
2800
2801 if !path.starts_with('/') {
2802 return Err(
2803 "Unix socket path must be absolute (start with /) for security and reliability."
2804 .to_string(),
2805 );
2806 }
2807
2808 Ok(())
2809 }
2810}
2811
2812#[cfg(test)]
2813mod redis_connection_tests {
2814 use super::{ClusterNode, RedisClusterConnection, RedisConnection, RedisSentinel};
2815
2816 #[test]
2817 fn test_standard_url_basic() {
2818 let conn = RedisConnection {
2819 host: "127.0.0.1".to_string(),
2820 port: 6379,
2821 db: 0,
2822 username: None,
2823 password: None,
2824 key_prefix: "sockudo:".to_string(),
2825 sentinels: Vec::new(),
2826 sentinel_password: None,
2827 name: "mymaster".to_string(),
2828 cluster: RedisClusterConnection::default(),
2829 cluster_nodes: Vec::new(),
2830 };
2831 assert_eq!(conn.to_url(), "redis://127.0.0.1:6379/0");
2832 }
2833
2834 #[test]
2835 fn test_standard_url_with_password() {
2836 let conn = RedisConnection {
2837 host: "127.0.0.1".to_string(),
2838 port: 6379,
2839 db: 2,
2840 username: None,
2841 password: Some("secret".to_string()),
2842 key_prefix: "sockudo:".to_string(),
2843 sentinels: Vec::new(),
2844 sentinel_password: None,
2845 name: "mymaster".to_string(),
2846 cluster: RedisClusterConnection::default(),
2847 cluster_nodes: Vec::new(),
2848 };
2849 assert_eq!(conn.to_url(), "redis://:secret@127.0.0.1:6379/2");
2850 }
2851
2852 #[test]
2853 fn test_standard_url_with_username_and_password() {
2854 let conn = RedisConnection {
2855 host: "redis.example.com".to_string(),
2856 port: 6380,
2857 db: 1,
2858 username: Some("admin".to_string()),
2859 password: Some("pass123".to_string()),
2860 key_prefix: "sockudo:".to_string(),
2861 sentinels: Vec::new(),
2862 sentinel_password: None,
2863 name: "mymaster".to_string(),
2864 cluster: RedisClusterConnection::default(),
2865 cluster_nodes: Vec::new(),
2866 };
2867 assert_eq!(
2868 conn.to_url(),
2869 "redis://admin:pass123@redis.example.com:6380/1"
2870 );
2871 }
2872
2873 #[test]
2874 fn test_standard_url_with_special_chars_in_password() {
2875 let conn = RedisConnection {
2876 host: "127.0.0.1".to_string(),
2877 port: 6379,
2878 db: 0,
2879 username: None,
2880 password: Some("pass@word#123".to_string()),
2881 key_prefix: "sockudo:".to_string(),
2882 sentinels: Vec::new(),
2883 sentinel_password: None,
2884 name: "mymaster".to_string(),
2885 cluster: RedisClusterConnection::default(),
2886 cluster_nodes: Vec::new(),
2887 };
2888 assert_eq!(conn.to_url(), "redis://:pass%40word%23123@127.0.0.1:6379/0");
2889 }
2890
2891 #[test]
2892 fn test_is_sentinel_configured_false() {
2893 let conn = RedisConnection::default();
2894 assert!(!conn.is_sentinel_configured());
2895 }
2896
2897 #[test]
2898 fn test_is_sentinel_configured_true() {
2899 let conn = RedisConnection {
2900 sentinels: vec![RedisSentinel {
2901 host: "sentinel1".to_string(),
2902 port: 26379,
2903 }],
2904 ..Default::default()
2905 };
2906 assert!(conn.is_sentinel_configured());
2907 }
2908
2909 #[test]
2910 fn test_sentinel_url_basic() {
2911 let conn = RedisConnection {
2912 host: "127.0.0.1".to_string(),
2913 port: 6379,
2914 db: 0,
2915 username: None,
2916 password: None,
2917 key_prefix: "sockudo:".to_string(),
2918 sentinels: vec![
2919 RedisSentinel {
2920 host: "sentinel1".to_string(),
2921 port: 26379,
2922 },
2923 RedisSentinel {
2924 host: "sentinel2".to_string(),
2925 port: 26379,
2926 },
2927 ],
2928 sentinel_password: None,
2929 name: "mymaster".to_string(),
2930 cluster: RedisClusterConnection::default(),
2931 cluster_nodes: Vec::new(),
2932 };
2933 assert_eq!(
2934 conn.to_url(),
2935 "redis+sentinel://sentinel1:26379,sentinel2:26379/mymaster/0"
2936 );
2937 }
2938
2939 #[test]
2940 fn test_sentinel_url_with_sentinel_password() {
2941 let conn = RedisConnection {
2942 host: "127.0.0.1".to_string(),
2943 port: 6379,
2944 db: 0,
2945 username: None,
2946 password: None,
2947 key_prefix: "sockudo:".to_string(),
2948 sentinels: vec![RedisSentinel {
2949 host: "sentinel1".to_string(),
2950 port: 26379,
2951 }],
2952 sentinel_password: Some("sentinelpass".to_string()),
2953 name: "mymaster".to_string(),
2954 cluster: RedisClusterConnection::default(),
2955 cluster_nodes: Vec::new(),
2956 };
2957 assert_eq!(
2958 conn.to_url(),
2959 "redis+sentinel://:sentinelpass@sentinel1:26379/mymaster/0"
2960 );
2961 }
2962
2963 #[test]
2964 fn test_sentinel_url_with_master_password() {
2965 let conn = RedisConnection {
2966 host: "127.0.0.1".to_string(),
2967 port: 6379,
2968 db: 1,
2969 username: None,
2970 password: Some("masterpass".to_string()),
2971 key_prefix: "sockudo:".to_string(),
2972 sentinels: vec![RedisSentinel {
2973 host: "sentinel1".to_string(),
2974 port: 26379,
2975 }],
2976 sentinel_password: None,
2977 name: "mymaster".to_string(),
2978 cluster: RedisClusterConnection::default(),
2979 cluster_nodes: Vec::new(),
2980 };
2981 assert_eq!(
2982 conn.to_url(),
2983 "redis+sentinel://sentinel1:26379/mymaster/1?password=masterpass"
2984 );
2985 }
2986
2987 #[test]
2988 fn test_sentinel_url_with_all_auth() {
2989 let conn = RedisConnection {
2990 host: "127.0.0.1".to_string(),
2991 port: 6379,
2992 db: 2,
2993 username: Some("redisuser".to_string()),
2994 password: Some("redispass".to_string()),
2995 key_prefix: "sockudo:".to_string(),
2996 sentinels: vec![
2997 RedisSentinel {
2998 host: "sentinel1".to_string(),
2999 port: 26379,
3000 },
3001 RedisSentinel {
3002 host: "sentinel2".to_string(),
3003 port: 26380,
3004 },
3005 ],
3006 sentinel_password: Some("sentinelauth".to_string()),
3007 name: "production-master".to_string(),
3008 cluster: RedisClusterConnection::default(),
3009 cluster_nodes: Vec::new(),
3010 };
3011 assert_eq!(
3012 conn.to_url(),
3013 "redis+sentinel://:sentinelauth@sentinel1:26379,sentinel2:26380/production-master/2?password=redispass&username=redisuser"
3014 );
3015 }
3016
3017 #[test]
3018 fn test_sentinel_to_host_port() {
3019 let sentinel = RedisSentinel {
3020 host: "sentinel.example.com".to_string(),
3021 port: 26379,
3022 };
3023 assert_eq!(sentinel.to_host_port(), "sentinel.example.com:26379");
3024 }
3025
3026 #[test]
3027 fn test_cluster_node_urls_with_shared_cluster_auth_and_tls() {
3028 let conn = RedisConnection {
3029 cluster: RedisClusterConnection {
3030 nodes: vec![
3031 ClusterNode {
3032 host: "node1.secure-cluster.com".to_string(),
3033 port: 7000,
3034 },
3035 ClusterNode {
3036 host: "redis://node2.secure-cluster.com:7001".to_string(),
3037 port: 7001,
3038 },
3039 ClusterNode {
3040 host: "rediss://node3.secure-cluster.com".to_string(),
3041 port: 7002,
3042 },
3043 ],
3044 username: None,
3045 password: Some("cluster-secret".to_string()),
3046 use_tls: true,
3047 },
3048 ..Default::default()
3049 };
3050
3051 assert_eq!(
3052 conn.cluster_node_urls(),
3053 vec![
3054 "rediss://:cluster-secret@node1.secure-cluster.com:7000",
3055 "redis://:cluster-secret@node2.secure-cluster.com:7001",
3056 "rediss://:cluster-secret@node3.secure-cluster.com:7002",
3057 ]
3058 );
3059 }
3060
3061 #[test]
3062 fn test_cluster_node_urls_fallback_to_legacy_nodes() {
3063 let conn = RedisConnection {
3064 password: Some("fallback-secret".to_string()),
3065 cluster_nodes: vec![ClusterNode {
3066 host: "legacy-node.example.com".to_string(),
3067 port: 7000,
3068 }],
3069 ..Default::default()
3070 };
3071
3072 assert_eq!(
3073 conn.cluster_node_urls(),
3074 vec!["redis://:fallback-secret@legacy-node.example.com:7000"]
3075 );
3076 }
3077
3078 #[test]
3079 fn test_normalize_cluster_seed_urls() {
3080 let conn = RedisConnection {
3081 cluster: RedisClusterConnection {
3082 nodes: Vec::new(),
3083 username: Some("svc-user".to_string()),
3084 password: Some("svc-pass".to_string()),
3085 use_tls: true,
3086 },
3087 ..Default::default()
3088 };
3089
3090 let seeds = vec![
3091 "node1.example.com:7000".to_string(),
3092 "redis://node2.example.com:7001".to_string(),
3093 "rediss://node3.example.com".to_string(),
3094 ];
3095
3096 assert_eq!(
3097 conn.normalize_cluster_seed_urls(&seeds),
3098 vec![
3099 "rediss://svc-user:svc-pass@node1.example.com:7000",
3100 "redis://svc-user:svc-pass@node2.example.com:7001",
3101 "rediss://svc-user:svc-pass@node3.example.com:6379",
3102 ]
3103 );
3104 }
3105}
3106
3107#[cfg(test)]
3108mod cluster_node_tests {
3109 use super::ClusterNode;
3110
3111 #[test]
3112 fn test_to_url_basic_host() {
3113 let node = ClusterNode {
3114 host: "localhost".to_string(),
3115 port: 6379,
3116 };
3117 assert_eq!(node.to_url(), "redis://localhost:6379");
3118 }
3119
3120 #[test]
3121 fn test_to_url_ip_address() {
3122 let node = ClusterNode {
3123 host: "127.0.0.1".to_string(),
3124 port: 6379,
3125 };
3126 assert_eq!(node.to_url(), "redis://127.0.0.1:6379");
3127 }
3128
3129 #[test]
3130 fn test_to_url_with_redis_protocol() {
3131 let node = ClusterNode {
3132 host: "redis://example.com".to_string(),
3133 port: 6379,
3134 };
3135 assert_eq!(node.to_url(), "redis://example.com:6379");
3136 }
3137
3138 #[test]
3139 fn test_to_url_with_rediss_protocol() {
3140 let node = ClusterNode {
3141 host: "rediss://secure.example.com".to_string(),
3142 port: 6379,
3143 };
3144 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3145 }
3146
3147 #[test]
3148 fn test_to_url_with_rediss_protocol_and_port_in_url() {
3149 let node = ClusterNode {
3150 host: "rediss://secure.example.com:7000".to_string(),
3151 port: 6379,
3152 };
3153 assert_eq!(node.to_url(), "rediss://secure.example.com:7000");
3154 }
3155
3156 #[test]
3157 fn test_to_url_with_redis_protocol_and_port_in_url() {
3158 let node = ClusterNode {
3159 host: "redis://example.com:7001".to_string(),
3160 port: 6379,
3161 };
3162 assert_eq!(node.to_url(), "redis://example.com:7001");
3163 }
3164
3165 #[test]
3166 fn test_to_url_with_trailing_whitespace() {
3167 let node = ClusterNode {
3168 host: " rediss://secure.example.com ".to_string(),
3169 port: 6379,
3170 };
3171 assert_eq!(node.to_url(), "rediss://secure.example.com:6379");
3172 }
3173
3174 #[test]
3175 fn test_to_url_custom_port() {
3176 let node = ClusterNode {
3177 host: "redis-cluster.example.com".to_string(),
3178 port: 7000,
3179 };
3180 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7000");
3181 }
3182
3183 #[test]
3184 fn test_to_url_plain_host_with_port_in_host_field() {
3185 let node = ClusterNode {
3186 host: "redis-cluster.example.com:7010".to_string(),
3187 port: 7000,
3188 };
3189 assert_eq!(node.to_url(), "redis://redis-cluster.example.com:7010");
3190 }
3191
3192 #[test]
3193 fn test_to_url_with_options_adds_auth_and_tls() {
3194 let node = ClusterNode {
3195 host: "node.example.com".to_string(),
3196 port: 7000,
3197 };
3198 assert_eq!(
3199 node.to_url_with_options(true, Some("svc-user"), Some("secret")),
3200 "rediss://svc-user:secret@node.example.com:7000"
3201 );
3202 }
3203
3204 #[test]
3205 fn test_to_url_with_options_keeps_embedded_auth() {
3206 let node = ClusterNode {
3207 host: "rediss://:node-secret@node.example.com:7000".to_string(),
3208 port: 7000,
3209 };
3210 assert_eq!(
3211 node.to_url_with_options(true, Some("global-user"), Some("global-secret")),
3212 "rediss://:node-secret@node.example.com:7000"
3213 );
3214 }
3215
3216 #[test]
3217 fn test_from_seed_parses_plain_host_port() {
3218 let node = ClusterNode::from_seed("cluster-node-1:7005").expect("node should parse");
3219 assert_eq!(node.host, "cluster-node-1");
3220 assert_eq!(node.port, 7005);
3221 }
3222
3223 #[test]
3224 fn test_from_seed_keeps_scheme_urls() {
3225 let node =
3226 ClusterNode::from_seed("rediss://secure.example.com:7005").expect("node should parse");
3227 assert_eq!(node.host, "rediss://secure.example.com:7005");
3228 assert_eq!(node.port, 7005);
3229 }
3230
3231 #[test]
3232 fn test_to_url_aws_elasticache_hostname() {
3233 let node = ClusterNode {
3234 host: "rediss://my-cluster.use1.cache.amazonaws.com".to_string(),
3235 port: 6379,
3236 };
3237 assert_eq!(
3238 node.to_url(),
3239 "rediss://my-cluster.use1.cache.amazonaws.com:6379"
3240 );
3241 }
3242
3243 #[test]
3244 fn test_to_url_with_ipv6_no_port() {
3245 let node = ClusterNode {
3246 host: "rediss://[::1]".to_string(),
3247 port: 6379,
3248 };
3249 assert_eq!(node.to_url(), "rediss://[::1]:6379");
3250 }
3251
3252 #[test]
3253 fn test_to_url_with_ipv6_and_port_in_url() {
3254 let node = ClusterNode {
3255 host: "rediss://[::1]:7000".to_string(),
3256 port: 6379,
3257 };
3258 assert_eq!(node.to_url(), "rediss://[::1]:7000");
3259 }
3260
3261 #[test]
3262 fn test_to_url_with_ipv6_full_address_no_port() {
3263 let node = ClusterNode {
3264 host: "rediss://[2001:db8::1]".to_string(),
3265 port: 6379,
3266 };
3267 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:6379");
3268 }
3269
3270 #[test]
3271 fn test_to_url_with_ipv6_full_address_with_port() {
3272 let node = ClusterNode {
3273 host: "rediss://[2001:db8::1]:7000".to_string(),
3274 port: 6379,
3275 };
3276 assert_eq!(node.to_url(), "rediss://[2001:db8::1]:7000");
3277 }
3278
3279 #[test]
3280 fn test_to_url_with_redis_protocol_ipv6() {
3281 let node = ClusterNode {
3282 host: "redis://[::1]".to_string(),
3283 port: 6379,
3284 };
3285 assert_eq!(node.to_url(), "redis://[::1]:6379");
3286 }
3287}
3288
3289#[cfg(test)]
3290mod cors_config_tests {
3291 use super::CorsConfig;
3292
3293 fn cors_from_json(json: &str) -> sonic_rs::Result<CorsConfig> {
3294 sonic_rs::from_str(json)
3295 }
3296
3297 #[test]
3298 fn test_deserialize_valid_exact_origins() {
3299 let config =
3300 cors_from_json(r#"{"origin": ["https://example.com", "https://other.com"]}"#).unwrap();
3301 assert_eq!(config.origin.len(), 2);
3302 }
3303
3304 #[test]
3305 fn test_deserialize_valid_wildcard_patterns() {
3306 let config =
3307 cors_from_json(r#"{"origin": ["*.example.com", "https://*.staging.example.com"]}"#)
3308 .unwrap();
3309 assert_eq!(config.origin.len(), 2);
3310 }
3311
3312 #[test]
3313 fn test_deserialize_allows_special_markers() {
3314 assert!(cors_from_json(r#"{"origin": ["*"]}"#).is_ok());
3315 assert!(cors_from_json(r#"{"origin": ["Any"]}"#).is_ok());
3316 assert!(cors_from_json(r#"{"origin": ["any"]}"#).is_ok());
3317 assert!(cors_from_json(r#"{"origin": ["*", "https://example.com"]}"#).is_ok());
3318 }
3319
3320 #[test]
3321 fn test_deserialize_rejects_invalid_patterns() {
3322 assert!(cors_from_json(r#"{"origin": ["*.*bad"]}"#).is_err());
3323 assert!(cors_from_json(r#"{"origin": ["*."]}"#).is_err());
3324 assert!(cors_from_json(r#"{"origin": [""]}"#).is_err());
3325 assert!(cors_from_json(r#"{"origin": ["https://"]}"#).is_err());
3326 }
3327
3328 #[test]
3329 fn test_deserialize_rejects_mixed_valid_and_invalid() {
3330 assert!(cors_from_json(r#"{"origin": ["https://good.com", "*.*bad"]}"#).is_err());
3331 }
3332}